breadpunk
/
breadpack
Archived
2
0
Fork 0
This repository has been archived on 2023-02-23. You can view files and clone it, but cannot push or open issues or pull requests.
breadpack/breadpack.py

301 lines
8.3 KiB
Python
Raw Permalink Normal View History

2020-04-10 04:46:08 +00:00
#!/usr/bin/env python3
from abc import ABC, ABCMeta, abstractmethod
2020-04-10 04:46:08 +00:00
from datetime import datetime, timezone
from enum import Enum
2020-04-10 04:46:08 +00:00
from pathlib import Path
from typing import Optional
import argparse
import logging
import os
import pwd
import sys
2020-04-10 04:46:08 +00:00
# Configure logging once at import time: records at INFO level and above are
# rendered as "LEVEL: message".
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
def _get_username() -> str:
    """Return the password-database login name for this process's user id."""
    uid = os.getuid()
    passwd_entry = pwd.getpwuid(uid)
    return passwd_entry.pw_name
# The recfile descriptor for package requests.
# Used if Breadpack generates the file from scratch.
# The newlines around the literal only keep it readable; .strip() removes
# them so the descriptor begins directly with its %rec line. Every line of
# the descriptor must be a %-prefixed special field.
REQUEST_DESCRIPTOR: str = """
%rec: PackageRequest
%type: Date date
%type: Processed bool
%type: User line
%type: Package line
%mandatory: Package
%allowed: Date User Processed Package Comment
%sort: Processed Date
%doc: Package requests made via breadpack
""".strip()
class NotInstalled(Exception):
    """
    Signal that a package is not currently installed.

    Local package version checks should raise this exception when they
    determine that the package is missing, which makes uninstalled
    packages detectable.
    """
# Registry of package checker classes, keyed by their ``package_type`` string.
_package_checkers = {}


class PackageCheckerMetaclass(type):
    """
    Metaclass that records newly defined classes in ``_package_checkers``.

    Two extra keyword arguments are accepted in the class statement:

    register
        When true (the default), the new class is stored in the registry.
        Pass ``register=False`` for classes that must not be registered.
    package_type
        The registry key. Must be a string whenever ``register`` is true.

    Raises AssertionError if a class asks to be registered without a string
    ``package_type``. A later class using the same ``package_type`` replaces
    the earlier registry entry.
    """

    def __new__(cls, *args, register: bool = True, package_type: Optional[str] = None, **kwargs):
        newclass = super().__new__(cls, *args, **kwargs)
        if register:
            # Raised explicitly instead of using an ``assert`` statement:
            # asserts are removed under ``python -O``, which would let a
            # class silently register under the key ``None``. The exception
            # type is kept as AssertionError for existing callers.
            if not isinstance(package_type, str):
                raise AssertionError('Package type is required')
            _package_checkers[package_type] = newclass
        return newclass


class PackageCheckerABC(PackageCheckerMetaclass, ABCMeta):
    """Registering metaclass combined with ABCMeta, so classes using it can
    also declare abstract methods."""
class PackageChecker(metaclass=PackageCheckerABC, register=False):
    """
    Abstract interface implemented by every package checker.

    The class is created with ``register=False``, so it is not itself added
    to the checker registry.
    """

    @classmethod
    @abstractmethod
    def get_current_version(cls, package_data: dict) -> str:
        """Return the version of the package described by ``package_data``
        that is currently installed."""

    @classmethod
    @abstractmethod
    def get_latest_version(cls, package_data: dict) -> str:
        """Return the latest available version of the package described by
        ``package_data``."""
class APTPackageChecker(PackageChecker, package_type='apt'):
    """Package checker backed by python-apt, registered under the ``'apt'``
    package type."""

    # Shared apt.Cache instance; stays None until the first lookup.
    _apt_cache = None

    @classmethod
    def _get_package(cls, package_data: dict):
        """
        Return the apt cache entry for the package described by ``package_data``.

        The entry is looked up under ``package_data['apt_name']`` when that
        key is present, otherwise under ``package_data['name']``. A name the
        cache does not know propagates whatever the cache lookup raises.
        """
        # Compare with None explicitly: truth-testing the cache object would
        # treat a cache that evaluates as falsy (e.g. an empty one) as
        # missing and rebuild it on every call.
        if cls._apt_cache is None:
            # Imported lazily so python-apt is only needed when an apt
            # package is actually checked.
            import apt
            cls._apt_cache = apt.Cache()
        # Only fall back to 'name' when 'apt_name' is absent, so an entry
        # that provides just 'apt_name' does not fail with KeyError.
        if 'apt_name' in package_data:
            package_name = package_data['apt_name']
        else:
            package_name = package_data['name']
        return cls._apt_cache[package_name]

    @classmethod
    def get_current_version(cls, package_data: dict) -> str:
        """
        Return the installed version string of the package.

        Raises NotInstalled if apt reports the package as not installed.
        """
        package = cls._get_package(package_data)
        if not package.is_installed:
            raise NotInstalled
        return package.installed.version

    @classmethod
    def get_latest_version(cls, package_data: dict) -> str:
        """Return the version string of the first entry in the package's
        version list."""
        # NOTE(review): this relies on versions[0] being the newest version;
        # confirm against the python-apt ordering guarantees.
        return cls._get_package(package_data).versions[0].version
class JsonDocument(ABC):
    """Base class for objects that can be constructed from a JSON file."""

    @classmethod
    def from_file(cls, path: Path) -> 'JsonDocument':
        """
        Build an instance from the JSON object stored at ``path``.

        The top-level keys of the JSON object are passed to the class
        constructor as keyword arguments.
        """
        # Imported locally: the module-level imports of this file do not
        # bring ``json`` into scope.
        import json

        with path.open() as f:
            return cls(**json.load(f))


class PackageFile(JsonDocument):
    """
    In-memory form of a breadpack package file.

    Attributes:
        version: file format version as an int; only 0 is accepted.
        categories: dict mapping category names to string descriptions.
        packages: list of package description dicts.
    """

    def __init__(self, version, categories, packages) -> None:
        """
        Validate and store the document's fields.

        Raises AssertionError, with a descriptive message, when a field has
        an unexpected value or shape.
        """
        self.version = int(version)
        self._require(self.version == 0,
                      f'Incompatible file version {version} (expected 0)')

        self._require(isinstance(categories, dict),
                      'categories should be an object')
        self._require(all(isinstance(v, str) for v in categories.values()),
                      'categories should map string keys to string descriptions')
        self.categories = categories

        self._require(isinstance(packages, list), 'packages should be a list')
        self._require(all(isinstance(package, dict) for package in packages),
                      'packages should be a list of objects')
        self.packages = packages

    @staticmethod
    def _require(condition: bool, message: str) -> None:
        """Raise AssertionError(message) unless ``condition`` holds."""
        # Raised explicitly rather than with an ``assert`` statement so the
        # validation still runs under ``python -O``; the exception type
        # stays AssertionError for existing callers.
        if not condition:
            raise AssertionError(message)
def pretty_print_package(package_data: dict):
    """
    Print a one-line, human-readable summary of a package to stdout.

    ``package_data`` must provide the ``name``, ``type``, ``current_version``
    and ``latest_version`` keys; a missing key raises KeyError.
    """
    template = '{name} ({type}) - current: {current_version}, latest: {latest_version}'
    summary = template.format(**package_data)
    print(summary)
###############################################################################
# COMMANDS
###############################################################################
2020-04-10 04:46:08 +00:00
def request_subcommand(
        requestsfile: Path,
        package_name: str,
        comment: Optional[str] = None,
        **kwargs) -> int:
    """
    Append a package request record to the requests recfile.

    If ``requestsfile`` does not exist yet it is created and seeded with
    REQUEST_DESCRIPTOR. The record stores the current UTC time, the
    requesting user's name, ``Processed: no``, the package name and, when
    given, the comment.

    Args:
        requestsfile: path of the recfile that collects requests.
        package_name: name of the requested package; must be a single line.
        comment: optional free-form note; may span several lines.
        **kwargs: other parsed command-line options, ignored here.

    Returns:
        0 once the request has been written, 1 if ``package_name`` contains
        a line break.
    """
    # Field values are written verbatim, so a line break inside the package
    # name would let the caller add arbitrary extra fields (or whole
    # records) to the shared file. ``Package`` is declared as a single-line
    # field, so such names are rejected outright.
    if '\n' in package_name or '\r' in package_name:
        logger.error('Package names cannot contain line breaks.')
        return 1

    if not requestsfile.exists():
        logger.warning(f'Creating file {requestsfile}')
        with requestsfile.open('w') as f:
            f.write(REQUEST_DESCRIPTOR)

    data = {
        'Date': datetime.now(timezone.utc).isoformat(),
        'User': _get_username(),
        'Processed': 'no',
        'Package': package_name,
    }
    if comment:
        # Recfiles continue a multi-line value by starting each further
        # line with "+ "; encoding the comment this way keeps all of its
        # lines inside the Comment field.
        data['Comment'] = '\n+ '.join(comment.splitlines())

    # A blank line separates the new record from whatever precedes it.
    record = '\n'.join(f'{k}: {v}' for k, v in data.items())
    with requestsfile.open('a') as f:
        f.write('\n\n' + record)

    logger.info(f'Your request for {package_name!r} has been sent!')
    return 0
2020-04-10 04:46:08 +00:00
def list_subcommand(json: bool = False, upgradable: bool = False, lockfile: Path = None, **kwargs) -> int:
    """
    Print the packages recorded in the lockfile.

    Args:
        json: when true, print the package list as one JSON document;
            otherwise print one human-readable line per package.
        upgradable: when true, only list packages that have a
            ``latest_version`` different from their ``current_version``.
        lockfile: path of the lockfile to read.
        **kwargs: other parsed command-line options, ignored here.

    Returns:
        0 on success, 1 if ``lockfile`` is not an existing file.
    """
    # The ``json`` flag shadows the json module inside this function, so the
    # module is bound to a different local name.
    import json as json_module

    if not lockfile.is_file():
        logger.error(f'{lockfile} not found. Ask an admin to run `breadpack check` '
                     'or ensure that you have read access to this file.')
        return 1

    packages = PackageFile.from_file(lockfile).packages
    if upgradable:
        packages = [package for package in packages
                    if package.get('latest_version')
                    and package['latest_version'] != package['current_version']]

    if json:
        print(json_module.dumps(packages))
    else:
        for package in packages:
            pretty_print_package(package)
    return 0
2020-04-10 04:46:08 +00:00
def lint_subcommand(file: Path = None, lockfile: Path = None, **kwargs) -> int:
    """
    Check that the packages file and the lockfile can be loaded.

    Both files are always attempted; a failure in one does not stop the
    other from being checked.

    Args:
        file: path of the packages file.
        lockfile: path of the lockfile.
        **kwargs: other parsed command-line options, ignored here.

    Returns:
        0 if both files loaded, 1 if at least one failed.
    """
    errored = False
    for path in (file, lockfile):
        logger.info(f'Linting {path}')
        try:
            PackageFile.from_file(path)
        except Exception as e:
            # Deliberately broad: any failure to open, parse or validate the
            # file is reported as a lint error and linting moves on.
            logger.error(f'Failed reading file {path}: {e}')
            errored = True
    # Return a real int, as the signature promises, rather than a bool.
    return int(errored)
2020-04-10 04:46:08 +00:00
def check_subcommand(json: bool = False, save: bool = True, **kwargs) -> int:
    """
    Placeholder for the ``check`` subcommand.

    Not implemented yet: every call raises NotImplementedError.
    """
    raise NotImplementedError
def lock_subcommand(**kwargs) -> int:
    """
    Placeholder for the ``lock`` subcommand.

    Not implemented yet: every call raises NotImplementedError.
    """
    raise NotImplementedError
def main() -> int:
    """
    Parse the command line and run the selected subcommand.

    Every parsed option is forwarded to the subcommand as a keyword
    argument, and the subcommand's return value is returned so it can be
    used as the process exit status.
    """
    parser = argparse.ArgumentParser(
        description='Breadpunk.club meta-package manager'
    )

    # Options shared by every subcommand.
    parser.add_argument(
        '--file',
        type=Path,
        help='Path to the breadpack.json file.',
        default=Path('/bread/breadpack.json'),
    )
    parser.add_argument(
        '--lockfile',
        type=Path,
        help='Path to the breadpack-lock.json file.',
        default=Path('/bread/breadpack-lock.json'),
    )
    parser.add_argument(
        '--requestsfile',
        type=Path,
        help='Path to the breadpack-requests.rec file.',
        default=Path('/bread/breadpack-requests.rec'),
    )

    def nocommand(**kwargs) -> None:
        # parser.error() prints the usage message and exits the process.
        parser.error('A subcommand is required.')

    # Fallback handler, used when no subparser replaces ``func``.
    parser.set_defaults(func=nocommand)

    subparsers = parser.add_subparsers(
        title='subcommands',
    )

    request_parser = subparsers.add_parser(
        'request',
        help='Request for a new package to be installed.',
    )
    request_parser.add_argument(
        'package_name',
        help='Name of the requested package.',
    )
    request_parser.add_argument(
        '-c', '--comment',
        help='Optional comment to add notes '
             'for the admin processing your request.',
        default=None,
    )
    request_parser.set_defaults(func=request_subcommand)

    list_parser = subparsers.add_parser(
        'list',
        help='List installed packages.',
    )
    list_parser.add_argument(
        '--json',
        help='Use JSON output.',
        action='store_true',
        default=False,
    )
    list_parser.add_argument(
        '--upgradable',
        help='Only list packages with available updates.',
        action='store_true',
        default=False,
    )
    list_parser.set_defaults(func=list_subcommand)

    lint_parser = subparsers.add_parser(
        'lint',
        help='Check the syntax of the packages file and lockfile.',
    )
    lint_parser.set_defaults(func=lint_subcommand)

    lock_parser = subparsers.add_parser(
        'lock',
        help='Regenerate the lockfile from scratch.',
    )
    lock_parser.set_defaults(func=lock_subcommand)

    check_parser = subparsers.add_parser(
        'check',
        help='Check for available updates.',
    )
    check_parser.add_argument(
        '--json',
        help='Use JSON output.',
        action='store_true',
        default=False,
    )
    check_parser.add_argument(
        '--save',
        help='Update the lockfile. This is the default.',
        action='store_true',
        default=True,
    )
    check_parser.add_argument(
        '--no-save',
        help='Do not update the lockfile.',
        action='store_false',
        dest='save',
    )
    # Without this, ``breadpack check`` would fall through to ``nocommand``
    # and report that a subcommand is required.
    check_parser.set_defaults(func=check_subcommand)

    args = vars(parser.parse_args())
    # ``func`` is removed from the namespace so that only real options are
    # passed on to the handler.
    return args.pop('func', nocommand)(**args)
2020-04-10 04:46:08 +00:00
# Run the CLI only when this file is executed as a script; the value
# returned by main() becomes the process exit status.
if __name__ == '__main__':
    raise SystemExit(main())