# fedmon/fedmon/cli.py
import csv
import functools
import itertools
import time
from datetime import datetime
from enum import Enum
from io import StringIO
from typing import Tuple
import cachetools.func
import httpx
import typer
from pydantic import BaseModel, BaseSettings, Field
from tabulate import tabulate as tab
from tabulate import tabulate_formats
# Typer application object the CLI commands hang off.
app = typer.Typer()

# Collapse tabulate's list of table styles into a set for cheap membership.
tabulate_formats = set(tabulate_formats)

# Output formats accepted on the command line: JSON and CSV, plus every
# table style tabulate supports (member name equals member value).
_format_members = {"json": "json", "csv": "csv"}
_format_members.update((fmt, fmt) for fmt in tabulate_formats)
AllowedFormat = Enum("AllowedFormat", _format_members)
class AppConfig(BaseSettings):
    """Runtime settings, overridable through FEDMON_-prefixed env vars."""

    # Realtime "all pages" CSV feed published by analytics.usa.gov.
    base_url: str = "https://analytics.usa.gov/data/live/all-pages-realtime.csv"
    # Seconds between polls (also the download cache TTL): ten minutes.
    poll: int = 600

    class Config:
        # Environment variables are matched case-insensitively and must
        # carry the "fedmon_" prefix (e.g. FEDMON_POLL=30).
        case_sensitive = False
        env_prefix = "fedmon_"


# Single settings instance, resolved once at import time.
config = AppConfig()
@cachetools.func.ttl_cache(ttl=config.poll)
def get_data(base_url: str) -> Tuple[str, ...]:
    """Download the realtime analytics CSV and return it as lines.

    The result is memoized for ``config.poll`` seconds, so repeated calls
    within one poll interval reuse the cached download.  A tuple (hashable)
    is returned so downstream ``lru_cache`` functions can key on it.

    Note: the annotation was ``Tuple[str]`` (a 1-tuple); ``Tuple[str, ...]``
    is the variable-length form actually returned.

    Raises:
        httpx.HTTPStatusError: if the server answers with an error status.
    """
    with httpx.Client() as client:
        resp = client.get(base_url)
        resp.raise_for_status()
        return tuple(resp.content.decode().splitlines())
class FedSite(BaseModel):
    """One row of the realtime feed: current visitors for a single page."""

    # Capture timestamp (ISO 8601, local clock) recorded when the row is
    # parsed, not when the feed was fetched.
    datetime: str = Field(default_factory=lambda: datetime.now().isoformat())
    # The remaining fields mirror the CSV column headers exactly so that
    # csv.DictReader rows can be passed straight to parse_obj().
    active_visitors: int
    page_title: str
    page: str

    class Config:
        # Immutable and therefore hashable: instances live in sets and
        # serve as lru_cache keys (see parse_data / analyze_data).
        frozen = True
class Summary(BaseModel):
    """Sites whose concurrent-visitor count meets the report threshold."""

    # Must be the variable-length form: in pydantic v1, ``Tuple[FedSite]``
    # demands *exactly one* element and rejects any other length, so every
    # summary with zero or several sites would fail validation.
    sites: Tuple[FedSite, ...]
@functools.lru_cache()
def parse_data(data: Tuple[str, ...]) -> Tuple[FedSite, ...]:
    """Parse raw CSV lines (header row first) into FedSite records.

    Memoized on the (hashable) input tuple, so re-parsing the same cached
    download is free.  Annotations fixed from the 1-tuple form
    ``Tuple[str]`` / ``Tuple[FedSite]`` to the variable-length form.
    """
    return tuple(FedSite.parse_obj(row) for row in csv.DictReader(data))
@functools.lru_cache()
def analyze_data(data: Tuple[FedSite, ...]) -> Summary:
    """Summarize sites at least as busy as the busiest USPS tracking page.

    The USPS package-tracking pages (tools.usps.com) serve as a popularity
    baseline: every non-USPS page with that many or more active visitors
    counts as "hot".  Sites are returned sorted by ascending visitor count.
    """
    usps = {site for site in data if "tools.usps.com" in site.page}
    # default=0 prevents a ValueError when the feed has no USPS rows;
    # every site then clears the (zero) threshold.
    threshold = max((site.active_visitors for site in usps), default=0)
    hot = tuple(
        sorted(
            (s for s in data if s.active_visitors >= threshold and s not in usps),
            key=lambda s: s.active_visitors,
        )
    )
    return Summary(sites=hot)
def format_response(summary: Summary, format: AllowedFormat, count: int) -> str:
    """Render a Summary as JSON, CSV, or a tabulate table.

    Args:
        summary: analysis result to render.
        format: output flavor chosen on the command line.
        count: number of summaries already printed; column headers are
            only emitted for the first one (count == 0) so successive
            outputs concatenate into one continuous CSV/table stream.

    Returns:
        The formatted text, ready for typer.echo.
    """
    if format == AllowedFormat.json:
        return summary.json(indent=4)
    elif format == AllowedFormat.csv:
        data = StringIO()
        to_write = summary.dict()["sites"]
        headers = FedSite.schema()["properties"].keys()
        writer = csv.DictWriter(data, fieldnames=headers)
        if count == 0:
            writer.writeheader()
        writer.writerows(to_write)
        return data.getvalue()
    else:
        data = summary.dict()["sites"]
        for site in data:
            # Right-justify to fixed widths so headerless follow-up tables
            # line up under the first one.  active_visitors is an int after
            # .dict() — it must be str()'d first (int has no .rjust, which
            # previously crashed every table-format run).
            site["active_visitors"] = str(site["active_visitors"]).rjust(19)
            site["page_title"] = site["page_title"].rjust(25)
            site["page"] = site["page"].rjust(26)
        if count == 0:
            return tab(data, tablefmt=format.value, headers="keys")
        return tab(data, tablefmt=format.value)
@app.command()
def main(format: AllowedFormat = typer.Option(default="json", show_choices=True)):
    """Poll the analytics feed forever, printing a summary whenever it changes."""
    prev = None
    counter = itertools.count()  # how many summaries have been printed so far
    while True:
        data = get_data(config.base_url)  # cached for config.poll seconds
        data = parse_data(data)
        summary = analyze_data(data)
        # Only emit output when the summary actually changed since last poll.
        if summary != prev:
            if summary:
                # NOTE(review): a pydantic BaseModel is always truthy, so this
                # inner check never skips; `summary.sites` was likely intended
                # — confirm before changing.
                typer.echo(format_response(summary, format, next(counter)))
            prev = summary
        time.sleep(config.poll)