molniya/zenit-reindexer.py

185 lines
6.2 KiB
Python

"""Zenit - the Molniya indexer.
Zenit was a series of military photoreconnaissance satellites launched by the Soviet Union between 1961 and 1994. In keeping with the Soviet spy satellite theme, I chose this name for the indexer."""
import json, urllib.parse, traceback, sys, ssl, socket, string, vcert
from config import *
# stolen from AV-98
# Register the gemini scheme with urllib.parse so gemini:// URLs are handled
# as relative-capable, netloc-bearing URLs.
urllib.parse.uses_relative.append("gemini")
urllib.parse.uses_netloc.append("gemini")
# Load URL list
# Start from a list holding only the main page; it is replaced by the "urls"
# list from orbit.json when that file loads cleanly.
URLS = [MAIN_PAGE]
try:
    with open("orbit.json", encoding="utf-8") as f:
        URLS = json.load(f)["urls"]
except IOError as e: # we can be a bit more outgoing about our errors here
    print(f"Error loading orbit.json: {e!r}")
    print("Continuing on anyways with a list containing only the URL of the main page.")
except KeyError:
    print("Malformed orbit.json: no urls list")
    print("Continuing on anyways with a list containing only the URL of the main page.")
except Exception:
    # Anything else (e.g. invalid JSON) is fatal. Exception rather than a bare
    # except, so Ctrl-C and SystemExit are not reported as a load error.
    print("Error loading orbit.json (not IOError or KeyError):")
    traceback.print_exc()
    print("Exiting.")
    sys.exit(1)
# Utility function to parse a MIME type
def parse_mime(mimetype):
    """Split a MIME type string into its type, subtype and parameters.

    Args:
        mimetype: a string such as ``text/gemini; charset=utf-8; lang=en``.

    Returns:
        A tuple ``([type, subtype], params)``. ``params`` maps each parameter
        name to its value; a parameter given without ``=`` maps to ``None``.
        Values in double quotes may contain ``;`` and backslash-escaped
        characters; the quotes and escaping backslashes are removed.
        Parameters with an empty name are ignored.
    """
    mimetype = mimetype.strip()
    # type is everything before the first /
    main_type, _, remainder = mimetype.partition("/")
    # subtype is everything after the slash and before the semicolon (if the latter exists)
    subtype, _, rest = remainder.partition(";")
    params = dict()
    # if there's no semicolon (or nothing after it), rest is empty and the loop never runs
    pos, end = 0, len(rest)
    while pos < end:
        # skip whitespace
        while pos < end and rest[pos] in string.whitespace:
            pos += 1
        # the parameter name is everything before the = or ;
        start = pos
        while pos < end and rest[pos] not in "=;":
            pos += 1
        name = rest[start:pos]
        # if the string is over or there isn't an equals sign, there's no param value
        if pos >= end or rest[pos] == ";":
            pos += 1
            if name:
                params[name] = None
            continue
        # otherwise, grab the param value
        pos += 1
        # the bounds check keeps a trailing "=" from indexing past the end
        if pos < end and rest[pos] == '"':
            pos += 1
            pieces = []
            while pos < end:
                ch = rest[pos]
                pos += 1
                if ch == '"':
                    # closing quote
                    break
                if ch == "\\" and pos < end:
                    # a backslash escapes the next character; a trailing
                    # backslash with nothing after it is kept literally
                    ch = rest[pos]
                    pos += 1
                pieces.append(ch)
            value = "".join(pieces)
            # skip until next ;
            while pos < end and rest[pos] != ";":
                pos += 1
        else:
            start = pos
            while pos < end and rest[pos] != ";":
                pos += 1
            value = rest[start:pos]
        # step over the ; separator so the next pass starts on the next
        # parameter instead of recording an empty-named one
        pos += 1
        if name:
            params[name] = value
    return [main_type, subtype], params
# Utility function to grab content from a URL
# Context setup courtesy of my own half-baked spartan client
def grab_content(url,redirect_num=0):
    """Fetch a Gemini URL and return a ``(content, meta)`` pair.

    Args:
        url: the URL to request; the port defaults to 1965 when absent.
        redirect_num: how many redirects have been followed so far. Once it
            reaches 5 the fetch is abandoned.

    Returns:
        * 2x status, ``text/*`` type: ``(decoded str, meta)``.
        * 2x status, any other type: ``(raw bytes, meta)``.
        * 3x status: the result of fetching the redirect target.
        * any other status: ``(header line, "text/plain")``.
        * too many redirects: ``("Too many redirects!", "text/plain")``.
        * connection, TLS, certificate or malformed-response failure:
          ``(b'', 'application/octet-stream')`` with a message on stderr.
    """
    if redirect_num>=5:
        return "Too many redirects!","text/plain"
    failure = (b'', 'application/octet-stream')
    parsed = urllib.parse.urlparse(url)
    if not parsed.hostname:
        print("No hostname in URL!",file=sys.stderr)
        return failure
    # The TLS context is built once and cached as the module-level global
    # "ctx". Library verification is switched off; the certificate is instead
    # handed to vcert.validate_cert below.
    ctx = globals().get("ctx")
    if ctx is None:
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        globals()["ctx"]=ctx
    chunks = []
    try:
        with socket.socket(socket.AF_INET,socket.SOCK_STREAM) as s:
            s.settimeout(5)
            # wrap_socket takes over the descriptor, so the wrapped socket
            # needs its own with-block to be closed reliably.
            with ctx.wrap_socket(s,server_hostname=parsed.hostname) as ss:
                ss.connect((parsed.hostname,parsed.port or 1965))
                # NOTE(review): presumably raises ssl.CertificateError on a bad cert - confirm in vcert
                vcert.validate_cert(parsed.hostname,ss.getpeercert(True))
                # sendall, because send may transmit only part of the request
                ss.sendall((url.strip()+"\r\n").encode("UTF-8"))
                # the server closes the connection when the response is complete
                while (data:=ss.recv(2048)):
                    chunks.append(data)
    except ConnectionRefusedError:
        print("Connection refused!",file=sys.stderr)
        return failure
    except socket.timeout:
        print("Timeout!",file=sys.stderr)
        return failure
    except ssl.CertificateError as e:
        print(e.args[0] if e.args else e,file=sys.stderr)
        return failure
    except OSError as e:
        # DNS failures, resets, TLS handshake errors and the like
        print(f"Network error: {e!r}",file=sys.stderr)
        return failure
    out = b"".join(chunks)
    header, crlf, content = out.partition(b"\r\n")
    fields = header.decode("utf-8",errors="replace").split(None,1)
    if not crlf or not fields:
        print("Malformed response header!",file=sys.stderr)
        return failure
    status = fields[0]
    meta = fields[1].strip() if len(fields)>1 else ""
    if len(meta)>=1024:
        print("Response META too long!",file=sys.stderr)
        return failure
    if status[0]=="2":
        # the Gemini spec's default when a success response has no META
        meta = meta or "text/gemini; charset=utf-8"
        types, params = parse_mime(meta)
        if types[0]=="text":
            # assume UTF-8, but if another charset is given accept it
            charset = params.get("charset") or "utf-8"
            try:
                return content.decode(charset), meta
            except (LookupError, UnicodeDecodeError):
                # unknown charset or undecodable bytes: degrade, don't crash
                return content.decode("utf-8",errors="replace"), meta
        # if it's not a text result, just return the content
        return content, meta
    elif status[0]=="3":
        if not meta:
            print("Redirect without a target!",file=sys.stderr)
            return failure
        # if it's a redirect, then let's follow it; the target may be relative
        # to the URL that was just requested
        return grab_content(urllib.parse.urljoin(url,meta),redirect_num+1)
    else:
        # Either:
        # 1x - it wants an input, which we have no agency to give
        # 6x - it wants a client cert, which we have no agency to give
        # 4x or 5x - there's an error
        # Return the header with a mimetype of text/plain. If this were a real library I might throw an error here, but this is just to make Zenit work.
        return header.decode("utf-8",errors="replace"), "text/plain"
# Capsules that already have a link in the orbit; any further link whose
# capsule is in this set is dropped.
CAPSULES_IN_ORBIT = {"g.dumke.me"} # temporary change to remove a certain link
import copy
links = copy.deepcopy(URLS)[1:] # skip main link
# Bound up front so the save check below never hits an unbound name, even
# when no link is kept.
modified_orbit = False
for link in links:
    URLS.remove(link) # assume link doesn't belong
    # Things to consider for a new link:
    # Does its capsule already have representation in the orbit?
    # NOTE(review): determine_capsule is not defined in this file; presumably
    # it comes from config's star import - confirm.
    capsule = determine_capsule(urllib.parse.urlparse(link))
    skip_reason = None
    if capsule in CAPSULES_IN_ORBIT:
        skip_reason = "capsule already in orbit"
    else:
        # Does it link to any of the required links?
        response, mime = grab_content(link)
        if not mime.startswith("text/gemini"):
            skip_reason = f"{mime} response isn't text/gemini and therefore can't link back"
        else:
            links_to_orbit = False
            for line in response.splitlines():
                if not line.startswith("=>"):
                    continue
                # the link target is the first whitespace-separated token
                # after "=>"; a bare "=>" line has none and is ignored
                fields = line[2:].split(None,1)
                if fields and any(fields[0].startswith(reqlink) for reqlink in REQUIRED_LINKS):
                    links_to_orbit = True
                    break
            if not links_to_orbit:
                skip_reason = "doesn't link back to orbit"
    if skip_reason is not None:
        print(f"Skipping {link} ({skip_reason})...")
        # the link was removed above, so the pruned list has to be saved too
        modified_orbit = True
        continue
    # If we haven't continue'd by now, the link meets all of the criteria
    print(f"Adding {link} to the orbit...")
    URLS.append(link)
    CAPSULES_IN_ORBIT.add(capsule)
    modified_orbit = True
if modified_orbit:
    print("Saving modified orbit...")
    with open("orbit.json","w") as f:
        json.dump(dict(urls=URLS),f)