#!/usr/bin/env python3#### -----------------------------------------------------------------## This file is part of WAPT Software Deployment## Copyright (C) 2012 - 2022 Tranquil IT https://www.tranquil.it## All Rights Reserved.#### WAPT helps systems administrators to efficiently deploy## setup, update and configure applications.## ------------------------------------------------------------------##importglobimportlocaleimportloggingimportosimportplatformimportshutilimportsocketimportstatimportsubprocessimportsysimporttimeimportpsutilimportnetifacesimportplatformimporttimeimportgetpassfrominiparseimportRawConfigParserfromwaptpackageimportPackageEntryfromwaptutilsimport(Version,__version__,all_files,dateof,datetime2isodate,ensure_list,ensure_unicode,fileisodate,find_all_files,get_disk_free_space,hours_minutes,httpdatetime2isodate,isodate2datetime,time2display,wget,wgets,makepath,killtree,isfile,isdir,CalledProcessErrorOutput,remove_file,mkdirs,get_main_ip,get_local_IPs,killalltasks,isrunning,get_sha256,CustomZipFile,run,run_notfatal,CalledProcessError,RunOutput,RunReader,listening_sockets,copytree2,default_skip,default_overwrite,default_oncopy,default_overwrite_older)fromurllib.requestimportgetproxiesimportrequestsimportuuidimporthashlib__all__=[]__all__.extend(['CalledProcessError','PackageEntry','Version','__version__','all_files','application_data','control','copytree2','currentdate','currentdatetime','dateof','datetime2isodate','default_gateway','default_oncopy','default_overwrite','default_overwrite_older','default_skip','dir_is_empty','dmi_info','ensure_dir','ensure_list','ensure_unicode','error','file_is_locked','filecopyto','fileisodate','find_all_files','find_processes','get_computer_groups','get_computername','get_current_user','get_default_gateways','get_disk_free_space','get_dns_servers','get_fqdn','get_hostname','get_language','get_last_logged_on_user','get_local_IPs','get_loggedinusers','get_os_name','get_main_ip','get_os_version','get_proxies','get_sha
256','glob','host_info','host_metrics','hours_minutes','httpdatetime2isodate','inifile_deleteoption','inifile_deletesection','inifile_hasoption','inifile_hassection','inifile_readstring','inifile_writestring','installed_softwares','is64','is32','isARM','isARM64','isdir','isfile','isodate2datetime','isrunning','json_write_file','json_load_file','killalltasks','killtree','listening_sockets','logger','makepath','mkdirs','networking','os','params','processes_for_file','remove_file','remove_tree','run','run_notfatal','running_on_ac','shell_launch','shutil','time2display','unzip','user_home_directory','wget','wgets',])# Conditionnal imports for setuphelpersifplatform.system()=='Windows':fromsetuphelpers_windowsimport*__all__.extend(['EnsureWUAUServRunning','HKEY_CLASSES_ROOT','HKEY_CURRENT_CONFIG','HKEY_CURRENT_USER','HKEY_LOCAL_MACHINE','HKEY_USERS','InstallerTypes','KEY_ALL_ACCESS','KEY_READ','KEY_WRITE','REG_DWORD','REG_EXPAND_SZ','REG_MULTI_SZ','REG_SZ','RunOutput','RunReader','TimeoutExpired','WindowsVersions','add_shutdown_script','add_to_system_path','add_user_to_group','adjust_current_privileges','battery_lifetime','battery_percent','bookmarks','common_desktop','create_daily_task','create_desktop_shortcut','create_group','create_onetime_task','create_programs_menu_shortcut','create_shortcut','create_user','create_user_desktop_shortcut','create_user_programs_menu_shortcut','critical_system_pending_updates','default_user_appdata','default_user_local_appdata','delete_at_next_reboot','delete_group','delete_task','delete_user','desktop','disable_file_system_redirection','disable_task','enable_task','fix_wmi','get_all_scheduled_tasks','get_antivirus_info','get_app_install_location','get_app_path','get_appath','get_computer_description','get_computer_domain','get_default_app','get_domain_fromregistry','get_file_properties','get_file_assocation','get_installer_defaults_win','get_language_code','get_local_profiles','get_msi_properties','get_powershell_str','get_product_pro
ps','get_profile_path','get_profiles_users','get_shortcut_properties','get_service_start_mode','get_task','get_user_from_sid','get_version_from_binary','getsilentflags','getscreens','install_exe_if_needed','install_location','install_msi_if_needed','installed_windows_updates','InstallerTypes','iswin64','is_kb_installed','is_pending_reboot','list_local_printers','local_admins','local_desktops','local_drives','local_group_members','local_group_memberships','local_groups','local_users','local_users_profiles','memory_status','messagebox','my_documents','need_install','pending_reboot_reasons','programdata','programfiles','programfiles32','programfiles64','programs','win32com_ensure_dispatch_patch','reboot_machine','recent','reg_closekey','reg_delvalue','reg_enum_subkeys','reg_enum_values','reg_getvalue','reg_key_exists','reg_openkey_noredir','reg_setvalue','reg_value_exists','register_dll','register_ext','register_uninstall','register_windows_uninstall','registered_organization','registry_delete','registry_deletekey','registry_readstring','registry_set','registry_setstring','remove_appx','remove_user_appx','remove_desktop_shortcut','remove_from_system_path','remove_metroapp','remove_printer','remove_programs_menu_folder','remove_programs_menu_shortcut','remove_shutdown_script','remove_user_desktop_shortcut','remove_user_from_group','remove_user_programs_menu_folder','remove_user_programs_menu_shortcut','replace_at_next_reboot','run_as_administrator','run_notfatal','run_powershell','run_powershell_from_file','run_task','running_as_admin','running_as_system','sendto','service_delete','service_installed','service_is_running','service_is_stopped','service_list','service_restart','service_start','service_stop','set_computer_description','set_environ_variable','set_file_hidden','set_file_visible','set_service_start_mode','showmessage','shutdown_scripts_ui_visible','start_menu','startup','system32','systemdrive','task_exists','taskscheduler','uac_enabled','uninstall_cmd','unins
tall_key_exists','unregister_dll','unregister_uninstall','unset_environ_variable','unzip_with_7zip','user_appdata','user_desktop','user_local_appdata','wait_uninstallkey_absent','wait_uninstallkey_present','win_startup_info','wincomputername','windomainname','windows_version','winshell','wmi_as_struct','wmi_info','wmi_info_basic','wua_agent_version','getscreens'])else:# UNIX functions__all__.extend(['application_data','default_gateway','dmi_info','get_computername','get_computername','get_current_user','get_default_gateways','get_dns_servers','get_domain_from_socket','get_domain_info','get_file_properties','get_groups','get_kernel_version','get_last_logged_on_user','get_loggedinusers','host_info_common_unix','host_metrics','is_valid_ipv4_address','local_drives','networking','user_appdata','user_local_appdata','local_group_members','local_group_memberships','local_groups'])ifplatform.system()=='Darwin':fromsetuphelpers_macosimport*__all__.extend(['brew_install','brew_uninstall','get_applications_info_files','get_info_plist_path','get_installed_pkgs','get_pkg_info','get_plist_obj','install_app','install_dmg','install_pkg','is_dmg_installed','is_local_app_installed','is_local_pkg_installed','mount_dmg','uninstall_app','uninstall_pkg','unmount_dmg','uninstall_key_exists','get_release_name','get_os_version'])elifplatform.system()=='Linux':fromsetuphelpers_linuximport*__all__.extend(['is_debian_based','is_redhat_based','is_rhel_based','is_linux64','isLinux64','install_apt','uninstall_apt','install_deb','purge_deb','get_code_name_version','get_distrib_version','get_distrib_linux','install_package_if_needed','install_required_dependencies_apt','autoremove_apt','install_yum','uninstall_yum','autoremove_yum','update_apt','upgrade_apt','update_yum','upgrade_yum','type_debian','type_redhat','install_rpm','systemd_start_service','systemd_stop_service','systemd_restart_service','systemd_status_service',])logger=logging.getLogger('waptcore')
def get_os_name():
    r"""Return the name of the operating system Python is running on.

    Returns:
        str: the value reported by ``platform.system()``,
             e.g. 'Windows', 'Linux' or 'Darwin'

    >>> get_os_name()
    'Windows'
    """
    os_name = platform.system()
    return os_name
def json_load_file(json_file, encoding="utf-8"):
    r"""Read a JSON file and return its decoded content.

    Args:
        json_file (str): path to the JSON file
        encoding (str): text encoding used to read the file

    Returns:
        the decoded JSON document (a dict when the file holds a JSON object)
    """
    # json is not among this module's explicit imports; import it here
    # rather than depend on a platform star-import providing the name.
    import json

    with open(json_file, encoding=encoding) as read_file:
        return json.load(read_file)
def json_write_file(json_file, data, indent=4, sort_keys=False, encoding='utf-8'):
    r"""Serialize data to a JSON file, overwriting any existing content.

    Args:
        json_file (str): path to the JSON file to write
        data (dict): content to serialize
        indent (int): indentation passed to ``json.dump`` ; default: 4 spaces
        sort_keys (bool): write object keys in alphabetical order or not
        encoding (str): file encoding
    """
    # json is not among this module's explicit imports; import it here
    # rather than depend on a platform star-import providing the name.
    import json

    with open(json_file, 'w', encoding=encoding) as write_file:
        json.dump(data, write_file, sort_keys=sort_keys, indent=indent)
def get_proxies():
    r"""Return the system proxy settings as detected by urllib.

    Returns:
        dict: mapping of scheme to proxy URL, as given by
              ``urllib.request.getproxies()``

    >>> get_proxies()
    {'http': 'http://srvproxy.ad.domain.lan:8080',
    'https': 'http://srvproxy.ad.domain.lan:8080'}
    """
    system_proxies = getproxies()
    return system_proxies
def filecopyto(filename, target):
    """Copy a file to a target file or directory, logging what is done.

    If the target is a .exe or .dll, the log line also carries the version
    numbers when they can be read.

    Args:
        filename (str): path to the file to copy
        target (str): destination; either a full filename or an existing
                      directory, in which case the source basename is kept

    >>> if not os.path.isfile('c:/tmp/fc.test'):
    ...     with open('c:/tmp/fc.test','wb') as f:
    ...         f.write(b'test')
    >>> if not os.path.isdir('c:/tmp/target'):
    ...    os.mkdir('c:/tmp/target')
    >>> if os.path.isfile('c:/tmp/target/fc.test'):
    ...    os.unlink('c:/tmp/target/fc.test')
    >>> filecopyto('c:/tmp/fc.test','c:/tmp/target')
    >>> os.path.isfile('c:/tmp/target/fc.test')
    True
    """
    if os.path.isdir(target):
        target = os.path.join(target, os.path.basename(filename))

    replacing = os.path.isfile(target)
    display_target = ensure_unicode(target)
    # Compare the extension case-insensitively so 'SETUP.EXE' is treated like 'setup.exe'.
    is_binary = os.path.splitext(target)[1].lower() in ('.exe', '.dll')

    old_version = new_version = None
    versions_known = False
    if is_binary:
        try:
            if replacing:
                old_version = get_file_properties(target)['FileVersion']
            new_version = get_file_properties(filename)['FileVersion']
            versions_known = True
        except Exception:
            # Version lookup is best effort: fall back to the plain message.
            versions_known = False

    if replacing:
        if versions_known:
            logger.info('Replacing %s (%s) -> %s', display_target, old_version, new_version)
        else:
            logger.info('Replacing %s', display_target)
    elif versions_known:
        logger.info('Copying %s (%s)', display_target, new_version)
    else:
        logger.info('Copying %s', display_target)

    shutil.copy(filename, target)
# NOTE: copying an entire tree from the install temp directory to a target is done with copytree2, imported from waptutils at the top of this module.
def dir_is_empty(path):
    """Tell whether path is a directory that contains no entries.

    Args:
        path (str): path to check

    Returns:
        bool: True only if path is a directory and listing it yields nothing
    """
    is_directory = isdir(path)
    if not is_directory:
        return is_directory
    return not os.listdir(path)
def file_is_locked(path, timeout=5):
    """Check if a file is locked, waiting up to timeout seconds for its release.

    The file is opened in binary append mode as a probe; note that this mode
    creates the file if it does not exist yet.

    Args:
        path (str): path to the file to probe
        timeout (int): number of one-second waits before giving up.
                       With 0 (or less) the file is probed exactly once.

    Returns:
        bool: False as soon as the file can be opened for appending,
              True if access is still denied after the last wait.

    Raises:
        IOError: any error other than "permission denied" is propagated.
    """
    import errno

    remaining = timeout
    while True:
        try:
            with open(path, 'ab'):
                return False
        except IOError as e:
            if e.errno != errno.EACCES:
                raise
        if remaining <= 0:
            # No wait budget left: the last probe was still refused.
            return True
        print('Waiting for %s to be released...' % path)
        time.sleep(1)
        remaining -= 1
def shell_launch(cmd):
    """Launch a command (without arguments) but doesn't wait for its termination.

    On Windows this uses ``os.startfile``. That function does not exist on
    other platforms, so there the file is handed to the desktop opener
    (``open`` on macOS, ``xdg-open`` elsewhere) instead of failing with
    AttributeError.

    Args:
        cmd (str): path of the document or executable to open

    >>> open('c:/tmp/test.txt','w').write('Test line')
    >>> shell_launch('c:/tmp/test.txt')
    """
    if hasattr(os, 'startfile'):
        os.startfile(cmd)
    elif sys.platform == 'darwin':
        subprocess.Popen(['open', cmd])
    else:
        subprocess.Popen(['xdg-open', cmd])
def processes_for_file(filepath, open_files=True, dll=True):
    """Generator returning processes currently using a file matching filepath.

    A process is reported at most once, even when the file is both mapped in
    its memory and present in its open files.

    If not running as System account, can not access system processes.

    Args:
        filepath (str): file path or pattern (glob *)
        open_files (bool): look at the files opened by each process
        dll (bool): look at the files mapped in each process memory

    Returns:
        iterator psutil.Process
    """
    import fnmatch

    def _any_path_matches(entries):
        # entries are psutil records exposing a .path attribute
        return any(fnmatch.fnmatch(entry.path, filepath) for entry in entries)

    for process in psutil.process_iter():
        found = False
        if dll:
            try:
                found = _any_path_matches(process.memory_maps())
            except Exception:
                # best effort; often : psutil.AccessDenied
                pass
        if not found and open_files:
            try:
                found = _any_path_matches(process.open_files())
            except Exception:
                # best effort; often : psutil.AccessDenied
                pass
        # Yield outside the try blocks so errors raised by the consumer
        # are never swallowed by the handlers above.
        if found:
            yield process
def find_processes(process_name):
    """Return the running processes whose name matches process_name.

    The comparison is case-insensitive and also accepts the name with a
    '.exe' suffix appended.

    Args:
        process_name (str): process name to lookup

    Returns:
        list: list of processes (Process) named process_name or process_name.exe

    >>> [p.pid for p in find_processes('explorer')]
    [2756, 4024]
    """
    wanted = process_name.lower()
    accepted_names = [wanted, wanted + '.exe']
    matches = []
    for proc in psutil.process_iter():
        try:
            current_name = proc.name().lower()
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            # process vanished or is not readable: ignore it
            continue
        if current_name in accepted_names:
            matches.append(proc)
    return matches
def get_domain():
    """Return main DNS domain of the computer

    On Windows the value comes from get_domain_fromregistry(); on Linux and
    macOS it comes from get_domain_from_socket().

    Returns:
        str: domain name, or None on a platform that is not handled

    >>> get_domain()
    'tranquilit.local'
    """
    if sys.platform == 'win32':
        return get_domain_fromregistry()
    # get_domain_from_socket is exported for every non-Windows platform,
    # so macOS must use it too instead of silently falling through to None.
    if sys.platform.startswith('linux') or sys.platform == 'darwin':
        return get_domain_from_socket()
    return None
def inifile_hasoption(inifilename, section, key):
    """Tell whether a key exists inside a section of an ini file.

    Args:
        inifilename (str): Path to the ini file
        section (str): section
        key (str): value key to check

    Returns:
        boolean : True if the key exists

    >>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.2')
    >>> print(inifile_hasoption('c:/tranquilit/wapt/tests/test.ini','global','version'))
    True
    >>> print(inifile_hasoption('c:/tranquilit/wapt/tests/test.ini','global','dontexist'))
    False
    """
    parser = RawConfigParser()
    parser.read(inifilename)
    section_found = parser.has_section(section)
    if not section_found:
        return section_found
    return parser.has_option(section, key)
def inifile_hassection(inifilename, section):
    """Tell whether a section exists in an ini file.

    Args:
        inifilename (str): Path to the ini file
        section (str): section

    Returns:
        boolean : True if the section exists

    >>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.2')
    >>> print(inifile_hassection('c:/tranquilit/wapt/tests/test.ini','global'))
    True
    """
    parser = RawConfigParser()
    parser.read(inifilename)
    section_found = parser.has_section(section)
    return section_found
def inifile_deleteoption(inifilename, section, key):
    """Remove a key within the section of the inifile

    The ini file is rewritten in place after the removal.

    Args:
        inifilename (str): Path to the ini file
        section (str): section
        key (str): value key of option to remove

    Returns:
        boolean : True if the section still exists and no longer holds the key

    >>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.2')
    >>> print(inifile_hasoption('c:/tranquilit/wapt/tests/test.ini','global','version'))
    True
    >>> print(inifile_deleteoption('c:/tranquilit/wapt/tests/test.ini','global','version'))
    True
    """
    inifile = RawConfigParser()
    inifile.read(inifilename)
    inifile.remove_option(section, key)
    # Use a context manager so the rewritten file is flushed and closed.
    with open(inifilename, 'w') as ini_fp:
        inifile.write(ini_fp)
    return inifile.has_section(section) and not inifile.has_option(section, key)
def inifile_deletesection(inifilename, section):
    """Remove a section within the inifile

    The ini file is rewritten in place after the removal.

    Args:
        inifilename (str): Path to the ini file
        section (str): section to remove

    Returns:
        boolean : True if the section is absent after the operation
    """
    inifile = RawConfigParser()
    inifile.read(inifilename)
    inifile.remove_section(section)
    # Use a context manager so the rewritten file is flushed and closed.
    with open(inifilename, 'w') as ini_fp:
        inifile.write(ini_fp)
    return not inifile.has_section(section)
def inifile_readstring(inifilename, section, key, default=None):
    """Read a string value from an ini file, with a fallback.

    Args:
        inifilename (str): Path to the ini file
        section (str): section
        key (str): value key to read
        default: value returned when the section or the key is missing

    Returns:
        the stored value, or default when it is not present

    >>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.2')
    >>> print(inifile_readstring('c:/tranquilit/wapt/tests/test.ini','global','version'))
    1.1.2
    >>> print(inifile_readstring('c:/tranquilit/wapt/tests/test.ini','global','undefaut','defvalue'))
    defvalue
    """
    parser = RawConfigParser()
    parser.read(inifilename)
    value_present = parser.has_section(section) and parser.has_option(section, key)
    if not value_present:
        return default
    return parser.get(section, key)
def inifile_writestring(inifilename, section, key, value):
    r"""Write a string parameter to inifile

    The section is created when it does not exist yet, and the ini file is
    rewritten in place.

    Args:
        inifilename (str): Path to the ini file
        section (str): section
        key (str): value key to set
        value (str): value to store

    >>> inifile_writestring('c:/tranquilit/wapt/tests/test.ini','global','version','1.1.1')
    >>> print(inifile_readstring('c:/tranquilit/wapt/tests/test.ini','global','version'))
    1.1.1
    """
    inifile = RawConfigParser()
    inifile.read(inifilename)
    if not inifile.has_section(section):
        inifile.add_section(section)
    inifile.set(section, key, value)
    # Use a context manager so the rewritten file is flushed and closed.
    with open(inifilename, 'w') as ini_fp:
        inifile.write(ini_fp)
def unzip(zipfn, target=None, filenames=None, extract_with_full_paths=True):
    r"""Unzip the files from zipfile with patterns in filenames to target directory

    Args:
        zipfn (str) : path to zipfile. (can be relative to temporary unzip location of package)
        target (str) : target location. Defaults to dirname(zipfile) + basename(zipfile)
        filenames (str or list of str): list of filenames / glob patterns (path sep is normally a slash)
        extract_with_full_paths (bool): keeping or not the subfolders of the archive as is

    Returns:
        list : list of extracted files. On macOS, an empty list is returned
               when the external ``unzip`` command fails.

    >>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt')
    ['C:\\example\\tis-7zip_9.2.0-15_all\\7z920-x64.msi',
    'C:\\example\\tis-7zip_9.2.0-15_all\\7z920.msi',
    'C:\\example\\tis-7zip_9.2.0-15_all\\setup.py',
    'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/control',
    'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/wapt.psproj',
    'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/manifest.sha256',
    'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/signature']

    >>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', filenames=['*.msi','*.py'])
    ['C:\\example\\tis-7zip_9.2.0-15_all\\7z920-x64.msi',
    'C:\\example\\tis-7zip_9.2.0-15_all\\7z920.msi',
    'C:\\example\\tis-7zip_9.2.0-15_all\\setup.py']

    >>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', target='test', filenames=['*.msi','*.py'])
    ['C:\\example\\test\\7z920-x64.msi',
    'C:\\example\\test\\7z920.msi',
    'C:\\example\\test\\setup.py']

    >>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', filenames='WAPT/*')
    ['C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/control',
    'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/wapt.psproj',
    'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/manifest.sha256',
    'C:\\example\\tis-7zip_9.2.0-15_all\\WAPT/signature']

    >>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', filenames='WAPT/control')
    ['C:\\example\\tis-7zip_9.2.0-15_all\\WAPT\\control']

    >>> unzip('tis-7zip_9.2.0-15_all.wapt', target='.', filenames='WAPT/control')
    ['.\\WAPT\\control']

    >>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', target=r'C:\example', filenames='WAPT/control')
    ['C:\\example\\WAPT\\control']

    >>> unzip('tis-7zip_9.2.0-15_all.wapt', target=basedir, filenames='WAPT/control')
    ['C:\\example\\WAPT\\control']

    >>> unzip(r'C:\example\tis-7zip_9.2.0-15_all.wapt', filenames='WAPT/control', extract_with_full_paths=False)
    ['C:\\example\\control']

    .. versionadded:: 1.3.11

    .. versionchanged:: 2.2
        added extract_with_full_paths parameter
    """
    import fnmatch
    import shlex

    zipf = CustomZipFile(zipfn, allowZip64=True)
    try:
        if target is None:
            target = makepath(os.path.dirname(os.path.abspath(zipfn)), os.path.splitext(os.path.basename(zipfn))[0])

        if filenames is not None:
            # archive member names use forward slashes
            patterns = [pattern.replace('\\', '/') for pattern in ensure_list(filenames)]
            files = [fn for fn in zipf.namelist()
                     if any(fnmatch.fnmatch(fn, pattern) for pattern in patterns)]
            members = files
        else:
            files = zipf.namelist()
            members = None

        if not extract_with_full_paths:
            # Flatten the archive: every selected file lands directly in target.
            files_to_extract = files
            files = []
            os.makedirs(target, exist_ok=True)
            for fn in files_to_extract:
                filename = os.path.basename(fn)
                # skip directories
                if not filename:
                    continue
                files.append(filename)
                # copy file (taken from zipfile's extract)
                with zipf.open(fn) as source, open(os.path.join(target, filename), "wb") as dest:
                    shutil.copyfileobj(source, dest)
        elif sys.platform != 'darwin':
            zipf.extractall(target, members)
        elif members is None or members:
            # the zipfile module doesn't support symlinks, and it's used in macOS packages
            # An empty selection must extract nothing: calling unzip without
            # member arguments would extract the whole archive.
            try:
                cmd_parts = ['unzip', '-qq', shlex.quote(zipfn)]
                cmd_parts.extend(shlex.quote(member) for member in (members or []))
                cmd_parts.extend(['-d', shlex.quote(target)])
                run(' '.join(cmd_parts))
            except Exception:
                print('Error : couldn\'t unzip {}'.format(zipfn))
                return []
    finally:
        # NOTE(review): assumes CustomZipFile exposes close() like zipfile.ZipFile — confirm
        zipf.close()

    return [makepath(target, fn.replace('/', os.sep)) for fn in files]
# Specific parameters for install scriptsparams={}control=PackageEntry()if__name__=='__main__':pass