bubble/utils.py

513 lines
15 KiB
Python

import cgi
import datetime
import hashlib
import re
import socket
import ssl
import time
import urllib.parse as urlparse
from OpenSSL import crypto
# Timezone applied to the timestamp conversions in this module.
UTC = datetime.timezone.utc

# Leading Gemtext line markup: link arrow, bullet, quote marker, or heading hashes.
GEMTEXT_MARKUP = re.compile(r'^(\s*=>\s*|\* |>\s*|##?#?)')

# A URI with one of the recognized schemes, optionally followed by a " —"
# separator. The colon after the scheme lives outside the alternation, so every
# scheme (including fdroidrepo/fdroidrepos) is followed by exactly one colon.
URI_PATTERN = re.compile(r'(gemini|finger|gopher|spartan|nex|guppy|mailto|data|file|https?|fdroidrepos?):(//)?[^`") ]+(\s+—)?')

# Same schemes, but skipping URIs that directly follow a "=>" link arrow
# (with or without one whitespace character in between).
NONLINK_URI_PATTERN = re.compile(r'(?<!=>)(?<!=>\s)(gemini|finger|gopher|spartan|nex|guppy|mailto|data|file|https?|fdroidrepos?):(//)?[^`") ]+')

# NOTE(review): not referenced in the visible part of this file; presumably a
# label prefix for internal links -- confirm against callers.
INNER_LINK_PREFIX = ''
class GeminiError(Exception):
    """Exception that carries a status code in `code` alongside its message."""

    def __init__(self, code, msg):
        self.code = code
        Exception.__init__(self, msg)
def unescape_ini_gemtext(src):
    """Undo the escaping applied to Gemtext in `src`, line by line.

    Every line is stripped of surrounding whitespace. A line consisting only
    of `|` becomes an empty line, and a leading run of one to three `&`
    characters is turned back into the same number of `#` characters.
    """
    # Longest prefix first so that '&&&' is not consumed as '&&' or '&'.
    heading_escapes = (('&&&', '###'), ('&&', '##'), ('&', '#'))

    def restore(raw):
        stripped = raw.strip()
        if stripped == '|':
            return ''
        for escaped, heading in heading_escapes:
            if stripped.startswith(escaped):
                return heading + stripped[len(escaped):]
        return stripped

    return '\n'.join(restore(raw) for raw in src.split('\n'))
def is_valid_name(name):
    """Return True if `name` is 2-30 characters of word characters and hyphens.

    The whole string must match: `fullmatch` is used instead of a `^...$`
    pattern because `$` also matches just before a trailing newline, which
    would let a name such as "abc\\n" through.
    """
    if len(name) < 2 or len(name) > 30:
        return False
    return re.fullmatch(r'[\w-]+', name) is not None
def plural_s(i, suffix='s'):
    """Return `suffix` unless `i` equals 1, in which case return ''."""
    if i == 1:
        return ''
    return suffix
def plural(i, word, suffix='s'):
    """Format a count with its noun, e.g. "1 post" or "3 posts"."""
    ending = plural_s(i, suffix)
    return '{} {}{}'.format(i, word, ending)
def parse_at_names(text) -> list:
    """Collect the names mentioned in `text` as `@name` or `u/name`.

    Names are lowercased and de-duplicated; the order of the returned list is
    unspecified.
    """
    mention = re.compile(r'(\bu/|@)([\w-]+)')
    unique = {hit.group(2).lower() for hit in mention.finditer(text)}
    return list(unique)
def parse_likely_commit_hashes(text) -> list:
    """Collect standalone hexadecimal words of at least seven digits.

    Matches are lowercased and de-duplicated; the order of the returned list
    is unspecified.
    """
    hex_word = re.compile(r'\b[0-9a-fA-F]{7,}\b')
    unique = {hit.group(0).lower() for hit in hex_word.finditer(text)}
    return list(unique)
def clean_text(text):
    """Sanitize `text` and make sure a preformatted block is not left open.

    After `strip_invalid`, the lines starting with a ``` fence are counted; an
    odd count means the last block was never closed, so a closing fence is
    appended on its own line. Trailing whitespace is removed from the result.
    """
    text = strip_invalid(text)
    fence_count = sum(1 for row in text.split('\n') if row.startswith('```'))
    if fence_count % 2 == 1:
        # Close the dangling preformatted block.
        if not text.endswith('\n'):
            text += '\n'
        text += '```'
    return text.rstrip()
def split_paragraphs(text) -> list:
    """Split `text` into paragraphs separated by blank lines.

    Blank lines inside a preformatted (```) block do not split. Each returned
    paragraph is stripped, and empty paragraphs are omitted.
    """
    rows = text.split('\n')
    result = []
    in_pre = False
    seen_blank = False
    begin = 0

    def flush(chunk):
        joined = '\n'.join(chunk).strip()
        if joined:
            result.append(joined)

    for idx, row in enumerate(rows):
        if not in_pre:
            if not row.strip():
                seen_blank = True
                continue
            if seen_blank:
                # First non-blank line after a gap: emit what came before it.
                flush(rows[begin:idx])
                begin = idx
                seen_blank = False
        if row.startswith('```'):
            in_pre = not in_pre

    flush(rows[begin:])
    return result
def remove_prefix(text, prefix):
    """Return `text` without a leading `prefix`; unchanged if it is not there."""
    return text[len(prefix):] if text.startswith(prefix) else text
def strip_links(text, placeholder=True):
    """Remove every `URI_PATTERN` match from `text`.

    With `placeholder` set, each match is replaced by "[<scheme> link]";
    otherwise it is removed outright.
    """
    replacement = ''
    if placeholder:
        replacement = r'[\1 link]'
    return URI_PATTERN.sub(replacement, text)
def parse_nonlink_uris(text) -> list:
    """Return every `NONLINK_URI_PATTERN` match in `text`, in order of appearance."""
    return [hit.group(0) for hit in NONLINK_URI_PATTERN.finditer(text)]
def clean_title(title):
    """Flatten Gemtext in `title` into a single plain line.

    Leading line markup matched by `GEMTEXT_MARKUP` is removed. A
    preformatted block is replaced by its label in square brackets (or
    "[preformatted]" when it has none) and its contents are dropped. A link
    written as "URL — URL" is reduced to the bare URL, tabs become spaces,
    and the remaining lines are joined with spaces.
    """
    same_label_link = re.compile(r'(\w+://[^ ]+) — \1')
    pieces = []
    in_pre = False
    for row in title.split('\n'):
        if row.startswith('```'):
            if not in_pre:
                # Opening fence: keep only the block's label.
                alt = row[3:].strip()
                if not alt:
                    alt = 'preformatted'
                pieces.append(f'[{alt}]')
            in_pre = not in_pre
            continue
        if in_pre:
            continue
        markup = GEMTEXT_MARKUP.match(row)
        if markup:
            row = row[markup.end():]
        row = same_label_link.sub(r'\1', row).replace('\t', ' ')
        pieces.append(row)
    return ' '.join(pieces).strip()
def clean_description(desc):
    """Drop link lines from `desc` while keeping all other formatting.

    Every line is stripped. Lines starting with `=>` are removed unless they
    are inside a preformatted (```) block.
    """
    kept = []
    in_pre = False
    for raw in desc.split('\n'):
        stripped = raw.strip()
        if stripped.startswith('```'):
            in_pre = not in_pre
        if in_pre or not stripped.startswith('=>'):
            kept.append(stripped)
    return '\n'.join(kept)
def clean_tinylog(text):
    """Normalize headings for a Tinylog: only level 3 headings are allowed.

    Level 1 and level 2 headings are rewritten as level 3. Fence lines and
    the contents of preformatted blocks are passed through unchanged.
    """
    shallow_heading = re.compile(r'^(##?)[^#]')
    result = []
    in_pre = False
    for row in text.split('\n'):
        is_fence = row.startswith('```')
        if is_fence:
            in_pre = not in_pre
        elif not in_pre:
            hit = shallow_heading.search(row)
            if hit:
                row = '###' + row[len(hit.group(1)):]
        result.append(row)
    return '\n'.join(result)
def prefix_links(src, prefix):
    """Add a prefix to link labels.

    Every Gemtext link line outside preformatted blocks is rewritten as
    "=> URL <prefix><label>". A link without a label uses its URL as the
    label, with a leading "gemini://" omitted. An empty `prefix` returns
    `src` unchanged.
    """
    if not prefix:
        return src
    link_line = re.compile(r'^\s*=>\s*([^ ]+)(\s+(.*))?$')
    in_pre = False
    result = []
    for row in src.split('\n'):
        if row.startswith('```'):
            in_pre = not in_pre
            result.append(row)
            continue
        hit = None if in_pre else link_line.match(row)
        if hit:
            target = hit[1]
            label = (hit[3] or '').strip()
            if not label:
                label = target
                # Omit gemini scheme.
                if label.startswith('gemini://'):
                    label = label[9:]
            row = f'=> {target} {prefix}{label}'
        result.append(row)
    return '\n'.join(result)
def strip_invalid(src):
    """Return `src` with all NUL characters removed."""
    return ''.join(src.split('\x00'))
def shorten_text(text, n):
    """Truncate and cut at white or word boundary.

    Text of at most `n` characters is returned stripped. Longer text is cut
    to `n` characters, the trailing partial word is dropped, and '...' is
    appended. A non-positive `n` is treated as zero.

    If dropping the trailing word would leave nothing (the cut is one
    unbroken word), the hard cut is kept instead so the result is never a
    bare '...' for non-empty content.
    """
    n = max(n, 0)
    if len(text) <= n:
        return text.strip()
    cut = text[:n]
    # `not cut` guards n == 0, where indexing cut[-1] would raise IndexError.
    if not cut or cut[-1] == ' ':
        return cut.strip() + '...'
    tail = re.search(r'[\w,.]+$', cut)
    if tail:
        head = cut[:tail.start()].rstrip()
        if head:
            return head + '...'
    return cut.rstrip() + '...'
def time_delta_text(sec, date_ts, suffix='ago', now='Now',
                    date_prefix='',
                    date_fmt='%Y-%m-%d',
                    date_sep=' · ',
                    short_date_fmt='%b %d',
                    tz=None):
    """Describe an age of `sec` seconds in human-readable form.

    Up to 24 hours, only the relative age is returned ("5 minutes ago").
    Beyond that, the result is the calendar date of `date_ts` followed by
    `date_sep` and the relative age in days, weeks, months or years.

    Args:
        sec: Age in seconds.
        date_ts: POSIX timestamp of the moment being described.
        suffix: Word appended after the relative age.
        now: Text returned when the age is under two seconds.
        date_prefix: Text placed before the calendar date.
        date_fmt: strftime format used when the date is not in the current year.
        date_sep: Separator between the calendar date and the relative age.
        short_date_fmt: strftime format used for dates in the current year.
        tz: Optional tzinfo for the calendar date; UTC when not given.
    """
    if sec < 2:
        return now
    if sec < 60:
        return f'{sec} seconds {suffix}'
    mins = int(sec / 60)
    if sec < 3600:
        return f'{mins} minute{plural_s(mins)} {suffix}'
    hours = int(sec / 3600)
    if hours <= 24:
        return f'{hours} hour{plural_s(hours)} {suffix}'
    days = round(sec / 3600 / 24)
    dt = datetime.datetime.fromtimestamp(date_ts, UTC)
    if tz:
        dt = dt.astimezone(tz)
    # Take "this year" in the same timezone as the displayed date; the naive
    # local clock can disagree with it around New Year.
    current_year = datetime.datetime.now(dt.tzinfo).year
    age = date_prefix + dt.strftime(short_date_fmt if dt.year == current_year else date_fmt)
    if days < 14:
        return age + f'{date_sep}{days} day{plural_s(days)} {suffix}'
    weeks = round(days / 7)
    if weeks <= 8:
        return age + f'{date_sep}{weeks} week{plural_s(weeks)} {suffix}'
    months = round(days / (365 / 12))  # average month length
    if months < 12:
        return age + f'{date_sep}{months} month{plural_s(months)} {suffix}'
    years = round(days / 365)
    return age + f'{date_sep}{years} year{plural_s(years)} {suffix}'
def ago_text(ts, suffix='ago', now='Now', tz=None):
    """Describe how long ago the POSIX timestamp `ts` was, relative to now.

    Timestamps in the future are treated as an age of zero seconds.
    """
    elapsed = int(time.time()) - ts
    sec = max(0, elapsed)
    return time_delta_text(sec, ts, suffix=suffix, now=now, tz=tz)
def atom_timestamp(ts):
    """Format the POSIX timestamp `ts` as "YYYY-MM-DDTHH:MM:SSZ" in the `UTC` zone."""
    moment = datetime.datetime.fromtimestamp(ts, UTC)
    return moment.strftime("%Y-%m-%dT%H:%M:%SZ")
def atom_escaped(text):
    """Replace the five XML special characters in `text` with entities."""
    # '&' must be handled first so the entities added afterwards stay intact.
    entities = (
        ('&', '&amp;'),
        ('<', '&lt;'),
        ('>', '&gt;'),
        ("'", '&apos;'),
        ('"', '&quot;'),
    )
    for char, entity in entities:
        text = text.replace(char, entity)
    return text
def gemtext_to_html(src):
    """Render the Gemtext in `src` as an HTML fragment.

    Headings become <h1>-<h3>, consecutive bullet lines are wrapped in <ul>,
    consecutive quote lines in <blockquote>, preformatted blocks in <pre>,
    link lines become a paragraph holding an <a>, and any other line becomes
    a <p>. All text, including link URLs placed in `href`, is escaped with
    `atom_escaped`. Elements still open at the end of the input are closed,
    so the fragment is always balanced.
    """
    out = []
    in_list = False
    in_quote = False
    in_pre = False
    for line in src.rstrip().split('\n'):
        is_bullet = False
        is_angle = False
        if in_pre:
            if line.startswith('```'):
                in_pre = False
                rend = '</pre>'
            else:
                rend = atom_escaped(line)
        elif line.startswith('###'):
            rend = f'<h3>{atom_escaped(line[3:].strip())}</h3>'
        elif line.startswith('##'):
            rend = f'<h2>{atom_escaped(line[2:].strip())}</h2>'
        elif line.startswith('#'):
            rend = f'<h1>{atom_escaped(line[1:].strip())}</h1>'
        elif line.startswith('>'):
            is_angle = True
            rend = atom_escaped(line[1:])
        elif line.startswith('*'):
            is_bullet = True
            rend = f'<li>{atom_escaped(line[1:].strip())}</li>'
        elif line.startswith('=>'):
            link = re.match(r'=>\s*([^\s]+)(\s+.*)?', line)
            if not link:
                # A bare "=>" without a URL renders nothing.
                continue
            url = link.group(1)
            label = link.group(2)
            if label is None:
                label = url
            label = label.strip()
            # The URL comes from the document, so it must be escaped before
            # being placed inside the quoted attribute.
            rend = f'<p><a href="{atom_escaped(url)}">{atom_escaped(label)}</a></p>'
        elif line.startswith('```'):
            in_pre = True
            rend = '<pre>'
        else:
            rend = f'<p>{atom_escaped(line)}</p>'

        # Close a list or quote that this line does not continue, then open
        # the one it starts, before emitting the line itself.
        if not is_bullet and in_list:
            out.append('</ul>')
            in_list = False
        if not is_angle and in_quote:
            out.append('</blockquote>')
            in_quote = False
        if is_angle and not in_quote:
            out.append('<blockquote>')
            in_quote = True
        if is_bullet and not in_list:
            out.append('<ul>')
            in_list = True
        out.append(rend)

    # Close whatever is still open when the input ends.
    if in_pre:
        out.append('</pre>')
    if in_list:
        out.append('</ul>')
    if in_quote:
        out.append('</blockquote>')
    return '\n'.join(out)
def is_empty_query(req):
    """Return True when `req.query` is None or has zero length."""
    query = req.query
    if query == None:
        return True
    return len(query) == 0
def clean_query(req):
    """Return the percent-decoded, cleaned and stripped query of `req`.

    A missing query (None) yields an empty string.
    """
    if req.query == None:
        return ''
    decoded = urlparse.unquote(req.query)
    return clean_text(decoded).strip()
def nonzero(value):
    """Return 1 when `value` is truthy, otherwise 0."""
    if value:
        return 1
    return 0
def is_zero(value):
    """Return 1 when `value` is falsy, otherwise 0."""
    if value:
        return 0
    return 1
def parse_link_segment_query(req) -> tuple:
    """Parse a "URL label" query (optionally starting with "=>") into a tuple.

    Returns:
        (url, label). Both are empty strings when there is no query. A URL
        without "://" gets "gemini://" prepended. The label is passed through
        `clean_title`, and is an empty string when absent.

    Raises:
        GeminiError: Status 59 when the query does not look like a link or
            the URL is invalid (including malformed URLs that `urlparse`
            itself rejects, such as unbalanced IPv6 brackets).
    """
    if req.query is None:
        return '', ''
    q = urlparse.unquote(req.query).replace('\n', ' ')
    found = re.match(r'^\s*(=>)?\s*([^\s]+)(\s+(.+))?\s*$', q)
    if not found:
        raise GeminiError(59, 'Invalid link syntax (enter URL followed by label, separated with space)')
    seg_url = found.group(2)
    if '://' not in seg_url:
        seg_url = 'gemini://' + seg_url
    try:
        parsed = urlparse.urlparse(seg_url)
    except ValueError:
        # The query is user input; report a malformed URL as a Gemini error
        # rather than letting the ValueError escape.
        raise GeminiError(59, 'Invalid URL') from None
    if not parsed.scheme or not parsed.netloc:
        raise GeminiError(59, 'Invalid URL')
    seg_text = clean_title(found[4]) if found[4] else ''
    return seg_url, seg_text
def form_link(url_label: tuple):
    """Join a (url, label) pair into "url label".

    Without a label only the URL is returned; a label without a URL yields
    an empty string.
    """
    url, label = url_label
    if len(label) == 0:
        return url
    if len(url) == 0:
        return ''
    return url + ' ' + label
def absolute_url(base, relative):
    """Resolve `relative` against `base` and drop the default port 1965.

    A `relative` value already containing "://" is treated as absolute and
    is not joined with `base`.
    """
    # This is straight from Solderpunk's gemini-demo.py.
    resolved = relative
    if "://" not in resolved:
        # Python's URL tools somehow only work with known schemes, so the
        # join is done with the scheme temporarily swapped to http.
        http_base = base.replace("gemini://", "http://")
        joined = urlparse.urljoin(http_base, resolved)
        resolved = joined.replace("http://", "gemini://")
    # Remove the default port.
    default_port_at = resolved.find(':1965/')
    if default_port_at < 10:
        return resolved
    # Skip the five characters of ":1965" but keep the slash after them.
    return resolved[:default_port_at] + resolved[default_port_at + 5:]
def gemini_fetch(url, redirect_count=0, max_data=None) -> tuple:
    """Returns tuple: (mime, mime_params, body).

    Fetches `url` over Gemini, following redirects. The body is decoded to
    `str` for text/* responses (using the `charset` parameter, UTF-8 when
    absent) and returned as `bytes` otherwise. When `max_data` is set,
    reading stops once the body exceeds that many bytes.

    Any failure while fetching (connection error, non-success status, no
    response header, decoding error, failed redirect) is printed and
    reported as (None, None, None).

    Raises:
        Exception: If `redirect_count` has reached 5 or `url` is not a
            gemini:// URL. These are raised for the initial call only; when
            they happen while following a redirect they are reported as
            (None, None, None) like other failures.
    """
    if redirect_count == 5:
        raise Exception("Too many redirects")
    parts = urlparse.urlparse(url)
    if parts.scheme != 'gemini':
        raise Exception("Only Gemini URLs allowed")
    try:
        # Certificates are deliberately not verified here.
        # check_hostname must be disabled before verify_mode can be CERT_NONE.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

        got_header = False
        incoming = bytes()
        mime = 'application/octet-stream'
        mime_opts = {}
        redirect_to = None

        address = (parts.hostname, parts.port if parts.port else 1965)
        # The context managers close the socket on every exit path,
        # including early returns and exceptions.
        with socket.create_connection(address, timeout=10) as raw, \
                context.wrap_socket(raw, server_hostname=parts.hostname) as s:
            s.sendall((url + '\r\n').encode("UTF-8"))
            while True:
                data = s.recv(4096 if not max_data else max(max_data, 1024))
                incoming += data
                if got_header and max_data and len(incoming) > max_data:
                    break
                if not got_header:
                    header_end = incoming.find(b'\r\n')
                    if header_end > 0:
                        got_header = True
                        header = incoming[:header_end].decode("UTF-8").strip()
                        incoming = incoming[header_end + 2:]
                        fields = header.split()
                        status = fields[0]
                        mime = ''.join(fields[1:])
                        if status.startswith('3'):
                            # Follow the redirect after this connection has
                            # been closed.
                            redirect_to = absolute_url(url, mime)
                            break
                        elif status.startswith('2'):
                            # NOTE(review): the cgi module was removed in
                            # Python 3.13; this call needs a replacement
                            # before upgrading.
                            mime, mime_opts = cgi.parse_header(mime)
                        else:
                            print(url, 'gemini_fetch error:', header)
                            return None, None, None
                if len(data) == 0:
                    break

        if redirect_to is not None:
            return gemini_fetch(redirect_to,
                                redirect_count=redirect_count + 1,
                                max_data=max_data)
        if not got_header:
            # The server closed the connection without a complete header.
            print('no response header -- failed:', url)
            return None, None, None
        if mime.startswith('text/'):
            return mime, mime_opts, incoming.decode(mime_opts.get("charset", "UTF-8"))
        return mime, mime_opts, incoming
    except Exception as er:
        print(str(er), '-- failed:', url)
        return None, None, None
def certificate_sha256(cert):
    """Return the hex SHA-256 digest of `cert` dumped in ASN.1 format."""
    encoded = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)
    return hashlib.sha256(encoded).hexdigest()
def pubkey_sha256(cert):
    """Return the hex SHA-256 digest of `cert`'s public key dumped in ASN.1 format."""
    encoded_key = crypto.dump_publickey(crypto.FILETYPE_ASN1, cert.get_pubkey())
    return hashlib.sha256(encoded_key).hexdigest()