"""Zenit - the Molniya indexer.

Zenit was a series of military photoreconnaissance satellites launched by the
Soviet Union between 1961 and 1994. In keeping with the Soviet spy satellite
theme, I chose this name for the indexer.
"""

import copy
import json
import socket
import ssl
import string
import sys
import traceback
import urllib.parse

import vcert
from config import *

# stolen from AV-98
urllib.parse.uses_relative.append("gemini")
urllib.parse.uses_netloc.append("gemini")

# Load URL list
URLS = [MAIN_PAGE]
try:
    with open("orbit.json") as f:
        URLS = json.load(f)["urls"]
except IOError as e:
    # we can be a bit more outgoing about our errors here
    print(f"Error loading orbit.json: {e!r}")
    print("Continuing on anyways with a list containing only the URL of the main page.")
except KeyError:
    print("Malformed orbit.json: no urls list")
    print("Continuing on anyways with a list containing only the URL of the main page.")
except Exception:
    # Anything else (e.g. invalid JSON) is fatal. `Exception` rather than a
    # bare `except` so Ctrl-C and SystemExit are not reported as load errors.
    print("Error loading orbit.json (not IOError or KeyError):")
    traceback.print_exc()
    print("Exiting.")
    sys.exit(1)

# What grab_content() returns when no usable response could be obtained.
_NO_CONTENT = (b"", "application/octet-stream")

# TLS context shared by every request; created on first use.
_TLS_CONTEXT = None


# Utility function to parse a MIME type
def parse_mime(mimetype):
    """Split a MIME type string into its type, subtype and parameters.

    Args:
        mimetype: Text such as ``'text/gemini; charset=utf-8; lang="en"'``.

    Returns:
        A pair ``([type, subtype], params)``. ``type`` and ``subtype`` are
        lower-cased and may be empty strings when absent. ``params`` maps
        lower-cased parameter names to their values; a parameter without a
        value (``;foo`` or ``;foo=``) maps to ``None``. Quoted values have
        the quotes removed and backslash escapes resolved. When a name is
        repeated, the last occurrence wins.
    """
    mimetype = mimetype.strip()

    # type is everything before the /, subtype everything up to the first ;
    essence, _, rest = mimetype.partition(";")
    main_type, _, subtype = essence.partition("/")
    main_type = main_type.strip().lower()
    subtype = subtype.strip().lower()

    params = dict()
    index = 0
    length = len(rest)
    while index < length:
        # the param name runs up to the next = or ;
        start = index
        while index < length and rest[index] not in "=;":
            index += 1
        param_name = rest[start:index].strip().lower()

        # no = means the param has no value
        if index >= length or rest[index] == ";":
            index += 1
            if param_name:
                params[param_name] = None
            continue

        # otherwise, grab the param value
        index += 1
        if index < length and rest[index] == '"':
            index += 1
            pieces = []
            while index < length:
                c = rest[index]
                index += 1
                if c == '"':
                    break
                if c == "\\":
                    if index >= length:
                        # dangling backslash at the very end: keep it literally
                        pieces.append(c)
                        break
                    c = rest[index]
                    index += 1
                pieces.append(c)
            param_value = "".join(pieces)
            # skip until next ;
            while index < length and rest[index] != ";":
                index += 1
        else:
            start = index
            while index < length and rest[index] != ";":
                index += 1
            param_value = rest[start:index].strip()
        index += 1  # step over the ;

        if param_name:
            params[param_name] = param_value or None

    return [main_type, subtype], params


# NOTE(review): determine_capsule() is called by the indexing loop below, but
# no definition of it is visible in this file as received; it may be provided
# by `config` through the star import. The fallback is only installed when no
# such name exists, and simply treats the hostname as the capsule -- confirm
# that this matches the intended rule.
if "determine_capsule" not in globals():
    def determine_capsule(parsed):
        """Return the capsule identifier for a parsed URL (its hostname)."""
        return parsed.hostname


def _get_tls_context():
    """Return the shared TLS context, creating it on first use.

    Built-in hostname and CA verification are switched off; the peer
    certificate is instead handed to ``vcert.validate_cert`` by the caller.
    """
    global _TLS_CONTEXT
    if _TLS_CONTEXT is None:
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        _TLS_CONTEXT = context
    return _TLS_CONTEXT


def grab_content(url, redirect_num=0):
    """Fetch a Gemini URL and return ``(content, meta)``.

    Args:
        url: The gemini:// URL to request.
        redirect_num: How many redirects have been followed so far; callers
            normally leave this at its default.

    Returns:
        * 2x response with a ``text/*`` type: ``(decoded text, meta)``.
        * 2x response of any other type: ``(raw bytes, meta)``.
        * 3x response: the result of fetching the redirect target.
        * Any other status: ``(header line, "text/plain")``.
        * More than five redirects: ``("Too many redirects!", "text/plain")``.
        * Connection, certificate or protocol failure:
          ``(b"", "application/octet-stream")``, with a message on stderr.
    """
    if redirect_num >= 5:
        return "Too many redirects!", "text/plain"

    parsed = urllib.parse.urlparse(url)
    if not parsed.hostname:
        print(f"No hostname in {url!r}!", file=sys.stderr)
        return _NO_CONTENT

    chunks = []
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(5)
            # wrap_socket() hands back a separate socket object, so it gets
            # its own `with` to be closed deterministically.
            with _get_tls_context().wrap_socket(s, server_hostname=parsed.hostname) as ss:
                ss.connect((parsed.hostname, parsed.port or 1965))
                vcert.validate_cert(parsed.hostname, ss.getpeercert(True))
                # sendall() rather than send(): send() may write only part.
                ss.sendall((url.strip() + "\r\n").encode("UTF-8"))
                while (data := ss.recv(2048)):
                    chunks.append(data)
    except ConnectionRefusedError:
        print("Connection refused!", file=sys.stderr)
        return _NO_CONTENT
    except socket.timeout:
        print("Timeout!", file=sys.stderr)
        return _NO_CONTENT
    except ssl.CertificateError as e:
        print(e.args[0] if e.args else repr(e), file=sys.stderr)
        return _NO_CONTENT
    except OSError as e:
        # DNS failures, handshake errors, resets, ...: one unreachable
        # capsule should not abort the whole indexing run.
        print(f"Connection error: {e!r}", file=sys.stderr)
        return _NO_CONTENT

    out = b"".join(chunks)
    header_bytes, separator, content = out.partition(b"\r\n")
    header = header_bytes.decode("utf-8", errors="replace")
    fields = header.split(None, 1)
    if not separator or not fields:
        print("Malformed response header!", file=sys.stderr)
        return _NO_CONTENT
    status = fields[0]
    meta = fields[1] if len(fields) > 1 else ""
    if len(meta) >= 1024:
        print("Response meta too long!", file=sys.stderr)
        return _NO_CONTENT

    if status[0] == "2":
        # An empty META on success defaults to text/gemini in UTF-8.
        meta = meta or "text/gemini; charset=utf-8"
        types, params = parse_mime(meta)
        if types[0] != "text":
            # if it's not a text result, just return the content
            return content, meta
        # assume UTF-8, but if another charset is given accept it
        charset = params.get("charset") or "utf-8"
        try:
            return content.decode(charset, errors="replace"), meta
        except LookupError:
            # unknown charset name: fall back to the default
            return content.decode("utf-8", errors="replace"), meta
    if status[0] == "3":
        # if it's a redirect, then let's follow it (the target may be relative)
        return grab_content(urllib.parse.urljoin(url, meta), redirect_num + 1)
    # Either:
    # 1x - it wants an input, which we have no agency to give
    # 6x - it wants a client cert, which we have no agency to give
    # 4x or 5x - there's an error
    # Return the header with a mimetype of text/plain. If this were a real
    # library I might throw an error here, but this is just to make Zenit work.
    return header, "text/plain"


def _links_back_to_orbit(document):
    """Return True if any ``=>`` link line targets one of REQUIRED_LINKS."""
    required = tuple(REQUIRED_LINKS)
    for line in document.splitlines():
        if not line.startswith("=>"):
            continue
        # "=>URL label" and "=>   URL label" are both valid link lines
        fields = line[2:].split(None, 1)
        if fields and fields[0].startswith(required):
            return True
    return False


CAPSULES_IN_ORBIT = {"g.dumke.me"}  # temporary change to remove a certain link

# NOTE(review): the flag is only raised when a link is (re-)added, so a run
# that drops links without adding any leaves orbit.json untouched -- confirm
# that this is intended.
modified_orbit = False

# skip main link; the slice is a copy, so URLS can be mutated while looping
for link in URLS[1:]:
    URLS.remove(link)  # assume link doesn't belong

    # Things to consider for a new link:
    # Does its capsule already have representation in the orbit?
    capsule = determine_capsule(urllib.parse.urlparse(link))
    if capsule in CAPSULES_IN_ORBIT:
        # skip
        print(f"Skipping {link} (capsule already in orbit)...")
        continue

    # Does it link to any of the required links?
    response, mime = grab_content(link)
    if not mime.startswith("text/gemini"):
        reason = f"{mime} response isn't text/gemini and therefore can't link back"
    elif not _links_back_to_orbit(response):
        reason = "doesn't link back to orbit"
    else:
        reason = None
    if reason is not None:
        print(f"Skipping {link} ({reason})...")
        continue

    # If we haven't continue'd by now, the link meets all of the criteria
    print(f"Adding {link} to the orbit...")
    URLS.append(link)
    CAPSULES_IN_ORBIT.add(capsule)
    modified_orbit = True

if modified_orbit:
    print("Saving modified orbit...")
    with open("orbit.json", "w") as f:
        json.dump(dict(urls=URLS), f)