#!/usr/bin/env python3
##
## -----------------------------------------------------------------
## This file is part of WAPT Software Deployment
## Copyright (C) 2012 - 2020 Tranquil IT https://www.tranquil.it
## All Rights Reserved.
##
## WAPT helps systems administrators to efficiently deploy
## setup, update and configure applications.
## ------------------------------------------------------------------
##
import glob
import locale
import logging
import os
import platform
import shutil
import socket
import stat
import subprocess
import sys
import time
import psutil
import netifaces
import platform
import time
import getpass
from iniparse import RawConfigParser
from waptpackage import PackageEntry
from waptutils import (Version, __version__, all_files, dateof,
datetime2isodate, ensure_list, ensure_unicode,
fileisodate, find_all_files, get_disk_free_space,
hours_minutes, httpdatetime2isodate, isodate2datetime,
time2display, wget, wgets, makepath, killtree, isfile, isdir,
CalledProcessErrorOutput, remove_file, mkdirs, get_main_ip, get_local_IPs,
killalltasks, isrunning, get_sha256, CustomZipFile,
run,run_notfatal,CalledProcessError,RunOutput,RunReader,listening_sockets)
from urllib.request import getproxies
import bs4 as BeautifulSoup
import requests
import uuid
import hashlib
__all__ = []
__all__.extend([
'CalledProcessError',
'PackageEntry',
'Version',
'__version__',
'add_double_quotes_around',
'all_files',
'application_data',
'bs_find',
'bs_find_all',
'control',
'copytree2',
'currentdate',
'currentdatetime',
'dateof',
'datetime2isodate',
'default_gateway',
'default_oncopy',
'default_overwrite',
'default_overwrite_older',
'default_skip',
'dir_is_empty',
'dmi_info',
'ensure_dir',
'ensure_list',
'ensure_unicode',
'error',
'file_is_locked',
'filecopyto',
'fileisodate',
'find_all_files',
'find_processes',
'get_computer_groups',
'get_computername',
'get_config_package_filename',
'get_current_user',
'get_default_gateways',
'get_disk_free_space',
'get_dns_servers',
'get_fqdn',
'get_hostname',
'get_language',
'get_last_logged_on_user',
'get_local_IPs',
'get_loggedinusers',
'get_os_name',
'get_proxies',
'get_proxies_from_wapt_console',
'get_main_ip',
'get_os_version',
'get_sha256',
'glob',
'host_info',
'host_metrics',
'hours_minutes',
'httpdatetime2isodate',
'inifile_deleteoption',
'inifile_deletesection',
'inifile_hasoption',
'inifile_hassection',
'inifile_readstring',
'inifile_writestring',
'installed_softwares',
'install_config_package',
'is64',
'is32',
'isARM',
'isARM64',
'isdir',
'isfile',
'isodate2datetime',
'isrunning',
'json_write_file',
'json_load_file',
'killalltasks',
'killtree',
'listening_sockets',
'logger',
'makepath',
'mkdirs',
'networking',
'os',
'params',
'processes_for_file',
'remove_config_package',
'remove_file',
'remove_outdated_binaries',
'remove_tree',
'run',
'run_notfatal',
'running_on_ac',
'shell_launch',
'shutil',
'time2display',
'unzip',
'user_home_directory',
'wget',
'wgets',
])
# Conditionnal imports for setuphelpers
if platform.system() == 'Windows':
from setuphelpers_windows import *
__all__.extend([
'EnsureWUAUServRunning',
'HKEY_CLASSES_ROOT',
'HKEY_CURRENT_CONFIG',
'HKEY_CURRENT_USER',
'HKEY_LOCAL_MACHINE',
'HKEY_USERS',
'InstallerTypes',
'KEY_ALL_ACCESS',
'KEY_READ',
'KEY_WRITE',
'REG_DWORD',
'REG_EXPAND_SZ',
'REG_MULTI_SZ',
'REG_SZ',
'RunOutput',
'RunReader',
'TimeoutExpired',
'WindowsVersions',
'add_shutdown_script',
'add_to_system_path',
'add_user_to_group',
'adjust_current_privileges',
'battery_lifetime',
'battery_percent',
'bookmarks',
'common_desktop',
'create_daily_task',
'create_desktop_shortcut',
'create_group',
'create_onetime_task',
'create_programs_menu_shortcut',
'create_shortcut',
'create_user',
'create_user_desktop_shortcut',
'create_user_programs_menu_shortcut',
'critical_system_pending_updates',
'default_user_appdata',
'default_user_local_appdata',
'delete_at_next_reboot',
'delete_group',
'delete_task',
'delete_user',
'desktop',
'disable_file_system_redirection',
'disable_task',
'enable_task',
'fix_wmi',
'get_all_scheduled_tasks',
'get_antivirus_info',
'get_app_install_location',
'get_app_path',
'get_appath',
'get_computer_description',
'get_computer_domain',
'get_default_app',
'get_domain_fromregistry',
'get_file_properties',
'get_file_assocation',
'get_installer_defaults_win',
'get_language_code',
'get_local_profiles',
'get_msi_properties',
'get_powershell_str',
'get_product_props',
'get_profile_path',
'get_profiles_users',
'get_shortcut_properties',
'get_service_start_mode',
'get_task',
'get_user_from_sid',
'get_version_from_binary',
'getsilentflags',
'getscreens',
'install_exe_if_needed',
'install_location',
'install_msi_if_needed',
'installed_windows_updates',
'InstallerTypes',
'iswin64',
'is_kb_installed',
'is_pending_reboot',
'list_local_printers',
'local_admins',
'local_desktops',
'local_drives',
'local_group_members',
'local_group_memberships',
'local_groups',
'local_users',
'local_users_profiles',
'memory_status',
'messagebox',
'my_documents',
'need_install',
'pending_reboot_reasons',
'programdata',
'programfiles',
'programfiles32',
'programfiles64',
'programs',
'win32com_ensure_dispatch_patch',
'reboot_machine',
'recent',
'reg_closekey',
'reg_delvalue',
'reg_enum_subkeys',
'reg_enum_values',
'reg_getvalue',
'reg_key_exists',
'reg_openkey_noredir',
'reg_setvalue',
'reg_value_exists',
'register_dll',
'register_ext',
'register_uninstall',
'register_windows_uninstall',
'registered_organization',
'registry_delete',
'registry_deletekey',
'registry_readstring',
'registry_set',
'registry_setstring',
'remove_appx',
'remove_desktop_shortcut',
'remove_from_system_path',
'remove_metroapp',
'remove_printer',
'remove_programs_menu_folder',
'remove_programs_menu_shortcut',
'remove_shutdown_script',
'remove_user_desktop_shortcut',
'remove_user_from_group',
'remove_user_programs_menu_folder',
'remove_user_programs_menu_shortcut',
'replace_at_next_reboot',
'run_as_administrator',
'run_notfatal',
'run_powershell',
'run_powershell_from_file',
'run_task',
'running_as_admin',
'running_as_system',
'sendto',
'service_delete',
'service_installed',
'service_is_running',
'service_is_stopped',
'service_restart',
'service_start',
'service_stop',
'set_computer_description',
'set_environ_variable',
'set_file_hidden',
'set_file_visible',
'set_service_start_mode',
'showmessage',
'shutdown_scripts_ui_visible',
'start_menu',
'startup',
'system32',
'systemdrive',
'task_exists',
'taskscheduler',
'uac_enabled',
'uninstall_cmd',
'uninstall_key_exists',
'unregister_dll',
'unregister_uninstall',
'unset_environ_variable',
'unzip_with_7zip',
'user_appdata',
'user_desktop',
'user_local_appdata',
'wait_uninstallkey_absent',
'wait_uninstallkey_present',
'win_startup_info',
'wincomputername',
'windomainname',
'windows_version',
'winshell',
'wmi_as_struct',
'wmi_info',
'wmi_info_basic',
'wua_agent_version',
'getscreens'
])
else:
# UNIX functions
__all__.extend([
'application_data',
'default_gateway',
'dmi_info',
'get_computername',
'get_computername',
'get_current_user',
'get_default_gateways',
'get_dns_servers',
'get_domain_from_socket',
'get_domain_info',
'get_file_properties',
'get_groups',
'get_kernel_version',
'get_last_logged_on_user',
'get_loggedinusers',
'host_info_common_unix',
'host_metrics',
'is_valid_ipv4_address',
'local_drives',
'networking',
'user_appdata',
'user_local_appdata',
])
if platform.system() == 'Darwin':
from setuphelpers_macos import *
__all__.extend([
'brew_install',
'brew_uninstall',
'get_applications_info_files',
'get_info_plist_path',
'get_installed_pkgs',
'get_pkg_info',
'get_plist_obj',
'install_app',
'install_dmg',
'install_pkg',
'is_dmg_installed',
'is_local_app_installed',
'is_local_pkg_installed',
'mount_dmg',
'uninstall_app',
'uninstall_pkg',
'unmount_dmg',
])
elif platform.system() == 'Linux':
from setuphelpers_linux import *
__all__.extend([
'is_debian_based',
'is_redhat_based',
'is_rhel_based',
'is_linux64',
'isLinux64',
'install_apt',
'uninstall_apt',
'install_deb',
'purge_deb',
'get_code_name_version',
'get_distrib_version',
'get_distrib_linux',
'install_package_if_needed',
'install_required_dependencies_apt',
'autoremove_apt',
'install_yum',
'uninstall_yum',
'autoremove_yum',
'update_apt',
'upgrade_apt',
'update_yum',
'upgrade_yum',
'type_debian',
'type_redhat',
'install_rpm',
'systemd_start_service',
'systemd_stop_service',
'systemd_restart_service',
'systemd_status_service',
])
logger = logging.getLogger('waptcore')
[docs]def user_home_directory():
return os.path.expanduser("~")
[docs]def get_os_name():
r"""Get the name of the current running operating system
Returns:
str: Windows, Linux, Darwin
>>> get_os_name()
'Windows'
"""
return platform.system()
[docs]def json_load_file(json_file):
r"""Get content of a json file
Args:
path (str): path to the file
Returns:
dictionary (dict)
"""
with open(json_file) as read_file:
return json.load(read_file)
[docs]def json_write_file(json_file, data, indent=4, sort_keys=False, encoding='utf-8'):
r"""Write dictionary to a json file
Args:
json_file (str): path to json file
data (dict): dictionary content
indent (str or list of str): tabulation size ; default: 4 spaces
sort_keys (bool): alphabetical order or not
encoding (bool): file encoding
"""
with open(json_file, 'w', encoding=encoding) as write_file:
json.dump(data, write_file, sort_keys=sort_keys, indent=indent)
[docs]def get_proxies():
r"""Return system proxy with the urllib python library
>>> get_proxies()
{'http': 'http://srvproxy.ad.domain.lan:8080',
'https': 'http://srvproxy.ad.domain.lan:8080'}
"""
return getproxies()
[docs]def get_proxies_from_wapt_console():
r"""Return proxy information from the current user WAPT console
>>> get_proxies_from_wapt_console()
{'http': 'http://srvproxy.ad.domain.lan:8080',
'https': 'http://srvproxy.ad.domain.lan:8080'}
"""
proxies = {}
if platform.system() == 'Windows':
waptconsole_ini_path = makepath(user_local_appdata(), 'waptconsole', 'waptconsole.ini')
else:
waptconsole_ini_path = makepath(user_home_directory(), '.config', 'waptconsole', 'waptconsole.ini')
if isfile(waptconsole_ini_path):
proxy_wapt = inifile_readstring(waptconsole_ini_path, 'global', 'http_proxy')
if proxy_wapt:
proxies = {'http': proxy_wapt, 'https': proxy_wapt}
return proxies
[docs]def bs_find(url, element, attribute=None, value=None, user_agent=None, proxies=None, features='html.parser', **kwargs):
r"""Parse html web page with BeautifulSoup and get the first result
Args:
url (str): url of the web page to parse
element (str): searched element
attribute (str): selected attribute of the element
value (str): value of the selected attribute
user_agent (str): specify a user-agent if needed
proxies (dict): specify your proxy if needed
**kwargs (str): joker for requests parameters
features (str): bs feature to use
>>> bs_find('https://www.w3.org/', 'a', 'title', 'Open Web Platform testing')['href']
'https://web-platform-tests.org/'
>>> bs_find('https://www.w3.org/', 'span', 'class', 'alt-logo').string
'W3C'
.. versionadded:: 2.0
"""
if user_agent:
page = requests.get(url, proxies=proxies, headers={'User-Agent':'%s' % user_agent}, **kwargs).text
else:
page = requests.get(url, proxies=proxies, **kwargs).text
soup = BeautifulSoup.BeautifulSoup(page, features=features)
if value:
return soup.find(element, {attribute: value})
else:
return soup.find(element)
[docs]def bs_find_all(url, element, attribute=None, value=None, user_agent=None, proxies=None, features='html.parser', **kwargs):
r"""Parse html web page with BeautifulSoup and get a list of the result
Args:
url (str): url of the web page to parse
element (str): searched element
attribute (str): selected attribute of the element
value (str): value of the selected attribute
user_agent (str): specify a user-agent if needed
proxies (dict): specify your proxy if needed
**kwargs (str): joker for requests parameters
features (str): bs feature to use
>>> bs_find_all('https://www.w3.org/', 'a', 'title', 'Open Web Platform testing')[0]['href']
'https://web-platform-tests.org/'
>>> bs_find_all('https://www.w3.org/', 'span', 'class', 'alt-logo')[0].string
'W3C'
.. versionadded:: 2.0
"""
if user_agent:
page = requests.get(url, proxies=proxies, headers={'User-Agent':'%s' % user_agent}, **kwargs).text
else:
page = requests.get(url, proxies=proxies, **kwargs).text
soup = BeautifulSoup.BeautifulSoup(page, features=features)
if value:
return soup.find_all(element, {attribute:value})
else:
return soup.find_all(element)
[docs]def remove_outdated_binaries(version, list_extensions=["exe", "msi", "deb", "rpm", "dmg", "pkg"], filename_contains=None):
r"""Remove files based on the version contained in his filename, failing over on file version on compatible OSes
Args:
version (str): version number of keeped files
list_extensions (str or list of str): file extensions of compared files
filename_contains (str or list of str): Part of the filename that must be contained (useful for distinguishing architecture and os)
Returns:
list: list of deleted files
.. versionadded:: 2.0
.. versionchanged:: 2.2
Now returns removed files, now checking .exe and .msi file versions
"""
files = []
if type(list_extensions) != list:
list_extensions = [list_extensions]
if filename_contains:
if type(filename_contains) != list:
filename_contains = [filename_contains]
list_extensions = ["." + ext for ext in list_extensions if ext[0] != "."]
for file_ext in list_extensions:
for bin_in_dir in glob.glob("*%s" % file_ext):
if not version in bin_in_dir:
if platform.system() == "Windows":
if file_ext == ".exe" or file_ext == ".msi":
if Version(version) == Version(get_version_from_binary(bin_in_dir, "FileVersion")) or Version(version) == Version(
get_version_from_binary(bin_in_dir, "ProductVersion")
):
print("%s file or product version is correct (%s)" % (bin_in_dir, version))
continue
remove_file(bin_in_dir)
files.append(bin_in_dir)
if filename_contains:
for filename_contain in filename_contains:
if not filename_contain in bin_in_dir:
remove_file(bin_in_dir)
files.append(bin_in_dir)
return [fn for fn in files]
[docs]def get_language(full_locale=False, separator='_'):
"""Get the os default locale (example: fr, en, pl, etc.)
>>> get_language()
'fr'
>>> get_language(full_locale=True)
'fr_FR'
>>> get_language(full_locale=True, separator='-').lower()
'fr-fr'
"""
if sys.platform == 'win32':
lang = locale.windows_locale[ctypes.windll.kernel32.GetUserDefaultUILanguage()]
else:
lang = locale.getdefaultlocale()[0]
if lang==None:
lang='en_US'
if full_locale:
return lang.replace('_', separator)
return lang.split('_')[0]
[docs]def add_double_quotes_around(string):
r"""Return the string with double quotes around
Args:
string (str): a string
"""
return '"'+string+'"'
[docs]def filecopyto(filename, target):
"""Copy file from absolute or package temporary directory to target directory
If file is dll or exe, logs the original and new version.
Args:
filename (str): absolute path to file to copy,
or relative path to temporary package install content directory.
target (str) : absolute path to target directory where to copy file.
target is either a full filename or a directory name
if filename is .exe or .dll, logger prints version numbers
>>> if not os.path.isfile('c:/tmp/fc.test'):
... with open('c:/tmp/fc.test','wb') as f:
... f.write('test')
>>> if not os.path.isdir('c:/tmp/target'):
... os.mkdir('c:/tmp/target')
>>> if os.path.isfile('c:/tmp/target/fc.test'):
... os.unlink('c:/tmp/target/fc.test')
>>> filecopyto('c:/tmp/fc.test','c:/tmp/target')
>>> os.path.isfile('c:/tmp/target/fc.test')
True
"""
(dir, fn) = os.path.split(filename)
if not dir:
dir = os.getcwd()
if os.path.isdir(target):
target = os.path.join(target, os.path.basename(filename))
if os.path.isfile(target):
if os.path.splitext(target)[1] in ('.exe', '.dll'):
try:
ov = get_file_properties(target)['FileVersion']
nv = get_file_properties(filename)['FileVersion']
logger.info('Replacing %s (%s) -> %s' % (ensure_unicode(target), ov, nv))
except:
logger.info('Replacing %s' % target)
else:
logger.info('Replacing %s' % target)
else:
if os.path.splitext(target)[1] in ('.exe', '.dll'):
try:
nv = get_file_properties(filename)['FileVersion']
logger.info('Copying %s (%s)' % (ensure_unicode(target), nv))
except:
logger.info('Copying %s' % (ensure_unicode(target)))
else:
logger.info('Copying %s' % (ensure_unicode(target)))
shutil.copy(filename, target)
# Copy of an entire tree from install temp directory to target
[docs]def default_oncopy(msg, src, dst):
logger.debug('%s : "%s" to "%s"' % (ensure_unicode(msg), ensure_unicode(src), ensure_unicode(dst)))
return True
[docs]def default_skip(src, dst):
return False
[docs]def default_overwrite(src, dst):
return True
[docs]def default_overwrite_older(src, dst):
if os.stat(src).st_mtime <= os.stat(dst).st_mtime:
logger.debug('Skipping, file on target is newer than source: "%s"' % (dst,))
return False
else:
logger.debug('Overwriting file on target is older than source: "%s"' % (dst,))
return True
[docs]def dir_is_empty(path):
"""Check if a directory is empty"""
return isdir(path) and len(os.listdir(path)) == 0
[docs]def file_is_locked(path, timeout=5):
"""Check if a file is locked. waits timout seconds for the release"""
count = timeout
while count > 0:
try:
f = open(path, 'ab')
f.close()
return False
except IOError as e:
if e.errno == 13:
count -= 1
if count < 0:
return True
else:
print('Waiting for %s to be released...' % path)
time.sleep(1)
else:
raise
return True
[docs]def copytree2(src, dst, ignore=None, onreplace=default_skip, oncopy=default_oncopy, enable_replace_at_reboot=True):
r"""Copy src directory to dst directory. dst is created if it doesn't exists
src can be relative to installation temporary dir
oncopy is called for each file copy. if False is returned, copy is skipped
onreplace is called when a file will be overwritten.
Args:
src (str): path to source directory (absolute path or relative to package extraction tempdir)
dst (str): path to target directory (created if not present)
ignore (func) : callback func(root_dir,filenames) which returns names to ignore
onreplace (func) : callback func(src,dst):boolean called when a file will be replaced to decide what to do.
default is to not replace if target exists. can be default_overwrite or default_overwrite_older or
custom function.
oncopy (func) : callback func(msg,src,dst) called when a file is copied.
default is to log in debug level the operation
enable_replace_at_reboot (boolean): if True, files which are locked will be scheduled for replace at next reboot
Returns:
Raises:
>>> copytree2(r'c:\tranquilit\wapt\tests',r'c:\tranquilit\wapt\tests2')
>>> isdir(r'c:\tranquilit\wapt\tests2')
True
>>> remove_tree(r'c:\tranquilit\wapt\tests2')
>>> isdir(r'c:\tranquilit\wapt\tests2')
False
"""
logger.debug('Copy tree from "%s" to "%s"' % (ensure_unicode(src), ensure_unicode(dst)))
# path relative to temp directory...
tempdir = os.getcwd()
if not os.path.isdir(src) and os.path.isdir(os.path.join(tempdir, src)):
src = os.path.join(tempdir, src)
names = os.listdir(src)
if ignore is not None:
ignored_names = ignore(src, names)
else:
ignored_names = set()
if not os.path.isdir(dst):
if oncopy('create directory', src, dst):
os.makedirs(dst)
errors = []
skipped = []
for name in names:
if name in ignored_names:
continue
srcname = os.path.join(src, name)
dstname = os.path.join(dst, name)
try:
if os.path.isdir(srcname):
if oncopy('directory', srcname, dstname):
copytree2(srcname, dstname, ignore=ignore, onreplace=onreplace, oncopy=oncopy)
else:
try:
if os.path.isfile(dstname):
if onreplace(srcname, dstname) and oncopy('overwrites', srcname, dstname):
os.unlink(dstname)
shutil.copy2(srcname, dstname)
else:
if oncopy('copy', srcname, dstname):
shutil.copy2(srcname, dstname)
except (IOError, os.error) as e:
# file is locked...
if enable_replace_at_reboot and e.errno in (5, 13):
filecopyto(srcname, dstname+'.pending')
replace_at_next_reboot(tmp_filename=dstname+'.pending', target_filename=dstname)
else:
raise
except (IOError, os.error) as why:
logger.critical('Error copying from "%s" to "%s" : %s' % (ensure_unicode(src), ensure_unicode(dst), ensure_unicode(why)))
errors.append((srcname, dstname, str(why)))
# catch the Error from the recursive copytree so that we can
# continue with other files
except shutil.Error as err:
# errors.extend(err.args[0])
errors.append(err)
try:
shutil.copystat(src, dst)
except WindowsError:
# can't copy file access times on Windows
pass
except OSError as why:
errors.extend((src, dst, str(why)))
if errors:
raise shutil.Error(errors)
[docs]def shell_launch(cmd):
"""Launch a command (without arguments) but doesn't wait for its termination
>>> open('c:/tmp/test.txt','w').write('Test line')
>>> shell_launch('c:/tmp/test.txt')
"""
os.startfile(cmd)
[docs]def processes_for_file(filepath, open_files=True, dll=True):
"""Generator returning processes currently having a open file descriptor for filepath
If not running as System account, can not access system processes.
Args:
filepath (str): file path or pattern (glob *)
Returns:
iterator psutil.Process
"""
for process in psutil.process_iter():
if dll:
try:
for dllproc in process.memory_maps():
if glob.fnmatch.fnmatch(dllproc.path, filepath):
yield process
break
except Exception as e:
# often : psutil.AccessDenied
pass
if open_files:
try:
for open_file in process.open_files():
if glob.fnmatch.fnmatch(open_file.path, filepath):
yield process
break
except Exception as e:
# often : psutil.AccessDenied
pass
[docs]def find_processes(process_name):
"""Return list of Process names process_name
Args:
process_name (str): process name to lookup
Returns:
list: list of processes (Process) named process_name or process_name.exe
>>> [p.pid for p in find_processes('explorer')]
[2756, 4024]
"""
process_name = process_name.lower()
result = []
for p in psutil.process_iter():
try:
if p.name().lower() in [process_name, process_name+'.exe']:
result.append(p)
except (psutil.AccessDenied, psutil.NoSuchProcess):
pass
return result
def get_domain():
"""Return main DNS domain of the computer
Returns:
str: domain name
>>> get_domain_fromregistry()
u'tranquilit.local'
"""
if sys.platform == 'win32':
return get_domain_fromregistry()
elif sys.platform.startswith('linux'):
return get_domain_from_socket()
[docs]def inifile_hasoption(inifilename, section, key):
"""Check if an option is present in section of the inifile
Args:
inifilename (str): Path to the ini file
section (str): section
key (str): value key to check
Returns:
boolean : True if the key exists
>>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.2')
>>> print inifile_hasoption('c:/tranquilit/wapt/tests/test.ini','global','version')
True
>>> print inifile_hasoption('c:/tranquilit/wapt/tests/test.ini','global','dontexist')
False
"""
inifile = RawConfigParser()
inifile.read(inifilename)
return inifile.has_section(section) and inifile.has_option(section, key)
[docs]def inifile_hassection(inifilename, section):
"""Check if an option is present in section of the inifile
Args:
inifilename (str): Path to the ini file
section (str): section
Returns:
boolean : True if the key exists
>>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.2')
>>> print inifile_hassection('c:/tranquilit/wapt/tests/test.ini','global')
True
"""
inifile = RawConfigParser()
inifile.read(inifilename)
return inifile.has_section(section)
[docs]def inifile_deleteoption(inifilename, section, key):
"""Remove a key within the section of the inifile
Args:
inifilename (str): Path to the ini file
section (str): section
key (str): value key of option to remove
Returns:
boolean : True if the key/option has been removed
>>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.2')
>>> print inifile_hasoption('c:/tranquilit/wapt/tests/test.ini','global','version')
True
>>> print inifile_deleteoption('c:/tranquilit/wapt/tests/test.ini','global','version')
False
"""
inifile = RawConfigParser()
inifile.read(inifilename)
inifile.remove_option(section, key)
inifile.write(open(inifilename, 'w'))
return inifile.has_section(section) and not inifile.has_option(section, key)
[docs]def get_os_version():
if sys.platform == 'win32':
return windows_version()
if sys.platform.startswith('linux'):
return get_distrib_version()
if sys.platform == 'darwin':
return platform.mac_ver()[0]
return ('OS not supported')
[docs]def is64():
return platform.machine().endswith('64')
[docs]def is32():
return platform.machine().endswith('86')
[docs]def isARM():
return 'arm' in platform.machine().lower()
[docs]def isARM64():
return 'aarch64' in platform.machine().lower()
[docs]def inifile_deletesection(inifilename, section):
"""Remove a section within the inifile
Args:
inifilename (str): Path to the ini file
section (str): section to remove
Returns:
boolean : True if the section has been removed
"""
inifile = RawConfigParser()
inifile.read(inifilename)
inifile.remove_section(section)
inifile.write(open(inifilename, 'w'))
return not inifile.has_section(section)
[docs]def inifile_readstring(inifilename, section, key, default=None):
"""Read a string parameter from inifile
>>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.2')
>>> print inifile_readstring('c:/tranquilit/wapt/tests/test.ini','global','version')
1.1.2
>>> print inifile_readstring('c:/tranquilit/wapt/tests/test.ini','global','undefaut','defvalue')
defvalue
"""
inifile = RawConfigParser()
inifile.read(inifilename)
if inifile.has_section(section) and inifile.has_option(section, key):
return inifile.get(section, key)
else:
return default
[docs]def inifile_writestring(inifilename, section, key, value):
r"""Write a string parameter to inifile
>>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.1')
>>> print inifile_readstring('c:/tranquilit/wapt/tests/test.ini','global','version')
1.1.1
"""
inifile = RawConfigParser()
inifile.read(inifilename)
if not inifile.has_section(section):
inifile.add_section(section)
inifile.set(section, key, value)
inifile.write(open(inifilename, 'w'))
[docs]def get_fqdn():
return ensure_unicode(get_hostname()) if os.name == 'nt' else ensure_unicode(socket.getfqdn()).lower()
[docs]def running_as_system():
"""Dirty way to check if current process is running as system user
"""
user = getpass.getuser()
return user.endswith('$') and user[:-1].upper() == get_computername().upper()
def get_json_config_filename(config_name):
return os.path.join(os.path.dirname(__file__), 'conf.d', config_name+".json")
[docs]def get_config_package_filename(config_name):
return get_json_config_filename(config_name)
def get_config_certificate_filename(config_name, crt_fingerprint):
return os.path.join(os.path.dirname(__file__), 'ssl', config_name+"-"+crt_fingerprint+".crt")
def get_config_certificate_server_filename(config_name, crt_fingerprint):
return os.path.join(os.path.dirname(__file__), 'ssl', "server" , config_name+"-"+crt_fingerprint+".crt")
def get_config_package_certificates_filenames(config_name):
result = []
conf_filename = get_config_package_filename(config_name)
if not os.path.isfile(conf_filename):
return None
with open(conf_filename, "r") as file:
crts = json.load(file).get('certificates', [])
for key in crts:
result.append(get_config_certificate_filename(config_name, key))
return result
def get_config_package_certificates_server_filenames(config_name):
result = []
conf_filename = get_config_package_filename(config_name)
if not os.path.isfile(conf_filename):
return None
with open(conf_filename, "r") as file:
crts = json.load(file).get('global', {}).get('verify_cert')
if crts and crts.lower() in ["0","1","true","false"]:
return
return crts
def install_json_config(json_config_str, config_name=None, priority=None):
try:
conf = json.loads(json_config_str)
if 'reposync' in conf:
conf['repo-sync'] = conf['reposync']
del conf['reposync']
if not type(conf) is dict:
raise Exception('Json configuration is not a dict')
if priority:
conf['priority'] = int(priority)
for section in conf:
if section in ('priority','name','certificates','server_certificates'):
continue
if not isinstance(conf[section],dict):
continue
if conf[section].get('verify_cert', None):
if conf[section]['verify_cert'].lower().strip() in ['0','1','true','false']:
conf[section]['verify_cert'] = conf[section]['verify_cert']
else:
cert_server = os.path.join(os.path.dirname(__file__), 'ssl', 'server', config_name + '-' + conf[section]['verify_cert']) + '.crt'
with open(cert_server, "w") as f:
f.write(conf['server_certificates'][conf[section]['verify_cert']])
conf[section]['verify_cert'] = cert_server
## Copy the json config to conf.d
config_filename = get_json_config_filename(config_name)
with open(config_filename, "w") as config_file:
config_file.write(json.dumps(conf))
## We copy the certificates
crts = conf.get('certificates', [])
for key in crts:
crt_dest = get_config_certificate_filename(config_name, key)
with open(crt_dest, "w") as f:
f.write(crts[key])
return (os.path.isfile(config_filename), config_filename)
except Exception as e:
raise Exception("Invalid json configuration %s:\n%s\n%s" % (config_name, json_config_str, str(e)))
def install_json_configb64(json_config_b64, config_name=None, priority=None):
try:
json_config_str = base64.b64decode(json_config_b64.encode('utf-8')).decode('utf-8')
return install_json_config(json_config_str, config_name, priority)
except:
raise Exception("Invalid base64 json configuration : \n%s" % json_config_b64)
def install_json_config_from_url(url, config_hash=None, config_name=None, priority=None):
if not config_hash:
if url and len(url) > 70 and url.endswith('.json') and url[-70] == '_':
config_hash = url[-69:-5]
else:
raise Exception("No hash given to verify json configuration from url %s", (url))
try:
local_file = wget(url, sha256=config_hash,verify_cert=False)
except:
raise Exception("Can't download or verify json configuration from %s with hash: %s" % (url, config_hash))
if not config_name:
config_name = hashlib.sha256(url.lower().encode('utf8')).hexdigest()
res = install_json_config_file(local_file, config_name, priority=priority)
if isfile(local_file):
os.remove(local_file)
return res
def install_json_config_file(json_config_file, config_name=None, priority=None):
if not os.path.isfile(json_config_file):
raise Exception("%s configuration file not found" % json_config_file)
json_config_str = ''
with open(json_config_file, "r") as file:
json_config_str = file.read()
return install_json_config(json_config_str, config_name, priority)
[docs]def install_config_package(config_filename, config_name=None):
## We removed the previous certificates (needed when we upgrade the package)
remove_config_package_certificates(config_name)
install_json_config_file(config_filename, config_name)
def remove_config_package_certificates(config_name):
if not os.path.isfile(get_config_package_filename(config_name)):
return
for matching_crt in get_config_package_certificates_filenames(config_name):
if isfile(matching_crt):
os.remove(matching_crt)
crtserverfile = get_config_package_certificates_server_filenames(config_name)
if crtserverfile:
if isfile(crtserverfile):
os.remove(crtserverfile)
[docs]def remove_config_package(config_name):
filename = get_config_package_filename(config_name)
if os.path.isfile(filename):
## Delete the certificates copied by the package
remove_config_package_certificates(config_name)
if isfile(filename):
os.remove(filename)
[docs]def unzip(zipfn, target=None, filenames=None, extract_with_full_paths=True):
r"""Unzip the files from zipfile with patterns in filenames to target directory
Args:
zipfn (str) : path to zipfile. (can be relative to temporary unzip location of package)
target (str) : target location. Defaults to dirname(zipfile) + basename(zipfile)
filenames (str or list of str): list of filenames / glob patterns (path sep is normally a slash)
extract_with_full_paths (bool): keeping or not the subfolders of the archive as is
Returns:
list : list of extracted files
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt')
['C:\\example\\tis-7zip_9.2.0-15_all\\7z920-x64.msi',
'C:\\example\\tis-7zip_9.2.0-15_all\\7z920.msi',
'C:\\example\\tis-7zip_9.2.0-15_all\\setup.py',
'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/control',
'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/wapt.psproj',
'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/manifest.sha256',
'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/signature']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', filenames=['*.msi','*.py'])
['C:\\example\\tis-7zip_9.2.0-15_all\\7z920-x64.msi',
'C:\\example\\tis-7zip_9.2.0-15_all\\7z920.msi',
'C:\\example\\tis-7zip_9.2.0-15_all\\setup.py']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', target='test', filenames=['*.msi','*.py'])
['C:\\example\\test\\7z920-x64.msi',
'C:\\example\\test\\7z920.msi',
'C:\\example\\test\\setup.py']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', filenames='WAPT/*')
['C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/control',
'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/wapt.psproj',
'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/manifest.sha256',
'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/signature']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', filenames='WAPT/control')
['C:\\example\\tis-7zip_9.2.0-15_all\\WAPT\\control']
>>> unzip('tis-7zip_9.2.0-15_all.wapt', target='.', filenames='WAPT/control')
['.\\WAPT\\control']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', target=r'C:\example\', filenames='WAPT/control')
['C:\\example\\WAPT\\control']
>>> unzip('tis-7zip_9.2.0-15_all.wapt', target=basedir, filenames='WAPT/control')
['C:\\example\\WAPT\\control']
>>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', filenames='WAPT/control', extract_with_full_paths=False)
['C:\\example\\control']
.. versionadded:: 1.3.11
.. versionchanged:: 2.2
added extract_with_full_paths parameter
"""
zipf = CustomZipFile(zipfn, allowZip64=True)
if target is None:
target = makepath(os.path.dirname(os.path.abspath(zipfn)), os.path.splitext(os.path.basename(zipfn))[0])
if filenames is not None:
filenames = [pattern.replace('\\', '/') for pattern in ensure_list(filenames)]
def match(fn, filenames):
# return True if fn matches one of the pattern in filenames
if filenames is None:
return True
else:
for pattern in filenames:
if glob.fnmatch.fnmatch(fn, pattern):
return True
return False
if filenames is not None:
files = [fn for fn in zipf.namelist() if match(fn, filenames)]
members = files
else:
files = zipf.namelist()
members = None
if sys.platform != 'darwin' and extract_with_full_paths:
zipf.extractall(target, members)
elif not extract_with_full_paths:
files_to_extract = files
files = []
os.makedirs(target, exist_ok=True)
for fn in files_to_extract:
filename = os.path.basename(fn)
# skip directories
if not filename:
continue
# copy file (taken from zipfile's extract)
source = zipf.open(fn)
files.append(os.path.join(filename))
dest = open(os.path.join(target, filename), "wb")
with source, dest:
shutil.copyfileobj(source, dest)
else: # the zipfile module doesn't support symlinks, and it's used in macOS packages
try:
members_str = " ".join('"{0}"'.format(m) for m in members) if members else ""
unzip_cmd = 'unzip -qq {zipfile} {members_str} -d {destination_dir}'.format(
zipfile=zipfn,
members_str=members_str,
destination_dir=target)
run(unzip_cmd)
except:
print('Error : couldn\'t unzip {}'.format(zipfn))
return []
return [makepath(target, fn.replace('/', os.sep)) for fn in files]
# Specific parameters for install scripts
params = {}
control = PackageEntry()
if __name__ == '__main__':
pass