fedmon/fedmon/cli.py
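"""fedmon CLI: poll the analytics.usa.gov realtime top-pages feed and report the
federal sites drawing at least as many active visitors as the busiest
tools.usps.com page, optionally raising a desktop notification."""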

import csv
import functools
import itertools
import subprocess
import time
from datetime import datetime
from enum import Enum
from io import StringIO
from typing import List, Optional, Sequence, Tuple

import cachetools.func
import httpx
import typer
from pydantic import BaseModel, BaseSettings, Field
from tabulate import tabulate as tab
from tabulate import tabulate_formats

from fedmon import appname

app = typer.Typer()
tabulate_formats = set(tabulate_formats)
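# Output formats: "json", "csv", or any table style tabulate knows about.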
AllowedFormat = Enum(
    "AllowedFormat", {"json": "json", "csv": "csv", **{v: v for v in tabulate_formats}}
)


class AppConfig(BaseSettings):
    base_url: str = "https://analytics.usa.gov/data/live/all-pages-realtime.csv"
    poll: int = 10 * 60  # seconds between fetches of the feed

    class Config:
        case_sensitive = False
        env_prefix = "fedmon_"


config = AppConfig()


@cachetools.func.ttl_cache(ttl=config.poll)
def get_data(base_url: str) -> Tuple[str, ...]:
    """Fetch the realtime CSV feed and return its lines, cached for one poll interval."""
    with httpx.Client() as client:
        resp = client.get(base_url)
        resp.raise_for_status()
        return tuple(resp.content.decode().splitlines())


class FedSite(BaseModel):
    """One row of the realtime report: a page and its current active-visitor count."""

    datetime: str = Field(default_factory=lambda: datetime.now().isoformat())
    active_visitors: Optional[int]
    page_title: str
    page: str

    class Config:
        frozen = True  # instances are immutable and hashable, so they can live in sets


class Summary(BaseModel):
    sites: Sequence[FedSite]


@functools.lru_cache()
def parse_data(data: Tuple[str, ...]) -> Tuple[FedSite, ...]:
    """Parse the raw CSV lines into FedSite models."""
    return tuple(FedSite.parse_obj(line) for line in csv.DictReader(data))


@functools.lru_cache()
def analyze_data(data: Tuple[FedSite, ...], to_ignore: Optional[Tuple[str, ...]]) -> Summary:
    """Find the sites at least as busy as the busiest tools.usps.com page.

    Pages whose URL contains any substring in ``to_ignore`` are excluded.
    """
    if to_ignore is None:
        to_ignore = ()
    usps = {x for x in data if "tools.usps.com" in x.page}
    usps_visitors = max(x.active_visitors for x in usps)
    hot = tuple(
        sorted(
            (
                x
                for x in data
                if x.active_visitors >= usps_visitors
                and x not in usps
                and not any(i in x.page for i in to_ignore)
            ),
            key=lambda x: x.active_visitors,
        )
    )
    return Summary(sites=hot)


def format_response(summary: Summary, format: AllowedFormat, count: int) -> str:
    """Render a Summary as JSON, CSV, or a tabulate table.

    ``count`` is how many reports have already been printed; column headers
    are only included for the first one.
    """
    if format == AllowedFormat.json:
        return summary.json(indent=4)
    elif format == AllowedFormat.csv:
        data = StringIO()
        to_write = summary.dict()["sites"]
        headers = FedSite.schema()["properties"].keys()
        writer = csv.DictWriter(data, fieldnames=headers)
        if count == 0:
            writer.writeheader()
        writer.writerows(to_write)
        return data.getvalue()
    else:
        data = summary.dict()["sites"]
        for site in data:
            # Right-justify each column to a fixed width so repeated tables keep consistent widths.
            site["active_visitors"] = str(site["active_visitors"]).rjust(19)
            site["page_title"] = site["page_title"].rjust(25)
            site["page"] = site["page"].rjust(26)
        if count == 0:
            return tab(data, tablefmt=format.value, headers="keys")
        else:
            return tab(data, tablefmt=format.value)


def notify(summary: Summary, count: int) -> None:
    """Raise a desktop notification with the JSON-formatted summary via ``notify-send``."""
    try:
        subprocess.check_call(
            [
                "notify-send",
                appname,
                format_response(summary, AllowedFormat.json, count),
            ]
        )
    except subprocess.CalledProcessError as e:
        typer.echo(e, err=True)


@app.command()
def main(
    format: AllowedFormat = typer.Option(
        default="json", show_choices=True, help="The format to display results in."
    ),
    send_notification: bool = typer.Option(
        default=False,
        help="If set, use `notify-send` to raise a desktop notification, if available.",
    ),
    ignore: Optional[List[str]] = typer.Option(None),
):
    """Poll the feed and print a fresh report whenever the set of busy sites changes."""
    prev = None
    counter = itertools.count()

    while True:
        try:
            data = get_data(config.base_url)
            data = parse_data(data)
            # lru_cache needs hashable arguments, so the ignore list is passed as a tuple.
            summary = analyze_data(data, to_ignore=tuple(ignore) if ignore else None)

            if summary != prev:
                if summary.sites:
                    c = next(counter)
                    typer.echo(format_response(summary, format, c))

                    if send_notification:
                        notify(summary, c)

                prev = summary
        except (httpx.ConnectError, httpx.ReadTimeout) as e:
            typer.echo(e, err=True)
        finally:
            time.sleep(config.poll)
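

# Example invocation (assuming the package exposes a `fedmon` console script;
# option names follow Typer's defaults):
#   fedmon --format simple --ignore usajobs.gov --send-notification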