#!/usr/bin/env python3
##
## -----------------------------------------------------------------
## This file is part of WAPT Software Deployment
## Copyright (C) 2012 - 2022 Tranquil IT https://www.tranquil.it
## All Rights Reserved.
##
## WAPT helps systems administrators to efficiently deploy
## setup, update and configure applications.
## ------------------------------------------------------------------
##
import os
import socket
import struct
import getpass
import platform
import psutil
import netifaces
import json
import sys
import subprocess
import logging
import glob
import datetime
import re
import pathlib
from setuphelpers_unix import *
from waptutils import Version
# `distro` is treated as optional: when it cannot be imported, the module
# still loads and `linux_distribution` becomes a triple of None values.
try:
    import distro
    # Triple read elsewhere in this module as [0] distribution name/id,
    # [1] version, [2] code name.
    # NOTE(review): the positional False is presumably
    # full_distribution_name=False (short id instead of full name) - confirm
    # against the installed distro version.
    linux_distribution = distro.linux_distribution(False)
except ImportError:
    linux_distribution = (None, None, None)

# Logger registered under the 'waptcore' name and used by this module.
logger = logging.getLogger('waptcore')
def is_linux64():
    """Tell whether the machine type reported by ``platform`` ends with '64'.

    Returns:
        bool: True when ``platform.machine()`` ends with the two characters
        '64', False otherwise.
    """
    machine = platform.machine()
    return machine[-2:] == '64'


# camelCase alias of is_linux64
isLinux64 = is_linux64
def get_distrib_version():
    """Return the version field (index 1) of the module-level ``linux_distribution`` triple."""
    version = linux_distribution[1]
    return version
def get_os_version():
    """Return the OS version; this is the same value as ``get_distrib_version()``."""
    os_version = get_distrib_version()
    return os_version
def get_distrib_linux():
    """Return the distribution name field (index 0) of the module-level ``linux_distribution`` triple."""
    distrib_name = linux_distribution[0]
    return distrib_name
def get_code_name_version():
    """Return the code name field (index 2) of the module-level ``linux_distribution`` triple."""
    code_name = linux_distribution[2]
    return code_name
def get_hostname():
    """Return the lower-cased fully qualified domain name of this host.

    Returns:
        str: ``socket.getfqdn()`` converted to lower case, or an empty
        string when the lookup raises.
    """
    try:
        return socket.getfqdn().lower()
    except Exception:
        # Best effort: any lookup failure yields an empty name. Unlike a bare
        # ``except:``, this lets KeyboardInterrupt / SystemExit propagate.
        return ""
def type_debian():
    """Tell whether the detected distribution belongs to the Debian family.

    Returns:
        bool: True when the distribution name (``linux_distribution[0]``),
        compared case-insensitively, is one of debian, ubuntu, linuxmint or
        raspbian. False otherwise, including when the distribution is
        unknown (name is None or empty, e.g. the ``distro`` import failed).
    """
    distrib_name = linux_distribution[0] if linux_distribution else None
    if not distrib_name:
        # The import fallback stores (None, None, None); calling .lower() on
        # None would raise AttributeError.
        return False
    return distrib_name.lower() in ('debian', 'ubuntu', 'linuxmint', 'raspbian')


is_debian_based = type_debian
def type_redhat():
    """Tell whether the detected distribution belongs to the Red Hat family.

    Returns:
        bool: True when the distribution name (``linux_distribution[0]``),
        compared case-insensitively, is one of rhel, centos, fedora, oracle,
        almalinux or rocky. False otherwise, including when the distribution
        is unknown (name is None or empty).
    """
    distrib_name = linux_distribution[0] if linux_distribution else None
    if not distrib_name:
        # The import fallback stores (None, None, None); calling .lower() on
        # None would raise AttributeError.
        return False
    return distrib_name.lower() in ('rhel', 'centos', 'fedora', 'oracle', 'almalinux', 'rocky')


def type_rhel():
    """Tell whether the detected distribution is RHEL or one of the listed rebuilds.

    Same check as ``type_redhat`` but without 'fedora' in the accepted names.

    Returns:
        bool: True when the distribution name is one of rhel, centos,
        oracle, almalinux or rocky (case-insensitive). False otherwise,
        including when the distribution is unknown.
    """
    distrib_name = linux_distribution[0] if linux_distribution else None
    if not distrib_name:
        return False
    return distrib_name.lower() in ('rhel', 'centos', 'oracle', 'almalinux', 'rocky')


is_rhel_based = type_rhel
is_redhat_based = type_redhat
def local_users():
    """Return the login names found in ``/etc/passwd``.

    Returns:
        list of str: first colon-separated field of every line of
        ``/etc/passwd``, in file order; lines with an empty first field are
        skipped.
    """
    # The file is closed deterministically, and splitlines is actually called
    # (iterating over the bound method itself raises TypeError).
    with open('/etc/passwd') as passwd_file:
        lines = passwd_file.read().splitlines()
    logins = (line.split(':', 1)[0] for line in lines)
    return [login for login in logins if login]
def host_info():
    """Collect host information, adding Linux-specific keys to the common Unix data.

    Starts from the dict returned by ``host_info_common_unix()`` and adds:

    * ``system_manufacturer`` / ``system_productname`` from ``dmi_info()``
      (skipped with a warning when that data cannot be read),
    * ``os_release_name``, ``os_name``, ``os_version``, ``platform``, ``linux64``,
    * ``local_groups``: ``{group name: [member names]}`` read from ``/etc/group``,
    * ``local_users``: login names read from ``/etc/passwd``.

    Each user is also appended to the member list of its primary group when
    that group is listed in ``/etc/group`` and does not already contain it.

    Returns:
        dict: the completed host information.
    """
    # Local import: grp is not among this module's explicit imports.
    import grp

    info = host_info_common_unix()
    try:
        dmi = dmi_info()
        info['system_manufacturer'] = dmi['System_Information']['Manufacturer']
        info['system_productname'] = dmi['System_Information']['Product_Name']
    except Exception:
        logger.warning('Error while running dmidecode, dmidecode needs root privileges')

    # The code name is None when the distribution could not be detected.
    info['os_release_name'] = (get_code_name_version() or '').lower()
    if info['os_release_name'] == 'core':
        info['os_release_name'] = get_distrib_linux() + ' ' + get_distrib_version()
    info['platform'] = platform.system()
    info['os_name'] = get_distrib_linux()
    info['os_version'] = get_distrib_version()
    info['linux64'] = isLinux64()

    local_groups = {}
    with open('/etc/group') as group_file:
        for line in group_file.read().split('\n'):
            fields = line.split(':')
            if fields[0] == '':
                continue
            # Fourth field is the comma-separated member list; tolerate
            # truncated lines instead of raising IndexError.
            members = fields[3].split(',') if len(fields) > 3 else []
            local_groups[fields[0]] = [m for m in members if m != '']
    info['local_groups'] = local_groups

    info['local_users'] = []
    with open('/etc/passwd') as passwd_file:
        passwd_entries = [line.split(':') for line in passwd_file.read().split('\n')]
    for fields in passwd_entries:
        login = fields[0]
        if not login:
            continue
        info['local_users'].append(login)
        try:
            # The gid is read as text; grp.getgrgid needs an integer.
            primary_group = grp.getgrgid(int(fields[3])).gr_name
        except (IndexError, ValueError, KeyError):
            # Malformed entry or gid without a matching group: the user is
            # still listed, there is just no primary group to attach it to.
            continue
        members = local_groups.get(primary_group)
        if members is not None and login not in members:
            members.append(login)
    return info
def dmi_info():
    """Return the DMI information produced by ``dmi_info_common_unix()``."""
    dmi = dmi_info_common_unix()
    return dmi
def listcontrol_to_dict(cmd=None, first_entry=None):
    """Parse "Field: value" control records into a list of dicts.

    Args:
        cmd (str): path of an existing file to read, otherwise a command
            line whose output (obtained through ``run``) is parsed.
        first_entry (str): text marking the start of each record, for
            example ``'\\nPackage: '``. The input is split on this marker.

    Returns:
        list of dict: one dict per record. Keys are the lower-cased field
        names and values the stripped field text. Lines that are not
        recognised as a new field are appended (newline-separated) to the
        most recent field of the same record. Lines between the PGP public
        key block markers and empty lines are ignored.
    """
    if os.path.isfile(cmd):
        with open(cmd, 'r') as f:
            raw = '\n' + f.read()
    else:
        raw = '\n' + run(cmd)

    # Splitting removes the marker, so it is put back (without its newline)
    # at the start of every non-empty record.
    marker = first_entry.replace('\n', '')
    records = [marker + chunk for chunk in raw.split(first_entry) if chunk.strip('\n')]

    parsed = []
    for record in records:
        entry = {}
        # Reset per record: a continuation line must never be attached to a
        # field that belongs to the previous record.
        key = None
        in_pgp_block = False
        for line in record.split('\n'):
            if '-----BEGIN PGP PUBLIC KEY BLOCK-----' in line:
                in_pgp_block = True
                continue
            if '-----END PGP PUBLIC KEY BLOCK-----' in line:
                in_pgp_block = False
                continue
            if in_pgp_block or not line:
                continue
            if ':' in line and '::' not in line and not line.startswith(' '):
                field, value = line.split(':', 1)
                key = field.strip().lower()
                entry[key] = value.strip()
            elif key is not None:
                entry[key] = entry[key].strip('\n') + '\n' + line
            # A continuation line seen before any field has nothing to attach
            # to; it is dropped instead of raising.
        parsed.append(entry)
    return parsed
def installed_softwares(keywords=None, name=None, ignore_empty_names=True):
    """Return the list of installed software from dpkg or rpm, plus snap packages.

    Args:
        keywords (str, bytes or list): words that must all appear
            (case-insensitively) in the package name / version / homepage
            (dpkg), name / url (rpm) or name (snap). A string is split on
            whitespace. None disables this filter.
        name (str): regular expression matched (``re.match``) against the
            package name. None disables this filter.
        ignore_empty_names (bool): skip entries whose name is empty.

    Returns:
        list of dicts: [{'key', 'name', 'version', 'install_date', 'install_location'
                         'uninstall_string', 'publisher','system_component'}]
        On a distribution that is neither Debian-like nor Red Hat-like a
        single placeholder entry 'Distribution not supported yet' is added.
    """
    name_re = re.compile(name) if name is not None else None
    list_installed_softwares = []

    if isinstance(keywords, bytes):
        # str(bytes) would yield "b'...'"; decode instead.
        keywords = keywords.decode('utf-8', 'replace')
    if isinstance(keywords, str):
        keywords = keywords.lower().split()
    elif keywords is not None:
        keywords = [ensure_unicode(k).lower() for k in keywords]

    def check_words(target, words):
        # True when every word occurs in target (case-insensitive).
        haystack = target.lower()
        return all(w in haystack for w in words)

    def selected(pkg_name, searchable):
        # Apply the three filters shared by the dpkg, rpm and snap listings.
        if ignore_empty_names and pkg_name == '':
            return False
        if name_re is not None and not name_re.match(pkg_name):
            return False
        return keywords is None or check_words(searchable, keywords)

    def rpm_install_date(raw):
        # Drop the last space-separated token (the original code strips it
        # before parsing), then parse. %p only affects the hour when paired
        # with %I, so the 12-hour form is tried first; the original %H form
        # is kept as a fallback. Unparseable dates give '' instead of
        # aborting the whole listing.
        stamp = raw.rsplit(' ', 1)[0]
        for fmt in ('%a %d %b %Y %I:%M:%S %p', '%a %d %b %Y %H:%M:%S %p'):
            try:
                return datetime.datetime.strptime(stamp, fmt).strftime('%Y-%m-%d %H:%M:%S')
            except ValueError:
                continue
        return ''

    if type_debian():
        try:
            r = listcontrol_to_dict('dpkg-query -s', '\nPackage: ')
        except Exception:
            # Fall back to reading the dpkg status database directly.
            r = listcontrol_to_dict('/var/lib/dpkg/status', '\nPackage: ')

        # Map "<dir>/<pkg>.list" to the real "<dir>/<pkg>:<suffix>.list" file
        # so that packages whose .list file carries a ':' suffix are found.
        dict_info_test = {}
        for fileinfotest in glob.glob("/var/lib/dpkg/info/*.list"):
            dict_info_test[fileinfotest.rsplit(':', 1)[0] + '.list'] = fileinfotest

        path_dpkg_info = "/var/lib/dpkg/info/"
        for pkg in r:
            if pkg.get('status') != "install ok installed":
                continue
            pkg_name = pkg.get('package', '')
            pkg_version = pkg.get('version', '')
            homepage = pkg.get('homepage', '')
            if not selected(pkg_name, ' '.join([pkg_name, pkg_version, homepage])):
                continue

            path_dpkg_info_full = path_dpkg_info + pkg_name + '.list'
            if not os.path.isfile(path_dpkg_info_full):
                path_dpkg_info_full = dict_info_test.get(path_dpkg_info_full, path_dpkg_info_full)
            try:
                # The change time of the package's .list file is used as install date.
                install_date = datetime.datetime.fromtimestamp(
                    os.path.getctime(path_dpkg_info_full)).strftime('%Y-%m-%d %H:%M:%S')
            except (OSError, ValueError, OverflowError):
                install_date = ''

            list_installed_softwares.append({
                'key': pkg_name,
                'name': pkg_name,
                'version': pkg_version,
                'install_date': install_date,
                'install_location': '',
                'uninstall_string': '',
                'publisher': homepage,
                'system_component': ''})
    elif type_redhat():
        for header in listcontrol_to_dict('rpm -qai', 'Name : '):
            rpm_name = ensure_unicode(header.get('name', ''))
            rpm_url = ensure_unicode(header.get('url', ''))
            if not selected(rpm_name, ' '.join([rpm_name, rpm_url])):
                continue
            rpm_version = ensure_unicode(header.get('version', ''))
            list_installed_softwares.append({
                'key': '%s_%s' % (rpm_name, rpm_version),
                'name': rpm_name,
                'version': rpm_version,
                'install_date': rpm_install_date(header.get('install date', '')),
                'install_location': '',
                'uninstall_string': '',
                'publisher': rpm_url,
                'system_component': ''})
    else:
        list_installed_softwares.append({'key': 'Distribution not supported yet', 'name': 'Distribution not supported yet', 'version': 'Distribution not supported yet', 'install_date': 'Distribution not supported yet',
                                         'install_location': 'Distribution not supported yet', 'uninstall_string': 'Distribution not supported yet', 'publisher': 'Distribution not supported yet', 'system_component': 'Distribution not supported yet'})

    # Snap packages are listed on every distribution; failures are only logged.
    try:
        for snap in get_snap_softwares():
            if selected(snap['name'], snap['name']):
                list_installed_softwares.append(snap)
    except Exception as e:
        logger.warning('Error with snap list %s' % e)
    return list_installed_softwares
def get_snap_softwares():
    """Return the snap packages reported by ``snap list``.

    Returns:
        list of dicts with the keys 'key' ('snap_' + name), 'name',
        'version' (version + '-' + revision), 'install_date',
        'install_location', 'uninstall_string' ('snap'), 'publisher'
        ('snap/' + publisher column) and 'system_component'.
        The list is empty when the command cannot be started or exits with
        a non-zero status.
    """
    try:
        # Argument list, no shell: the command line is constant.
        proc = subprocess.Popen(
            ['snap', 'list', '--color=never', '--unicode=always'],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            universal_newlines=True)
    except OSError:
        # The snap executable is missing or cannot be run.
        return []
    stdout, _stderr = proc.communicate()
    if proc.returncode != 0:
        return []

    snaps_dir = "/var/lib/snapd/snaps/"
    list_installed_softwares = []
    # The first output line is the column header.
    for line in stdout.split("\n")[1:]:
        columns = line.split()
        # Name, version, revision, tracking and publisher columns are needed;
        # shorter rows are skipped instead of raising IndexError.
        if len(columns) < 5:
            continue
        snap_name, snap_version, snap_revision = columns[0], columns[1], columns[2]
        snap_publisher = 'snap/' + columns[4]

        install_path = os.path.join(snaps_dir, snap_name + "_" + snap_revision + ".snap")
        if os.path.isfile(install_path):
            # The change time of the .snap file is used as install date.
            install_date = datetime.datetime.fromtimestamp(
                os.path.getctime(install_path)).strftime('%Y-%m-%d %H:%M:%S')
        else:
            install_path = ''
            install_date = ''

        list_installed_softwares.append({
            'key': 'snap_' + snap_name,
            'name': snap_name,
            'version': snap_version + "-" + snap_revision,
            'install_date': install_date,
            'install_location': install_path,
            'uninstall_string': "snap",
            'publisher': snap_publisher,
            'system_component': ''})
    return list_installed_softwares
def uninstall_key_exists(uninstallkey):
    """Tell whether an installed software has the given name.

    Args:
        uninstallkey (str): name to look for; compared case-insensitively
            with the 'name' of each entry of ``installed_softwares()``.

    Returns:
        bool: True if at least one entry matches, False otherwise.
    """
    wanted = uninstallkey.lower()
    return any(entry['name'].lower() == wanted for entry in installed_softwares())
def install_apt(package, allow_unauthenticated=False):
    """Install a package from the APT repositories.

    ``update_apt()`` is called first, then ``apt-get install -y`` is run
    non-interactively.

    Args:
        package (str): text appended to the apt-get command line.
        allow_unauthenticated (bool): add ``--allow-unauthenticated``.

    Returns:
        whatever ``run`` returns for the command.
    """
    update_apt()
    template = (
        'LANG=C DEBIAN_FRONTEND=noninteractive apt-get install -y --allow-unauthenticated %s'
        if allow_unauthenticated
        else 'LANG=C DEBIAN_FRONTEND=noninteractive apt-get install -y %s'
    )
    return run(template % package)
def uninstall_apt(package, autoremove=False):
    """Remove a package with ``apt-get remove -y``, run non-interactively.

    Args:
        package (str): text appended to the apt-get command line.
        autoremove (bool): add ``--autoremove`` to the command.

    Returns:
        whatever ``run`` returns for the command.
    """
    template = (
        'LANG=C DEBIAN_FRONTEND=noninteractive apt-get remove --autoremove -y %s'
        if autoremove
        else 'LANG=C DEBIAN_FRONTEND=noninteractive apt-get remove -y %s'
    )
    return run(template % package)
def install_deb(path_to_deb):
    """Install a .deb package from a file with ``dpkg -i``.

    When the dpkg command fails, ``install_required_dependencies_apt()`` is
    run instead and its result is returned.

    Args:
        path_to_deb (str): text appended to the dpkg command line.

    Returns:
        whatever ``run`` returns for the dpkg command, or the result of the
        dependency-fixing command after a dpkg failure.
    """
    try:
        return run('LANG=C DEBIAN_FRONTEND=noninteractive dpkg -i %s' % path_to_deb)
    except Exception as e:
        # Not a bare except: KeyboardInterrupt / SystemExit must propagate
        # instead of triggering an apt run. The dpkg error is logged so it is
        # not lost.
        logger.warning('dpkg -i %s failed (%s), trying to fix dependencies with apt-get' % (path_to_deb, e))
        return install_required_dependencies_apt()
def install_package_if_needed(package_name):
    """Install a package through apt or yum unless it is already installed.

    Args:
        package_name (str): name checked with ``uninstall_key_exists`` and
            passed to the installer.

    Returns:
        None when the package is already present, otherwise the result of
        ``install_apt`` (Debian family) or ``install_yum`` (Red Hat family).

    Raises:
        Exception: 'not supported' when the distribution is neither.
    """
    if uninstall_key_exists(package_name):
        return None
    if type_debian():
        return install_apt(package_name)
    if not type_redhat():
        raise Exception('not supported')
    return install_yum(package_name)
def purge_deb(deb_name):
    """Run ``dpkg --purge`` on *deb_name* and return the result of ``run``."""
    command = 'LANG=C DEBIAN_FRONTEND=noninteractive dpkg --purge %s' % deb_name
    return run(command)
def install_required_dependencies_apt():
    """Run ``apt-get -f -y install`` non-interactively and return the result of ``run``."""
    command = 'LANG=C DEBIAN_FRONTEND=noninteractive apt-get -f -y install'
    return run(command)
def autoremove_apt():
    """Run ``apt-get -y autoremove`` non-interactively and return the result of ``run``."""
    command = 'LANG=C DEBIAN_FRONTEND=noninteractive apt-get -y autoremove'
    return run(command)
def install_yum(package):
    """Run ``yum install -y`` for *package* and return the result of ``run``."""
    command = 'LANG=C yum install -y %s' % package
    return run(command)
def install_dnf(package):
    """dnf-named entry point; delegates to ``install_yum``."""
    result = install_yum(package)
    return result
def uninstall_yum(package):
    """Run ``yum remove -y`` for *package* and return the result of ``run``."""
    command = 'LANG=C yum remove -y %s' % package
    return run(command)
def uninstall_dnf(package):
    """dnf-named entry point; delegates to ``uninstall_yum``."""
    result = uninstall_yum(package)
    return result
def autoremove_yum():
    """Run ``yum autoremove -y`` and return the result of ``run``."""
    command = 'LANG=C yum autoremove -y'
    return run(command)
def autoremove_dnf():
    """dnf-named entry point; delegates to ``autoremove_yum``."""
    result = autoremove_yum()
    return result
def update_apt():
    """Run ``apt-get -y update`` non-interactively and return the result of ``run``."""
    command = 'LANG=C DEBIAN_FRONTEND=noninteractive apt-get -y update'
    return run(command)
def upgrade_apt():
    """Run ``apt-get -y upgrade`` non-interactively and return the result of ``run``."""
    command = 'LANG=C DEBIAN_FRONTEND=noninteractive apt-get -y upgrade'
    return run(command)
def update_yum():
    """Run ``yum update -y`` and return the result of ``run``.

    NOTE(review): unlike ``apt-get update``, ``yum update`` presumably
    upgrades installed packages rather than only refreshing metadata -
    confirm this is the intended counterpart of ``update_apt``.
    """
    command = 'LANG=C yum update -y'
    return run(command)
def update_dnf():
    """dnf-named entry point; delegates to ``update_yum``."""
    result = update_yum()
    return result
def upgrade_yum():
    """Run ``yum upgrade -y`` and return the result of ``run``."""
    command = 'LANG=C yum upgrade -y'
    return run(command)
def upgrade_dnf():
    """dnf-named entry point; delegates to ``upgrade_yum``."""
    result = upgrade_yum()
    return result
def install_rpm(package):
    """Run ``yum localinstall -y`` for *package* and return the result of ``run``."""
    command = 'LANG=C yum localinstall -y %s' % package
    return run(command)
def systemd_start_service(servicename):
    """Run ``systemctl start`` for *servicename* and return the result of ``run``."""
    command = 'LANG=C systemctl start %s' % servicename
    return run(command)
def systemd_stop_service(servicename):
    """Run ``systemctl stop`` for *servicename* and return the result of ``run``."""
    command = 'LANG=C systemctl stop %s' % servicename
    return run(command)
def systemd_restart_service(servicename):
    """Run ``systemctl restart`` for *servicename* and return the result of ``run``."""
    command = 'LANG=C systemctl restart %s' % servicename
    return run(command)
def systemd_status_service(servicename):
    """Run ``systemctl status`` for *servicename* and return the result of ``run``."""
    command = 'LANG=C systemctl status %s' % servicename
    return run(command)
def running_on_ac():
    """Tell whether the AC power supply is reported as online.

    Reads ``/sys/class/power_supply/AC/online``.

    Returns:
        bool or None: True when the file content starts with '1', False
        when it does not, None when the file is absent or cannot be read.
    """
    ac_online_path = '/sys/class/power_supply/AC/online'
    try:
        if not os.path.isfile(ac_online_path):
            return None
        # Context manager so the handle is closed even if read() fails.
        with open(ac_online_path, 'r') as ac_file:
            return ac_file.read().startswith('1')
    except (OSError, ValueError):
        # Unreadable or undecodable file: state unknown.
        return None