WAPT Setuphelpers Usage¶
- setuphelpers.all_files(rootdir, pattern=None)[source]¶
Recursively return all files from rootdir and sub directories matching the (dos style) pattern (example: *.exe)
- setuphelpers.copytree2(src, dst, ignore=None, onreplace=<function default_skip>, oncopy=<function default_oncopy>, enable_replace_at_reboot=True, follow_symlinks=False)[source]¶
Copy the src directory to the dst directory. dst is created if it doesn’t exist. src can be relative to the installation temporary dir.
oncopy is called for each file copy; if it returns False, the copy is skipped. onreplace is called when a file is about to be overwritten.
- Args:
src (str): path to source directory (absolute path or relative to package extraction tempdir)
dst (str): path to target directory (created if not present)
ignore (func): callback func(root_dir, filenames) which returns names to ignore
onreplace (func): callback func(src, dst): boolean called when a file will be replaced, to decide what to do. Default is to not replace if the target exists. Can be default_overwrite, default_overwrite_older or a custom function.
oncopy (func): callback func(msg, src, dst) called when a file is copied. Default is to log the operation at debug level.
enable_replace_at_reboot (boolean): if True, files which are locked will be scheduled for replacement at next reboot
Returns:
Raises:
>>> copytree2(r'c:\tranquilit\wapt\tests',r'c:\tranquilit\wapt\tests2')
>>> isdir(r'c:\tranquilit\wapt\tests2')
True
>>> remove_tree(r'c:\tranquilit\wapt\tests2')
>>> isdir(r'c:\tranquilit\wapt\tests2')
False
- setuphelpers.currentdatetime()[source]¶
date/time as YYYYMMDD-hhmmss
>>> currentdatetime()
'20161102-193600'
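The YYYYMMDD-hhmmss layout can be reproduced with the standard library alone. The sketch below is not the setuphelpers implementation; `currentdatetime_sketch` and its optional `now` argument are hypothetical names used for illustration.

```python
from datetime import datetime


def currentdatetime_sketch(now=None):
    """Hypothetical stand-in producing a YYYYMMDD-hhmmss stamp."""
    # fall back to the local clock when no explicit datetime is given
    now = now or datetime.now()
    return now.strftime('%Y%m%d-%H%M%S')


print(currentdatetime_sketch(datetime(2016, 11, 2, 19, 36, 0)))  # 20161102-193600
```

A stamp in this form sorts chronologically as plain text, which is convenient for log or backup file names.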
- setuphelpers.ensure_dir(filename)[source]¶
Make sure the directory of filename exists on disk, creating it if it does not.
Intermediate directories are created as well.
- Args:
filename (str): path to a future file for which to create directory.
- Returns:
None
- setuphelpers.ensure_list(csv_or_list, ignore_empty_args=True, allow_none=False)[source]¶
If the argument is not a list, return a list built from a CSV string.
- Args:
csv_or_list (list or str):
ignore_empty_args (bool): if True, empty strings found in the CSV are not appended to the list.
allow_none (bool): if True and csv_or_list is None, return None; otherwise return an empty list.
- Returns:
list
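The argument handling described above could look roughly like the following. This is a sketch written from the docstring only, not the library code; `ensure_list_sketch` is a hypothetical name, and the comma separator and whitespace stripping are assumptions.

```python
def ensure_list_sketch(csv_or_list, ignore_empty_args=True, allow_none=False):
    """Hypothetical stand-in for the documented behavior of ensure_list."""
    if csv_or_list is None:
        return None if allow_none else []
    if isinstance(csv_or_list, (list, tuple)):
        return list(csv_or_list)
    # assumption: items are comma-separated and surrounding whitespace is stripped
    items = [item.strip() for item in csv_or_list.split(',')]
    if ignore_empty_args:
        items = [item for item in items if item]
    return items


print(ensure_list_sketch('tis-7zip, tis-firefox,,'))  # ['tis-7zip', 'tis-firefox']
```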
- setuphelpers.ensure_unicode(data)[source]¶
Return a unicode string from a data object. It is sometimes difficult to know in advance what a command-line application will output.
This ensures we get a (not always accurate) representation of the data, mainly for logging purposes.
- Args:
data: either str or unicode or object having a __str__ or WindowsError or Exception
- Returns:
unicode: unicode string representing the data
>>> ensure_unicode(str('éé'))
u'éé'
>>> ensure_unicode(u'éé')
u'éé'
>>> ensure_unicode(Exception("test"))
u'Exception: test'
>>> ensure_unicode(Exception())
u'Exception: '
- setuphelpers.file_is_locked(path, timeout=5)[source]¶
Check if a file is locked. Waits timeout seconds for the release.
- setuphelpers.filecopyto(filename, target)[source]¶
Copy file from absolute or package temporary directory to target directory
If file is dll or exe, logs the original and new version.
- Args:
filename (str): absolute path to the file to copy, or path relative to the temporary package install content directory.
target (str): absolute path to the target directory where the file is copied. target is either a full filename or a directory name.
If filename is a .exe or .dll, the logger prints the version numbers.
>>> if not os.path.isfile('c:/tmp/fc.test'):
...     with open('c:/tmp/fc.test','wb') as f:
...         f.write(b'test')
>>> if not os.path.isdir('c:/tmp/target'):
...     os.mkdir('c:/tmp/target')
>>> if os.path.isfile('c:/tmp/target/fc.test'):
...     os.unlink('c:/tmp/target/fc.test')
>>> filecopyto('c:/tmp/fc.test','c:/tmp/target')
>>> os.path.isfile('c:/tmp/target/fc.test')
True
- setuphelpers.fileisodate(filename)[source]¶
Returns last update date time from filename in local time
- setuphelpers.find_all_files(rootdir, include_patterns=None, exclude_patterns=None, include_dirs=None, exclude_dirs=None, excludes_full=None)[source]¶
Generator which recursively finds all files from rootdir and sub directories matching the (dos style) patterns (example: *.exe)
- Args:
rootdir (str): root dir where to start looking for files
include_patterns (str or list): list of glob patterns of files to return
exclude_patterns (str or list): list of glob patterns of files to exclude (if a file is both in include and exclude, it is excluded)
include_dirs (str or list): list of glob directory patterns to return
exclude_dirs (str or list): list of glob directory patterns to exclude (if a dir is both in include and exclude, it is excluded)
excludes_full (list): list of exact (relative to package root) file paths to exclude from manifest.
>>> for fn in find_all_files(r'c:\tmp','*.txt'):
...     print(fn)
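The rule that exclusion wins over inclusion can be illustrated with `os.walk` and `fnmatch`. This is a minimal sketch covering file patterns only, not the setuphelpers implementation; `find_files_sketch` is a hypothetical name, and treating a missing include list as "match everything" is an assumption.

```python
import fnmatch
import os


def find_files_sketch(rootdir, include_patterns=None, exclude_patterns=None):
    """Hypothetical stand-in: yield files under rootdir, exclusion winning over inclusion."""
    include = include_patterns or ['*']  # assumption: no include list means every file
    exclude = exclude_patterns or []
    # accept a single pattern string as well as a list of patterns
    if isinstance(include, str):
        include = [include]
    if isinstance(exclude, str):
        exclude = [exclude]
    for dirpath, _dirnames, filenames in os.walk(rootdir):
        for name in filenames:
            included = any(fnmatch.fnmatch(name, p) for p in include)
            excluded = any(fnmatch.fnmatch(name, p) for p in exclude)
            if included and not excluded:
                yield os.path.join(dirpath, name)
```

Because the function is a generator, results are produced lazily while the tree is walked; wrap the call in `list(...)` when the full result is needed at once.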
- setuphelpers.find_processes(process_name)[source]¶
Return the list of processes named process_name
- Args:
process_name (str): process name to lookup
- Returns:
list: list of processes (Process) named process_name or process_name.exe
>>> [p.pid for p in find_processes('explorer')]
[2756, 4024]
- setuphelpers.get_applications_info_files()[source]¶
Returns a list of the Info.plist files in the /Applications folder.
- setuphelpers.get_computer_groups()[source]¶
Try to find the computer in the Active Directory and return the list of groups
- setuphelpers.get_current_user()[source]¶
Get the login name for the current user.
>>> get_current_user()
u'htouvet'
- setuphelpers.get_disk_free_space(filepath)[source]¶
Returns the number of free bytes on the drive that filepath is on
- setuphelpers.get_domain_from_socket()[source]¶
Return main DNS domain of the computer
- Returns:
str: domain name
>>> get_domain_from_socket()
u'tranquilit.local'
- setuphelpers.get_domain_info(ldap_auth_server=None, use_ssl=True, force_tgt=True, hostname=None, domain=None, verify_cert_ldap=False)[source]¶
Return a dict with ad_site, ou and groups.
Warning: the GSSAPI search does not work if the reverse DNS record is not available for AD.
- setuphelpers.get_file_properties(fname, ignore_warning=True)[source]¶
Read all properties of the given file and return them as a dictionary.
- Args:
fname : path to Windows executable or DLL
- Returns:
dict: properties of executable
>>> xp = get_file_properties(r'c:\windows\explorer.exe')
>>> 'FileVersion' in xp and 'FileDescription' in xp
True
- setuphelpers.get_info_plist_path(app_dir)[source]¶
Applications typically contain an Info.plist file that shows information about the app. It’s typically located at {APPDIR}/Contents/Info.plist .
- setuphelpers.get_installed_pkgs()[source]¶
Returns the list of the IDs of the already installed packages.
- setuphelpers.get_language(full_locale=False, separator='_')[source]¶
Get the os default locale (example: fr, en, pl, etc.)
>>> get_language()
'fr'
>>> get_language(full_locale=True)
'fr_FR'
>>> get_language(full_locale=True, separator='-').lower()
'fr-fr'
- setuphelpers.get_os_name()[source]¶
Get the name of the current running operating system
- Returns:
str: Windows, Linux, Darwin
>>> get_os_name()
'Windows'
- setuphelpers.get_pkg_info(pkg_id)[source]¶
Gets an installed pkg’s info, given its ID.
Returns: a dict made from data in plist format
- setuphelpers.get_plist_obj(plist_file)[source]¶
Returns a plist obj when given the path to a plist file.
- setuphelpers.get_proxies()[source]¶
Return system proxy with the urllib python library
>>> get_proxies()
{'http': 'http://srvproxy.ad.domain.lan:8080', 'https': 'http://srvproxy.ad.domain.lan:8080'}
- setuphelpers.host_info_common_unix()[source]¶
Read main workstation information, returned as a dict
- Returns:
dict: main properties of host, networking and windows system
Changed in version 1.4.1: returned keys changed : dns_domain -> dnsdomain
>>> hi = host_info()
>>> 'computer_fqdn' in hi and 'connected_ips' in hi and 'computer_name' in hi and 'mac' in hi
True
- setuphelpers.httpdatetime2isodate(httpdate, localtime=False)[source]¶
Convert a date string as returned in http headers or mail headers to isodate (UTC)
>>> import requests
>>> last_modified = requests.head('http://wapt/wapt/Packages',headers={'cache-control':'no-cache','pragma':'no-cache'}).headers['last-modified']
>>> len(httpdatetime2isodate(last_modified)) == 19
True
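For readers without a WAPT server at hand, the conversion can be tried offline. The sketch below is an approximation built on `email.utils`, not the setuphelpers code; `http_date_to_iso_sketch` is a hypothetical name, the `localtime` option is left out, and the `YYYY-MM-DDTHH:MM:SS` layout is an assumption inferred from the 19-character length checked above.

```python
from datetime import timezone
from email.utils import parsedate_to_datetime


def http_date_to_iso_sketch(httpdate):
    """Hypothetical stand-in: HTTP/mail date header -> 19-character ISO date in UTC."""
    parsed = parsedate_to_datetime(httpdate)
    if parsed.tzinfo is None:
        # headers without a usable zone are assumed to be UTC already
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S')


print(http_date_to_iso_sketch('Wed, 02 Nov 2016 19:36:00 GMT'))  # 2016-11-02T19:36:00
```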
- setuphelpers.inifile_deleteoption(inifilename, section, key)[source]¶
Remove a key within the section of the inifile
- Args:
inifilename (str): Path to the ini file
section (str): section
key (str): value key of option to remove
- Returns:
boolean : True if the key/option has been removed
>>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.2')
>>> print(inifile_hasoption('c:/tranquilit/wapt/tests/test.ini','global','version'))
True
>>> print(inifile_deleteoption('c:/tranquilit/wapt/tests/test.ini','global','version'))
False
- setuphelpers.inifile_deletesection(inifilename, section)[source]¶
Remove a section within the inifile
- Args:
inifilename (str): Path to the ini file
section (str): section to remove
- Returns:
boolean : True if the section has been removed
- setuphelpers.inifile_hasoption(inifilename, section, key)[source]¶
Check if an option is present in section of the inifile
- Args:
inifilename (str): Path to the ini file
section (str): section
key (str): value key to check
- Returns:
boolean : True if the key exists
>>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.2')
>>> print(inifile_hasoption('c:/tranquilit/wapt/tests/test.ini','global','version'))
True
>>> print(inifile_hasoption('c:/tranquilit/wapt/tests/test.ini','global','dontexist'))
False
- setuphelpers.inifile_hassection(inifilename, section)[source]¶
Check if a section is present in the inifile
- Args:
inifilename (str): Path to the ini file
section (str): section
- Returns:
boolean : True if the section exists
>>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.2')
>>> print(inifile_hassection('c:/tranquilit/wapt/tests/test.ini','global'))
True
- setuphelpers.inifile_readstring(inifilename, section, key, default=None)[source]¶
Read a string parameter from inifile
>>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.2')
>>> print(inifile_readstring('c:/tranquilit/wapt/tests/test.ini','global','version'))
1.1.2
>>> print(inifile_readstring('c:/tranquilit/wapt/tests/test.ini','global','undefaut','defvalue'))
defvalue
- setuphelpers.inifile_writestring(inifilename, section, key, value)[source]¶
Write a string parameter to inifile
>>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.1')
>>> print(inifile_readstring('c:/tranquilit/wapt/tests/test.ini','global','version'))
1.1.1
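The write/read round trip of the inifile helpers can be approximated with `configparser`. This is a sketch under the assumption that the file is a plain INI file; `ini_write_sketch` and `ini_read_sketch` are hypothetical names and do not claim to match how setuphelpers handles encodings, comments or key case.

```python
import configparser


def ini_write_sketch(path, section, key, value):
    """Hypothetical stand-in: set section/key to value, creating file and section if needed."""
    parser = configparser.RawConfigParser()
    parser.read(path)  # a missing file is silently ignored by configparser
    if not parser.has_section(section):
        parser.add_section(section)
    parser.set(section, key, value)
    with open(path, 'w') as handle:
        parser.write(handle)


def ini_read_sketch(path, section, key, default=None):
    """Hypothetical stand-in: return the stored string, or default when it is absent."""
    parser = configparser.RawConfigParser()
    parser.read(path)
    if parser.has_option(section, key):
        return parser.get(section, key)
    return default
```

Re-reading the file on every call keeps the two functions stateless, at the cost of one parse per access.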
- setuphelpers.install_app(app_dir, key='', min_version='', get_version=None, killbefore=None, force=False, uninstallkeylist=None)[source]¶
Installs an app given a path to it. Copies the app directory to /Applications.
- setuphelpers.install_dmg(dmg_path, key='', min_version='', get_version=None, force=False, killbefore=None, uninstallkeylist=None)[source]¶
Installs a .dmg if it isn’t already installed on the system.
- Arguments:
dmg_path : the path to the dmg file
- Returns:
True if it succeeded, False otherwise
- setuphelpers.install_pkg(pkg_path, key='', min_version='', get_version=None, killbefore=None, force=False, uninstallkeylist=None)[source]¶
Installs a pkg file, given its name or a path to it.
- setuphelpers.installed_softwares(keywords=None, name=None, ignore_empty_names=True)[source]¶
Return list of every application in the /Applications folder.
- Args:
keywords (str or list): string to lookup in key, display_name or publisher fields
- Returns:
list of dicts: [{'key', 'name', 'version', 'install_date', 'install_location', 'uninstall_string', 'publisher', 'system_component'}]
- setuphelpers.is_dmg_installed(dmg_path, check_version=False)[source]¶
Checks whether or not a .dmg is already installed, given a path to it.
- Arguments:
dmg_path: the path to the .dmg file
check_version: if True, also checks whether the local package’s version is equal to or greater than its possibly already installed version.
- Returns:
True if it’s already installed, False if it isn’t. If check_version is specified, will also return False if it is already installed AND its version is lower than the local package’s version.
- setuphelpers.is_local_app_installed(appdir, check_version=True)[source]¶
Checks whether or not an application is already installed on the machine.
- Arguments:
appdir: the path to the .app directory
check_version: if True, also checks whether the local package’s version is equal to or greater than its possibly already installed version.
- Returns:
True if it’s already installed, False if it isn’t. If check_version is specified, will also return False if it is already installed AND its version is lower than the local package’s version.
- setuphelpers.is_local_pkg_installed(pkg_path, check_version=False)[source]¶
Checks whether or not a package file is already installed on the machine.
- Arguments:
pkg_path: the path to the .pkg file
check_version: if True, also checks whether the local package’s version is equal to or greater than its possibly already installed version.
- Returns:
True if it’s already installed, False if it isn’t. If check_version is specified, will also return False if it is already installed AND its version is lower than the local package’s version.
- setuphelpers.isrunning(processname)[source]¶
Check if a process is running.
>>> isrunning('explorer')
True
- setuphelpers.json_load_file(json_file, encoding='utf-8')[source]¶
Get content of a json file
- Args:
json_file (str): path to the file
- Returns:
dictionary (dict)
- setuphelpers.json_write_file(json_file, data, indent=4, sort_keys=False, encoding='utf-8')[source]¶
Write dictionary to a json file
- Args:
json_file (str): path to json file
data (dict): dictionary content
indent (int): indentation size; default: 4 spaces
sort_keys (bool): alphabetical order or not
encoding (str): file encoding
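The two JSON helpers map closely onto the standard `json` module. The following is a sketch of that mapping, assuming the parameters are passed straight through; `json_write_sketch` and `json_load_sketch` are hypothetical names, not the library functions.

```python
import json


def json_write_sketch(json_file, data, indent=4, sort_keys=False, encoding='utf-8'):
    """Hypothetical stand-in: serialize data to json_file."""
    with open(json_file, 'w', encoding=encoding) as handle:
        json.dump(data, handle, indent=indent, sort_keys=sort_keys)


def json_load_sketch(json_file, encoding='utf-8'):
    """Hypothetical stand-in: return the decoded content of json_file."""
    with open(json_file, 'r', encoding=encoding) as handle:
        return json.load(handle)
```

Passing `sort_keys=True` gives stable output, which keeps diffs small when the file is kept under version control.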
- setuphelpers.killalltasks(process_names, include_children=True)[source]¶
Kill the tasks by their process names
>>> killalltasks('firefox')
- setuphelpers.makepath(*p)[source]¶
Create a path from the components passed, with saner defaults than os.path.join. In particular, trailing path separators (backslashes) are removed from components. Path functions are called automatically.
>>> makepath("c:", "Windows", "system32")
'c:\\Windows\\system32'
>>> makepath(system32())
'C:\\WINDOWS\\system32'
>>> system32()
'C:\\WINDOWS\\system32'
>>> system32
<function system32 at 0x063EBE79>
>>> makepath(system32)
'C:\\WINDOWS\\system32'
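The difference with a plain Windows join can be shown without WAPT installed. In this sketch `makepath_sketch` is a hypothetical re-creation based only on the description and examples above; `ntpath` is used so the comparison behaves the same on any operating system.

```python
import ntpath


def makepath_sketch(*parts):
    """Hypothetical stand-in: join components with backslashes, Windows style."""
    resolved = []
    for part in parts:
        # "path functions are called automatically": call callables to get their value
        if callable(part):
            part = part()
        # drop trailing separators so joining never produces a doubled backslash
        resolved.append(part.rstrip('\\/'))
    return '\\'.join(resolved)


# the plain Windows join yields a drive-relative path for a bare drive letter
print(ntpath.join('c:', 'Windows', 'system32'))      # c:Windows\system32
print(makepath_sketch('c:', 'Windows', 'system32'))  # c:\Windows\system32
```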
- setuphelpers.mkdirs(path)[source]¶
Create the directory path if it doesn’t exist yet. Creates intermediate directories too.
>>> mkdirs(r"C:\Program Files (x86)\wapt")
u'C:\Program Files (x86)\wapt'
- setuphelpers.processes_for_file(filepath, open_files=True, dll=True)[source]¶
Generator returning processes currently having an open file descriptor for filepath
If not running as the System account, system processes cannot be accessed.
- Args:
filepath (str): file path or pattern (glob *)
- Returns:
iterator psutil.Process
- setuphelpers.remove_file(path)[source]¶
Try to remove a single file or symlink. Logs a warning message if the file doesn’t exist, and a critical message if the file can’t be removed.
- Args:
path (str): path to file
>>> remove_file(r'c:\tmp\fc.txt')
- setuphelpers.remove_tree(*args, **kwargs)[source]¶
Convenience function to delete a directory tree, with any error not ignored by default. Pass ignore_errors=False to access possible errors.
- Args:
path (str): path to directory to remove
ignore_errors (boolean): defaults to False. Set it to True to ignore exceptions on children deletion
onerror (func): hook called with (func, path, exc) on each delete exception. Should raise if stop is required.
- setuphelpers.rsa_decrypt()¶
data, rsakeypem, password:bytes -> bytes return decrypted data
- setuphelpers.rsa_encrypt()¶
data, x509certPEM: bytes -> bytes. returns encrypted data with public key in X509 certificate
- setuphelpers.run(cmd, shell=True, timeout=600, accept_returncodes=[0, 3010], on_write=None, pidlist=None, return_stderr=True, **kwargs)[source]¶
Run the command cmd in a shell and return the output and error text as string
- Args:
cmd: command and arguments, either as a string or as a list of arguments
shell (boolean): True is assumed
timeout (int): maximum time to wait for cmd completion, in seconds (default = 600). A TimeoutExpired exception is raised if the timeout is reached.
on_write (func): callback called when a new line is printed on stdout or stderr by the subprocess: func(unicode_line). The argument is enforced to unicode.
accept_returncodes (list): list of return codes which are considered OK. Default = (0, 3010)
pidlist (list): external list where to append the pid of the launched process.
return_stderr (bool or list): if True, the error lines are returned to the caller in the result. If a list is provided, the error lines are appended to this list.
All other parameters of the psutil.Popen constructor are accepted.
- Returns:
RunOutput: bytes-like output of stdout and optionally stderr streams, with a returncode attribute
- Raises:
CalledProcessError: if the return code of cmd is not in the accept_returncodes list
TimeoutExpired: if the process runs for longer than timeout
Changed in version 1.3.9: return_stderr parameters to disable stderr or get it in a separate list return value has a returncode attribute to
Changed in version 1.4.0: output is not forced to unicode
Changed in version 1.4.1: error code 1603 is no longer accepted by default.
Changed in version 1.5.1: If cmd is unicode, encode it to default filesystem encoding before running it.
>>> run(r'dir /B c:\windows\explorer.exe')
'explorer.exe\r\n'
>>> out = []
>>> pids = []
>>> def getlines(line):
...     out.append(line)
>>> run(r'dir /B c:\windows\explorer.exe',pidlist=pids,on_write=getlines)
u'explorer.exe\r\n'
>>> print(out)
['explorer.exe\r\n']
>>> try:
...     run(r'ping /t 127.0.0.1',timeout=3)
... except TimeoutExpired:
...     print('timeout')
timeout
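The interplay between timeout and accept_returncodes can be explored with the standard `subprocess` module. This is a simplified sketch, not the setuphelpers implementation: `run_sketch` is a hypothetical name, it omits on_write, pidlist and return_stderr, and it raises the `subprocess` flavors of the two exceptions.

```python
import subprocess
import sys


def run_sketch(cmd, timeout=600, accept_returncodes=(0, 3010)):
    """Hypothetical stand-in: run cmd, return its output, raise on unexpected exit codes."""
    completed = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr into the returned output
        timeout=timeout,           # subprocess.TimeoutExpired is raised when exceeded
    )
    if completed.returncode not in accept_returncodes:
        raise subprocess.CalledProcessError(
            completed.returncode, cmd, output=completed.stdout)
    return completed.stdout


# the Python interpreter itself is used as a portable example command
print(run_sketch([sys.executable, '-c', 'print("ok")']))
```

Accepting 3010 by default matters on Windows, where installers commonly use that code to signal success with a reboot required.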
- setuphelpers.run_notfatal(*cmd, **args)[source]¶
Runs the command and waits for its termination, then returns the output. Ignores the exit status code of the command and returns '' instead.
Changed in version 1.4.0: output is now enforced to unicode
- setuphelpers.shell_launch(cmd)[source]¶
Launch a command (without arguments) without waiting for its termination
>>> open('c:/tmp/test.txt','w').write('Test line')
>>> shell_launch('c:/tmp/test.txt')
- setuphelpers.uninstall_app(app_name)[source]¶
Uninstalls an app given its name.
DELETES EVERY FILE. Should not save the user’s configuration.
- setuphelpers.uninstall_pkg(pkg_name)[source]¶
Uninstalls a pkg by its name.
DELETES EVERY FILE. Should not save the user’s configuration.
Returns: True if it succeeded, False otherwise.
- setuphelpers.unmount_dmg(dmg_mount_path)[source]¶
Unmounts a dmg file, given the path to the mount point.
Returns the result of the 'hdiutil unmount' command that was run.
- setuphelpers.unzip(zipfn, target=None, filenames=None, extract_with_full_paths=True)[source]¶
Unzip the files from zipfile with patterns in filenames to target directory
- Args:
zipfn (str): path to zipfile (can be relative to the temporary unzip location of the package)
target (str): target location. Defaults to dirname(zipfile) + basename(zipfile)
filenames (str or list of str): list of filenames / glob patterns (path sep is normally a slash)
extract_with_full_paths (bool): whether to keep the subfolders of the archive as is
- Returns:
list : list of extracted files
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt')
['C:\\example\\tis-7zip_9.2.0-15_all\\7z920-x64.msi', 'C:\\example\\tis-7zip_9.2.0-15_all\\7z920.msi', 'C:\\example\\tis-7zip_9.2.0-15_all\\setup.py', 'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/control', 'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/wapt.psproj', 'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/manifest.sha256', 'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/signature']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', filenames=['*.msi','*.py'])
['C:\\example\\tis-7zip_9.2.0-15_all\\7z920-x64.msi', 'C:\\example\\tis-7zip_9.2.0-15_all\\7z920.msi', 'C:\\example\\tis-7zip_9.2.0-15_all\\setup.py']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', target='test', filenames=['*.msi','*.py'])
['C:\\example\\test\\7z920-x64.msi', 'C:\\example\\test\\7z920.msi', 'C:\\example\\test\\setup.py']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', filenames='WAPT/*')
['C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/control', 'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/wapt.psproj', 'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/manifest.sha256', 'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/signature']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', filenames='WAPT/control')
['C:\\example\\tis-7zip_9.2.0-15_all\\WAPT\\control']
>>> unzip('tis-7zip_9.2.0-15_all.wapt', target='.', filenames='WAPT/control')
['.\\WAPT\\control']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', target='C:\\example\\', filenames='WAPT/control')
['C:\\example\\WAPT\\control']
>>> unzip('tis-7zip_9.2.0-15_all.wapt', target=basedir, filenames='WAPT/control')
['C:\\example\\WAPT\\control']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', filenames='WAPT/control', extract_with_full_paths=False)
['C:\\example\\control']
New in version 1.3.11.
Changed in version 2.2: added extract_with_full_paths parameter
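Pattern-based extraction of this kind can be approximated with `zipfile` and `fnmatch`. The sketch below is not the setuphelpers code: `unzip_sketch` is a hypothetical name, extract_with_full_paths is not covered, and the default target (the archive path without its extension) is an assumption drawn from the examples above.

```python
import fnmatch
import os
import zipfile


def unzip_sketch(zipfn, target=None, filenames=None):
    """Hypothetical stand-in: extract members matching glob patterns, return extracted paths."""
    if target is None:
        # assumption drawn from the examples: archive path without its extension
        target = os.path.splitext(zipfn)[0]
    if isinstance(filenames, str):
        filenames = [filenames]
    extracted = []
    with zipfile.ZipFile(zipfn) as archive:
        for member in archive.namelist():
            if member.endswith('/'):
                continue  # skip directory entries
            if filenames and not any(fnmatch.fnmatch(member, p) for p in filenames):
                continue
            # extract() returns the path of the file it created
            extracted.append(archive.extract(member, target))
    return extracted
```

Member names inside a zip archive always use forward slashes, which is why patterns such as `WAPT/*` are written with a slash even on Windows.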
- setuphelpers.user_appdata()¶
Return the local appdata profile of current user
- Returns:
str: path like u'/home/user/.config/'
- setuphelpers.user_local_appdata()[source]¶
Return the local appdata profile of current user
- Returns:
str: path like u'/home/user/.config/'
- setuphelpers.wget(url, target=None, printhook=None, proxies=None, connect_timeout=10, download_timeout=None, verify_cert=None, referer=None, user_agent=None, cert=None, resume=False, md5=None, sha1=None, sha256=None, cache_dir=None, requests_session=None, limit_bandwidth=None)[source]¶
Copy the contents of a file from a given URL to a local file.
- Args:
url (str): URL to document
target (str): full file path of downloaded file. If None, put in a temporary dir with supplied url filename (final part of url)
proxies (dict): proxies to use, e.g. {'http': 'http://wpad:3128', 'https': 'http://wpad:3128'}
timeout (int): seconds to wait for answer before giving up
auth (list): (user, password) to authenticate with basic auth
verify_cert (bool or str): either False, True (verify with embedded CA list), or path to a directory or PEM encoded CA bundle file to check https certificate signature against.
cert (list): tuple/list of (x509certfilename, pemkeyfilename, key password) for authenticating the client. If key is not encrypted, password must be None
referer (str):
user_agent:
resume (bool):
md5 (str):
sha1 (str):
sha256 (str):
cache_dir (str): if file exists here, and md5 matches, copy from here instead of downloading. If not, put a copy of the file here after downloading.
requests_session (request.Session): predefined request session to use instead of building one from scratch from proxies, cert, verify_cert
- Returns:
str : path to downloaded file
>>> respath = wget('http://wapt.tranquil.it/wapt/tis-firefox_28.0.0-1_all.wapt','c:\\tmp\\test.wapt',proxies={'http':'http://proxy:3128'})
???
>>> os.stat(respath).st_size>10000
True
>>> respath = wget('http://localhost:8088/runstatus','c:\\tmp\\test.json')
???
- setuphelpers.wgets(url, proxies: Optional[dict] = None, verify_cert=None, referer=None, user_agent=None, timeout=None, cert=None, requests_session=None, as_json=False) -> str[source]¶
Return the content of a remote resource as a string / bytes or dict with a http get request.
Raise an exception if remote data can’t be retrieved.
- Args:
url (str): http(s) url
proxies (dict): proxy configuration as requests requires it {'http': url, 'https': url}
verify_cert (bool or str): verify server certificate, path to trusted CA bundle
cert (tuple of 3 str): (cert_path, key_path, key password) client side authentication.
requests_session (request.Session): predefined request session to use instead of building one from scratch
- Returns:
str or bytes or dict: content of remote resource. str, bytes or JSON depending on the encoding and the Content-Type.
>>> data = wgets('https://wapt/ping')
>>> "msg" in data
True
PackageEntry Class¶
Version Class¶
- class setuphelpers.Version(version, members_count=None)[source]¶
Version object of the form 0.0.0. Versions compare with respect to natural numbering, not alphabetical order.
- Args:
version (str): version string
members_count (int): number of version members to take into account. If the version has fewer members, missing members are added with value 0. If it has more, the last ones are removed.
>>> Version('0.10.2') > Version('0.2.5')
True
>>> Version('0.1.2') < Version('0.2.5')
True
>>> Version('0.1.2') == Version('0.1.2')
True
>>> Version('7') < Version('7.1')
True
Changed in version 1.6.2.5: truncate version members list to members_count if provided.
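To make the difference between natural and alphabetical ordering concrete, here is a small stand-alone sketch. `VersionSketch` is a hypothetical class, not the setuphelpers one: it assumes purely numeric members and compares member lists as they are, without equalizing lengths unless members_count is given.

```python
from functools import total_ordering


@total_ordering
class VersionSketch:
    """Hypothetical stand-in comparing dotted versions numerically, member by member."""

    def __init__(self, version, members_count=None):
        # assumption: every member is a plain integer
        self.members = [int(part) for part in str(version).split('.')]
        if members_count is not None:
            # pad with zeros when too short, truncate when too long
            self.members = (self.members + [0] * members_count)[:members_count]

    def __eq__(self, other):
        return self.members == other.members

    def __lt__(self, other):
        return self.members < other.members


print(VersionSketch('0.10.2') > VersionSketch('0.2.5'))  # True: 10 > 2 as numbers
print('0.10.2' > '0.2.5')                                # False: '1' < '2' as characters
```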
Setupdevhelpers¶
- setupdevhelpers.add_double_quotes_around(string)[source]¶
Return the string with double quotes around
- Args:
string (str): a string
- setupdevhelpers.bs_find(url, element, attribute=None, value=None, user_agent=None, proxies=None, features='html.parser', **kwargs)[source]¶
Parse html web page with BeautifulSoup and get the first result
- Args:
url (str): url of the web page to parse
element (str): searched element
attribute (str): selected attribute of the element
value (str): value of the selected attribute
user_agent (str): specify a user-agent if needed
proxies (dict): specify your proxy if needed
**kwargs (str): additional parameters passed to requests
features (str): bs feature to use
>>> bs_find('https://www.w3.org/', 'a', 'title', 'Open Web Platform testing')['href']
'https://web-platform-tests.org/'
>>> bs_find('https://www.w3.org/', 'span', 'class', 'alt-logo').string
'W3C'
New in version 2.0.
- setupdevhelpers.bs_find_all(url, element, attribute=None, value=None, user_agent=None, proxies=None, features='html.parser', **kwargs)[source]¶
Parse html web page with BeautifulSoup and get a list of the results
- Args:
url (str): url of the web page to parse
element (str): searched element
attribute (str): selected attribute of the element
value (str): value of the selected attribute
user_agent (str): specify a user-agent if needed
proxies (dict): specify your proxy if needed
**kwargs (str): additional parameters passed to requests
features (str): bs feature to use
>>> bs_find_all('https://www.w3.org/', 'a', 'title', 'Open Web Platform testing')[0]['href']
'https://web-platform-tests.org/'
>>> bs_find_all('https://www.w3.org/', 'span', 'class', 'alt-logo')[0].string
'W3C'
New in version 2.0.
- setupdevhelpers.get_proxies_from_wapt_console()[source]¶
Return proxy information from the current user WAPT console
>>> get_proxies_from_wapt_console()
{'http': 'http://srvproxy.ad.domain.lan:8080', 'https': 'http://srvproxy.ad.domain.lan:8080'}
- setupdevhelpers.remove_outdated_binaries(version, list_extensions=['exe', 'msi', 'deb', 'rpm', 'dmg', 'pkg'], filename_contains=None)[source]¶
Remove files based on the version contained in their filename, falling back to the file version on compatible OSes
- Args:
version (str): version number of the files to keep
list_extensions (str or list of str): file extensions of compared files
filename_contains (str or list of str): part of the filename that must be contained (useful for distinguishing architecture and OS)
- Returns:
list: list of deleted files
New in version 2.0.
Changed in version 2.2: Now returns removed files, now checking .exe and .msi file versions
- setupdevhelpers.unzip_with_7zip(filename, target=None, filenames=[], extract_with_full_paths=True, recursive=True)[source]¶
Extract the files from an archive file with 7zip with patterns in filenames to target directory
- Args:
filename (str): path to archive file (can be relative to the temporary unzip location of the package)
target (str): archive content to target dir location (dir will be created if needed). Default: dirname(archive file) + basename(archive file)
filenames (str or list of str): list of filenames / glob patterns (path sep is normally a slash)
extract_with_full_paths (bool): whether to keep the subfolders of the archive as is
recursive (bool): whether to look for filenames recursively
- Returns:
None
New in version 2.0.
Changed in version 2.2: changed default target location to make it correspond with unzip()
Waptpackage¶
- exception waptpackage.EWaptBadControl[source]¶
Bases: EWaptException
- exception waptpackage.EWaptBadPackageAttribute[source]¶
Bases: EWaptException
- exception waptpackage.EWaptBadSetup[source]¶
Bases: EWaptException
- exception waptpackage.EWaptBadSignature[source]¶
Bases: EWaptException
- exception waptpackage.EWaptConfigurationError[source]¶
Bases: EWaptException
- exception waptpackage.EWaptConflictingPackage(msg, install_status='ERROR', retry_count=None)[source]¶
Bases: EWaptInstallError
- exception waptpackage.EWaptCorruptedFiles[source]¶
Bases: EWaptException
- exception waptpackage.EWaptDiskSpace[source]¶
Bases: EWaptException
- exception waptpackage.EWaptDownloadError[source]¶
Bases: EWaptException
- exception waptpackage.EWaptInstallError(msg, install_status='ERROR', retry_count=None)[source]¶
Bases: EWaptException
Exception raised during the installation of a package. msg is logged in the local install database. If retry_count is None, the install is retried indefinitely until success; otherwise it is retried at most retry_count times.
- exception waptpackage.EWaptInstallPostponed(msg, install_status='POSTPONED', retry_count=5, grace_delay=3600)[source]¶
Bases: EWaptInstallError
- exception waptpackage.EWaptMissingLocalWaptFile[source]¶
Bases: EWaptException
- exception waptpackage.EWaptNeedsNewerAgent[source]¶
Bases: EWaptException
- exception waptpackage.EWaptNotAPackage[source]¶
Bases: EWaptException
- exception waptpackage.EWaptNotSigned[source]¶
Bases: EWaptException
- exception waptpackage.EWaptNotSourcesDirPackage[source]¶
Bases: EWaptException
- exception waptpackage.EWaptPackageSignError[source]¶
Bases: EWaptException
Bases: EWaptInstallError
- class waptpackage.HostCapabilities(from_dict=None, **kwargs)[source]¶
Bases: BaseObjectClass
- class waptpackage.PackageEntry(package='', version='0', repo='', waptfile=None, section='base', **kwargs)[source]¶
Bases: BaseObjectClass
Manage package attributes coming from either control files in WAPT packages, the local DB, or a development dir.
Methods to build, unzip, sign or check a package. Methods to sign the control attributes and check them.
>>> pe = PackageEntry('testgroup','0')
>>> pe.depends = 'tis-7zip'
>>> pe.section = 'group'
>>> pe.description = 'A test package'
>>> print(pe.ascontrol())
package           : testgroup
version           : 0
architecture      : all
section           : group
priority          : optional
maintainer        :
description       : A test package
depends           : tis-7zip
conflicts         :
maturity          :
locale            :
min_wapt_version  :
sources           :
installed_size    :
signer            :
signer_fingerprint:
signature_date    :
signed_attributes :
- property all_attributes¶
- as_control_bytes()[source]¶
Return this package entry metadata as bytes as saved in package control file
- Returns:
bytes: lines with key: value
- ascontrol(with_repo_attributes=False, with_empty_attributes=False)[source]¶
Return control attributes and values as stored in control packages file
Each attribute is on a line with key : value. If a value is multiline, each new line begins with a space.
- Args:
with_repo_attributes (bool): if True, include md5sum and filename (for Packages index only)
with_empty_attributes (bool): whether to include attributes with an empty value too, or only non-empty and/or signed attributes
- Returns:
str: lines of attr: value
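The line layout described above, including the leading space on continuation lines, can be sketched as follows. `as_control_sketch` is a hypothetical helper working on a plain dict, not a PackageEntry method; the column padding is an assumption and the empty-attribute filtering is left out.

```python
def as_control_sketch(attributes):
    """Hypothetical stand-in: render a dict as aligned 'key: value' control lines."""
    # pad keys to the longest one so the colons line up (layout is an assumption)
    width = max(len(key) for key in attributes)
    lines = []
    for key, value in attributes.items():
        # continuation lines of a multiline value start with a space
        folded = str(value).replace('\n', '\n ')
        lines.append('%s: %s' % (key.ljust(width), folded))
    return '\n'.join(lines)


print(as_control_sketch({
    'package': 'testgroup',
    'description': 'A test package\nspanning two lines',
}))
```

Starting continuation lines with a space lets a reader tell a wrapped value apart from the start of a new attribute.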
- asrequirement()[source]¶
Return package and version for designating this package in depends or install actions
- Returns:
str: "packagename (=version)"
- build_management_package(target_directory=None)[source]¶
Build the WAPT package from attributes only, without setup.py, and store the result in target_directory.
self.sourcespath must be None. Package will contain only a control file.
- Args:
- target_directory (str): where to create Zip wapt file.
if None, temp dir will be used.
- Returns:
str: path to zipped Wapt file. It is unsigned.
>>> pe = PackageEntry('testgroup','0',description='Test package',maintainer='Hubert',sources='https://dev/svn/testgroup',architecture='x86')
>>> waptfn = pe.build_management_package()
>>> key = SSLPrivateKey('c:/private/htouvet.pem',password='monmotdepasse')
>>> crt = SSLCertificate('c:/private/htouvet.crt')
>>> pe.sign_package(crt,key)
>>> pe.unzip_package()
'c:\users\htouvet\appdata\local\temp\waptob4gcd'
>>> ca = SSLCABundle('c:/wapt/ssl')
>>> pe.check_control_signature(ca)
<SSLCertificate cn=u'htouvet' issuer=u'tranquilit-ca-test' validity=2017-06-28 - 2027-06-26 Code-Signing=True CA=True>
- build_manifest(exclude_filenames=None, block_size=1048576, forbidden_files=[], waptzip=None, excludes=[])[source]¶
Compute the manifest of a WAPT package
- Args:
- forbidden_files (list): list of relative files which must not be present for the manifest to be built
(if one is found, build fails)
exclude_filenames (list): list of exact file paths (relative to package root, with forward slashes) to exclude from manifest.
excludes (list): list of file / dir patterns to exclude, whatever level they are in the file hierarchy
- Returns:
dict: {filepath:shasum,}
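As a rough illustration of the {filepath:shasum} result, the sketch below walks a directory and hashes each file in blocks. It is not the library's implementation: the function name sketch_manifest is hypothetical, SHA-256 is an assumption suggested by the WAPT/manifest.sha256 filename, and the forbidden_files and pattern-based excludes options are left out.

```python
import hashlib
import os
import tempfile


def sketch_manifest(root_dir, exclude_filenames=(), block_size=1048576):
    """Return {relative_path: sha256_hexdigest} for the files under root_dir."""
    manifest = {}
    for dirpath, _dirnames, filenames in os.walk(root_dir):
        for name in filenames:
            full_path = os.path.join(dirpath, name)
            # Paths are relative to the package root and use forward slashes.
            rel_path = os.path.relpath(full_path, root_dir).replace(os.sep, "/")
            if rel_path in exclude_filenames:
                continue
            digest = hashlib.sha256()
            with open(full_path, "rb") as f:
                # Read block by block so large files are never fully in memory.
                for block in iter(lambda: f.read(block_size), b""):
                    digest.update(block)
            manifest[rel_path] = digest.hexdigest()
    return manifest


# Build a tiny throwaway package tree to exercise the sketch.
with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "WAPT"))
    with open(os.path.join(tmp, "setup.py"), "wb") as f:
        f.write(b"# empty\n")
    with open(os.path.join(tmp, "WAPT", "signature"), "wb") as f:
        f.write(b"not hashed")
    manifest = sketch_manifest(tmp, exclude_filenames=["WAPT/signature"])

print(sorted(manifest))
```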
- build_package(excludes=[], target_directory=None, excludes_full=['.svn', '.git', 'setup.pyc', 'update_package.pyc', '__pycache__/setup.cpython-38.pyc', '__pycache__/update_package.cpython-38.pyc'])[source]¶
Build the WAPT package and store the result in target_directory. Zip the content of the self.sourcespath directory into a zip file named with the default package filename based on control attributes.
Update filename attribute. Update localpath attribute with result filepath.
- Args:
excludes (list): list of patterns for source files to exclude from built package.
target_directory (str): target directory where to store built package. If None, use parent directory of package sources directory.
excludes_full (list): list of exact file paths (relative to package root) to exclude from Zip.
- Returns:
str: full filepath to built wapt package
- call_setup_hook(hook_name='session_setup', wapt_context=None, params=None, force=None, user=None)[source]¶
Calls a hook in setuppy given a wapt_context
Set basedir, control, and run context within the function context.
- Args:
hook_name (str): name of function to call in setuppy
wapt_context (Wapt): run context
- Returns:
output of hook.
Changes:
1.6.2.1: the called hook is run with Disabled win6432 FileSystem redirection
- change_depends_conflicts_prefix(new_prefix)[source]¶
Change prefix of package name to new_prefix in depends and conflicts csv lists and return True if it was really changed.
- Args:
new_prefix (str): new prefix to put in package names
- Returns:
bool
- change_prefix(new_prefix)[source]¶
Change prefix of package name to new_prefix and return True if it was really changed.
- check_control_signature(trusted_bundle, signers_bundle=None)[source]¶
Check in memory control signature against a list of public certificates
- Args:
trusted_bundle (SSLCABundle): trusted certificates. Package certificates must be signed by one of the certificates in this bundle.
signers_bundle: optional. List of potential package signers' certificate chains. When checking the Packages index, actual packages are not available, only the certificates embedded in the Packages index. Package signatures are checked against these certificates, which are also searched for potential intermediate CAs, and the matching certificate is checked against trusted_bundle.
- Returns:
SSLCertificate : matching trusted package’s signers SSLCertificate
>>> from waptpackage import *
>>> from common import SSLPrivateKey,SSLCertificate
>>> k = SSLPrivateKey('c:/private/test.pem')
>>> c = SSLCertificate('c:/private/test.crt')
>>> p = PackageEntry('test',version='1.0-0')
>>> p.depends = 'test'
>>> p._sign_control(k,c)
>>> p.check_control_signature(c)
>>> p.check_control_signature(SSLCABundle('c:/wapt/ssl'))
- check_package_signature(trusted_bundle, ignore_missing_files=False)[source]¶
Check:
* the hash of the files in the unzipped package_dir against the list in the package’s manifest file
* that the manifest signature can be decrypted with the package’s certificate
* that the package certificate is issued by a known CA or is the same as one of the authorized certificates.
- Args:
trusted_bundle (SSLCABundle): local certificates store. Certificates in trusted_bundle.trusted_certificates are trusted.
ignore_missing_files (bool): whether to raise exception for missing files. Useful to check stripped down packages when remote resigning
- Returns:
SSLCertificate : matching certificate
- Raises:
Exception if no certificate match is found.
- get(name, default=None)[source]¶
Get PackageEntry property.
- Args:
name (str): property to get. name is forced to lowercase.
default (any): value to return in case the property does not exist.
- Returns:
any : property value
- get_impacted_process_list()[source]¶
Return a list containing the impacted processes
- Returns:
List[str] impacted process list
- get_localized_description(locale_code=None)[source]¶
locale_code is a 2 chars code like fr or en or de
- get_software_version()[source]¶
Return the software version only (without the build number of the package)
- Returns:
str: “software_version”
- get_stripped_package()[source]¶
Build a package keeping only Wapt stuff… Keeps only WAPT directory and setup.py
- Returns:
bytes: zipped data
- has_file(fname)[source]¶
Return None if fname is not in package, else return file datetime
- Args:
fname (unicode): file path like WAPT/signature
- Returns:
datetime : last modification datetime of file in Wapt archive if zipped or local sources if unzipped
- invalidate_signature()[source]¶
Remove all signature information from control and unzipped package directory. Package must be in unzipped state.
- list_corrupted_files(ignore_missing_files=False, remove_extra_files=False)[source]¶
Check hexdigest sha for the files in manifest. Package must be already unzipped.
- Returns:
list: non matching files (corrupted files)
- load_control_from_dict(adict)[source]¶
Fill in members of entry with keys from supplied dict
adict members which are not a registered control attribute are set too and attribute name is put in list of “calculated” attributes.
- Args:
adict (dict): key,value to put in this entry
- Returns:
PackageEntry: self
- load_control_from_wapt(fname=None, calc_md5=False, keep_control_lines=False)[source]¶
Load package attributes from the control file (utf8 encoded) included in WAPT zipfile fname
- Args:
- fname (str or unicode): Package file/directory path
If None, try to load entry attributes from self.sourcespath or self.localpath.
If fname is a file path, it must be a WAPT zipped file; control data is loaded from it.
If fname is a directory path, it must be the root dir of an unzipped package; control is loaded from <fname>/WAPT/control.
calc_md5 (boolean): if True and fname is a zipped file, initialize md5sum attribute with md5 part of filename or calc from Zipped file
- Returns:
PackageEntry: self
- local_attributes = ['sourcespath', 'repo', 'localpath', 'repo_url']¶
- make_package_edit_directory()[source]¶
Return the standard package directory to edit the package based on current attributes
- Returns:
- str: standard package filename
{package}_{version}_{architecture}_{OS}_{Min-OS-Version}_{Max-OS-Version}_{maturity}_{locale}-wapt
- make_package_filename(with_md5sum=False)[source]¶
Return the standard package filename based on current attributes. Parts of control which are either ‘all’ or empty are not included in the filename.
- Returns:
- str: standard package filename
packagename.wapt for host
packagename_arch_maturity_locale.wapt for group
packagename_version_arch_maturity_locale.wapt for others
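The three naming patterns above can be sketched as a small standalone function. This is an assumption-laden illustration, not the library code: sketch_package_filename is a hypothetical name, only the parts listed above are handled, and the real method may take other control attributes or the with_md5sum option into account.

```python
def sketch_package_filename(package, version, section="base",
                            architecture="all", maturity="", locale=""):
    """Build a filename following the three patterns listed above."""
    if section == "host":
        # Host packages are named after the package only.
        return f"{package}.wapt"
    parts = [package]
    if section != "group":
        # Group packages omit the version.
        parts.append(version)
    # Parts that are empty or equal to 'all' are left out of the filename.
    parts.extend(p for p in (architecture, maturity, locale) if p and p != "all")
    return "_".join(parts) + ".wapt"


print(sketch_package_filename("tis-7zip", "19.0-1", architecture="x64",
                              maturity="PROD", locale="fr"))
print(sketch_package_filename("testgroup", "0", section="group"))
```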
- manifest_filename_excludes = ['WAPT/signature', 'WAPT/signature.sha256', 'WAPT/manifest.sha256']¶
- match(match_expr)[source]¶
Return True if the package entry matches a package string like ‘tis-package (>=1.0.1-00)’
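To make the shape of such a package string concrete, here is a minimal sketch that splits it into name, operator and version. The regular expression and the parse_requirement helper are hypothetical and only cover the forms shown in this document; the actual parsing rules of waptpackage may differ.

```python
import re

# Hypothetical pattern: a package name, optionally followed by "(<operator><version>)".
REQUIREMENT_RE = re.compile(
    r"^\s*(?P<package>[^\s()]+)\s*"
    r"(?:\(\s*(?P<op>[<>]=?|=)\s*(?P<version>[^\s()]+)\s*\))?\s*$"
)


def parse_requirement(expr):
    """Split 'name (>=version)' into (name, operator, version)."""
    m = REQUIREMENT_RE.match(expr)
    if m is None:
        raise ValueError(f"not a package requirement: {expr!r}")
    # Operator and version are None when only a package name is given.
    return m.group("package"), m.group("op"), m.group("version")


print(parse_requirement("tis-package (>=1.0.1-00)"))
print(parse_requirement("tis-firefox"))
```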
- match_search(search)[source]¶
Check if the entry matches the search words
- Args:
search (str): words to search for separated by spaces
- Returns:
boolean: True if entry contains the words in search in correct order and at word boundaries
- match_version(version_expr)[source]¶
Return True if the package entry matches a version string condition like ‘>=1.0.1-00’
- merge_stripped_package(stripped_package_zip_data=None, stripped_package_filename=None)[source]¶
Use the files from stripped_package_zip_data and include it in current unzipped package, remove files not in manifest, and recheck file hashes.
- not_duplicated_attributes = ['signer', 'signer_fingerprint', 'signature', 'signature_date', 'signed_attributes']¶
- optional_attributes = ['name', 'categories', 'maintainer', 'description', 'depends', 'conflicts', 'maturity', 'locale', 'target_os', 'min_wapt_version', 'sources', 'installed_size', 'impacted_process', 'description_fr', 'description_pl', 'description_de', 'description_es', 'description_pt', 'description_it', 'description_nl', 'description_ru', 'audit_schedule', 'editor', 'keywords', 'licence', 'homepage', 'package_uuid', 'valid_from', 'valid_until', 'forced_install_on', 'changelog', 'min_os_version', 'max_os_version', 'icon_sha256sum']¶
- package_certificates()[source]¶
Return certificates from package. If package is built, take it from Zip else take the certificates from unzipped directory
- Returns:
- list: list of embedded certificates when package was signed or None if not provided or signed.
First one of the list is the signer, the others are optional intermediate CA
- repo_attributes = ['filename', 'size', 'md5sum']¶
- required_attributes = ['package', 'version', 'architecture', 'section', 'priority']¶
- save_control_to_wapt(fname=None, force=True)[source]¶
Save package attributes to the control file (utf8 encoded)
Update self.localpath or self.sourcespath if not already set.
- Args:
- fname (str): base directory of waptpackage or filepath of zipped package.
If None, use self.sourcespath if exists, or self.localpath if exists
force (bool) : write control in wapt zip file even if it already exist
- Returns:
PackageEntry : None if nothing written, or previous PackageEntry if new data written
- Raises:
Exception: if fname is None and no sourcespath and no localpath
Exception: if control exists and force is False
- set_software_version(version, inc_build=False)[source]¶
Set the software version only. inc_build will increment the build number.
- sign_package(certificate, private_key, keep_signature_date=False, excludes_full=['.svn', '.git', 'setup.pyc', 'update_package.pyc', '__pycache__/setup.cpython-38.pyc', '__pycache__/update_package.cpython-38.pyc'], excludes=[])[source]¶
Sign a package source directory or an already built (zipped) package. Should follow immediately the build_package step.
Append signed control, manifest.sha256 and signature to the zipped WAPT package. If these files are already in the package, they are first removed.
Use the self.localpath attribute to get location of waptfile build file.
- Args:
certificate (SSLCertificate or list): signer certificate chain
private_key (SSLPrivateKey): signer private key
keep_signature_date (bool): if True, the previous signature date is kept (useful when resigning is needed, but no meaningful change has been done)
excludes_full (list): list of exact file paths (relative to package root) to exclude from manifest.
excludes (list): list of file / dir patterns to exclude, whatever level they are in the file hierarchy
- Returns:
str: signature
- sign_stripped_package(certificate, private_key, excludes_full=['.svn', '.git', 'setup.pyc', 'update_package.pyc', '__pycache__/setup.cpython-38.pyc', '__pycache__/update_package.cpython-38.pyc'], excludes=[], sign_setuppy=False)[source]¶
Sign an unzipped source package, assuming the digests in the manifest file are OK except for control and certificate.crt:
* remove signature files -> WAPT/certificate.crt, WAPT/signature.sha256
* resign WAPT/control
* update control hash in WAPT/manifest.sha256
* resign WAPT/manifest.sha256 and put in WAPT/signature.sha256
- Args:
certificate (list of SSLCertificate): certificates chain of the signer. First certificate is the signer’s one. Others are intermediate CA.
private_key (SSLPrivateKey): key to sign the control and manifest files.
excludes_full
excludes
- Returns:
str: signature of manifest.sha256
- signature_attributes = ['signer', 'signer_fingerprint', 'signature', 'signature_date', 'signed_attributes']¶
- property tags¶
- property target_os_list¶
- unzip_package(target_dir=None, cabundle=None, ignore_missing_files=False)[source]¶
Unzip package and optionally check content
- Args:
target_dir (str): where to unzip package content. If None, a temp dir is created
cabundle (list): list of Certificates to check content. If None, no check is done
- Returns:
str : path to unzipped packages files
- Raises:
EWaptNotAPackage, EWaptBadSignature, EWaptCorruptedFiles: if the check is not successful. In that case, unzipped files are deleted.
- class waptpackage.PackageRequest(request=None, copy_from=None, **kwargs)[source]¶
Bases:
BaseObjectClass
Package and version request / condition. The request is the basic packagename(=version) request. Additional filters can be specified as arguments. The list filters are ordered from most preferred to least preferred options.
- Args:
request (str): packagename(<=>version)
architectures (list): list of x64, x86, arm, arm64, armhf
locales (list): list of 2 letters lki
- property architectures¶
List of accepted architectures
- get_package_compare_key(pe1: PackageEntry) Tuple [source]¶
Compute a key for a package to compare it with others in the context of this request. This takes into account the preferences from filters, such as the order of locales, architectures, or maturities.
- Args:
pe1 (PackageEntry)
- Returns:
tuple
- is_matched_by(package_entry: PackageEntry)[source]¶
Check if package_entry is matching this request
- property locales: List¶
- property maturities: List¶
List of accepted maturities
- property max_os_version¶
- property min_os_version¶
- property package¶
- property request¶
- property tags: list¶
List of accepted tags for OS
- property version¶
- waptpackage.PackageVersion(package_or_versionstr) Tuple [source]¶
Splits a version string like 1.2.3.4-567. The software version is clipped to 4 members. If ‘-packaging’ is not provided, the second member will be 0 for safe comparison.
- Args:
package_or_versionstr (str): package version string
- Returns:
tuple: (Version,int) : soft version on 4 members / packaging as an int
- waptpackage.PackageVersionStr(package_or_versionstr) Tuple [source]¶
Splits a version string like 1.2.3.4-567. The software version is clipped to 4 members. If ‘-packaging’ is not provided, the second member will be 0 for safe comparison.
- Args:
package_or_versionstr (str): package version string
- Returns:
tuple: (Version,int) : soft version on 4 members / packaging as an int
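The splitting rule can be sketched as follows. This is a simplified stand-in, not the library's code: split_package_version is a hypothetical name, it returns a plain tuple of ints rather than a Version object, it assumes purely numeric members, and padding short versions with zeros is an assumption made here so that tuples compare sensibly.

```python
def split_package_version(version_str, members_count=4):
    """Return ((major, minor, ...), packaging) from a string like '1.2.3.4-567'."""
    software, _, packaging = version_str.partition("-")
    # Clip the software version to the first `members_count` members.
    members = [int(m) for m in software.split(".")][:members_count]
    # Assumption: pad short versions with zeros so '1.2' and '1.2.0.0' compare equal.
    members += [0] * (members_count - len(members))
    # A missing '-packaging' part is treated as 0.
    return tuple(members), int(packaging) if packaging else 0


print(split_package_version("1.2.3.4-567"))
print(split_package_version("21.0.0-00") > split_package_version("20.0.1-02"))
```

Because the result is a tuple of tuples and ints, ordinary tuple comparison gives the ordering by software version first, then by packaging number.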
- class waptpackage.WaptBaseRepo(name='abstract', cabundle=None, config=None, section=None)[source]¶
Bases:
BaseObjectClass
Base abstract class for a Wapt Packages repository
- property authorized_certificates: Sequence[dict]¶
List of authorized signers certificates attributes
- Returns:
list [dict]
- property cabundle¶
- get_certificates(packages_zipfile=None)[source]¶
Download signers certificates and crl from Package index on remote repository.
These certificates and CRL are appended to Packages index when scanning packages.
- Args:
packages_zipfile (zipfile): if None, download it from repo
- Returns :
SSLCABundle or None if Packages does not exist
- get_package_by_uuid(package_uuid: str) PackageEntry [source]¶
- get_package_entries(packages_names)[source]¶
Return the most up-to-date package entries for packages_names. packages_names is either a list or a string.
- Returns:
dict: a dictionary with {‘packages’:[],’missing’:[]}
>>> r = WaptRemoteRepo()
>>> r.load_config_from_file('c:/wapt/wapt-get.ini')
>>> res = r.get_package_entries(['tis-firefox','tis-putty'])
>>> isinstance(res['missing'],list) and isinstance(res['packages'][0],PackageEntry)
True
- invalidate_packages_cache()[source]¶
Reset the in-memory packages index. Returns the old content of the cache (packages, packages index date, discarded packages).
- Returns:
dict : old cache status dict(_packages=self._packages,_packages_date=self._packages_date,discarded=self.discarded)
- is_locally_allowed_package(package)[source]¶
Return True if package is not in the blacklist and is in the whitelist, if the whitelist is not None. packages_whitelist and packages_blacklist are lists of package name wildcards (file-style wildcards). The blacklist is taken into account first, if defined. The whitelist is taken into account if it is not None; otherwise all package names that are not blacklisted are allowed.
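The decision order described above (blacklist first, then whitelist) can be sketched with the standard fnmatch module. The function is_allowed is a hypothetical standalone helper, not the repository method, and using fnmatch for the file-style wildcards is an assumption.

```python
from fnmatch import fnmatch


def is_allowed(package_name, whitelist=None, blacklist=None):
    """Apply blacklist first, then whitelist, using file-style wildcards."""
    # The blacklist is consulted first, when defined.
    if blacklist and any(fnmatch(package_name, pattern) for pattern in blacklist):
        return False
    # No whitelist: every package that is not blacklisted is allowed.
    if whitelist is None:
        return True
    return any(fnmatch(package_name, pattern) for pattern in whitelist)


print(is_allowed("tis-firefox", whitelist=["tis-*"]))
print(is_allowed("tis-test", whitelist=["tis-*"], blacklist=["*-test"]))
```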
- load_config(config=None, section=None)[source]¶
Load configuration from inifile section. Use name of repo as section name if section is not provided. Use ‘global’ if there is no section named section in the ini file. Values not defined in the ini file are taken from the class _default_config dict.
load_config is called at __init__, possibly with config = None. In this case, all parameters are initialized from defaults.
- Args:
config (RawConfigParser): ini configuration
section (str): section from which to load parameters; defaults to name of repository
- Returns:
self: return itself to chain calls.
- load_config_from_file(config_filename, section=None)[source]¶
Load repository configuration from an inifile located at config_filename
- Args:
config_filename (str): path to wapt inifile
section (str): ini section from which to get parameters. Defaults to repo name
- Returns:
WaptBaseRepo: self
- need_update(last_modified=None)[source]¶
Check if packages index has changed on repo and local index needs an update
- Compare date on local package index DB with the Packages file on remote
repository with a HEAD http request.
- Args:
last_modified (str): iso datetime of last known update of packages.
- Returns
- bool: True if either Packages was never read or remote date of Packages is
more recent than the provided last_modified date.
>>> repo = WaptRemoteRepo(name='main',url='http://wapt/wapt',timeout=4)
>>> waptdb = WaptDB('c:/wapt/db/waptdb.sqlite')
>>> res = repo.need_update(waptdb.read_param('last-%s'% repo.url))
>>> isinstance(res,bool)
True
- packages()[source]¶
Return list of packages, loading it from the repository if not yet available in memory. To force the reload, call invalidate_packages_cache() first, or update().
- packages_date()[source]¶
Date of last known packages index
- Returns:
str: date/time of Packages index in iso format (string)
- packages_matching(package_cond: Optional[Union[str, PackageRequest]] = None, **kwargs) list [source]¶
Return a list of available package entries which match the condition “packagename[([=<>]version)]?”, ordered by ascending version
- Args:
package_cond (str or PackageRequest): package name with optional version specifier.
- Returns:
list of PackageEntry
>>> from waptpackage import *
>>> r = WaptRemoteRepo('http://wapt.tranquil.it/wapt')
>>> r.packages_matching('tis-firefox(>=20)')
[PackageEntry('tis-firefox','20.0.1-02'), PackageEntry('tis-firefox','21.0.0-00'), ...]
- property public_certs_dir¶
- property repo_url¶
- search(searchwords=[], sections=[], newest_only=False, exclude_sections=[], description_locale=None, host_capabilities=None, package_request=None, dependencies_list=[])[source]¶
- Return list of package entries
with description or name matching all the searchwords and section in provided sections list
- Args:
searchwords (list or csv): list of words to look up in description and package names
sections (list or csv): list of package sections to use when searching
newest_only (bool): returns only highest version of package
exclude_sections (list or csv): list of package sections to exclude when searching
description_locale (str): if not None, search in description using this locale
host_capabilities (HostCapabilities or dict): restrict output to these capabilities (os version, locales, arch, etc.)
package_request (PackageRequest or dict): restrict output to these filters, and sort output based on them
- Returns:
list of PackageEntry with additional _localized_description added if description_locale is provided
>>> r = WaptRemoteRepo(name='test',url='http://wapt.tranquil.it/wapt')
>>> r.search('test')
- class waptpackage.WaptLocalRepo(localpath=None, name='waptlocal', cabundle=None, config=None, section=None)[source]¶
Bases:
WaptBaseRepo
- Index of Wapt local repository.
- Index of packages is located in a Packages zip file, having one
Packages file, containing the concatenated content of “control” files of the packages.
A blank line means new package.
>>> localrepo = WaptLocalRepo('c:/wapt/cache')
>>> localrepo.update()
- is_available(url=None)[source]¶
Check if repo is reachable and return creation date of Packages.
- Returns:
str: Iso creation date of remote Package file as returned in http headers
- load_config(config=None, section=None)[source]¶
Load waptrepo configuration from inifile section.
Use name of repo as section name if section is not provided. Use ‘global’ if no section named section in ini file
- Args:
config (RawConfigParser): ini configuration
section (str): section from which to load parameters; defaults to name of repository
- Returns:
WaptRemoteRepo: return itself to chain calls.
- property packages_path¶
- property repo_url¶
- update_packages_index(force_all=False, proxies=None, canonical_filenames=False, include_host_packages=False, include_certificates=True, include_crls=True, extract_icons=True)[source]¶
Scan self.localpath directory for WAPT packages and build a Packages (utf8) zip file with control data and MD5 hash
Extract icons from packages (WAPT/icon.png) and store them in <repo path>/icons/<package name>.png. Extract certificate and add it to Packages zip file in ssl/<fingerprint.crt>. Append CRL for certificates.
- Returns:
dict : {‘processed’:processed,’kept’:kept,’errors’:errors,’packages_filename’:packages_fname}
- class waptpackage.WaptRemoteRepo(url=None, name='', verify_cert=None, http_proxy=None, timeout=None, cabundle=None, config=None, section=None)[source]¶
Bases:
WaptBaseRepo
Gives access to a remote http repository, with a zipped Packages packages index
>>> repo = WaptRemoteRepo(name='main',url='http://wapt/wapt',timeout=4)
>>> last_modified = repo.is_available()
>>> isinstance(last_modified,str)
True
- client_auth()[source]¶
Return SSL trio filenames for client side SSL auth
- Returns:
tuple: (cert filename,key filename,key pwd)
- download_icons(package_requests, target_dir=None, usecache=True, printhook=None)[source]¶
Download a list of icons from packages (requests are of the form packagename (>version)). Returns a dict of {'downloaded', 'skipped', 'errors'}.
If package_requests is a list of PackageEntry, update localpath of entry to match downloaded file.
- Args:
package_requests (list) : list of PackageEntry to download or list of package with optional version
- Returns:
dict: ‘packages’, ‘downloaded’, ‘skipped’, ‘errors’
>>> repo = WaptRemoteRepo(url='http://wapt.tranquil.it/wapt')
>>> wapt.download_packages(['tis-firefox','tis-waptdev'],printhook=nullhook)
{'downloaded': [u'c:/wapt\\cache\\tis-firefox_37.0.2-9_all.wapt', u'c:/wapt\\cache\\tis-waptdev.wapt'], 'skipped': [], 'errors': []}
- download_packages(package_requests, target_dir=None, usecache=True, printhook=None)[source]¶
- Download a list of packages (requests are of the form packagename (>version)).
Returns a dict of {'downloaded', 'skipped', 'errors'}.
If package_requests is a list of PackageEntry, update localpath of entry to match downloaded file.
- Args:
package_requests (list) : list of PackageEntry to download or list of package with optional version
- Returns:
dict: ‘packages’, ‘downloaded’, ‘skipped’, ‘errors’
>>> repo = WaptRemoteRepo(url='http://wapt.tranquil.it/wapt')
>>> wapt.download_packages(['tis-firefox','tis-waptdev'],printhook=nullhook)
{'downloaded': [u'c:/wapt\\cache\\tis-firefox_37.0.2-9_all.wapt', u'c:/wapt\\cache\\tis-waptdev.wapt'], 'skipped': [], 'errors': []}
- get_requests_session(url=None, http_proxy=None)[source]¶
Returns a requests session object with optional ssl client side auth and proxies
- Returns:
requests.Session
- is_available(url=None, http_proxy=None)[source]¶
Check if repo is reachable and return creation date of Packages.
Try to access the repo and return last modified date of repo index or None if not accessible
- Returns:
str: Iso creation date of remote Package file as returned in http headers
>>> repo = WaptRemoteRepo(name='main',url='https://wapt/wapt',timeout=1)
>>> repo.is_available() <= datetime2isodate()
True
>>> repo = WaptRemoteRepo(name='main',url='https://badwapt/wapt',timeout=1)
>>> repo.is_available() is None
True
- load_config(config=None, section=None)[source]¶
Load waptrepo configuration from inifile section.
Use name of repo as section name if section is not provided.
Use ‘global’ if no section named section in ini file
- Args:
config (RawConfigParser): ini configuration
section (str): section from which to load parameters; defaults to name of repository
- Returns:
WaptRemoteRepo: return itself for chain calls.
- packages_url(url=None)[source]¶
return url of Packages index file
>>> repo = WaptRemoteRepo(name='main',url='http://wapt/wapt',timeout=4)
>>> repo.packages_url
'http://wapt/wapt/Packages'
hardcoded path to the Packages index.
- property proxies¶
dict for http proxies url suitable for requests based on the http_proxy repo attribute
- Returns:
dict: {‘http’:’http://proxy:port’,’https’:’http://proxy:port’}
- property repo_url¶
- waptpackage.control_to_dict(control, int_params=('size', 'installed_size'), out_excluded_control_keys=['filename', 'size', 'md5sum', 'repo_url', 'repo'], out_control_lines=None)[source]¶
Convert a control file-like object (a list of lines of the form key1: value1, key2: value2, …) into a dict.
Multiline strings begin with a space.
Breaks when an empty line is reached (boundary between 2 packages in Packages indexes).
- Args:
control (file,str or list): file like object to read control from (until an empty line is reached)
int_params (list): attributes which must be converted to int
out_control_lines (list): if not None, decoded raw control lines are appended to this list.
- Returns:
dict
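The parsing rules above (key: value lines, space-prefixed continuation lines, stop at the first empty line) can be sketched in a few lines. The function sketch_control_to_dict is a hypothetical simplification: it does not handle the out_excluded_control_keys or out_control_lines parameters and assumes the input is already a list of decoded strings.

```python
def sketch_control_to_dict(lines, int_params=("size", "installed_size")):
    """Parse 'key: value' lines into a dict, stopping at the first empty line."""
    result = {}
    key = None
    for line in lines:
        line = line.rstrip("\r\n")
        if not line.strip():
            # An empty line marks the end of the current package entry.
            break
        if line.startswith(" ") and key is not None:
            # Continuation of the previous (multiline) value.
            result[key] += "\n" + line[1:]
            continue
        # Split on the first colon only, so values may contain colons (URLs).
        key, _, value = line.partition(":")
        key = key.strip()
        result[key] = value.strip()
    for name in int_params:
        if result.get(name):
            result[name] = int(result[name])
    return result


sample = [
    "package : tis-7zip\n",
    "version : 19.0-1\n",
    "description : File archiver\n",
    " with a second line\n",
    "sources : https://dev/svn/testgroup\n",
    "installed_size : 5242880\n",
    "\n",
    "package : next-package\n",
]
entry = sketch_control_to_dict(sample)
print(entry)
```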
- waptpackage.parse_major_minor_patch_build(version)[source]¶
Parse version to major, minor, patch, pre-release, build parts.
- waptpackage.update_packages(adir, force=False, proxies=None, canonical_filenames=False)[source]¶
Helper function to update a local packages index
- This function is used on repositories to rescan all packages and
update the Packages index.
>>> if os.path.isdir(r'c:\wapt\cache'):
...     repopath = r'c:\wapt\cache'
... else:
...     repopath = '/var/www/wapt'
>>> p = PackageEntry()
>>> p.package = 'test'
>>> p.version = '10'
>>> new_package_fn = os.path.join(repopath,p.make_package_filename())
>>> if os.path.isfile(new_package_fn):
...     os.unlink(new_package_fn)
>>> res = update_packages(repopath)
>>> os.path.isfile(res['packages_filename'])
True
>>> r = WaptLocalRepo(localpath=repopath)
>>> l1 = r.packages()
>>> res = r.update_packages_index()
>>> l2 = r.packages()
>>> [p for p in l2 if p not in l1]
["test (=10)"]
Common¶
- exception common.EWaptBadServerAuthentication[source]¶
Bases:
EWaptException
- class common.Wapt(config_filename=None, defaults=None, disable_update_server_status=True, wapt_base_dir=None, dbpath=None)[source]¶
Bases:
BaseObjectClass
Global WAPT engine
- add_hosts_repo() WaptHostRepo [source]¶
Add an automatic host repository, removing the last existing WaptHostRepo first
- add_pyscripter_project(target_directory)[source]¶
Add a pyscripter project file to package development directory.
- Args:
target_directory (str): path to location where to create the wapt.psproj file.
- Returns:
None
- add_vscode_project(target_directory)[source]¶
Add .vscode folder with project files to the package development directory
- Args:
target_directory (str): path to the package development directory.
- Returns:
None
- audit(package, force=False, audited_by=None) str [source]¶
Run the audit hook for the installed package. Source setup.py from database, filename, or PackageEntry. Stores the result and log into the “wapt_localstatus” table.
- Args:
package (PackageEntry or directory or package name or {package_uuid} ): package to audit
- Returns:
str : iso datetime of this audit as stored in packages status table
- audit_data_expired(section, key, value)[source]¶
Check if the latest value associated with section/key is expired
- Returns:
bool: True if data exists and has expired, or if data does not exist.
- authorized_certificates()[source]¶
Return a list of authorized package certificate issuers for this host. check_certificates_validity enables date checking.
- build_package(directoryname, target_directory=None, excludes=[])[source]¶
Build the WAPT package from a directory
- Args:
directoryname (str): source root directory of package to build
inc_package_release (boolean): increment the version of package in control file.
set_maturity (str): if not None, change package maturity to this. Can be something like DEV, PROD etc.
- Returns:
str: Filename of built WAPT package
- build_upload(sources_directories, private_key_passwd=None, wapt_server_user=None, wapt_server_passwd=None, inc_package_release=False, target_directory=None, set_maturity=None)[source]¶
Build a list of packages and upload the resulting packages to the main repository. If the section of a package is group or host, use the specific wapt-host or wapt-group.
- Returns
list: list of filenames of built WAPT package
- property cabundle¶
- call_python_code(python_filename, func_name, package_entry=None, force=None, params=None, working_dir=None, import_modules=[])[source]¶
Calls a function in python_filename. Set basedir, control, and run context within the function context.
- Args:
python_filename: python filename with module to load.
func_name (str): name of function to call in setuppy
package_entry (PackageEntry): if not None, use it to set environment
- Returns:
output of hook.
- check_all_depends_conflicts()[source]¶
Check the whole dependencies/conflicts tree for installed packages
- check_depends(apackages, forceupgrade=False, force=False, assume_removed=[], package_request_filter=None)[source]¶
Given a list of package names or requirements “name (=version)”, return a dictionary of {‘additional’ ‘upgrade’ ‘install’ ‘skipped’ ‘unavailable’,’remove’} of [packagerequest,matching PackageEntry]
- Args:
apackages (str or list): list of packages for which to check missing dependencies.
forceupgrade (boolean): if True, check if the currently installed package is the latest available
force (boolean): if True, install the latest version even if the package is already there and matches the requirement
assume_removed (list): list of package names which are assumed to be absent even if they are actually installed, to check the consequences of removal of packages. Implies force=True
- package_request_filter (PackageRequest): additional filter to apply to packages to sort by locales/arch/mat preferences
if None, get active host filter
- Returns:
dict : {‘additional’ ‘upgrade’ ‘install’ ‘skipped’ ‘unavailable’, ‘remove’} with list of [packagerequest,matching PackageEntry]
- check_downloads(apackages=None, usecache=True)[source]¶
Return list of available package entries to match supplied packages requirements
- Args:
apackages (list or str): list of packages
usecache (bool): returns only PackageEntry not yet in cache
- Returns:
list: list of PackageEntry to download
- check_install(apackages=None, force=True, forceupgrade=True)[source]¶
Return a list of actions required to install the apackages list of packages. If apackages is None, check for all pending updates.
- Args:
apackages (str or list): list of packages, or None to check pending installs/upgrades
force (boolean): if True, already installed packages listed in apackages will be considered for reinstallation
forceupgrade (boolean): if True, all dependencies are upgraded to the latest version, even if the current version complies with the depends requirements
- Returns:
- dict: with keys [‘skipped’, ‘additional’, ‘remove’, ‘upgrade’, ‘install’, ‘unavailable’] and list of
(package requirements, PackageEntry)
- check_install_running(max_ttl=60)[source]¶
Check if an install is in progress and return the list of pids of installs in progress. Kills old stuck wapt-get processes/children and updates the db status. max_ttl is the maximum age of wapt-get in minutes.
- check_remove(apackages)[source]¶
Return a list of additional package to remove if apackages are removed
- Args:
apackages (str or list of req or PackageRequest): list of packages for which parent dependencies will be checked.
- Returns:
list: list of PackageRequest with broken dependencies
- cleanup(obsolete_only=False)[source]¶
Remove cached WAPT files from local disk
- Args:
- obsolete_only (boolean): If True, remove packages which are either no longer available,
or installed at an equal or newer version
- Returns:
list: list of filenames of removed packages
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> l = wapt.download_packages(wapt.check_downloads())
>>> res = wapt.cleanup(True)
- cleanup_session_setup()[source]¶
Remove all current user session_setup information for removed packages
- create_or_update_host_certificate(force_recreate=False)[source]¶
- Create an RSA key pair for the host and an x509 certificate.
The key is located in <wapt_root>\private. It should be kept secret, with access restricted to the system account and administrators only.
- Args:
force_recreate (bool): recreate key pair even if already exists for this FQDN.
- Returns:
str: x509 certificate of this host.
- property dbpath¶
- dependencies(packagename, expand=False)[source]¶
Return all dependencies of a given package
>>> w = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> dep = w.dependencies('tis-waptdev')
>>> isinstance(dep,list) and isinstance(dep[0],PackageEntry)
True
- download_icons(package_requests, usecache=True, printhook=None)[source]¶
Download a list of package icons (requests are of the form packagename (>version)). Returns a dict of {“downloaded”, “skipped”, “errors”}
- Args:
package_requests (str or list): list of packages to prefetch
usecache (boolean): if True, don’t download package if already in cache
printhook (func): callback with signature report(received,total,speed,url) to display progress
- Returns:
dict: with keys {“downloaded”, “skipped”, “errors”, “packages”} and list of PackageEntry.
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> def nullhook(*args):
...     pass
>>> wapt.download_packages(['tis-firefox','tis-waptdev'],usecache=False,printhook=nullhook)
{'downloaded': [u'c:/wapt\\cache\\tis-firefox_37.0.2-9_all.wapt', u'c:/wapt\\cache\\tis-waptdev.wapt'], 'skipped': [], 'errors': []}
- download_packages(package_requests, usecache=True, printhook=None)[source]¶
Download a list of packages (requests are of the form packagename(=version)). If several packages match a request, only the highest/latest is kept.
- Args:
package_requests (str or list): list of packages to prefetch
usecache (boolean): if True, don’t download package if already in cache
printhook (func): callback with signature report(received,total,speed,url) to display progress
- Returns:
dict: with keys {“downloaded”, “skipped”, “errors”, “packages”} and list of PackageEntry.
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> def nullhook(*args):
...     pass
>>> wapt.download_packages(['tis-firefox','tis-waptdev'],usecache=False,printhook=nullhook)
{'downloaded': [u'c:/wapt\\cache\\tis-firefox_37.0.2-9_all.wapt', u'c:/wapt\\cache\\tis-waptdev.wapt'], 'skipped': [], 'errors': []}
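The rule that only the highest/latest package is kept when several match a request can be illustrated with a simplified version comparison. This is a sketch under stated assumptions: `version_key` and `keep_latest` are hypothetical helpers, and WAPT's real version ordering is not described here and may differ.

```python
import re


def version_key(version):
    """Split '37.0.2-9' on dots and dashes into comparable parts (simplified)."""
    parts = re.split(r"[.\-]", version)
    # Numeric parts sort before text parts so mixed tuples stay comparable.
    return tuple((0, int(p)) if p.isdigit() else (1, p) for p in parts)


def keep_latest(candidates):
    """candidates: list of (name, version) tuples; keep the highest version per name."""
    latest = {}
    for name, version in candidates:
        if name not in latest or version_key(version) > version_key(latest[name]):
            latest[name] = version
    return sorted(latest.items())
```

Comparing parts numerically matters: as plain strings, '37.0.10' would sort before '37.0.2'.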
- duplicate_package(packagename, newname=None, newversion=None, newmaturity=None, target_directory=None, append_depends=None, remove_depends=None, append_conflicts=None, remove_conflicts=None, auto_inc_version=True, usecache=True, printhook=None, cabundle=None)[source]¶
Duplicate an existing package from a declared repository or file into target_directory, with optional newname and version.
- Args:
packagename (str): package name to duplicate, or filepath to a local package or package development directory.
newname (str): name of target package
newversion (str): version of target package. If None, use source package version
target_directory (str): path where to put development files. If None, use temporary. If empty, use default development dir
append_depends (list): comma str or list of depends to append.
remove_depends (list): comma str or list of depends to remove.
auto_inc_version (bool): if version is less than existing package in repo, set version to repo version+1
usecache (bool): If True, allow to use cached package in local repo instead of downloading it.
printhook (func): hook for download progress
cabundle (SSLCABundle): list of authorized ca certificate (SSLPublicCertificate) to check authenticity of source packages. If None, no check is performed.
- Returns:
PackageEntry : new packageEntry with sourcespath = target_directory
>>> wapt = Wapt(config_filename='c:/tranquilit/wapt/tests/wapt-get.ini')
>>> wapt.dbpath = ':memory:'
>>> r= wapt.update()
>>> def nullhook(*args):
...     pass
>>> tmpdir = 'c:/tmp/testdup-wapt'
>>> if os.path.isdir(tmpdir):
...     import shutil
...     shutil.rmtree(tmpdir)
>>> p = wapt.duplicate_package('tis-wapttest',
...     newname='testdup',
...     newversion='20.0-0',
...     target_directory=tmpdir,
...     excludes=['.svn','.git','.gitignore','*.pyc','src'],
...     append_depends=None,
...     auto_inc_version=True,
...     usecache=False,
...     printhook=nullhook)
>>> print repr(p['package'])
PackageEntry('testdup','20.0-0')
>>> if os.path.isdir(tmpdir):
...     import shutil
...     shutil.rmtree(tmpdir)
>>> p = wapt.duplicate_package('tis-wapttest',
...     target_directory=tempfile.mkdtemp('wapt'),
...     auto_inc_version=True,
...     append_depends=['tis-firefox','tis-irfanview'],
...     remove_depends=['tis-wapttestsub'],
...     )
>>> print repr(p['package'])
PackageEntry('tis-wapttest','120')
- edit_host(hostname, target_directory=None, append_depends=None, remove_depends=None, append_conflicts=None, remove_conflicts=None, printhook=None, description=None, cabundle=None)[source]¶
Download and extract a host package from host repositories into target_directory for modification
- Args:
hostname (str): fqdn of the host to edit
target_directory (str): where to place the development files. If empty, use the default one from wapt-get.ini configuration
append_depends (str or list): list or comma separated list of package requirements
remove_depends (str or list): list or comma separated list of package requirements to remove
cabundle (SSLCABundle): authorized ca certificates. If None, use default from current wapt.
- Returns:
PackageEntry
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> tmpdir = 'c:/tmp/dummy'
>>> wapt.edit_host('dummy.tranquilit.local',target_directory=tmpdir,append_depends='tis-firefox')
>>> import shutil
>>> shutil.rmtree(tmpdir)
>>> host = wapt.edit_host('htlaptop.tranquilit.local',target_directory=tmpdir,append_depends='tis-firefox')
>>> 'package' in host
True
>>> shutil.rmtree(tmpdir)
- edit_package(packagerequest, target_directory='', use_local_sources=True, append_depends=None, remove_depends=None, append_conflicts=None, remove_conflicts=None, auto_inc_version=True, cabundle=None)[source]¶
Download an existing package from repositories into target_directory for modification. If use_local_sources is True and no newer package exists on repos, update the current local edited data; otherwise, if target_directory exists and is not empty, raise an exception.
- Args:
packagerequest (str): path to existing wapt file, or package request
use_local_sources (boolean): don’t raise an exception if target exists and matches package version
append_depends (list of str): package requirements to add to depends
remove_depends (list or str): package requirements to remove from depends
auto_inc_version (bool):
cabundle (SSLCABundle): list of authorized certificate filenames. If None, use default from current wapt.
- Returns:
PackageEntry : edit local package with sourcespath attribute populated
>>> wapt = Wapt(config_filename='c:/tranquilit/wapt/tests/wapt-get.ini')
>>> wapt.dbpath = ':memory:'
>>> r= wapt.update()
>>> tmpdir = tempfile.mkdtemp('wapt')
>>> res = wapt.edit_package('tis-wapttest',target_directory=tmpdir,append_depends='tis-firefox',remove_depends='tis-7zip')
>>> res['target'] == tmpdir and res['package'].package == 'tis-wapttest' and 'tis-firefox' in res['package'].depends
True
>>> import shutil
>>> shutil.rmtree(tmpdir)
- forget_packages(packages_list)[source]¶
Remove install status for packages from local database without actually uninstalling the packages
- Args:
packages_list (list): list of installed package names to forget
- Returns:
list: list of package names actually forgotten
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> res = wapt.install('tis-test')
???
>>> res = wapt.is_installed('tis-test')
>>> isinstance(res,PackageEntry)
True
>>> wapt.forget_packages('tis-test')
['tis-test']
>>> wapt.is_installed('tis-test')
>>> print wapt.is_installed('tis-test')
None
- generate_host_uuid(forced_uuid=None)[source]¶
Regenerate a random UUID for this host or force with supplied one.
Normally, the UUID is taken from BIOS through wmi.
In case bios returns some duplicates or garbage, it can be useful to force a random uuid. This is stored as uuid key in wapt-get.ini.
In case we want to link the host with an existing record on the server, we can force an old UUID.
- Args:
forced_uuid (str): uuid to force for this host. If None, generate a random one
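The choice between a forced UUID and a random one can be sketched with the standard `uuid` module. `choose_host_uuid` is a hypothetical helper; whether WAPT validates or normalises the forced value, and how it writes the uuid key to wapt-get.ini, are not described here.

```python
import uuid


def choose_host_uuid(forced_uuid=None):
    """Return forced_uuid when supplied, else a new random UUID string."""
    if forced_uuid is not None:
        # Assumption: validate the supplied value. uuid.UUID raises ValueError
        # on malformed input and str() renders it in lowercase.
        return str(uuid.UUID(forced_uuid))
    return str(uuid.uuid4())
```

Calling `choose_host_uuid()` twice gives two different values, which is the point of regenerating a UUID when the BIOS reports duplicates.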
- get_auth_token(purpose='websocket')[source]¶
Get an auth token from the server by providing the host uuid signed with the private host key.
- Returns:
dict
- get_default_development_dir(packagecond, section='base')[source]¶
Returns the default development directory for package named <packagecond> based on default_sources_root ini parameter if provided
- Args:
packagecond (PackageEntry or str): either PackageEntry or a “name(=version)” string
- Returns:
unicode: path to local proposed development directory
- get_host_certificate()[source]¶
Return the current host certificate. If the certificate does not yet exist, it is created as a self signed certificate and stored in get_host_certificate_filename path
- Returns:
SSLCertificate: host public certificate.
- get_host_certificate_filename()[source]¶
Full filepath of the host own certificate and RSA public key
- Returns:
str
- get_host_certificate_signing_request()[source]¶
Return a CSR with CN and dnsname pointing to this host uuid
- Returns:
SSLCertificateSigningRequest: host public certificate signing request.
- get_host_key(create=True)[source]¶
Return the private key used to sign data uploaded from the host. The key is created if it does not exist yet.
- Returns:
SSLPrivateKey: Private key used to sign data posted by host.
- get_host_packages()[source]¶
Return list of implicit available host packages based on computer UUID and AD Org Units
- Returns:
list: list of PackageEntry.
- get_host_packages_names()[source]¶
Return list of implicit host package names based on computer UUID and AD Org Units
- Returns:
list: list of str package names.
- get_installed_host_packages()[source]¶
Get the implicit package names (host and unit packages) which are installed but no longer relevant
- Returns:
list: of installed package names
- get_installer_defaults(installer_path)[source]¶
Returns guessed default values for package templates based on installer binary
- Args:
installer_path (str): filepath to installer
- Returns:
dict:
>>> get_installer_defaults(r'c:\tranquilit\wapt\tests\SumatraPDF-3.1.1-install.exe')
{'description': u'SumatraPDF Installer (Krzysztof Kowalczyk)', 'filename': 'SumatraPDF-3.1.1-install.exe', 'silentflags': '/VERYSILENT', 'simplename': u'sumatrapdf-installer', 'type': 'UnknownExeInstaller', 'version': u'3.1.1'}
>>> get_installer_defaults(r'c:\tranquilit\wapt\tests\7z920.msi')
{'description': u'7-Zip 9.20 (Igor Pavlov)', 'filename': '7z920.msi', 'silentflags': '/q /norestart', 'simplename': u'7-zip-9.20', 'type': 'MSI', 'version': u'9.20.00.0'}
- get_json_config_filename(config_name)[source]¶
Returns the filename for a json config named <config_name>
- get_last_update_status()[source]¶
Get update status of host as stored at the end of last operation.
- Returns:
- dict:
‘date’: timestamp of last operation
‘runstatus’: last printed message of wapt core
‘running_tasks’: list of tasks
‘errors’: list of packages not installed properly
‘upgrades’: list of packages which need to be upgraded
- get_next_audit_datetime()[source]¶
Return next datetime for next audit loop = minimum(next_audit_date)
- get_outdated_host_packages()[source]¶
Check and return the host packages which are available and not installed
- get_package_attribute(package, key, default_value=None)[source]¶
Retrieve from local db a key/value pair previously stored for a package
- get_package_entries(packages_names)[source]¶
Return the most up to date package entries for packages_names. packages_names is either a list or a string.
‘missing’ key lists the package requirements which are not available in the package index.
- Args:
packages_names (list or str): list of package requirements
- Returns:
dict : {‘packages’:[PackageEntries,],’missing’:[str,]}
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> res = wapt.get_package_entries(['tis-firefox','tis-putty'])
>>> isinstance(res['missing'],list) and isinstance(res['packages'][0],PackageEntry)
True
- get_previous_package_params(package_entry)[source]¶
Return the params used for the previous install of package_entry.package. If there was no previous install, return {}. The params are stored as a json string in the local package status table.
- Args:
package_entry (PackageEntry): package request to lookup.
- Returns:
dict
- get_sources(package)[source]¶
Download sources of package (if referenced in package as a https svn) in the current directory
- Args:
package (str or PackageRequest): package to get sources for
- Returns:
str : checkout directory path
- get_unrelevant_host_packages()[source]¶
Get the implicit package names (host and unit packages) which are installed but no longer relevant
- Returns:
list: of installed package names
- global_attributes = ['wapt_base_dir', 'waptserver', 'config_filename', 'proxies', 'repositories', 'personal_certificate_path', 'public_certs_dir', 'package_cache_dir', 'dbpath', 'http_proxy', 'use_http_proxy_for_repo', 'use_http_proxy_for_server', 'limit_bandwidth', 'waptservice_user', 'waptservice_password', 'waptservice_admin_filter', 'waptservice_port', 'waptservice_poll_timeout', 'locales', 'custom_tags', 'packages_whitelist', 'packages_blacklist', 'maturities', 'host_uuid', 'use_fqdn_as_uuid', 'use_hostpackages', 'use_ad_groups', 'use_repo_rules', 'host_profiles', 'host_organizational_unit_dn', 'host_ad_site', 'allow_user_service_restart', 'allow_remote_shutdown', 'allow_remote_reboot', 'ldap_auth_server', 'ldap_auth_base_dn', 'ldap_auth_ssl_enabled', 'verify_cert_ldap', 'loglevel', 'loglevel_waptcore', 'loglevel_waptservice', 'loglevel_wapttasks', 'loglevel_waptws', 'loglevel_waptdb', 'loglevel_websocket', 'loglevel_waitress', 'log_to_windows_events', 'download_after_update_with_waptupdate_task_period', 'websockets_ping', 'websockets_retry_delay', 'websockets_check_config_interval', 'websockets_hurry_interval', 'notify_user', 'waptaudit_task_period', 'signature_clockskew', 'wol_relay', 'hiberboot_enabled', 'max_gpo_script_wait', 'pre_shutdown_timeout', 'minimum_battery_percent', 'uninstallkey_timeout', 'check_certificates_validity', 'token_lifetime', 'repositories', 'trust_all_certs_in_pems', 'wapt_temp_dir']¶
- property hiberboot_enabled¶
get HiberbootEnabled.
- host_capabilities()[source]¶
Return the current capabilities of host taken in account to determine packages list and whether update should be forced (when filter criteria are updated) This includes host certificate,architecture,locale,authorized certificates
- Returns:
dict
- host_capabilities_fingerprint()[source]¶
Return a fingerprint representing the current capabilities of host This includes host certificate,architecture,locale,authorized certificates
- Returns:
str
- property host_dn¶
- property host_organizational_unit_dn¶
Get host org unit DN from wapt-get.ini [global] host_organizational_unit_dn if defined or from registry as supplied by AD / GPO process
- property host_profiles¶
- property host_site¶
- property host_uuid: str¶
- http_upload_package(packages, wapt_server_user=None, wapt_server_passwd=None, progress_hook=None)[source]¶
Upload a package or host package to the waptserver.
- Args:
packages (str or list): list of filepaths or PackageEntry to wapt packages to upload
wapt_server_user (str): user for basic auth on waptserver
wapt_server_passwd (str): password for basic auth on waptserver
Returns:
>>> from common import *
>>> wapt = Wapt(config_filename = r'C:\tranquilit\wapt\tests\wapt-get.ini')
>>> r = wapt.update()
>>> d = wapt.duplicate_package('tis-wapttest','toto')
>>> print d
{'target': u'c:\\users\\htouvet\\appdata\\local\\temp\\toto.wapt', 'package': PackageEntry('toto','119')}
>>> wapt.http_upload_package(d['package'],wapt_server_user='admin',wapt_server_passwd='password')
- install(apackages=None, force=False, params_dict={}, download_only=False, usecache=True, printhook=None, installed_by=None, only_priorities=None, only_if_not_process_running=False, process_dependencies=True)[source]¶
Install a list of packages and their dependencies. Packages listed in the conflicts package attribute are removed first.
Returns a dictionary of (package requirement,package) with ‘install’,’skipped’,’additional’
- Args:
apackages (list or str): list of package requirements “packagename(=version)” or list of PackageEntry.
force (bool): reinstall the packages even if they are already installed
params_dict (dict): parameters passed to the install() procedure in the setup.py of all packages, as params variables and as “setup module” attributes
download_only (bool): don’t install packages, only download them
usecache (bool): use the already downloaded packages if available in cache directory
printhook (func): hook for progress print
- Returns:
- dict: with keys [‘skipped’, ‘additional’, ‘remove’, ‘upgrade’, ‘install’, ‘unavailable’] and list of
(package requirements, PackageEntry)
>>> wapt = Wapt(config_filename='c:/tranquilit/wapt/tests/wapt-get.ini')
>>> def nullhook(*args):
...     pass
>>> res = wapt.install(['tis-wapttest'],usecache=False,printhook=nullhook,params_dict=dict(company='toto'))
>>> isinstance(res['upgrade'],list) and isinstance(res['errors'],list) and isinstance(res['additional'],list) and isinstance(res['install'],list) and isinstance(res['unavailable'],list)
True
>>> res = wapt.remove('tis-wapttest')
>>> res == {'removed': ['tis-wapttest'], 'errors': []}
True
- install_immediate(force=False, only_priorities=None, only_if_not_process_running=False)[source]¶
Install pending packages which must be forcibly installed at a specific time.
- Args:
only_if_not_process_running: install package only if impacted_process are not running
- Returns:
- dict: {‘upgrade’: [], ‘additional’: [], ‘downloads’:
{‘downloaded’: [], ‘skipped’: [], ‘errors’: []},
‘remove’: [], ‘skipped’: [], ‘install’: [], ‘errors’: [], ‘unavailable’: []}
- install_json_config(conf, config_name=None, priority=None)[source]¶
Add a dynamic configuration from dict conf with name config_name and priority
- install_json_config_file(json_config_file, config_name=None, priority=None)[source]¶
Install a json config stored in a file
- install_json_config_from_url(url=None, config_hash=None, config_name='default_config', priority=None)[source]¶
Load a json config from the remote wapt server (default location is /var/www/wapt/conf.d/<config_name>-<config_hash>.json) given its name and hash.
- install_json_configb64(json_config_b64, config_name=None, priority=None)[source]¶
Install a json config encoded as a base64 string
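To illustrate what “a json config encoded as a base64 string” looks like, here is a minimal round trip using only the standard library. `encode_config` and `decode_config` are hypothetical helpers, and the section/key layout of the sample dict is an assumption. The exact schema WAPT expects is not described here.

```python
import base64
import json


def encode_config(conf):
    """Serialise a dict to JSON, then to a base64 text string."""
    raw = json.dumps(conf, sort_keys=True).encode("utf-8")
    return base64.b64encode(raw).decode("ascii")


def decode_config(json_config_b64):
    """Reverse of encode_config: base64 text back to a dict."""
    return json.loads(base64.b64decode(json_config_b64).decode("utf-8"))


# Hypothetical sample config; the keys are illustrative only.
sample = {"global": {"waptserver": "https://srvwapt.example.lan"}}
encoded = encode_config(sample)
```

A string built this way is plain ASCII, so it can be passed on a command line or embedded in another document without quoting problems.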
- install_wapt(fname, params_dict={}, explicit_by=None, force=None)[source]¶
Install a single wapt package given its WAPT filename. return install status
- Args:
fname (str): Path to wapt Zip file or unzipped development directory
params_dict (dict): custom parameters for the install function
explicit_by (str): identify who has initiated the install
- Returns:
str: ‘OK’,’ERROR’
Raises:
EWaptMissingCertificate
EWaptNeedsNewerAgent
EWaptUnavailablePackage
EWaptConflictingPackage
EWaptBadPackageAttribute
EWaptException
various Exception depending on setup script
- installed(include_errors=False)[source]¶
Returns all installed packages with their status
- Args:
include_errors (boolean): include packages not installed successfully
- Returns:
list: list of PackageEntry merged with local install status.
- inventory()[source]¶
Return full inventory of the computer as a dictionary.
- Returns:
dict: {‘host_info’,’wapt_status’,’dmi’,’installed_softwares’,’installed_packages’}
- …changed:
1.4.1: renamed keys
1.6.2.4: removed setup.py from packages inventory.
- is_available(packagename)[source]¶
Check if a package (with optional version condition) is available in repositories.
- Args:
packagename (str) : package name to lookup or package requirement ( packagename(=version) )
- Returns:
list : of PackageEntry sorted by package version ascending
>>> wapt = Wapt(config_filename='c:/tranquilit/wapt/tests/wapt-get.ini')
>>> l = wapt.is_available('tis-wapttest')
>>> l and isinstance(l[0],PackageEntry)
True
- is_installed(packagename, include_errors=False)[source]¶
Checks if a package is installed. Return package entry and additional local status or None
- Args:
packagename (str): name / {package_uuid}/ package request to query
- Returns:
- PackageEntry: None or PackageEntry merged with local install_xxx fields
install_date
install_output
install_params
install_status
- is_locally_allowed_package(package)[source]¶
Return True if package is not in the blacklist and is in the whitelist (when the whitelist is not None). packages_whitelist and packages_blacklist are lists of package name wildcards (file style wildcards). The blacklist, if defined, is taken into account first. The whitelist is taken into account if it is not None; otherwise all package names that are not blacklisted are allowed.
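The blacklist-then-whitelist rule can be sketched with the standard `fnmatch` module, which implements file style wildcards. `is_allowed` is a hypothetical stand-in that takes a package name string; the real method receives a package object and reads the lists from the Wapt instance.

```python
from fnmatch import fnmatch


def is_allowed(package_name, whitelist=None, blacklist=None):
    """Blacklist is checked first; whitelist applies only when it is not None."""
    if blacklist and any(fnmatch(package_name, pattern) for pattern in blacklist):
        return False
    if whitelist is not None:
        return any(fnmatch(package_name, pattern) for pattern in whitelist)
    # No whitelist defined: everything that is not blacklisted is allowed.
    return True
```

With `whitelist=['tis-*']` and `blacklist=['*fox']`, a name such as 'tis-firefox' is rejected because the blacklist wins, while 'tis-7zip' is accepted.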
- is_wapt_package_development_dir(directory)[source]¶
Return PackageEntry if directory is a wapt development directory (a WAPT/control file exists) or False
- is_wapt_package_file(filename)[source]¶
Return PackageEntry if filename is a wapt package, or False. A file qualifies if its name ends with .wapt and the control file can be loaded and decoded from the zip file.
- Args:
filename (str): path to a file
- Returns:
False or PackageEntry
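The two conditions above (a .wapt extension and a readable WAPT/control member in the zip) can be checked with the standard `zipfile` module. This is a sketch only: `looks_like_wapt_package` is a hypothetical helper that returns the decoded control text instead of a PackageEntry, and the UTF-8 encoding of the control file is an assumption.

```python
import zipfile


def looks_like_wapt_package(filename):
    """Return the decoded WAPT/control text, or False if the file does not qualify."""
    # is_zipfile also returns False when the file does not exist.
    if not filename.endswith(".wapt") or not zipfile.is_zipfile(filename):
        return False
    with zipfile.ZipFile(filename) as archive:
        try:
            return archive.read("WAPT/control").decode("utf-8")
        except (KeyError, UnicodeDecodeError):
            # KeyError: no WAPT/control member; UnicodeDecodeError: not UTF-8 text.
            return False
```

Returning False rather than raising mirrors the documented “False or PackageEntry” contract, so callers can use the result directly in a condition.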
- last_install_log(packagename)[source]¶
Get the printed output of the last install of package named packagename
- Args:
packagename (str): name of package to query
- Returns:
dict: {status,log} of the last install of a package
>>> w = Wapt()
>>> w.last_install_log('tis-7zip')
???
{'install_status': u'OK', 'install_output': u'Installing 7-Zip 9.38.0-1\n7-Zip already installed, skipping msi install\n',install_params: ''}
- list(searchwords=[])[source]¶
Returns a list of installed packages which have the searchwords in their description
- Args:
- searchwords (list): list of words to look up in package name and description.
Only entries which have the words in the proper order are returned.
- Returns:
list: list of PackageEntry matching the search words
>>> w = Wapt()
>>> w.list('zip')
[PackageEntry('tis-7zip','16.4-8') ]
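One possible reading of “words in the proper order” is that each search word must appear after the previous match in the package name and description. The sketch below implements that reading; `matches_in_order` is a hypothetical helper, and the case-insensitive comparison is an assumption, not documented behaviour.

```python
def matches_in_order(searchwords, text):
    """True if every word occurs in text, each one after the previous match."""
    position = 0
    haystack = text.lower()
    for word in searchwords:
        found = haystack.find(word.lower(), position)
        if found < 0:
            return False
        # Continue searching after the end of this match.
        position = found + len(word)
    return True
```

Under this reading, `['7', 'zip']` matches 'tis-7zip' but `['zip', '7']` does not.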
- list_upgrade(current_datetime=None)[source]¶
Returns a list of package requirements for packages which should be installed / upgraded / removed
- Returns:
dict: {‘additional’: [], ‘install’: [], ‘remove’: [], ‘upgrade’: []}
- load_config(config_filename=None, merge_config_packages=None)[source]¶
Load configuration parameters from supplied inifilename
- make_group_template(packagename='', maturity=None, depends=None, conflicts=None, directoryname=None, section='group', description=None)[source]¶
Creates or updates on disk a skeleton of a WAPT group package. If a package skeleton already exists in directoryname, it is updated.
sourcespath attribute of returned PackageEntry is populated with the development directory of group package.
- Args:
packagename (str): group name
depends :
conflicts
directoryname
section
description
- Returns:
PackageEntry
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> tmpdir = 'c:/tmp/dummy'
>>> if os.path.isdir(tmpdir):
...     import shutil
...     shutil.rmtree(tmpdir)
>>> p = wapt.make_group_template(packagename='testgroupe',directoryname=tmpdir,depends='tis-firefox',description=u'Test de groupe')
>>> print p
>>> print p['package'].depends
tis-firefox
>>> import shutil
>>> shutil.rmtree(tmpdir)
- make_host_template(packagename='', depends=None, conflicts=None, directoryname=None, description=None)[source]¶
- make_package_template(installer_path='', packagename='', directoryname='', section='base', description=None, depends='', version=None, silentflags=None, uninstallkey=None, maturity=None, architecture='all', target_os='all', product_name=None)[source]¶
- Build a skeleton of WAPT package based on the properties of the supplied installer
Return the path of the skeleton
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> wapt.dbpath = ':memory:'
>>> files = 'c:/tmp/files'
>>> if not os.path.isdir(files):
...     os.makedirs(files)
>>> tmpdir = 'c:/tmp/dummy'
>>> devdir = wapt.make_package_template(files,packagename='mydummy',directoryname=tmpdir,depends='tis-firefox')
>>> os.path.isfile(os.path.join(devdir,'WAPT','control'))
True
>>> p = wapt.build_package(devdir)
>>> 'filename' in p and isinstance(p['files'],list) and isinstance(p['package'],PackageEntry)
True
>>> import shutil
>>> shutil.rmtree(tmpdir)
- property max_gpo_script_wait¶
get / set the MaxGPOScriptWait.
- property merged_config_hash¶
- packages_filter_for_host()[source]¶
Returns a PackageRequest object based on host capabilities to filter applicable packages from a repo
- Returns:
PackageRequest
- packages_matching(package_request=None, query=None, args=())[source]¶
Returns the list of known packages Entries matching a PackageRequest
- Args:
package_request (PackageRequest): request
- Returns:
list (of PackageEntry)
- personal_certificate()[source]¶
Returns the personal certificates chain
- Returns:
list (of SSLCertificate). The first one is the personal certificate. The other are useful if intermediate CA are used.
- property pre_shutdown_timeout¶
get / set the pre-shutdown timeout for shutdown tasks.
- private_key(private_key_password=None)[source]¶
SSLPrivateKey matching the personal_certificate. Once the key has been found, it is kept in memory for later use.
- Args:
private_key_password : password to use to decrypt key. If None, passwd_callback is called.
- Returns:
SSLPrivateKey
- Raises:
EWaptMissingPrivateKey if key can not be decrypted or found.
- property private_key_password_callback¶
- reachable_ip()[source]¶
Return the local IP which is most probably reachable by wapt server
- In case there are several network connections, returns the local IP
which Windows chooses for sending packets to WaptServer.
This is the IP most likely to receive packets from WaptServer.
- Returns:
str: Local IP
- read_audit_data(section, key, default=None, ptype=None, include_expired_data=True)[source]¶
Retrieve the latest value associated with section/key from database
- read_audit_data_set(section, key, as_dict=False, raw_data=False, descending=True)[source]¶
Retrieve all the values associated with section/key from database
- read_audit_data_since(last_query_date=None, raw_data=False)[source]¶
Retrieve all the values associated with section/key from database
- read_param(name, default=None, ptype=None)[source]¶
read a param value from local db
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> wapt.read_param('db_version')
u'20140410'
- read_upgrade_status()[source]¶
Return last stored pending updates status
- Returns:
dict: {running_tasks, errors, pending (dict), upgrades (list)}
- register_computer(description=None, retry_with_password=False)[source]¶
Send computer information to the WAPT Server. If description is provided, update the local registry with the new description.
- Returns:
dict: response from server.
>>> wapt = Wapt()
>>> s = wapt.register_computer()
- registry_uninstall_snapshot()[source]¶
Return list of uninstall ID from registry launched before and after an installation to capture uninstallkey
- reload_config_if_updated()[source]¶
Check if the config file has been updated. Return None if the config has not changed, or the date of the new config file if it was reloaded.
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> wapt.reload_config_if_updated()
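A common way to implement this kind of check is to compare the file's modification time with the one recorded at the last load. The sketch below assumes that approach; `ConfigWatcher` is a hypothetical class, and it returns the raw modification timestamp where the real method returns a date.

```python
import os


class ConfigWatcher:
    """Remember the modification time of a config file between checks."""

    def __init__(self, config_filename):
        self.config_filename = config_filename
        self.loaded_mtime = os.path.getmtime(config_filename)

    def reload_if_updated(self):
        """Return None if the file is unchanged, else its new modification time."""
        current = os.path.getmtime(self.config_filename)
        if current == self.loaded_mtime:
            return None
        # A real implementation would re-read the ini file at this point.
        self.loaded_mtime = current
        return current
```

Returning None for the unchanged case lets a polling loop skip all further work with a single `is None` test.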
- remove(packages_list, force=False, only_priorities=None, only_if_not_process_running=False)[source]¶
Removes a package given its package name, and unregisters it from the local status DB
- Args:
- packages_list (str or list or path): packages to remove (package name,
list of package requirement, package entry or development directory)
force : if True, unregister package from local status database, even if uninstall has failed
- Returns:
dict: {‘errors’: [], ‘removed’: [], ‘not_allowed’: []}
- property repositories¶
- reset_host_organizational_unit_dn()[source]¶
Reset forced host_organizational_unit_dn to AD / GPO registry defaults. If it was forced in ini file, remove setting from ini file.
- reset_host_uuid()[source]¶
Reset host uuid to bios provided UUID. If it was forced in ini file, remove setting from ini file.
- run_notfatal(*cmd, **args)[source]¶
Runs the command and waits for its termination. Returns the output; does not raise an exception if the exit code is not null, but returns ‘’ instead.
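The “return the output, or an empty string on failure” behaviour can be sketched with `subprocess`. `run_notfatal_sketch` is a hypothetical stand-in written for illustration; the real method's argument handling and output decoding are not described here.

```python
import subprocess
import sys


def run_notfatal_sketch(*cmd, **kwargs):
    """Run cmd and return its output; return '' instead of raising on a non-zero exit code."""
    try:
        return subprocess.check_output(list(cmd), universal_newlines=True, **kwargs)
    except subprocess.CalledProcessError:
        return ""


# Usage: run the current Python interpreter as a portable example command.
output = run_notfatal_sketch(sys.executable, "-c", "print('ok')")
```

Note that an empty result is ambiguous: it can mean the command failed or that it succeeded without printing anything.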
- property runstatus: str¶
returns the current run status for tray display
- search(searchwords=[], exclude_host_repo=True, section_filter=None, newest_only=False)[source]¶
Returns a list of packages which have the searchwords in their description
- Args:
searchwords (str or list): words to search in packages name or description
exclude_host_repo (boolean): if True, don’t search in host repositories.
section_filter (str or list): restrict search to the specified package sections/categories
- Returns:
list: list of PackageEntry
- self_service_auth(login, password, host_uuid, groups)[source]¶
Sends login and password to server, who then checks if it’s a valid user
- Returns:
list: groups the user belongs to.
- session_setup(package, force=False)[source]¶
Setup the user session for a specific system wide installed package. Source setup.py from database or filename
- set_client_cert_auth(connection, force=False)[source]¶
Set client side ssl authentication for a waptserver or a waptrepo using host_certificate if client_certificate is not yet set in config and host certificate is able to do client_auth
- Args:
connection: object with client_certificate, client_private_key and client_auth
- set_local_password(user='admin', pwd='password')[source]¶
Set admin/password local auth for waptservice in ini file as a sha256 hex hash
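For reference, a SHA-256 hex hash of a password can be produced with the standard `hashlib` module. `password_hash` is a hypothetical helper. The UTF-8 encoding is an assumption, and whether WAPT adds a salt or which ini keys it writes is not described here.

```python
import hashlib


def password_hash(pwd):
    """Return the SHA-256 digest of pwd as a 64 character hex string."""
    return hashlib.sha256(pwd.encode("utf-8")).hexdigest()
```

The result is always 64 lowercase hexadecimal characters, regardless of the password length.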
- set_package_attribute(package, key, value)[source]¶
Store in local db a key/value pair for later use
- show_progress(show_box=False, msg='Loading...', progress=None, progress_max=None)[source]¶
Global hook to report progress feedback to the user
- Args:
show_box (bool): indicate to display or hide the notification
msg (str): A status message to display. If None, nothing is changed.
progress (float): Completion
progress_max (float): Target of completion.
- sign_host_content(data)[source]¶
Sign data str with host private key with sha256 + RSA
- Args:
data (bytes) : data to sign
- Returns:
bytes: signature of sha256 hash of data.
- sign_package(zip_or_directoryname, certificate=None, private_key_password=None, private_key=None, set_maturity=None, inc_package_release=False, keep_signature_date=False, excludes=[])[source]¶
Calculate the signature of the WAPT/manifest.sha256 file and put/replace it in the ZIP or directory.
If a directory is given, creates WAPT/manifest.sha256 and adds it to the content of the package, then creates a WAPT/signature file and adds it to the directory or zip file.
- Known issue: if the zip file already contains a manifest.sha256 file, it is not removed, so there will be 2 manifest files in the zip / wapt package.
- Args:
zip_or_directoryname: filename or path for the wapt package’s content
certificate (list): certificates chain of signer.
private_key (SSLPrivateKey): the private key to use
private_key_password (str) : passphrase to decrypt the private key. If None provided, use self.private_key_password_callback
- Returns:
str: base64 encoded signature of manifest.sha256 file (content
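The idea of a manifest of SHA-256 digests over a package directory can be sketched as follows. This is an illustration only: the helper `build_manifest`, the list-of-pairs layout and the JSON serialisation are assumptions, not the confirmed format of WAPT/manifest.sha256, and the signing step itself is omitted.

```python
import hashlib
import json
import os
import tempfile


def build_manifest(root_dir):
    """Return a sorted list of [relative_path, sha256_hex] pairs for all files under root_dir."""
    entries = []
    for dirpath, _dirnames, filenames in os.walk(root_dir):
        for name in filenames:
            full_path = os.path.join(dirpath, name)
            # Use forward slashes so the manifest is the same on every platform
            rel_path = os.path.relpath(full_path, root_dir).replace(os.sep, "/")
            with open(full_path, "rb") as f:
                digest = hashlib.sha256(f.read()).hexdigest()
            entries.append([rel_path, digest])
    return sorted(entries)


# Build a tiny hypothetical package tree and compute its manifest
with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "WAPT"))
    with open(os.path.join(tmp, "setup.py"), "w") as f:
        f.write("def install():\n    pass\n")
    with open(os.path.join(tmp, "WAPT", "control"), "w") as f:
        f.write("package: demo\n")
    manifest = build_manifest(tmp)
    manifest_json = json.dumps(manifest, indent=2)

print(manifest_json)
```

Signing such a manifest rather than every file individually means a single signature covers the whole package content.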
- store_upgrade_status(upgrades=None)[source]¶
Stores in DB the current pending upgrades and running installs for query by waptservice
- trust_package_signer(cert)[source]¶
Add a certificate to the list of trusted package signers certificates Stores the PEM encoded data in public_certs_dir directory. The certificate is added to this current Wapt.cabundle instance for immediate use (until config is reloaded)
- uninstall(packagename, params_dict={}, force=False)[source]¶
Launch the uninstall script of an installed package. The setup.py source is taken from the database or from a filename.
- uninstall_cmd(guid)[source]¶
return the (quiet) command stored in registry to uninstall a software given its registry key
- unregister_computer()[source]¶
Remove computer information from the WAPT Server
- Returns:
dict: response from server.
>>> wapt = Wapt()
>>> s = wapt.unregister_computer()
- untrust_package_signer(cert)[source]¶
Removes a certificate from the list of trusted package signers certificates based on the fingerprint of the certificate. Certs in ssl public_certs_dir with matching fingerprint are deleted.
- update(force=False, register=True)[source]¶
Update local database with packages definition from repositories
- Args:
force (boolean): update even if the Packages index on the repository has not been updated since last update (based on http headers)
register (boolean): send information about the status of local packages to waptserver
- Returns:
list of (host package entry,entry date on server)
- Returns:
dict: {“added”,”removed”,”count”,”repos”,”upgrades”,”date”}
>>> wapt = Wapt(config_filename='c:/wapt/wapt-get.ini')
>>> updates = wapt.update()
>>> 'count' in updates and 'added' in updates and 'upgrades' in updates and 'date' in updates and 'removed' in updates
True
- update_server_status(force=False, include_wmi=None, include_dmi=None)[source]¶
Send host_info, installed packages and installed software, and last update status information to the WAPT Server, but don’t send register info like dmi or wmi.
Changed in version 1.4.3: if the last status has been properly sent to the server and the data has not changed, don’t push the data again to the server. The hash is stored in memory, so it is not passed across threads or processes.
>>> wapt = Wapt()
>>> s = wapt.update_server_status()
- upgrade(only_priorities=None, only_if_not_process_running=False)[source]¶
Install the “well known” host package from the main repository if not already installed, then query the localstatus database for packages with a version older than the repository one and install all newest packages
- Args:
only_priorities (list of str): If not None, upgrade only packages with these priorities.
- Returns:
dict: {'upgrade': [], 'additional': [], 'downloads': {'downloaded': [], 'skipped': [], 'errors': []}, 'remove': [], 'skipped': [], 'install': [], 'errors': [], 'unavailable': []}
- upload_package(filenames, wapt_server_user=None, wapt_server_passwd=None)[source]¶
Method to upload a package using a shell command (like scp) instead of http upload. You must first define a command in the ini file of the form:
upload_cmd="c:\Program Files"\putty\pscp -v -l waptserver %(waptfile)s srvwapt:/var/www/%(waptdir)s/
- or
upload_cmd="C:\Program Files\WinSCP\WinSCP.exe" root@wapt.tranquilit.local /upload %(waptfile)s
- You can define an "after_upload" shell command. Typical use is to update the Packages index:
after_upload="c:\Program Files"\putty\plink -v -l waptserver srvwapt.tranquilit.local "python /opt/wapt/wapt-scanpackages.py /var/www/%(waptdir)s/"
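The `%(waptfile)s` and `%(waptdir)s` placeholders follow Python's mapping-style `%` formatting. A minimal sketch of how such a template expands is shown below; the helper `expand_command`, the simplified `scp` template and the file name are hypothetical, and it is an assumption that WAPT expands the command this way internally.

```python
# Hypothetical, simplified template using the same placeholders as the ini examples
upload_cmd = "scp %(waptfile)s srvwapt:/var/www/%(waptdir)s/"


def expand_command(template, waptfile, waptdir):
    """Substitute the named placeholders of a command template."""
    return template % {"waptfile": waptfile, "waptdir": waptdir}


cmd = expand_command(upload_cmd, "tis-7zip.wapt", "wapt")
print(cmd)  # scp tis-7zip.wapt srvwapt:/var/www/wapt/
```

Because the substitution uses `%`, a literal percent sign in such a template would have to be written as `%%`.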
- wapt_status()[source]¶
Wapt configuration and version information
- Returns:
dict: versions of main files, waptservice config, repos and waptserver config
>>> w = Wapt()
>>> w.wapt_status()
{
    'setuphelpers-version': '1.1.1',
    'waptserver': {
        'wapt_server': u'tranquilit.local',
        'proxies': {'http': None, 'https': None},
        'server_url': 'https://wapt.tranquilit.local'
    },
    'waptservice_protocol': 'http',
    'repositories': [
        {
            'wapt_server': u'tranquilit.local',
            'proxies': {'http': None, 'https': None},
            'name': 'global',
            'repo_url': 'http://wapt.tranquilit.local/wapt'
        },
        {
            'wapt_server': u'tranquilit.local',
            'proxies': {'http': None, 'https': None},
            'name': 'wapt-host',
            'repo_url': 'http://srvwapt.tranquilit.local/wapt-host'
        }
    ],
    'common-version': '1.1.1',
    'wapt-exe-version': u'1.1.1.0',
    'waptservice_port': 8088,
    'wapt-py-version': '1.1.1'
}
- property waptserver¶
- waptserver_available()[source]¶
Test reachability of waptserver.
If waptserver is defined and available, return True, else False
- Returns:
boolean: True if server is defined and actually reachable
- property waptsessiondb: WaptSessionDB¶
Wapt user session database
- write_audit_data(section, key, value, ptype=None, value_date=None, expiration_date=None, max_count=2, keep_days=None)[source]¶
Stores a metric in the database and removes expired ones
- Args:
section (str)
key (str)
value (any)
value_date (str or datetime): value date (utc). By default datetime.datetime.utcnow()
expiration_date (str) : expiration date of the new value
max_count (int) : keep at most max_count values; remove the oldest ones.
keep_days (int) : set the expiration date to now + keep_days days. Overrides the expiration_date arg if not None
- Returns:
None
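The interplay of `max_count`, `keep_days` and `expiration_date` described above can be sketched with an in-memory list. The function `store_metric` and the record layout are hypothetical; the real method works on the local database and may order or purge records differently.

```python
from datetime import datetime, timedelta


def store_metric(history, value, now, max_count=2, keep_days=None, expiration_date=None):
    """Append a value to history, drop expired entries and keep the newest max_count ones."""
    # keep_days overrides expiration_date when it is provided
    if keep_days is not None:
        expiration_date = now + timedelta(days=keep_days)
    history.append({"value": value, "value_date": now, "expiration_date": expiration_date})
    # Remove entries whose expiration date has been reached
    history[:] = [e for e in history
                  if e["expiration_date"] is None or e["expiration_date"] > now]
    # Keep at most max_count entries, removing the oldest first
    history.sort(key=lambda e: e["value_date"])
    excess = len(history) - max_count
    if excess > 0:
        del history[:excess]
    return history


history = []
start = datetime(2024, 1, 1)
for day, value in enumerate(["a", "b", "c"]):
    store_metric(history, value, start + timedelta(days=day), max_count=2, keep_days=30)

print([e["value"] for e in history])  # ['b', 'c']
```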
- write_audit_data_if_changed(section, key, value, ptype=None, value_date=None, expiration_date=None, max_count=2, keep_days=None)[source]¶
Write data only if different from last one
- Returns:
previous value
- property wua_repository¶
- class common.WaptBaseDB(dbpath)[source]¶
Bases: BaseObjectClass
- curr_db_version = None¶
- property db¶
- property db_version¶
- property dbpath¶
- get_param(name, default=None, ptype=None)[source]¶
Retrieve the value associated with name from database
- query(query, args=(), one=False, as_dict=True)[source]¶
Execute the query on the db and return a list of dictionaries
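A minimal sketch of a query helper that returns rows as dictionaries, built on the standard `sqlite3` module. The standalone function below only mirrors the documented parameters (`args`, `one`, `as_dict`); the table `wapt_params` and its content are hypothetical, and the real method may behave differently in edge cases.

```python
import sqlite3


def query(db, sql, args=(), one=False, as_dict=True):
    """Run sql on db and return rows, optionally as dictionaries keyed by column name."""
    cur = db.execute(sql, args)
    rows = cur.fetchall()
    if as_dict:
        # cursor.description holds one 7-tuple per column; item 0 is the column name
        names = [col[0] for col in cur.description]
        rows = [dict(zip(names, row)) for row in rows]
    if one:
        return rows[0] if rows else None
    return rows


db = sqlite3.connect(":memory:")
db.execute("create table wapt_params (name varchar, value varchar)")  # hypothetical table
db.execute("insert into wapt_params values (?, ?)", ("db_version", "20210420"))

result = query(db, "select name, value from wapt_params where name=?", ("db_version",))
print(result)  # [{'name': 'db_version', 'value': '20210420'}]
```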
- set_package_attribute(install_id, key, value)[source]¶
Store permanently a (key/value) pair in database for a given package, replace existing one
- class common.WaptDB(dbpath)[source]¶
Bases: WaptBaseDB
Class to manage SQLite database with local installation status
- add_start_install(package_entry, params_dict={}, explicit_by=None)[source]¶
Register the start of installation in local db
- Args:
params_dict (dict) : dictionary of parameters provided on command line with --param or by the server
explicit_by (str) : username of initiator of the install. If not None, install is not a dependency but an explicit manual install
setuppy (str) : python source code used for install, uninstall or session_setup. Code used for uninstall or session_setup must use only the wapt self library, as package content is no longer available at this step.
- Returns:
int : rowid of the inserted install status row
- build_depends(packages, packages_filter=None)[source]¶
Given a list of packages conditions (packagename (optionalcondition)) return a list of dependencies (packages conditions) to install
- Args:
packages (list of str): list of packages requirements ( package_name(=version) )
- Returns:
(list depends,list conflicts,list missing) : tuple of (all_depends,missing_depends)
TODO : choose available dependencies in order to reduce the number of new packages to install
>>> waptdb = WaptDB(':memory:')
>>> office = PackageEntry('office','0')
>>> firefox22 = PackageEntry('firefox','22')
>>> firefox22.depends = 'mymissing,flash'
>>> firefox24 = PackageEntry('firefox','24')
>>> thunderbird = PackageEntry('thunderbird','23')
>>> flash10 = PackageEntry('flash','10')
>>> flash12 = PackageEntry('flash','12')
>>> office.depends='firefox(<24),thunderbird,mymissing'
>>> firefox22.depends='flash(>=10)'
>>> firefox24.depends='flash(>=12)'
>>> waptdb.add_package_entry(office)
>>> waptdb.add_package_entry(firefox22)
>>> waptdb.add_package_entry(firefox24)
>>> waptdb.add_package_entry(flash10)
>>> waptdb.add_package_entry(flash12)
>>> waptdb.add_package_entry(thunderbird)
>>> waptdb.build_depends('office')
([u'flash(>=10)', u'firefox(<24)', u'thunderbird'], [u'mymissing'])
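The dependency walk illustrated by the doctest can be sketched, in a heavily simplified form, as a depth-first traversal. This sketch ignores version conditions and conflicts entirely and works on a plain dictionary; the function name `resolve_depends` and the `catalog` structure are hypothetical and do not reflect the actual implementation.

```python
def resolve_depends(catalog, package):
    """Return (depends, missing) for package.

    catalog maps a package name to the list of names it depends on.
    Dependencies are listed before the packages that need them.
    """
    depends, missing = [], []
    seen = {package}  # guards against cycles and duplicates

    def visit(name):
        for dep in catalog.get(name, []):
            if dep in seen:
                continue
            seen.add(dep)
            if dep not in catalog:
                missing.append(dep)
                continue
            visit(dep)           # resolve the dependencies of dep first
            depends.append(dep)

    visit(package)
    return depends, missing


catalog = {
    "office": ["firefox", "thunderbird", "mymissing"],
    "firefox": ["flash"],
    "thunderbird": [],
    "flash": [],
}
print(resolve_depends(catalog, "office"))
# (['flash', 'firefox', 'thunderbird'], ['mymissing'])
```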
- curr_db_version = '20210420'¶
- install_status(id)[source]¶
Return the local install status for id
- Args:
id: sql rowid
- Returns:
dict : merge of package local install, audit and package attributes.
- installed(include_errors=False, include_setup=True)[source]¶
Return a list of installed packages on this host (status ‘OK’ or ‘UNKNOWN’)
- Args:
include_errors (bool) : if False, only packages with status ‘OK’ and ‘UNKNOWN’ are returned; if True, all packages are returned.
include_setup (bool) : if True, setup.py files content is in the result rows
- Returns:
list: of installed PackageEntry
- installed_matching(package_cond, include_errors=False, include_setup=True)[source]¶
Return a list of PackageEntry if a properly installed (if include_errors=False) package matches the package condition ‘tis-package (>=version)’
- Args:
package_cond (str): package requirement to look up
include_errors (bool): if False, only properly installed packages are considered
- Returns:
list of PackageEntry merge with localstatus attributes WITH setuppy
- installed_packages_inventory()[source]¶
Return a list of installed packages status suitable for inventory
List of list. First line is the header (fields names)
- Returns:
list (of list): of installed packages
- installed_search(searchwords=[], include_errors=False)[source]¶
Return a list of installed package entries based on search keywords
- Returns:
list of PackageEntry merge with localstatus attributes without setuppy
- known_packages()[source]¶
Return a dict of all known packages PackageKey(s) indexed by package_uuid
- Returns:
dict {‘package_uuid’:PackageKey(package)}
- packages_audit_inventory(after_date=None)[source]¶
Return a list of packages audit status suitable for inventory
List of list. First line is the header (fields names)
- Returns:
list (of list): of installed packages
- packages_matching(package_cond)[source]¶
Return an ordered list of available packages entries which match the condition “packagename[([=<>]version)]?” version ascending
- Args:
package_cond (PackageRequest or str): filter packages and determine the ordering
- Returns:
list of PackageEntry
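A package condition of the form `packagename[([=<>]version)]` can be parsed and applied as sketched below. The regular expression, the helper names and the purely numeric version comparison are assumptions for illustration; the library's own PackageRequest and version handling are likely richer.

```python
import operator
import re

# name, optionally followed by "(<operator><version>)"
COND_RE = re.compile(r"^\s*([^\s(]+)\s*(?:\(\s*(<=|>=|=|<|>)\s*([^)\s]+)\s*\))?\s*$")

OPERATORS = {
    "=": operator.eq,
    "<": operator.lt,
    ">": operator.gt,
    "<=": operator.le,
    ">=": operator.ge,
}


def parse_condition(cond):
    """Split 'name(op version)' into (name, op, version); op and version may be None."""
    match = COND_RE.match(cond)
    if match is None:
        raise ValueError("invalid package condition: %r" % cond)
    return match.groups()


def version_key(version):
    """Turn '1.10.2' into (1, 10, 2). Assumes purely numeric dotted versions."""
    return tuple(int(part) for part in version.split("."))


def matching_versions(cond, available):
    """Return the versions from available (name, version) pairs matching cond, ascending."""
    name, op, version = parse_condition(cond)
    candidates = [v for (n, v) in available if n == name]
    if op is not None:
        compare = OPERATORS[op]
        candidates = [v for v in candidates if compare(version_key(v), version_key(version))]
    return sorted(candidates, key=version_key)


available = [("firefox", "24"), ("firefox", "22"), ("flash", "12"), ("flash", "10")]
print(matching_versions("firefox(<24)", available))   # ['22']
print(matching_versions("flash (>=10)", available))   # ['10', '12']
```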
- packages_search(searchwords=[], exclude_host_repo=True, section_filter=None, packages_filter=None)[source]¶
Return a list of package entries matching the search words
- Args:
searchwords (list): list of words which must be in package name or description
exclude_host_repo (bool): don’t take into account packages coming from a repo named ‘wapt-host’
section_filter (list): list of package sections to take into account
packages_filter (PackageRequest): additional filters (arch, locale, maturities etc…) to take into account for filter and sort
- Returns:
list of PackageEntry
- purge_repo(repo_name)[source]¶
remove references to repo repo_name
>>> waptdb = WaptDB('c:/wapt/db/waptdb.sqlite')
>>> waptdb.purge_repo('main')
- query_package_entry(query, args=(), one=False, package_request=None)[source]¶
Execute the query on the db try to map result on PackageEntry attributes Fields which don’t match attributes are added as attributes (and listed in _calc_attributes list)
- Args:
query (str): sql query
args (list): parameters of the sql query
package_request (PackageRequest): keep only entries which match package_request
one (bool): if True, return the highest available version (if package_request is not None, use it to compare packages)
- Returns:
PackageEntry or list of PackageEntry
>>> waptdb = WaptDB(':memory:')
>>> waptdb.add_package_entry(PackageEntry('toto','0',repo='main'))
>>> waptdb.add_package_entry(PackageEntry('dummy','2',repo='main'))
>>> waptdb.add_package_entry(PackageEntry('dummy','1',repo='main'))
>>> waptdb.query_package_entry("select * from wapt_package where package=?",["dummy"])
[PackageEntry('dummy','2'), PackageEntry('dummy','1')]
>>> waptdb.query_package_entry("select * from wapt_package where package=?",["dummy"],one=True)
PackageEntry('dummy','2')
- remove_install_status(package=None, package_uuid=None)[source]¶
Remove status of package installation from localdb
- store_setuppy(rowid, setuppy=None, install_params={})[source]¶
Update status of package installation on localdb
- switch_to_explicit_mode(package, user_id)[source]¶
Set package install mode to manual so that package is not removed when meta packages don’t require it anymore
- update_audit_status(rowid, set_status=None, set_output=None, append_line=None, set_last_audit_on=None, set_next_audit_on=None)[source]¶
Update status of package installation on localdb
- update_install_status(rowid, set_status=None, append_line=None, uninstall_key=None, persistent_dir=None)[source]¶
Update status of package installation on localdb
- update_install_status_pid(pid, set_status='ERROR')[source]¶
Update status of package installation on localdb
- upgradeable(include_errors=True, check_version_only=True, include_setup=False)[source]¶
Return a dictionary of upgradable Package entries
- Returns:
dict ‘package’: [candidates] ordered from most adequate to more generic and
- upgradeable_status()[source]¶
Return a list of upgradable Package entries. (faster than upgradeable() )
- Returns:
list of (installed PackageEntry, candidate PackageEntry)
- wapt_package_columns = {'architecture': 'varchar', 'audit_schedule': 'varchar', 'categories': 'varchar', 'changelog': 'varchar', 'conflicts': 'varchar', 'depends': 'varchar', 'description': 'varchar', 'editor': 'varchar', 'filename': 'varchar', 'forced_install_on': 'varchar', 'homepage': 'varchar', 'icon_sha256sum': 'varchar', 'id': 'INTEGER PRIMARY KEY AUTOINCREMENT', 'impacted_process': 'varchar', 'installed_size': 'integer', 'keywords': 'varchar', 'licence': 'varchar', 'locale': 'varchar', 'maintainer': 'varchar', 'maturity': 'varchar', 'max_os_version': 'varchar', 'md5sum': 'varchar', 'min_os_version': 'varchar', 'min_wapt_version': 'varchar', 'name': 'varchar', 'package': 'varchar', 'package_uuid': 'varchar', 'priority': 'varchar', 'repo': 'varchar', 'repo_url': 'varchar', 'section': 'varchar', 'signature': 'varchar', 'signature_date': 'varchar', 'signed_attributes': 'varchar', 'signer': 'varchar', 'signer_fingerprint': 'varchar', 'size': 'integer', 'sources': 'varchar', 'target_os': 'varchar', 'valid_from': 'varchar', 'valid_until': 'varchar', 'version': 'varchar'}¶
- class common.WaptHostRepo(url=None, name='wapt-host', verify_cert=None, http_proxy=None, timeout=None, host_id=None, cabundle=None, config=None, section=None, host_key=None, WAPT=None)[source]¶
Bases: WaptRepo
Dummy http repository for host packages
>>> host_repo = WaptHostRepo(name='wapt-host',host_id=['0D2972AC-0993-0C61-9633-529FB1A177E3','4C4C4544-004E-3510-8051-C7C04F325131'])
>>> host_repo.load_config_from_file(r'C:\Users\htouvet\AppData\Local\waptconsole\waptconsole.ini')
>>> host_repo.packages()
[PackageEntry('0D2972AC-0993-0C61-9633-529FB1A177E3','10') , PackageEntry('4C4C4544-004E-3510-8051-C7C04F325131','30') ]
- download_packages(package_requests, target_dir=None, usecache=True, printhook=None)[source]¶
Download a list of packages from repo
- Args:
package_requests (list,PackageEntry): a list of PackageEntry to download
target_dir (str): where to store downloaded Wapt Package files
usecache (bool): whether to try to use cached Wapt files if checksum is ok
printhook (callable): to show progress of download
- Returns:
dict: {“downloaded”:[local filenames],”skipped”:[filenames in cache],”errors”:[],”packages”:self.packages()}
- property host_id¶
- is_available()[source]¶
Check if repo is reachable and return creation date of Packages.
Try to access the repo and return last modified date of repo index or None if not accessible
- Returns:
str: Iso creation date of remote Package file as returned in http headers
>>> repo = WaptRemoteRepo(name='main',url='https://wapt/wapt',timeout=1)
>>> repo.is_available() <= datetime2isodate()
True
>>> repo = WaptRemoteRepo(name='main',url='https://badwapt/wapt',timeout=1)
>>> repo.is_available() is None
True
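Turning an HTTP date header into an ISO date string can be sketched with the standard library. It is an assumption that the date comes from a `Last-Modified` header; the helper `last_modified_iso` is hypothetical and the exact ISO format returned by the real method may differ.

```python
from email.utils import parsedate_to_datetime


def last_modified_iso(headers):
    """Return the Last-Modified header as an ISO 8601 string, or None if it is absent."""
    value = headers.get("Last-Modified")
    if value is None:
        return None
    return parsedate_to_datetime(value).isoformat()


# Headers as they might be returned for the repository index (hypothetical values)
headers = {"Last-Modified": "Wed, 02 Nov 2016 19:36:00 GMT"}
print(last_modified_iso(headers))  # 2016-11-02T19:36:00+00:00
```

ISO strings of the same format and timezone compare correctly as plain strings, which is what a check such as `repo.is_available() <= datetime2isodate()` relies on.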
- load_config(config, section=None)[source]¶
Load waptrepo configuration from inifile section.
Use name of repo as section name if section is not provided. Use ‘global’ if no section named section in ini file
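The section fallback described above can be sketched with `configparser`. The helper `pick_section`, the section names and the URLs are hypothetical; only the rule (explicit section, else repo name, else ‘global’) is taken from the description.

```python
import configparser


def pick_section(config, name, section=None):
    """Return the ini section to read: section or repo name if present, else 'global'."""
    candidate = section or name
    if config.has_section(candidate):
        return candidate
    return "global"


config = configparser.ConfigParser()
config.read_string("""
[global]
repo_url = https://wapt.example.test/wapt

[wapt-templates]
repo_url = https://store.example.test/wapt
""")

for repo_name in ("wapt-templates", "wapt"):
    used = pick_section(config, repo_name)
    print(repo_name, "->", used, config.get(used, "repo_url"))
```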
- property repo_url¶
Repository URL
Fixed url if none is set in wapt-get.ini by querying the server.
The URL is queried once and then cached into a local property.
- Returns:
str: url to the repository
>>> repo = WaptRepo(name='wapt',timeout=4)
>>> print(repo.wapt_server)
http://wapt.wapt.fr/
>>> print(repo.repo_url)
http://srvwapt.tranquilit.local/wapt
- class common.WaptPackageAuditLogger(console, wapt_context=None, install_id=None, user=None, running_status='RUNNING', exit_status=None, error_status='ERROR')[source]¶
Bases: LogOutput
Context handler to log all print messages to a wapt package audit log
- Args:
console (file) : sys.stderr
wapt_context (Wapt): Wapt instance
install_id (int): name of running or installed package local status where to log status and output
- class common.WaptPackageInstallLogger(console, wapt_context=None, install_id=None, user=None, running_status='RUNNING', exit_status='OK', error_status='ERROR')[source]¶
Bases: LogOutput
Context handler to log all print messages to a wapt package install log
- Args:
wapt_context (Wapt): Wapt instance
package_name (str): name of running or installed package local status where to log status and output
- class common.WaptPackageSessionSetupLogger(console, waptsessiondb, install_id, running_status='RUNNING', exit_status=None, error_status='ERROR')[source]¶
Bases: LogOutput
Context handler to log all print messages to a wapt package install log
- Args:
wapt_context (Wapt): Wapt instance
package_name (str): name of running or installed package local status where to log status and output
- class common.WaptRepo(url=None, name='wapt', verify_cert=None, http_proxy=None, timeout=None, cabundle=None, config=None, section=None, WAPT=None)[source]¶
Bases: WaptRemoteRepo
Gives access to a remote http repository, with a zipped Packages index.
Find its repo_url based on:
* repo_url explicit setting in ini config section [<name>]
* if there are some rules, use rules
>>> repo = WaptRepo(name='main',url='http://wapt/wapt',timeout=4)
>>> packages = repo.packages()
>>> len(packages)
- property WAPT¶
- property cached_wapt_repo_url¶
- load_config(config, section=None)[source]¶
Load waptrepo configuration from inifile section.
Use name of repo as section name if section is not provided. Use ‘global’ if no section named section in ini file
- property proxies¶
dict for http proxies url suitable for requests based on the http_proxy repo attribute
- Returns:
dict: {‘http’:’http://proxy:port’,’https’:’http://proxy:port’}
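A sketch of how a single `http_proxy` value can be turned into the dictionary shape that the `requests` library accepts for its `proxies` argument. The helper `proxies_from` is hypothetical, and using the same proxy for both schemes is an assumption based on the return example above.

```python
def proxies_from(http_proxy):
    """Build a requests-style proxies dict from a single proxy URL (or None)."""
    if http_proxy:
        return {"http": http_proxy, "https": http_proxy}
    return {"http": None, "https": None}


print(proxies_from("http://proxy:3128"))
print(proxies_from(None))
```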
- property repo_url¶
Repository URL
Fixed url if none is set in wapt-get.ini by querying the server.
The URL is queried once and then cached into a local property.
- Returns:
str: url to the repository
>>> repo = WaptRepo(name='wapt',timeout=4)
>>> print(repo.wapt_server)
http://wapt.wapt.fr/
>>> print(repo.repo_url)
http://srvwapt.tranquilit.local/wapt
- property rules¶
- class common.WaptServer(url=None, proxies={'http': None, 'https': None}, timeout=5.0, dnsdomain=None, name='waptserver')[source]¶
Bases: BaseObjectClass
Manage connection to waptserver
- auth(action=None, enable_password_callback=True)[source]¶
Return an authenticator for the action. This is basically a tuple (user,password) or a more sophisticated AuthBase descendent. The motivation for action is to not request password from user if not needed. Or enable kerberos for specific endpoint.
- Args:
action (str): for which an authenticator is requested
- Returns:
tuple or requests.auth.AuthBase descendant instance
- client_auth()[source]¶
Return SSL pair (cert,key) filenames for client side SSL auth
- Returns:
tuple: (cert path,key path,strkeypassword)
- get(action, auth=None, timeout=None, use_ssl_auth=True, enable_capture_external_ip=True, enable_password_callback=True, decode_json=True)[source]¶
Make an HTTP GET request to the server and return the result decoded from JSON. This assumes remo
- Args:
action (str): doc part of the url
auth (tuple): authentication passed to requests (user,password)
enable_capture_external_ip (bool) : if true and callback is defined, try to get external IP from X-Remote-IP header and set it through self.capture_external_ip_callback
enable_password_callback (bool) : if true, password callback will be called if needed.
- Returns:
dict : response returned from server as json data.
- head(action, auth=None, timeout=None, use_ssl_auth=True, enable_capture_external_ip=True, enable_password_callback=True)[source]¶
- load_config_from_file(config_filename, section='global')[source]¶
Load waptserver configuration from an inifile located at config_filename
- Args:
config_filename (str) : path to wapt inifile
section (str): ini section from which to get parameters. Defaults to ‘global’
- Returns:
WaptServer: self
- post(action, data=None, files=None, auth=None, timeout=None, signature=None, signer=None, content_length=None, use_ssl_auth=True, enable_capture_external_ip=True, enable_password_callback=True, max_retry_count=3)[source]¶
Post data to waptserver using http POST method
Add a signature to the posted data using host certificate.
Posted Body is gzipped
- Args:
action (str): doc part of the url
data (str) : posted data body
files (list or dict) : list of filenames
- Returns:
dict : response returned from server as json data.
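The "posted body is gzipped" step can be sketched with the standard library. The helper `build_gzipped_body` and the header names are assumptions, and the signature of the data with the host certificate is deliberately left out of this sketch.

```python
import gzip
import json


def build_gzipped_body(data):
    """Serialize data as JSON and gzip it; return (body_bytes, headers)."""
    raw = json.dumps(data).encode("utf-8")
    body = gzip.compress(raw)
    # Assumed headers: the server has to know the body is compressed JSON
    headers = {"Content-Type": "application/json", "Content-Encoding": "gzip"}
    return body, headers


payload = {"uuid": "0D2972AC-0993-0C61-9633-529FB1A177E3", "status": "OK"}
body, headers = build_gzipped_body(payload)

# The receiving side reverses both steps
decoded = json.loads(gzip.decompress(body).decode("utf-8"))
print(decoded == payload)
```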
- save_server_certificate(server_ssl_dir=None, overwrite=False)[source]¶
Retrieve certificate of https server for further checks
- Args:
server_ssl_dir (str): Directory where to save x509 certificate file
- Returns:
str : full path to x509 certificate file.
- property server_url¶
Return fixed url if any
>>> server = WaptServer(timeout=4)
>>> print(server.server_url)
https://wapt.tranquil.it
- class common.WaptSessionDB(username='')[source]¶
Bases: WaptBaseDB
- add_start_install(package_entry)[source]¶
Register the start of installation in local db
- Returns:
int : rowid of the inserted record
- curr_db_version = '20201217'¶
- remove_install_status(package)[source]¶
Remove status of package installation from localdb
>>> wapt = Wapt()
>>> wapt.forget_packages('tis-7zip')
???
- remove_obsolete_install_status(installed_packages)[source]¶
Remove local user status of packages that are no longer installed
- common.check_is_member_of(huser, group_name)[source]¶
Check if a user is a member of a group
- Args:
huser (handle) : pywin32
group_name (str) : group
>>> import win32security
>>> hUser = win32security.LogonUser('technique','tranquilit','xxxxxxx',win32security.LOGON32_LOGON_NETWORK,win32security.LOGON32_PROVIDER_DEFAULT)
>>> check_is_member_of(hUser,'domain admins')
False
- common.check_user_membership(user_name, password, domain_name, group_name)[source]¶
Check if a user is a member of a group
- Args:
user_name (str): user
password (str):
domain_name (str) : If empty, check local then domain
group_name (str): group
>>> import win32security
>>> hUser = win32security.LogonUser('technique','tranquilit','xxxxxxx',win32security.LOGON32_LOGON_NETWORK,win32security.LOGON32_PROVIDER_DEFAULT)
>>> check_is_member_of(hUser,'domain admins')
False
- common.get_domain_admins_group_name()[source]¶
Return localized version of domain admin group (ie “domain admins” or “administrateurs du domaine” with RID -512)
>>> get_domain_admins_group_name()
u'Domain Admins'
- common.lookup_name_from_rid(domain_controller, rid)[source]¶
Return username or group name from RID (with localization if applicable)
from https://mail.python.org/pipermail/python-win32/2006-May/004655.html
- Args:
domain_controller : should be a DC
rid : integer number (512 for domain admins, 513 for domain users, etc.)
>>> lookup_name_from_rid('srvads', DOMAIN_GROUP_RID_ADMINS)
u'Domain Admins'
- common.sid_from_rid(domain_controller, rid)[source]¶
Return SID structure based on supplied domain controller’s domain and supplied rid.
rid can be for example DOMAIN_GROUP_RID_ADMINS, DOMAIN_GROUP_RID_USERS
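The relationship between a domain SID and a RID can be illustrated on the textual SID form: the account or group SID is the domain SID with the RID appended as the last component. The sketch below works on strings only and does not call pywin32; the helper `sid_string_from_rid` and the example domain SID are hypothetical.

```python
# Well-known relative identifiers (512 for domain admins, 513 for domain users)
DOMAIN_GROUP_RID_ADMINS = 512
DOMAIN_GROUP_RID_USERS = 513


def sid_string_from_rid(domain_sid, rid):
    """Append a RID to a domain SID given in its textual S-1-5-21-... form."""
    return "%s-%d" % (domain_sid, rid)


domain_sid = "S-1-5-21-1004336348-1177238915-682003330"  # hypothetical domain SID
admins_sid = sid_string_from_rid(domain_sid, DOMAIN_GROUP_RID_ADMINS)
print(admins_sid)  # S-1-5-21-1004336348-1177238915-682003330-512
```

Because the RID is fixed while the group name is localized, looking a group up by RID works regardless of the language of the domain.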
- common.wapt_sources_edit(wapt_sources_dir: str, editor_for_packages: Optional[str] = None) → str[source]¶
Utility to open PyScripter or the configured editor with package sources if it is installed, else open the wapt package development directory in System Shell Explorer.
- Args:
wapt_sources_dir (str): directory path of the wapt package sources
- Returns:
str: sources path