diff --git a/netcache.py b/netcache.py index 4b22cd5..f7e651e 100755 --- a/netcache.py +++ b/netcache.py @@ -4,6 +4,8 @@ import urllib.parse import argparse import requests import socket +import ssl +from ssl import CertificateError try: import chardet _HAS_CHARDET = True @@ -361,7 +363,7 @@ def _fetch_spartan(url): cache = None url_parts = urllib.parse.urlparse(url) host = url_parts.hostname - port = url_parts.port or 300 + port = url_parts.port or standard_ports["spartan"] path = url_parts.path or "/" query = url_parts.query redirect_url = None @@ -393,6 +395,243 @@ def _fetch_spartan(url): cache = _fetch_spartan(redirect_url) return cache +def _fetch_gemini(url): + cache = None + url_parts = urllib.parse.urlparse(url) + host = url_parts.hostname + port = url_parts.port or standard_ports["gemini"] + path = url_parts.path or "/" + query = url_parts.query + # Be careful with client certificates! + # Are we crossing a domain boundary? + if self.active_cert_domains and host not in self.active_cert_domains: + if self.active_is_transient: + print("Permanently delete currently active transient certificate?") + resp = input("Y/N? ") + if resp.strip().lower() in ("y", "yes"): + print("Destroying certificate.") + self._deactivate_client_cert() + else: + print("Staying here.") + raise UserAbortException() + else: + print("PRIVACY ALERT: Deactivate client cert before connecting to a new domain?") + resp = input("Y/N? ") + if resp.strip().lower() in ("n", "no"): + print("Keeping certificate active for {}".format(host)) + else: + print("Deactivating certificate.") + self._deactivate_client_cert() + + # Suggest reactivating previous certs + if not self.client_certs["active"] and host in self.client_certs: + print("PRIVACY ALERT: Reactivate previously used client cert for {}?".format(host)) + resp = input("Y/N? ") + if resp.strip().lower() in ("y", "yes"): + self._activate_client_cert(*self.client_certs[host]) + else: + print("Remaining unidentified.") + self.client_certs.pop(host) + + # In AV-98, this was the _send_request method + #Send a selector to a given host and port. + #Returns the resolved address and binary file with the reply.""" + host = host.encode("idna").decode() + # Do DNS resolution + # DNS lookup - will get IPv4 and IPv6 records if IPv6 is enabled + if ":" in host: + # This is likely a literal IPv6 address, so we can *only* ask for + # IPv6 addresses or getaddrinfo will complain + family_mask = socket.AF_INET6 + elif socket.has_ipv6: + # Accept either IPv4 or IPv6 addresses + family_mask = 0 + else: + # IPv4 only + family_mask = socket.AF_INET + addresses = socket.getaddrinfo(host, port, family=family_mask, + type=socket.SOCK_STREAM) + # Sort addresses so IPv6 ones come first + addresses.sort(key=lambda add: add[0] == socket.AF_INET6, reverse=True) + ## Continuation of send_request + # Prepare TLS context + protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >=6 else ssl.PROTOCOL_TLSv1_2 + context = ssl.SSLContext(protocol) + + # Use CAs or TOFU + if self.options["tls_mode"] == "ca": + context.verify_mode = ssl.CERT_REQUIRED + context.check_hostname = True + context.load_default_certs() + else: + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + # Impose minimum TLS version + ## In 3.7 and above, this is easy... + if sys.version_info.minor >= 7: + context.minimum_version = ssl.TLSVersion.TLSv1_2 + ## Otherwise, it seems very hard... + ## The below is less strict than it ought to be, but trying to disable + ## TLS v1.1 here using ssl.OP_NO_TLSv1_1 produces unexpected failures + ## with recent versions of OpenSSL. What a mess... + else: + context.options |= ssl.OP_NO_SSLv3 + context.options |= ssl.OP_NO_SSLv2 + # Try to enforce sensible ciphers + try: + context.set_ciphers("AESGCM+ECDHE:AESGCM+DHE:CHACHA20+ECDHE:CHACHA20+DHE:!DSS:!SHA1:!MD5:@STRENGTH") + except ssl.SSLError: + # Rely on the server to only support sensible things, I guess... + pass + # Load client certificate if needed + if self.client_certs["active"]: + certfile, keyfile = self.client_certs["active"] + context.load_cert_chain(certfile, keyfile) + + # Connect to remote host by any address possible + err = None + for address in addresses: + self._debug("Connecting to: " + str(address[4])) + s = socket.socket(address[0], address[1]) + if self.sync_only: + timeout = self.options["short_timeout"] + else: + timeout = self.options["timeout"] + s.settimeout(timeout) + s = context.wrap_socket(s, server_hostname = host) + try: + s.connect(address[4]) + break + except OSError as e: + err = e + else: + # If we couldn't connect to *any* of the addresses, just + # bubble up the exception from the last attempt and deny + # knowledge of earlier failures. + raise err + if sys.version_info.minor >=5: + self._debug("Established {} connection.".format(s.version())) + self._debug("Cipher is: {}.".format(s.cipher())) + # Do TOFU + if self.options["tls_mode"] != "ca": + cert = s.getpeercert(binary_form=True) + self._validate_cert(address[4][0], host, cert) + # Remember that we showed the current cert to this domain... + if self.client_certs["active"]: + self.active_cert_domains.append(host) + self.client_certs[host] = self.client_certs["active"] + # Send request and wrap response in a file descriptor + url = urllib.parse.urlparse(gi.url) + new_netloc = host + if port != 1965: + new_netloc += ":" + str(port) + url = urllib.parse.urlunparse(url._replace(netloc=new_netloc)) + self._debug("Sending %s" % url) + s.sendall((url + CRLF).encode("UTF-8")) + mf= s.makefile(mode = "rb") + return address, mf + ## + ## end of send_request + TODO :address, f = self._send_request(gi) + # Spec dictates should not exceed 1024 bytes, + # so maximum valid header length is 1027 bytes. + header = f.readline(1027) + header = urllib.parse.unquote(header.decode("UTF-8")) + if not header or header[-1] != '\n': + raise RuntimeError("Received invalid header from server!") + header = header.strip() + self._debug("Response header: %s." % header) + # Validate header + status, meta = header.split(maxsplit=1) + if len(meta) > 1024 or len(status) != 2 or not status.isnumeric(): + f.close() + raise RuntimeError("Received invalid header from server!") + # Update redirect loop/maze escaping state + if not status.startswith("3"): + self.previous_redirectors = set() + # Handle non-SUCCESS headers, which don't have a response body + # Inputs + if status.startswith("1"): + if self.sync_only: + return None + else: + print(meta) + if status == "11": + user_input = getpass.getpass("> ") + else: + user_input = input("> ") + return self._fetch_over_network(query(user_input)) + # Redirects + elif status.startswith("3"): + new_gi = GeminiItem(gi.absolutise_url(meta)) + if new_gi.url == gi.url: + raise RuntimeError("URL redirects to itself!") + elif new_gi.url in self.previous_redirectors: + raise RuntimeError("Caught in redirect loop!") + elif len(self.previous_redirectors) == _MAX_REDIRECTS: + raise RuntimeError("Refusing to follow more than %d consecutive redirects!" % _MAX_REDIRECTS) + elif self.sync_only: + follow = self.automatic_choice + # Never follow cross-domain redirects without asking + elif new_gi.host.encode("idna") != gi.host.encode("idna"): + follow = input("Follow cross-domain redirect to %s? (y/n) " % new_gi.url) + # Never follow cross-protocol redirects without asking + elif new_gi.scheme != gi.scheme: + follow = input("Follow cross-protocol redirect to %s? (y/n) " % new_gi.url) + # Don't follow *any* redirect without asking if auto-follow is off + elif not self.options["auto_follow_redirects"]: + follow = input("Follow redirect to %s? (y/n) " % new_gi.url) + # Otherwise, follow away + else: + follow = "yes" + if follow.strip().lower() not in ("y", "yes"): + raise UserAbortException() + self._debug("Following redirect to %s." % new_gi.url) + self._debug("This is consecutive redirect number %d." % len(self.previous_redirectors)) + self.previous_redirectors.add(gi.url) + if status == "31": + # Permanent redirect + self.permanent_redirects[gi.url] = new_gi.url + return self._fetch_over_network(new_gi) + # Errors + elif status.startswith("4") or status.startswith("5"): + raise RuntimeError(meta) + # Client cert + elif status.startswith("6"): + self._handle_cert_request(meta) + return self._fetch_over_network(gi) + # Invalid status + elif not status.startswith("2"): + raise RuntimeError("Server returned undefined status code %s!" % status) + # If we're here, this must be a success and there's a response body + assert status.startswith("2") + mime = meta + # Read the response body over the network + fbody = f.read() + # DEFAULT GEMINI MIME + if mime == "": + mime = "text/gemini; charset=utf-8" + shortmime, mime_options = parse_mime(mime) + if "charset" in mime_options: + try: + codecs.lookup(mime_options["charset"]) + except LookupError: + #raise RuntimeError("Header declared unknown encoding %s" % mime_options) + #If the encoding is wrong, there’s a high probably it’s UTF-8 with a bad header + mime_options["charset"] = "UTF-8" + if shortmime.startswith("text/"): + #Get the charset and default to UTF-8 in none + encoding = mime_options.get("charset", "UTF-8") + try: + body = fbody.decode(encoding) + except UnicodeError: + raise RuntimeError("Could not decode response body using %s\ + encoding declared in header!" % encoding) + else: + body = fbody + gi.write_body(body,mime) + return gi + def fetch(url): url = normalize_url(url) diff --git a/offpunk.py b/offpunk.py index f19e4d1..f661d16 100755 --- a/offpunk.py +++ b/offpunk.py @@ -37,8 +37,6 @@ import shlex import shutil import socket import sqlite3 -import ssl -from ssl import CertificateError import sys import time import urllib.parse @@ -631,7 +629,6 @@ class GeminiClient(cmd.Cmd): self.options = { "debug" : False, "beta" : False, - "ipv6" : True, "timeout" : 600, "short_timeout" : 5, "width" : 72, @@ -882,311 +879,7 @@ class GeminiClient(cmd.Cmd): print("You can use the ! command to specify another handler program or pipeline.") - def _fetch_finger(self,gi,timeout=10): - if not looks_like_url(gi.url): - print("%s is not a valid url" %gi.url) - parsed = urllib.parse.urlparse(gi.url) - host = parsed.hostname - port = parsed.port or standard_ports["finger"] - query = parsed.path.lstrip("/") + "\r\n" - with socket.create_connection((host,port)) as sock: - sock.settimeout(timeout) - sock.send(query.encode()) - response = sock.makefile("rb").read().decode("UTF-8") - gi.write_body(response,"text/plain") - return gi - # Copied from reference spartan client by Michael Lazar - def _fetch_spartan(self,gi): - url_parts = urllib.parse.urlparse(gi.url) - host = url_parts.hostname - port = url_parts.port or 300 - path = url_parts.path or "/" - query = url_parts.query - - redirect_url = None - - with socket.create_connection((host,port)) as sock: - if query: - data = urllib.parse.unquote_to_bytes(query) - else: - data = b"" - encoded_host = host.encode("idna") - ascii_path = urllib.parse.unquote_to_bytes(path) - encoded_path = urllib.parse.quote_from_bytes(ascii_path).encode("ascii") - sock.send(b"%s %s %d\r\n" % (encoded_host,encoded_path,len(data))) - fp = sock.makefile("rb") - response = fp.readline(4096).decode("ascii").strip("\r\n") - parts = response.split(" ",maxsplit=1) - code,meta = int(parts[0]),parts[1] - if code == 2: - body = fp.read() - if meta.startswith("text"): - body = body.decode("UTF-8") - gi.write_body(body,meta) - elif code == 3: - redirect_url = url_parts._replace(path=meta).geturl() - else: - gi.set_error("Spartan code %s: Error %s"%(code,meta)) - if redirect_url: - gi = GeminiItem(redirect_url) - self._fetch_spartan(gi) - return gi - - # fetch_over_network will modify with gi.write_body(body,mime) - # before returning the gi - def _fetch_over_network(self, gi): - - # Be careful with client certificates! - # Are we crossing a domain boundary? - if self.active_cert_domains and gi.host not in self.active_cert_domains: - if self.active_is_transient: - print("Permanently delete currently active transient certificate?") - resp = input("Y/N? ") - if resp.strip().lower() in ("y", "yes"): - print("Destroying certificate.") - self._deactivate_client_cert() - else: - print("Staying here.") - raise UserAbortException() - else: - print("PRIVACY ALERT: Deactivate client cert before connecting to a new domain?") - resp = input("Y/N? ") - if resp.strip().lower() in ("n", "no"): - print("Keeping certificate active for {}".format(gi.host)) - else: - print("Deactivating certificate.") - self._deactivate_client_cert() - - # Suggest reactivating previous certs - if not self.client_certs["active"] and gi.host in self.client_certs: - print("PRIVACY ALERT: Reactivate previously used client cert for {}?".format(gi.host)) - resp = input("Y/N? ") - if resp.strip().lower() in ("y", "yes"): - self._activate_client_cert(*self.client_certs[gi.host]) - else: - print("Remaining unidentified.") - self.client_certs.pop(gi.host) - - # Is this a local file? - if gi.local: - address, f = None, open(gi.path, "rb") - else: - address, f = self._send_request(gi) - - # Spec dictates should not exceed 1024 bytes, - # so maximum valid header length is 1027 bytes. - header = f.readline(1027) - header = urllib.parse.unquote(header.decode("UTF-8")) - if not header or header[-1] != '\n': - raise RuntimeError("Received invalid header from server!") - header = header.strip() - self._debug("Response header: %s." % header) - # Validate header - status, meta = header.split(maxsplit=1) - if len(meta) > 1024 or len(status) != 2 or not status.isnumeric(): - f.close() - raise RuntimeError("Received invalid header from server!") - - # Update redirect loop/maze escaping state - if not status.startswith("3"): - self.previous_redirectors = set() - - # Handle non-SUCCESS headers, which don't have a response body - # Inputs - if status.startswith("1"): - if self.sync_only: - return None - else: - print(meta) - if status == "11": - user_input = getpass.getpass("> ") - else: - user_input = input("> ") - return self._fetch_over_network(gi.query(user_input)) - - # Redirects - elif status.startswith("3"): - new_gi = GeminiItem(gi.absolutise_url(meta)) - if new_gi.url == gi.url: - raise RuntimeError("URL redirects to itself!") - elif new_gi.url in self.previous_redirectors: - raise RuntimeError("Caught in redirect loop!") - elif len(self.previous_redirectors) == _MAX_REDIRECTS: - raise RuntimeError("Refusing to follow more than %d consecutive redirects!" % _MAX_REDIRECTS) - elif self.sync_only: - follow = self.automatic_choice - # Never follow cross-domain redirects without asking - elif new_gi.host.encode("idna") != gi.host.encode("idna"): - follow = input("Follow cross-domain redirect to %s? (y/n) " % new_gi.url) - # Never follow cross-protocol redirects without asking - elif new_gi.scheme != gi.scheme: - follow = input("Follow cross-protocol redirect to %s? (y/n) " % new_gi.url) - # Don't follow *any* redirect without asking if auto-follow is off - elif not self.options["auto_follow_redirects"]: - follow = input("Follow redirect to %s? (y/n) " % new_gi.url) - # Otherwise, follow away - else: - follow = "yes" - if follow.strip().lower() not in ("y", "yes"): - raise UserAbortException() - self._debug("Following redirect to %s." % new_gi.url) - self._debug("This is consecutive redirect number %d." % len(self.previous_redirectors)) - self.previous_redirectors.add(gi.url) - if status == "31": - # Permanent redirect - self.permanent_redirects[gi.url] = new_gi.url - return self._fetch_over_network(new_gi) - - # Errors - elif status.startswith("4") or status.startswith("5"): - raise RuntimeError(meta) - - # Client cert - elif status.startswith("6"): - self._handle_cert_request(meta) - return self._fetch_over_network(gi) - - # Invalid status - elif not status.startswith("2"): - raise RuntimeError("Server returned undefined status code %s!" % status) - - # If we're here, this must be a success and there's a response body - assert status.startswith("2") - - mime = meta - # Read the response body over the network - fbody = f.read() - # DEFAULT GEMINI MIME - if mime == "": - mime = "text/gemini; charset=utf-8" - shortmime, mime_options = parse_mime(mime) - if "charset" in mime_options: - try: - codecs.lookup(mime_options["charset"]) - except LookupError: - #raise RuntimeError("Header declared unknown encoding %s" % mime_options) - #If the encoding is wrong, there’s a high probably it’s UTF-8 with a bad header - mime_options["charset"] = "UTF-8" - if shortmime.startswith("text/"): - #Get the charset and default to UTF-8 in none - encoding = mime_options.get("charset", "UTF-8") - try: - body = fbody.decode(encoding) - except UnicodeError: - raise RuntimeError("Could not decode response body using %s\ - encoding declared in header!" % encoding) - else: - body = fbody - gi.write_body(body,mime) - return gi - - def _send_request(self, gi): - """Send a selector to a given host and port. - Returns the resolved address and binary file with the reply.""" - host, port = gi.host, gi.port - host = host.encode("idna").decode() - # Do DNS resolution - addresses = self._get_addresses(host, port) - - # Prepare TLS context - protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >=6 else ssl.PROTOCOL_TLSv1_2 - context = ssl.SSLContext(protocol) - # Use CAs or TOFU - if self.options["tls_mode"] == "ca": - context.verify_mode = ssl.CERT_REQUIRED - context.check_hostname = True - context.load_default_certs() - else: - context.check_hostname = False - context.verify_mode = ssl.CERT_NONE - # Impose minimum TLS version - ## In 3.7 and above, this is easy... - if sys.version_info.minor >= 7: - context.minimum_version = ssl.TLSVersion.TLSv1_2 - ## Otherwise, it seems very hard... - ## The below is less strict than it ought to be, but trying to disable - ## TLS v1.1 here using ssl.OP_NO_TLSv1_1 produces unexpected failures - ## with recent versions of OpenSSL. What a mess... - else: - context.options |= ssl.OP_NO_SSLv3 - context.options |= ssl.OP_NO_SSLv2 - # Try to enforce sensible ciphers - try: - context.set_ciphers("AESGCM+ECDHE:AESGCM+DHE:CHACHA20+ECDHE:CHACHA20+DHE:!DSS:!SHA1:!MD5:@STRENGTH") - except ssl.SSLError: - # Rely on the server to only support sensible things, I guess... - pass - # Load client certificate if needed - if self.client_certs["active"]: - certfile, keyfile = self.client_certs["active"] - context.load_cert_chain(certfile, keyfile) - - # Connect to remote host by any address possible - err = None - for address in addresses: - self._debug("Connecting to: " + str(address[4])) - s = socket.socket(address[0], address[1]) - if self.sync_only: - timeout = self.options["short_timeout"] - else: - timeout = self.options["timeout"] - s.settimeout(timeout) - s = context.wrap_socket(s, server_hostname = host) - try: - s.connect(address[4]) - break - except OSError as e: - err = e - else: - # If we couldn't connect to *any* of the addresses, just - # bubble up the exception from the last attempt and deny - # knowledge of earlier failures. - raise err - - if sys.version_info.minor >=5: - self._debug("Established {} connection.".format(s.version())) - self._debug("Cipher is: {}.".format(s.cipher())) - - # Do TOFU - if self.options["tls_mode"] != "ca": - cert = s.getpeercert(binary_form=True) - self._validate_cert(address[4][0], host, cert) - - # Remember that we showed the current cert to this domain... - if self.client_certs["active"]: - self.active_cert_domains.append(host) - self.client_certs[host] = self.client_certs["active"] - - # Send request and wrap response in a file descriptor - url = urllib.parse.urlparse(gi.url) - new_netloc = host - if port != 1965: - new_netloc += ":" + str(port) - url = urllib.parse.urlunparse(url._replace(netloc=new_netloc)) - self._debug("Sending %s" % url) - s.sendall((url + CRLF).encode("UTF-8")) - mf= s.makefile(mode = "rb") - return address, mf - - def _get_addresses(self, host, port): - # DNS lookup - will get IPv4 and IPv6 records if IPv6 is enabled - if ":" in host: - # This is likely a literal IPv6 address, so we can *only* ask for - # IPv6 addresses or getaddrinfo will complain - family_mask = socket.AF_INET6 - elif socket.has_ipv6 and self.options["ipv6"]: - # Accept either IPv4 or IPv6 addresses - family_mask = 0 - else: - # IPv4 only - family_mask = socket.AF_INET - addresses = socket.getaddrinfo(host, port, family=family_mask, - type=socket.SOCK_STREAM) - # Sort addresses so IPv6 ones come first - addresses.sort(key=lambda add: add[0] == socket.AF_INET6, reverse=True) - - return addresses def _handle_cert_request(self, meta):