diff --git a/netcache.py b/netcache.py index 10dcf74..3c2c6f2 100755 --- a/netcache.py +++ b/netcache.py @@ -9,6 +9,8 @@ import getpass import socket import ssl import glob +import datetime +import hashlib from ssl import CertificateError try: import chardet @@ -16,6 +18,14 @@ try: except ModuleNotFoundError: _HAS_CHARDET = False +try: + from cryptography import x509 + from cryptography.hazmat.backends import default_backend + _HAS_CRYPTOGRAPHY = True + _BACKEND = default_backend() +except ModuleNotFoundError: + _HAS_CRYPTOGRAPHY = False + _home = os.path.expanduser('~') cache_home = os.environ.get('XDG_CACHE_HOME') or\ os.path.join(_home,'.cache') @@ -557,6 +567,137 @@ def _handle_cert_request(meta): print("Giving up.") raise UserAbortException() + +def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=None): + """ + Validate a TLS certificate in TOFU mode. + + If the cryptography module is installed: + - Check the certificate Common Name or SAN matches `host` + - Check the certificate's not valid before date is in the past + - Check the certificate's not valid after date is in the future + + Whether the cryptography module is installed or not, check the + certificate's fingerprint against the TOFU database to see if we've + previously encountered a different certificate for this IP address and + hostname. + """ + now = datetime.datetime.utcnow() + if _HAS_CRYPTOGRAPHY: + # Using the cryptography module we can get detailed access + # to the properties of even self-signed certs, unlike in + # the standard ssl library... + c = x509.load_der_x509_certificate(cert, _BACKEND) + # Check certificate validity dates + if accept_bad_ssl: + if c.not_valid_before >= now: + raise CertificateError("Certificate not valid until: {}!".format(c.not_valid_before)) + elif c.not_valid_after <= now: + raise CertificateError("Certificate expired as of: {})!".format(c.not_valid_after)) + + # Check certificate hostnames + names = [] + common_name = c.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME) + if common_name: + names.append(common_name[0].value) + try: + names.extend([alt.value for alt in c.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value]) + except x509.ExtensionNotFound: + pass + names = set(names) + for name in names: + try: + ssl._dnsname_match(str(name), host) + break + except CertificateError: + continue + else: + # If we didn't break out, none of the names were valid + raise CertificateError("Hostname does not match certificate common name or any alternative names.") + + sha = hashlib.sha256() + sha.update(cert) + fingerprint = sha.hexdigest() + + db_path = os.path.join(_CONFIG_DIR, "tofu.db") + db_conn = sqlite3.connect(db_path) + db_cur = db_conn.cursor() + + db_cur.execute("""CREATE TABLE IF NOT EXISTS cert_cache + (hostname text, address text, fingerprint text, + first_seen date, last_seen date, count integer)""") + # Have we been here before? + db_cur.execute("""SELECT fingerprint, first_seen, last_seen, count + FROM cert_cache + WHERE hostname=? AND address=?""", (host, address)) + cached_certs = db_cur.fetchall() + + # If so, check for a match + if cached_certs: + max_count = 0 + most_frequent_cert = None + for cached_fingerprint, first, last, count in cached_certs: + if count > max_count: + max_count = count + most_frequent_cert = cached_fingerprint + if fingerprint == cached_fingerprint: + # Matched! + db_cur.execute("""UPDATE cert_cache + SET last_seen=?, count=? + WHERE hostname=? AND address=? AND fingerprint=?""", + (now, count+1, host, address, fingerprint)) + db_conn.commit() + break + else: + certdir = os.path.join(_CONFIG_DIR, "cert_cache") + with open(os.path.join(certdir, most_frequent_cert+".crt"), "rb") as fp: + previous_cert = fp.read() + if _HAS_CRYPTOGRAPHY: + # Load the most frequently seen certificate to see if it has + # expired + previous_cert = x509.load_der_x509_certificate(previous_cert, _BACKEND) + previous_ttl = previous_cert.not_valid_after - now + print(previous_ttl) + + print("****************************************") + print("[SECURITY WARNING] Unrecognised certificate!") + print("The certificate presented for {} ({}) has never been seen before.".format(host, address)) + print("This MIGHT be a Man-in-the-Middle attack.") + print("A different certificate has previously been seen {} times.".format(max_count)) + if _HAS_CRYPTOGRAPHY: + if previous_ttl < datetime.timedelta(): + print("That certificate has expired, which reduces suspicion somewhat.") + else: + print("That certificate is still valid for: {}".format(previous_ttl)) + print("****************************************") + print("Attempt to verify the new certificate fingerprint out-of-band:") + print(fingerprint) + if automatic_choice: + choice = automatic_choice + else: + choice = input("Accept this new certificate? Y/N ").strip().lower() + if choice in ("y", "yes"): + self.db_cur.execute("""INSERT INTO cert_cache + VALUES (?, ?, ?, ?, ?, ?)""", + (host, address, fingerprint, now, now, 1)) + self.db_conn.commit() + with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp: + fp.write(cert) + else: + raise Exception("TOFU Failure!") + + # If not, cache this cert + else: + db_cur.execute("""INSERT INTO cert_cache + VALUES (?, ?, ?, ?, ?, ?)""", + (host, address, fingerprint, now, now, 1)) + db_conn.commit() + certdir = os.path.join(_CONFIG_DIR, "cert_cache") + if not os.path.exists(certdir): + os.makedirs(certdir) + with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp: + fp.write(cert) + def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,**kwargs): cache = None url_parts = urllib.parse.urlparse(url) @@ -667,7 +808,8 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,**kwargs): cert = s.getpeercert(binary_form=True) # TODO: another cert handling to refactor # Remember that we showed the current cert to this domain... -# self._validate_cert(address[4][0], host, cert) + #TODO : accept badssl and automatic choice + validate_cert(address[4][0], host, cert,automatic_choice="y") # if self.client_certs["active"]: # self.active_cert_domains.append(host) # self.client_certs[host] = self.client_certs["active"] diff --git a/offpunk.py b/offpunk.py index 7831513..67143f4 100755 --- a/offpunk.py +++ b/offpunk.py @@ -26,8 +26,6 @@ import argparse import cmd import datetime import fnmatch -import glob -import hashlib import io import os import os.path @@ -57,13 +55,6 @@ except ModuleNotFoundError: _HAS_XSEL = shutil.which('xsel') _HAS_XDGOPEN = shutil.which('xdg-open') -try: - from cryptography import x509 - from cryptography.hazmat.backends import default_backend - _HAS_CRYPTOGRAPHY = True - _BACKEND = default_backend() -except ModuleNotFoundError: - _HAS_CRYPTOGRAPHY = False try: import requests @@ -526,7 +517,6 @@ class GeminiClient(cmd.Cmd): "cache_hits": 0, } - self._connect_to_tofu_db() def complete_list(self,text,line,begidx,endidx): allowed = [] @@ -567,18 +557,15 @@ class GeminiClient(cmd.Cmd): def complete_move(self,text,line,begidx,endidx): return self.complete_add(text,line,begidx,endidx) - def _connect_to_tofu_db(self): - - db_path = os.path.join(_CONFIG_DIR, "tofu.db") - self.db_conn = sqlite3.connect(db_path) - self.db_cur = self.db_conn.cursor() - - self.db_cur.execute("""CREATE TABLE IF NOT EXISTS cert_cache - (hostname text, address text, fingerprint text, - first_seen date, last_seen date, count integer)""") - #TODO: go_to_gi should take an URL as parameter, not gi #it should also only be called to really go, not for all links + def _go_to_url(self,url,**kwargs): + if "name" in kwargs.keys(): + gi= GeminiItem(url,name=kwargs["name"]) + else: + gi= GeminiItem(url) + return _go_to_url(GeminiItem(url),kwargs) + def _go_to_gi(self, gi, update_hist=True, check_cache=True, handle=True,\ mode=None,limit_size=False): """This method might be considered "the heart of Offpunk". @@ -612,8 +599,7 @@ class GeminiClient(cmd.Cmd): mode = gi.last_mode # Obey permanent redirects if url in self.permanent_redirects: - new_gi = GeminiItem(self.permanent_redirects[url], name=gi.name) - self._go_to_gi(new_gi,mode=mode) + self._go_to_url(self.permanent_redirects[url],name=gi.name,mode=mode) return # check if local file exists. @@ -677,131 +663,6 @@ class GeminiClient(cmd.Cmd): - def _validate_cert(self, address, host, cert): - """ - Validate a TLS certificate in TOFU mode. - - If the cryptography module is installed: - - Check the certificate Common Name or SAN matches `host` - - Check the certificate's not valid before date is in the past - - Check the certificate's not valid after date is in the future - - Whether the cryptography module is installed or not, check the - certificate's fingerprint against the TOFU database to see if we've - previously encountered a different certificate for this IP address and - hostname. - """ - now = datetime.datetime.utcnow() - if _HAS_CRYPTOGRAPHY: - # Using the cryptography module we can get detailed access - # to the properties of even self-signed certs, unlike in - # the standard ssl library... - c = x509.load_der_x509_certificate(cert, _BACKEND) - # Check certificate validity dates - if not self.options["accept_bad_ssl_certificates"]: - if c.not_valid_before >= now: - raise CertificateError("Certificate not valid until: {}!".format(c.not_valid_before)) - elif c.not_valid_after <= now: - raise CertificateError("Certificate expired as of: {})!".format(c.not_valid_after)) - - # Check certificate hostnames - names = [] - common_name = c.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME) - if common_name: - names.append(common_name[0].value) - try: - names.extend([alt.value for alt in c.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value]) - except x509.ExtensionNotFound: - pass - names = set(names) - for name in names: - try: - ssl._dnsname_match(str(name), host) - break - except CertificateError: - continue - else: - # If we didn't break out, none of the names were valid - raise CertificateError("Hostname does not match certificate common name or any alternative names.") - - sha = hashlib.sha256() - sha.update(cert) - fingerprint = sha.hexdigest() - - # Have we been here before? - self.db_cur.execute("""SELECT fingerprint, first_seen, last_seen, count - FROM cert_cache - WHERE hostname=? AND address=?""", (host, address)) - cached_certs = self.db_cur.fetchall() - - # If so, check for a match - if cached_certs: - max_count = 0 - most_frequent_cert = None - for cached_fingerprint, first, last, count in cached_certs: - if count > max_count: - max_count = count - most_frequent_cert = cached_fingerprint - if fingerprint == cached_fingerprint: - # Matched! - self._debug("TOFU: Accepting previously seen ({} times) certificate {}".format(count, fingerprint)) - self.db_cur.execute("""UPDATE cert_cache - SET last_seen=?, count=? - WHERE hostname=? AND address=? AND fingerprint=?""", - (now, count+1, host, address, fingerprint)) - self.db_conn.commit() - break - else: - certdir = os.path.join(_CONFIG_DIR, "cert_cache") - with open(os.path.join(certdir, most_frequent_cert+".crt"), "rb") as fp: - previous_cert = fp.read() - if _HAS_CRYPTOGRAPHY: - # Load the most frequently seen certificate to see if it has - # expired - previous_cert = x509.load_der_x509_certificate(previous_cert, _BACKEND) - previous_ttl = previous_cert.not_valid_after - now - print(previous_ttl) - - self._debug("TOFU: Unrecognised certificate {}! Raising the alarm...".format(fingerprint)) - print("****************************************") - print("[SECURITY WARNING] Unrecognised certificate!") - print("The certificate presented for {} ({}) has never been seen before.".format(host, address)) - print("This MIGHT be a Man-in-the-Middle attack.") - print("A different certificate has previously been seen {} times.".format(max_count)) - if _HAS_CRYPTOGRAPHY: - if previous_ttl < datetime.timedelta(): - print("That certificate has expired, which reduces suspicion somewhat.") - else: - print("That certificate is still valid for: {}".format(previous_ttl)) - print("****************************************") - print("Attempt to verify the new certificate fingerprint out-of-band:") - print(fingerprint) - if self.sync_only: - choice = self.automatic_choice - else: - choice = input("Accept this new certificate? Y/N ").strip().lower() - if choice in ("y", "yes"): - self.db_cur.execute("""INSERT INTO cert_cache - VALUES (?, ?, ?, ?, ?, ?)""", - (host, address, fingerprint, now, now, 1)) - self.db_conn.commit() - with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp: - fp.write(cert) - else: - raise Exception("TOFU Failure!") - - # If not, cache this cert - else: - self._debug("TOFU: Blindly trusting first ever certificate for this host!") - self.db_cur.execute("""INSERT INTO cert_cache - VALUES (?, ?, ?, ?, ?, ?)""", - (host, address, fingerprint, now, now, 1)) - self.db_conn.commit() - certdir = os.path.join(_CONFIG_DIR, "cert_cache") - if not os.path.exists(certdir): - os.makedirs(certdir) - with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp: - fp.write(cert) def _get_handler_cmd(self, mimetype): # Now look for a handler for this mimetype @@ -1326,7 +1187,7 @@ Marks are temporary until shutdown (not saved to disk).""" output = "Offpunk " + __version__ + "\n" output += "===========\n" output += "Highly recommended:\n" - output += " - python-cryptography : " + has(_HAS_CRYPTOGRAPHY) + output += " - python-cryptography : " + has(netcache._HAS_CRYPTOGRAPHY) output += " - xdg-open : " + has(_HAS_XDGOPEN) output += "\nWeb browsing:\n" output += " - python-requests : " + has(_DO_HTTP) @@ -2214,10 +2075,6 @@ Argument : duration of cache validity (in seconds).""" def unlink(filename): if filename and os.path.exists(filename): os.unlink(filename) - # Close TOFU DB - self.db_conn.commit() - self.db_conn.close() - # Clean up after ourself for cert in self.transient_certs_created: for ext in (".crt", ".key"): @@ -2303,7 +2160,7 @@ def main(): # Act on args if args.tls_cert: # If tls_key is None, python will attempt to load the key from tls_cert. - gc._activate_client_cert(args.tls_cert, args.tls_key) + netcache._activate_client_cert(args.tls_cert, args.tls_key) if args.bookmarks: torun_queue.append("bookmarks") elif args.url: