fedmon/fedmon/cli.py

138 lines
3.9 KiB
Python

import csv
import functools
import itertools
import subprocess
import time
from datetime import datetime
from enum import Enum
from io import StringIO
from typing import Sequence, Tuple
import cachetools.func
import httpx
import typer
from pydantic import BaseModel, BaseSettings, Field
from tabulate import tabulate as tab
from tabulate import tabulate_formats
from fedmon import appname
app = typer.Typer()

# Rebind the imported ``tabulate_formats`` list as a set for O(1) membership
# tests and de-duplication when building the Enum below.
tabulate_formats = set(tabulate_formats)

# Output formats the CLI accepts: JSON, CSV, plus every tabulate table style.
AllowedFormat = Enum(
    "AllowedFormat", {"json": "json", "csv": "csv", **{v: v for v in tabulate_formats}}
)
class AppConfig(BaseSettings):
    """Runtime settings, overridable via FEDMON_-prefixed environment variables."""

    # URL of the analytics.usa.gov realtime "all pages" CSV feed.
    base_url: str = "https://analytics.usa.gov/data/live/all-pages-realtime.csv"
    # Seconds between polls; also reused as the HTTP fetch cache TTL.
    poll: int = 10 * 60

    class Config:
        # Accept FEDMON_BASE_URL / fedmon_base_url etc. regardless of case.
        case_sensitive = False
        env_prefix = "fedmon_"
# Module-level settings instance, read from the environment once at import time.
config = AppConfig()
@cachetools.func.ttl_cache(ttl=config.poll)
def get_data(base_url: str) -> Tuple[str, ...]:
    """Fetch the realtime analytics CSV and return it as a tuple of text lines.

    The return value is an immutable, hashable tuple so it can be passed to
    the ``lru_cache``-memoized parser; the TTL cache prevents re-fetching
    within one poll interval.

    Args:
        base_url: URL of the CSV feed to download.

    Returns:
        The decoded response body, split into lines.

    Raises:
        httpx.HTTPStatusError: if the server responds with an error status.
    """
    # NOTE: annotation fixed from ``Tuple[str]`` (a 1-tuple) to the intended
    # variable-length ``Tuple[str, ...]``.
    with httpx.Client() as client:
        resp = client.get(base_url)
        resp.raise_for_status()
        return tuple(resp.content.decode().splitlines())
class FedSite(BaseModel):
    """One row of the realtime feed: a page and its current visitor count.

    Field names match the CSV column headers consumed by ``csv.DictReader``
    (``active_visitors``, ``page_title``, ``page``).
    """

    # Capture timestamp for this record; defaults to "now" because the feed
    # rows carry no timestamp of their own. Shadows the datetime module only
    # inside the class namespace.
    datetime: str = Field(default_factory=lambda: datetime.now().isoformat())
    active_visitors: int
    page_title: str
    page: str

    class Config:
        # Frozen instances are hashable, which the set-building and
        # lru_cache usage in analyze_data relies on.
        frozen = True
class Summary(BaseModel):
    """Analysis result: the sequence of notably busy federal sites."""

    sites: Sequence[FedSite]
@functools.lru_cache()
def parse_data(data: Tuple[str, ...]) -> Tuple[FedSite, ...]:
    """Parse raw CSV lines into a tuple of immutable ``FedSite`` records.

    ``data`` must be a (hashable) tuple of lines so results can be memoized;
    the first line is treated as the CSV header by ``csv.DictReader``.

    Note: annotations fixed from ``Tuple[str]`` / ``Tuple[FedSite]`` (which
    denote 1-tuples) to the intended variable-length forms.
    """
    return tuple(FedSite.parse_obj(row) for row in csv.DictReader(data))
@functools.lru_cache()
def analyze_data(data: Tuple[FedSite, ...]) -> Summary:
    """Summarize the non-USPS sites at least as busy as USPS package tracking.

    Uses the busiest ``tools.usps.com`` page as a popularity threshold and
    returns every other site meeting it, sorted by ascending visitor count.

    Args:
        data: parsed feed rows (hashable tuple, for memoization).
    """
    usps = {x for x in data if "tools.usps.com" in x.page}
    # default=0 prevents max() raising ValueError (and crashing the poll
    # loop) when the feed contains no USPS rows; with no threshold
    # available, every site qualifies.
    usps_visitors = max((x.active_visitors for x in usps), default=0)
    hot = tuple(
        sorted(
            (x for x in data if x.active_visitors >= usps_visitors and x not in usps),
            key=lambda x: x.active_visitors,
        )
    )
    return Summary(sites=hot)
def format_response(summary: Summary, format: AllowedFormat, count: int) -> str:
    """Render *summary* in the requested output format.

    Args:
        summary: the analysis result to display.
        format: one of the ``AllowedFormat`` members (json, csv, or a
            tabulate table style).
        count: how many outputs have been emitted so far; column headers
            are included only on the first emission (``count == 0``) for
            the csv and table formats.

    Returns:
        The formatted text, ready for ``typer.echo``.
    """
    if format == AllowedFormat.json:
        return summary.json(indent=4)

    if format == AllowedFormat.csv:
        buffer = StringIO()
        fieldnames = FedSite.schema()["properties"].keys()
        writer = csv.DictWriter(buffer, fieldnames=fieldnames)
        if count == 0:
            writer.writeheader()
        writer.writerows(summary.dict()["sites"])
        return buffer.getvalue()

    # Any remaining format is a tabulate style. Right-pad the columns to
    # fixed widths so successive tables line up across emissions.
    rows = summary.dict()["sites"]
    for row in rows:
        row["active_visitors"] = str(row["active_visitors"]).rjust(19)
        row["page_title"] = row["page_title"].rjust(25)
        row["page"] = row["page"].rjust(26)
    header_spec = "keys" if count == 0 else ()
    return tab(rows, tablefmt=format.value, headers=header_spec)
@app.command()
def main(
    format: AllowedFormat = typer.Option(
        default="json", show_choices=True, help="The format to display results in."
    ),
    send_notification: bool = typer.Option(
        default=False,
        help="If set, use `notify-send` to raise a desktop notification, if available.",
    ),
):
    """Poll the analytics feed forever, emitting a summary whenever it changes."""
    prev = None
    counter = itertools.count()
    while True:
        data = get_data(config.base_url)
        data = parse_data(data)
        summary = analyze_data(data)
        if summary != prev:
            if summary:
                # Advance the emission counter once per change and reuse it
                # for both outputs (previously next() was called twice,
                # skipping values in the header-on-first-emission logic).
                count = next(counter)
                typer.echo(format_response(summary, format, count))
                if send_notification:
                    try:
                        subprocess.check_call(
                            [
                                "notify-send",
                                appname,
                                format_response(summary, AllowedFormat.json, count),
                            ]
                        )
                    except (subprocess.CalledProcessError, FileNotFoundError) as e:
                        # FileNotFoundError covers notify-send not being
                        # installed — the "if available" promise in the help
                        # text; previously this crashed the loop.
                        typer.echo(e, err=True)
            prev = summary
        time.sleep(config.poll)