more certificate management in netcache

This commit is contained in:
Lionel Dricot 2023-07-28 18:31:04 +02:00
parent dade6a3bda
commit 3051f60253
2 changed files with 153 additions and 154 deletions

View File

@ -9,6 +9,8 @@ import getpass
import socket
import ssl
import glob
import datetime
import hashlib
from ssl import CertificateError
try:
import chardet
@ -16,6 +18,14 @@ try:
except ModuleNotFoundError:
_HAS_CHARDET = False
try:
from cryptography import x509
from cryptography.hazmat.backends import default_backend
_HAS_CRYPTOGRAPHY = True
_BACKEND = default_backend()
except ModuleNotFoundError:
_HAS_CRYPTOGRAPHY = False
_home = os.path.expanduser('~')
cache_home = os.environ.get('XDG_CACHE_HOME') or\
os.path.join(_home,'.cache')
@ -557,6 +567,137 @@ def _handle_cert_request(meta):
print("Giving up.")
raise UserAbortException()
def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=None):
"""
Validate a TLS certificate in TOFU mode.
If the cryptography module is installed:
- Check the certificate Common Name or SAN matches `host`
- Check the certificate's not valid before date is in the past
- Check the certificate's not valid after date is in the future
Whether the cryptography module is installed or not, check the
certificate's fingerprint against the TOFU database to see if we've
previously encountered a different certificate for this IP address and
hostname.
"""
now = datetime.datetime.utcnow()
if _HAS_CRYPTOGRAPHY:
# Using the cryptography module we can get detailed access
# to the properties of even self-signed certs, unlike in
# the standard ssl library...
c = x509.load_der_x509_certificate(cert, _BACKEND)
# Check certificate validity dates
if accept_bad_ssl:
if c.not_valid_before >= now:
raise CertificateError("Certificate not valid until: {}!".format(c.not_valid_before))
elif c.not_valid_after <= now:
raise CertificateError("Certificate expired as of: {})!".format(c.not_valid_after))
# Check certificate hostnames
names = []
common_name = c.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
if common_name:
names.append(common_name[0].value)
try:
names.extend([alt.value for alt in c.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value])
except x509.ExtensionNotFound:
pass
names = set(names)
for name in names:
try:
ssl._dnsname_match(str(name), host)
break
except CertificateError:
continue
else:
# If we didn't break out, none of the names were valid
raise CertificateError("Hostname does not match certificate common name or any alternative names.")
sha = hashlib.sha256()
sha.update(cert)
fingerprint = sha.hexdigest()
db_path = os.path.join(_CONFIG_DIR, "tofu.db")
db_conn = sqlite3.connect(db_path)
db_cur = db_conn.cursor()
db_cur.execute("""CREATE TABLE IF NOT EXISTS cert_cache
(hostname text, address text, fingerprint text,
first_seen date, last_seen date, count integer)""")
# Have we been here before?
db_cur.execute("""SELECT fingerprint, first_seen, last_seen, count
FROM cert_cache
WHERE hostname=? AND address=?""", (host, address))
cached_certs = db_cur.fetchall()
# If so, check for a match
if cached_certs:
max_count = 0
most_frequent_cert = None
for cached_fingerprint, first, last, count in cached_certs:
if count > max_count:
max_count = count
most_frequent_cert = cached_fingerprint
if fingerprint == cached_fingerprint:
# Matched!
db_cur.execute("""UPDATE cert_cache
SET last_seen=?, count=?
WHERE hostname=? AND address=? AND fingerprint=?""",
(now, count+1, host, address, fingerprint))
db_conn.commit()
break
else:
certdir = os.path.join(_CONFIG_DIR, "cert_cache")
with open(os.path.join(certdir, most_frequent_cert+".crt"), "rb") as fp:
previous_cert = fp.read()
if _HAS_CRYPTOGRAPHY:
# Load the most frequently seen certificate to see if it has
# expired
previous_cert = x509.load_der_x509_certificate(previous_cert, _BACKEND)
previous_ttl = previous_cert.not_valid_after - now
print(previous_ttl)
print("****************************************")
print("[SECURITY WARNING] Unrecognised certificate!")
print("The certificate presented for {} ({}) has never been seen before.".format(host, address))
print("This MIGHT be a Man-in-the-Middle attack.")
print("A different certificate has previously been seen {} times.".format(max_count))
if _HAS_CRYPTOGRAPHY:
if previous_ttl < datetime.timedelta():
print("That certificate has expired, which reduces suspicion somewhat.")
else:
print("That certificate is still valid for: {}".format(previous_ttl))
print("****************************************")
print("Attempt to verify the new certificate fingerprint out-of-band:")
print(fingerprint)
if automatic_choice:
choice = automatic_choice
else:
choice = input("Accept this new certificate? Y/N ").strip().lower()
if choice in ("y", "yes"):
self.db_cur.execute("""INSERT INTO cert_cache
VALUES (?, ?, ?, ?, ?, ?)""",
(host, address, fingerprint, now, now, 1))
self.db_conn.commit()
with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
fp.write(cert)
else:
raise Exception("TOFU Failure!")
# If not, cache this cert
else:
db_cur.execute("""INSERT INTO cert_cache
VALUES (?, ?, ?, ?, ?, ?)""",
(host, address, fingerprint, now, now, 1))
db_conn.commit()
certdir = os.path.join(_CONFIG_DIR, "cert_cache")
if not os.path.exists(certdir):
os.makedirs(certdir)
with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
fp.write(cert)
def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,**kwargs):
cache = None
url_parts = urllib.parse.urlparse(url)
@ -667,7 +808,8 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,**kwargs):
cert = s.getpeercert(binary_form=True)
# TODO: another cert handling to refactor
# Remember that we showed the current cert to this domain...
# self._validate_cert(address[4][0], host, cert)
#TODO : accept badssl and automatic choice
validate_cert(address[4][0], host, cert,automatic_choice="y")
# if self.client_certs["active"]:
# self.active_cert_domains.append(host)
# self.client_certs[host] = self.client_certs["active"]

View File

@ -26,8 +26,6 @@ import argparse
import cmd
import datetime
import fnmatch
import glob
import hashlib
import io
import os
import os.path
@ -57,13 +55,6 @@ except ModuleNotFoundError:
_HAS_XSEL = shutil.which('xsel')
_HAS_XDGOPEN = shutil.which('xdg-open')
try:
from cryptography import x509
from cryptography.hazmat.backends import default_backend
_HAS_CRYPTOGRAPHY = True
_BACKEND = default_backend()
except ModuleNotFoundError:
_HAS_CRYPTOGRAPHY = False
try:
import requests
@ -526,7 +517,6 @@ class GeminiClient(cmd.Cmd):
"cache_hits": 0,
}
self._connect_to_tofu_db()
def complete_list(self,text,line,begidx,endidx):
allowed = []
@ -567,18 +557,15 @@ class GeminiClient(cmd.Cmd):
def complete_move(self,text,line,begidx,endidx):
return self.complete_add(text,line,begidx,endidx)
def _connect_to_tofu_db(self):
db_path = os.path.join(_CONFIG_DIR, "tofu.db")
self.db_conn = sqlite3.connect(db_path)
self.db_cur = self.db_conn.cursor()
self.db_cur.execute("""CREATE TABLE IF NOT EXISTS cert_cache
(hostname text, address text, fingerprint text,
first_seen date, last_seen date, count integer)""")
#TODO: go_to_gi should take an URL as parameter, not gi
#it should also only be called to really go, not for all links
def _go_to_url(self,url,**kwargs):
if "name" in kwargs.keys():
gi= GeminiItem(url,name=kwargs["name"])
else:
gi= GeminiItem(url)
return _go_to_url(GeminiItem(url),kwargs)
def _go_to_gi(self, gi, update_hist=True, check_cache=True, handle=True,\
mode=None,limit_size=False):
"""This method might be considered "the heart of Offpunk".
@ -612,8 +599,7 @@ class GeminiClient(cmd.Cmd):
mode = gi.last_mode
# Obey permanent redirects
if url in self.permanent_redirects:
new_gi = GeminiItem(self.permanent_redirects[url], name=gi.name)
self._go_to_gi(new_gi,mode=mode)
self._go_to_url(self.permanent_redirects[url],name=gi.name,mode=mode)
return
# check if local file exists.
@ -677,131 +663,6 @@ class GeminiClient(cmd.Cmd):
def _validate_cert(self, address, host, cert):
"""
Validate a TLS certificate in TOFU mode.
If the cryptography module is installed:
- Check the certificate Common Name or SAN matches `host`
- Check the certificate's not valid before date is in the past
- Check the certificate's not valid after date is in the future
Whether the cryptography module is installed or not, check the
certificate's fingerprint against the TOFU database to see if we've
previously encountered a different certificate for this IP address and
hostname.
"""
now = datetime.datetime.utcnow()
if _HAS_CRYPTOGRAPHY:
# Using the cryptography module we can get detailed access
# to the properties of even self-signed certs, unlike in
# the standard ssl library...
c = x509.load_der_x509_certificate(cert, _BACKEND)
# Check certificate validity dates
if not self.options["accept_bad_ssl_certificates"]:
if c.not_valid_before >= now:
raise CertificateError("Certificate not valid until: {}!".format(c.not_valid_before))
elif c.not_valid_after <= now:
raise CertificateError("Certificate expired as of: {})!".format(c.not_valid_after))
# Check certificate hostnames
names = []
common_name = c.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
if common_name:
names.append(common_name[0].value)
try:
names.extend([alt.value for alt in c.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value])
except x509.ExtensionNotFound:
pass
names = set(names)
for name in names:
try:
ssl._dnsname_match(str(name), host)
break
except CertificateError:
continue
else:
# If we didn't break out, none of the names were valid
raise CertificateError("Hostname does not match certificate common name or any alternative names.")
sha = hashlib.sha256()
sha.update(cert)
fingerprint = sha.hexdigest()
# Have we been here before?
self.db_cur.execute("""SELECT fingerprint, first_seen, last_seen, count
FROM cert_cache
WHERE hostname=? AND address=?""", (host, address))
cached_certs = self.db_cur.fetchall()
# If so, check for a match
if cached_certs:
max_count = 0
most_frequent_cert = None
for cached_fingerprint, first, last, count in cached_certs:
if count > max_count:
max_count = count
most_frequent_cert = cached_fingerprint
if fingerprint == cached_fingerprint:
# Matched!
self._debug("TOFU: Accepting previously seen ({} times) certificate {}".format(count, fingerprint))
self.db_cur.execute("""UPDATE cert_cache
SET last_seen=?, count=?
WHERE hostname=? AND address=? AND fingerprint=?""",
(now, count+1, host, address, fingerprint))
self.db_conn.commit()
break
else:
certdir = os.path.join(_CONFIG_DIR, "cert_cache")
with open(os.path.join(certdir, most_frequent_cert+".crt"), "rb") as fp:
previous_cert = fp.read()
if _HAS_CRYPTOGRAPHY:
# Load the most frequently seen certificate to see if it has
# expired
previous_cert = x509.load_der_x509_certificate(previous_cert, _BACKEND)
previous_ttl = previous_cert.not_valid_after - now
print(previous_ttl)
self._debug("TOFU: Unrecognised certificate {}! Raising the alarm...".format(fingerprint))
print("****************************************")
print("[SECURITY WARNING] Unrecognised certificate!")
print("The certificate presented for {} ({}) has never been seen before.".format(host, address))
print("This MIGHT be a Man-in-the-Middle attack.")
print("A different certificate has previously been seen {} times.".format(max_count))
if _HAS_CRYPTOGRAPHY:
if previous_ttl < datetime.timedelta():
print("That certificate has expired, which reduces suspicion somewhat.")
else:
print("That certificate is still valid for: {}".format(previous_ttl))
print("****************************************")
print("Attempt to verify the new certificate fingerprint out-of-band:")
print(fingerprint)
if self.sync_only:
choice = self.automatic_choice
else:
choice = input("Accept this new certificate? Y/N ").strip().lower()
if choice in ("y", "yes"):
self.db_cur.execute("""INSERT INTO cert_cache
VALUES (?, ?, ?, ?, ?, ?)""",
(host, address, fingerprint, now, now, 1))
self.db_conn.commit()
with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
fp.write(cert)
else:
raise Exception("TOFU Failure!")
# If not, cache this cert
else:
self._debug("TOFU: Blindly trusting first ever certificate for this host!")
self.db_cur.execute("""INSERT INTO cert_cache
VALUES (?, ?, ?, ?, ?, ?)""",
(host, address, fingerprint, now, now, 1))
self.db_conn.commit()
certdir = os.path.join(_CONFIG_DIR, "cert_cache")
if not os.path.exists(certdir):
os.makedirs(certdir)
with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
fp.write(cert)
def _get_handler_cmd(self, mimetype):
# Now look for a handler for this mimetype
@ -1326,7 +1187,7 @@ Marks are temporary until shutdown (not saved to disk)."""
output = "Offpunk " + __version__ + "\n"
output += "===========\n"
output += "Highly recommended:\n"
output += " - python-cryptography : " + has(_HAS_CRYPTOGRAPHY)
output += " - python-cryptography : " + has(netcache._HAS_CRYPTOGRAPHY)
output += " - xdg-open : " + has(_HAS_XDGOPEN)
output += "\nWeb browsing:\n"
output += " - python-requests : " + has(_DO_HTTP)
@ -2214,10 +2075,6 @@ Argument : duration of cache validity (in seconds)."""
def unlink(filename):
if filename and os.path.exists(filename):
os.unlink(filename)
# Close TOFU DB
self.db_conn.commit()
self.db_conn.close()
# Clean up after ourself
for cert in self.transient_certs_created:
for ext in (".crt", ".key"):
@ -2303,7 +2160,7 @@ def main():
# Act on args
if args.tls_cert:
# If tls_key is None, python will attempt to load the key from tls_cert.
gc._activate_client_cert(args.tls_cert, args.tls_key)
netcache._activate_client_cert(args.tls_cert, args.tls_key)
if args.bookmarks:
torun_queue.append("bookmarks")
elif args.url: