Source code for setuphelpers_linux

#!/usr/bin/env python3
##
## -----------------------------------------------------------------
##    This file is part of WAPT Software Deployment
##    Copyright (C) 2012 - 2023  Tranquil IT https://www.tranquil.it
##    All Rights Reserved.
##
##    WAPT helps systems administrators to efficiently deploy
##    setup, update and configure applications.
## ------------------------------------------------------------------
##
import os
import socket
import platform
import subprocess
import logging
import glob
import datetime
import re
import locale
import threading
import grp
import psutil

from setuphelpers_unix import host_info_common_unix,dmi_info_common_unix
from waptutils import isfile,fixed_columns_to_dicts,ensure_unicode,run
from contextlib import contextmanager

LOCALE_LOCK = threading.Lock()


@contextmanager
def setlocale(name):
    """Switch ``LC_ALL`` to *name* for the duration of a ``with`` block.

    ``LOCALE_LOCK`` is held for the whole block, so only one thread at a time
    can change the locale through this helper. The value bound by ``as`` is
    what ``locale.setlocale(locale.LC_ALL, name)`` returns. The locale that
    was active on entry is restored on exit, even when the body raises.
    """
    LOCALE_LOCK.acquire()
    try:
        previous = locale.setlocale(locale.LC_ALL)
        try:
            yield locale.setlocale(locale.LC_ALL, name)
        finally:
            locale.setlocale(locale.LC_ALL, previous)
    finally:
        LOCALE_LOCK.release()

# Detect the running distribution once, at import time.
# The tuple layout is (distribution id, version, codename).
try:
    import distro
except ImportError:
    # The 'distro' package is optional: every field is then unknown (None).
    linux_distribution = (None, None, None)
else:
    try:
        linux_distribution = distro.linux_distribution(False)
    except AttributeError:
        # linux_distribution() is deprecated in 'distro'; if a release drops
        # it, build the same 3-tuple from the accessors that replace it
        # instead of failing the import of this module.
        linux_distribution = (distro.id(), distro.version(), distro.codename())

logger = logging.getLogger('waptcore')

def is_linux64():
    """Tell whether the architecture name from ``platform.machine()`` ends with ``64``."""
    machine = platform.machine()
    return machine.endswith('64')


# camelCase alias of is_linux64.
isLinux64 = is_linux64
def get_distrib_version():
    """Return the version field (index 1) of the module-level ``linux_distribution`` tuple."""
    version = linux_distribution[1]
    return version
def get_os_version():
    """Return the OS version; on Linux this is exactly ``get_distrib_version()``."""
    os_version = get_distrib_version()
    return os_version
def get_distrib_linux():
    """Return the distribution field (index 0) of the module-level ``linux_distribution`` tuple."""
    distribution = linux_distribution[0]
    return distribution
def get_code_name_version():
    """Return the codename field (index 2) of the module-level ``linux_distribution`` tuple."""
    codename = linux_distribution[2]
    return codename
def get_hostname():
    """Return the lower-cased fully qualified domain name of this host.

    Returns:
        str: ``socket.getfqdn().lower()``, or ``""`` when the lookup fails.
    """
    try:
        return socket.getfqdn().lower()
    except Exception:
        # Best effort: any lookup error yields an empty name. Unlike a bare
        # ``except:``, KeyboardInterrupt and SystemExit are not swallowed.
        return ""
def type_debian():
    """Tell whether the detected distribution belongs to the Debian family.

    Returns:
        bool: True for debian, ubuntu, linuxmint and raspbian (compared
        case-insensitively); False otherwise, including when the distribution
        is unknown (first field of ``linux_distribution`` is None or empty).
    """
    # The tuple itself is never None; its *fields* are when 'distro' is
    # missing, so the first field is what must be guarded before .lower().
    distrib = linux_distribution[0] if linux_distribution else None
    if not distrib:
        return False
    return distrib.lower() in ('debian', 'ubuntu', 'linuxmint', 'raspbian')


is_debian_based = type_debian
def is_debian():
    """Tell whether the detected distribution is Debian itself (not a derivative).

    Returns:
        bool: True when the first field of ``linux_distribution`` equals
        ``'debian'`` case-insensitively; False otherwise, including when the
        distribution is unknown (None or empty).
    """
    # Guard the field, not the tuple: it is None when 'distro' is unavailable.
    distrib = linux_distribution[0] if linux_distribution else None
    if not distrib:
        return False
    return distrib.lower() == 'debian'
def get_debian_version():
    """Return the stripped content of ``/etc/debian_version``.

    Returns None when ``isfile`` reports that the file does not exist.
    """
    version_file = '/etc/debian_version'
    if not isfile(version_file):
        return None
    with open(version_file) as handle:
        return handle.read().strip()
def type_redhat():
    """Tell whether the detected distribution belongs to the Red Hat family.

    Returns:
        bool: True for rhel, centos, fedora, oracle, almalinux and rocky
        (compared case-insensitively); False otherwise, including when the
        distribution is unknown (None or empty).
    """
    # Guard the field, not the tuple: it is None when 'distro' is unavailable.
    distrib = linux_distribution[0] if linux_distribution else None
    if not distrib:
        return False
    return distrib.lower() in ('rhel', 'centos', 'fedora', 'oracle', 'almalinux', 'rocky')


def type_rhel():
    """Tell whether the detected distribution is RHEL or one of its rebuilds.

    Same list as ``type_redhat`` without fedora.

    Returns:
        bool: True for rhel, centos, oracle, almalinux and rocky; False
        otherwise, including when the distribution is unknown.
    """
    distrib = linux_distribution[0] if linux_distribution else None
    if not distrib:
        return False
    return distrib.lower() in ('rhel', 'centos', 'oracle', 'almalinux', 'rocky')


is_rhel_based = type_rhel
is_redhat_based = type_redhat


def local_users():
    """Return the login names found in ``/etc/passwd``.

    Returns:
        list of str: first colon-separated field of every line, in file
        order; lines whose first field is empty are skipped.
    """
    # splitlines must be *called*; iterating the bound method raises TypeError.
    with open('/etc/passwd') as passwd:
        logins = [entry.split(':', 1)[0] for entry in passwd.read().splitlines()]
    return [login for login in logins if login]
def host_info():
    """Collect identification and inventory facts about this Linux host.

    Starts from ``host_info_common_unix()`` and adds: DMI manufacturer and
    product name (best effort), distribution name / version / release name,
    pending-reboot markers on Debian-like systems, the 64-bit flag, and the
    local groups and users read from ``/etc/group`` and ``/etc/passwd``.

    Returns:
        dict: the enriched host information.
    """
    info = host_info_common_unix()

    try:
        dmi = dmi_info()
        info['system_manufacturer'] = dmi['System_Information']['Manufacturer']
        info['system_productname'] = dmi['System_Information']['Product_Name']
    except Exception:
        # Best effort: DMI data is optional, keep going without it.
        logger.warning('Error while running dmidecode, dmidecode needs root privileges')

    # The codename is None when the distribution could not be detected.
    info['os_release_name'] = (get_code_name_version() or '').lower()
    if info['os_release_name'] == 'core':
        info['os_release_name'] = get_distrib_linux() + ' ' + get_distrib_version()

    info['platform'] = platform.system()
    info['os_name'] = get_distrib_linux()
    info['os_version'] = get_distrib_version()

    if is_debian():
        info['debian_version'] = get_debian_version()

    if type_debian():
        info['reboot_needed'] = isfile('/var/run/reboot-required')
        info['reboot-required.pkgs'] = []
        if isfile('/var/run/reboot-required.pkgs'):
            with open('/var/run/reboot-required.pkgs', 'r') as f:
                info['reboot-required.pkgs'] = f.read().split('\n')

    info['linux64'] = isLinux64()

    # group name -> explicit members (4th field of /etc/group)
    groups = {}
    with open('/etc/group') as group_file:
        for line in group_file.read().split('\n'):
            fields = line.split(':')
            if fields[0] == '':
                continue
            members = fields[3] if len(fields) > 3 else ''
            groups[fields[0]] = [m for m in members.split(',') if m != '']
    info['local_groups'] = groups

    users = []
    with open('/etc/passwd') as passwd_file:
        for line in passwd_file.read().split('\n'):
            fields = line.split(':')
            login = fields[0]
            if not login:
                continue
            users.append(login)
            # Also list the user as a member of its primary group.
            # grp.getgrgid needs an int (a str raises TypeError on
            # Python >= 3.10) and raises KeyError for an unknown gid.
            try:
                primary_group = grp.getgrgid(int(fields[3])).gr_name
            except (IndexError, ValueError, KeyError):
                continue
            group_members = groups.get(primary_group)
            if group_members is not None and login not in group_members:
                group_members.append(login)
    info['local_users'] = users

    return info
def dmi_info():
    """Return the DMI information produced by ``dmi_info_common_unix()``, unchanged."""
    dmi = dmi_info_common_unix()
    return dmi
def listcontrol_to_dict(cmd=None, first_entry=None):
    """Parse "Key: value" package records into a list of dicts.

    Args:
        cmd (str): path of a file to read or, when it is not an existing
            file, a command line executed with ``run`` whose output is parsed.
        first_entry (str): text that starts every record (for example a
            newline followed by ``Package: ``); the input is split on it.

    Returns:
        list of dict: one dict per record, with lower-cased keys. A line that
        does not start a new field (leading space, no ``:``, or containing
        ``::``) is appended, after a newline, to the previous field's value.
        PGP public key blocks and empty lines are ignored.
    """
    if os.path.isfile(cmd):
        with open(cmd, 'r') as f:
            raw = '\n' + f.read()
    else:
        raw = '\n' + run(cmd)

    # Splitting drops the marker: put it back (without its newline) so the
    # first line of every record is parsed like any other field.
    marker = first_entry.replace('\n', '')
    records = [marker + chunk for chunk in raw.split(first_entry) if chunk.strip('\n')]

    packages = []
    for record in records:
        entry = {}
        # Field currently being filled. Reset for every record so a
        # continuation line can never refer to a key of the previous record.
        key = None
        in_pgp_block = False
        for line in record.split('\n'):
            if '-----BEGIN PGP PUBLIC KEY BLOCK-----' in line:
                in_pgp_block = True
                continue
            if '-----END PGP PUBLIC KEY BLOCK-----' in line:
                in_pgp_block = False
                continue
            if in_pgp_block or not line:
                continue

            looks_like_field = ':' in line and not line.startswith(' ')
            # '::' marks free text, but only once a field exists to attach it to.
            if looks_like_field and ('::' not in line or key is None):
                field_name, _, value = line.partition(':')
                key = field_name.strip().lower()
                entry[key] = value.strip()
            elif key is not None:
                entry[key] = entry[key].strip('\n') + '\n' + line
            # else: continuation text before any field, nothing to attach it to
        packages.append(entry)
    return packages
def _parse_rpm_install_date(raw):
    """Convert an rpm "Install Date" value to ``YYYY-MM-DD HH:MM:SS`` ('' when unparsable)."""
    if not raw:
        return ''
    # Second layout ends with a timezone name that strptime cannot parse.
    without_tz = raw.rsplit(' ', 1)[0]
    candidates = (
        (raw, '%a %b %d %H:%M:%S %Y'),
        # %I, not %H: strptime only honours %p when the hour is parsed with %I.
        (without_tz, '%a %d %b %Y %I:%M:%S %p'),
        # Formerly accepted layout, kept so no previously parsed value is lost.
        (without_tz, '%a %d %b %Y %H:%M:%S %p'),
        (without_tz, '%a %d %b %Y %H:%M:%S'),
    )
    # Day and month names are parsed in the C locale.
    with setlocale('C'):
        for text, fmt in candidates:
            try:
                return datetime.datetime.strptime(text, fmt).strftime('%Y-%m-%d %H:%M:%S')
            except ValueError:
                continue
    return ''


def installed_softwares(keywords=None, name=None, ignore_empty_names=True):
    """Return list of installed software from apt or rpm, plus snap packages.

    Args:
        keywords (str or list): words that must all appear, case-insensitively,
            in the searched fields (deb: name, version, homepage; rpm: name,
            url; snap: name). A str/bytes value is split on whitespace.
        name (str): regular expression matched with ``re.match`` against the
            package name.
        ignore_empty_names (bool): skip entries whose name is empty.

    Returns:
        list of dicts: [{'key', 'name', 'version', 'install_date', 'install_location',
                         'uninstall_string', 'publisher','system_component'}]
    """
    name_re = re.compile(name) if name is not None else None

    if isinstance(keywords, bytes):
        # str(b'..') would produce the "b'..'" repr: decode instead.
        keywords = keywords.decode('utf-8', 'replace')
    if isinstance(keywords, str):
        keywords = keywords.lower().split()
    elif keywords is not None:
        keywords = [ensure_unicode(k).lower() for k in keywords]

    def check_words(target, words):
        haystack = target.lower()
        return all(w in haystack for w in words)

    def selected(pkg_name, searchable):
        """Apply the empty-name, name regex and keywords filters."""
        if ignore_empty_names and pkg_name == '':
            return False
        if name_re is not None and not name_re.match(pkg_name):
            return False
        return keywords is None or check_words(searchable, keywords)

    list_installed_softwares = []

    if type_debian():
        try:
            r = listcontrol_to_dict('dpkg-query -s', '\nPackage: ')
        except Exception:
            # dpkg-query failed: read the status database directly.
            r = listcontrol_to_dict('/var/lib/dpkg/status', '\nPackage: ')

        path_dpkg_info = "/var/lib/dpkg/info/"
        # Maps "<pkg>.list" to the real file when the file name carries an
        # architecture suffix ("<pkg>:<arch>.list").
        dict_info_test = {}
        for fileinfotest in glob.glob("/var/lib/dpkg/info/*.list"):
            dict_info_test[fileinfotest.rsplit(':', 1)[0] + '.list'] = fileinfotest

        for pkg in r:
            if pkg.get('status') != "install ok installed":
                continue
            pkg_name = pkg.get('package', '')
            pkg_version = pkg.get('version', '')
            pkg_homepage = pkg.get('homepage', '')
            if not selected(pkg_name, ' '.join([pkg_name, pkg_version, pkg_homepage])):
                continue

            path_dpkg_info_full = path_dpkg_info + pkg_name + '.list'
            if not os.path.isfile(path_dpkg_info_full):
                path_dpkg_info_full = dict_info_test.get(path_dpkg_info_full, path_dpkg_info_full)
            try:
                # The change time of the package file list stands in for the install date.
                install_date = os.path.getctime(path_dpkg_info_full)
                install_date = datetime.datetime.fromtimestamp(install_date).strftime('%Y-%m-%d %H:%M:%S')
            except (OSError, ValueError, OverflowError):
                install_date = ''

            list_installed_softwares.append({
                'key': pkg_name,
                'name': pkg_name,
                'version': pkg_version,
                'install_date': install_date,
                'install_location': '',
                'uninstall_string': '',
                'publisher': pkg_homepage,
                'system_component': ''})

    elif type_redhat():
        # NOTE(review): rpm -qai pads field names with spaces; confirm that
        # this record separator matches the real output - TODO confirm.
        for header in listcontrol_to_dict('LANG="en_US.UTF-8" rpm -qai', 'Name : '):
            rpm_name = ensure_unicode(header.get('name', ''))
            rpm_url = ensure_unicode(header.get('url', ''))
            if not selected(rpm_name, ' '.join([rpm_name, rpm_url])):
                continue
            rpm_version = ensure_unicode(header.get('version', ''))
            list_installed_softwares.append({
                'key': '%s_%s' % (rpm_name, rpm_version),
                'name': rpm_name,
                'version': rpm_version,
                'install_date': _parse_rpm_install_date(header.get('install date', '')),
                'install_location': '',
                'uninstall_string': '',
                'publisher': rpm_url,
                'system_component': ''})

    else:
        list_installed_softwares.append({'key': 'Distribution not supported yet',
                                         'name': 'Distribution not supported yet',
                                         'version': 'Distribution not supported yet',
                                         'install_date': 'Distribution not supported yet',
                                         'install_location': 'Distribution not supported yet',
                                         'uninstall_string': 'Distribution not supported yet',
                                         'publisher': 'Distribution not supported yet',
                                         'system_component': 'Distribution not supported yet'})

    try:
        for snap in get_snap_softwares():
            if selected(snap['name'], snap['name']):
                list_installed_softwares.append(snap)
    except Exception as e:
        # Snap support is optional: a failure must not hide the other packages.
        logger.warning('Error with snap list %s' % e)

    return list_installed_softwares
def get_snap_softwares():
    """List installed snap packages, in the dict format of ``installed_softwares()``.

    Runs ``snap list`` through the shell.

    Returns:
        list of dict: one entry per snap ('key' is ``snap_<name>``, 'version'
        is ``<version>-<revision>``). Empty when the command exits with a
        non-zero code.
    """
    cmd = "snap list --color=never --unicode=always"
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, universal_newlines=True)
    stdout, _stderr = proc.communicate()
    if proc.returncode != 0:
        return []

    list_installed_softwares = []
    # The first line is the column header.
    for line in stdout.split("\n")[1:]:
        columns = line.split()
        # Name, Version, Rev, Tracking, Publisher are needed: skip blank or
        # truncated rows instead of failing the whole listing on one of them.
        if len(columns) < 5:
            continue
        snap_name, snap_version, snap_revision, _tracking, snap_publisher = columns[:5]

        snap_file = os.path.join("/var/lib/snapd/snaps/", snap_name + "_" + snap_revision + ".snap")
        if os.path.isfile(snap_file):
            install_path = snap_file
            install_date = datetime.datetime.fromtimestamp(os.path.getctime(snap_file)).strftime('%Y-%m-%d %H:%M:%S')
        else:
            install_path = ''
            install_date = ''

        list_installed_softwares.append({
            'key': 'snap_' + snap_name,
            'name': snap_name,
            'version': snap_version + "-" + snap_revision,
            'install_date': install_date,
            'install_location': install_path,
            'uninstall_string': "snap",
            'publisher': 'snap/' + snap_publisher,
            'system_component': ''})
    return list_installed_softwares


def uninstall_key_exists(uninstallkey):
    """Tell whether an installed software has a 'name' equal to *uninstallkey*.

    The comparison is case-insensitive and is made against the 'name' field
    of every entry returned by ``installed_softwares()``.
    """
    uninstallkey = uninstallkey.lower()
    return any(software['name'].lower() == uninstallkey for software in installed_softwares())
def install_apt(package, allow_unauthenticated=False):
    """
    Install package from APT repositories

    ``update_apt()`` is called first. When *allow_unauthenticated* is true,
    ``--allow-unauthenticated`` is passed to ``apt-get install``.
    Returns what ``run`` returns for the install command.
    """
    update_apt()
    command = (
        'LANG=C DEBIAN_FRONTEND=noninteractive apt-get install -y --allow-unauthenticated %s'
        if allow_unauthenticated
        else 'LANG=C DEBIAN_FRONTEND=noninteractive apt-get install -y %s'
    )
    return run(command % package)
def uninstall_apt(package, autoremove=False):
    """
    Remove package with APT

    When *autoremove* is true, ``--autoremove`` is passed to ``apt-get remove``.
    Returns what ``run`` returns for the command.
    """
    command = (
        'LANG=C DEBIAN_FRONTEND=noninteractive apt-get remove --autoremove -y %s'
        if autoremove
        else 'LANG=C DEBIAN_FRONTEND=noninteractive apt-get remove -y %s'
    )
    return run(command % package)
def install_deb(path_to_deb):
    """
    Install .deb package from file

    Runs ``dpkg -i`` on *path_to_deb*. If that fails (``run`` raises), falls
    back to ``install_required_dependencies_apt()`` and returns its result.
    """
    try:
        return run('LANG=C DEBIAN_FRONTEND=noninteractive dpkg -i %s' % path_to_deb)
    except Exception:
        # Unlike a bare ``except:``, KeyboardInterrupt / SystemExit propagate
        # instead of triggering the dependency-fixing fallback.
        return install_required_dependencies_apt()
def install_package_if_needed(package_name):
    """Install *package_name* with apt or yum unless ``uninstall_key_exists`` finds it.

    Returns None when the package is already present, otherwise the result of
    ``install_apt`` (Debian family) or ``install_yum`` (Red Hat family).

    Raises:
        Exception: 'not supported' on any other distribution.
    """
    if uninstall_key_exists(package_name):
        return None
    if type_debian():
        return install_apt(package_name)
    if type_redhat():
        return install_yum(package_name)
    raise Exception('not supported')
def purge_deb(deb_name):
    """Run ``dpkg --purge`` on *deb_name* and return what ``run`` returns."""
    command = 'LANG=C DEBIAN_FRONTEND=noninteractive dpkg --purge %s' % deb_name
    return run(command)
def install_required_dependencies_apt():
    """Run ``apt-get -f -y install`` and return what ``run`` returns."""
    command = 'LANG=C DEBIAN_FRONTEND=noninteractive apt-get -f -y install'
    return run(command)
def autoremove_apt():
    """Run ``apt-get -y autoremove`` and return what ``run`` returns."""
    command = 'LANG=C DEBIAN_FRONTEND=noninteractive apt-get -y autoremove'
    return run(command)
def install_yum(package):
    """Run ``yum install -y`` on *package* and return what ``run`` returns."""
    command = 'LANG=C yum install -y %s' % package
    return run(command)
def install_dnf(package):
    """dnf-named entry point: delegates to ``install_yum(package)``."""
    outcome = install_yum(package)
    return outcome
def uninstall_yum(package):
    """Run ``yum remove -y`` on *package* and return what ``run`` returns."""
    command = 'LANG=C yum remove -y %s' % package
    return run(command)
def uninstall_dnf(package):
    """dnf-named entry point: delegates to ``uninstall_yum(package)``."""
    outcome = uninstall_yum(package)
    return outcome
def autoremove_yum():
    """Run ``yum autoremove -y`` and return what ``run`` returns."""
    command = 'LANG=C yum autoremove -y'
    return run(command)
def autoremove_dnf():
    """dnf-named entry point: delegates to ``autoremove_yum()``."""
    outcome = autoremove_yum()
    return outcome
def update_apt():
    """Run ``apt-get -y update`` and return what ``run`` returns."""
    command = 'LANG=C DEBIAN_FRONTEND=noninteractive apt-get -y update'
    return run(command)
def upgrade_apt():
    """Run ``apt-get -y upgrade`` and return what ``run`` returns."""
    command = 'LANG=C DEBIAN_FRONTEND=noninteractive apt-get -y upgrade'
    return run(command)
def update_yum():
    """Run ``yum update -y`` and return what ``run`` returns."""
    command = 'LANG=C yum update -y'
    return run(command)
def update_dnf():
    """dnf-named entry point: delegates to ``update_yum()``."""
    outcome = update_yum()
    return outcome
def upgrade_yum():
    """Run ``yum upgrade -y`` and return what ``run`` returns."""
    command = 'LANG=C yum upgrade -y'
    return run(command)
def upgrade_dnf():
    """dnf-named entry point: delegates to ``upgrade_yum()``."""
    outcome = upgrade_yum()
    return outcome
def install_rpm(package):
    """Run ``yum localinstall -y`` on *package* and return what ``run`` returns."""
    command = 'LANG=C yum localinstall -y %s' % package
    return run(command)
def systemd_start_service(servicename):
    """Run ``systemctl start`` on *servicename* and return what ``run`` returns."""
    command = 'LANG=C systemctl start %s' % servicename
    return run(command)
def systemd_stop_service(servicename):
    """Run ``systemctl stop`` on *servicename* and return what ``run`` returns."""
    command = 'LANG=C systemctl stop %s' % servicename
    return run(command)
def systemd_restart_service(servicename):
    """Run ``systemctl restart`` on *servicename* and return what ``run`` returns."""
    command = 'LANG=C systemctl restart %s' % servicename
    return run(command)
def systemd_status_service(servicename):
    """Run ``systemctl status`` on *servicename* and return what ``run`` returns."""
    command = 'LANG=C systemctl status %s' % servicename
    return run(command)
def systemd_enable_start_service(servicename):
    """Run ``systemctl enable --now`` on *servicename* and return what ``run`` returns."""
    command = 'LANG=C systemctl enable --now %s' % servicename
    return run(command)
def systemd_enable_service(servicename):
    """Run ``systemctl enable`` on *servicename* and return what ``run`` returns."""
    command = 'LANG=C systemctl enable %s' % servicename
    return run(command)
def systemd_disable_service(servicename):
    """Run ``systemctl disable`` on *servicename* and return what ``run`` returns."""
    command = 'LANG=C systemctl disable %s' % servicename
    return run(command)
def systemd_daemon_reload():
    """Run ``systemctl daemon-reload`` and return what ``run`` returns."""
    command = 'LANG=C systemctl daemon-reload'
    return run(command)
def service_list():
    """Return the systemd service units listed by ``systemctl list-units``.

    Returns:
        dict: unit name (ending with ``.service``) -> the row produced by
        ``fixed_columns_to_dicts`` for that unit.
    """
    # 'list-units' is the verb: a doubled "systemctl systemctl list-units"
    # is rejected by systemctl as an unknown command.
    systemd_list = run('LANG=C systemctl list-units')
    result = {}
    for unit in fixed_columns_to_dicts(systemd_list):
        if unit['UNIT'].endswith('.service'):
            result[unit['UNIT']] = unit
    return result
def running_on_ac():
    """Tell whether the machine is on AC power, according to ``psutil.sensors_battery()``.

    Returns True when psutil reports no battery; otherwise returns the
    battery's ``power_plugged`` value as is.
    """
    battery = psutil.sensors_battery()
    return battery.power_plugged if battery else True