fedmon/fedmon/cli.py

70 lines
1.4 KiB
Python

import csv
import functools
import time
from typing import List
import cachetools.func
import httpx
import hyperlink
import pendulum
import typer
from pydantic import BaseModel, BaseSettings
app = typer.Typer()
class AppConfig(BaseSettings):
base_url: str = "https://analytics.usa.gov/data/live/all-pages-realtime.csv"
poll: int = 10 * 60
class Config:
case_sensitive = False
env_prefix = "fedmon_"
config = AppConfig()
@cachetools.func.ttl_cache(ttl=config.poll)
def get_data(base_url: str):
with httpx.Client() as client:
resp = client.get(base_url)
resp.raise_for_status()
return tuple(resp.content.decode().splitlines())
class FedSite(BaseModel):
page: str
page_title: str
active_visitors: int
@property
def url(self):
return hyperlink.parse(self.page)
class Config:
frozen = True
@functools.lru_cache()
def parse_data(data: List[str]):
return tuple(FedSite.parse_obj(line) for line in csv.DictReader(data))
@functools.lru_cache()
def analyze_data(data: List[FedSite]):
return data[0]
@app.command()
def main():
prev = None
while True:
data = get_data(config.base_url)
data = parse_data(data)
summary = analyze_data(data)
if summary != prev:
typer.echo(f"{pendulum.now()}: {summary}")
prev = summary
time.sleep(config.poll)