bubble/utils.py

513 lines
15 KiB
Python
Raw Permalink Normal View History

import cgi
import datetime
import hashlib
import re
import socket
import ssl
import time
import urllib.parse as urlparse
from OpenSSL import crypto
# Time zone used when converting Unix timestamps to datetimes.
UTC = datetime.timezone.utc

# Matches the Gemtext line-type marker at the start of a line
# (link arrow, bullet, quote, or a heading of level 1-3).
GEMTEXT_MARKUP = re.compile(r'^(\s*=>\s*|\* |>\s*|##?#?)')

# Matches a URI with one of the recognized schemes, optionally followed by
# whitespace and an em dash. The scheme alternatives must not contain the
# colon themselves: it follows the group, so an alternative ending in ':'
# could only ever match a doubled colon (e.g. "fdroidrepos::").
URI_PATTERN = re.compile(r'(gemini|finger|gopher|spartan|nex|guppy|mailto|data|file|https?|fdroidrepos?):(//)?[^`") ]+(\s+—)?')

# Same URI syntax, but skipping URIs that directly follow a `=>` link marker.
NONLINK_URI_PATTERN = re.compile(r'(?<!=>)(?<!=>\s)(gemini|finger|gopher|spartan|nex|guppy|mailto|data|file|https?|fdroidrepos?):(//)?[^`") ]+')

INNER_LINK_PREFIX = ''
class GeminiError(Exception):
    """Exception that carries a status code in addition to its message.

    The code is stored in the `code` attribute; the message is passed on
    to `Exception` as usual.
    """

    def __init__(self, code, msg):
        self.code = code
        Exception.__init__(self, msg)
def unescape_ini_gemtext(src):
    """Undo the escaping applied to Gemtext before it was stored.

    Every line is stripped of surrounding whitespace. A line consisting of
    a single `|` becomes an empty line, and up to three leading `&`
    characters are turned back into `#` heading markers.
    """
    def restore(raw):
        text = raw.strip()
        if text == '|':
            return ''
        # At most three leading ampersands are converted; the rest stay.
        depth = min(3, len(text) - len(text.lstrip('&')))
        return '#' * depth + text[depth:]

    return '\n'.join(restore(raw) for raw in src.split('\n'))
def is_valid_name(name):
    """Return True if `name` is 2-30 characters of word characters or hyphens.

    The entire string must match: `re.fullmatch` is used because a `$`
    anchor would also accept a name that ends with a newline.
    """
    if len(name) < 2 or len(name) > 30:
        return False
    return re.fullmatch(r'[\w-]+', name) is not None
2023-05-21 11:54:27 +00:00
def plural_s(i, suffix='s'):
    """Return the plural suffix for count `i`: empty when `i` is 1, else `suffix`."""
    if i == 1:
        return ''
    return suffix
def plural(i, word, suffix='s'):
    """Format count `i` followed by `word`, pluralized with `suffix` unless `i` is 1."""
    ending = plural_s(i, suffix)
    return f'{i} {word}{ending}'
def parse_at_names(text) -> list:
    """Collect the names mentioned in `text` as `@name` or `u/name`.

    Names are lowercased and deduplicated; the order of the returned list
    is unspecified.
    """
    mention = re.compile(r'(\bu/|@)([\w-]+)')
    unique = {hit[2].lower() for hit in mention.finditer(text)}
    return list(unique)
def parse_likely_commit_hashes(text) -> list:
    """Collect standalone runs of 7 or more hex digits found in `text`.

    Matches are lowercased and deduplicated; the order of the returned list
    is unspecified.
    """
    hex_run = re.compile(r'\b[0-9a-fA-F]{7,}\b')
    unique = {hit[0].lower() for hit in hex_run.finditer(text)}
    return list(unique)
def clean_text(text):
    """Strip invalid characters and make sure a preformatted block is closed.

    If the text contains an odd number of lines starting with a code fence,
    a closing fence is appended on its own line. Trailing whitespace is
    removed from the result.
    """
    text = strip_invalid(text)
    fence_lines = sum(1 for row in text.split('\n') if row.startswith('```'))
    if fence_lines % 2 == 1:
        # The last preformatted block was left open: close it.
        if not text.endswith('\n'):
            text += '\n'
        text += '```'
    return text.rstrip()
def split_paragraphs(text) -> list:
    """Split text into paragraphs at blank lines outside preformatted blocks.

    Blank lines inside a fenced preformatted block do not end a paragraph.
    Each paragraph is stripped, and empty paragraphs are dropped.
    """
    rows = text.split('\n')
    result = []
    in_pre = False
    para_start = 0
    gap_seen = False  # a blank line was seen since the last non-blank one
    for index, row in enumerate(rows):
        if not in_pre:
            if not row.strip():
                gap_seen = True
                continue
            if gap_seen:
                # First non-blank line after a gap: flush what came before.
                chunk = '\n'.join(rows[para_start:index]).strip()
                if chunk:
                    result.append(chunk)
                para_start = index
                gap_seen = False
        if row.startswith('```'):
            in_pre = not in_pre
    tail = '\n'.join(rows[para_start:]).strip()
    if tail:
        result.append(tail)
    return result
def remove_prefix(text, prefix):
    """Return `text` without a leading `prefix`; unchanged if it does not start with it."""
    return text[len(prefix):] if text.startswith(prefix) else text
def strip_links(text, placeholder=True):
    """Remove URIs from `text`.

    With `placeholder` set, each URI is replaced by "[<scheme> link]";
    otherwise it is removed entirely.
    """
    if placeholder:
        replacement = r'[\1 link]'
    else:
        replacement = ''
    return URI_PATTERN.sub(replacement, text)
def parse_nonlink_uris(text) -> list:
    """Return, in order of appearance, the URIs in `text` that do not follow a `=>` marker."""
    return [hit[0] for hit in NONLINK_URI_PATTERN.finditer(text)]
def clean_title(title):
    """Flatten Gemtext into a single-line title.

    Strips `=>` and other Gemtext line markers, replaces each preformatted
    block with its bracketed label (or "[preformatted]" when unlabeled),
    collapses "URL — URL" into a single URL, converts tabs to spaces, and
    joins all lines with spaces.
    """
    unlabeled_link = re.compile(r'(\w+://[^ ]+) — \1')
    pieces = []
    in_pre = False
    for row in title.split('\n'):
        if row.startswith('```'):
            if not in_pre:
                # Opening fence: keep only the block's label.
                caption = row[3:].strip() or 'preformatted'
                pieces.append(f'[{caption}]')
            in_pre = not in_pre
        elif not in_pre:
            markup = GEMTEXT_MARKUP.match(row)
            if markup:
                row = row[markup.end():]
            pieces.append(unlabeled_link.sub(r'\1', row).replace('\t', ' '))
    return ' '.join(pieces).strip()
def clean_description(desc):
    """Drop link lines from `desc` while keeping all other formatting.

    Every line is stripped. Lines starting with `=>` are removed unless
    they are inside a preformatted block.
    """
    kept = []
    inside_pre = False
    for raw in desc.split('\n'):
        row = raw.strip()
        if row.startswith('```'):
            inside_pre = not inside_pre
        if inside_pre or not row.startswith('=>'):
            kept.append(row)
    return '\n'.join(kept)
2023-05-09 19:48:35 +00:00
def clean_tinylog(text):
    """Rewrite level 1 and 2 headings as level 3 headings.

    Only level 3 headings are allowed, as per the Tinylog specification.
    Fence lines and the contents of preformatted blocks are left untouched.
    """
    low_heading = re.compile(r'^(##?)[^#]')
    result = []
    in_pre = False
    for row in text.split('\n'):
        if row.startswith('```'):
            in_pre = not in_pre
        elif not in_pre:
            hit = low_heading.search(row)
            if hit:
                row = '###' + row[len(hit[1]):]
        result.append(row)
    return '\n'.join(result)
def prefix_links(src, prefix):
    """Add a prefix to the labels of link lines.

    A link without a label uses its URL as the label, with a leading
    "gemini://" omitted. Lines inside preformatted blocks are not modified.
    When `prefix` is empty, `src` is returned unchanged.
    """
    if not prefix:
        return src
    link_line = re.compile(r'^\s*=>\s*([^ ]+)(\s+(.*))?$')
    output = []
    in_pre = False
    for row in src.split('\n'):
        if row.startswith('```'):
            in_pre = not in_pre
            output.append(row)
            continue
        hit = None if in_pre else link_line.match(row)
        if hit:
            target = hit[1]
            caption = (hit[3] or '').strip()
            if not caption:
                # Unlabeled link: show the URL, omitting the gemini scheme.
                caption = target[9:] if target.startswith('gemini://') else target
            row = f'=> {target} {prefix}{caption}'
        output.append(row)
    return '\n'.join(output)
def strip_invalid(src):
    """Return `src` with all NUL characters removed."""
    return ''.join(src.split('\x00'))
def shorten_text(text, n):
    """Truncate and cut at white or word boundary.

    Text no longer than `n` characters is returned stripped. Longer text is
    cut to `n` characters, the partial trailing word (letters, digits,
    commas, periods) is dropped, and '...' is appended.

    NOTE: when the cut falls inside the only word of the text, the whole
    word is dropped and the result is just '...'.
    """
    if len(text) <= n:
        return text.strip()
    cut = text[:n]
    if not cut:
        # Nothing fits (e.g. n == 0): avoid indexing into an empty string.
        return '...' if text else ''
    if cut[-1] == ' ':
        return cut.strip() + '...'
    partial_word = re.search(r'[\w,.]+$', cut)
    if partial_word:
        return cut[:partial_word.start()].rstrip() + '...'
    return cut.rstrip() + '...'
2023-05-09 18:33:07 +00:00
def time_delta_text(sec, date_ts, suffix='ago', now='Now',
                    date_prefix='',
                    date_fmt='%Y-%m-%d',
                    date_sep=' · ',
                    short_date_fmt='%b %d',
                    tz=None):
    """Describe an elapsed time of `sec` seconds in human-readable form.

    Up to 24 hours, only the relative age is returned (e.g. "5 minutes ago").
    Beyond that, the date of `date_ts` is prepended: formatted with
    `short_date_fmt` when it falls in the current year, otherwise with
    `date_fmt`, and joined to the relative age with `date_sep`.

    Args:
        sec: Elapsed time in seconds.
        date_ts: Unix timestamp of the moment being described.
        suffix: Text appended after the relative age.
        now: Text returned when less than two seconds have elapsed.
        date_prefix: Text placed before the formatted date.
        date_fmt: `strftime` format for dates outside the current year.
        date_sep: Separator between the date and the relative age.
        short_date_fmt: `strftime` format for dates in the current year.
        tz: Optional tzinfo for rendering the date; UTC when omitted.
    """
    if sec < 2:
        return now
    if sec < 60:
        return f'{sec} seconds {suffix}'
    mins = int(sec / 60)
    if sec < 3600:
        return f'{mins} minute{plural_s(mins)} {suffix}'
    hours = int(sec / 3600)
    if hours <= 24:
        return f'{hours} hour{plural_s(hours)} {suffix}'
    days = round(sec / 3600 / 24)
    dt = datetime.datetime.fromtimestamp(date_ts, UTC)
    if tz:
        dt = dt.astimezone(tz)
    # Take "now" in the same time zone as `dt`, so the year comparison is
    # not skewed by the server's local time zone around New Year.
    current_year = datetime.datetime.now(dt.tzinfo).year
    age = date_prefix + dt.strftime(short_date_fmt if dt.year == current_year else date_fmt)
    if days < 14:
        return age + f'{date_sep}{days} day{plural_s(days)} {suffix}'
    weeks = round(days / 7)
    if weeks <= 8:
        return age + f'{date_sep}{weeks} week{plural_s(weeks)} {suffix}'
    months = round(days / (365 / 12))  # average month length
    if months < 12:
        return age + f'{date_sep}{months} month{plural_s(months)} {suffix}'
    years = round(days / 365)
    return age + f'{date_sep}{years} year{plural_s(years)} {suffix}'
def ago_text(ts, suffix='ago', now='Now', tz=None):
    """Describe how long ago the Unix timestamp `ts` was, relative to the current time.

    Timestamps in the future are treated as zero seconds old.
    """
    current = int(time.time())
    elapsed = max(0, current - ts)
    return time_delta_text(elapsed, ts, suffix, now, tz=tz)
def atom_timestamp(ts):
    """Format the Unix timestamp `ts` as a UTC date-time string ending in 'Z'."""
    moment = datetime.datetime.fromtimestamp(ts, UTC)
    return moment.strftime("%Y-%m-%dT%H:%M:%SZ")
def atom_escaped(text):
    """Escape the five XML special characters (& < > ' ") in `text` as entities."""
    entities = str.maketrans({
        '&': '&amp;',
        '<': '&lt;',
        '>': '&gt;',
        "'": '&apos;',
        '"': '&quot;',
    })
    return text.translate(entities)
def gemtext_to_html(src):
    """Render Gemtext source as a simple HTML fragment.

    Headings become <h1>-<h3>, bullet lines become <li> items wrapped in
    <ul>, quote lines are wrapped in <blockquote>, link lines become
    <p><a> elements, preformatted blocks become <pre>, and every other
    line becomes a <p>. All text and link URLs are escaped.

    Link lines without a URL are skipped. Any list, quote, or preformatted
    block still open at the end of the input is closed.
    """
    out = []
    in_list = False
    in_quote = False
    in_pre = False
    for line in src.rstrip().split('\n'):
        is_bullet = False
        is_angle = False
        if in_pre:
            if line.startswith('```'):
                in_pre = False
                rend = '</pre>'
            else:
                rend = atom_escaped(line)
        elif line.startswith('###'):
            rend = f'<h3>{atom_escaped(line[3:].strip())}</h3>'
        elif line.startswith('##'):
            rend = f'<h2>{atom_escaped(line[2:].strip())}</h2>'
        elif line.startswith('#'):
            rend = f'<h1>{atom_escaped(line[1:].strip())}</h1>'
        elif line.startswith('>'):
            is_angle = True
            rend = f'{atom_escaped(line[1:])}'
        elif line.startswith('*'):
            is_bullet = True
            rend = f'<li>{atom_escaped(line[1:].strip())}</li>'
        elif line.startswith('=>'):
            link = re.match(r'=>\s*([^\s]+)(\s+.*)?', line)
            if not link:
                continue
            url = link.group(1)
            label = link.group(2)
            if label is None:
                label = url
            label = label.strip()
            # The URL is user-provided text: escape it so it cannot break
            # out of the attribute value.
            rend = f'<p><a href="{atom_escaped(url)}">{atom_escaped(label)}</a></p>'
        elif line.startswith('```'):
            in_pre = True
            rend = '<pre>'
        else:
            rend = f'<p>{atom_escaped(line)}</p>'
        # Close or open the list/quote wrappers when the line type changes.
        if not is_bullet and in_list:
            out.append('</ul>')
            in_list = False
        if not is_angle and in_quote:
            out.append('</blockquote>')
            in_quote = False
        if is_angle and not in_quote:
            out.append('<blockquote>')
            in_quote = True
        if is_bullet and not in_list:
            out.append('<ul>')
            in_list = True
        out.append(rend)
    # Close whatever is still open so the fragment is well-formed.
    if in_list:
        out.append('</ul>')
    if in_quote:
        out.append('</blockquote>')
    if in_pre:
        out.append('</pre>')
    return '\n'.join(out)
def is_empty_query(req):
    """Return True when the request has no query string or an empty one."""
    query = req.query
    if query is None:
        return True
    return len(query) == 0
def clean_query(req):
    """Return the request's query string, URL-decoded, cleaned, and stripped.

    A request without a query yields an empty string.
    """
    raw = req.query
    if raw is None:
        return ''
    decoded = urlparse.unquote(raw)
    return clean_text(decoded).strip()
def nonzero(value):
    """Return 1 when `value` is truthy, otherwise 0."""
    return int(bool(value))
def is_zero(value):
    """Return 1 when `value` is falsy, otherwise 0."""
    return int(not value)
def parse_link_segment_query(req) -> tuple:
    """Parse the request's query as a link: a URL optionally followed by a label.

    The query is URL-decoded, and a leading `=>` is accepted. A URL without
    "://" gets the "gemini://" scheme prepended.

    Returns:
        Tuple (url, label); both are empty strings when there is no query.

    Raises:
        GeminiError: With code 59 if the query is not a valid link or URL.
    """
    if req.query is None:
        return '', ''
    query = urlparse.unquote(req.query).replace('\n', ' ')
    link = re.match(r'^\s*(=>)?\s*([^\s]+)(\s+(.+))?\s*$', query)
    if link is None:
        raise GeminiError(59, 'Invalid link syntax (enter URL followed by label, separated with space)')
    seg_url = link.group(2)
    if '://' not in seg_url:
        seg_url = 'gemini://' + seg_url
    components = urlparse.urlparse(seg_url)
    if not (components.scheme and components.netloc):
        raise GeminiError(59, 'Invalid URL')
    label = link[4]
    seg_text = clean_title(label) if label else ''
    return seg_url, seg_text
def form_link(url_label: tuple):
    """Join a (url, label) pair into "url label" text.

    Without a label, only the URL is returned. A label without a URL
    yields an empty string.
    """
    address, caption = url_label
    if not len(caption):
        return address
    if len(address):
        return address + ' ' + caption
    return ''
def absolute_url(base, relative):
    """Resolve `relative` against `base` and drop the default port 1965.

    A `relative` value containing "://" is taken as already absolute.
    This is straight from Solderpunk's gemini-demo.py.
    """
    resolved = relative
    if "://" not in resolved:
        # Python's URL tools somehow only work with known schemes, so the
        # join is done with the scheme temporarily swapped to http.
        joined = urlparse.urljoin(base.replace("gemini://", "http://"), resolved)
        resolved = joined.replace("http://", "gemini://")
    # Remove the default port.
    default_port = resolved.find(':1965/')
    if default_port < 10:
        return resolved
    return resolved[:default_port] + resolved[default_port + 5:]
def _gemini_request(url, host, port, max_data):
    """Send one Gemini request and read the response.

    Returns tuple: (header, body). `header` is the stripped response header
    line, or None if the connection ended before a header was received.
    `body` holds the bytes received after the header. Reading stops early
    when the header does not start with '2', or once more than `max_data`
    body bytes have arrived. The socket is always closed before returning.
    """
    conn = None
    try:
        conn = socket.create_connection((host, port), timeout=10)
        # Certificates are deliberately not verified here.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        conn = context.wrap_socket(conn, server_hostname=host)
        conn.sendall((url + '\r\n').encode("UTF-8"))
        header = None
        incoming = bytes()
        chunk_size = 4096 if not max_data else max(max_data, 1024)
        while True:
            data = conn.recv(chunk_size)
            incoming += data
            if header is not None and max_data and len(incoming) > max_data:
                break
            if header is None:
                header_end = incoming.find(b'\r\n')
                if header_end > 0:
                    header = incoming[:header_end].decode("UTF-8").strip()
                    incoming = incoming[header_end + 2:]
                    if not header.startswith('2'):
                        # Redirects and errors carry no body worth reading.
                        break
            if len(data) == 0:
                break
        return header, incoming
    finally:
        if conn is not None:
            conn.close()


def gemini_fetch(url, redirect_count=0, max_data=None) -> tuple:
    """Returns tuple: (mime, mime_params, body).

    Fetches `url` over Gemini, following redirects. `body` is decoded to
    str for text/* content (using the response's charset, default UTF-8)
    and is bytes otherwise. On any failure, including a non-success status
    or a missing header, the error is printed and (None, None, None) is
    returned.

    Args:
        url: The gemini:// URL to fetch.
        redirect_count: Number of redirects followed so far.
        max_data: Optional soft limit on body size in bytes; reading stops
            once more than this has been received. Also applied to
            redirect targets.

    Raises:
        Exception: If `redirect_count` has reached 5, or `url` is not a
            Gemini URL. (When raised for a redirect target, it is caught
            by the calling invocation and reported as a failure.)
    """
    if redirect_count == 5:
        raise Exception("Too many redirects")
    parts = urlparse.urlparse(url)
    if parts.scheme != 'gemini':
        raise Exception("Only Gemini URLs allowed")
    try:
        header, body = _gemini_request(url, parts.hostname,
                                       parts.port if parts.port else 1965,
                                       max_data)
        if header is None:
            return None, None, None
        fields = header.split()
        status = fields[0]
        meta = ''.join(fields[1:])
        # Follow redirects.
        if status.startswith('3'):
            return gemini_fetch(absolute_url(url, meta),
                                redirect_count=redirect_count + 1,
                                max_data=max_data)
        if not status.startswith('2'):
            print(url, 'gemini_fetch error:', header)
            return None, None, None
        # NOTE(review): the `cgi` module is deprecated and was removed in
        # Python 3.13; this call needs a replacement before upgrading.
        mime, mime_opts = cgi.parse_header(meta)
        if mime.startswith('text/'):
            return mime, mime_opts, body.decode(mime_opts.get("charset", "UTF-8"))
        return mime, mime_opts, body
    except Exception as er:
        print(str(er), '-- failed:', url)
        return None, None, None
def certificate_sha256(cert):
    """Return the hex SHA-256 digest of the certificate's ASN.1 dump."""
    encoded = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)
    return hashlib.sha256(encoded).hexdigest()
def pubkey_sha256(cert):
    """Return the hex SHA-256 digest of the ASN.1 dump of the certificate's public key."""
    encoded = crypto.dump_publickey(crypto.FILETYPE_ASN1, cert.get_pubkey())
    return hashlib.sha256(encoded).hexdigest()