#!/usr/bin/env python3
##
## -----------------------------------------------------------------
## This file is part of WAPT Software Deployment
## Copyright (C) 2012 - 2023 Tranquil IT https://www.tranquil.it
## All Rights Reserved.
##
## WAPT helps systems administrators to efficiently deploy
## setup, update and configure applications.
## ------------------------------------------------------------------
##
import os
import socket
import struct
import getpass
import platform
import shutil
import configparser
import psutil
import netifaces
import cpuinfo
import subprocess
import logging
import grp
import pwd
import ipaddress
from ldap3 import Server, Connection, Tls, SASL, KERBEROS
import ssl
import dns.resolver
import uptime
from waptutils import (ensure_unicode, makepath, error, get_main_ip, run, killtree, get_local_IPs, networking)
logger = logging.getLogger('waptcore')
def get_kernel_version():
    """Return the kernel release string (third field of ``os.uname()``)."""
    uname_info = os.uname()
    return uname_info[2]
def get_computer_groups():
    """Return the list of groups of this computer's machine account.

    The account name is the first label of the computer name followed by
    ``$``; the lookup itself is delegated to get_groups().
    """
    short_name = get_computername().partition('.')[0]
    machine_account = short_name + '$'
    return get_groups(machine_account)
def get_groups(user):
    """Return the lower-cased names of the groups `user` belongs to.

    Args:
        user (str): account name; it is lower-cased before any lookup.

    Returns:
        list: group names (text after the last backslash, lower-cased) of
              every group listing the user as a member, plus the user's
              primary group.
    """
    login = user.lower()

    # supplementary groups: every group that lists the account as a member
    gids = []
    for entry in grp.getgrall():
        if login in entry.gr_mem:
            gids.append(entry.gr_gid)

    # primary group comes from the passwd entry
    primary_gid = pwd.getpwnam(login).pw_gid
    if primary_gid not in gids:
        gids.append(grp.getgrgid(primary_gid).gr_gid)

    names = []
    for group_id in gids:
        full_name = grp.getgrgid(group_id).gr_name
        names.append(full_name.rsplit('\\')[-1].lower())
    return names
def get_domain_info(ldap_auth_server=None, use_ssl=True, force_tgt=True, hostname=None, domain=None, verify_cert_ldap=False):
    """Return the OU, groups and AD site of the machine account.

    When `hostname` or `domain` is not supplied, it is parsed from the
    keytab listing. Each candidate domain controller is then queried over
    LDAP (SASL / Kerberos bind) until one of them answers completely.

    Warning: the GSSAPI bind does not work if the reverse DNS record of the
    domain controller is not available.

    Args:
        ldap_auth_server (str): LDAP server to query. When empty, the
            controllers are discovered with a DNS SRV lookup on
            ``_ldap._tcp.dc._msdcs.<domain>``.
        use_ssl (bool): passed to the ldap3 ``Server``.
        force_tgt (bool): try a ``kinit -k`` for the machine account first
            (best effort, failures are ignored).
        hostname (str): machine account name (sAMAccountName).
        domain (str): DNS name of the domain / Kerberos realm.
        verify_cert_ldap (bool): require a valid certificate from the server.

    Returns:
        dict: with keys ``groups`` (list of lower-cased group names),
              ``ou`` (DN of the machine account) and ``site`` (site name,
              '' when no declared subnet contains the local IP).

    If no controller could be queried, ``error()`` is called
    (NOTE(review): presumably raises; it is imported from waptutils).
    """
    result = {'groups': []}

    if platform.system() == 'Darwin':
        cmd = 'ktutil -k /etc/krb5.keytab list'
    else:
        cmd = 'klist -k'

    if (not hostname) or (not domain):
        # The text before the first '$@' ends with the machine account name,
        # the text after it starts with the realm.
        splitlist = run(cmd).split('$@', 1)
        if not hostname:
            hostname = str(splitlist[0].rsplit(' ', 1)[-1] + '$').split('/')[-1]
        if not domain:
            domain = splitlist[1].split('\n')[0].strip()

    if force_tgt:
        try:
            # Argument list instead of a shell string: the principal contains
            # '$@', which a shell would expand, and no quoting is needed.
            subprocess.check_output(['kinit', '-k', '%s@%s' % (hostname, domain)], stderr=subprocess.STDOUT)
        except Exception as exc:
            # best effort: a valid ticket may already be available
            logger.debug('kinit for %s@%s failed: %s', hostname, domain, exc)

    if verify_cert_ldap:
        ldapssl = ssl.CERT_REQUIRED
    else:
        ldapssl = ssl.CERT_NONE

    list_controler = []
    if not ldap_auth_server:
        for entry in dns.resolver.query('_ldap._tcp.dc._msdcs.%s' % domain.lower(), 'SRV'):
            list_controler.append(entry.to_text().split(' ')[-1].strip('.'))
    else:
        list_controler.append(ldap_auth_server)

    base_dn = 'dc=' + domain.lower().replace('.', ',dc=')

    for controler in list_controler:
        c = None
        try:
            tls = Tls(validate=ldapssl, version=ssl.PROTOCOL_TLSv1_2)
            server = Server(controler, use_ssl=use_ssl, tls=tls)
            c = Connection(server, authentication=SASL, sasl_mechanism=KERBEROS)
            c.bind()

            # get ou with ldap
            c.search(base_dn, search_filter='(samaccountname=%s)' % hostname.lower(), attributes=['distinguishedName', 'memberOf'])
            account = c.response[0]
            ou = account['dn']
            # Collected locally so that a controller failing later in this
            # block does not leave partial / duplicated groups in result.
            groups = []
            if 'memberOf' in account['attributes']:
                for u in account['attributes']['memberOf']:
                    groups.append(u.split(',', 1)[0].split('=')[1].lower())

            # get site with ldap
            c.search('CN=Subnets,CN=Sites,CN=Configuration,' + base_dn, search_filter='(siteObject=*)', attributes=['siteObject', 'cn'])
            dict_ip_site = {}
            for i in c.response:
                dict_ip_site[i['attributes']['cn']] = i['attributes']['siteObject'].split('=', 1)[1].split(',', 1)[0]
        except Exception as exc:
            logger.debug('Unable to query domain controller %s: %s', controler, exc)
            continue
        finally:
            if c is not None:
                try:
                    c.unbind()
                except Exception as exc:
                    logger.debug('Unable to unbind from %s: %s', controler, exc)

        result['ou'] = ou
        result['groups'] = groups
        result['site'] = ''
        if dict_ip_site:
            local_ip = ipaddress.ip_address(get_main_ip(controler)[0])
            for subnet, site_name in dict_ip_site.items():
                # membership is always False when the IP versions differ,
                # so no explicit IPv4 / IPv6 test is needed
                if local_ip in ipaddress.ip_network(subnet):
                    result['site'] = site_name
        return result

    error('unable to retrieve information')
def get_default_gateways():
    """Return the IPv4 default gateway as a dotted-quad string.

    On Linux the default route is read directly from /proc/net/route.
    On Darwin the output of ``route -n get default`` is parsed.

    Returns:
        str: gateway address like '192.168.1.1', or None when no default
             gateway is found or the platform is neither Linux nor Darwin.

    Raises:
        ValueError: on Darwin, if the reported gateway is not an IPv4 address.
    """
    system = platform.system()
    if system == 'Linux':
        with open("/proc/net/route") as fh:
            for line in fh:
                fields = line.strip().split()
                # need at least Iface, Destination, Gateway and Flags
                if len(fields) < 4:
                    continue
                # keep only the default route (destination 0.0.0.0) with
                # flag bit 0x2 set; the header line fails the first test
                if fields[1] != '00000000' or not int(fields[3], 16) & 2:
                    continue
                return socket.inet_ntoa(struct.pack("<L", int(fields[2], 16)))
        return None
    elif system == 'Darwin':
        route_dict = {}
        for line in run('route -n get default').rstrip().split('\n'):
            key, sep, value = line.strip().partition(':')
            if sep:
                route_dict[key] = value.strip()
        gateway_ip = route_dict.get('gateway')
        if not gateway_ip:
            return None
        # The address is already in dotted-quad form: validate it and
        # return it as is (re-packing it little-endian reversed the bytes).
        return str(ipaddress.IPv4Address(gateway_ip))
    return None
def user_local_appdata():
    r"""Return the local application data directory of the current user.

    Returns:
        str: path like u'/home/user/.config/', or '' when the HOME
             environment variable is not set.
    """
    home = os.environ.get('HOME')
    if home is None:
        return ''
    return ensure_unicode(makepath(home, '.config/'))


# alias kept for callers using the other name
user_appdata = user_local_appdata
def remove_tree(*args, **kwargs):
    r"""Delete a directory tree; thin pass-through to ``shutil.rmtree``.

    All positional and keyword arguments are forwarded unchanged.

    Args:
        path (str): path to directory to remove
        ignore_errors (boolean) : set it to True to ignore exceptions on children deletion
        onerror (func) : hook called with (func, path, exc)
                         on each delete exception. Should raise if stop is required.

    Returns:
        whatever ``shutil.rmtree`` returns
    """
    outcome = shutil.rmtree(*args, **kwargs)
    return outcome
def local_drives():
    """Return partition and usage information for each mounted partition.

    Returns:
        dict: mountpoint -> dict merging the fields of the psutil partition
              entry with the fields of ``psutil.disk_usage`` for that
              mountpoint. Usage fields are missing when the usage of a
              mountpoint can not be read.
    """
    result = {}
    for partition in psutil.disk_partitions():
        info = dict(partition._asdict())
        try:
            # dict.update() returns None: update in place, never assign its result
            info.update(psutil.disk_usage(partition.mountpoint)._asdict())
        except OSError as exc:
            logger.debug('Unable to get disk usage of %s: %s', partition.mountpoint, exc)
        result[partition.mountpoint] = info
    return result
def host_metrics():
    """Collect the frequently changing data of the host.

    Returns:
        dict: memory sizes, local drives, logged in users, last logged on
              user, memory usage of the current process and boot time.
    """
    # volatile...
    metrics = {
        'physical_memory': psutil.virtual_memory().total,
        'virtual_memory': psutil.swap_memory().total,
        'local_drives': local_drives(),
        'logged_in_users': list(get_loggedinusers()),
        'last_logged_on_user': get_last_logged_on_user(),
    }

    # memory usage of the current process, one entry per psutil field
    process_memory = psutil.Process().memory_info()
    metrics['wapt-memory-usage'] = {
        field: getattr(process_memory, field) for field in process_memory._fields
    }

    metrics['last_bootup_time'] = uptime.boottime()
    return metrics
def default_gateway():
    """Returns default ipv4 current gateway

    Returns:
        first element of the default AF_INET gateway entry reported by
        ``netifaces.gateways()``, or None when there is no default IPv4
        gateway.
    """
    gateways = netifaces.gateways()
    if not gateways:
        return None
    default_gw = gateways.get('default', None)
    if not default_gw:
        return None
    default_inet_gw = default_gw.get(netifaces.AF_INET, None)
    # test the looked-up entry, not the function object itself
    if not default_inet_gw:
        return None
    return default_inet_gw[0]
def get_current_user():
    r"""Return the login name of the current user, as reported by getpass.

    >>> get_current_user()
    u'htouvet'
    """
    login = getpass.getuser()
    return ensure_unicode(login)
def application_data():
    """Return the '.config' directory located under $HOME.

    Raises:
        KeyError: if the HOME environment variable is not set.
    """
    home = os.environ['HOME']
    return os.path.join(home, '.config')
def is_valid_ipv4_address(address):
    """Tell whether `address` is a valid dotted-quad IPv4 address.

    Args:
        address (str): candidate address

    Returns:
        bool: True if the address is accepted by ``socket.inet_pton``
              (or, where that function is missing, by ``socket.inet_aton``
              and contains exactly three dots).
    """
    try:
        socket.inet_pton(socket.AF_INET, address)
    except AttributeError:
        # no inet_pton on this platform: fall back to the laxer inet_aton
        try:
            socket.inet_aton(address)
        except OSError:
            return False
        else:
            # inet_aton also accepts short forms such as '127.1'
            return address.count('.') == 3
    except OSError:
        # not a valid address
        return False
    else:
        return True
def get_dns_servers():
    """Return the IPv4 name servers declared in /etc/resolv.conf.

    Returns:
        list: IPv4 addresses (str) of the ``nameserver`` entries, in file
              order. Entries that are not valid IPv4 addresses and
              ``nameserver`` lines without an address are skipped.
    """
    dns_ips = []
    with open('/etc/resolv.conf') as fp:
        for line in fp:
            columns = line.split()
            # need the keyword and at least one value
            if len(columns) < 2 or columns[0] != 'nameserver':
                continue
            ip = columns[1]
            if is_valid_ipv4_address(ip):
                dns_ips.append(ip)
    return dns_ips
def get_loggedinusers():
    """Return the users currently logged in.

    Users come from ``psutil.users()``. On platforms other than Darwin the
    output of ``loginctl list-sessions`` is parsed as well (best effort).

    Returns:
        dict: user name -> session identifier taken from loginctl, or None
              when the user was only reported by psutil.
    """
    result = {}
    for session in psutil.users():
        result.setdefault(session.name, None)

    if platform.system() != 'Darwin':
        try:
            output = run('loginctl list-sessions')
            for line in output.split('\n'):
                # skip the header; session lines start with a space
                if 'SESSION' in line or not line.startswith(' '):
                    continue
                col = [c for c in line.split(' ') if c != '']
                # expected columns: session id, uid, user, ...
                if len(col) < 3:
                    continue
                result[col[2]] = col[0]
        except Exception as exc:
            # loginctl may be missing or fail: keep the psutil result
            logger.debug('Unable to list loginctl sessions: %s', exc)
    return result
def get_last_logged_on_user():
    """Return the most recently started entry of ``psutil.users()``.

    Returns:
        the entry with the greatest ``started`` value (the first one on a
        tie), or '' when nobody is logged in.
    """
    sessions = psutil.users()
    if not sessions:
        return ''
    # max() keeps the first of equal maxima
    return max(sessions, key=lambda session: session.started)
def get_domain_from_socket():
    """Return main DNS domain of the computer

    The domain is the part of ``socket.getfqdn()`` after the first dot.

    Returns:
        str: domain name, or "" when the fqdn has no domain part or can
             not be retrieved

    >>> get_domain_from_socket()
    u'tranquilit.local'
    """
    try:
        fqdn = socket.getfqdn()
    except Exception:
        return ""
    # partition() yields '' for the tail when there is no dot
    return fqdn.partition('.')[2]
def host_info_common_unix():
    """Read main workstation informations, returned as a dict

    Returns:
        dict: with keys computer_name, computer_fqdn, kernel_version,
              cpu_name and environ; workgroup_name is present when it could
              be read from /etc/samba/smb.conf, and is '' when reading that
              file failed.

    .. versionchanged:: 1.4.1
         returned keys changed :
           dns_domain -> dnsdomain

    >>> hi = host_info_common_unix()
    >>> 'computer_fqdn' in hi and 'computer_name' in hi
    True
    """
    info = {}
    info['computer_name'] = socket.gethostname().lower()
    info['computer_fqdn'] = socket.getfqdn().lower()

    try:
        if os.path.isfile('/etc/samba/smb.conf'):
            # strict=False tolerates duplicated sections / options
            config = configparser.RawConfigParser(strict=False)
            config.read('/etc/samba/smb.conf')
            if config.has_option('global', 'workgroup'):
                info['workgroup_name'] = config.get('global', 'workgroup')
    except Exception as exc:
        logger.debug('Unable to read workgroup from smb.conf: %s', exc)
        info['workgroup_name'] = ''

    info['kernel_version'] = get_kernel_version()

    try:
        info['cpu_name'] = cpuinfo.get_cpu_info()['brand_raw']
    except Exception as exc:
        logger.debug('Unable to get cpu name: %s', exc)
        info['cpu_name'] = ""

    info['environ'] = {k: ensure_unicode(v) for k, v in os.environ.items()}
    return info
def host_info_networking():
    """Collect the networking information of the host.

    Returns:
        dict: with keys connected_ips, interfaces, gateways, dns_servers,
              mac, main_ip and dnsdomain. ``mac`` lists the MAC address of
              each interface having at least one connected IP.
    """
    info = {
        'connected_ips': get_local_IPs(),
        'interfaces': networking(),
    }

    # dict used as an ordered set of MAC addresses
    macs = {}
    for interface in info['interfaces']:
        if 'mac' not in interface or 'addr' not in interface:
            continue
        for address in interface['addr']:
            if address['addr'] in info['connected_ips']:
                macs[interface['mac']] = None

    info['gateways'] = [get_default_gateways()]
    info['dns_servers'] = get_dns_servers()
    info['mac'] = list(macs)
    info['main_ip'] = get_main_ip()
    info['dnsdomain'] = get_domain_from_socket()
    return info
def get_computername():
    """Return the host name reported by ``socket.gethostname()``, in lowercase.

    NOTE(review): no domain part is stripped here; the result is whatever
    gethostname() returns, lower-cased.
    """
    hostname = socket.gethostname()
    return hostname.lower()
def dmi_info_common_unix():
    """Hardware System information from BIOS extracted with dmidecode

    Convert dmidecode -q output to python dict

    Parsing rules, as implemented below:
      * empty lines and lines starting with '#' end the current section
      * the first line after such a separator, or any line not starting
        with a tab, opens a new section; its key is the stripped line with
        spaces replaced by underscores. A section name seen more than once
        is stored as a list of dicts.
      * a line starting with a single tab and containing ':' is stored as
        a ``name: value`` entry of the current section
      * lines starting with two tabs are appended to a list stored under
        the most recent ``name``

    Returns:
        dict

    >>> dmi = dmi_info()
    >>> 'UUID' in dmi['System_Information']
    True
    >>> 'Product_Name' in dmi['System_Information']
    True
    """
    result = {}
    # stderr is discarded: dmidecode errors are not shown
    dmiout = ensure_unicode(run('dmidecode -q 2>/dev/null'))
    new_section = True
    for l in dmiout.splitlines():
        # blank line or comment: the next meaningful line starts a section
        if not l.strip() or l.startswith('#'):
            new_section = True
            continue
        if not l.startswith('\t') or new_section:
            # section header line
            currobject = {}
            key = l.strip().replace(' ', '_')
            # already here... so add as array...
            if (key in result):
                if not isinstance(result[key], list):
                    result[key] = [result[key]]
                result[key].append(currobject)
            else:
                result[key] = currobject
            # an indented line used as a header is unexpected: trace it
            if l.startswith('\t'):
                logger.debug(l)
        else:
            if not l.startswith('\t\t'):
                # single tab: "name: value" entry; also resets the pending array
                currarray = []
                if ':' in l:
                    (name, value) = l.split(':', 1)
                    currobject[name.strip().replace(' ', '_')] = value.strip()
                else:
                    logger.warning("Error in line : %s" % l)
            else:
                # double tab: item of the array attached to the last name
                # NOTE(review): relies on a single-tab "name: value" line
                # having been seen before in this run, otherwise currarray /
                # name are not assigned yet -- confirm against dmidecode output
                # first line of array
                if not currarray:
                    currobject[name.strip().replace(' ', '_')] = currarray
                currarray.append(l.strip())
        new_section = False
    return result
def get_file_properties(fname, ignore_warning=True):
    r"""Placeholder for the reading of executable file properties.

    This implementation ignores both arguments and always returns an
    empty dictionary.

    Args:
        fname : path to Windows executable or DLL
        ignore_warning : unused here

    Returns:
        dict: properties of executable (always empty here)

    >>> get_file_properties(r'c:\windows\explorer.exe')
    {}
    """
    # TODO : POSIX version
    return {}
def uac_enabled():
    """Tell whether UAC is enabled; this implementation always returns False."""
    enabled = False
    return enabled
def killalltasks(process_names, include_children=True):
    """Kill the task by their process_names

    Args:
        process_names (str or list/tuple/set of str): name(s) of the
            processes to kill; the comparison is case insensitive.
        include_children (bool): kill the whole process tree with killtree()
            instead of the matching process only.

    Returns:
        list: (pid, name) tuple for each matching process

    >>> killalltasks('firefox')
    """
    logger.debug('Kill tasks %s', process_names)
    if not process_names:
        return []
    # a single name is wrapped; any usual collection of names is accepted
    if not isinstance(process_names, (list, tuple, set, frozenset)):
        process_names = [process_names]

    wanted = {process.lower() for process in process_names}
    result = []
    for p in psutil.process_iter():
        try:
            proc_name = p.name()
            if proc_name.lower() in wanted:
                logger.debug('Kill process %i', p.pid)
                result.append((p.pid, proc_name))
                if include_children:
                    killtree(p.pid)
                else:
                    p.kill()
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            # process vanished or is not ours: nothing to do
            pass
    return result
def get_processes_with_name(name):
    """Returns the processes which contain the given name

    Args:
        name (str): text searched, case insensitively, in process names

    Returns:
        list: ``as_dict()`` of each matching process. Processes which
              disappear or can not be inspected during the scan are skipped.
    """
    needle = name.lower()
    processes = []
    for proc in psutil.process_iter():
        try:
            if needle in proc.name().lower():
                processes.append(proc.as_dict())
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            # same handling as killalltasks: ignore processes we can't read
            continue
    return processes
def local_groups():
    """Return the names of all entries of the group database."""
    names = []
    for entry in grp.getgrall():
        names.append(entry.gr_name)
    return names
def local_group_members(groupname):
    """Return the member names listed for the group `groupname`.

    Raises:
        KeyError: if the group does not exist.
    """
    entry = grp.getgrnam(groupname)
    return entry.gr_mem
def local_group_memberships(username):
    """List the groups `username` is a member of (delegates to get_groups)."""
    memberships = get_groups(username)
    return memberships