mirror of https://git.skyjake.fi/gemini/bubble.git
513 lines
15 KiB
Python
513 lines
15 KiB
Python
import cgi
import datetime
import hashlib
import html
import re
import socket
import ssl
import time
import urllib.parse as urlparse

from OpenSSL import crypto
|
|
|
|
|
|
# Shorthand for the UTC timezone object.
UTC = datetime.timezone.utc

# Markup at the start of a Gemtext line: link arrow, bullet, quote, or heading.
GEMTEXT_MARKUP = re.compile(r'^(\s*=>\s*|\* |>\s*|##?#?)')

# A URI with one of the recognized schemes, optionally followed by whitespace
# and an em dash. The scheme alternatives must not include the colon
# themselves: the colon that follows the group is shared by all of them.
URI_PATTERN = re.compile(r'(gemini|finger|gopher|spartan|nex|guppy|mailto|data|file|https?|fdroidrepos?):(//)?[^`") ]+(\s+—)?')

# Same schemes, but a URI directly preceded by `=>` or `=>` plus one whitespace
# character is skipped.
NONLINK_URI_PATTERN = re.compile(r'(?<!=>)(?<!=>\s)(gemini|finger|gopher|spartan|nex|guppy|mailto|data|file|https?|fdroidrepos?):(//)?[^`") ]+')

INNER_LINK_PREFIX = '— '
|
|
|
|
|
|
class GeminiError(Exception):
    """Exception that carries a numeric code in addition to its message."""

    def __init__(self, code, msg):
        # The message goes to the base class; the code is kept as an attribute.
        self.code = code
        Exception.__init__(self, msg)
|
|
|
|
|
|
def unescape_ini_gemtext(src):
    """Undo the escaping applied to Gemtext lines stored in an INI value.

    Every line is stripped of surrounding whitespace. A line consisting of a
    lone `|` becomes an empty line, and up to three leading `&` characters
    are turned back into `#` heading markers.
    """
    def _restore(raw):
        row = raw.strip()
        if row == '|':
            return ''
        # Longest marker first, so `&&&` is not mistaken for `&`.
        for depth in (3, 2, 1):
            if row.startswith('&' * depth):
                return '#' * depth + row[depth:]
        return row

    return '\n'.join(_restore(raw) for raw in src.split('\n'))
|
|
|
|
|
|
def is_valid_name(name):
    """Return True if `name` is 2-30 characters of word characters and hyphens.

    The whole string must match: `re.fullmatch` is used because `$` would
    also accept a name that ends with a newline.
    """
    if not 2 <= len(name) <= 30:
        return False
    return re.fullmatch(r'[\w-]+', name) is not None
|
|
|
|
|
|
def plural_s(i, suffix='s'):
    """Return the plural suffix for a count: empty for exactly 1, else `suffix`."""
    if i == 1:
        return ''
    return suffix
|
|
|
|
|
|
def plural(i, word, suffix='s'):
    """Format a count and a noun, pluralizing the noun unless the count is 1."""
    ending = plural_s(i, suffix)
    return f'{i} {word}{ending}'
|
|
|
|
|
|
def parse_at_names(text) -> list:
    """Collect the distinct names mentioned in `text` as `@name` or `u/name`.

    Names are lowercased; the order of the returned list is unspecified.
    """
    mention = re.compile(r'(\bu/|@)([\w-]+)')
    unique = {hit[2].lower() for hit in mention.finditer(text)}
    return list(unique)
|
|
|
|
|
|
def parse_likely_commit_hashes(text) -> list:
    """Collect distinct standalone hexadecimal words of 7 or more digits.

    Matches are lowercased; the order of the returned list is unspecified.
    """
    hex_word = re.compile(r'\b[0-9a-fA-F]{7,}\b')
    unique = {hit[0].lower() for hit in hex_word.finditer(text)}
    return list(unique)
|
|
|
|
|
|
def clean_text(text):
    """Strip invalid characters and make sure a preformatted block is closed.

    Trailing whitespace is removed from the result.
    """
    text = strip_invalid(text)
    # Each line starting with ``` toggles preformatted mode, so an odd number
    # of such lines means the last block was left open.
    fence_total = sum(1 for row in text.split('\n') if row[:3] == '```')
    if fence_total % 2 == 1:
        if not text.endswith('\n'):
            text += '\n'
        text += '```'
    return text.rstrip()
|
|
|
|
|
|
def split_paragraphs(text) -> list:
    """Split text at blank lines, keeping preformatted blocks in one piece.

    Blank lines inside a ``` block do not end a paragraph. Each returned
    paragraph is stripped, and empty paragraphs are dropped.
    """
    rows = text.split('\n')
    chunks = []
    in_pre = False
    begin = 0         # index of the first row of the paragraph being collected
    gap_seen = False  # whether blank rows precede the current row

    for idx, row in enumerate(rows):
        if not in_pre:
            if not row.strip():
                gap_seen = True
                continue
            if gap_seen:
                # First non-blank row after a gap: flush what came before it.
                chunk = '\n'.join(rows[begin:idx]).strip()
                if chunk:
                    chunks.append(chunk)
                begin = idx
        gap_seen = False
        if row[:3] == '```':
            in_pre = not in_pre

    tail = '\n'.join(rows[begin:]).strip()
    if tail:
        chunks.append(tail)
    return chunks
|
|
|
|
|
|
def remove_prefix(text, prefix):
    """Return `text` without a leading `prefix`, or unchanged if absent."""
    return text[len(prefix):] if text.startswith(prefix) else text
|
|
|
|
|
|
def strip_links(text, placeholder=True):
    """Replace URIs in `text` with a `[scheme link]` marker, or remove them.

    With `placeholder` false, matched URIs are deleted outright.
    """
    if placeholder:
        replacement = r'[\1 link]'
    else:
        replacement = ''
    return URI_PATTERN.sub(replacement, text)
|
|
|
|
|
|
def parse_nonlink_uris(text) -> list:
    """List, in order of appearance, every match of NONLINK_URI_PATTERN."""
    return [hit[0] for hit in NONLINK_URI_PATTERN.finditer(text)]
|
|
|
|
|
|
def clean_title(title):
    """Flatten Gemtext into a single line of plain text.

    Leading line markup (as matched by GEMTEXT_MARKUP) is removed,
    preformatted blocks collapse to a bracketed label, tabs become spaces,
    and the lines are joined with spaces.
    """
    # A link whose label merely repeats its URL: "URL — URL".
    repeated_url = re.compile(r'(\w+://[^ ]+) — \1')
    pieces = []
    in_pre = False
    for row in title.split('\n'):
        if row[:3] == '```':
            if not in_pre:
                # Opening fence: show its alt text, or a generic label.
                caption = row[3:].strip()
                if len(caption) == 0:
                    caption = 'preformatted'
                pieces.append(f'[{caption}]')
            in_pre = not in_pre
        elif not in_pre:
            markup = GEMTEXT_MARKUP.match(row)
            if markup:
                row = row[markup.end():]
            row = repeated_url.sub(r'\1', row).replace('\t', ' ')
            pieces.append(row)
    return ' '.join(pieces).strip()
|
|
|
|
|
|
def clean_description(desc):
    """Drop link lines from Gemtext while keeping all other lines.

    Every line is stripped. Lines starting with `=>` are removed unless they
    are inside a preformatted block.
    """
    kept = []
    in_pre = False
    for row in (raw.strip() for raw in desc.split('\n')):
        if row.startswith('```'):
            in_pre = not in_pre
        if in_pre or not row.startswith('=>'):
            kept.append(row)
    return '\n'.join(kept)
|
|
|
|
|
|
def clean_tinylog(text):
    """Rewrite level 1 and 2 headings as level 3, as the Tinylog format requires.

    Lines inside preformatted blocks are passed through untouched.
    """
    result = []
    in_pre = False
    for row in text.split('\n'):
        if row.startswith('```'):
            in_pre = not in_pre
        elif not in_pre:
            heading = re.search(r'^(##?)[^#]', row)  # only level 3 headings allowed
            if heading:
                row = '###' + row[len(heading[1]):]
        result.append(row)
    return '\n'.join(result)
|
|
|
|
|
|
def prefix_links(src, prefix):
    """Add a prefix to link labels.

    Link lines outside preformatted blocks are rewritten as
    `=> URL PREFIXlabel`. A link without a label uses its URL as the label,
    with a leading `gemini://` left out. An empty `prefix` returns `src`
    unchanged.
    """
    if not prefix:
        return src
    link_line = re.compile(r'^\s*=>\s*([^ ]+)(\s+(.*))?$')
    out = []
    in_pre = False
    for row in src.split('\n'):
        if row.startswith('```'):
            in_pre = not in_pre
            out.append(row)
            continue
        if in_pre:
            out.append(row)
            continue
        hit = link_line.match(row)
        if hit:
            target = hit[1]
            label = (hit[3] or '').strip()
            if not label:
                # Omit gemini scheme.
                label = target[9:] if target.startswith('gemini://') else target
            row = f'=> {target} {prefix}{label}'
        out.append(row)
    return '\n'.join(out)
|
|
|
|
|
|
def strip_invalid(src):
    """Return `src` with all NUL characters removed."""
    return src.translate({0: None})
|
|
|
|
|
|
def shorten_text(text, n):
    """Truncate and cut at white or word boundary.

    Text of at most `n` characters is returned stripped. Longer text is cut
    to `n` characters, a trailing partial word (letters, digits, commas,
    periods) is dropped, and '...' is appended. A negative `n` is treated
    as 0.
    """
    n = max(n, 0)
    if len(text) <= n:
        return text.strip()
    text = text[:n]
    if not text:
        # Nothing fits (n == 0); avoid indexing into an empty string.
        return '...'
    if text[-1] == ' ':
        return text.strip() + '...'
    m = re.search(r'[\w,.]+$', text)
    if m:
        # Drop the word fragment left at the end by the cut.
        return text[:m.start()].rstrip() + '...'
    return text.rstrip() + '...'
|
|
|
|
|
|
def time_delta_text(sec, date_ts, suffix='ago', now='Now',
                    date_prefix='',
                    date_fmt='%Y-%m-%d',
                    date_sep=' · ',
                    short_date_fmt='%b %d',
                    tz=None):
    """Describe an elapsed time in words.

    Under 2 seconds gives `now`; up to 24 hours gives seconds, minutes or
    hours followed by `suffix`. Beyond that, the date of `date_ts` is shown
    (prefixed by `date_prefix`), followed by `date_sep` and the age in days,
    weeks, months or years.

    Args:
        sec: elapsed seconds.
        date_ts: Unix timestamp of the event, used for the date part.
        suffix: word appended after the age.
        now: text returned for very recent events.
        date_prefix: text placed before the formatted date.
        date_fmt: strftime format used when the date is not in the current year.
        date_sep: separator between the date and the age.
        short_date_fmt: strftime format used for dates in the current year.
        tz: optional tzinfo for the date; UTC when omitted.
    """
    if sec < 2:
        return now
    if sec < 60:
        return f'{sec} seconds {suffix}'
    mins = int(sec / 60)
    if sec < 3600:
        return f'{mins} minute{plural_s(mins)} {suffix}'
    hours = int(sec / 3600)
    if hours <= 24:
        return f'{hours} hour{plural_s(hours)} {suffix}'
    days = round(sec / 3600 / 24)
    dt = datetime.datetime.fromtimestamp(date_ts, UTC)
    if tz:
        dt = dt.astimezone(tz)
    # Take the current year in the same timezone as `dt`, so the choice
    # between short and full date does not depend on the server's local zone.
    current_year = datetime.datetime.now(tz if tz else UTC).year
    age = date_prefix + dt.strftime(short_date_fmt if dt.year == current_year else date_fmt)
    if days < 14:
        return age + f'{date_sep}{days} day{plural_s(days)} {suffix}'
    weeks = round(days / 7)
    if weeks <= 8:
        return age + f'{date_sep}{weeks} week{plural_s(weeks)} {suffix}'
    months = round(days / (365 / 12))  # average month length
    if months < 12:
        return age + f'{date_sep}{months} month{plural_s(months)} {suffix}'
    years = round(days / 365)
    return age + f'{date_sep}{years} year{plural_s(years)} {suffix}'
|
|
|
|
|
|
def ago_text(ts, suffix='ago', now='Now', tz=None):
    """Describe how long ago the Unix timestamp `ts` was, relative to now.

    Timestamps in the future are treated as zero seconds old.
    """
    elapsed = int(time.time()) - ts
    if elapsed < 0:
        elapsed = 0
    return time_delta_text(elapsed, ts, suffix, now, tz=tz)
|
|
|
|
|
|
def atom_timestamp(ts):
    """Format a Unix timestamp as a UTC date-time string ending in `Z`."""
    moment = datetime.datetime.fromtimestamp(ts, UTC)
    return moment.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
|
|
def atom_escaped(text):
    """Escape `text` for inclusion in XML/HTML content and attribute values.

    The ampersand, both angle brackets, and both quote characters are
    replaced by character references. The ampersand is handled first so
    that the references themselves are not escaped again.
    """
    return html.escape(text, quote=True)
|
|
|
|
|
|
def gemtext_to_html(src):
    """Render Gemtext source as HTML.

    Headings become h1-h3, consecutive `*` lines a `<ul>`, consecutive `>`
    lines a `<blockquote>`, ``` blocks a `<pre>`, `=>` lines a paragraph
    holding a link, and all other lines paragraphs. Text and link URLs are
    escaped with `atom_escaped`. A list, quote or preformatted block still
    open at the end of the input is closed.
    """
    out = []

    in_list = False
    in_quote = False
    in_pre = False

    for line in src.rstrip().split('\n'):
        is_bullet = False
        is_angle = False
        if in_pre:
            if line.startswith('```'):
                in_pre = False
                rend = '</pre>'
            else:
                rend = atom_escaped(line)
        elif line.startswith('###'):
            rend = f'<h3>{atom_escaped(line[3:].strip())}</h3>'
        elif line.startswith('##'):
            rend = f'<h2>{atom_escaped(line[2:].strip())}</h2>'
        elif line.startswith('#'):
            rend = f'<h1>{atom_escaped(line[1:].strip())}</h1>'
        elif line.startswith('>'):
            is_angle = True
            rend = atom_escaped(line[1:])
        elif line.startswith('*'):
            is_bullet = True
            rend = f'<li>{atom_escaped(line[1:].strip())}</li>'
        elif line.startswith('=>'):
            link = re.match(r'=>\s*([^\s]+)(\s+.*)?', line)
            if not link:
                # A bare `=>` without a URL renders nothing.
                continue
            url = link.group(1)
            # Fall back to the URL when the label is missing or blank.
            label = (link.group(2) or '').strip() or url
            # The URL comes from user-supplied text: escape it so that it
            # cannot break out of the attribute value.
            rend = f'<p><a href="{atom_escaped(url)}">{atom_escaped(label)}</a></p>'
        elif line.startswith('```'):
            in_pre = True
            rend = '<pre>'
        else:
            rend = f'<p>{atom_escaped(line)}</p>'

        # Close a list or quote that this line does not continue, then open
        # one if this line starts it.
        if not is_bullet and in_list:
            out.append('</ul>')
            in_list = False
        if not is_angle and in_quote:
            out.append('</blockquote>')
            in_quote = False
        if is_angle and not in_quote:
            out.append('<blockquote>')
            in_quote = True
        if is_bullet and not in_list:
            out.append('<ul>')
            in_list = True
        out.append(rend)

    # Close whatever is still open when the input ends.
    if in_pre:
        out.append('</pre>')
    if in_list:
        out.append('</ul>')
    if in_quote:
        out.append('</blockquote>')

    return '\n'.join(out)
|
|
|
|
|
|
def is_empty_query(req):
    """Return True when `req.query` is None or has zero length."""
    return req.query is None or len(req.query) == 0
|
|
|
|
|
|
def clean_query(req):
    """Return the percent-decoded, cleaned and stripped query of `req`.

    A missing query (None) gives an empty string.
    """
    if req.query is None:
        return ''
    return clean_text(urlparse.unquote(req.query)).strip()
|
|
|
|
|
|
def nonzero(value):
    """Return 1 if `value` is truthy, otherwise 0."""
    return int(bool(value))
|
|
|
|
|
|
def is_zero(value):
    """Return 1 if `value` is falsy, otherwise 0."""
    return int(not value)
|
|
|
|
|
|
def parse_link_segment_query(req) -> tuple:
    """Parse the request query as a link: a URL optionally followed by a label.

    The query is percent-decoded and newlines become spaces. A leading `=>`
    is accepted and ignored. A URL without `://` gets `gemini://` prepended.

    Returns:
        (url, label); both empty when the request has no query. The label is
        passed through clean_title(), or is empty when absent.

    Raises:
        GeminiError: code 59 when the query is not a link or the URL lacks a
            scheme or host, or cannot be parsed.
    """
    if req.query is None:
        return '', ''
    q = urlparse.unquote(req.query).replace('\n', ' ')
    found = re.match(r'^\s*(=>)?\s*([^\s]+)(\s+(.+))?\s*$', q)
    if not found:
        raise GeminiError(59, 'Invalid link syntax (enter URL followed by label, separated with space)')
    seg_url = found.group(2)
    if '://' not in seg_url:
        seg_url = 'gemini://' + seg_url
    try:
        parsed = urlparse.urlparse(seg_url)
    except ValueError:
        # urlparse rejects some malformed hosts (e.g. an unclosed bracket).
        raise GeminiError(59, 'Invalid URL') from None
    if not parsed.scheme or not parsed.netloc:
        raise GeminiError(59, 'Invalid URL')
    seg_text = clean_title(found[4]) if found[4] else ''
    return seg_url, seg_text
|
|
|
|
|
|
def form_link(url_label: tuple):
    """Join a (url, label) pair into one string separated by a space.

    Without a label only the URL is returned; a label without a URL gives
    an empty string.
    """
    url, label = url_label
    if not len(label):
        return url
    if not len(url):
        return ''
    return url + ' ' + label
|
|
|
|
|
|
def absolute_url(base, relative):
    """Resolve `relative` against `base` and drop the default Gemini port.

    A reference that already has a scheme is returned as is (apart from the
    port removal). Whether a reference is relative is decided by parsing it,
    so `://` appearing inside its query does not make it absolute.
    """
    # Originally adapted from Solderpunk's gemini-demo.py.
    if not urlparse.urlsplit(relative).scheme:
        # urljoin() only resolves references for schemes it knows, so the
        # base is disguised as http:// for the join. Only the leading scheme
        # is swapped, never text further inside the URL.
        disguised = base.startswith('gemini://')
        if disguised:
            base = 'http://' + base[len('gemini://'):]
        relative = urlparse.urljoin(base, relative)
        if disguised and relative.startswith('http://'):
            relative = 'gemini://' + relative[len('http://'):]
    # Remove the default port.
    port_pos = relative.find(':1965/')
    if port_pos >= 10:
        relative = relative[:port_pos] + relative[port_pos + 5:]
    return relative
|
|
|
|
|
|
def gemini_fetch(url, redirect_count=0, max_data=None) -> tuple:
    """Returns tuple: (mime, mime_params, body).

    Fetches `url` with a Gemini request over TLS and follows redirects.
    The body is decoded to str for `text/*` responses (charset taken from
    the MIME parameters, UTF-8 by default) and returned as bytes otherwise.
    On a non-success status or any error during the fetch, a message is
    printed and (None, None, None) is returned.

    Args:
        url: absolute gemini:// URL.
        redirect_count: number of redirects followed so far.
        max_data: if set, stop reading once more than this many body bytes
            have arrived; the body may be somewhat longer than the limit.

    Raises:
        Exception: if `url` is not a Gemini URL, or `redirect_count` is 5 on
            entry. The redirect limit hit while following a redirect is
            caught like any other fetch error.
    """
    if redirect_count == 5:
        raise Exception("Too many redirects")
    parts = urlparse.urlparse(url)
    if parts.scheme != 'gemini':
        raise Exception("Only Gemini URLs allowed")
    try:
        redirect_to = None
        got_header = False
        incoming = bytes()
        mime = 'application/octet-stream'
        mime_opts = {}

        s = socket.create_connection((parts.hostname, parts.port if parts.port else 1965),
                                     timeout=10)
        try:
            # Certificates are not verified here.
            context = ssl.SSLContext()
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
            s = context.wrap_socket(s, server_hostname=parts.hostname)
            s.sendall((url + '\r\n').encode("UTF-8"))

            while True:
                data = s.recv(4096 if not max_data else max(max_data, 1024))
                incoming += data

                if got_header and max_data and len(incoming) > max_data:
                    break

                if not got_header:
                    header_end = incoming.find(b'\r\n')
                    if header_end > 0:
                        got_header = True
                        header = incoming[:header_end].decode("UTF-8").strip()
                        incoming = incoming[header_end + 2:]
                        fields = header.split()
                        status = fields[0]
                        mime = ''.join(fields[1:])

                        if status.startswith('3'):
                            # Redirect: the meta field is the new location.
                            # The socket is closed before the new request.
                            redirect_to = absolute_url(url, mime)
                            break
                        elif status.startswith('2'):
                            mime, mime_opts = cgi.parse_header(mime)
                        else:
                            print(url, 'gemini_fetch error:', header)
                            return None, None, None

                if len(data) == 0:
                    break
        finally:
            # Release the connection on every path, including errors.
            s.close()

        if redirect_to is not None:
            # Keep the size limit in force across redirects.
            return gemini_fetch(redirect_to,
                                redirect_count=redirect_count + 1,
                                max_data=max_data)

        if got_header:
            if mime.startswith('text/'):
                return mime, mime_opts, incoming.decode(mime_opts.get("charset", "UTF-8"))
            else:
                return mime, mime_opts, incoming

    except Exception as er:
        print(str(er), '-- failed:', url)

    return None, None, None
|
|
|
|
|
|
def certificate_sha256(cert):
    """Return the hex SHA-256 digest of the certificate's ASN.1 dump."""
    encoded = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)
    return hashlib.sha256(encoded).hexdigest()
|
|
|
|
|
|
def pubkey_sha256(cert):
    """Return the hex SHA-256 digest of the certificate's public key (ASN.1 dump)."""
    encoded = crypto.dump_publickey(crypto.FILETYPE_ASN1, cert.get_pubkey())
    return hashlib.sha256(encoded).hexdigest()
|