From b745b04f7aae1a1653e9d12229ed8189091adc6f Mon Sep 17 00:00:00 2001 From: Lionel Dricot Date: Tue, 18 Jul 2023 00:39:06 +0200 Subject: [PATCH] netcache now works for gemini --- netcache.py | 249 +++++++++++++++++++++++++--------------------------- offpunk.py | 5 -- 2 files changed, 121 insertions(+), 133 deletions(-) diff --git a/netcache.py b/netcache.py index bcfd09a..b9c1847 100755 --- a/netcache.py +++ b/netcache.py @@ -1,8 +1,11 @@ #!/bin/python import os +import sys import urllib.parse import argparse import requests +import codecs +import getpass import socket import ssl from ssl import CertificateError @@ -34,6 +37,10 @@ standard_ports = { } default_protocol = "gemini" +CRLF = '\r\n' +DEFAULT_TIMEOUT = 10 +_MAX_REDIRECTS = 5 + def parse_mime(mime): options = {} if mime: @@ -343,7 +350,7 @@ def _fetch_gopher(url,timeout=10): else: # by default, we should consider Gopher mime = "text/gopher" - cache = write_body(response,mime) + cache = write_body(url,response,mime) return cache def _fetch_finger(url,timeout=10): @@ -384,7 +391,7 @@ def _fetch_spartan(url): body = fp.read() if meta.startswith("text"): body = body.decode("UTF-8") - cache = write_body(body,meta) + cache = write_body(url,body,meta) elif code == 3: redirect_url = url_parts._replace(path=meta).geturl() else: @@ -395,7 +402,7 @@ def _fetch_spartan(url): cache = _fetch_spartan(redirect_url) return cache -def _fetch_gemini(url): +def _fetch_gemini(url,options={}): cache = None url_parts = urllib.parse.urlparse(url) host = url_parts.hostname @@ -404,34 +411,35 @@ def _fetch_gemini(url): query = url_parts.query # Be careful with client certificates! # Are we crossing a domain boundary? - if self.active_cert_domains and host not in self.active_cert_domains: - if self.active_is_transient: - print("Permanently delete currently active transient certificate?") - resp = input("Y/N? ") - if resp.strip().lower() in ("y", "yes"): - print("Destroying certificate.") - self._deactivate_client_cert() - else: - print("Staying here.") - raise UserAbortException() - else: - print("PRIVACY ALERT: Deactivate client cert before connecting to a new domain?") - resp = input("Y/N? ") - if resp.strip().lower() in ("n", "no"): - print("Keeping certificate active for {}".format(host)) - else: - print("Deactivating certificate.") - self._deactivate_client_cert() - - # Suggest reactivating previous certs - if not self.client_certs["active"] and host in self.client_certs: - print("PRIVACY ALERT: Reactivate previously used client cert for {}?".format(host)) - resp = input("Y/N? ") - if resp.strip().lower() in ("y", "yes"): - self._activate_client_cert(*self.client_certs[host]) - else: - print("Remaining unidentified.") - self.client_certs.pop(host) + # TODO : code should be adapted to netcache +# if self.active_cert_domains and host not in self.active_cert_domains: +# if self.active_is_transient: +# print("Permanently delete currently active transient certificate?") +# resp = input("Y/N? ") +# if resp.strip().lower() in ("y", "yes"): +# print("Destroying certificate.") +# self._deactivate_client_cert() +# else: +# print("Staying here.") +# raise UserAbortException() +# else: +# print("PRIVACY ALERT: Deactivate client cert before connecting to a new domain?") +# resp = input("Y/N? ") +# if resp.strip().lower() in ("n", "no"): +# print("Keeping certificate active for {}".format(host)) +# else: +# print("Deactivating certificate.") +# self._deactivate_client_cert() +# +# # Suggest reactivating previous certs +# if not self.client_certs["active"] and host in self.client_certs: +# print("PRIVACY ALERT: Reactivate previously used client cert for {}?".format(host)) +# resp = input("Y/N? ") +# if resp.strip().lower() in ("y", "yes"): +# self._activate_client_cert(*self.client_certs[host]) +# else: +# print("Remaining unidentified.") +# self.client_certs.pop(host) # In AV-98, this was the _send_request method #Send a selector to a given host and port. @@ -457,16 +465,6 @@ def _fetch_gemini(url): # Prepare TLS context protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >=6 else ssl.PROTOCOL_TLSv1_2 context = ssl.SSLContext(protocol) - - # Use CAs or TOFU - #TODO : should we care about this options? - #if self.options["tls_mode"] == "ca": - # context.verify_mode = ssl.CERT_REQUIRED - # context.check_hostname = True - # context.load_default_certs() - #else: - # context.check_hostname = False - # context.verify_mode = ssl.CERT_NONE context.check_hostname=False context.verify_mode = ssl.CERT_NONE # Impose minimum TLS version @@ -487,57 +485,50 @@ def _fetch_gemini(url): # Rely on the server to only support sensible things, I guess... pass - #TODO: I’m here in the refactor - # Load client certificate if needed - if self.client_certs["active"]: - certfile, keyfile = self.client_certs["active"] - context.load_cert_chain(certfile, keyfile) + #TODO: certificate handling to refactor +# # Load client certificate if needed +# if self.client_certs["active"]: +# certfile, keyfile = self.client_certs["active"] +# context.load_cert_chain(certfile, keyfile) # Connect to remote host by any address possible - err = None - for address in addresses: - self._debug("Connecting to: " + str(address[4])) - s = socket.socket(address[0], address[1]) - if self.sync_only: - timeout = self.options["short_timeout"] - else: - timeout = self.options["timeout"] - s.settimeout(timeout) - s = context.wrap_socket(s, server_hostname = host) - try: - s.connect(address[4]) - break - except OSError as e: - err = e + err = None + for address in addresses: + s = socket.socket(address[0], address[1]) + if "timeout" in options: + timeout = options["timeout"] else: - # If we couldn't connect to *any* of the addresses, just - # bubble up the exception from the last attempt and deny - # knowledge of earlier failures. - raise err - if sys.version_info.minor >=5: - self._debug("Established {} connection.".format(s.version())) - self._debug("Cipher is: {}.".format(s.cipher())) - # Do TOFU - if self.options["tls_mode"] != "ca": - cert = s.getpeercert(binary_form=True) - self._validate_cert(address[4][0], host, cert) - # Remember that we showed the current cert to this domain... - if self.client_certs["active"]: - self.active_cert_domains.append(host) - self.client_certs[host] = self.client_certs["active"] - # Send request and wrap response in a file descriptor - url = urllib.parse.urlparse(gi.url) - new_netloc = host - if port != 1965: - new_netloc += ":" + str(port) - url = urllib.parse.urlunparse(url._replace(netloc=new_netloc)) - self._debug("Sending %s" % url) - s.sendall((url + CRLF).encode("UTF-8")) - mf= s.makefile(mode = "rb") - return address, mf - ## - ## end of send_request - TODO :address, f = self._send_request(gi) + timeout = DEFAULT_TIMEOUT + s.settimeout(timeout) + s = context.wrap_socket(s, server_hostname = host) + try: + s.connect(address[4]) + break + except OSError as e: + err = e + else: + # If we couldn't connect to *any* of the addresses, just + # bubble up the exception from the last attempt and deny + # knowledge of earlier failures. + raise err + + # Do TOFU + cert = s.getpeercert(binary_form=True) + # TODO: another cert handling to refactor + # Remember that we showed the current cert to this domain... +# self._validate_cert(address[4][0], host, cert) +# if self.client_certs["active"]: +# self.active_cert_domains.append(host) +# self.client_certs[host] = self.client_certs["active"] + # Send request and wrap response in a file descriptor + url = urllib.parse.urlparse(url) + new_netloc = host + if port != standard_ports["gemini"]: + new_netloc += ":" + str(port) + url = urllib.parse.urlunparse(url._replace(netloc=new_netloc)) + s.sendall((url + CRLF).encode("UTF-8")) + f= s.makefile(mode = "rb") + ## end of send_request in AV98 # Spec dictates should not exceed 1024 bytes, # so maximum valid header length is 1027 bytes. header = f.readline(1027) @@ -545,7 +536,6 @@ def _fetch_gemini(url): if not header or header[-1] != '\n': raise RuntimeError("Received invalid header from server!") header = header.strip() - self._debug("Response header: %s." % header) # Validate header status, meta = header.split(maxsplit=1) if len(meta) > 1024 or len(status) != 2 or not status.isnumeric(): @@ -553,62 +543,63 @@ def _fetch_gemini(url): raise RuntimeError("Received invalid header from server!") # Update redirect loop/maze escaping state if not status.startswith("3"): - self.previous_redirectors = set() + previous_redirectors = set() + #TODO FIXME + else: + #we set a previous_redirectors anyway because refactoring in progress + previous_redirectors = set() # Handle non-SUCCESS headers, which don't have a response body # Inputs if status.startswith("1"): - if self.sync_only: - return None + print(meta) + if status == "11": + user_input = getpass.getpass("> ") else: - print(meta) - if status == "11": - user_input = getpass.getpass("> ") - else: - user_input = input("> ") - return self._fetch_over_network(query(user_input)) + user_input = input("> ") + return _fetch_gemini(query(user_input)) # Redirects elif status.startswith("3"): - new_gi = GeminiItem(gi.absolutise_url(meta)) - if new_gi.url == gi.url: + newurl = urllib.parse.urljoin(url,meta) + if newurl == url: raise RuntimeError("URL redirects to itself!") - elif new_gi.url in self.previous_redirectors: + elif newurl in previous_redirectors: raise RuntimeError("Caught in redirect loop!") - elif len(self.previous_redirectors) == _MAX_REDIRECTS: + elif len(previous_redirectors) == _MAX_REDIRECTS: raise RuntimeError("Refusing to follow more than %d consecutive redirects!" % _MAX_REDIRECTS) - elif self.sync_only: - follow = self.automatic_choice - # Never follow cross-domain redirects without asking - elif new_gi.host.encode("idna") != gi.host.encode("idna"): - follow = input("Follow cross-domain redirect to %s? (y/n) " % new_gi.url) - # Never follow cross-protocol redirects without asking - elif new_gi.scheme != gi.scheme: - follow = input("Follow cross-protocol redirect to %s? (y/n) " % new_gi.url) - # Don't follow *any* redirect without asking if auto-follow is off - elif not self.options["auto_follow_redirects"]: - follow = input("Follow redirect to %s? (y/n) " % new_gi.url) - # Otherwise, follow away +# TODO: redirections handling should be refactored +# elif "interactive" in options and not options["interactive"]: +# follow = self.automatic_choice +# # Never follow cross-domain redirects without asking +# elif new_gi.host.encode("idna") != gi.host.encode("idna"): +# follow = input("Follow cross-domain redirect to %s? (y/n) " % new_gi.url) +# # Never follow cross-protocol redirects without asking +# elif new_gi.scheme != gi.scheme: +# follow = input("Follow cross-protocol redirect to %s? (y/n) " % new_gi.url) +# # Don't follow *any* redirect without asking if auto-follow is off +# elif not self.options["auto_follow_redirects"]: +# follow = input("Follow redirect to %s? (y/n) " % new_gi.url) +# # Otherwise, follow away else: follow = "yes" if follow.strip().lower() not in ("y", "yes"): raise UserAbortException() - self._debug("Following redirect to %s." % new_gi.url) - self._debug("This is consecutive redirect number %d." % len(self.previous_redirectors)) - self.previous_redirectors.add(gi.url) - if status == "31": - # Permanent redirect - self.permanent_redirects[gi.url] = new_gi.url - return self._fetch_over_network(new_gi) + previous_redirectors.add(url) +# if status == "31": +# # Permanent redirect +# self.permanent_redirects[gi.url] = new_gi.url + return _fetch_gemini(newurl) # Errors elif status.startswith("4") or status.startswith("5"): raise RuntimeError(meta) # Client cert - elif status.startswith("6"): - self._handle_cert_request(meta) - return self._fetch_over_network(gi) +# elif status.startswith("6"): +# self._handle_cert_request(meta) +# return self._fetch_over_network(gi) # Invalid status elif not status.startswith("2"): raise RuntimeError("Server returned undefined status code %s!" % status) # If we're here, this must be a success and there's a response body + print("status : %s"%status) assert status.startswith("2") mime = meta # Read the response body over the network @@ -634,14 +625,14 @@ def _fetch_gemini(url): encoding declared in header!" % encoding) else: body = fbody - gi.write_body(body,mime) - return gi + cache = write_body(url,body,mime) + return cache def fetch(url): url = normalize_url(url) path=None - if "://" in url + if "://" in url: scheme = url.split("://")[0] if scheme not in standard_ports: print("%s is not a supported protocol"%scheme) @@ -651,6 +642,8 @@ def fetch(url): path=_fetch_gopher(url) elif scheme == "finger": path=_fetch_finger(url) + elif scheme == "gemini": + patch=_fetch_gemini(url) else: print("scheme %s not implemented yet") else: diff --git a/offpunk.py b/offpunk.py index f661d16..54d3572 100755 --- a/offpunk.py +++ b/offpunk.py @@ -21,10 +21,8 @@ __version__ = "1.9.2" import argparse import cmd -import codecs import datetime import fnmatch -import getpass import glob import hashlib import io @@ -89,7 +87,6 @@ if os.path.exists(_old_config): #if no XDG .local/share and not XDG .config, we use the old config if not os.path.exists(data_home) and os.path.exists(_old_config): _DATA_DIR = _CONFIG_DIR -_MAX_REDIRECTS = 5 _MAX_CACHE_SIZE = 10 _MAX_CACHE_AGE_SECS = 180 @@ -548,7 +545,6 @@ class GeminiItem(): def to_map_line(self): return "=> {} {}\n".format(self.url_mode(), self.get_page_title()) -CRLF = '\r\n' # Cheap and cheerful URL detector def looks_like_url(word): @@ -611,7 +607,6 @@ class GeminiClient(cmd.Cmd): self.marks = {} self.page_index = 0 self.permanent_redirects = {} - self.previous_redirectors = set() # Sync-only mode is restriced by design self.visited_hosts = set() self.offline_only = False