Add CI checks, closes #6
This commit is contained in:
parent
0e8d5a4379
commit
76b001be8f
89
fprss.py
89
fprss.py
|
@ -1,24 +1,33 @@
|
|||
#!/usr/bin/env python3
|
||||
from collections import namedtuple
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from requests.exceptions import HTTPError
|
||||
from typing import Iterator, Optional
|
||||
from urllib.parse import urlparse
|
||||
import json
|
||||
import re
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Dict, Iterator, NamedTuple, Optional, Tuple
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import requests
|
||||
import xmltodict
|
||||
import xmltodict # type: ignore
|
||||
from requests.exceptions import HTTPError
|
||||
|
||||
RSS_DATE_FORMAT = '%a, %d %b %Y %T %z'
|
||||
SANITIZE_REGEX = re.compile(r'(?:[\002\017\021\026\035\036\037]|\003(?:[0-9]{1,2}(?:,[0-9]{1,2})?)?|\004(?:[0-9A-F]{,6})?)', re.IGNORECASE)
|
||||
URL_REGEX = re.compile(r'https?://[A-Za-z0-9-._~:/?#[\]%@!$&\'()*+,;=]+', re.IGNORECASE)
|
||||
SANITIZE_REGEX = re.compile(
|
||||
r'(?:[\002\017\021\026\035\036\037]'
|
||||
r'|\003(?:[0-9]{1,2}(?:,[0-9]{1,2})?)?'
|
||||
r'|\004(?:[0-9A-F]{,6})?)',
|
||||
re.IGNORECASE
|
||||
)
|
||||
URL_REGEX = re.compile(
|
||||
r'https?://[A-Za-z0-9-._~:/?#[\]%@!$&\'()*+,;=]+',
|
||||
re.IGNORECASE
|
||||
)
|
||||
LOG_FILE = Path('~archangelic/irc/log').expanduser()
|
||||
CACHE_FILE = Path(__file__).absolute().parent / 'cache.json'
|
||||
OUTPUT_PATH = Path('~/public_html/fridaypostcard.xml').expanduser()
|
||||
|
||||
IGNORE_USERNAMES = {'quote_bot'}
|
||||
# We cannot safely assume we will know all image extensions, but there are some obvious and common extensions that we can ignore.
|
||||
# We cannot safely assume we will know all image extensions,
|
||||
# but there are some obvious and common extensions that we can ignore.
|
||||
IGNORE_EXTENSIONS = {'.html', '.htm', '.xml', '.json'}
|
||||
KNOWN_MIME_TYPES = {
|
||||
'.jpg': 'image/jpeg',
|
||||
|
@ -31,10 +40,17 @@ KNOWN_MIME_TYPES = {
|
|||
'.webp': 'image/webp',
|
||||
}
|
||||
|
||||
Postcard = namedtuple('Postcard', ['timestamp', 'username', 'url', 'message', 'mime_type', 'length'])
|
||||
|
||||
# MIME type and length cache to avoid making hundreds of requests each time
|
||||
cache = {}
|
||||
cache: Dict[str, Tuple[str, str]] = {}
|
||||
|
||||
|
||||
class Postcard(NamedTuple):
|
||||
timestamp: int
|
||||
username: str
|
||||
url: str
|
||||
message: str
|
||||
mime_type: str
|
||||
length: str
|
||||
|
||||
|
||||
def get_logs() -> Iterator[str]:
|
||||
|
@ -52,25 +68,28 @@ def parse_log(log: str) -> Optional[Postcard]:
|
|||
timestamp, username, message = log.split("\t", 3)
|
||||
|
||||
if username in IGNORE_USERNAMES:
|
||||
return
|
||||
return None
|
||||
|
||||
message = sanitize_message(message)
|
||||
match = URL_REGEX.search(message)
|
||||
# Ignore messages that contain no URL matching URL_REGEX
|
||||
if not match:
|
||||
return
|
||||
return None
|
||||
url_str = match.group()
|
||||
|
||||
message = message.replace(url_str, '').replace('#fridaypostcard', '').strip()
|
||||
message = message \
|
||||
.replace(url_str, '') \
|
||||
.replace('#fridaypostcard', '') \
|
||||
.strip()
|
||||
|
||||
try:
|
||||
url = urlparse(url_str)
|
||||
except:
|
||||
return
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
extension = Path(url.path).suffix
|
||||
if extension in IGNORE_EXTENSIONS:
|
||||
return
|
||||
return None
|
||||
|
||||
# Force-replace https with http to ensure PSP compatibility
|
||||
url_str = url_str.replace('https', 'http')
|
||||
|
@ -81,24 +100,32 @@ def parse_log(log: str) -> Optional[Postcard]:
|
|||
if extension not in KNOWN_MIME_TYPES:
|
||||
url_str += '.jpg'
|
||||
|
||||
mime_type, length = cache.get(url_str, ['', '0'])
|
||||
mime_type, length = cache.get(url_str, ('', '0'))
|
||||
|
||||
if not mime_type:
|
||||
try:
|
||||
with requests.get(url_str, allow_redirects=True, stream=True, timeout=5) as resp:
|
||||
with requests.get(
|
||||
url_str,
|
||||
allow_redirects=True,
|
||||
stream=True,
|
||||
timeout=5) as resp:
|
||||
resp.raise_for_status()
|
||||
length = resp.headers.get('Content-Length', '0')
|
||||
mime_type = resp.headers.get('Content-Type', KNOWN_MIME_TYPES.get(extension, ''))
|
||||
mime_type = resp.headers.get(
|
||||
'Content-Type',
|
||||
KNOWN_MIME_TYPES.get(extension, '')
|
||||
)
|
||||
except HTTPError as e:
|
||||
# Dirty hack to avoid repeating lots of requests for images that are now broken.
|
||||
# Dirty hack to avoid repeating lots of requests
|
||||
# for images that are now broken.
|
||||
if e.response.status_code >= 400 and e.response.status_code <= 500:
|
||||
mime_type = KNOWN_MIME_TYPES.get(extension, 'image/x-error')
|
||||
length = '0'
|
||||
cache[url_str] = [mime_type, length]
|
||||
return
|
||||
cache[url_str] = (mime_type, length)
|
||||
return None
|
||||
except Exception:
|
||||
return
|
||||
cache[url_str] = [mime_type, length]
|
||||
return None
|
||||
cache[url_str] = (mime_type, length)
|
||||
|
||||
return Postcard(
|
||||
timestamp=int(timestamp),
|
||||
|
@ -177,7 +204,13 @@ def main():
|
|||
"item": list({
|
||||
# Unique by GUID
|
||||
item["guid"]: item
|
||||
for item in map(to_item, filter(None, map(parse_log, get_logs())))
|
||||
for item in map(
|
||||
to_item,
|
||||
filter(
|
||||
None,
|
||||
map(parse_log, get_logs())
|
||||
)
|
||||
)
|
||||
}.values()),
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user