#!/usr/bin/env python3
##
## -----------------------------------------------------------------
##    This file is part of WAPT Software Deployment
##    Copyright (C) 2012 - 2024  Tranquil IT https://www.tranquil.it
##    All Rights Reserved.
##
##    WAPT helps systems administrators to efficiently deploy
##    setup, update and configure applications.
## ------------------------------------------------------------------
##
import os
import socket
import platform
import subprocess
import logging
import glob
import datetime
import re
import locale
import threading
import grp
import psutil
from setuphelpers_unix import host_info_common_unix,dmi_info_common_unix
from waptutils import isfile,fixed_columns_to_dicts,ensure_unicode,run
from contextlib import contextmanager
LOCALE_LOCK = threading.Lock()
@contextmanager
def setlocale(name):
    with LOCALE_LOCK:
        saved = locale.setlocale(locale.LC_ALL)
        try:
            yield locale.setlocale(locale.LC_ALL, name)
        finally:
            locale.setlocale(locale.LC_ALL, saved)
try:
    import distro
    linux_distribution = list(distro.linux_distribution(False))
    linux_distribution[2] = distro.codename()
    linux_distribution = tuple(linux_distribution)
except ImportError:
    linux_distribution = (None, None, None)
logger = logging.getLogger('waptcore')
[docs]def is_linux64():
    return platform.machine().endswith('64') 
isLinux64 = is_linux64
[docs]def get_distrib_version():
    return linux_distribution[1] 
[docs]def get_os_version():
    return get_distrib_version() 
[docs]def get_distrib_linux():
    return linux_distribution[0] 
[docs]def get_code_name_version():
    return linux_distribution[2] 
[docs]def get_hostname():
    try:
        return socket.getfqdn().lower()
    except:
        return "" 
[docs]def type_debian():
    return isfile('/etc/debian_version') 
is_debian_based=type_debian
[docs]def is_debian():
    return linux_distribution is not None and linux_distribution[0].lower() == 'debian' 
[docs]def get_debian_version():
    if isfile('/etc/debian_version'):
        with open('/etc/debian_version') as f:
            debian_version = f.read().strip()
        return debian_version 
[docs]def type_redhat():
    return isfile('/etc/redhat-release') 
def type_rhel():
    return linux_distribution is not None and linux_distribution[0].lower() in ('rhel', 'centos', 'oracle','almalinux','rocky')
is_rhel_based=type_rhel
is_redhat_based=type_redhat
def local_users():
    result = []
    with open('/etc/passwd') as f:
        for u in [entry.split(':',1) for entry in f.read().splitlines]:
            if not u[0]:
                continue
            result.append(u[0])
    return result
[docs]def host_info():
    info = host_info_common_unix()
    try:
        if dmi_info:
            dmi = dmi_info()
            info['system_manufacturer'] = dmi['System_Information']['Manufacturer']
            info['system_productname'] = dmi['System_Information']['Product_Name']
            info['serial_nr'] = dmi['System_Information'].get('Serial_Number')
    except:
        logger.warning('Error while running dmidecode, dmidecode needs root privileges')
        pass
    info['os_release_name'] = get_code_name_version().lower()
    if info['os_release_name'] == 'core':
        info['os_release_name'] = get_distrib_linux() + ' ' +  get_distrib_version()
    info['platform'] = platform.system()
    info['os_name'] = get_distrib_linux()
    info['os_version'] = get_distrib_version()
    if is_debian():
        info['debian_version'] = get_debian_version()
    if type_debian():
        info['reboot_needed'] = isfile('/var/run/reboot-required')
        info['reboot-required.pkgs'] = []
        if isfile('/var/run/reboot-required.pkgs'):
            with open('/var/run/reboot-required.pkgs','r') as f:
                info['reboot-required.pkgs'] = f.read().split('\n')
    info['linux64'] = isLinux64()
    with open('/etc/group') as f:
        info['local_groups'] = {group.split(':')[0]:[m for m in group.split(':')[3].split(',') if m != '' ] for group in f.read().split('\n') if group.split(':')[0] != ''}
    info['local_users'] = []
    with open('/etc/passwd') as f:
        for u in [entry.split(':') for entry in f.read().split('\n')]:
            if not u[0]:
                continue
            info['local_users'].append(u[0])
            gr_struct = grp.getgrgid(u[3])
            if gr_struct.gr_name in info['local_groups']:
                if u[0] not in info['local_groups'][gr_struct.gr_name]:
                    info['local_groups'][gr_struct.gr_name].append(u[0])
    return info 
[docs]def dmi_info():
    return dmi_info_common_unix() 
def listcontrol_to_dict(cmd=None, first_entry=None):
    if os.path.isfile(cmd):
        with open(cmd,'r') as f :
            list_package = '\n' + f.read()
    else:
        list_package = '\n' + run(cmd)
    list_control=[]
    for i in list_package.split(first_entry):
        if not i.strip('\n'):
            continue
        list_control.append(first_entry.replace('\n','') + i)
    list_package = []
    for pkg in list_control:
        entry = {}
        gpg = False
        for l in pkg.split('\n'):
            if '-----BEGIN PGP PUBLIC KEY BLOCK-----' in l:
                gpg = True
                continue
            if '-----END PGP PUBLIC KEY BLOCK-----' in l:
                gpg = False
                continue
            if gpg :
                continue
            if not l:
                continue
            if (':' in l) and (not '::' in l) and (not l.startswith(' ')) :
                key = l.split(':',1)[0].strip().lower()
                value = l.split(':',1)[1].strip()
                entry[key] = value.strip('\n')
            else:
                entry[key] =  entry[key].strip('\n') + '\n' + l
        list_package.append(entry)
    return list_package
[docs]def installed_softwares(keywords=None, name=None, ignore_empty_names=True):
    """ Return list of installed software from apt or rpm
        Args:
            keywords (str or list): string to lookup in key, display_name or publisher fields
        Returns:
            list of dicts: [{'key', 'name', 'version', 'install_date', 'install_location'
                         'uninstall_string', 'publisher','system_component'}]
    """
    name_re = re.compile(name) if name is not None else None
    list_installed_softwares = []
    if isinstance(keywords, str):
        keywords = keywords.lower().split()
    elif isinstance(keywords, bytes):
        keywords = str(keywords).lower().split()
    elif keywords is not None:
        keywords = [ensure_unicode(k).lower() for k in keywords]
    else:
        keywords = None
    def check_words(target, words):
        mywords = target.lower()
        result = not words or mywords
        for w in words:
            result = result and w in mywords
        return result
    if type_debian():
        try:
            r = listcontrol_to_dict('dpkg-query -s','\nPackage: ')
        except:
            r = listcontrol_to_dict('/var/lib/dpkg/status','\nPackage: ')
        dict_info_test = {}
        for fileinfotest in glob.glob("/var/lib/dpkg/info/*.list"):
             dict_info_test[fileinfotest.rsplit(':',1)[0] + '.list'] = fileinfotest
        for pkg in r:
            path_dpkg_info = "/var/lib/dpkg/info/"
            if pkg['status'] == "install ok installed" and (not ignore_empty_names or pkg.get('package', '') != '') and (
                (name_re is None or name_re.match(pkg['package'])) and
                (keywords is None or check_words(' '.join([pkg['package'], pkg['version'] , pkg.get('homepage', '') ]), keywords))):
                path_dpkg_info_full = path_dpkg_info + pkg['package'] +'.list'
                if not os.path.isfile(path_dpkg_info_full):
                    if path_dpkg_info_full in dict_info_test:
                        path_dpkg_info_full = dict_info_test[path_dpkg_info_full]
                try:
                    install_date = os.path.getctime(path_dpkg_info_full)
                    install_date = datetime.datetime.fromtimestamp(install_date).strftime('%Y-%m-%d %H:%M:%S')
                except:
                    install_date = ''
                list_installed_softwares.append({
                    'key': pkg['package'],
                    'name': pkg['package'],
                    'version': pkg['version'],
                    'install_date': install_date,
                    'install_location': '',
                    'uninstall_string': '',
                    'publisher': pkg.get('homepage',''),
                    'system_component': ''})
    elif type_redhat():
        for header in listcontrol_to_dict('LANG="en_US.UTF-8" rpm -qai','Name        : '):
            if (not ignore_empty_names or header.get('name', '') != '') and (
                (name_re is None or name_re.match(ensure_unicode(header['name']))) and
                (keywords is None or check_words(' '.join([ensure_unicode(header['name']), ensure_unicode(header.get('url', ''))]), keywords))):
                with setlocale('C'):
                    try:
                        dateformat = datetime.datetime.strptime(header['install date'],'%a %b %d %H:%M:%S %Y').strftime('%Y-%m-%d %H:%M:%S')
                    except ValueError:
                        dateformat = datetime.datetime.strptime(header['install date'].rsplit(' ',1)[0],'%a %d %b %Y %H:%M:%S %p').strftime('%Y-%m-%d %H:%M:%S')
                list_installed_softwares.append({
                    'key': '%s_%s' % (ensure_unicode(header['name']),ensure_unicode(header['version'])),
                    'name': ensure_unicode(header['name']),
                    'version': ensure_unicode(header['version']),
                    'install_date': dateformat,
                    'install_location': '',
                    'uninstall_string': '',
                    'publisher': ensure_unicode(header.get('url','')),
                    'system_component': ''})
    else:
        list_installed_softwares.append({'key': 'Distribution not supported yet', 'name': 'Distribution not supported yet', 'version': 'Distribution not supported yet', 'install_date': 'Distribution not supported yet',
                                         'install_location': 'Distribution not supported yet', 'uninstall_string': 'Distribution not supported yet', 'publisher': 'Distribution not supported yet', 'system_component': 'Distribution not supported yet'})
    try:
        for snap in get_snap_softwares():
            if (not ignore_empty_names or snap['name'] != '') and (
                (name_re is None or name_re.match(snap['name'])) and
                (keywords is None or check_words(snap['name']), keywords)):
                list_installed_softwares.append(snap)
    except Exception as e:
        logger.warning('Error with snap list %s' % e)
        pass
    return list_installed_softwares 
def get_snap_softwares():
          list_installed_softwares=[]
          cmd = "snap list --color=never --unicode=always"
          proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=True, universal_newlines=True)
          stdout, stderr = proc.communicate()
          retcode = proc.returncode
          if retcode == 0:
             i=0
             for line in stdout.split("\n"):
                  # Skip header
                  if i==0:
                      i+=1
                      continue
                  else:
                      i+=1
                      package=line.split()
                      if package != []:
                          snap = {}
                          snap['name'] = package[0]
                          snap['version'] = package[1]
                          snap['revision'] = package[2]
                          snap['tracking'] = package[3]
                          snap['publisher'] = 'snap/' + package[4]
                          snap['notes'] = package[5]
                          if  os.path.isfile(os.path.join("/var/lib/snapd/snaps/", package[0] + "_" + package[2] + ".snap")):
                              snap['install_path'] = os.path.join("/var/lib/snapd/snaps/", package[0] + "_" + package[2] + ".snap")
                              snap['install_date'] = os.path.getctime(snap['install_path'])
                              snap['install_date'] = datetime.datetime.fromtimestamp(snap['install_date']).strftime('%Y-%m-%d %H:%M:%S')
                          else:
                              snap['install_path'] = ''
                              snap['install_date'] = ''
                          list_installed_softwares.append({
                              'key':'snap_' + snap['name'],
                              'name':snap['name'],
 'version':snap['version']+"-"+snap['revision'],
 'install_date':snap['install_date'],
 'install_location':snap['install_path'],
                              'uninstall_string':"snap",
                              'publisher':snap['publisher'],
                              'system_component':''})
          return list_installed_softwares
def uninstall_key_exists(uninstallkey):
    uninstallkey = uninstallkey.lower()
    for software in installed_softwares():
        if software['name'].lower()==uninstallkey:
            return True
    return False
[docs]def install_apt(package, allow_unauthenticated=False):
    """
    Install package from APT repositories
    """
    update_apt()
    if allow_unauthenticated:
        return run('LANG=C DEBIAN_FRONTEND=noninteractive apt-get install -y --allow-unauthenticated %s' % package)
    else:
        return run('LANG=C DEBIAN_FRONTEND=noninteractive apt-get install -y %s' % package) 
[docs]def uninstall_apt(package, autoremove=False):
    """
    Remove package with APT
    """
    if autoremove:
        return run('LANG=C DEBIAN_FRONTEND=noninteractive apt-get remove --autoremove -y %s' % package)
    else:
        return run('LANG=C DEBIAN_FRONTEND=noninteractive apt-get remove -y %s' % package) 
[docs]def install_deb(path_to_deb):
    """
    Install .deb package from file
    """
    try:
        return run('LANG=C DEBIAN_FRONTEND=noninteractive dpkg -i %s' % path_to_deb)
    except:
        return install_required_dependencies_apt() 
[docs]def install_package_if_needed(package_name):
    if not uninstall_key_exists(package_name):
        if type_debian():
            return install_apt(package_name)
        if type_redhat():
            return install_yum(package_name)
        else:
            raise Exception('not supported') 
[docs]def purge_deb(deb_name):
    return run('LANG=C DEBIAN_FRONTEND=noninteractive dpkg --purge %s' % deb_name) 
[docs]def install_required_dependencies_apt():
    return run('LANG=C DEBIAN_FRONTEND=noninteractive apt-get -f -y install') 
[docs]def autoremove_apt():
    return run('LANG=C DEBIAN_FRONTEND=noninteractive apt-get -y autoremove') 
[docs]def install_yum(package):
    return run('LANG=C yum install -y %s' % (package)) 
def install_dnf(package):
    return install_yum(package)
[docs]def uninstall_yum(package):
    return run('LANG=C yum remove -y %s' % package) 
def uninstall_dnf(package):
    return uninstall_yum(package)
[docs]def autoremove_yum():
    return run('LANG=C yum autoremove -y') 
def autoremove_dnf():
    return autoremove_yum()
[docs]def update_apt():
    return run('LANG=C DEBIAN_FRONTEND=noninteractive apt-get -y update') 
[docs]def upgrade_apt():
    return run('LANG=C DEBIAN_FRONTEND=noninteractive apt-get -y upgrade') 
[docs]def update_yum():
    return run('LANG=C yum update -y') 
def update_dnf():
    return update_yum()
[docs]def upgrade_yum():
    return run('LANG=C yum upgrade -y') 
def upgrade_dnf():
    return upgrade_yum()
[docs]def install_rpm(package):
    return run('LANG=C yum localinstall -y %s' % (package)) 
[docs]def systemd_start_service(servicename):
    return run('LANG=C systemctl start %s' % (servicename)) 
[docs]def systemd_stop_service(servicename):
    return run('LANG=C systemctl stop %s' % (servicename)) 
[docs]def systemd_restart_service(servicename):
    return run('LANG=C systemctl restart %s' % (servicename)) 
[docs]def systemd_status_service(servicename):
    return run('LANG=C systemctl status %s' % (servicename)) 
[docs]def systemd_enable_start_service(servicename):
    return run('LANG=C systemctl enable --now %s' % (servicename)) 
[docs]def systemd_enable_service(servicename):
    return run('LANG=C systemctl enable %s' % (servicename)) 
[docs]def systemd_disable_service(servicename):
    return run('LANG=C systemctl disable %s' % (servicename)) 
[docs]def systemd_daemon_reload():
    return run('LANG=C systemctl daemon-reload') 
[docs]def service_list():
    result = {}
    systemd_list =run('LANG=C systemctl list-units')
    units = fixed_columns_to_dicts(systemd_list)
    for unit in units:
        if unit['UNIT'].endswith('.service'):
            result[unit['UNIT']] = unit
    return result 
[docs]def running_on_ac():
    t = psutil.sensors_battery()
    if not t:
        return True
    return t.power_plugged