"""Poll analytics.usa.gov for federal pages busier than the USPS tracker.

Every ``config.poll`` seconds the live all-pages CSV feed is fetched, parsed,
and compared against the USPS package-tracking page as a popularity baseline;
newly "hot" pages are printed (and optionally raised as a desktop
notification).
"""
import csv
import functools
import itertools
import subprocess
import time
from datetime import datetime
from enum import Enum
from io import StringIO
from typing import List, Optional, Sequence, Tuple

import cachetools.func
import httpx
import typer
from pydantic import BaseModel, BaseSettings, Field
from tabulate import tabulate as tab
from tabulate import tabulate_formats

from fedmon import appname

app = typer.Typer()

# tabulate ships its format names as a list; a set gives O(1) membership.
tabulate_formats = set(tabulate_formats)

# CLI-selectable output formats: JSON, CSV, plus every tabulate table style.
AllowedFormat = Enum(
    "AllowedFormat",
    {"json": "json", "csv": "csv", **{v: v for v in tabulate_formats}},
)


class AppConfig(BaseSettings):
    """Runtime configuration, overridable via FEDMON_* environment variables."""

    # CSV feed of real-time visitor counts across federal sites.
    base_url: str = "https://analytics.usa.gov/data/live/all-pages-realtime.csv"
    # Seconds between polls of the feed (also the fetch-cache TTL).
    poll: int = 10 * 60

    class Config:
        case_sensitive = False
        env_prefix = "fedmon_"


config = AppConfig()


@cachetools.func.ttl_cache(ttl=config.poll)
def get_data(base_url: str) -> Tuple[str, ...]:
    """Fetch the CSV feed and return its lines as a hashable tuple.

    The result is cached for one poll interval. Returning a tuple (not a
    list) keeps it usable as an ``lru_cache`` key in ``parse_data``.

    Raises:
        httpx.HTTPStatusError: on a non-2xx response.
        httpx.ConnectError, httpx.ReadTimeout: on network failure.
    """
    with httpx.Client() as client:
        resp = client.get(base_url)
        resp.raise_for_status()
    return tuple(resp.content.decode().splitlines())


class FedSite(BaseModel):
    """One row of the analytics feed: a page and its current visitor count."""

    # Timestamp of when this record was parsed (not when it was sampled).
    datetime: str = Field(default_factory=lambda: datetime.now().isoformat())
    active_visitors: Optional[int]
    page_title: str
    page: str

    class Config:
        frozen = True  # immutable + hashable, so instances can live in sets


class Summary(BaseModel):
    """The collection of 'hot' sites produced by ``analyze_data``."""

    sites: Sequence[FedSite]


@functools.lru_cache()
def parse_data(data: Tuple[str, ...]) -> Tuple[FedSite, ...]:
    """Parse feed lines into ``FedSite`` records (memoized on the input tuple)."""
    return tuple(FedSite.parse_obj(line) for line in csv.DictReader(data))


@functools.lru_cache()
def analyze_data(
    data: Tuple[FedSite, ...], to_ignore: Optional[Tuple[str, ...]] = None
) -> Summary:
    """Return non-USPS sites at least as busy as the USPS tracker, ascending.

    Args:
        data: parsed feed rows; must be a tuple (``lru_cache`` key).
        to_ignore: substrings; any page containing one is skipped. Must be
            a hashable tuple (or None), never a list — this function is
            memoized with ``lru_cache``, which requires hashable arguments.
    """
    if to_ignore is None:
        to_ignore = ()
    usps = {x for x in data if "tools.usps.com" in x.page}
    # BUG FIX: default=0 guards against a feed with no USPS rows (max() on
    # an empty iterable raises ValueError); `or 0` guards against rows
    # whose active_visitors field is null.
    usps_visitors = max((x.active_visitors or 0 for x in usps), default=0)
    hot = tuple(
        sorted(
            (
                x
                for x in data
                if (x.active_visitors or 0) >= usps_visitors
                and x not in usps
                and not any(i in x.page for i in to_ignore)
            ),
            key=lambda x: x.active_visitors or 0,
        )
    )
    return Summary(sites=hot)


def format_response(summary: Summary, format: AllowedFormat, count: int) -> str:
    """Render a ``Summary`` in the requested format.

    ``count`` is the number of summaries already printed: headers are only
    emitted on the first one (count == 0) so repeated output concatenates
    cleanly in a long-running session.
    """
    if format == AllowedFormat.json:
        return summary.json(indent=4)
    elif format == AllowedFormat.csv:
        data = StringIO()
        to_write = summary.dict()["sites"]
        headers = FedSite.schema()["properties"].keys()
        writer = csv.DictWriter(data, fieldnames=headers)
        if count == 0:
            writer.writeheader()
        writer.writerows(to_write)
        return data.getvalue()
    else:
        data = summary.dict()["sites"]
        # Fixed-width right-justification keeps successive header-less
        # tables column-aligned with the first (headed) one.
        for site in data:
            site["active_visitors"] = str(site["active_visitors"]).rjust(19)
            site["page_title"] = site["page_title"].rjust(25)
            site["page"] = site["page"].rjust(26)
        if count == 0:
            return tab(data, tablefmt=format.value, headers="keys")
        return tab(data, tablefmt=format.value)


def notify(summary: Summary, count: int) -> None:
    """Raise a desktop notification via ``notify-send``, best-effort.

    ``count`` is the zero-based index of this summary, mirroring the
    header-suppression logic in ``format_response``.

    BUG FIX: the caller passes an already-advanced integer; the old code
    called ``next()`` on it, which raised TypeError whenever the
    ``--send-notification`` flag was set.
    """
    try:
        subprocess.check_call(
            [
                "notify-send",
                appname,
                format_response(summary, AllowedFormat.json, count),
            ]
        )
    except subprocess.CalledProcessError as e:
        # notify-send missing or failing is non-fatal: log and keep polling.
        typer.echo(e, err=True)


@app.command()
def main(
    format: AllowedFormat = typer.Option(
        default="json", show_choices=True, help="The format to display results in."
    ),
    send_notification: bool = typer.Option(
        default=False,
        help="If set, use `notify-send` to raise a desktop notification, if available.",
    ),
    ignore: Optional[List[str]] = typer.Option(None),
):
    """Poll the analytics feed forever, printing newly 'hot' sites."""
    prev = None
    counter = itertools.count()
    # BUG FIX: analyze_data is memoized with lru_cache, which cannot hash a
    # list; normalize the CLI option to a tuple once, up front.
    to_ignore = tuple(ignore) if ignore else None
    while True:
        try:
            data = get_data(config.base_url)
            data = parse_data(data)
            summary = analyze_data(data, to_ignore=to_ignore)
            if summary != prev:
                if summary.sites:
                    c = next(counter)
                    typer.echo(format_response(summary, format, c))
                    if send_notification:
                        notify(summary, c)
                prev = summary
        # BUG FIX: raise_for_status() in get_data raises HTTPStatusError on
        # a 4xx/5xx; catch it too so a transient server error doesn't kill
        # the polling loop.
        except (httpx.ConnectError, httpx.ReadTimeout, httpx.HTTPStatusError) as e:
            typer.echo(e, err=True)
        finally:
            time.sleep(config.poll)