breadpunk
/
breadpack
Archived
2
0
Fork 0
This repository has been archived on 2023-02-23. You can view files and clone it, but cannot push or open issues or pull requests.
breadpack/breadpack.py

301 lines
8.3 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import json
import logging
import os
import pwd
import sys
from abc import ABC, ABCMeta, abstractmethod
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Optional
# Configure logging once at import time: INFO and above, rendered as
# "LEVEL: message".
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)
# Module-level logger used by the subcommands in this file.
logger = logging.getLogger(__name__)
def _get_username() -> str:
    """Return the login name recorded for the current process's user id."""
    uid = os.getuid()
    passwd_entry = pwd.getpwuid(uid)
    return passwd_entry.pw_name
# Recfile record descriptor for the PackageRequest record set.
# Written as the file header when Breadpack has to create the requests file
# from scratch.
REQUEST_DESCRIPTOR: str = '\n'.join((
    '%rec: PackageRequest',
    '%type: Date date',
    '%type: Processed bool',
    '%type: User line',
    '%type: Package line',
    '%mandatory: Package',
    '%allowed: Date User Processed Package Comment',
    '%sort: Processed Date',
    '%doc: Package requests made via breadpack',
))
class NotInstalled(Exception):
    """Signal that a package is not installed on this system.

    Local version checks raise this instead of returning a version string
    when they find that the package is not currently installed, which lets
    callers detect uninstalled packages.
    """
# Registry of checker classes keyed by package type (for example 'apt').
# Populated as a side effect of class creation by PackageCheckerMetaclass.
_package_checkers = {}


class PackageCheckerMetaclass(type):
    """Metaclass that records each new checker class in ``_package_checkers``.

    Class-definition keywords:
        register: When true (the default), store the new class in the
            registry under ``package_type``. Pass ``register=False`` for
            classes that must not be registered.
        package_type: Registry key for the class. Required when
            ``register`` is true.

    Raises:
        TypeError: If ``register`` is true and ``package_type`` is not a
            string.
    """

    def __new__(cls, *args, register: bool = True,
                package_type: Optional[str] = None, **kwargs):
        # Use an explicit raise instead of ``assert``: asserts are stripped
        # under ``python -O``, which would let a class be registered under
        # the key ``None`` without any error.
        if register and not isinstance(package_type, str):
            raise TypeError('Package type is required')
        newclass = super().__new__(cls, *args, **kwargs)
        if register:
            _package_checkers[package_type] = newclass
        return newclass
class PackageCheckerABC(PackageCheckerMetaclass, ABCMeta):
    """Metaclass combining PackageCheckerMetaclass with ABCMeta."""
class PackageChecker(metaclass=PackageCheckerABC, register=False):
    """Abstract interface that every package checker implements.

    Declared with ``register=False`` so this base class itself is not added
    to the checker registry.
    """

    @classmethod
    @abstractmethod
    def get_current_version(cls, package_data: dict) -> str:
        """Return the currently installed version of the package."""

    @classmethod
    @abstractmethod
    def get_latest_version(cls, package_data: dict) -> str:
        """Return the latest available version of the package."""
class APTPackageChecker(PackageChecker, package_type='apt'):
    """Package checker backed by python-apt, registered under type 'apt'."""

    # Lazily created ``apt.Cache``; stays ``None`` until the first lookup.
    _apt_cache = None

    @classmethod
    def _get_package(cls, package_data: dict):
        """Return the APT cache entry for the package described by ``package_data``.

        The lookup name is ``package_data['apt_name']`` when that key is
        present, otherwise ``package_data['name']``.

        Raises:
            KeyError: If neither key is present. Whatever the cache raises
                for an unknown package name also propagates unchanged.
        """
        # Test identity with None rather than truthiness, so a cache object
        # that evaluates as false (e.g. one reporting zero packages) is not
        # rebuilt on every call.
        if cls._apt_cache is None:
            # Imported here so python-apt is only required for APT lookups.
            import apt
            cls._apt_cache = apt.Cache()
        # Only fall back to 'name' when 'apt_name' is absent; the original
        # evaluated package_data['name'] even when it was not needed.
        if 'apt_name' in package_data:
            package_name = package_data['apt_name']
        else:
            package_name = package_data['name']
        return cls._apt_cache[package_name]

    @classmethod
    def get_current_version(cls, package_data: dict) -> str:
        """Return the installed version string of the package.

        Raises:
            NotInstalled: If the cache reports the package as not installed.
        """
        package = cls._get_package(package_data)
        if not package.is_installed:
            raise NotInstalled
        return package.installed.version

    @classmethod
    def get_latest_version(cls, package_data: dict) -> str:
        """Return the version string of the first entry in the package's version list."""
        # NOTE(review): assumes python-apt orders ``versions`` newest first;
        # confirm against the python-apt documentation.
        return cls._get_package(package_data).versions[0].version
class JsonDocument(ABC):
    """Base class for objects that are constructed from a JSON file."""

    @classmethod
    def from_file(cls, path: Path) -> 'JsonDocument':
        """Build an instance of ``cls`` from the JSON object stored at ``path``.

        The members of the top-level JSON object are passed to the
        constructor as keyword arguments.

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If the file does not contain valid JSON.
            TypeError: If the top-level JSON value is not an object, or its
                members do not match the constructor's parameters.
        """
        # JSON interchange text is UTF-8, so do not depend on the locale's
        # default encoding.
        with path.open(encoding='utf-8') as f:
            document = json.load(f)
        # Fail with a readable message instead of the opaque error that
        # ``cls(**value)`` produces for a non-mapping.
        if not isinstance(document, dict):
            raise TypeError(
                f'{path}: expected a JSON object at the top level, '
                f'got {type(document).__name__}'
            )
        return cls(**document)
class PackageFile(JsonDocument):
    """Validated contents of a breadpack package file or lockfile (version 0)."""

    def __init__(self, version, categories, packages) -> None:
        """Validate and store the top-level members of a package file.

        Args:
            version: File format version; must convert to the integer 0.
            categories: Dict whose values are description strings.
            packages: List of per-package dicts.

        Raises:
            ValueError: If ``version`` is not 0 (``int()`` may also raise
                for values it cannot convert).
            TypeError: If ``categories`` or ``packages`` have the wrong shape.
        """
        # Explicit raises are used instead of ``assert`` so validation still
        # runs under ``python -O``.
        self.version = int(version)
        if self.version != 0:
            raise ValueError(f'Incompatible file version {version} (expected 0)')

        if not isinstance(categories, dict):
            raise TypeError('categories should be an object')
        # isinstance takes (object, type); the original passed (str, v),
        # which raised TypeError for any non-empty categories mapping.
        if not all(isinstance(v, str) for v in categories.values()):
            raise TypeError(
                'categories should map string keys to string descriptions'
            )
        self.categories = categories

        if not isinstance(packages, list):
            raise TypeError('packages should be a list')
        if not all(isinstance(package, dict) for package in packages):
            raise TypeError('every entry in packages should be an object')
        self.packages = packages
def pretty_print_package(package_data: dict):
    """Print a one-line, human-readable summary of a package entry.

    Reads the 'name', 'type', 'current_version' and 'latest_version' keys of
    ``package_data``; a missing key raises KeyError.
    """
    name = package_data['name']
    kind = package_data['type']
    current = package_data['current_version']
    latest = package_data['latest_version']
    print(f'{name} ({kind}) - current: {current}, latest: {latest}')
###############################################################################
# COMMANDS
###############################################################################
def _rec_field(name: str, value: str) -> str:
    """Render one recfile field, folding line breaks into ``+`` continuation lines."""
    text = str(value).replace('\r\n', '\n').replace('\r', '\n')
    return f'{name}: ' + text.replace('\n', '\n+ ')


def request_subcommand(
        requestsfile: Path,
        package_name: str,
        comment: Optional[str] = None,
        **kwargs) -> int:
    """Append a package request record to the requests recfile.

    Creates the file with ``REQUEST_DESCRIPTOR`` as its header when it does
    not exist yet, then appends one record holding the current UTC time,
    the requesting user, ``Processed: no``, the package name and, if given,
    the comment.

    Args:
        requestsfile: Path of the recfile to append to.
        package_name: Name of the requested package; must be a single line.
        comment: Optional free-text note; may span several lines.
        **kwargs: Ignored; absorbs unrelated command-line options.

    Returns:
        0 on success, 1 if ``package_name`` contains a line break.
    """
    # The descriptor declares Package as a single-line field, and a raw line
    # break would split the record, so refuse it before touching the file.
    if '\n' in package_name or '\r' in package_name:
        logger.error('Package names cannot contain line breaks.')
        return 1

    if not requestsfile.exists():
        logger.warning(f'Creating file {requestsfile}')
        with requestsfile.open('w') as f:
            f.write(REQUEST_DESCRIPTOR)

    data = {
        'Date': datetime.now(timezone.utc).isoformat(),
        'User': _get_username(),
        'Processed': 'no',
        'Package': package_name,
    }
    if comment:
        data['Comment'] = comment

    # Records are separated from what precedes them by a blank line.
    record = '\n'.join(_rec_field(k, v) for k, v in data.items())
    with requestsfile.open('a') as f:
        f.write('\n\n' + record)

    logger.info(f'Your request for {package_name!r} has been sent!')
    return 0
def list_subcommand(json: bool = False, upgradable: bool = False, lockfile: Path = None, **kwargs) -> int:
    """Print the packages recorded in the lockfile.

    Args:
        json: When true, print the package list as one JSON document instead
            of one human-readable line per package.
        upgradable: When true, only include packages that have a
            'latest_version' differing from their 'current_version'.
        lockfile: Path of the lockfile to read.
        **kwargs: Ignored; absorbs unrelated command-line options.

    Returns:
        0 on success, 1 if the lockfile does not exist.
    """
    # The ``json`` flag shadows the json module inside this function (the
    # original called ``.dumps`` on the bool), so import the module under a
    # different local name.
    import json as json_module

    if not lockfile.is_file():
        logger.error(f'{lockfile} not found. Ask an admin to run `breadpack check` '
                     'or ensure that you have read access to this file.')
        return 1

    packages = PackageFile.from_file(lockfile).packages
    if upgradable:
        packages = [package for package in packages
                    if package.get('latest_version')
                    and package['latest_version'] != package['current_version']]

    if json:
        print(json_module.dumps(packages))
    else:
        for package in packages:
            pretty_print_package(package)
    return 0
def lint_subcommand(file: Path = None, lockfile: Path = None, **kwargs) -> int:
    """Try to load the packages file and the lockfile, reporting any failure.

    Both files are always attempted, so one run reports every problem.

    Args:
        file: Path of the packages file.
        lockfile: Path of the lockfile.
        **kwargs: Ignored; absorbs unrelated command-line options.

    Returns:
        1 if at least one file failed to load, otherwise 0.
    """
    errored = False
    for path in (file, lockfile):
        logger.info(f'Linting {path}')
        try:
            PackageFile.from_file(path)
        except Exception as e:
            # Deliberately broad: linting should report any load or
            # validation failure rather than abort on the first one.
            logger.error(f'Failed reading file {path}: {e}')
            errored = True
    # Return a real int exit status, matching the annotation and the other
    # subcommands (the original returned the bool itself).
    return 1 if errored else 0
def check_subcommand(json: bool = False, save: bool = True, **kwargs) -> int:
    """Placeholder for the `check` subcommand; always raises NotImplementedError."""
    raise NotImplementedError()
def lock_subcommand(**kwargs) -> int:
    """Placeholder for the `lock` subcommand; always raises NotImplementedError."""
    raise NotImplementedError()
def main() -> int:
    """Parse the command line and run the selected subcommand.

    Every parsed option is forwarded to the subcommand function as a keyword
    argument; each subcommand accepts ``**kwargs`` for the ones it ignores.

    Returns:
        The value returned by the subcommand, used as the exit status.
    """
    parser = argparse.ArgumentParser(
        description='Breadpunk.club meta-package manager'
    )
    parser.add_argument(
        '--file',
        type=Path,
        help='Path to the breadpack.json file.',
        default=Path('/bread/breadpack.json'),
    )
    parser.add_argument(
        '--lockfile',
        type=Path,
        help='Path to the breadpack-lock.json file.',
        default=Path('/bread/breadpack-lock.json'),
    )
    parser.add_argument(
        '--requestsfile',
        type=Path,
        help='Path to the breadpack-requests.rec file.',
        default=Path('/bread/breadpack-requests.rec'),
    )

    def nocommand(**kwargs) -> None:
        # parser.error prints the message with usage and exits the process.
        parser.error('A subcommand is required.')

    # Fallback used when no subparser overrides ``func``.
    parser.set_defaults(func=nocommand)

    subparsers = parser.add_subparsers(
        title='subcommands',
    )

    request_parser = subparsers.add_parser(
        'request',
        help='Request for a new package to be installed.',
    )
    request_parser.add_argument(
        'package_name',
        help='Name of the requested package.',
    )
    request_parser.add_argument(
        '-c', '--comment',
        help='Optional comment to add notes '
             'for the admin processing your request.',
        default=None,
    )
    request_parser.set_defaults(func=request_subcommand)

    list_parser = subparsers.add_parser(
        'list',
        help='List installed packages.',
    )
    list_parser.add_argument(
        '--json',
        help='Use JSON output.',
        action='store_true',
        default=False,
    )
    list_parser.add_argument(
        '--upgradable',
        help='Only list packages with available updates.',
        action='store_true',
        default=False,
    )
    list_parser.set_defaults(func=list_subcommand)

    lint_parser = subparsers.add_parser(
        'lint',
        help='Check the syntax of the packages file and lockfile.',
    )
    lint_parser.set_defaults(func=lint_subcommand)

    lock_parser = subparsers.add_parser(
        'lock',
        help='Regenerate the lockfile from scratch.',
    )
    lock_parser.set_defaults(func=lock_subcommand)

    check_parser = subparsers.add_parser(
        'check',
        help='Check for available updates.',
    )
    check_parser.add_argument(
        '--json',
        help='Use JSON output.',
        action='store_true',
        default=False,
    )
    check_parser.add_argument(
        '--save',
        help='Update the lockfile. This is the default.',
        action='store_true',
        default=True,
    )
    check_parser.add_argument(
        '--no-save',
        help='Do not update the lockfile.',
        action='store_false',
        dest='save',
    )
    # Without this, `breadpack check` fell through to ``nocommand`` and
    # reported "A subcommand is required." even though one was given.
    check_parser.set_defaults(func=check_subcommand)

    args = vars(parser.parse_args())
    return args.pop('func', nocommand)(**args)
# Script entry point: run the CLI and use its return value as the exit status.
if __name__ == '__main__':
    raise SystemExit(main())