WAPT Setuphelpers Usage¶
- setuphelpers.all_files(rootdir: str, pattern: Optional[str] = None) List[str] [source]¶
Recursively return all files from rootdir and sub directories matching the (dos style) pattern (example: *.exe)
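The recursive pattern matching described above can be approximated with the standard library. This is only a sketch: `all_files_sketch` is a hypothetical stand-in written for illustration, not the WAPT implementation, and it assumes the pattern is matched against file names only.

```python
import fnmatch
import os
import tempfile


def all_files_sketch(rootdir, pattern=None):
    """Hypothetical stand-in: collect files under rootdir whose name matches pattern."""
    matches = []
    for dirpath, _dirnames, filenames in os.walk(rootdir):
        for name in filenames:
            # Assumption: a missing pattern means "return every file".
            if pattern is None or fnmatch.fnmatch(name, pattern):
                matches.append(os.path.join(dirpath, name))
    return matches


# Build a small throwaway tree and look for *.exe files in it.
with tempfile.TemporaryDirectory() as root:
    os.makedirs(os.path.join(root, "sub"))
    for rel in ("a.exe", "b.txt", os.path.join("sub", "c.exe")):
        open(os.path.join(root, rel), "w").close()
    found = sorted(os.path.relpath(p, root) for p in all_files_sketch(root, "*.exe"))
```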
- setuphelpers.copytree2(src: str, dst: str, ignore: ~typing.Optional[~typing.Callable[[str, str], ~typing.List[str]]] = None, onreplace: ~typing.Callable[[str, str], bool] = <function default_skip>, oncopy: ~typing.Callable[[str, str, str], bool] = <function default_oncopy>, enable_replace_at_reboot: bool = True, follow_symlinks: bool = False)[source]¶
Copy src directory to dst directory. dst is created if it doesn’t exist. src can be relative to the installation temporary dir.
oncopy is called for each file copy; if it returns False, the copy is skipped. onreplace is called when a file is about to be overwritten.
- Args:
src (str): path to source directory (absolute path or relative to package extraction tempdir)
dst (str): path to target directory (created if not present)
ignore (func): callback func(root_dir, filenames) which returns names to ignore
onreplace (func): callback func(src, dst) returning a boolean, called when a file will be replaced to decide what to do.
default is to not replace if target exists. Can be default_overwrite, default_overwrite_older or a custom function.
oncopy (func): callback func(msg, src, dst) called when a file is copied.
default is to log the operation at debug level
enable_replace_at_reboot (boolean): if True, files which are locked will be scheduled for replacement at next reboot
Returns:
Raises:
>>> copytree2(r'c:\tranquilit\wapt\tests',r'c:\tranquilit\wapt\tests2')
>>> isdir(r'c:\tranquilit\wapt\tests2')
True
>>> remove_tree(r'c:\tranquilit\wapt\tests2')
>>> isdir(r'c:\tranquilit\wapt\tests2')
False
- setuphelpers.currentdatetime()[source]¶
Return the current date/time as YYYYMMDD-hhmmss
>>> currentdatetime()
'20161102-193600'
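The YYYYMMDD-hhmmss layout maps directly onto a `strftime` format string. A minimal sketch, assuming local time is used (the text above does not say which time zone applies); `currentdatetime_sketch` is a hypothetical name:

```python
import re
import time


def currentdatetime_sketch():
    """Hypothetical stand-in: format the current local time as YYYYMMDD-hhmmss."""
    return time.strftime("%Y%m%d-%H%M%S")


stamp = currentdatetime_sketch()
# The stamp is always 8 digits, a dash, then 6 digits.
well_formed = re.fullmatch(r"\d{8}-\d{6}", stamp) is not None
```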
- setuphelpers.datetime2isodate(adatetime: Optional[datetime] = None) str [source]¶
Return an ISO 8601 representation of the date/time. If adatetime is None, return the current datetime in UTC.
- setuphelpers.ensure_dir(filename: str)[source]¶
Make sure the directory of filename exists on disk; create it if not.
Intermediate directories are created as well.
- Args:
filename (str): path to a future file for which to create directory.
- Returns:
None
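The behaviour described for ensure_dir (create the parent directory of a future file, including intermediate directories) can be sketched with `os.makedirs`. `ensure_dir_sketch` is a hypothetical illustration, not the library code:

```python
import os
import tempfile


def ensure_dir_sketch(filename):
    """Hypothetical stand-in: create the directory that will contain filename."""
    directory = os.path.dirname(filename)
    if directory:
        # exist_ok avoids an error when the directory is already there.
        os.makedirs(directory, exist_ok=True)


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "a", "b", "settings.ini")
    ensure_dir_sketch(target)
    parent_created = os.path.isdir(os.path.dirname(target))
    file_created = os.path.exists(target)
```

Only the directories are created; the file itself is left for the caller to write.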
- setuphelpers.ensure_list(csv_or_list, ignore_empty_args=True, allow_none=False)[source]¶
If the argument is not a list, return a list built from a CSV string.
- Args:
csv_or_list (list or str):
ignore_empty_args (bool): if True, empty strings found in the CSV are not appended to the list.
allow_none (bool): if True and csv_or_list is None, return None, else return an empty list.
- Returns:
list
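The three parameters interact as sketched below. This is an illustration of the documented behaviour under stated assumptions, not the library code: `ensure_list_sketch` is a hypothetical name, the separator is assumed to be a comma, and stripping whitespace around items is an assumption.

```python
def ensure_list_sketch(csv_or_list, ignore_empty_args=True, allow_none=False):
    """Hypothetical stand-in mirroring the documented parameters."""
    if csv_or_list is None:
        return None if allow_none else []
    if isinstance(csv_or_list, (list, tuple)):
        return list(csv_or_list)
    # Assumption: items are comma separated and surrounding blanks are dropped.
    items = [item.strip() for item in csv_or_list.split(",")]
    if ignore_empty_args:
        items = [item for item in items if item]
    return items


cleaned = ensure_list_sketch("a, b,,c")
kept_empty = ensure_list_sketch("a,,b", ignore_empty_args=False)
```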
- setuphelpers.ensure_unicode(data)[source]¶
Return a unicode string from data object. It is sometimes difficult to know in advance what we will get from command line application output.
This is to ensure we get a (not always accurate) representation of the data, mainly for logging purposes.
- Args:
data: either str or unicode or object having a __str__ or WindowsError or Exception
- Returns:
unicode: unicode string representing the data
>>> ensure_unicode(str('éé'))
u'éé'
>>> ensure_unicode(u'éé')
u'éé'
>>> ensure_unicode(Exception("test"))
u'Exception: test'
>>> ensure_unicode(Exception())
u'Exception: '
- setuphelpers.file_is_locked(path: str, timeout: int = 5) bool [source]¶
Check if a file is locked. Waits timeout seconds for the release.
- setuphelpers.filecopyto(filename: str, target: str)[source]¶
Copy file from absolute or package temporary directory to target directory.
If file is a dll or exe, logs the original and new version.
- Args:
filename (str): absolute path to file to copy, or relative path to temporary package install content directory.
target (str): absolute path to target directory where to copy file.
target is either a full filename or a directory name. If filename is .exe or .dll, logger prints version numbers.
>>> if not os.path.isfile('c:/tmp/fc.test'):
...     with open('c:/tmp/fc.test','wb') as f:
...         f.write(b'test')
>>> if not os.path.isdir('c:/tmp/target'):
...     os.mkdir('c:/tmp/target')
>>> if os.path.isfile('c:/tmp/target/fc.test'):
...     os.unlink('c:/tmp/target/fc.test')
>>> filecopyto('c:/tmp/fc.test','c:/tmp/target')
>>> os.path.isfile('c:/tmp/target/fc.test')
True
- setuphelpers.fileisodate(filename: str) str [source]¶
Return the last update date/time of filename, in local time
- setuphelpers.find_all_files(rootdir: str, include_patterns: Optional[Union[str, List[str]]] = None, exclude_patterns: Optional[Union[str, List[str]]] = None, include_dirs: Optional[Union[str, List[str]]] = None, exclude_dirs: Optional[Union[str, List[str]]] = None, excludes_full: Optional[List[str]] = None)[source]¶
Generator which recursively finds all files from rootdir and subdirectories matching the (DOS-style) patterns (example: *.exe)
- Args:
rootdir (str): root dir where to start looking for files
include_patterns (str or list): list of glob patterns of files to return
exclude_patterns (str or list): list of glob patterns of files to exclude
(if a file is both in include and exclude, it is excluded)
include_dirs (str or list): list of glob directory patterns to return
exclude_dirs (str or list): list of glob directory patterns to exclude
(if a dir is both in include and exclude, it is excluded)
excludes_full (list): list of exact (relative to package root) file paths to exclude from manifest.
>>> for fn in find_all_files(r'c:\tmp','*.txt'):
...     print(fn)
- setuphelpers.find_processes(process_name)[source]¶
Return the list of Process objects named process_name
- Args:
process_name (str): process name to lookup
- Returns:
list: list of processes (Process) named process_name or process_name.exe
>>> [p.pid for p in find_processes('explorer')]
[2756, 4024]
- setuphelpers.get_battery_infos(idx: int) dict ¶
Information on battery.
- Args:
idx (int): battery index
- Returns:
dict
- setuphelpers.get_computer_groups()[source]¶
Try to find the computer in the Active Directory and return the list of groups
- setuphelpers.get_current_user()[source]¶
Get the login name for the current user.
>>> get_current_user()
u'htouvet'
- setuphelpers.get_disk_free_space(filepath: str) int [source]¶
Returns the number of free bytes on the drive that filepath is on
- setuphelpers.get_domain_from_socket()[source]¶
Return main DNS domain of the computer
- Returns:
str: domain name
>>> get_domain_from_socket()
u'tranquilit.local'
- setuphelpers.get_domain_info(force_tgt=True, hostname=None, domain=None)[source]¶
Return a dict with ad_site, ou and groups
- setuphelpers.get_file_properties(fname, ignore_warning=True)[source]¶
Read all properties of the given file and return them as a dictionary.
- Args:
fname : path to Windows executable or DLL
- Returns:
dict: properties of executable
>>> xp = get_file_properties(r'c:\windows\explorer.exe')
>>> 'FileVersion' in xp and 'FileDescription' in xp
True
- setuphelpers.get_language(full_locale=False, separator='_')[source]¶
Get the os default locale (example: fr, en, pl, etc.)
>>> get_language()
'fr'
>>> get_language(full_locale=True)
'fr_FR'
>>> get_language(full_locale=True, separator='-').lower()
'fr-fr'
- setuphelpers.get_os_name()[source]¶
Get the name of the current running operating system
- Returns:
str: Windows, Linux, Darwin
>>> get_os_name()
'Windows'
- setuphelpers.get_proxies()[source]¶
Return system proxy with the urllib python library
>>> get_proxies()
{'http': 'http://srvproxy.ad.domain.lan:8080', 'https': 'http://srvproxy.ad.domain.lan:8080'}
- setuphelpers.host_info_common_unix()[source]¶
Read main workstation information, returned as a dict
- Returns:
dict: main properties of host, networking and windows system
Changed in version 1.4.1: returned keys changed : dns_domain -> dnsdomain
>>> hi = host_info()
>>> 'computer_fqdn' in hi and 'connected_ips' in hi and 'computer_name' in hi and 'mac' in hi
True
- setuphelpers.httpdatetime2isodate(httpdate: str, localtime: bool = False) str [source]¶
Convert a date string as returned in http headers or mail headers to isodate (UTC)
>>> import requests
>>> last_modified = requests.head('http://wapt/wapt/Packages',headers={'cache-control':'no-cache','pragma':'no-cache'}).headers['last-modified']
>>> len(httpdatetime2isodate(last_modified)) == 19
True
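The conversion from an HTTP or mail header date to a 19-character ISO string in UTC can be sketched with the standard library. This illustrates the idea only; `http_date_to_iso_sketch` is a hypothetical name, and the localtime option of the real function is not covered.

```python
from datetime import timezone
from email.utils import parsedate_to_datetime


def http_date_to_iso_sketch(httpdate):
    """Hypothetical stand-in: parse an RFC 2822 style date and return ISO text in UTC."""
    parsed = parsedate_to_datetime(httpdate)
    if parsed.tzinfo is None:
        # Assumption: a date without usable zone information is treated as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    # Normalise to UTC, then drop the offset so the text is YYYY-MM-DDTHH:MM:SS.
    return parsed.astimezone(timezone.utc).replace(tzinfo=None).isoformat()


iso_gmt = http_date_to_iso_sketch("Wed, 02 Nov 2016 19:36:00 GMT")
iso_offset = http_date_to_iso_sketch("Wed, 02 Nov 2016 20:36:00 +0100")
```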
- setuphelpers.inifile_deleteoption(inifilename: str, section: str, key: str)[source]¶
Remove a key within the section of the inifile
- Args:
inifilename (str): Path to the ini file
section (str): section
key (str): value key of option to remove
- Returns:
boolean : True if the key/option has been removed
>>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.2')
>>> print(inifile_hasoption('c:/tranquilit/wapt/tests/test.ini','global','version'))
True
>>> print(inifile_deleteoption('c:/tranquilit/wapt/tests/test.ini','global','version'))
False
- setuphelpers.inifile_deletesection(inifilename: str, section: str)[source]¶
Remove a section within the inifile
- Args:
inifilename (str): Path to the ini file
section (str): section to remove
- Returns:
boolean : True if the section has been removed
- setuphelpers.inifile_hasoption(inifilename: str, section: str, key: str) bool [source]¶
Check if an option is present in section of the inifile
- Args:
inifilename (str): Path to the ini file
section (str): section
key (str): value key to check
- Returns:
boolean : True if the key exists
>>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.2')
>>> print(inifile_hasoption('c:/tranquilit/wapt/tests/test.ini','global','version'))
True
>>> print(inifile_hasoption('c:/tranquilit/wapt/tests/test.ini','global','dontexist'))
False
- setuphelpers.inifile_hassection(inifilename: str, section: str) bool [source]¶
Check if a section is present in the inifile
- Args:
inifilename (str): Path to the ini file
section (str): section
- Returns:
boolean : True if the section exists
>>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.2')
>>> print(inifile_hassection('c:/tranquilit/wapt/tests/test.ini','global'))
True
- setuphelpers.inifile_readstring(inifilename: str, section: str, key: str, default: Optional[str] = None)[source]¶
Read a string parameter from inifile
>>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.2')
>>> print(inifile_readstring('c:/tranquilit/wapt/tests/test.ini','global','version'))
1.1.2
>>> print(inifile_readstring('c:/tranquilit/wapt/tests/test.ini','global','undefaut','defvalue'))
defvalue
- setuphelpers.inifile_writestring(inifilename: str, section: str, key: str, value: str)[source]¶
Write a string parameter to inifile
>>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.1')
>>> print(inifile_readstring('c:/tranquilit/wapt/tests/test.ini','global','version'))
1.1.1
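The write-then-read cycle of the inifile helpers can be reproduced with the standard `configparser` module. The sketch below is an assumption about how such helpers could work, not the library code; `ini_write` and `ini_read` are hypothetical names.

```python
import configparser
import os
import tempfile


def ini_write(path, section, key, value):
    """Hypothetical stand-in: set one key, creating the file and section if needed."""
    parser = configparser.RawConfigParser()
    parser.read(path)  # a missing file is silently ignored
    if not parser.has_section(section):
        parser.add_section(section)
    parser.set(section, key, value)
    with open(path, "w") as f:
        parser.write(f)


def ini_read(path, section, key, default=None):
    """Hypothetical stand-in: read one key, falling back to default when absent."""
    parser = configparser.RawConfigParser()
    parser.read(path)
    if parser.has_option(section, key):
        return parser.get(section, key)
    return default


with tempfile.TemporaryDirectory() as tmp:
    ini_path = os.path.join(tmp, "test.ini")
    ini_write(ini_path, "global", "version", "1.1.2")
    version = ini_read(ini_path, "global", "version")
    fallback = ini_read(ini_path, "global", "undefaut", "defvalue")
```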
- setuphelpers.install_apt(package, allow_unauthenticated=False)[source]¶
Install package from APT repositories
- setuphelpers.installed_softwares(keywords=None, name=None, ignore_empty_names=True)[source]¶
Return list of installed software from apt or rpm
- Args:
keywords (str or list): string to lookup in key, display_name or publisher fields
- Returns:
list of dicts: [{'key', 'name', 'version', 'install_date', 'install_location', 'uninstall_string', 'publisher', 'system_component'}]
- setuphelpers.isLinux64()¶
- setuphelpers.is_debian_based()¶
- setuphelpers.is_redhat_based()¶
- setuphelpers.is_rhel_based()¶
- setuphelpers.isrunning(processname)[source]¶
Check if a process is running.
>>> isrunning('explorer')
True
- setuphelpers.json_load_file(json_file: str, encoding: str = 'utf-8') Union[list, dict] [source]¶
Load content from a JSON file.
- Args:
json_file: Path to the JSON file.
encoding: File encoding.
- Returns:
Loaded JSON content as a dictionary or a list.
- setuphelpers.json_write_file(json_file: str, data: Union[list, dict], indent: int = 4, sort_keys: bool = False, encoding: str = 'utf-8', newline: str = '\n')[source]¶
Write dictionary or list to a JSON file.
- Args:
json_file: Path to the JSON file.
data: Dictionary or list content to be written.
indent: Tabulation size for indentation.
sort_keys: Sort the keys alphabetically or not.
encoding: File encoding.
newline: Newline character(s) to use, default is Line Feed (LF).
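The parameters of json_write_file and json_load_file map naturally onto `json.dump`, `json.load` and `open`. The following round trip is a sketch under that assumption; the `_sketch` functions are hypothetical and only mirror the documented signatures.

```python
import json
import os
import tempfile


def json_write_sketch(json_file, data, indent=4, sort_keys=False, encoding="utf-8", newline="\n"):
    """Hypothetical stand-in: dump data with the documented formatting options."""
    with open(json_file, "w", encoding=encoding, newline=newline) as f:
        json.dump(data, f, indent=indent, sort_keys=sort_keys)


def json_load_sketch(json_file, encoding="utf-8"):
    """Hypothetical stand-in: load a list or dict back from the file."""
    with open(json_file, "r", encoding=encoding) as f:
        return json.load(f)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "data.json")
    json_write_sketch(path, {"b": 2, "a": [1, 2, 3]}, sort_keys=True)
    loaded = json_load_sketch(path)
    with open(path, "rb") as f:
        raw = f.read()
```

Passing newline to `open` keeps line endings identical across platforms, which is presumably why the parameter exists.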
- setuphelpers.killalltasks(process_names, include_children=True)[source]¶
Kill the tasks by their process names
>>> killalltasks('firefox')
- setuphelpers.makepath(*p) str [source]¶
Create a path from the given components, but with saner defaults than os.path.join. In particular, removes ending path separators (backslashes) from components. Path functions will be called automatically.
>>> makepath("c:", "Windows", "system32")
'c:\\Windows\\system32'
>>> makepath(system32())
'C:\\WINDOWS\\system32'
>>> system32()
'C:\\WINDOWS\\system32'
>>> system32
<function system32 at 0x063EBE79>
>>> makepath(system32)
'C:\\WINDOWS\\system32'
- setuphelpers.mkdirs(path: str)[source]¶
Create directory path if it doesn’t exist yet. Creates intermediate directories too.
>>> mkdirs(r"C:\Program Files (x86)\wapt")
u'C:\\Program Files (x86)\\wapt'
- setuphelpers.processes_for_file(filepath, open_files=True, dll=True)[source]¶
Generator returning processes currently having an open file descriptor for filepath
If not running as System account, cannot access system processes.
- Args:
filepath (str): file path or pattern (glob *)
- Returns:
iterator psutil.Process
- setuphelpers.remove_file(path: str)[source]¶
Try to remove a single file or symlink. Logs a warning message if the file doesn’t exist; logs a critical message if the file can’t be removed.
- Args:
path (str): path to file
>>> remove_file(r'c:\tmp\fc.txt')
- setuphelpers.remove_tree(*args, **kwargs)[source]¶
Convenience function to delete a directory tree. By default, errors are not ignored (ignore_errors=False), so they remain visible to the caller.
- Args:
path (str): path to directory to remove
ignore_errors (boolean): defaults to False. Set it to True to ignore exceptions on children deletion
onerror (func): hook called with (func, path, exc) on each delete exception. Should raise if stop is required.
- setuphelpers.rsa_decrypt()¶
data, rsakeypem, password: bytes -> bytes. Returns decrypted data
- setuphelpers.rsa_encrypt()¶
data, x509certPEM: bytes -> bytes. returns encrypted data with public key in X509 certificate
- setuphelpers.run(cmd, shell=True, timeout=600, accept_returncodes=[0, 3010], on_write=None, pidlist=None, return_stderr=True, **kwargs)[source]¶
Run the command cmd in a shell and return the output and error text as a string
- Args:
cmd: command and arguments, either as a string or as a list of arguments
shell (boolean): True is assumed
timeout (int): maximum time to wait for cmd completion, in seconds (default = 600)
a TimeoutExpired exception is raised if timeout is reached.
on_write: callback when a new line is printed on stdout or stderr by the subprocess
func(unicode_line). arg is enforced to unicode
accept_returncodes (list): list of return codes which are considered OK, default = (0, 3010)
pidlist (list): external list where to append the pid of the launched process.
return_stderr (bool or list): if True, the error lines are returned to caller in result.
if a list is provided, the error lines are appended to this list
all other parameters from the psutil.Popen constructor are accepted
- Returns:
RunOutput: bytes-like output of stdout and optionally stderr streams, with a returncode attribute
- Raises:
CalledProcessError: if return code of cmd is not in accept_returncodes list
TimeoutExpired: if process is running for more than timeout time.
Changed in version 1.3.9: return_stderr parameters to disable stderr or get it in a separate list return value has a returncode attribute to
Changed in version 1.4.0: output is not forced to unicode
Changed in version 1.4.1: error code 1603 is no longer accepted by default.
Changed in version 1.5.1: If cmd is unicode, encode it to default filesystem encoding before running it.
>>> run(r'dir /B c:\windows\explorer.exe')
'explorer.exe\r\n'
>>> out = []
>>> pids = []
>>> def getlines(line):
...     out.append(line)
>>> run(r'dir /B c:\windows\explorer.exe',pidlist=pids,on_write=getlines)
u'explorer.exe\r\n'
>>> print(out)
['explorer.exe\r\n']
>>> try:
...     run(r'ping /t 127.0.0.1',timeout=3)
... except TimeoutExpired:
...     print('timeout')
timeout
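The accept_returncodes and timeout contract can be illustrated with the standard `subprocess` module. This is a simplified sketch, not the WAPT implementation: `run_sketch` is a hypothetical name, it takes the command as a list, and it omits on_write, pidlist and return_stderr.

```python
import subprocess
import sys


def run_sketch(cmd, timeout=600, accept_returncodes=(0, 3010)):
    """Hypothetical stand-in: run cmd, return its output, reject unexpected exit codes."""
    completed = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr into the returned output
        timeout=timeout,           # subprocess.TimeoutExpired is raised when exceeded
    )
    if completed.returncode not in accept_returncodes:
        raise subprocess.CalledProcessError(completed.returncode, cmd, output=completed.stdout)
    return completed.stdout


# The current interpreter is used as a portable command for the demonstration.
out = run_sketch([sys.executable, "-c", "print('hello')"])

try:
    run_sketch([sys.executable, "-c", "import sys; sys.exit(2)"])
    rejected = False
except subprocess.CalledProcessError as err:
    rejected = err.returncode == 2

# The same exit code passes once it is listed as acceptable.
tolerated = run_sketch([sys.executable, "-c", "import sys; sys.exit(2)"], accept_returncodes=(0, 2))
```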
- setuphelpers.run_notfatal(*cmd, **args)[source]¶
Runs the command and waits for its termination, then returns the output. Ignores the exit status code of the command and returns ‘’ instead.
Changed in version 1.4.0: output is now enforced to unicode
- setuphelpers.shell_launch(cmd)[source]¶
Launch a command (without arguments) but doesn’t wait for its termination
>>> with open('c:/tmp/test.txt','w') as f:
...     f.write('Test line')
>>> shell_launch('c:/tmp/test.txt')
- setuphelpers.unzip(zipfn: str, target: Optional[str] = None, filenames: Optional[List[str]] = None, extract_with_full_paths: bool = True)[source]¶
Unzip the files from zipfile with patterns in filenames to target directory
- Args:
zipfn (str): path to zipfile. (can be relative to temporary unzip location of package)
target (str): target location. Defaults to dirname(zipfile) + basename(zipfile)
filenames (str or list of str): list of filenames / glob patterns (path sep is normally a slash)
extract_with_full_paths (bool): whether to keep the subfolders of the archive as is
- Returns:
list : list of extracted files
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt')
['C:\\example\\tis-7zip_9.2.0-15_all\\7z920-x64.msi', 'C:\\example\\tis-7zip_9.2.0-15_all\\7z920.msi', 'C:\\example\\tis-7zip_9.2.0-15_all\\setup.py', 'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/control', 'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/wapt.psproj', 'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/manifest.sha256', 'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/signature']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', filenames=['*.msi','*.py'])
['C:\\example\\tis-7zip_9.2.0-15_all\\7z920-x64.msi', 'C:\\example\\tis-7zip_9.2.0-15_all\\7z920.msi', 'C:\\example\\tis-7zip_9.2.0-15_all\\setup.py']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', target='test', filenames=['*.msi','*.py'])
['C:\\example\\test\\7z920-x64.msi', 'C:\\example\\test\\7z920.msi', 'C:\\example\\test\\setup.py']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', filenames='WAPT/*')
['C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/control', 'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/wapt.psproj', 'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/manifest.sha256', 'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/signature']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', filenames='WAPT/control')
['C:\\example\\tis-7zip_9.2.0-15_all\\WAPT\\control']
>>> unzip('tis-7zip_9.2.0-15_all.wapt', target='.', filenames='WAPT/control')
['.\\WAPT\\control']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', target=r'C:\example', filenames='WAPT/control')
['C:\\example\\WAPT\\control']
>>> unzip('tis-7zip_9.2.0-15_all.wapt', target=basedir, filenames='WAPT/control')
['C:\\example\\WAPT\\control']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', filenames='WAPT/control', extract_with_full_paths=False)
['C:\\example\\control']
New in version 1.3.11.
Changed in version 2.2: added extract_with_full_paths parameter
- setuphelpers.user_appdata()¶
Return the local appdata profile of current user
- Returns:
str: path like u’/home/user/.config/’
- setuphelpers.user_local_appdata()[source]¶
Return the local appdata profile of current user
- Returns:
str: path like u’/home/user/.config/’
- setuphelpers.wget(url, target=None, printhook=None, proxies=None, connect_timeout=10, download_timeout=None, verify_cert=None, referer=None, user_agent=None, cert=None, resume=False, md5=None, sha1=None, sha256=None, cache_dir=None, requests_session=None, limit_bandwidth=None)[source]¶
Copy the contents of a file from a given URL to a local file.
- Args:
url (str): URL to document
target (str): full file path of downloaded file. If None, put in a temporary dir with supplied url filename (final part of url)
proxies (dict): proxies to use. eg {'http':'http://wpad:3128','https':'http://wpad:3128'}
timeout (int): seconds to wait for answer before giving up
auth (list): (user,password) to authenticate with basic auth
verify_cert (bool or str): either False, True (verify with embedded CA list), or path to a directory or PEM encoded CA bundle file to check https certificate signature against.
cert (list): tuple/list of (x509certfilename,pemkeyfilename,key password) for authenticating the client. If key is not encrypted, password must be None
referer (str):
user_agent:
resume (bool):
md5 (str):
sha1 (str):
sha256 (str):
cache_dir (str): if file exists here, and md5 matches, copy from here instead of downloading. If not, put a copy of the file here after downloading.
requests_session (request.Session): predefined request session to use instead of building one from scratch from proxies, cert, verify_cert
- Returns:
str : path to downloaded file
>>> respath = wget('http://wapt.tranquil.it/wapt/tis-firefox_28.0.0-1_all.wapt','c:\\tmp\\test.wapt',proxies={'http':'http://proxy:3128'})
???
>>> os.stat(respath).st_size>10000
True
>>> respath = wget('http://localhost:8088/runstatus','c:\\tmp\\test.json')
???
- setuphelpers.wgets(url, proxies: Optional[dict] = None, verify_cert=None, referer=None, user_agent=None, timeout=None, cert=None, requests_session=None, as_json=False) str [source]¶
Return the content of a remote resource as a string / bytes or dict with a http get request.
Raise an exception if remote data can’t be retrieved.
- Args:
url (str): http(s) url
proxies (dict): proxy configuration as requests requires it {'http': url, 'https': url}
verify_cert (bool or str): verify server certificate, path to trusted CA bundle
cert (tuple of 3 str): (cert_path, key_path, key password) client side authentication.
requests_session (request.Session): predefined request session to use instead of building one from scratch
- Returns:
str or bytes or dict : content of remote resource. str or bytes or json depending on the encoding and the Content-Type.
>>> data = wgets('https://wapt/ping')
>>> "msg" in data
True
Version Class¶
- class setuphelpers.Version(version, members_count: Optional[int] = None)[source]¶
Version object of the form 0.0.0. Versions are compared with respect to natural numbering rather than alphabetical order.
- Args:
version (str): version string
members_count (int): number of version members to take into account.
If the version has fewer members, missing members are added with value 0. If it has more, the last ones are removed.
>>> Version('0.10.2') > Version('0.2.5')
True
>>> Version('0.1.2') < Version('0.2.5')
True
>>> Version('0.1.2') == Version('0.1.2')
True
>>> Version('7') < Version('7.1')
True
Changed in version 1.6.2.5: truncate version members list to members_count if provided.
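Natural comparison and the members_count padding/truncation can be sketched by turning a version string into a tuple of integers. This is an illustration only: `version_key_sketch` is a hypothetical helper, and it assumes purely numeric, dot-separated members, which the real class may not require.

```python
def version_key_sketch(version, members_count=None):
    """Hypothetical stand-in: convert '0.10.2' into (0, 10, 2) for comparison."""
    members = [int(part) for part in version.split(".")]
    if members_count is not None:
        # Truncate extra members, then pad missing ones with 0.
        members = members[:members_count]
        members += [0] * (members_count - len(members))
    return tuple(members)


natural = version_key_sketch("0.10.2") > version_key_sketch("0.2.5")
alphabetical = "0.10.2" > "0.2.5"  # plain string comparison gets this wrong
padded = version_key_sketch("1.2", members_count=3)
truncated = version_key_sketch("1.2.3.4", members_count=2)
```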
Setupdevhelpers¶
- setupdevhelpers.ask_directory(title='Select folder', initial_dir='', raise_error=False)[source]¶
Asks the user to select a folder using a directory dialog and returns the path of that folder.
- Args:
title (str): Specifies the title of the directory dialog window. Default value is “Select folder”.
initial_dir (str): Specifies the initial path of the directory dialog. Default is last provided.
raise_error (bool): Whether to raise an error if no folder is selected.
- Returns:
The path of the selected folder.
- setupdevhelpers.ask_filename(title='Select file', initial_dir='', filename='*.*', filter='All files (*.*)|*.*', raise_error=False)[source]¶
This function asks the user to select a file using a filename dialog and returns the path of that file.
- Args:
title (str): The title for the dialog.
initial_dir (str): The initial directory path. Default is last provided.
filename (str): The default file name. Default is “*.*”.
filter (str): The filter by description and extension. Default is “All files (*.*)|*.*”.
raise_error (bool): Whether to raise an error if the user cancels the dialog. Default is False.
- Returns:
str: The path of the selected file.
- Raises:
ValueError: If the user cancels the dialog and raise_error is True.
- setupdevhelpers.ask_grid(title, dict_input, result_type=0, metadata='', search_label='Search', stay_on_top=False, raise_error=False)[source]¶
Opens a dialog box with an action input.
- Args:
title (str): The title to display in the dialog box.
dict_input (dict): A dictionary of input values to display in the dialog box.
result_type (str or int): The type of result to return from the dialog box. Currently supported options are:
“GRT_ALL” or (0) to return the whole grid
“GRT_SELECTED” or (1) to return the selected rows
metadata (str of a json): Additional metadata to display in the dialog box.
search_label (str): The label for the search button in the dialog box.
stay_on_top (bool): Whether to keep the dialog box on top of other windows.
raise_error (bool): Whether to raise an error if no response is given by the user.
- Returns:
The result of the dialog box.
- setupdevhelpers.ask_input(title, text, default='', stay_on_top=False, raise_error=False)[source]¶
Opens a dialog box with an action input.
- Args:
title (str): The title for the dialog.
text (str): Indicates which value the user should type.
default (str): The default value if the user does not want to type anything.
stay_on_top (bool): Indicates if the dialog box will always stay on top. Default is False.
raise_error (bool): Whether to raise an error if no response is given by the user.
- Returns:
The response from the dialog box.
- setupdevhelpers.ask_message(title, text, flags=None, raise_error=False)[source]¶
Opens a message box with customizable buttons and icons.
- Args:
title (str): The title to display in the message box.
text (str): The message text to display in the message box.
flags (int): Options for buttons and icons. You can combine a message and an icon by adding their corresponding integer values together.
Possible message options:
- MB_OK (0): OK button
- MB_OKCANCEL (1): OK and Cancel buttons
- MB_ABORTRETRYIGNORE (2): Abort, Retry, and Ignore buttons
- MB_YESNOCANCEL (3): Yes, No, and Cancel buttons
- MB_YESNO (4): Yes and No buttons
Possible icon options:
- MB_ICONERROR (16): Error icon
- MB_ICONQUESTION (32): Question icon
- MB_ICONWARNING (48): Warning icon
- MB_ICONINFORMATION (64): Information icon
For example, to display an OK button with an information icon, use: 0 + 64 (MB_OK + MB_ICONINFORMATION).
Note: For more details and additional options, refer to the documentation at: https://www.functionx.com/delphi/msgboxes/messagebox.htm
raise_error (bool): Whether to raise an error if no response is given by the user.
- Returns:
The response code indicating the user’s choice. Possible return values:
- ID_OK (1)
- ID_CANCEL (2)
- ID_ABORT (3)
- ID_RETRY (4)
- ID_IGNORE (5)
- ID_YES (6)
- ID_NO (7)
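The flag arithmetic for ask_message can be checked without opening any dialog. In the sketch below the constants are defined locally from the values listed above (whether setupdevhelpers exports them under these names is an assumption), and `split_flags` is a hypothetical helper that relies on the button values fitting in the low four bits and the icon values being multiples of 16.

```python
# Values copied from the lists above; defined locally for illustration.
MB_OK = 0
MB_YESNO = 4
MB_ICONQUESTION = 32
MB_ICONINFORMATION = 64
ID_YES = 6
ID_NO = 7

# A Yes/No question with a question-mark icon: 4 + 32.
flags = MB_YESNO + MB_ICONQUESTION


def split_flags(value):
    """Hypothetical helper: split a combined value back into (buttons, icon)."""
    return value & 0x0F, value & 0xF0


buttons, icon = split_flags(flags)

# With setupdevhelpers available, the call would look like this (not executed here):
# answer = ask_message('Confirm', 'Proceed with the update?', flags)
# if answer == ID_YES: ...
```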
- setupdevhelpers.bs_find(url, element, attribute=None, value=None, user_agent=None, proxies=None, features='html.parser', **kwargs)[source]¶
Parse an HTML or XML web page with BeautifulSoup and retrieve the first matching result.
- Args:
url (str): URL of the web page or string to parse.
element (str): Searched element.
attribute (str): Selected attribute of the element.
value (str): Value of the selected attribute.
user_agent (str): Specify a user-agent if needed.
proxies (dict): Specify proxies if needed.
features (str): BeautifulSoup feature to use.
**kwargs: Additional parameters for the requests library.
- Returns:
bs4.element.Tag or None: The first matching element, or None if no match is found.
- Examples:
>>> bs_find('https://www.w3.org/', 'a', 'title', 'Open Web Platform testing')['href']
'https://web-platform-tests.org/'
>>> bs_find('https://www.w3.org/', 'span', 'class', 'alt-logo').string
'W3C'
New in version 2.0.
Changed in version 2.5: Function can now parse string content of a page or reparse a “bs_result”. It is now possible to parse a specific attribute.
- setupdevhelpers.bs_find_all(url, element, attribute=None, value=None, user_agent=None, proxies=None, features='html.parser', **kwargs)[source]¶
Parse an HTML or XML web page with BeautifulSoup and retrieve a list of all matching results.
- Args:
url (str): URL of the web page or string to parse.
element (str): Searched element.
attribute (str): Selected attribute of the element.
value (str): Value of the selected attribute.
user_agent (str): Specify a user-agent if needed.
proxies (dict): Specify proxies if needed.
features (str): BeautifulSoup feature to use.
**kwargs: Additional parameters for the requests library.
- Returns:
list: List of bs4.element.Tag objects representing the matching elements.
- Examples:
>>> bs_find_all('https://www.w3.org/', 'a', 'title', 'Open Web Platform testing')[0]['href']
'https://web-platform-tests.org/'
>>> bs_find_all('https://www.w3.org/', 'span', 'class', 'alt-logo')[0].string
'W3C'
New in version 2.0.
Changed in version 2.5: Function can now parse string content of a page or reparse a “bs_result”. It is now possible to parse a specific attribute.
- setupdevhelpers.complete_control_categories(control)[source]¶
Ask the user to supply package categories for the control.categories field if it is empty or Template is selected.
- Args:
control (str or waptpackage.PackageEntry): The path of the control file or its PackageEntry object
- setupdevhelpers.complete_control_description(control, control_description='', silent=False, blank_str='', maximum_length=None)[source]¶
Updates the control description field with the provided description.
- Args:
control (str or waptpackage.PackageEntry): The path of the control file or its PackageEntry object
control_description (str): The description to be set for the control.description field.
silent (bool): If True, no user input dialog will be displayed. (default: False)
blank_str (str): If the description contains this string, it will be cleared. (default: “” => do not clear)
maximum_length (int): If the description has more characters than the defined value, the function will try to shorten it.
- Returns:
str: The updated control description.
- setupdevhelpers.complete_control_descriptions(control, control_descriptions_dict={'description': '', 'description_de': '', 'description_es': '', 'description_fr': '', 'description_it': '', 'description_nl': '', 'description_pl': '', 'description_pt': '', 'description_ru': ''}, silent=False, blank_str='')[source]¶
Updates the control description field with the provided description.
- Args:
control_description (str): The description to be set for the control.description field.
silent (bool): If True, no user input dialog will be displayed. (default: False)
blank_str (str): If the description contains this string, it will be cleared. (default: “”)
- Returns:
str: The updated control description.
- setupdevhelpers.complete_control_impacted_process(control, program_dir='')[source]¶
Complete package control.impacted_process.
- setupdevhelpers.complete_control_min_wapt_version(control, min_wapt_version='2.3', silent=False)[source]¶
Complete package control.min_wapt_version.
- setupdevhelpers.complete_control_name(control, control_name='', silent=False)[source]¶
Ask the user to provide a package name for the control.name field
- Args:
control (str or waptpackage.PackageEntry): The path of the control file or its PackageEntry object
control_name (str): Prefilled control_name (default: control.name without the word “template”)
silent (bool): If True, no user input dialog will be displayed. (default: False)
- setupdevhelpers.complete_control_package(control, control_package='', silent=False, remove_template_base_files=False)[source]¶
Ask the user to provide a package name for the control.package field, and offer the possibility of removing the base files (icon.png and changelog.txt) for template package usage
- Args:
control (str or waptpackage.PackageEntry): The path of the control file or its PackageEntry object
control_package (str): Prefilled control_package (default: actual control_package)
silent (bool): If True, no user input dialog will be displayed. (default: False)
remove_template_base_files (bool): Removes base files if parameter is True and str(“template”) in control.package (default: False)
- setupdevhelpers.complete_control_version(control, version, inc_build=False)[source]¶
Complete package control.version. Returns whether the package has been updated.
- setupdevhelpers.convert_control_path_to_dir(any_path)[source]¶
Converts the control file path to a directory path by removing the ‘WAPT/control’ suffix.
- Args:
any_path (str): The input file path.
- Returns:
str: The converted directory path with the ‘WAPT/control’ suffix removed.
- Example:
>>> convert_control_path_to_dir('/path/to/some_directory/WAPT/control')
'/path/to/some_directory'
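The suffix removal described above can be sketched in plain Python. This is a minimal illustration, not the setupdevhelpers implementation: the function name `control_path_to_dir` is hypothetical, and returning non-matching paths unchanged is an assumption.

```python
def control_path_to_dir(any_path: str) -> str:
    """Hypothetical sketch: strip a trailing 'WAPT/control' from a path."""
    # Normalise Windows separators so the same suffix test works everywhere.
    normalized = any_path.replace("\\", "/").rstrip("/")
    suffix = "/WAPT/control"
    if normalized.endswith(suffix):
        return normalized[: -len(suffix)]
    # Assumption: a path without the suffix is returned as-is.
    return normalized


print(control_path_to_dir("/path/to/some_directory/WAPT/control"))
```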
- setupdevhelpers.format_grid_to_dict(dict_list_data, key_name='parameter', value_name='value')[source]¶
- setupdevhelpers.get_7z_path()[source]¶
Check if the 7zip binary path exists and returns it.
- Returns:
str: path to the 7zip binary
- setupdevhelpers.get_control_dict(control)[source]¶
Load the control information of a package as dict based on its PackageEntry, its control file, or its package dir.
- Args:
control (PackageEntry or str): The PackageEntry, the control file path, or the package dir path.
- Returns:
dict: A dictionary containing the control information of the package.
- setupdevhelpers.get_proxies_from_wapt_console()[source]¶
Return proxy information from the current user WAPT console
>>> get_proxies_from_wapt_console()
{'http': 'http://srvproxy.ad.domain.lan:8080', 'https': 'http://srvproxy.ad.domain.lan:8080'}
- setupdevhelpers.get_public_persistent_dir()[source]¶
Get the path to the public persistent directory.
- Returns:
str: Path to the public persistent directory.
- setupdevhelpers.get_public_persistent_file(fname)[source]¶
Get the path to a file in the public persistent directory.
- Args:
fname (str): Filename.
- Returns:
str: Path to the file in the public persistent directory.
- setupdevhelpers.get_size(any_path)[source]¶
Calculate the total size of files and directories in a path, including subdirectories.
- Args:
any_path (str): The directory or file path to start calculating the size from.
- Returns:
int: Total size in bytes.
- Raises:
FileNotFoundError: If the specified path does not exist.
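A recursive size computation of this kind can be sketched with the standard library. The name `total_size` is hypothetical, and the sketch assumes that only regular files are counted and that symbolic links are skipped; the real get_size may differ on both points.

```python
import os
import tempfile


def total_size(any_path: str) -> int:
    """Hypothetical sketch: sum the sizes of all files under any_path."""
    if not os.path.exists(any_path):
        raise FileNotFoundError(any_path)
    if os.path.isfile(any_path):
        return os.path.getsize(any_path)
    total = 0
    for root, _dirs, files in os.walk(any_path):
        for name in files:
            file_path = os.path.join(root, name)
            # Assumption: symlinks are ignored to avoid double counting.
            if not os.path.islink(file_path):
                total += os.path.getsize(file_path)
    return total


# Small demonstration on a throwaway directory tree.
with tempfile.TemporaryDirectory() as demo_dir:
    os.mkdir(os.path.join(demo_dir, "sub"))
    with open(os.path.join(demo_dir, "a.bin"), "wb") as handle:
        handle.write(b"x" * 10)
    with open(os.path.join(demo_dir, "sub", "b.bin"), "wb") as handle:
        handle.write(b"y" * 5)
    demo_size = total_size(demo_dir)

print(demo_size)
```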
- setupdevhelpers.remove_outdated_binaries(version, list_extensions=['exe', 'msi', 'deb', 'rpm', 'dmg', 'pkg', 'part'], filename_contains=None)[source]¶
Remove files based on the version contained in their filename, falling back to the file version on compatible OSes
- Args:
version (str): version number of the files to keep
list_extensions (str or list of str): file extensions of compared files
filename_contains (str or list of str): Part of the filename that must be contained (useful for distinguishing architecture and os)
- Returns:
list: list of deleted files
New in version 2.0.
Changed in version 2.2: Now returns removed files, now checking .exe and .msi file versions
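The filename-based selection can be illustrated without touching the disk. This is only a sketch under stated assumptions: `find_outdated` is a hypothetical name, it returns candidates instead of deleting them, it ignores the file-version fallback, and it treats filename_contains as "any of these substrings".

```python
def find_outdated(filenames, version, list_extensions=("exe", "msi"), filename_contains=None):
    """Hypothetical sketch: list files whose name does not contain the kept version."""
    if isinstance(list_extensions, str):
        list_extensions = [list_extensions]
    if isinstance(filename_contains, str):
        filename_contains = [filename_contains]
    outdated = []
    for name in filenames:
        extension = name.rsplit(".", 1)[-1].lower() if "." in name else ""
        if extension not in list_extensions:
            continue  # not a binary we compare
        if filename_contains and not any(part in name for part in filename_contains):
            continue  # e.g. another architecture, left alone
        if version not in name:
            outdated.append(name)
    return outdated


print(find_outdated(["app-1.0.exe", "app-2.0.exe", "readme.txt"], "2.0"))
```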
- setupdevhelpers.remove_tree_for_all_users(user_folder_relative_path, ignored_users=None, ignore_system_users=True)[source]¶
Remove a specific folder or folders for all user profiles.
- Args:
user_folder_relative_path (str): Relative path to the user folder. Glob patterns can be used.
ignored_users (str or list of str): Ignore specified users.
ignore_system_users (bool): Ignore default, public, all users, etc. (True by default).
- Returns:
list: List of deleted folders.
>>> print(remove_tree_for_all_users(makepath(".vscode", "extensions", "ms-toolsai.jupyter-*")))
['C:\Users\username\.vscode\extensions\ms-toolsai.jupyter-2022.2.1001817079', 'C:\Users\username\.vscode\extensions\ms-toolsai.jupyter-keymap-1.0.0', 'C:\Users\username\.vscode\extensions\ms-toolsai.jupyter-renderers-1.0.6']
>>> print(remove_tree_for_all_users(makepath(".vscode", "extensions", "ms-toolsai.jupyter-")))
[]
>>> print(remove_tree_for_all_users(makepath(".vscode", "extensions", "ms-toolsai.jupyter-[a-z]*")))
['C:\Users\username\.vscode\extensions\ms-toolsai.jupyter-keymap-1.0.0', 'C:\Users\username\.vscode\extensions\ms-toolsai.jupyter-renderers-1.0.6']
>>> print(remove_tree_for_all_users(makepath(".vscode", "extensions", "ms-toolsai.jupyter-1.0.0")))
['/home/username/.vscode/extensions/ms-toolsai.jupyter-keymap-1.0.0']
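The behaviour of the first three glob patterns can be reproduced with the standard `fnmatch` module, which is used here only as a stand-in for whatever matching the helper performs internally (an assumption). The folder names are taken from the examples above.

```python
from fnmatch import fnmatchcase

# Folder names as they appear in the examples above.
folders = [
    "ms-toolsai.jupyter-2022.2.1001817079",
    "ms-toolsai.jupyter-keymap-1.0.0",
    "ms-toolsai.jupyter-renderers-1.0.6",
]


def matching(pattern):
    """Return the folder names matched by a glob pattern."""
    return [name for name in folders if fnmatchcase(name, pattern)]


print(matching("ms-toolsai.jupyter-*"))       # trailing * matches any suffix
print(matching("ms-toolsai.jupyter-"))        # no wildcard: exact name required
print(matching("ms-toolsai.jupyter-[a-z]*"))  # suffix must start with a lowercase letter
```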
- setupdevhelpers.unzip_with_7zip(filename, target=None, filenames=[], extract_with_full_paths=True, recursive=True)[source]¶
Extract the files from an archive file with 7zip with patterns in filenames to target directory
- Args:
filename (str): path to archive file. (can be relative to temporary unzip location of package)
target (str): archive content to target dir location (dir will be created if needed). Default: dirname(archive file) + basename(archive file)
filenames (str or list of str): list of filenames / glob patterns (path sep is normally a slash)
extract_with_full_paths (bool): whether to keep the subfolders of the archive as is
recursive (bool): whether to look for filenames recursively
- Returns:
None
New in version 2.0.
Changed in version 2.2: changed default target location to make it correspond with unzip()
Waptpackage¶
- exception waptpackage.EWaptBadControl[source]¶
Bases:
EWaptException
- exception waptpackage.EWaptBadPackageAttribute[source]¶
Bases:
EWaptException
- exception waptpackage.EWaptBadSetup[source]¶
Bases:
EWaptException
- exception waptpackage.EWaptBadSignature[source]¶
Bases:
EWaptException
- exception waptpackage.EWaptConfigurationError[source]¶
Bases:
EWaptException
- exception waptpackage.EWaptConflictingPackage(msg, install_status='ERROR', retry_count=None)[source]¶
Bases:
EWaptInstallError
- exception waptpackage.EWaptCorruptedFiles[source]¶
Bases:
EWaptException
- exception waptpackage.EWaptDiskSpace[source]¶
Bases:
EWaptException
- exception waptpackage.EWaptDownloadError[source]¶
Bases:
EWaptException
- exception waptpackage.EWaptInstallError(msg, install_status='ERROR', retry_count=None)[source]¶
Bases:
EWaptException
Exception raised during installation of a package. msg is logged in the local install database. If retry_count is None, the install will be retried indefinitely until success; otherwise the install is retried at most retry_count times.
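The retry rule can be expressed as a small predicate. The class `InstallError` and the helper `should_retry` are hypothetical stand-ins written for this illustration; they mirror only the constructor arguments listed above, not the behaviour of the WAPT agent.

```python
class InstallError(Exception):
    """Hypothetical stand-in mirroring the documented constructor arguments."""

    def __init__(self, msg, install_status="ERROR", retry_count=None):
        super().__init__(msg)
        self.install_status = install_status
        self.retry_count = retry_count


def should_retry(error, retries_done):
    """Return True if another install attempt is allowed."""
    if error.retry_count is None:
        return True  # retry indefinitely until success
    return retries_done < error.retry_count


print(should_retry(InstallError("disk busy"), retries_done=100))
print(should_retry(InstallError("disk busy", retry_count=3), retries_done=3))
```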
- exception waptpackage.EWaptInstallPostponed(msg, install_status='POSTPONED', retry_count=5, grace_delay=3600)[source]¶
Bases:
EWaptInstallError
- exception waptpackage.EWaptMissingLocalWaptFile[source]¶
Bases:
EWaptException
- exception waptpackage.EWaptNeedsNewerAgent[source]¶
Bases:
EWaptException
- exception waptpackage.EWaptNotAPackage[source]¶
Bases:
EWaptException
- exception waptpackage.EWaptNotSigned[source]¶
Bases:
EWaptException
- exception waptpackage.EWaptNotSourcesDirPackage[source]¶
Bases:
EWaptException
- exception waptpackage.EWaptPackageSignError[source]¶
Bases:
EWaptException
Bases:
EWaptInstallError
- class waptpackage.HostCapabilities(from_dict=None, from_string_filter=None, **kwargs)[source]¶
Bases:
BaseObjectClass
- class waptpackage.PackageRequest(request=None, copy_from=None, **kwargs)[source]¶
Bases:
BaseObjectClass
Package and version request / condition. The request is the basic packagename(=version) request. Additional filters can be specified as arguments. The list filters are ordered from most preferred to least preferred options.
- Args:
request (str): packagename(<=>version)
architectures (list) : list of x64, x86, arm, arm64, armhf
locales (list) : list of 2 letters lki
- property architectures¶
List of accepted architectures
- get_package_compare_key(pe1: PackageEntry) Tuple [source]¶
Compute a key for a package to compare it with others in the context of this request. This takes into account the preferences expressed by filters such as the order of locales, architectures, or maturities.
- Args:
pe1 (PackageEntry)
- Returns:
tuple
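How an ordered filter list can turn into a comparison key is sketched here. Everything in the block is an assumption made for illustration: the dict-based entries, the field names, and the order of the key members are hypothetical and do not reflect the tuple actually built by get_package_compare_key.

```python
def preference_rank(value, preferred):
    """Position of value in the preference list; unknown values rank last."""
    try:
        return preferred.index(value)
    except ValueError:
        return len(preferred)


def compare_key(entry, locales, architectures):
    """Hypothetical key: a better preference gives a greater key, then a higher version wins."""
    return (
        -preference_rank(entry["locale"], locales),
        -preference_rank(entry["architecture"], architectures),
        entry["version"],
    )


candidates = [
    {"locale": "en", "architecture": "x64", "version": (2, 0)},
    {"locale": "fr", "architecture": "x86", "version": (1, 0)},
]
# 'fr' is listed first, so the French entry wins despite its lower version.
best = max(candidates, key=lambda e: compare_key(e, ["fr", "en"], ["x64", "x86"]))
print(best["locale"])
```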
- is_matched_by(package_entry: PackageEntry, errors_list=None)[source]¶
Check if package_entry is matching this request
- property locales: List¶
- property maturities: List¶
List of accepted maturities
- property max_os_version¶
- property min_os_version¶
- property package¶
- property request¶
- property tags: list¶
List of accepted tags for OS
- property version¶
- waptpackage.PackageVersion(package_or_versionstr) Tuple [source]¶
Splits a version string such as 1.2.3.4-567. The software version is clipped to 4 members. If ‘-packaging’ is not provided, the second member will be 0 for safe comparison.
- Args:
package_or_versionstr (str): package version string
- Returns:
tuple: (Version,int) : soft version on 4 members / packaging as an int
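The split can be sketched for purely numeric versions. `split_package_version` is a hypothetical name, and it returns a plain tuple of integers where the real function returns a Version object; non-numeric members are not handled.

```python
from typing import Tuple


def split_package_version(version_str: str) -> Tuple[Tuple[int, ...], int]:
    """Hypothetical sketch: ((software members, clipped to 4), packaging number)."""
    software, _, packaging = version_str.partition("-")
    members = tuple(int(part) for part in software.split(".")[:4])
    # A missing '-packaging' part becomes 0 so tuples stay comparable.
    return members, int(packaging) if packaging else 0


print(split_package_version("1.2.3.4-567"))
print(split_package_version("1.2.3.4.5"))
```

Because both members are ordinary tuples and integers, two results can be compared directly with `<` or `>`.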
- waptpackage.PackageVersionStr(package_or_versionstr) Tuple [source]¶
Splits a version string such as 1.2.3.4-567. The software version is clipped to 4 members. If ‘-packaging’ is not provided, the second member will be 0 for safe comparison.
- Args:
package_or_versionstr (str): package version string
- Returns:
tuple: (Version,int) : soft version on 4 members / packaging as an int
- class waptpackage.WaptBaseRepo(name='abstract', cabundle=None, config=None, section=None)[source]¶
Bases:
BaseObjectClass
Base abstract class for a Wapt Packages repository
- property authorized_certificates: Sequence[dict]¶
List of authorized signers certificates attributes
- Returns:
list [dict]
- property cabundle¶
- get_certificates(packages_zipfile=None)[source]¶
Download signers certificates and crl from Package index on remote repository.
These certificates and CRL are appended to Packages index when scanning packages.
- Args:
packages_zipfile (zipfile): if None, download it from repo
- Returns :
SSLCABundle or None if Packages does not exist
- get_package_entries(packages_names)[source]¶
Return the most up-to-date package entries for packages_names. packages_names is either a list or a string.
- Returns:
dict: a dictionary with {‘packages’:[],’missing’:[]}
>>> r = WaptRemoteRepo()
>>> r.load_config_from_file('c:/wapt/wapt-get.ini')
>>> res = r.get_package_entries(['tis-firefox','tis-putty'])
>>> isinstance(res['missing'],list) and isinstance(res['packages'][0],PackageEntry)
True
- invalidate_packages_cache()[source]¶
Reset the in-memory packages index. Returns the old content of the cache (packages, packages index date, discarded packages).
- Returns:
dict : old cache status dict(_packages=self._packages,_packages_date=self._packages_date,discarded=self.discarded)
- is_locally_allowed_package(package)[source]¶
Return True if package is not in the blacklist and, when the whitelist is not None, is in the whitelist. packages_whitelist and packages_blacklist are lists of package name wildcards (file style wildcards). The blacklist, if defined, is taken into account first. The whitelist is taken into account if it is not None; otherwise all package names that are not blacklisted are allowed.
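The blacklist-then-whitelist rule can be sketched with file style wildcards from the standard `fnmatch` module. `is_allowed` is a hypothetical free function written for this illustration; the real method reads the lists from the repository object.

```python
from fnmatch import fnmatchcase


def is_allowed(package_name, whitelist=None, blacklist=None):
    """Hypothetical sketch of the documented rule."""
    # The blacklist is evaluated first and always wins.
    if blacklist is not None and any(fnmatchcase(package_name, p) for p in blacklist):
        return False
    # A whitelist, when present, restricts what remains.
    if whitelist is not None:
        return any(fnmatchcase(package_name, p) for p in whitelist)
    return True


print(is_allowed("tis-firefox", whitelist=["tis-*"], blacklist=["tis-firefox"]))
print(is_allowed("tis-putty", whitelist=["tis-*"]))
```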
- load_config(config=None, section=None)[source]¶
Load configuration from inifile section. Use name of repo as section name if section is not provided. Use ‘global’ if no section named section in ini file. Values not defined in the ini file are taken from the class _default_config dict.
load_config is called at __init__, possibly with config = None. In this case, all parameters are initialized from defaults
- Args:
config (RawConfigParser): ini configuration
section (str): section from which to load parameters; defaults to the name of the repository
- Returns:
self: return itself to chain calls.
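The section fallback described above can be sketched with the standard `configparser` module. The helper name `read_repo_settings`, the `DEFAULTS` dict, its keys, and the example URL are all hypothetical; the real defaults live in the class `_default_config` dict.

```python
from configparser import RawConfigParser

# Hypothetical defaults standing in for the class-level _default_config dict.
DEFAULTS = {"repo_url": "", "timeout": "5"}


def read_repo_settings(config, repo_name, section=None):
    """Pick the section (explicit, else repo name, else 'global') and overlay it on the defaults."""
    wanted = section or repo_name
    if not config.has_section(wanted):
        wanted = "global"
    settings = dict(DEFAULTS)
    if config.has_section(wanted):
        for key in DEFAULTS:
            if config.has_option(wanted, key):
                settings[key] = config.get(wanted, key)
    return settings


config = RawConfigParser()
config.read_string("[global]\nrepo_url = https://wapt.example.lan/wapt\n")
print(read_repo_settings(config, "wapt-templates"))
```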
- load_config_from_file(config_filename, section=None)[source]¶
Load repository configuration from an inifile located at config_filename
- Args:
config_filename (str) : path to wapt inifile
section (str): ini section from which to get parameters. default to repo name
- Returns:
WaptBaseRepo: self
- need_update(last_modified=None)[source]¶
Check if packages index has changed on repo and local index needs an update
- Compare date on local package index DB with the Packages file on remote
repository with a HEAD http request.
- Args:
last_modified (str): iso datetime of last known update of packages.
- Returns:
bool: True if either Packages was never read or remote date of Packages is more recent than the provided last_modified date.
>>> repo = WaptRemoteRepo(name='main',url='http://wapt/wapt',timeout=4)
>>> waptdb = WaptDB('c:/wapt/db/waptdb.sqlite')
>>> res = repo.need_update(waptdb.read_param('last-%s'% repo.url))
>>> isinstance(res,bool)
True
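The date comparison at the heart of this check can be sketched without any network access. `index_needs_update` is a hypothetical helper; it takes the remote date as an argument, whereas the real method obtains it with an HTTP HEAD request.

```python
from datetime import datetime
from typing import Optional


def index_needs_update(remote_date: str, last_modified: Optional[str] = None) -> bool:
    """Hypothetical sketch: True if the index was never read or the remote copy is newer."""
    if last_modified is None:
        return True  # Packages was never read locally
    return datetime.fromisoformat(remote_date) > datetime.fromisoformat(last_modified)


print(index_needs_update("2024-05-02T10:00:00", "2024-05-01T09:30:00"))
```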
- packages()[source]¶
Return the list of packages, loading it from the repository if it is not yet available in memory. To force a reload, call invalidate_packages_cache() or update() first.
- packages_date()[source]¶
Date of last known packages index
- Returns:
str: date/time of Packages index in iso format (string)
- packages_matching(package_cond: Optional[Union[str, PackageRequest]] = None, **kwargs) list [source]¶
Return an ordered list of available packages entries which match the condition “packagename[([=<>]version)]?” version ascending
- Args:
package_cond (str or PackageRequest): package name with optional version specifier.
- Returns:
list of PackageEntry
>>> from waptpackage import *
>>> r = WaptRemoteRepo('http://wapt.tranquil.it/wapt')
>>> r.packages_matching('tis-firefox(>=20)')
[PackageEntry('tis-firefox','20.0.1-02'), PackageEntry('tis-firefox','21.0.0-00'), ...]
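The condition syntax “packagename[([=<>]version)]?” can be taken apart with a regular expression. The pattern and the `parse_condition` helper below are assumptions written for illustration; they are not the parser used by PackageRequest.

```python
import re

# Hypothetical pattern: a name, then an optional "(operator version)" part.
CONDITION_RE = re.compile(
    r"^\s*(?P<name>[^\s()=<>]+)\s*"
    r"(?:\(\s*(?P<op>[=<>]{1,2})\s*(?P<version>[^)\s]+)\s*\))?\s*$"
)


def parse_condition(condition):
    """Return (name, operator, version); operator and version are None when absent."""
    match = CONDITION_RE.match(condition)
    if match is None:
        raise ValueError("not a package condition: %r" % condition)
    return match.group("name"), match.group("op"), match.group("version")


print(parse_condition("tis-firefox(>=20)"))
print(parse_condition("tis-putty"))
```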
- property public_certs_dir¶
- property repo_url¶
- search(searchwords=[], sections=[], newest_only=False, exclude_sections=[], description_locale=None, host_capabilities=None, package_request=None, dependencies_list=[])[source]¶
- Return list of package entries
with description or name matching all the searchwords and section in provided sections list
- Args:
searchwords (list or csv) : list of words to look up in description and package names
sections (list or csv) : list of package sections to use when searching
newest_only (bool) : returns only highest version of package
exclude_sections (list or csv): list of package sections to exclude when searching
description_locale (str): if not None, search in description using this locale
host_capabilities (HostCapabilities or dict): restrict output to these capabilities (os version locales, arch etc..)
package_request (PackageRequest or dict) : restrict output to these filters, and sort output based on them
- Returns:
list of PackageEntry with additional _localized_description added if description_locale is provided
>>> r = WaptRemoteRepo(name='test',url='http://wapt.tranquil.it/wapt')
>>> r.search('test')
- class waptpackage.WaptLocalRepo(localpath=None, name='waptlocal', cabundle=None, config=None, section=None)[source]¶
Bases:
WaptBaseRepo
- Index of Wapt local repository.
- Index of packages is located in a Packages zip file, having one
Packages file, containing the concatenated content of “control” files of the packages.
A blank line means new package.
>>> localrepo = WaptLocalRepo('c:/wapt/cache')
>>> localrepo.update()
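The index layout described above (a zip archive holding one Packages member, with control blocks separated by a blank line) can be sketched in memory. The package names and fields are made up for the example, and the certificates, CRLs and icons that a real index may carry are left out.

```python
import io
import zipfile

# Two made-up control blocks; a blank line separates packages.
controls = [
    "package: tis-a\nversion: 1.0-1",
    "package: tis-b\nversion: 2.0-3",
]

# Write a zip archive containing a single 'Packages' member.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
    archive.writestr("Packages", "\n\n".join(controls) + "\n")

# Read it back and split on blank lines to recover one block per package.
with zipfile.ZipFile(io.BytesIO(buffer.getvalue())) as archive:
    content = archive.read("Packages").decode("utf-8")
blocks = [block for block in content.split("\n\n") if block.strip()]
names = [block.splitlines()[0].split(":", 1)[1].strip() for block in blocks]
print(names)
```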
- is_available(url=None)[source]¶
Check if repo is reachable and return creation date of Packages.
- Returns:
str: Iso creation date of remote Package file as returned in http headers
- load_config(config=None, section=None)[source]¶
Load waptrepo configuration from inifile section.
Use name of repo as section name if section is not provided. Use ‘global’ if no section named section in ini file
- Args:
config (RawConfigParser): ini configuration
section (str): section from which to load parameters; defaults to the name of the repository
- Returns:
WaptRemoteRepo: return itself to chain calls.
- property packages_path¶
- property repo_url¶
- update_packages_index(force_all=False, proxies=None, canonical_filenames=False, include_host_packages=False, include_certificates=True, include_crls=True, extract_icons=True)[source]¶
Scan self.localpath directory for WAPT packages and build a Packages (utf8) zip file with control data and MD5 hash
Extract icons from packages (WAPT/icon.png) and store them in <repo path>/icons/<package name>.png. Extract certificate and add it to Packages zip file in ssl/<fingerprint.crt>. Append CRL for certificates.
- Returns:
dict : {‘processed’:processed,’kept’:kept,’errors’:errors,’packages_filename’:packages_fname}
- class waptpackage.WaptRemoteRepo(url=None, name='', verify_cert=None, http_proxy=None, timeout=None, cabundle=None, config=None, section=None)[source]¶
Bases:
WaptBaseRepo
Gives access to a remote http repository, with a zipped Packages packages index
>>> repo = WaptRemoteRepo(name='main',url='http://wapt/wapt',timeout=4)
>>> last_modified = repo.is_available()
>>> isinstance(last_modified,str)
True
- client_auth()[source]¶
Return SSL trio filenames for client side SSL auth
- Returns:
tuple: (cert filename,key filename,key pwd)
- download_icons(package_requests, target_dir=None, usecache=True, printhook=None)[source]¶
Download a list of icons from packages (requests are of the form packagename (>version) ). Returns a dict of {"downloaded", "skipped", "errors"}.
If package_requests is a list of PackageEntry, update localpath of entry to match downloaded file.
- Args:
package_requests (list) : list of PackageEntry to download or list of package with optional version
- Returns:
dict: ‘packages’, ‘downloaded’, ‘skipped’, ‘errors’
>>> repo = WaptRemoteRepo(url='http://wapt.tranquil.it/wapt')
>>> wapt.download_packages(['tis-firefox','tis-waptdev'],printhook=nullhook)
{'downloaded': [u'c:/wapt\\cache\\tis-firefox_37.0.2-9_all.wapt', u'c:/wapt\\cache\\tis-waptdev.wapt'], 'skipped': [], 'errors': []}
- download_packages(package_requests, target_dir=None, usecache=True, printhook=None)[source]¶
Download a list of packages (requests are of the form packagename (>version) ). Returns a dict of {"downloaded", "skipped", "errors"}.
If package_requests is a list of PackageEntry, update localpath of entry to match downloaded file.
- Args:
package_requests (list) : list of PackageEntry to download or list of package with optional version
- Returns:
dict: ‘packages’, ‘downloaded’, ‘skipped’, ‘errors’
>>> repo = WaptRemoteRepo(url='http://wapt.tranquil.it/wapt')
>>> wapt.download_packages(['tis-firefox','tis-waptdev'],printhook=nullhook)
{'downloaded': [u'c:/wapt\\cache\\tis-firefox_37.0.2-9_all.wapt', u'c:/wapt\\cache\\tis-waptdev.wapt'], 'skipped': [], 'errors': []}
- get_requests_session(url=None, http_proxy=None)[source]¶
Returns a requests session object with optional ssl client side auth and proxies
- Returns:
requests.Session
- is_available(url=None, http_proxy=None)[source]¶
Check if repo is reachable and return creation date of Packages.
Try to access the repo and return last modified date of repo index or None if not accessible
- Returns:
str: Iso creation date of remote Package file as returned in http headers
>>> repo = WaptRemoteRepo(name='main',url='https://wapt/wapt',timeout=1)
>>> repo.is_available() <= datetime2isodate()
True
>>> repo = WaptRemoteRepo(name='main',url='https://badwapt/wapt',timeout=1)
>>> repo.is_available() is None
True
- load_config(config=None, section=None)[source]¶
Load waptrepo configuration from inifile section.
Use name of repo as section name if section is not provided.
Use ‘global’ if no section named section in ini file
- Args:
config (RawConfigParser): ini configuration
section (str): section from which to load parameters; defaults to the name of the repository
- Returns:
WaptRemoteRepo: return itself for chain calls.
- packages_url(url=None)[source]¶
return url of Packages index file
>>> repo = WaptRemoteRepo(name='main',url='http://wapt/wapt',timeout=4)
>>> repo.packages_url
'http://wapt/wapt/Packages'
hardcoded path to the Packages index.
- property proxies¶
dict for http proxies url suitable for requests based on the http_proxy repo attribute
- Returns:
dict: {‘http’:’http://proxy:port’,’https’:’http://proxy:port’}
- property repo_url¶
- waptpackage.control_to_dict(control, int_params=('size', 'installed_size'), out_excluded_control_keys=['filename', 'size', 'md5sum', 'repo_url', 'repo'], out_control_lines=None)[source]¶
Convert a control file-like object, or a list of lines of the form “key1: value1”, “key2: value2”, …, into a dict
Continuation lines of multiline strings begin with a space
Breaks when an empty line is reached (limit between 2 package in Packages indexes)
- Args:
control (file,str or list): file like object to read control from (until an empty line is reached)
int_params (list): attributes which must be converted to int
out_control_lines (list): if not None, decoded raw control lines are appended to this list.
- Returns:
dict
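The parsing rules listed above (key: value lines, continuation lines starting with a space, stop at the first empty line, integer conversion for selected keys) can be sketched as follows. `parse_control` is a hypothetical simplification: it only accepts a list of strings and ignores out_excluded_control_keys and out_control_lines.

```python
def parse_control(lines, int_params=("size", "installed_size")):
    """Hypothetical sketch: parse 'key: value' lines up to the first empty line."""
    result = {}
    last_key = None
    for raw_line in lines:
        line = raw_line.rstrip("\r\n")
        if not line.strip():
            break  # an empty line ends the current package
        if line.startswith(" ") and last_key is not None:
            # Continuation of a multiline value.
            result[last_key] += "\n" + line[1:]
            continue
        key, _, value = line.partition(":")
        last_key = key.strip()
        result[last_key] = value.strip()
    for key in int_params:
        if key in result:
            result[key] = int(result[key])
    return result


sample = [
    "package: tis-demo",
    "version: 1.0-2",
    "description: first line",
    " second line",
    "size: 1024",
    "",
    "package: other",
]
print(parse_control(sample))
```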
- waptpackage.parse_major_minor_patch_build(version)[source]¶
Parse version to major, minor, patch, pre-release, build parts.
- waptpackage.update_packages(adir, force=False, proxies=None, canonical_filenames=False)[source]¶
Helper function to update a local packages index
- This function is used on repositories to rescan all packages and
update the Packages index.
>>> if os.path.isdir('c:\wapt\cache'):
...     repopath = 'c:\wapt\cache'
... else:
...     repopath = '/var/www/wapt'
>>> p = PackageEntry()
>>> p.package = 'test'
>>> p.version = '10'
>>> new_package_fn = os.path.join(repopath,p.make_package_filename())
>>> if os.path.isfile(new_package_fn):
...     os.unlink(new_package_fn)
>>> res = update_packages(repopath)
>>> os.path.isfile(res['packages_filename'])
True
>>> r = WaptLocalRepo(localpath=repopath)
>>> l1 = r.packages()
>>> res = r.update_packages_index()
>>> l2 = r.packages()
>>> [p for p in l2 if p not in l1]
["test (=10)"]
Common¶
- exception common.EWaptBadServerAuthentication[source]¶
Bases:
EWaptException
- class common.Wapt(config_filename=None, defaults=None, disable_update_server_status=True, wapt_base_dir=None, dbpath=None)[source]¶
Bases:
BaseObjectClass
Global WAPT engine
- add_hosts_repo() WaptHostRepo [source]¶
Add an automatic host repository, removing any existing WaptHostRepo first
- add_pyscripter_project(target_directory)[source]¶
Add a pyscripter project file to package development directory.
- Args:
target_directory (str): path to location where to create the wapt.psproj file.
- Returns:
None
- add_vscode_project(target_directory)[source]¶
Add .vscode folder with project files to the package development directory
- Args:
target_directory (str): path to the package development directory.
- Returns:
None
- audit(package, force=False, audited_by=None) str [source]¶
Run the audit hook for the installed package. Source setup.py from database, filename, or packageEntry. Stores the result and log into the “wapt_localstatus” table.
- Args:
package (PackageEntry or directory or package name or {package_uuid} ): package to audit
- Returns:
str : iso datetime of this audit as stored in packages status table
- audit_data_expired(section, key, value)[source]¶
Check if the latest value associated with section/key is expired
- Returns:
bool: True if data exists and has expired, or if data does not exist.
- authorized_certificates()[source]¶
Return a list of authorized package certificate issuers for this host. check_certificates_validity enables date checking.
- build_package(directoryname, target_directory=None, excludes=[])[source]¶
Build the WAPT package from a directory
- Args:
directoryname (str): source root directory of package to build
inc_package_release (boolean): increment the version of package in control file.
set_maturity (str): if not None, change package maturity to this. Can be something like DEV, PROD etc..
- Returns:
str: Filename of built WAPT package
- build_upload(sources_directories, private_key_passwd=None, wapt_server_user=None, wapt_server_passwd=None, inc_package_release=False, target_directory=None, set_maturity=None)[source]¶
Build a list of packages and upload the resulting packages to the main repository. If the section of a package is group or host, use the specific wapt-host or wapt-group.
- Returns:
list: list of filenames of built WAPT package
- property cabundle¶
- call_python_code(python_filename, func_name, package_entry=None, force=None, params=None, working_dir=None, import_modules=[])[source]¶
Calls a function in python_filename. Set basedir, control, and run context within the function context.
- Args:
python_filename : python filename with module to load.
func_name (str): name of function to call in setuppy
package_entry (PackageEntry): if not None, use it to set environment
- Returns:
output of hook.
- check_all_depends_conflicts()[source]¶
Check the whole dependencies/conflicts tree for installed packages
- check_depends(apackages, forceupgrade=False, force=False, assume_removed=[], package_request_filter=None)[source]¶
Given a list of packagename or requirement “name (=version)”, return a dictionary of {‘additional’ ‘upgrade’ ‘install’ ‘skipped’ ‘unavailable’,’remove’} of [packagerequest,matching PackageEntry]
- Args:
apackages (str or list): list of packages for which to check missing dependencies.
forceupgrade (boolean): if True, check if the current installed packages is the latest available
force (boolean): if True, install the latest version even if the package is already there and match the requirement
assume_removed (list): list of packagename which are assumed to be absent even if they are actually installed to check the consequences of removal of packages, implies force=True
package_request_filter (PackageRequest): additional filter to apply to packages to sort by locales/arch/mat preferences; if None, get active host filter
- Returns:
dict : {‘additional’ ‘upgrade’ ‘install’ ‘skipped’ ‘unavailable’, ‘remove’} with list of [packagerequest,matching PackageEntry]
- check_downloads(apackages=None, usecache=True)[source]¶
Return list of available package entries to match supplied packages requirements
- Args:
apackages (list or str): list of packages
usecache (bool) : returns only PackageEntry not yet in cache
- Returns:
list: list of PackageEntry to download
- check_install(apackages=None, force=True, forceupgrade=True)[source]¶
Return a list of actions required for install of apackages list of packages. If apackages is None, check for all pending updates.
- Args:
apackages (str or list): list of packages or None to check pending install/upgrades
force (boolean): if True, already installed package listed in apackages will be considered to be reinstalled
forceupgrade: if True, all dependencies are upgraded to latest version, even if current version comply with depends requirements
- Returns:
dict: with keys [‘skipped’, ‘additional’, ‘remove’, ‘upgrade’, ‘install’, ‘unavailable’] and list of (package requirements, PackageEntry)
- check_install_running(max_ttl=60)[source]¶
Check if an install is in progress and return the list of pids of installs in progress. Kills old stuck wapt-get processes/children and updates db status. max_ttl is the maximum age of wapt-get in minutes.
- check_remove(apackages)[source]¶
Return a list of additional package to remove if apackages are removed
- Args:
apackages (str or list of req or PackageRequest): list of packages for which parent dependencies will be checked.
- Returns:
list: list of PackageRequest with broken dependencies
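Finding the packages whose dependencies would break can be sketched as a walk over a reverse dependency graph. The dict-based input, the name `dependents_to_remove`, and the transitive propagation are assumptions made for this illustration; the real method works on PackageRequest objects and the local database.

```python
def dependents_to_remove(installed_depends, removed):
    """Hypothetical sketch: packages that depend, directly or not, on the removed ones."""
    to_remove = set(removed)
    changed = True
    while changed:
        changed = False
        for package, depends in installed_depends.items():
            # A package breaks as soon as one of its dependencies goes away.
            if package not in to_remove and to_remove.intersection(depends):
                to_remove.add(package)
                changed = True
    return sorted(to_remove - set(removed))


installed = {"app": ["lib"], "plugin": ["app"], "other": []}
print(dependents_to_remove(installed, ["lib"]))
```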
- cleanup(obsolete_only=False)[source]¶
Remove cached WAPT files from local disk
- Args:
- obsolete_only (boolean): If True, remove packages which are either no more available,
or installed at a equal or newer version
- Returns:
list: list of filenames of removed packages
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> l = wapt.download_packages(wapt.check_downloads())
>>> res = wapt.cleanup(True)
- cleanup_session_setup()[source]¶
Remove all current user session_setup informations for removed packages
- create_or_update_host_certificate(force_recreate=False)[source]¶
Create an RSA key pair for the host and an x509 certificate.
Location of key is <wapt_root>private. It should be kept secret, with access restricted to the system account and administrators only.
- Args:
force_recreate (bool): recreate key pair even if already exists for this FQDN.
- Returns:
str: x509 certificate of this host.
- property dbpath¶
- dependencies(packagename, expand=False)[source]¶
Return all dependencies of a given package
>>> w = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> dep = w.dependencies('tis-waptdev')
>>> isinstance(dep,list) and isinstance(dep[0],PackageEntry)
True
- download_icons(package_requests, usecache=True, printhook=None)[source]¶
Download a list of package icons (requests are of the form packagename (>version) ). Returns a dict of {"downloaded", "skipped", "errors"}.
- Args:
package_requests (str or list): list of packages to prefetch
usecache (boolean) : if True, don’t download package if already in cache
printhook (func) : callback with signature report(received,total,speed,url) to display progress
- Returns:
dict: with keys {"downloaded", "skipped", "errors", "packages"} and list of PackageEntry.
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> def nullhook(*args):
...     pass
>>> wapt.download_packages(['tis-firefox','tis-waptdev'],usecache=False,printhook=nullhook)
{'downloaded': [u'c:/wapt\\cache\\tis-firefox_37.0.2-9_all.wapt', u'c:/wapt\\cache\\tis-waptdev.wapt'], 'skipped': [], 'errors': []}
- download_packages(package_requests, usecache=True, printhook=None)[source]¶
Download a list of packages (requests are of the form packagename(=version) ). If several packages match a request, only the highest/latest is kept.
- Args:
package_requests (str or list): list of packages to prefetch
usecache (boolean) : if True, don’t download package if already in cache
printhook (func) : callback with signature report(received,total,speed,url) to display progress
- Returns:
dict: with keys {"downloaded", "skipped", "errors", "packages"} and list of PackageEntry.
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> def nullhook(*args):
...     pass
>>> wapt.download_packages(['tis-firefox','tis-waptdev'],usecache=False,printhook=nullhook)
{'downloaded': [u'c:/wapt\\cache\\tis-firefox_37.0.2-9_all.wapt', u'c:/wapt\\cache\\tis-waptdev.wapt'], 'skipped': [], 'errors': []}
- duplicate_package(packagename, newname=None, newversion=None, newmaturity=None, target_directory=None, append_depends=None, remove_depends=None, append_conflicts=None, remove_conflicts=None, auto_inc_version=True, usecache=True, printhook=None, cabundle=None)[source]¶
Duplicate an existing package from a declared repository or file into target_directory, with optional newname and version.
- Args:
packagename (str) : packagename to duplicate, or filepath to a local package or package development directory.
newname (str): name of target package
newversion (str): version of target package. if None, use source package version
target_directory (str): path where to put development files. If None, use temporary. If empty, use default development dir
append_depends (list): comma str or list of depends to append.
remove_depends (list): comma str or list of depends to remove.
auto_inc_version (bool): if version is less than existing package in repo, set version to repo version+1
usecache (bool): If True, allow to use cached package in local repo instead of downloading it.
printhook (func): hook for download progress
cabundle (SSLCABundle): list of authorized ca certificate (SSLPublicCertificate) to check authenticity of source packages. If None, no check is performed.
- Returns:
PackageEntry : new packageEntry with sourcespath = target_directory
>>> wapt = Wapt(config_filename='c:/tranquilit/wapt/tests/wapt-get.ini')
>>> wapt.dbpath = ':memory:'
>>> r= wapt.update()
>>> def nullhook(*args):
...     pass
>>> tmpdir = 'c:/tmp/testdup-wapt'
>>> if os.path.isdir(tmpdir):
...     import shutil
...     shutil.rmtree(tmpdir)
>>> p = wapt.duplicate_package('tis-wapttest',
...     newname='testdup',
...     newversion='20.0-0',
...     target_directory=tmpdir,
...     excludes=['.svn','.git','.gitignore','*.pyc','src'],
...     append_depends=None,
...     auto_inc_version=True,
...     usecache=False,
...     printhook=nullhook)
>>> print(repr(p['package']))
PackageEntry('testdup','20.0-0')
>>> if os.path.isdir(tmpdir):
...     import shutil
...     shutil.rmtree(tmpdir)
>>> p = wapt.duplicate_package('tis-wapttest',
...     target_directory=tempfile.mkdtemp('wapt'),
...     auto_inc_version=True,
...     append_depends=['tis-firefox','tis-irfanview'],
...     remove_depends=['tis-wapttestsub'],
...     )
>>> print(repr(p['package']))
PackageEntry('tis-wapttest','120')
- edit_host(hostname, target_directory=None, append_depends=None, remove_depends=None, append_conflicts=None, remove_conflicts=None, printhook=None, description=None, cabundle=None)[source]¶
Download and extract a host package from host repositories into target_directory for modification
- Args:
hostname (str) : fqdn of the host to edit
target_directory (str) : where to place the development files. If empty, use the default one from the wapt-get.ini configuration
append_depends (str or list) : list or comma separated list of package requirements
remove_depends (str or list) : list or comma separated list of package requirements to remove
cabundle (SSLCABundle) : authorized ca certificates. If None, use default from current wapt.
- Returns:
PackageEntry
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini') >>> tmpdir = 'c:/tmp/dummy' >>> wapt.edit_host('dummy.tranquilit.local',target_directory=tmpdir,append_depends='tis-firefox') >>> import shutil >>> shutil.rmtree(tmpdir) >>> host = wapt.edit_host('htlaptop.tranquilit.local',target_directory=tmpdir,append_depends='tis-firefox') >>> 'package' in host True >>> shutil.rmtree(tmpdir)
- edit_package(packagerequest, target_directory='', use_local_sources=True, append_depends=None, remove_depends=None, append_conflicts=None, remove_conflicts=None, auto_inc_version=True, cabundle=None)[source]¶
Download an existing package from repositories into target_directory for modification. If use_local_sources is True and no newer package exists on repos, update the current local edited data; otherwise, if target_directory exists and is not empty, raise an exception.
- Args:
packagerequest (str) : path to existing wapt file, or package request use_local_sources (boolean) : don’t raise an exception if target exist and match package version append_depends (list of str) : package requirements to add to depends remove_depends (list or str) : package requirements to remove from depends auto_inc_version (bool) : cabundle (SSLCABundle) : list of authorized certificate filenames. If None, use default from current wapt.
- Returns:
PackageEntry : edit local package with sourcespath attribute populated
>>> wapt = Wapt(config_filename='c:/tranquilit/wapt/tests/wapt-get.ini') >>> wapt.dbpath = ':memory:' >>> r= wapt.update() >>> tmpdir = tempfile.mkdtemp('wapt') >>> res = wapt.edit_package('tis-wapttest',target_directory=tmpdir,append_depends='tis-firefox',remove_depends='tis-7zip') >>> res['target'] == tmpdir and res['package'].package == 'tis-wapttest' and 'tis-firefox' in res['package'].depends True >>> import shutil >>> shutil.rmtree(tmpdir)
- forget_packages(packages_list)[source]¶
Remove install status for packages from local database without actually uninstalling the packages
- Args:
packages_list (list): list of installed package names to forget
- Returns:
list: list of package names actually forgotten
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini') >>> res = wapt.install('tis-test') ??? >>> res = wapt.is_installed('tis-test') >>> isinstance(res,PackageEntry) True >>> wapt.forget_packages('tis-test') ['tis-test'] >>> wapt.is_installed('tis-test') >>> print wapt.is_installed('tis-test') None
- generate_host_uuid()[source]¶
Regenerate a random UUID for this host or force with supplied one.
Normally, the UUID is taken from BIOS through wmi.
In case bios returns some duplicates or garbage, it can be useful to force a random uuid. This is stored as uuid key in wapt-get.ini.
In case we want to link the host with an existing record on the server, we can force an old UUID.
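The idea of forcing a UUID and keeping it under the uuid key can be sketched with the standard library alone. This is an illustration, not WAPT's implementation: the helper name `force_random_uuid` is hypothetical, the `[global]` section is assumed from the host_organizational_unit_dn description, and the sketch works on a string so that no real wapt-get.ini is touched.

```python
import configparser
import io
import uuid


def force_random_uuid(ini_text, new_uuid=None):
    """Return ini_text with [global] uuid set to new_uuid or a fresh random UUID.

    Hypothetical helper for illustration only.
    """
    config = configparser.RawConfigParser()
    config.read_string(ini_text)
    if not config.has_section('global'):
        config.add_section('global')
    # Either reuse a supplied (old) UUID or draw a new random one.
    config.set('global', 'uuid', new_uuid or str(uuid.uuid4()))
    out = io.StringIO()
    config.write(out)
    return out.getvalue()


updated = force_random_uuid("[global]\nwaptserver = https://srvwapt.example\n")
```

Passing an existing value as `new_uuid` mirrors the "force an old UUID" case described above.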
- get_default_development_dir(packagecond, section='base')[source]¶
Returns the default development directory for package named <packagecond> based on default_sources_root ini parameter if provided
- Args:
packagecond (PackageEntry or str): either PackageEntry or a “name(=version)” string
- Returns:
unicode: path to local proposed development directory
- get_host_certificate()[source]¶
Return the current host certificate. If the certificate does not yet exist, it is created as a self signed certificate and stored in get_host_certificate_filename path
- Returns:
SSLCertificate: host public certificate.
- get_host_certificate_filename()[source]¶
Full filepath of the host's own certificate and RSA public key
- Returns:
str
- get_host_certificate_signing_request()[source]¶
Return a CSR with CN and AltSubjectNames pointing to this host uuid
- Returns:
SSLCertificateSigningRequest: host public certificate signing request.
- get_host_key(create=True)[source]¶
Return the private key used to sign uploaded data from the host. Create the key if it does not exist yet.
- Returns:
SSLPrivateKey: Private key used to sign data posted by host.
- get_host_packages()[source]¶
Return list of implicit available host packages based on computer UUID and AD Org Units
- Returns:
list: list of PackageEntry.
- get_host_packages_names()[source]¶
Return list of implicit host package names based on computer UUID and AD Org Units
- Returns:
list: list of str package names.
- get_installed_host_packages()[source]¶
Get the implicit package names (host and unit packages) which are installed but no longer relevant
- Returns:
list: of installed package names
- get_installer_defaults(installer_path)[source]¶
Returns guessed default values for package templates based on installer binary
- Args:
installer_path (str): filepath to installer
- Returns:
dict:
>>> get_installer_defaults(r'c:\tranquilit\wapt\tests\SumatraPDF-3.1.1-install.exe')
{'description': u'SumatraPDF Installer (Krzysztof Kowalczyk)', 'filename': 'SumatraPDF-3.1.1-install.exe', 'silentflags': '/VERYSILENT', 'simplename': u'sumatrapdf-installer', 'type': 'UnknownExeInstaller', 'version': u'3.1.1'}
>>> get_installer_defaults(r'c:\tranquilit\wapt\tests\7z920.msi')
{'description': u'7-Zip 9.20 (Igor Pavlov)', 'filename': '7z920.msi', 'silentflags': '/q /norestart', 'simplename': u'7-zip-9.20', 'type': 'MSI', 'version': u'9.20.00.0'}
- get_json_config_filename(config_name)[source]¶
Returns the filename for a json config named <config_name>
- get_last_update_status()[source]¶
Get update status of host as stored at the end of last operation.
- Returns:
- dict:
‘date’: timestamp of last operation ‘runstatus’: last printed message of wapt core ‘running_tasks’: list of tasks ‘errors’: list of packages not installed properly ‘upgrades’: list of packages which need to be upgraded
- get_next_audit_datetime()[source]¶
Return next datetime for next audit loop = minimum(next_audit_date)
- get_outdated_host_packages()[source]¶
Check and return the available host packages which are not installed
- get_package_attribute(package, key, default_value=None)[source]¶
Retrieve from local db the value stored under key for package, or default_value if none is stored
- get_package_entries(packages_names)[source]¶
Return most up to date packages entries for packages_names. packages_names is either a list or a string.
‘missing’ key lists the package requirements which are not available in the package index.
- Args:
packages_names (list or str): list of package requirements
- Returns:
dict : {‘packages’:[PackageEntries,],’missing’:[str,]}
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini') >>> res = wapt.get_package_entries(['tis-firefox','tis-putty']) >>> isinstance(res['missing'],list) and isinstance(res['packages'][0],PackageEntry) True
- get_previous_package_params(package_entry)[source]¶
Return the params used for the previous install of package_entry.package. If there was no previous install, return {}. The params are stored as a json string in the local package status table.
- Args:
package_entry (PackageEntry): package request to lookup.
- Returns:
dict
- get_sources(package)[source]¶
Download sources of package (if referenced in package as a https svn) in the current directory
- Args:
package (str or PackageRequest): package to get sources for
- Returns:
str : checkout directory path
- get_unrelevant_host_packages()[source]¶
Get the implicit package names (host and unit packages) which are installed but no longer relevant
- Returns:
list: of installed package names
- global_attributes = ['wapt_base_dir', 'waptserver', 'config_filename', 'proxies', 'repositories', 'personal_certificate_path', 'public_certs_dir', 'packages_cache_dir', 'dbpath', 'http_proxy', 'use_http_proxy_for_repo', 'use_http_proxy_for_server', 'limit_bandwidth', 'waptservice_user', 'waptservice_password', 'waptservice_admin_filter', 'waptservice_port', 'waptservice_poll_timeout', 'locales', 'custom_tags', 'packages_whitelist', 'packages_blacklist', 'maturities', 'host_uuid', 'use_fqdn_as_uuid', 'use_hostpackages', 'use_ad_groups', 'use_repo_rules', 'host_profiles', 'host_organizational_unit_dn', 'host_ad_site', 'allow_user_service_restart', 'allow_remote_shutdown', 'allow_remote_reboot', 'ldap_auth_server', 'ldap_auth_base_dn', 'ldap_auth_ssl_enabled', 'verify_cert_ldap', 'loglevel', 'loglevel_waptcore', 'loglevel_waptservice', 'loglevel_wapttasks', 'loglevel_waptws', 'loglevel_waptdb', 'loglevel_websocket', 'loglevel_waitress', 'log_to_windows_events', 'download_after_update_with_waptupdate_task_period', 'websockets_ping', 'websockets_retry_delay', 'websockets_check_config_interval', 'websockets_hurry_interval', 'notify_user', 'waptaudit_task_period', 'signature_clockskew', 'wol_relay', 'hiberboot_enabled', 'max_gpo_script_wait', 'pre_shutdown_timeout', 'minimum_battery_percent', 'uninstallkey_timeout', 'check_certificates_validity', 'token_lifetime', 'repositories', 'trust_all_certs_in_pems', 'include_dmi_inventory', 'include_wmi_inventory', 'wapt_temp_dir']¶
- property hiberboot_enabled¶
get HiberbootEnabled.
- host_capabilities()[source]¶
Return the current capabilities of the host, taken into account to determine the packages list and whether update should be forced (when filter criteria are updated). This includes host certificate, architecture, locale and authorized certificates.
- Returns:
dict
- host_capabilities_fingerprint()[source]¶
Return a fingerprint representing the current capabilities of host This includes host certificate,architecture,locale,authorized certificates
- Returns:
str
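A fingerprint of this kind can be sketched by hashing a canonical serialization of the capabilities. The choice of JSON with sorted keys and sha256 is an assumption made for illustration; the text above only states that a fingerprint string is returned, and the function name and sample keys are hypothetical.

```python
import hashlib
import json


def capabilities_fingerprint(capabilities):
    """Return a stable sha256 hex digest of a capabilities dict (illustrative)."""
    # sort_keys makes the digest independent of dict insertion order.
    payload = json.dumps(capabilities, sort_keys=True).encode('utf-8')
    return hashlib.sha256(payload).hexdigest()


a = capabilities_fingerprint({'architecture': 'x64', 'locales': ['fr', 'en']})
b = capabilities_fingerprint({'locales': ['fr', 'en'], 'architecture': 'x64'})
```

Comparing a stored fingerprint with a freshly computed one is enough to detect that filter criteria changed, without comparing the capabilities field by field.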
- property host_dn¶
- property host_organizational_unit_dn¶
Get host org unit DN from wapt-get.ini [global] host_organizational_unit_dn if defined or from registry as supplied by AD / GPO process
- property host_profiles¶
- property host_site¶
- property host_uuid: str¶
- http_upload_package(packages, wapt_server_user=None, wapt_server_passwd=None, progress_hook=None)[source]¶
Upload a package or host package to the waptserver.
- Args:
packages (str or list): list of filepaths or PackageEntry to wapt packages to upload wapt_server_user (str) : user for basic auth on waptserver wapt_server_passwd (str) : password for basic auth on waptserver
Returns:
>>> from common import * >>> wapt = Wapt(config_filename = r'C:\tranquilit\wapt\tests\wapt-get.ini') >>> r = wapt.update() >>> d = wapt.duplicate_package('tis-wapttest','toto') >>> print d {'target': u'c:\\users\\htouvet\\appdata\\local\\temp\\toto.wapt', 'package': PackageEntry('toto','119')} >>> wapt.http_upload_package(d['package'],wapt_server_user='admin',wapt_server_passwd='password')
- install(apackages=None, force=False, params_dict={}, download_only=False, usecache=True, printhook=None, installed_by=None, only_priorities=None, only_if_not_process_running=False, process_dependencies=True)[source]¶
Install a list of packages and their dependencies. Packages listed in the conflicts package attribute are removed first.
Returns a dictionary of (package requirement,package) with ‘install’,’skipped’,’additional’
- Args:
apackages (list or str): list of packages requirements “packagename(=version)” or list of PackageEntry. force (bool) : reinstalls the packages even if it is already installed params_dict (dict) : parameters passed to the install() procedure in the packages setup.py of all packages
as params variables and as “setup module” attributes
download_only (bool) : don’t install package, but only download them usecache (bool) : use the already downloaded packages if available in cache directory printhook (func) : hook for progress print
- Returns:
- dict: with keys [‘skipped’, ‘additional’, ‘remove’, ‘upgrade’, ‘install’, ‘unavailable’] and list of
(package requirements, PackageEntry)
>>> wapt = Wapt(config_filename='c:/tranquilit/wapt/tests/wapt-get.ini') >>> def nullhook(*args): ... pass >>> res = wapt.install(['tis-wapttest'],usecache=False,printhook=nullhook,params_dict=dict(company='toto')) >>> isinstance(res['upgrade'],list) and isinstance(res['errors'],list) and isinstance(res['additional'],list) and isinstance(res['install'],list) and isinstance(res['unavailable'],list) True >>> res = wapt.remove('tis-wapttest') >>> res == {'removed': ['tis-wapttest'], 'errors': []} True
- install_immediate(force=False, only_priorities=None, only_if_not_process_running=False)[source]¶
Install pending packages which must be forcibly installed at a specific time.
- Args:
only_if_not_process_running: install package only if impacted_process are not running
- Returns:
- dict: {‘upgrade’: [], ‘additional’: [], ‘downloads’:
{‘downloaded’: [], ‘skipped’: [], ‘errors’: []},
‘remove’: [], ‘skipped’: [], ‘install’: [], ‘errors’: [], ‘unavailable’: []}
- install_json_config(conf, config_name=None, priority=None)[source]¶
Add a dynamic configuration from dict conf with name config_name and priority
- install_json_config_file(json_config_file, config_name=None, priority=None)[source]¶
Install a json config stored in a file
- install_json_config_from_url(url=None, config_hash=None, config_name='default_config', priority=None)[source]¶
Load a json config from the remote wapt server (default location is /var/www/wapt/conf.d/<config_name>-<config_hash>.json) given its name and hash.
- install_json_configb64(json_config_b64, config_name=None, priority=None)[source]¶
Install a json config encoded as a base64 string
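Decoding such a payload takes two standard-library calls. The sketch below only shows the decoding step; the helper name is hypothetical, and the layout of the sample configuration (a `global` section holding `loglevel`) is an assumption.

```python
import base64
import json


def decode_json_config_b64(json_config_b64):
    """Decode a base64 string holding UTF-8 JSON into a Python object."""
    raw = base64.b64decode(json_config_b64)
    return json.loads(raw.decode('utf-8'))


# Round trip with a made-up configuration dict.
encoded = base64.b64encode(
    json.dumps({'global': {'loglevel': 'debug'}}).encode('utf-8')).decode('ascii')
conf = decode_json_config_b64(encoded)
```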
- install_wapt(fname, params_dict={}, explicit_by=None, force=None)[source]¶
Install a single wapt package given its WAPT filename. return install status
- Args:
fname (str): Path to wapt Zip file or unzipped development directory
params_dict (dict): custom parameters for the install function
explicit_by (str): identify who has initiated the install
- Returns:
str: ‘OK’,’ERROR’
Raises:
EWaptMissingCertificate EWaptNeedsNewerAgent EWaptUnavailablePackage EWaptConflictingPackage EWaptBadPackageAttribute EWaptException various Exception depending on setup script
- installed(include_errors=False)[source]¶
Returns all installed packages with their status
- Args:
include_errors (boolean): include packages not installed successfully
- Returns:
list: list of PackageEntry merged with local install status.
- inventory()[source]¶
Return full inventory of the computer as a dictionary.
- Returns:
dict: {‘host_info’,’wapt_status’,’dmi’,’installed_softwares’,’installed_packages’}
- …changed:
1.4.1: renamed keys 1.6.2.4: removed setup.py from packages inventory.
- is_available(packagename)[source]¶
Check if a package (with optional version condition) is available in repositories.
- Args:
packagename (str) : package name to lookup or package requirement ( packagename(=version) )
- Returns:
list : of PackageEntry sorted by package version ascending
>>> wapt = Wapt(config_filename='c:/tranquilit/wapt/tests/wapt-get.ini') >>> l = wapt.is_available('tis-wapttest') >>> l and isinstance(l[0],PackageEntry) True
- is_installed(packagename, include_errors=False)[source]¶
Checks if a package is installed. Return package entry and additional local status or None
- Args:
packagename (str): name / {package_uuid}/ package request to query
- Returns:
- PackageEntry: None or PackageEntry merged with local install_xxx fields
install_date
install_output
install_params
install_status
- is_locally_allowed_package(package)[source]¶
Return True if package is not in blacklist and is in whitelist if whitelist is not None. packages_whitelist and packages_blacklist are lists of package name wildcards (file style wildcards). The blacklist is taken into account first if defined. The whitelist is taken into account if not None, else all package names which are not blacklisted are allowed.
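The evaluation order described above can be sketched with `fnmatch`, which implements file style wildcards. The function name `is_allowed` and the sample package names are illustrative; this is not the library's code.

```python
from fnmatch import fnmatch


def is_allowed(package_name, whitelist=None, blacklist=None):
    """Apply the blacklist first, then the whitelist, using file style wildcards."""
    if blacklist is not None and any(fnmatch(package_name, pat) for pat in blacklist):
        return False
    if whitelist is not None:
        return any(fnmatch(package_name, pat) for pat in whitelist)
    # No whitelist: everything which is not blacklisted is allowed.
    return True


allowed = is_allowed('tis-firefox', whitelist=['tis-*'])
rejected = is_allowed('tis-firefox', whitelist=['tis-*'], blacklist=['*-firefox'])
```

Note that `fnmatch` follows the case rules of the operating system, so matching is case-insensitive on Windows and case-sensitive on POSIX systems.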
- is_wapt_package_development_dir(directory)[source]¶
Return PackageEntry if directory is a wapt development directory (a WAPT/control file exists) or False
- is_wapt_package_file(filename)[source]¶
Return PackageEntry if filename is a wapt package, or False. The check succeeds if the file ends with .wapt and the control file can be loaded and decoded from the zip file.
- Args:
filename (str): path to a file
- Returns:
False or PackageEntry
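A reduced version of this check can be written with `zipfile`. Unlike the method above, the sketch returns a plain boolean and does not decode the control file into a PackageEntry. The member name `WAPT/control` inside the archive is assumed from the development directory layout, and the control content written to the demo archive is a placeholder.

```python
import os
import tempfile
import zipfile


def looks_like_wapt_package(filename):
    """Return True if filename ends with .wapt and is a zip holding WAPT/control."""
    if not filename.endswith('.wapt') or not zipfile.is_zipfile(filename):
        return False
    with zipfile.ZipFile(filename) as package_zip:
        return 'WAPT/control' in package_zip.namelist()


# Build a throwaway archive to exercise the check.
demo_dir = tempfile.mkdtemp()
demo_package = os.path.join(demo_dir, 'demo.wapt')
with zipfile.ZipFile(demo_package, 'w') as package_zip:
    package_zip.writestr('WAPT/control', 'placeholder control content\n')
```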
- last_install_log(packagename)[source]¶
Get the printed output of the last install of package named packagename
- Args:
packagename (str): name of package to query
- Returns:
dict: {status,log} of the last install of a package
>>> w = Wapt() >>> w.last_install_log('tis-7zip') ??? {'install_status': u'OK', 'install_output': u'Installing 7-Zip 9.38.0-1\n7-Zip already installed, skipping msi install\n',install_params: ''}
- list(searchwords=[])[source]¶
Returns a list of installed packages which have the searchwords in their description
- Args:
- searchwords (list): list of words to look up in package name and description
only entries which have words in the proper order are returned.
- Returns:
list: list of PackageEntry matching the search words
>>> w = Wapt() >>> w.list('zip') [PackageEntry('tis-7zip','16.4-8') ]
- list_upgrade(current_datetime=None)[source]¶
Returns a list of package requirements for packages which should be installed / upgraded / removed
- Returns:
dict: {‘additional’: [], ‘install’: [], ‘remove’: [], ‘upgrade’: []}
- load_config(config_filename=None, merge_config_packages=None)[source]¶
Load configuration parameters from supplied inifilename
- make_group_template(packagename='', maturity=None, depends=None, conflicts=None, directoryname=None, section='group', description=None)[source]¶
Creates or updates on disk a skeleton of a WAPT group package. If a package skeleton already exists in directoryname, it is updated.
sourcespath attribute of returned PackageEntry is populated with the development directory of group package.
- Args:
packagename (str): group name depends : conflicts directoryname section description
- Returns:
PackageEntry
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini') >>> tmpdir = 'c:/tmp/dummy' >>> if os.path.isdir(tmpdir): ... import shutil ... shutil.rmtree(tmpdir) >>> p = wapt.make_group_template(packagename='testgroupe',directoryname=tmpdir,depends='tis-firefox',description=u'Test de groupe') >>> print p >>> print p['package'].depends tis-firefox >>> import shutil >>> shutil.rmtree(tmpdir)
- make_host_template(packagename='', depends=None, conflicts=None, directoryname=None, description=None)[source]¶
- make_package_template(installer_path='', packagename='', directoryname='', section='base', description=None, depends='', version=None, silentflags=None, uninstallkey=None, maturity=None, architecture='all', target_os='all', product_name=None)[source]¶
- Build a skeleton of WAPT package based on the properties of the supplied installer
Return the path of the skeleton
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini') >>> wapt.dbpath = ':memory:' >>> files = 'c:/tmp/files' >>> if not os.path.isdir(files): ... os.makedirs(files) >>> tmpdir = 'c:/tmp/dummy' >>> devdir = wapt.make_package_template(files,packagename='mydummy',directoryname=tmpdir,depends='tis-firefox') >>> os.path.isfile(os.path.join(devdir,'WAPT','control')) True >>> p = wapt.build_package(devdir) >>> 'filename' in p and isinstance(p['files'],list) and isinstance(p['package'],PackageEntry) True >>> import shutil >>> shutil.rmtree(tmpdir)
- property max_gpo_script_wait¶
get / set the MaxGPOScriptWait.
- property merged_config_hash¶
- packages_filter_for_host()[source]¶
Returns a PackageRequest object based on host capabilities to filter applicable packages from a repo
- Returns:
PackageRequest
- packages_matching(package_request=None, query=None, args=())[source]¶
Returns the list of known packages Entries matching a PackageRequest
- Args:
package_request (PackageRequest): request
- Returns:
list (of PackageEntry)
- personal_certificate()[source]¶
Returns the personal certificates chain
- Returns:
list (of SSLCertificate). The first one is the personal certificate. The other are useful if intermediate CA are used.
- property pre_shutdown_timeout¶
get / set the pre shutdown timeout for shutdown tasks.
- private_key(private_key_password=None)[source]¶
SSLPrivateKey matching the personal_certificate When key has been found, it is kept in memory for later use.
- Args:
private_key_password : password to use to decrypt key. If None, passwd_callback is called.
- Returns:
SSLPrivateKey
- Raises:
EWaptMissingPrivateKey if key can not be decrypted or found.
- property private_key_password_callback¶
- reachable_ip()[source]¶
Return the local IP which is most probably reachable by wapt server
- In case there are several network connections, returns the local IP
which Windows chooses for sending packets to WaptServer.
This can be the most probable IP which would get packets from WaptServer.
- Returns:
str: Local IP
- read_audit_data(section, key, default=None, ptype=None, include_expired_data=True)[source]¶
Retrieve the latest value associated with section/key from database
- read_audit_data_set(section, key, as_dict=False, raw_data=False, descending=True)[source]¶
Retrieve all the values associated with section/key from database
- read_audit_data_since(last_query_date=None, raw_data=False)[source]¶
Retrieve all the values associated with section/key from database
- read_param(name, default=None, ptype=None)[source]¶
read a param value from local db
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> wapt.read_param('db_version')
u'20140410'
- read_upgrade_status()[source]¶
Return last stored pending updates status
- Returns:
dict: {running_tasks errors pending (dict) upgrades (list)}
- register_computer(description=None, retry_with_password=False)[source]¶
Send computer information to WAPT Server. If description is provided, updates local registry with new description.
- Returns:
dict: response from server.
>>> wapt = Wapt() >>> s = wapt.register_computer() >>>
- registry_uninstall_snapshot()[source]¶
Return list of uninstall ID from registry launched before and after an installation to capture uninstallkey
- reload_config_if_updated()[source]¶
Check if config file has been updated. Return None if config has not changed, or the date of the new config file if reloaded.
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini') >>> wapt.reload_config_if_updated()
- remove(packages_list, force=False, only_priorities=None, only_if_not_process_running=False)[source]¶
Removes a package given its package name and unregisters it from the local status DB
- Args:
- packages_list (str or list or path): packages to remove (package name,
list of package requirement, package entry or development directory)
force : if True, unregister package from local status database, even if uninstall has failed
- Returns:
dict: {‘errors’: [], ‘removed’: [], ‘not_allowed’: []}
- property repositories¶
- reset_host_organizational_unit_dn()[source]¶
Reset forced host_organizational_unit_dn to AD / GPO registry defaults. If it was forced in ini file, remove setting from ini file.
- reset_host_uuid(new_uuid=None)[source]¶
Reset host uuid to bios provided UUID. If it was forced in ini file, remove setting from ini file.
- run_notfatal(*cmd, **args)[source]¶
Runs the command and waits for its termination. Returns the output; doesn’t raise an exception if the exit code is not zero, but returns ‘’ instead.
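The "return the output, or an empty string on failure" behaviour can be sketched with `subprocess`. The name `run_notfatal_sketch` is hypothetical and the way arguments are passed (one string per argument) is an assumption; the real method's handling of `*cmd` and `**args` is not documented here.

```python
import subprocess
import sys


def run_notfatal_sketch(*cmd):
    """Run cmd and return its stdout, or '' if the exit code is not zero."""
    try:
        completed = subprocess.run(cmd, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError:
        return ''
    return completed.stdout


ok_output = run_notfatal_sketch(sys.executable, '-c', 'print("done")')
failed_output = run_notfatal_sketch(sys.executable, '-c', 'import sys; sys.exit(3)')
```

Because a failing command and a command with no output both yield an empty string, callers which need to tell them apart would have to inspect the exit code themselves.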
- property runstatus: str¶
returns the current run status for tray display
- search(searchwords=[], exclude_host_repo=True, section_filter=None, newest_only=False)[source]¶
Returns a list of packages which have the searchwords in their description
- Args:
searchwords (str or list): words to search in packages name or description
exclude_host_repo (boolean): if True, don’t search in host repositories.
section_filter (str or list): restrict search to the specified package sections/categories
- Returns:
list: list of PackageEntry
- self_service_auth(login, password, host_uuid, groups)[source]¶
Sends login and password to server, which then checks if it’s a valid user
- Returns:
list: groups the user belongs to.
- session_setup(package, force=False)[source]¶
Setup the user session for a specific system wide installed package. Source setup.py from database or filename
- set_client_cert_auth(connection, force=False)[source]¶
Set client side ssl authentication for a waptserver or a waptrepo using host_certificate if client_certificate is not yet set in config and host certificate is able to do client_auth
- Args:
connection: object with client_certificate, client_private_key and client_auth
- set_local_password(user='admin', pwd='password')[source]¶
Set admin/password local auth for waptservice in ini file as a sha256 hex hash
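Storing a password as a sha256 hex hash can be sketched as follows. The key names come from the global_attributes list above; the `[global]` section, the UTF-8 encoding of the password and the helper name are assumptions, and the sketch writes to an in-memory parser rather than to a real ini file.

```python
import configparser
import hashlib


def local_password_settings(user='admin', pwd='password'):
    """Return the two ini values: user name and sha256 hex digest of pwd."""
    digest = hashlib.sha256(pwd.encode('utf-8')).hexdigest()
    return {'waptservice_user': user, 'waptservice_password': digest}


config = configparser.RawConfigParser()
config.add_section('global')
for key, value in local_password_settings().items():
    config.set('global', key, value)
```

The defaults shown in the signature are placeholders; a real deployment would pass its own credentials.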
- set_package_attribute(package, key, value)[source]¶
Store in local db a key/value pair for later use
- show_progress(show_box=False, msg='Loading...', progress=None, progress_max=None)[source]¶
Global hook to report progress feedback to the user
- Args:
show_box (bool): indicate to display or hide the notification msg (str): A status message to display. If None, nothing is changed. progress (float): Completion progress_max (float): Target of completion.
- sign_host_content(data)[source]¶
Sign data str with host private key with sha256 + RSA
- Args:
data (bytes) : data to sign
- Returns:
bytes: signature of sha256 hash of data.
- sign_package(zip_or_directoryname, certificate=None, private_key_password=None, private_key=None, set_maturity=None, inc_package_release=False, keep_signature_date=False, excludes=[])[source]¶
- Calc the signature of the WAPT/manifest.sha256 file and put/replace it in ZIP or directory.
if directory, creates WAPT/manifest.sha256 and adds it to the content of the package, then creates a WAPT/signature file and adds it to the directory or zip file.
- known issue: if the zip file already contains a manifest.sha256 file, it is not removed, so there will be
2 manifest files in zip / wapt package.
- Args:
zip_or_directoryname: filename or path for the wapt package’s content certificate (list): certificates chain of signer. private_key (SSLPrivateKey): the private key to use private_key_password (str) : passphrase to decrypt the private key. If None provided, use self.private_key_password_callback
- Returns:
str: base64 encoded signature of manifest.sha256 file (content
- store_upgrade_status(upgrades=None)[source]¶
Stores in DB the current pending upgrades and running installs for query by waptservice
- trust_package_signer(cert)[source]¶
Add a certificate to the list of trusted package signers certificates. Stores the PEM encoded data in the public_certs_dir directory. The certificate is added to the current Wapt.cabundle instance for immediate use (until config is reloaded).
- uninstall(packagename, params_dict={}, force=False)[source]¶
Launch the uninstall script of an installed package. Source setup.py from database or filename
- uninstall_cmd(guid)[source]¶
return the (quiet) command stored in registry to uninstall a software given its registry key
- unregister_computer()[source]¶
Remove computer informations from WAPT Server
- Returns:
dict: response from server.
>>> wapt = Wapt() >>> s = wapt.unregister_computer() >>>
- untrust_package_signer(cert)[source]¶
Removes a certificate from the list of trusted package signers certificates based on the fingerprint of the certificate. Certs in ssl public_certs_dir with matching fingerprint are deleted.
- update(force=False, register=True)[source]¶
Update local database with packages definition from repositories
- Args:
- force (boolean): update even if Packages index on repository has not been
updated since last update (based on http headers)
register (boolean): Send informations about status of local packages to waptserver
- Returns;
list of (host package entry,entry date on server)
- Returns:
dict: {“added”,”removed”,”count”,”repos”,”upgrades”,”date”}
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini') >>> updates = wapt.update() >>> 'count' in updates and 'added' in updates and 'upgrades' in updates and 'date' in updates and 'removed' in updates True
- update_server_status(force=False, include_wmi=None, include_dmi=None)[source]¶
- Send host_info, installed packages and installed softwares,
and last update status information to WAPT Server, but don’t send register info like dmi or wmi.
Changed in version 1.4.3: if last status has been properly sent to server and data has not changed, don’t push data again to server. The hash is stored in memory, so it is not shared across threads or processes.
>>> wapt = Wapt() >>> s = wapt.update_server_status() >>>
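The "don't push unchanged data" rule can be sketched with an in-memory hash of the last payload sent. The class name, the JSON serialization and the sha256 digest are assumptions chosen for illustration; only the behaviour (skip the upload when nothing changed, unless forced) is taken from the description above.

```python
import hashlib
import json


class StatusSender:
    """Push a status dict only when its content hash differs from the last one sent."""

    def __init__(self, send):
        self._send = send          # callable doing the actual upload
        self._last_hash = None     # kept in memory only

    def update(self, status, force=False):
        digest = hashlib.sha256(
            json.dumps(status, sort_keys=True).encode('utf-8')).hexdigest()
        if not force and digest == self._last_hash:
            return False           # unchanged: nothing is sent
        self._send(status)
        self._last_hash = digest   # remembered only after the send returned
        return True


sent = []
sender = StatusSender(sent.append)
results = [sender.update({'packages': 3}), sender.update({'packages': 3}),
           sender.update({'packages': 4}), sender.update({'packages': 4}, force=True)]
```

Since the hash lives on the instance, a second process or a fresh instance starts without it and sends the status once more, which matches the note about threads and processes.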
- upgrade(only_priorities=None, only_if_not_process_running=False)[source]¶
Install “well known” host package from main repository if not already installed, then query the local status database for packages with a version older than the repository and install all newest packages.
- Args:
only_priorities (list of str): If not None, upgrade only packages with these priorities.
- Returns:
- dict: {‘upgrade’: [], ‘additional’: [], ‘downloads’:
{‘downloaded’: [], ‘skipped’: [], ‘errors’: []},
‘remove’: [], ‘skipped’: [], ‘install’: [], ‘errors’: [], ‘unavailable’: []}
- upload_package(filenames, wapt_server_user=None, wapt_server_passwd=None)[source]¶
Method to upload a package using a shell command (like scp) instead of http upload. You must first define a command in the ini file with the form:
upload_cmd="c:\Program Files"\putty\pscp -v -l waptserver %(waptfile)s srvwapt:/var/www/%(waptdir)s/
- or
upload_cmd="C:\Program Files\WinSCP\WinSCP.exe" root@wapt.tranquilit.local /upload %(waptfile)s
- You can define an “after_upload” shell command. Typical use is to update the Packages index
after_upload="c:\Program Files"\putty\plink -v -l waptserver srvwapt.tranquilit.local "python /opt/wapt/wapt-scanpackages.py /var/www/%(waptdir)s/"
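The %(waptfile)s and %(waptdir)s placeholders follow Python's dictionary-based string formatting, so expanding a template can be sketched as below. The helper name, the shortened `scp` template and the package filename are hypothetical; how WAPT quotes or escapes the substituted values is not covered here.

```python
# Shortened, hypothetical template using the same placeholders as above.
upload_cmd = 'scp %(waptfile)s srvwapt:/var/www/%(waptdir)s/'


def expand_upload_cmd(template, waptfile, waptdir):
    """Fill the %(waptfile)s and %(waptdir)s placeholders of a command template."""
    return template % {'waptfile': waptfile, 'waptdir': waptdir}


command = expand_upload_cmd(upload_cmd, 'tis-demo_1.0-0_all.wapt', 'wapt')
```

A literal percent sign in such a template has to be written as `%%`, otherwise the formatting step raises an error.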
- wapt_status()[source]¶
Wapt configuration and version informations
- Returns:
- dict: versions of main files, waptservice config,
repos and waptserver config
>>> w = Wapt() >>> w.wapt_status() { 'setuphelpers-version': '1.1.1', 'waptserver': { 'wapt_server': u'tranquilit.local', 'proxies': { 'http': None, 'https': None }, 'server_url': 'https: //wapt.tranquilit.local' }, 'waptservice_protocol': 'http', 'repositories': [{ 'wapt_server': u'tranquilit.local', 'proxies': { 'http': None, 'https': None }, 'name': 'global', 'repo_url': 'http: //wapt.tranquilit.local/wapt' }, { 'wapt_server': u'tranquilit.local', 'proxies': { 'http': None, 'https': None }, 'name': 'wapt-host', 'repo_url': 'http: //srvwapt.tranquilit.local/wapt-host' }], 'common-version': '1.1.1', 'wapt-exe-version': u'1.1.1.0', 'waptservice_port': 8088, 'wapt-py-version': '1.1.1' }
- property waptserver¶
- waptserver_available()[source]¶
Test reachability of waptserver.
If waptserver is defined and available, return True, else False
- Returns:
boolean: True if server is defined and actually reachable
- property waptsessiondb: WaptSessionDB¶
Wapt user session database
- write_audit_data(section, key, value, ptype=None, value_date=None, expiration_date=None, max_count=2, keep_days=None)[source]¶
Stores a metric in the database and removes expired ones
- Args:
section (str) key (str) value (any) value_date (str or datetime): value date (utc). By default datetime.datetime.utcnow() expiration_date (str) : expiration date of the new value max_count (int) : keep at most max_count value. remove oldest one. keep_days (int) : set the expiration date to now + keep_days days. override expiration_date arg if not None
- Returns:
None
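The retention rules described by max_count, expiration_date and keep_days can be sketched with an in-memory store. This stand-in is not the SQLite-backed implementation: the class name and the `read_all` helper are hypothetical, and the exact moment at which expired rows are purged is an assumption.

```python
import datetime


class AuditStoreSketch:
    """In-memory stand-in for the audit table, keyed by (section, key)."""

    def __init__(self):
        self._rows = {}

    def write(self, section, key, value, value_date=None,
              expiration_date=None, max_count=2, keep_days=None):
        now = value_date or datetime.datetime.now(datetime.timezone.utc)
        if keep_days is not None:
            # keep_days overrides any explicit expiration_date.
            expiration_date = now + datetime.timedelta(days=keep_days)
        rows = self._rows.setdefault((section, key), [])
        # Drop rows whose expiration date has passed.
        rows[:] = [r for r in rows if r[2] is None or r[2] > now]
        rows.append((now, value, expiration_date))
        if max_count:
            # Keep only the max_count most recent values.
            del rows[:-max_count]

    def read_all(self, section, key):
        return [value for _, value, _ in self._rows.get((section, key), [])]


t0 = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
store = AuditStoreSketch()
for offset, free_gb in enumerate([10, 20, 30]):
    store.write('disk', 'free_gb', free_gb,
                value_date=t0 + datetime.timedelta(days=offset))
```

With the default max_count of 2, only the two most recent values survive the three writes.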
- write_audit_data_if_changed(section, key, value, ptype=None, value_date=None, expiration_date=None, max_count=2, keep_days=None)[source]¶
Write data only if different from last one
- Returns:
previous value
- property wua_repository¶
- class common.WaptBaseDB(dbpath)[source]¶
Bases:
BaseObjectClass
- curr_db_version = None¶
- property db¶
- property db_version¶
- property dbpath¶
- get_param(name, default=None, ptype=None)[source]¶
Retrieve the value associated with name from database
- query(query, args=(), one=False, as_dict=True)[source]¶
Execute the query on the db and return a list of dictionaries
- set_package_attribute(install_id, key, value)[source]¶
Store permanently a (key/value) pair in database for a given package, replace existing one
- class common.WaptDB(dbpath)[source]¶
Bases:
WaptBaseDB
Class to manage SQLite database with local installation status
- add_start_install(package_entry, params_dict={}, explicit_by=None)[source]¶
Register the start of installation in local db
- Args:
params_dict (dict): dictionary of parameters provided on the command line with --param or by the server
explicit_by (str): username of the initiator of the install. If not None, the install is not a dependency but an explicit manual install
setuppy (str): Python source code used for install, uninstall or session_setup. Code used for uninstall or session_setup must use only the wapt self library, as the package content is no longer available at this step.
- Returns:
int : rowid of the inserted install status row
- build_depends(packages, packages_filter=None)[source]¶
Given a list of packages conditions (packagename (optionalcondition)) return a list of dependencies (packages conditions) to install
- Args:
packages (list of str): list of packages requirements ( package_name(=version) )
- Returns:
(list depends,list conflicts,list missing) : tuple of (all_depends,missing_depends)
TODO : choose available dependencies in order to reduce the number of new packages to install
>>> waptdb = WaptDB(':memory:')
>>> office = PackageEntry('office','0')
>>> firefox22 = PackageEntry('firefox','22')
>>> firefox22.depends = 'mymissing,flash'
>>> firefox24 = PackageEntry('firefox','24')
>>> thunderbird = PackageEntry('thunderbird','23')
>>> flash10 = PackageEntry('flash','10')
>>> flash12 = PackageEntry('flash','12')
>>> office.depends='firefox(<24),thunderbird,mymissing'
>>> firefox22.depends='flash(>=10)'
>>> firefox24.depends='flash(>=12)'
>>> waptdb.add_package_entry(office)
>>> waptdb.add_package_entry(firefox22)
>>> waptdb.add_package_entry(firefox24)
>>> waptdb.add_package_entry(flash10)
>>> waptdb.add_package_entry(flash12)
>>> waptdb.add_package_entry(thunderbird)
>>> waptdb.build_depends('office')
([u'flash(>=10)', u'firefox(<24)', u'thunderbird'], [u'mymissing'])
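The ordering of the result (dependencies listed before the packages that need them, missing ones reported separately) can be reproduced with a simplified depth-first walk. This is only a sketch: the catalog dictionary is a hypothetical stand-in for the database, and version conditions are carried along as text without being evaluated.

```python
def build_depends(catalog, roots):
    """Return (depends, missing) for the given root packages.

    catalog maps a package name to its list of dependency conditions.
    Version conditions are kept as text and not evaluated in this sketch.
    """
    depends, missing, seen = [], [], set()

    def visit(cond):
        if cond in seen:
            return
        seen.add(cond)
        name = cond.split('(')[0].strip()
        if name not in catalog:
            missing.append(cond)
            return
        # Visit sub-dependencies first so they appear before their dependents
        for sub in catalog[name]:
            visit(sub)
        depends.append(cond)

    for root in roots:
        for cond in catalog.get(root, []):
            visit(cond)
    return depends, missing


# Hypothetical catalog mirroring the packages of the doctest above
catalog = {
    'office': ['firefox(<24)', 'thunderbird', 'mymissing'],
    'firefox': ['flash(>=10)'],
    'flash': [],
    'thunderbird': [],
}
result = build_depends(catalog, ['office'])
```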
- create_db_structure()[source]¶
Initialize current sqlite db with empty table and return structure version
- curr_db_version = '20230601'¶
- install_status(id)[source]¶
Return the local install status for id
- Args:
id: sql rowid
- Returns:
dict : merge of package local install, audit and package attributes.
- installed(include_errors=False, include_setup=True)[source]¶
Return a list of installed packages on this host (status ‘OK’ or ‘UNKNOWN’)
- Args:
include_errors (bool): if False, only packages with status ‘OK’ or ‘UNKNOWN’ are returned; if True, all packages are returned.
include_setup (bool) : if True, setup.py files content is in the result rows
- Returns:
list: of installed PackageEntry
- installed_matching(package_cond, include_errors=False, include_setup=True)[source]¶
Return the list of installed PackageEntry matching the package condition ‘tis-package (>=version)’. If include_errors is False, only properly installed packages are considered.
- Args:
package_cond (str): package requirement to look up
include_errors (bool)
- Returns:
list of PackageEntry merged with localstatus attributes, with setuppy
- installed_packages_inventory(since_status_revision=None)[source]¶
Return a list of installed packages status suitable for inventory
List of list. First line is the header (fields names)
- Returns:
list (of list): of installed packages
- installed_search(searchwords=[], include_errors=False)[source]¶
Return a list of installed package entries based on search keywords
- Returns:
list of PackageEntry merged with localstatus attributes, without setuppy
- known_packages()[source]¶
Return a dict of all known packages PackageKey(s) indexed by package_uuid
- Returns:
dict {'package_uuid': PackageKey(package)}
- packages_audit_inventory(after_date=None)[source]¶
Return a list of packages audit status suitable for inventory
List of list. First line is the header (fields names)
- Returns:
list (of list): of installed packages
- packages_matching(package_cond)[source]¶
Return an ordered list of available packages entries which match the condition “packagename[([=<>]version)]?” version ascending
- Args:
package_cond (PackageRequest or str): filter packages and determine the ordering
- Returns:
list of PackageEntry
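To make the condition syntax “packagename[([=<>]version)]?” concrete, the sketch below parses a condition with a regular expression and filters a list of (name, version) pairs, returned in ascending version order. The regular expression, the version_key helper and the tuple-based package list are assumptions for illustration; version comparison here only handles dotted numeric versions.

```python
import operator
import re

# name, optionally followed by "(operator version)"
COND_RE = re.compile(
    r'^\s*(?P<name>[^\s()]+)\s*'
    r'(?:\(\s*(?P<op><=|>=|=|<|>)\s*(?P<version>[^\s)]+)\s*\))?\s*$'
)
OPS = {'=': operator.eq, '<': operator.lt, '>': operator.gt,
       '<=': operator.le, '>=': operator.ge}


def version_key(version):
    # Simplification: only dotted numeric versions such as "24" or "1.2.3"
    return tuple(int(part) for part in version.split('.'))


def packages_matching(available, package_cond):
    """Return the (name, version) pairs matching package_cond, version ascending."""
    match = COND_RE.match(package_cond)
    if match is None:
        raise ValueError('invalid package condition: %r' % package_cond)
    name, op, version = match.group('name', 'op', 'version')
    result = [
        (pkg_name, pkg_version)
        for pkg_name, pkg_version in available
        if pkg_name == name
        and (op is None or OPS[op](version_key(pkg_version), version_key(version)))
    ]
    return sorted(result, key=lambda item: version_key(item[1]))


available = [('firefox', '24'), ('firefox', '22'), ('flash', '12'), ('flash', '10')]
older_firefox = packages_matching(available, 'firefox(<24)')
any_flash = packages_matching(available, 'flash (>=10)')
```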
- packages_search(searchwords=[], exclude_host_repo=True, section_filter=None, packages_filter=None)[source]¶
Return a list of package entries matching the search words
- Args:
searchwords (list): list of words which must be in the package name or description
exclude_host_repo (bool): ignore packages coming from a repo named ‘wapt-host’
section_filter (list): list of package sections to take into account
packages_filter (PackageRequest): additional filters (arch, locale, maturities etc.) to take into account for filtering and sorting
- Returns:
list of PackageEntry
- purge_repo(repo_name)[source]¶
remove references to repo repo_name
>>> waptdb = WaptDB('c:/wapt/db/waptdb.sqlite')
>>> waptdb.purge_repo('main')
- query_package_entry(query, args=(), one=False, package_request=None)[source]¶
Execute the query on the db and try to map the result onto PackageEntry attributes. Fields which don’t match attributes are added as attributes (and listed in the _calc_attributes list).
- Args:
query (str): sql query
args (list): parameters of the sql query
package_request (PackageRequest): keep only entries which match package_request
one (bool): if True, return the highest available version (if package_request is not None, use it to compare packages)
- Result:
PackageEntry or list of PackageEntry
>>> waptdb = WaptDB(':memory:')
>>> waptdb.add_package_entry(PackageEntry('toto','0',repo='main'))
>>> waptdb.add_package_entry(PackageEntry('dummy','2',repo='main'))
>>> waptdb.add_package_entry(PackageEntry('dummy','1',repo='main'))
>>> waptdb.query_package_entry("select * from wapt_package where package=?",["dummy"])
[PackageEntry('dummy','2'), PackageEntry('dummy','1')]
>>> waptdb.query_package_entry("select * from wapt_package where package=?",["dummy"],one=True)
PackageEntry('dummy','2')
- remove_install_status(package=None, package_uuid=None)[source]¶
Remove status of package installation from localdb
- store_setuppy(rowid, setuppy=None, install_params={})[source]¶
Update status of package installation on localdb
- switch_to_explicit_mode(package, user_id)[source]¶
Set package install mode to manual so that package is not removed when meta packages don’t require it anymore
- update_audit_status(rowid, set_status=None, set_output=None, append_line=None, set_last_audit_on=None, set_next_audit_on=None)[source]¶
Update status of package installation on localdb
- update_install_status(rowid, set_status=None, append_line=None, uninstall_key=None, persistent_dir=None)[source]¶
Update status of package installation on localdb
- update_install_status_pid(pid, set_status='ERROR')[source]¶
Update status of package installation on localdb
- upgradeable(include_errors=True, check_version_only=True, include_setup=False)[source]¶
Return a dictionary of upgradable Package entries
- Returns:
dict {'package': [candidates]}: candidates ordered from most adequate to more generic
- upgradeable_status()[source]¶
Return a list of upgradable Package entries. (faster than upgradeable() )
- Returns:
list of (installed PackageEntry, candidate PackageEntry)
- wapt_package_columns = {'architecture': 'varchar', 'audit_schedule': 'varchar', 'categories': 'varchar', 'changelog': 'varchar', 'conflicts': 'varchar', 'depends': 'varchar', 'description': 'varchar', 'editor': 'varchar', 'filename': 'varchar', 'forced_install_on': 'varchar', 'homepage': 'varchar', 'icon_sha256sum': 'varchar', 'id': 'INTEGER PRIMARY KEY AUTOINCREMENT', 'impacted_process': 'varchar', 'installed_size': 'integer', 'keywords': 'varchar', 'licence': 'varchar', 'locale': 'varchar', 'maintainer': 'varchar', 'maturity': 'varchar', 'max_os_version': 'varchar', 'md5sum': 'varchar', 'min_os_version': 'varchar', 'min_wapt_version': 'varchar', 'name': 'varchar', 'package': 'varchar', 'package_uuid': 'varchar', 'priority': 'varchar', 'repo': 'varchar', 'repo_url': 'varchar', 'section': 'varchar', 'signature': 'varchar', 'signature_date': 'varchar', 'signed_attributes': 'varchar', 'signer': 'varchar', 'signer_fingerprint': 'varchar', 'size': 'integer', 'sources': 'varchar', 'target_os': 'varchar', 'valid_from': 'varchar', 'valid_until': 'varchar', 'version': 'varchar'}¶
- class common.WaptHostRepo(url=None, name='wapt-host', verify_cert=None, http_proxy=None, timeout=None, host_id=None, cabundle=None, config=None, section=None, host_key=None, WAPT=None)[source]¶
Bases:
WaptRepo
Dummy http repository for host packages
>>> host_repo = WaptHostRepo(name='wapt-host',host_id=['0D2972AC-0993-0C61-9633-529FB1A177E3','4C4C4544-004E-3510-8051-C7C04F325131'])
>>> host_repo.load_config_from_file(r'C:\Users\htouvet\AppData\Local\waptconsole\waptconsole.ini')
>>> host_repo.packages()
[PackageEntry('0D2972AC-0993-0C61-9633-529FB1A177E3','10'), PackageEntry('4C4C4544-004E-3510-8051-C7C04F325131','30')]
- download_packages(package_requests, target_dir=None, usecache=True, printhook=None)[source]¶
Download a list of packages from repo
- Args:
package_requests (list,PackageEntry): a list of PackageEntry to download
target_dir (str): where to store downloaded Wapt Package files
usecache (bool): whether to try to use cached Wapt files if the checksum is ok
printhook (callable): to show progress of download
- Returns:
dict: {"downloaded":[local filenames],"skipped":[filenames in cache],"errors":[],"packages":self.packages()}
- property host_id¶
- is_available()[source]¶
Check if repo is reachable and return creation date of Packages.
Try to access the repo and return last modified date of repo index or None if not accessible
- Returns:
str: Iso creation date of remote Package file as returned in http headers
>>> repo = WaptRemoteRepo(name='main',url='https://wapt/wapt',timeout=1)
>>> repo.is_available() <= datetime2isodate()
True
>>> repo = WaptRemoteRepo(name='main',url='https://badwapt/wapt',timeout=1)
>>> repo.is_available() is None
True
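The comparison in the doctest above works because ISO 8601 strings in the same format sort chronologically. As a sketch of how an HTTP date header could be turned into such a string, the helper below uses only the standard library. The function name is hypothetical, and which header WAPT reads and the exact output format it produces are assumptions.

```python
from email.utils import parsedate_to_datetime


def http_date_to_isodate(header_value):
    """Convert an HTTP date header value to an ISO 8601 string, or None if missing."""
    if not header_value:
        return None
    return parsedate_to_datetime(header_value).isoformat()


# Example value in the format used by HTTP date headers such as Last-Modified
iso = http_date_to_isodate('Wed, 02 Nov 2016 19:36:00 GMT')
```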
- load_config(config, section=None)[source]¶
Load waptrepo configuration from inifile section.
Use name of repo as section name if section is not provided. Use ‘global’ if no section named section in ini file
- property repo_url¶
Repository URL
Fixed url if none is set in wapt-get.ini by querying the server.
The URL is queried once and then cached into a local property.
- Returns:
str: url to the repository
>>> repo = WaptRepo(name='wapt',timeout=4)
>>> print(repo.wapt_server)
http://wapt.wapt.fr/
>>> print(repo.repo_url)
http://srvwapt.tranquilit.local/wapt
- class common.WaptPackageAuditLogger(console, wapt_context=None, install_id=None, user=None, running_status='RUNNING', exit_status=None, error_status='ERROR')[source]¶
Bases:
LogOutput
Context handler to log all print messages to a wapt package audit log
- Args:
console (file): sys.stderr
wapt_context (Wapt): Wapt instance
install_id (int): name of running or installed package local status where to log status and output
- class common.WaptPackageInstallLogger(console, wapt_context=None, install_id=None, user=None, running_status='RUNNING', exit_status='OK', error_status='ERROR')[source]¶
Bases:
LogOutput
Context handler to log all print messages to a wapt package install log
- Args:
wapt_context (Wapt): Wapt instance
package_name (str): name of running or installed package local status where to log status and output
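The idea of a context handler that captures printed messages and records a final status can be sketched with the standard library alone. The PrintLogger class below is hypothetical: it keeps the output in memory instead of writing to the package install log, and treating exit_status=None as "leave the running status unchanged" is an assumption.

```python
import contextlib
import io


class PrintLogger:
    """Hypothetical context handler recording printed lines and a final status."""

    def __init__(self, running_status='RUNNING', exit_status='OK', error_status='ERROR'):
        self.running_status = running_status
        self.exit_status = exit_status
        self.error_status = error_status
        self.status = None
        self.output = io.StringIO()
        self._redirect = contextlib.redirect_stdout(self.output)

    def __enter__(self):
        self.status = self.running_status
        self._redirect.__enter__()
        return self

    def __exit__(self, exc_type, exc, tb):
        self._redirect.__exit__(exc_type, exc, tb)
        if exc_type is not None:
            self.status = self.error_status
        elif self.exit_status is not None:
            # Assumption: a None exit_status leaves the running status in place
            self.status = self.exit_status
        return False  # do not swallow exceptions


with PrintLogger() as log:
    print('installing tis-7zip')
```

After the block, log.status holds the exit status and log.output holds everything that was printed inside it.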
- class common.WaptPackageSessionSetupLogger(console, waptsessiondb, install_id, running_status='RUNNING', exit_status=None, error_status='ERROR')[source]¶
Bases:
LogOutput
Context handler to log all print messages to a wapt package install log
- Args:
wapt_context (Wapt): Wapt instance
package_name (str): name of running or installed package local status where to log status and output
- class common.WaptRepo(url=None, name='wapt', verify_cert=None, http_proxy=None, timeout=None, cabundle=None, config=None, section=None, WAPT=None)[source]¶
Bases:
WaptRemoteRepo
Gives access to a remote http repository, with a zipped Packages index.
Find its repo_url based on:
* repo_url explicit setting in ini config section [<name>]
* if there are some rules, use the rules
>>> repo = WaptRepo(name='main',url='http://wapt/wapt',timeout=4)
>>> packages = repo.packages()
>>> len(packages)
- property WAPT¶
- property cached_wapt_repo_url¶
- load_config(config, section=None)[source]¶
Load waptrepo configuration from inifile section.
Use name of repo as section name if section is not provided. Use ‘global’ if no section named section in ini file
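The section fallback described above can be sketched with the standard configparser module. The pick_section helper, the section names and the URLs are hypothetical and only illustrate the lookup order (explicit section, then repo name, then ‘global’).

```python
import configparser


def pick_section(config, name, section=None):
    """Return the ini section to read for the repository called name."""
    wanted = section or name
    # Fall back to 'global' when the wanted section does not exist
    return wanted if config.has_section(wanted) else 'global'


config = configparser.RawConfigParser()
# Hypothetical ini content
config.read_string("""
[global]
repo_url = https://wapt.example.org/wapt

[wapt-templates]
repo_url = https://store.example.org/wapt
""")

templates_section = pick_section(config, 'wapt-templates')
fallback_section = pick_section(config, 'wapt')
```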
- property proxies¶
dict for http proxies url suitable for requests based on the http_proxy repo attribute
- Returns:
dict: {'http':'http://proxy:port','https':'http://proxy:port'}
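A minimal sketch of how such a mapping could be derived from a single http_proxy value follows. The proxies_from function is hypothetical; the shape of the result matches the documented return value and the proxies mapping expected by the requests library.

```python
def proxies_from(http_proxy):
    """Build a requests-style proxies mapping from a single proxy URL (or None)."""
    # An empty string is treated like None, meaning "no proxy"
    proxy = http_proxy or None
    return {'http': proxy, 'https': proxy}


with_proxy = proxies_from('http://proxy:3128')
without_proxy = proxies_from(None)
```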
- property repo_url¶
Repository URL
Fixed url if none is set in wapt-get.ini by querying the server.
The URL is queried once and then cached into a local property.
- Returns:
str: url to the repository
>>> repo = WaptRepo(name='wapt',timeout=4)
>>> print(repo.wapt_server)
http://wapt.wapt.fr/
>>> print(repo.repo_url)
http://srvwapt.tranquilit.local/wapt
- property rules¶
- class common.WaptServer(url=None, proxies={'http': None, 'https': None}, timeout=5.0, dnsdomain=None, name='waptserver')[source]¶
Bases:
BaseObjectClass
Manage connection to waptserver
- auth(action=None, enable_password_callback=True)[source]¶
Return an authenticator for the action. This is basically a tuple (user,password) or a more sophisticated AuthBase descendant. The purpose of action is to avoid requesting a password from the user when it is not needed, or to enable kerberos for a specific endpoint.
- Args:
action (str): for which an authenticator is requested
- Returns:
tuple or requests.auth.AuthBase descendant instance
- client_auth()[source]¶
Return SSL pair (cert,key) filenames for client side SSL auth
- Returns:
tuple: (cert path,key path,strkeypassword)
- get(action, auth=None, timeout=None, use_ssl_auth=True, enable_capture_external_ip=True, enable_password_callback=True, decode_json=True)[source]¶
Make an HTTP GET request to the server and return the result decoded from JSON. This assumes remo
- Args:
action (str): doc part of the url
auth (tuple): authentication passed to requests (user,password)
enable_capture_external_ip (bool): if true and callback is defined, try to get the external IP from the X-Remote-IP header and set it through self.capture_external_ip_callback
enable_password_callback (bool): if true, the password callback will be called if needed.
- Returns:
dict : response returned from server as json data.
- head(action, auth=None, timeout=None, use_ssl_auth=True, enable_capture_external_ip=True, enable_password_callback=True)[source]¶
- load_config_from_file(config_filename, section='global')[source]¶
Load waptserver configuration from an inifile located at config_filename
- Args:
config_filename (str): path to wapt inifile
section (str): ini section from which to get parameters. Defaults to ‘global’
- Returns:
WaptServer: self
- post(action, data=None, files=None, auth=None, timeout=None, signature=None, signer=None, content_length=None, use_ssl_auth=True, enable_capture_external_ip=True, enable_password_callback=True, max_retry_count=3)[source]¶
Post data to waptserver using http POST method
Add a signature to the posted data using host certificate.
Posted Body is gzipped
- Args:
action (str): doc part of the url
data (str): posted data body
files (list or dict): list of filenames
- Returns:
dict : response returned from server as json data.
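The gzip step mentioned above can be sketched with the standard library. The prepare_body helper and the headers it returns are assumptions for illustration (the header names are standard HTTP, but whether WAPT sends exactly these is not stated here), and the signature step is deliberately left out.

```python
import gzip


def prepare_body(data):
    """Gzip a str or bytes payload and return (body, headers)."""
    if isinstance(data, str):
        data = data.encode('utf-8')
    body = gzip.compress(data)
    # Assumed headers describing the compressed body
    headers = {'Content-Encoding': 'gzip', 'Content-Length': str(len(body))}
    return body, headers


payload = '{"uuid": "0D2972AC-0993-0C61-9633-529FB1A177E3"}'
body, headers = prepare_body(payload)
```

The receiving side can recover the original payload with gzip.decompress(body).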
- save_server_certificate(server_ssl_dir=None, overwrite=False)[source]¶
Retrieve certificate of https server for further checks
- Args:
server_ssl_dir (str): Directory where to save x509 certificate file
- Returns:
str : full path to x509 certificate file.
- property server_url¶
Return fixed url if any
>>> server = WaptServer(timeout=4)
>>> print(server.server_url)
https://wapt.tranquil.it
- class common.WaptSessionDB(username='')[source]¶
Bases:
WaptBaseDB
- add_start_install(package_entry)[source]¶
Register the start of installation in local db
- Returns:
int : rowid of the inserted record
- create_db_structure()[source]¶
Initialize current sqlite db with empty table and return structure version
- curr_db_version = '20201217'¶
- remove_install_status(package)[source]¶
Remove status of package installation from localdb
>>> wapt = Wapt()
>>> wapt.forget_packages('tis-7zip')
???
- remove_obsolete_install_status(installed_packages)[source]¶
Remove local user status of packages that are no longer installed
- common.check_is_member_of(huser, group_name)[source]¶
Check if a user is a member of a group
- Args:
huser (handle): pywin32
group_name (str): group
>>> import win32security
>>> hUser = win32security.LogonUser('technique','tranquilit','xxxxxxx',win32security.LOGON32_LOGON_NETWORK,win32security.LOGON32_PROVIDER_DEFAULT)
>>> check_is_member_of(hUser,'domain admins')
False
- common.check_user_membership(user_name, password, domain_name, group_name)[source]¶
Check if a user is a member of a group
- Args:
user_name (str): user
password (str):
domain_name (str): If empty, check local then domain
group_name (str): group
>>> import win32security
>>> hUser = win32security.LogonUser('technique','tranquilit','xxxxxxx',win32security.LOGON32_LOGON_NETWORK,win32security.LOGON32_PROVIDER_DEFAULT)
>>> check_is_member_of(hUser,'domain admins')
False
- common.get_domain_admins_group_name()[source]¶
- Return localized version of domain admin group (ie “domain admins” or
“administrateurs du domaine” with RID -512)
>>> get_domain_admins_group_name()
u'Domain Admins'
- common.lookup_name_from_rid(domain_controller, rid)[source]¶
- Return username or group name from RID (with localization if applicable)
from https://mail.python.org/pipermail/python-win32/2006-May/004655.html
domain_controller: should be a DC
rid: integer number (512 for domain admins, 513 for domain users, etc.)
>>> lookup_name_from_rid('srvads', DOMAIN_GROUP_RID_ADMINS)
u'Domain Admins'
- common.sid_from_rid(domain_controller, rid)[source]¶
Return SID structure based on the supplied domain controller’s domain and the supplied rid. rid can be, for example, DOMAIN_GROUP_RID_ADMINS or DOMAIN_GROUP_RID_USERS
- common.wapt_sources_edit(wapt_sources_dir: str, editor_for_packages: Optional[str] = None) str [source]¶
- Utility to open PyScripter or the configured editor with package sources if it is installed
else open the wapt package development directory in System Shell Explorer.
- Args:
wapt_sources_dir (str): directory path of the wapt package sources
- Returns:
str: sources path