Source code for setuphelpers_unix

#!/usr/bin/env python3
##
## -----------------------------------------------------------------
##    This file is part of WAPT Software Deployment
##    Copyright (C) 2012 - 2024  Tranquil IT https://www.tranquil.it
##    All Rights Reserved.
##
##    WAPT helps systems administrators to efficiently deploy
##    setup, update and configure applications.
## ------------------------------------------------------------------
##
import os
import socket
import struct
import getpass
import platform
import shutil
import configparser
import psutil
import cpuinfo
import subprocess
import logging
import grp
import pwd

try:
    from waptlicences import peercache_status, uptime, get_dmi_infos, get_bios_infos # embedded in wapt-get binary
    import pyldap # embedded in wapt-get binary
except ImportError:
    pyldap=None
    uptime = None
    get_bios_infos = None
    get_dmi_infos = None

    def peercache_status():
        return None

from waptutils import (ensure_unicode, makepath, error, get_main_ip, run, killtree, get_local_IPs, networking, bootup_time)

logger = logging.getLogger('waptcore')


[docs]def get_kernel_version(): return os.uname()[2]
[docs]def get_computer_groups(): """Try to find the computer in the Active Directory and return the list of groups """ return get_groups(get_computername().split('.')[0] + '$')
[docs]def get_groups(user): gids = [g.gr_gid for g in grp.getgrall() if user.lower() in g.gr_mem] gid = pwd.getpwnam(user.lower()).pw_gid if not gid in gids: gids.append(grp.getgrgid(gid).gr_gid) return [grp.getgrgid(gid).gr_name.rsplit('\\')[-1].lower() for gid in gids]
[docs]def get_domain_info(force_tgt=True, hostname=None, domain=None): """Return dict ad_site , ou and groups""" if not pyldap: error('get_doamin_info: pyldap module unavailable.') result = {} result['groups'] = [] if platform.system() == 'Darwin': cmd = 'ktutil -k /etc/krb5.keytab list' else: cmd = 'klist -k' domain_from_keytab = None if (not hostname) or (not domain): splitlist = run(cmd).split('$@', 1) if not hostname: hostname = str(splitlist[0].rsplit(' ', 1)[-1] + '$').split('/')[-1] if not domain: domain = splitlist[1].split('\n')[0].strip() domain_from_keytab = domain if domain_from_keytab : krb5file = '/etc/krb5.conf' if not os.path.isfile(krb5file): with open(krb5file,'w') as f: f.write(r"""[libdefaults] default_realm = %s dns_lookup_kdc = true dns_lookup_realm=true""" % domain_from_keytab) os.chmod(krb5file, 0o644) if force_tgt: try: subprocess.check_output(r'kinit -k %s\@%s' % (hostname, domain), shell=True, stderr=subprocess.STDOUT) except: pass result['site'] = pyldap.cldap_get_domain_info(domain_name=domain)['client_site'] client = pyldap.PyLdapClient(domain_name=domain) r = client.bind_sasl_kerberos() if not r[0]: error('Failed connect to active directory') client.search(client.root_dn(), False, '(sAMAccountName=%s)' % hostname.lower(), ['distinguishedName','memberOf']) response = client.search_result() attrs = response[0].object_attributes attrs_result = { u.name : u.values for u in attrs} result['ou'] = attrs_result['distinguishedName'][0] if 'memberOf' in attrs_result: for u in attrs_result['memberOf']: result['groups'].append(u.split(',', 1)[0].split('=')[1].lower()) return result
[docs]def default_gateway(): if platform.system() == 'Linux': """Read the default gateway directly from /proc.""" with open("/proc/net/route") as fh: for line in fh: fields = line.strip().split() if fields[1] != '00000000' or not int(fields[3], 16) & 2: continue return socket.inet_ntoa(struct.pack("<L", int(fields[2], 16))) elif platform.system() == 'Darwin': route_output = run('route -n get default').rstrip().split('\n') route_output = [line.strip() for line in route_output] route_dict = {} for line in route_output: split_l = line.split(':') try: route_dict[split_l[0]] = split_l[1].strip() except: pass gateway_ip = route_dict['gateway'] gateway_hex = '{:02X}{:02X}{:02X}{:02X}'.format(*map(int, gateway_ip.split('.'))) return socket.inet_ntoa(struct.pack("<L", int(gateway_hex, 16)))
[docs]def get_default_gateways(): return [ default_gateway() ]
[docs]def user_local_appdata(): r"""Return the local appdata profile of current user Returns: str: path like u'/home/user/.config/' """ if 'HOME' in os.environ: return ensure_unicode(makepath(os.environ['HOME'], '.config/')) else: return ''
user_appdata=user_local_appdata
[docs]def remove_tree(*args, **kwargs): r"""Convenience function to delete a directory tree, with any error not ignored by default. Pass ignore_errors=False to access possible errors. Args: path (str): path to directory to remove ignore_errors (boolean) : default to False. Set it to True to ignore exceptions on children deletion onerror (func) : hook called with (func, path, exc) on each delete exception. Should raise if stop is required. """ return shutil.rmtree(*args, **kwargs)
[docs]def local_drives(): partitions = psutil.disk_partitions() result = {} for elem in partitions: result[elem.mountpoint] = dict(elem._asdict()) result[elem.mountpoint] = result[elem.mountpoint].update(dict(psutil.disk_usage(elem.mountpoint)._asdict())) return result
[docs]def host_metrics(): """Frequently updated host data """ result = {} # volatile... result['physical_memory'] = psutil.virtual_memory().total result['virtual_memory'] = psutil.swap_memory().total result['local_drives'] = local_drives() result['logged_in_users'] = list(get_loggedinusers()) result['last_logged_on_user'] = get_last_logged_on_user() result['peercache'] = peercache_status() # memory usage mem_info = psutil.Process().memory_info() result['wapt-memory-usage'] = {} for field in mem_info._fields: result['wapt-memory-usage'][field] = getattr(mem_info, field) if uptime: result['last_bootup_time'] = bootup_time() return result
[docs]def get_current_user(): r"""Get the login name for the current user. >>> get_current_user() u'htouvet' """ return ensure_unicode(getpass.getuser())
[docs]def application_data(): return os.path.join(os.environ['HOME'], '.config')
[docs]def is_valid_ipv4_address(address): try: socket.inet_pton(socket.AF_INET, address) except AttributeError: # no inet_pton here, sorry try: socket.inet_aton(address) except socket.error: return False return address.count('.') == 3 except socket.error: # not a valid address return False return True
[docs]def get_dns_servers(): dns_ips = [] with open('/etc/resolv.conf') as fp: for cnt, line in enumerate(fp): columns = line.split() if len(columns) == 0: continue if columns[0] == 'nameserver': ip = columns[1:][0] if is_valid_ipv4_address(ip): dns_ips.append(ip) return dns_ips
[docs]def get_loggedinusers(): suser = psutil.users() result = {} for elem in suser: if not elem.name in result: result[elem.name] = None if platform.system() != 'Darwin': try: output = run('loginctl list-sessions') for line in output.split('\n'): if 'SESSION' in line: continue if not line.startswith(' '): continue col = [] for c in line.split(' '): if c == '': continue col.append(c) result[col[2]] = col[0] except: pass return result
[docs]def get_last_logged_on_user(): suser = psutil.users() res = '' for elem in suser: if res == '': res = elem elif res.started < elem.started: res = elem return res
[docs]def get_domain_from_socket(): """Return main DNS domain of the computer Returns: str: domain name >>> get_domain_from_socket() u'tranquilit.local' """ try: return socket.getfqdn().split('.', 1)[1] except: return ""
[docs]def host_info_common_unix(): """Read main workstation informations, returned as a dict Returns: dict: main properties of host, networking and windows system .. versionchanged:: 1.4.1 returned keys changed : dns_domain -> dnsdomain >>> hi = host_info() >>> 'computer_fqdn' in hi and 'connected_ips' in hi and 'computer_name' in hi and 'mac' in hi True """ info = {} info['computer_name'] = socket.gethostname().lower() info['computer_fqdn'] = socket.getfqdn().lower() if get_bios_infos: info['bios_infos'] = get_bios_infos() try: if os.path.isfile('/etc/samba/smb.conf'): config = configparser.RawConfigParser(strict=False) config.read('/etc/samba/smb.conf') if config.has_option('global', 'workgroup'): info['workgroup_name'] = config.get('global', 'workgroup') except: info['workgroup_name'] = '' info['kernel_version'] = get_kernel_version() try: info['cpu_name'] = cpuinfo.get_cpu_info()['brand_raw'] except: info['cpu_name'] = "" info['environ'] = {k: ensure_unicode(v) for k, v in os.environ.items()} return info
[docs]def host_info_networking(): info = {} info['connected_ips'] = get_local_IPs() info['interfaces'] = networking() list_mac = {} for c in info['interfaces']: if 'mac' in c and 'addr' in c: for m in c['addr']: if m['addr'] in info['connected_ips']: list_mac[c['mac']] = None info['gateways'] = get_default_gateways() info['dns_servers'] = get_dns_servers() info['mac'] = list(list_mac) info['main_ip'] = get_main_ip() info['dnsdomain'] = get_domain_from_socket() return info
[docs]def get_computername(): """Return host name in lowercase (without domain part)""" return socket.gethostname().lower()
_dmi_info = None def dmi_info_common_unix(): """Hardware System information from BIOS estracted with dmidecode Convert dmidecode -q output to python dict Returns: dict >>> dmi = dmi_info() >>> 'UUID' in dmi['System_Information'] True >>> 'Product_Name' in dmi['System_Information'] True """ global _dmi_info if _dmi_info is not None: return _dmi_info result = {} # dmidecode don't show errors if platform.system() == 'Darwin': dmiout = ensure_unicode(run('/opt/wapt/bin/dmidecode -q 2>/dev/null')) else: dmiout = ensure_unicode(run('dmidecode -q 2>/dev/null')) new_section = True for l in dmiout.splitlines(): if not l.strip() or l.startswith('#'): new_section = True continue if not l.startswith('\t') or new_section: currobject = {} key = l.strip().replace(' ', '_') # already here... so add as array... if (key in result): if not isinstance(result[key], list): result[key] = [result[key]] result[key].append(currobject) else: result[key] = currobject if l.startswith('\t'): logger.debug(l) else: if not l.startswith('\t\t'): currarray = [] if ':' in l: (name, value) = l.split(':', 1) currobject[name.strip().replace(' ', '_')] = value.strip() else: logger.warning("Error in line : %s" % l) else: # first line of array if not currarray: currobject[name.strip().replace(' ', '_')] = currarray currarray.append(l.strip()) new_section = False _dmi_info = result return result
[docs]def get_file_properties(fname, ignore_warning=True): r"""Read all properties of the given file return them as a dictionary. Args: fname : path to Windows executable or DLL Returns: dict: properties of executable >>> xp = get_file_properties(r'c:\windows\explorer.exe') >>> 'FileVersion' in xp and 'FileDescription' in xp True """ # TODO : POSIX version props = {} return props
def uac_enabled(): return False
[docs]def killalltasks(process_names, include_children=True): """Kill the task by their process_names >>> killalltasks('firefox') """ logger.debug('Kill tasks %s' % (process_names,)) if not process_names: return [] if not isinstance(process_names, list): process_names = [process_names] result = [] process_names = [process.lower() for process in process_names] for p in psutil.process_iter(): try: if p.name().lower() in process_names: logger.debug('Kill process %i' % (p.pid,)) result.append((p.pid, p.name())) if include_children: killtree(p.pid) else: p.kill() except (psutil.AccessDenied, psutil.NoSuchProcess): pass return result
def get_processes_with_name(name): """ Returns the processes which contain the given name """ try: processes = [] for proc in psutil.process_iter(): if name.lower() in proc.name().lower(): processes.append(proc) return [p.as_dict() for p in processes] except subprocess.CalledProcessError: return None
[docs]def local_groups(): return [g.gr_name for g in grp.getgrall()]
[docs]def local_group_members(groupname): return grp.getgrnam(groupname).gr_mem
[docs]def local_group_memberships(username): """List the local groups a user is member Of""" return get_groups(username)