#!/usr/bin/env python3
"""Breadpack: a small meta-package manager command-line tool.

Provides the ``request``, ``list``, ``lint``, ``lock`` and ``check``
subcommands. Package descriptions are read from JSON files and package
requests are appended to a recfile.
"""
from abc import ABC, ABCMeta, abstractmethod
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Optional
import argparse
import json
import logging
import os
import pwd
import sys

logging.basicConfig(
    level=logging.INFO,
    format='%(levelname)s: %(message)s',
)
logger = logging.getLogger(__name__)


def _get_username() -> str:
    """Return the login name of the user owning the current process."""
    return pwd.getpwuid(os.getuid()).pw_name


def _to_json(data) -> str:
    """Serialize *data* to a JSON string.

    Exists so that functions which take a boolean parameter named ``json``
    (mirroring the ``--json`` command-line flag) can still reach the
    ``json`` module, which that parameter shadows inside their body.
    """
    return json.dumps(data)


# The recfile descriptor for package requests.
# Used if Breadpack generates the file from scratch.
REQUEST_DESCRIPTOR: str = """
%rec: PackageRequest
%type: Date date
%type: Processed bool
%type: User line
%type: Package line
%mandatory: Package
%allowed: Date User Processed Package Comment
%sort: Processed Date
%doc: Package requests made via breadpack
""".strip()


class NotInstalled(Exception):
    """
    This exception should be raised by local package version checking
    if it determines the package is not currently installed.
    This allows detecting uninstalled packages.
    """


# Registry mapping a package type name (e.g. 'apt') to its checker class.
_package_checkers = {}


class PackageCheckerMetaclass(type):
    """Metaclass that records checker classes in ``_package_checkers``.

    Class keyword arguments:
        register: when true (the default), store the new class in the
            registry under ``package_type``.
        package_type: registry key; must be a string when ``register``
            is true.

    Raises:
        TypeError: if ``register`` is true and ``package_type`` is not a
            string.
    """

    def __new__(cls, *args, register: bool = True,
                package_type: Optional[str] = None, **kwargs):
        if register and not isinstance(package_type, str):
            # Raised explicitly (not asserted) so the check still runs
            # under `python -O`, and before the class is even built.
            raise TypeError('Package type is required')
        newclass = super().__new__(cls, *args, **kwargs)
        if register:
            _package_checkers[package_type] = newclass
        return newclass


class PackageCheckerABC(PackageCheckerMetaclass, ABCMeta):
    """Combine checker registration with abstract-base-class support."""


class PackageChecker(metaclass=PackageCheckerABC, register=False):
    """Abstract interface for per-package-type version checkers.

    This base class itself is not registered (``register=False``).
    """

    @classmethod
    @abstractmethod
    def get_current_version(cls, package_data: dict) -> str:
        ...

    @classmethod
    @abstractmethod
    def get_latest_version(cls, package_data: dict) -> str:
        ...


class APTPackageChecker(PackageChecker, package_type='apt'):
    """Version checker for packages of type 'apt'."""

    # Lazily created ``apt.Cache`` shared by all lookups.
    _apt_cache = None

    @classmethod
    def _get_package(cls, package_data: dict):
        """Look up the cache entry for the package described by *package_data*.

        The ``apt_name`` key is used as the package name when present,
        otherwise ``name``.
        """
        # Compare against None rather than relying on truthiness, so the
        # cache is built exactly once even if the cache object is falsy.
        if cls._apt_cache is None:
            # Imported lazily so the rest of the tool works without python-apt.
            import apt
            cls._apt_cache = apt.Cache()
        package_name = package_data.get('apt_name', package_data['name'])
        return cls._apt_cache[package_name]

    @classmethod
    def get_current_version(cls, package_data: dict) -> str:
        """Return the installed version, raising NotInstalled if absent."""
        package = cls._get_package(package_data)
        if not package.is_installed:
            raise NotInstalled
        return package.installed.version

    @classmethod
    def get_latest_version(cls, package_data: dict) -> str:
        """Return the version string of the first entry in ``versions``."""
        return cls._get_package(package_data).versions[0].version


class JsonDocument(ABC):
    """Base class for objects constructed from a JSON file."""

    @classmethod
    def from_file(cls, path: Path) -> 'JsonDocument':
        """Load *path* as JSON and pass its top-level keys to the constructor."""
        with path.open() as f:
            return cls(**json.load(f))


class PackageFile(JsonDocument):
    """In-memory form of a breadpack package file (format version 0).

    Args:
        version: file format version; converted with ``int`` and must be 0.
        categories: dict mapping string keys to string descriptions.
        packages: list of dicts, one per package.

    Raises:
        ValueError: if any argument fails validation. Raised explicitly
            (not asserted) so validation also runs under ``python -O``.
    """

    def __init__(self, version, categories, packages) -> None:
        self.version = int(version)
        if self.version != 0:
            raise ValueError(
                f'Incompatible file version {version} (expected 0)')

        if not isinstance(categories, dict):
            raise ValueError('categories should be an object')
        if not all(isinstance(v, str) for v in categories.values()):
            raise ValueError(
                'categories should map string keys to string descriptions')
        self.categories = categories

        if not isinstance(packages, list):
            raise ValueError('packages should be a list')
        if not all(isinstance(package, dict) for package in packages):
            raise ValueError('packages should be a list of objects')
        self.packages = packages


def pretty_print_package(package_data: dict):
    """Print a one-line summary of a package.

    *package_data* must contain the keys ``name``, ``type``,
    ``current_version`` and ``latest_version``.
    """
    print('{name} ({type}) - current: {current_version}, latest: {latest_version}'.format(**package_data))


###############################################################################
# COMMANDS
###############################################################################

def request_subcommand(
        requestsfile: Path,
        package_name: str,
        comment: Optional[str] = None,
        **kwargs) -> int:
    """Append a package request record to *requestsfile*.

    The file is created with ``REQUEST_DESCRIPTOR`` as its content if it
    does not exist yet. Extra keyword arguments are ignored.

    Returns:
        0 (process exit code).
    """
    if not requestsfile.exists():
        logger.warning(f'Creating file {requestsfile}')
        with requestsfile.open('w') as f:
            f.write(REQUEST_DESCRIPTOR)

    data = {
        'Date': datetime.now(timezone.utc).isoformat(),
        'User': _get_username(),
        'Processed': 'no',
        'Package': package_name,
    }
    if comment:
        data['Comment'] = comment

    with requestsfile.open('a') as f:
        # A blank line separates this record from whatever precedes it.
        f.write('\n\n' + '\n'.join([f'{k}: {v}' for k, v in data.items()]))

    logger.info(f'Your request for {package_name!r} has been sent!')
    return 0


def list_subcommand(json: bool = False, upgradable: bool = False,
                    lockfile: Path = None, **kwargs) -> int:
    """Print the packages recorded in *lockfile*.

    Args:
        json: print the package list as JSON instead of one line each.
        upgradable: only include packages whose ``latest_version`` is set
            and differs from ``current_version``.
        lockfile: path to the lockfile to read.

    Returns:
        0 on success, 1 if *lockfile* is not a file.
    """
    if not lockfile.is_file():
        logger.error(f'{lockfile} not found. Ask an admin to run `breadpack check` '
                     'or ensure that you have read access to this file.')
        return 1

    packages = PackageFile.from_file(lockfile).packages
    if upgradable:
        packages = [package for package in packages
                    if package.get('latest_version')
                    and package['latest_version'] != package['current_version']]

    if json:
        # The `json` parameter shadows the json module here, so the
        # serialization goes through the module-level helper.
        print(_to_json(packages))
    else:
        for package in packages:
            pretty_print_package(package)
    return 0


def lint_subcommand(file: Path = None, lockfile: Path = None, **kwargs) -> int:
    """Try to load *file* and *lockfile* as package files, logging failures.

    Returns:
        0 if both files load, 1 if at least one fails.
    """
    errored = False
    for path in (file, lockfile):
        logger.info(f'Linting {path}…')
        try:
            PackageFile.from_file(path)
        except Exception as e:
            # Any failure (I/O, JSON syntax, validation) is a lint error.
            logger.error(f'Failed reading file {path}: {e}')
            errored = True
    # Convert explicitly so the declared int exit code is what is returned.
    return int(errored)


def check_subcommand(json: bool = False, save: bool = True, **kwargs) -> int:
    """Not implemented yet; always raises NotImplementedError."""
    raise NotImplementedError


def lock_subcommand(**kwargs) -> int:
    """Not implemented yet; always raises NotImplementedError."""
    raise NotImplementedError


def main() -> int:
    """Parse the command line and run the selected subcommand.

    Every parsed option is passed to the subcommand function as a keyword
    argument.

    Returns:
        The value returned by the subcommand function.
    """
    parser = argparse.ArgumentParser(
        description='Breadpunk.club meta-package manager'
    )
    parser.add_argument(
        '--file',
        type=Path,
        help='Path to the breadpack.json file.',
        default=Path('/bread/breadpack.json'),
    )
    parser.add_argument(
        '--lockfile',
        type=Path,
        help='Path to the breadpack-lock.json file.',
        default=Path('/bread/breadpack-lock.json'),
    )
    parser.add_argument(
        '--requestsfile',
        type=Path,
        help='Path to the breadpack-requests.rec file.',
        default=Path('/bread/breadpack-requests.rec'),
    )

    def nocommand(**kwargs) -> None:
        # Fallback handler used when no subcommand was given.
        parser.error('A subcommand is required.')

    parser.set_defaults(func=nocommand)

    subparsers = parser.add_subparsers(
        title='subcommands',
    )

    request_parser = subparsers.add_parser(
        'request',
        help='Request for a new package to be installed.',
    )
    request_parser.add_argument(
        'package_name',
        help='Name of the requested package.',
    )
    request_parser.add_argument(
        '-c', '--comment',
        help='Optional comment to add notes '
             'for the admin processing your request.',
        default=None,
    )
    request_parser.set_defaults(func=request_subcommand)

    list_parser = subparsers.add_parser(
        'list',
        help='List installed packages.',
    )
    list_parser.add_argument(
        '--json',
        help='Use JSON output.',
        action='store_true',
        default=False,
    )
    list_parser.add_argument(
        '--upgradable',
        help='Only list packages with available updates.',
        action='store_true',
        default=False,
    )
    list_parser.set_defaults(func=list_subcommand)

    lint_parser = subparsers.add_parser(
        'lint',
        help='Check the syntax of the packages file and lockfile.',
    )
    lint_parser.set_defaults(func=lint_subcommand)

    lock_parser = subparsers.add_parser(
        'lock',
        help='Regenerate the lockfile from scratch.',
    )
    lock_parser.set_defaults(func=lock_subcommand)

    check_parser = subparsers.add_parser(
        'check',
        help='Check for available updates.',
    )
    check_parser.add_argument(
        '--json',
        help='Use JSON output.',
        action='store_true',
        default=False,
    )
    check_parser.add_argument(
        '--save',
        help='Update the lockfile. This is the default.',
        action='store_true',
        default=True,
    )
    check_parser.add_argument(
        '--no-save',
        help='Do not update the lockfile.',
        action='store_false',
        dest='save',
    )

    args = vars(parser.parse_args())
    return args.pop('func', nocommand)(**args)


if __name__ == '__main__':
    sys.exit(main())