Compare commits
3 Commits
b2219bd9aa
...
ffe962d4a5
Author | SHA1 | Date | |
---|---|---|---|
|
ffe962d4a5 | ||
|
087958b030 | ||
|
c7b7314d3b |
|
@ -8,9 +8,10 @@ Release Notes
|
|||
0.3.0
|
||||
-----
|
||||
|
||||
* New module: ``bugzilla``
|
||||
* **New:** Discover version bump requests on Bugzilla (command: ``bugzilla
|
||||
outdated``).
|
||||
|
||||
* New command: Discover version bump requests.
|
||||
* **New:** Filter results by maintainer.
|
||||
|
||||
* Dependencies introduced:
|
||||
|
||||
|
|
|
@ -16,13 +16,16 @@ from find_work.constants import VERSION
|
|||
|
||||
@click.group(cls=ClickAliasedGroup,
|
||||
context_settings={"help_option_names": ["-h", "--help"]})
|
||||
@click.option("-m", "--maintainer", metavar="EMAIL",
|
||||
help="Filter by package maintainer.")
|
||||
@click.option("-q", "--quiet", is_flag=True,
|
||||
help="Be less verbose.")
|
||||
@click.option("-I", "--installed", is_flag=True,
|
||||
help="Only match installed packages.")
|
||||
@click.version_option(VERSION, "-V", "--version")
|
||||
@click.pass_context
|
||||
def cli(ctx: click.Context, quiet: bool, installed: bool) -> None:
|
||||
def cli(ctx: click.Context, maintainer: str | None,
|
||||
quiet: bool, installed: bool) -> None:
|
||||
""" Personal advice utility for Gentoo package maintainers. """
|
||||
|
||||
ctx.ensure_object(Options)
|
||||
|
@ -33,14 +36,16 @@ def cli(ctx: click.Context, quiet: bool, installed: bool) -> None:
|
|||
if any(var in os.environ for var in ["NOCOLOR", "NO_COLOR"]):
|
||||
options.colors = False
|
||||
|
||||
today = date.today().toordinal()
|
||||
options.cache_key += str(today).encode() + b"\0"
|
||||
options.cache_key.feed(date.today().toordinal())
|
||||
if maintainer:
|
||||
options.maintainer = maintainer
|
||||
options.cache_key.feed(maintainer=maintainer)
|
||||
|
||||
|
||||
@cli.group(aliases=["bug", "b"], cls=ClickAliasedGroup)
|
||||
@click.option("-c", "--component",
|
||||
@click.option("-c", "--component", metavar="NAME",
|
||||
help="Component name on Bugzilla.")
|
||||
@click.option("-p", "--product",
|
||||
@click.option("-p", "--product", metavar="NAME",
|
||||
help="Product name on Bugzilla.")
|
||||
@click.option("-t", "--time", is_flag=True,
|
||||
help="Sort bugs by time last modified.")
|
||||
|
@ -51,28 +56,26 @@ def bugzilla(options: Options, component: str | None, product: str | None,
|
|||
|
||||
options.bugzilla.chronological_sort = time
|
||||
|
||||
options.cache_key += b"bugzilla" + b"\0"
|
||||
options.cache_key += b"time:" + b"1" if time else b"0" + b"\0"
|
||||
options.cache_key.feed("bugzilla")
|
||||
options.cache_key.feed(time=time)
|
||||
|
||||
if product:
|
||||
options.bugzilla.product = product
|
||||
options.cache_key += b"product:" + product.encode() + b"\0"
|
||||
options.cache_key.feed(product=product)
|
||||
if component:
|
||||
options.bugzilla.component = component
|
||||
options.cache_key += b"component:" + component.encode() + b"\0"
|
||||
options.cache_key.feed(component=component)
|
||||
|
||||
|
||||
@cli.group(aliases=["rep", "r"], cls=ClickAliasedGroup)
|
||||
@click.option("-r", "--repo", required=True,
|
||||
@click.option("-r", "--repo", metavar="NAME", required=True,
|
||||
help="Repository name on Repology.")
|
||||
@click.pass_obj
|
||||
def repology(options: Options, repo: str) -> None:
|
||||
""" Use Repology to find work. """
|
||||
|
||||
options.repology.repo = repo
|
||||
|
||||
options.cache_key += b"repology" + b"\0"
|
||||
options.cache_key += repo.encode() + b"\0"
|
||||
options.cache_key.feed("repology", repo)
|
||||
|
||||
|
||||
bugzilla.add_command(find_work.cli.bugzilla.outdated, aliases=["out", "o"])
|
||||
|
|
|
@ -11,9 +11,10 @@ from dataclasses import field
|
|||
from typing import Any
|
||||
|
||||
import click
|
||||
|
||||
from pydantic.dataclasses import dataclass
|
||||
|
||||
from find_work.types import CacheKey
|
||||
|
||||
|
||||
class ProgressDots:
|
||||
""" Print a dot to the terminal every second. """
|
||||
|
@ -79,6 +80,9 @@ class Options:
|
|||
# Enable/disable colors.
|
||||
colors: bool | None = None
|
||||
|
||||
# Maintainer email.
|
||||
maintainer: str = ""
|
||||
|
||||
# Enable/disable progress reporting.
|
||||
verbose: bool = True
|
||||
|
||||
|
@ -86,7 +90,7 @@ class Options:
|
|||
only_installed: bool = False
|
||||
|
||||
# Byte string used for creating cache key.
|
||||
cache_key: bytes = b""
|
||||
cache_key: CacheKey = field(default_factory=CacheKey)
|
||||
|
||||
# Subcommand options.
|
||||
repology: RepologyOptions = field(default_factory=RepologyOptions)
|
||||
|
|
|
@ -108,6 +108,7 @@ def _fetch_bump_requests(options: Options) -> list[Bug]:
|
|||
query = bz.build_query(
|
||||
product=options.bugzilla.product or None,
|
||||
component=options.bugzilla.component or None,
|
||||
assigned_to=options.maintainer or None,
|
||||
short_desc="version bump",
|
||||
)
|
||||
query["resolution"] = "---"
|
||||
|
@ -141,7 +142,7 @@ def _collect_bump_requests(data: Iterable[Bug],
|
|||
@click.pass_obj
|
||||
def outdated(options: Options) -> None:
|
||||
""" Find packages with version bump requests on Bugzilla. """
|
||||
options.cache_key += b"outdated" + b"\0"
|
||||
options.cache_key.feed("outdated")
|
||||
dots = ProgressDots(options.verbose)
|
||||
|
||||
options.vecho("Checking for cached data", nl=False, err=True)
|
||||
|
@ -156,6 +157,10 @@ def outdated(options: Options) -> None:
|
|||
options.vecho("Fetching data from Bugzilla API", nl=False, err=True)
|
||||
with dots():
|
||||
data = _fetch_bump_requests(options)
|
||||
if len(data) == 0:
|
||||
options.secho("Hmmm, no data returned. Try again with "
|
||||
"different arguments.", fg="yellow")
|
||||
return
|
||||
options.vecho("Caching data", nl=False, err=True)
|
||||
with dots():
|
||||
json_data = _bugs_to_json(data)
|
||||
|
|
|
@ -25,10 +25,15 @@ from find_work.utils import (
|
|||
)
|
||||
|
||||
|
||||
async def _fetch_outdated(repo: str) -> dict[str, set[Package]]:
|
||||
async def _fetch_outdated(options: Options) -> dict[str, set[Package]]:
|
||||
filters: dict = {}
|
||||
if options.maintainer:
|
||||
filters["maintainer"] = options.maintainer
|
||||
|
||||
async with aiohttp_session() as session:
|
||||
return await repology_client.get_projects(inrepo=repo, outdated="on",
|
||||
count=5_000, session=session)
|
||||
return await repology_client.get_projects(inrepo=options.repology.repo,
|
||||
outdated="on", count=5_000,
|
||||
session=session, **filters)
|
||||
|
||||
|
||||
def _projects_from_json(data: dict[str, list]) -> dict[str, set[Package]]:
|
||||
|
@ -90,13 +95,13 @@ async def _outdated(options: Options) -> None:
|
|||
with dots():
|
||||
data = _projects_from_json(cached_data)
|
||||
else:
|
||||
options.vecho("Fetching data from Repology API", nl=False, err=True)
|
||||
try:
|
||||
options.vecho("Fetching data from Repology API", nl=False, err=True)
|
||||
with dots():
|
||||
data = await _fetch_outdated(options.repology.repo)
|
||||
data = await _fetch_outdated(options)
|
||||
except repology_client.exceptions.EmptyResponse:
|
||||
options.secho("Hmmm, no data returned. Most likely you've made a "
|
||||
"typo in the repository name.", fg="yellow")
|
||||
options.secho("Hmmm, no data returned. Try again with different "
|
||||
"arguments.", fg="yellow")
|
||||
return
|
||||
options.vecho("Caching data", nl=False, err=True)
|
||||
with dots():
|
||||
|
@ -118,5 +123,5 @@ async def _outdated(options: Options) -> None:
|
|||
@click.pass_obj
|
||||
def outdated(options: Options) -> None:
|
||||
""" Find outdated packages. """
|
||||
options.cache_key += b"outdated" + b"\0"
|
||||
options.cache_key.feed("outdated")
|
||||
asyncio.run(_outdated(options))
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
""" Type definitions for the application, implemented as Pydantic models. """
|
||||
|
||||
from dataclasses import field
|
||||
from typing import Any
|
||||
|
||||
from pydantic.dataclasses import dataclass
|
||||
|
||||
|
@ -26,3 +27,53 @@ class BugView:
|
|||
last_change_date: str = field(compare=False)
|
||||
assigned_to: str = field(compare=False)
|
||||
summary: str = field(compare=False)
|
||||
|
||||
|
||||
@dataclass
|
||||
class CacheKey:
|
||||
"""
|
||||
Cache key constructor.
|
||||
|
||||
>>> key = CacheKey()
|
||||
>>> key.feed(b"bytes")
|
||||
>>> key.feed("string")
|
||||
>>> key.feed(count=42)
|
||||
>>> key.feed(flag=True)
|
||||
>>> bytes(key)
|
||||
b'bytes\\x00string\\x00count:42\\x00flag:1\\x00'
|
||||
>>> key.feed([1, 2, 3])
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
TypeError: Unsupported type conversion
|
||||
"""
|
||||
|
||||
data: bytes = b""
|
||||
|
||||
@staticmethod
|
||||
def _encode(value: Any) -> bytes:
|
||||
match value:
|
||||
case bytes():
|
||||
return value
|
||||
case str():
|
||||
return value.encode()
|
||||
case bool():
|
||||
return b"1" if value else b"0"
|
||||
case int():
|
||||
return str(value).encode()
|
||||
case _:
|
||||
raise TypeError("Unsupported type conversion")
|
||||
|
||||
def feed(self, *args: Any, **kwargs: Any) -> None:
|
||||
""" Update the key with new data. """
|
||||
if args and kwargs or len(kwargs) > 1:
|
||||
raise ValueError("Too many arguments")
|
||||
|
||||
for value in args:
|
||||
self.data += self._encode(value) + b"\0"
|
||||
|
||||
for key, value in kwargs.items():
|
||||
self.data += self._encode(key) + b":"
|
||||
self.data += self._encode(value) + b"\0"
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return self.data
|
||||
|
|
|
@ -16,6 +16,7 @@ from typing import Any
|
|||
import aiohttp
|
||||
|
||||
from find_work.constants import PACKAGE, USER_AGENT
|
||||
from find_work.types import CacheKey
|
||||
|
||||
with warnings.catch_warnings():
|
||||
# Disable annoying warning shown to LibreSSL users
|
||||
|
@ -54,16 +55,16 @@ def _get_cache_path(cache_key: bytes) -> Path:
|
|||
return file.with_suffix(".json")
|
||||
|
||||
|
||||
def write_json_cache(data: Any, cache_key: bytes, **kwargs: Any) -> None:
|
||||
def write_json_cache(data: Any, cache_key: CacheKey, **kwargs: Any) -> None:
|
||||
"""
|
||||
Write a JSON cache file in a temporary directory. Keyword arguments are
|
||||
passed to :py:function:`json.dump` as is.
|
||||
|
||||
:param data: data to serialize
|
||||
:param cache_key: byte string to use as a key
|
||||
:param cache_key: cache key object
|
||||
"""
|
||||
|
||||
cache = _get_cache_path(cache_key)
|
||||
cache = _get_cache_path(bytes(cache_key))
|
||||
try:
|
||||
cache.parent.mkdir(parents=True, exist_ok=True)
|
||||
except OSError:
|
||||
|
@ -76,16 +77,16 @@ def write_json_cache(data: Any, cache_key: bytes, **kwargs: Any) -> None:
|
|||
pass
|
||||
|
||||
|
||||
def read_json_cache(cache_key: bytes, **kwargs: Any) -> Any | None:
|
||||
def read_json_cache(cache_key: CacheKey, **kwargs: Any) -> Any | None:
|
||||
"""
|
||||
Read a JSON cache file stored in a temporary directory. Keyword arguments
|
||||
are passed to :py:function:`json.load` as is.
|
||||
|
||||
:param cache_key: byte string to use as a key
|
||||
:param cache_key: cache key object
|
||||
:returns: decoded data or ``None``
|
||||
"""
|
||||
|
||||
cache = _get_cache_path(cache_key)
|
||||
cache = _get_cache_path(bytes(cache_key))
|
||||
if not cache.is_file():
|
||||
return None
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
.Op Fl hV
|
||||
.Nm
|
||||
.Op Fl qI
|
||||
.Op Fl m Ar email
|
||||
.Ar module
|
||||
.Op Ar arg ...
|
||||
.Ar command
|
||||
|
@ -24,9 +25,11 @@ It contains filters to show only packages you might be interested in.
|
|||
.Nm
|
||||
provides global and module-specific options.
|
||||
Global options must precede the module name, and are as follows:
|
||||
.Bl -tag -width tenletters
|
||||
.Bl -tag -width Ds
|
||||
.It Fl h , -help
|
||||
Display usage information and exit immediately.
|
||||
.It Fl m Ar email , Fl -maintainer Ar email
|
||||
Only match packages maintained by the specified person.
|
||||
.It Fl q , -quiet
|
||||
Be less verbose.
|
||||
.It Fl I , -installed
|
||||
|
|
Loading…
Reference in New Issue
Block a user