cli/pgo: new command: stabilization
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Anna “CyberTailor” 2024-01-10 13:44:36 +05:00
parent 2e0b687c3d
commit c53b9b0a2a
Signed by: CyberTaIlor
GPG Key ID: E7B76EDC50864BB1
6 changed files with 106 additions and 7 deletions

View File

@ -56,8 +56,8 @@ Adding a new module
1. Create a new Python file in the :file:`find_work/cli` directory.
2. Implement all logic here.
3. Register commands in the :file:`gentle/__main__.py` file.
4. Update the manpage.
3. Register commands in the :file:`find_work/__main__.py` file.
4. Update the manpage and release notes.
Feel free to copy/paste from existing sources, I do the same.

View File

@ -14,6 +14,9 @@ Release Notes
* **New:** Discover outdated packages in the Gentoo repository (command: ``pgo
outdated``).
* **New:** Discover stabilization candidates in the Gentoo repository (command:
``pgo stabilization``).
* **New:** Filter results by maintainer.
* Dependencies introduced:

View File

@ -90,5 +90,6 @@ def repology(options: Options, repo: str) -> None:
bugzilla.add_command(find_work.cli.bugzilla.outdated, aliases=["out", "o"])
pgo.add_command(find_work.cli.pgo.outdated, aliases=["out", "o"])
pgo.add_command(find_work.cli.pgo.stabilization, aliases=["stab", "s"])
repology.add_command(find_work.cli.repology.outdated, aliases=["out", "o"])

View File

@ -9,10 +9,11 @@ from collections.abc import Iterable
import click
import gentoopm
from sortedcontainers import SortedSet
from sortedcontainers import SortedDict, SortedSet
from tabulate import tabulate
from find_work.cli import Message, Options, ProgressDots
from find_work.constants import PGO_API_URL
from find_work.constants import PGO_BASE_URL, PGO_API_URL
from find_work.types import VersionBump
from find_work.utils import (
aiohttp_session,
@ -34,7 +35,7 @@ async def _fetch_outdated() -> list[dict]:
async with session.post(PGO_API_URL, json={"query": query},
raise_for_status=True) as response:
data = await response.json()
return data.get("data", {}).get("outdatedPackages", [])
return data.get("data", {}).get("outdatedPackages", [])
def _collect_version_bumps(data: Iterable[dict],
@ -55,6 +56,9 @@ def _collect_version_bumps(data: Iterable[dict],
async def _outdated(options: Options) -> None:
if options.maintainer:
raise NotImplementedError("-m option is not implemented for this command")
dots = ProgressDots(options.verbose)
options.say(Message.CACHE_LOAD)
@ -83,9 +87,95 @@ async def _outdated(options: Options) -> None:
options.say(Message.NO_WORK)
async def _fetch_maintainer_stabilization(maintainer: str) -> list[dict]:
url = f"{PGO_BASE_URL}/maintainer/{maintainer}/stabilization.json"
async with aiohttp_session() as session:
async with session.get(url, raise_for_status=True) as response:
data = await response.json()
# bring data to a common structure
return [
{
"Atom": f"{item['category']}/{item['package']}",
"Version": item["version"],
"Message": item["message"],
}
for item in data
]
async def _fetch_all_stabilization() -> list[dict]:
query = """query {
pkgCheckResults(Class: "StableRequest") {
Atom
Version
Message
}
}"""
async with aiohttp_session() as session:
async with session.post(PGO_API_URL, json={"query": query},
raise_for_status=True) as response:
data = await response.json()
return data.get("data", {}).get("pkgCheckResults", [])
async def _fetch_stabilization(options: Options) -> list[dict]:
if options.maintainer:
return await _fetch_maintainer_stabilization(options.maintainer)
return await _fetch_all_stabilization()
def _collect_stable_candidates(data: list[dict],
options: Options) -> SortedDict[str, str]:
if options.only_installed:
pm = gentoopm.get_package_manager()
result: SortedDict[str, str] = SortedDict()
for item in data:
if options.only_installed and item["Atom"] not in pm.installed:
continue
key = "-".join([item["Atom"], item["Version"]])
result[key] = item["Message"]
return result
async def _stabilization(options: Options) -> None:
dots = ProgressDots(options.verbose)
options.say(Message.CACHE_LOAD)
with dots():
data = read_json_cache(options.cache_key)
if data is None:
options.vecho("Fetching data from Gentoo Packages API",
nl=False, err=True)
with dots():
data = await _fetch_stabilization(options)
if len(data) == 0:
options.say(Message.EMPTY_RESPONSE)
return
options.say(Message.CACHE_WRITE)
with dots():
write_json_cache(data, options.cache_key)
candidates = _collect_stable_candidates(data, options)
if len(candidates) != 0:
options.echo(tabulate(candidates.items(), tablefmt="plain"))
else:
options.say(Message.NO_WORK)
@click.command()
@click.pass_obj
def outdated(options: Options) -> None:
""" Find outdated packages. """
options.cache_key.feed("outdated")
asyncio.run(_outdated(options))
@click.command()
@click.pass_obj
def stabilization(options: Options) -> None:
""" Find outdated packages. """
options.cache_key.feed("stabilization")
asyncio.run(_stabilization(options))

View File

@ -19,5 +19,8 @@ USER_AGENT = f"Mozilla/5.0 (compatible; {PACKAGE}/{VERSION}; +{HOMEPAGE})"
# Gentoo Bugzilla location.
BUGZILLA_URL = "https://bugs.gentoo.org"
# Gentoo Packages API locations.
PGO_API_URL = "https://packages.gentoo.org/api/graphql/"
# Gentoo Packages location.
PGO_BASE_URL = "https://packages.gentoo.org"
# Gentoo Packages API location.
PGO_API_URL = f"{PGO_BASE_URL}/api/graphql/"

View File

@ -82,6 +82,8 @@ can be one of the following:
.Bl -tag -width Ds
.It Ic outdated Pq alias: Ic out , Ic o
Find outdated packages.
.It Ic stabilization Pq alias: Ic stab , Ic s
Find stabilization candidates.
.El
.
.It Xo