Factor out certificate validation into its own class/file.

This commit is contained in:
Solderpunk 2023-11-12 15:36:54 +01:00
parent 79a6187eac
commit 053dcb7254
2 changed files with 164 additions and 151 deletions

155
av98.py
View File

@ -18,11 +18,9 @@ import cmd
import cgi
import codecs
import collections
import datetime
import fnmatch
import getpass
import glob
import hashlib
import io
import logging
import mimetypes
@ -32,9 +30,7 @@ import random
import shlex
import shutil
import socket
import sqlite3
import ssl
from ssl import CertificateError
import subprocess
import sys
import tempfile
@ -48,16 +44,8 @@ try:
except ModuleNotFoundError:
import textwrap
try:
from cryptography import x509
from cryptography.hazmat.backends import default_backend
_HAS_CRYPTOGRAPHY = True
_BACKEND = default_backend()
except ModuleNotFoundError:
_HAS_CRYPTOGRAPHY = False
from cache import Cache
from tofu import TofuStore
_VERSION = "1.0.2dev"
_MAX_REDIRECTS = 5
@ -303,20 +291,9 @@ class GeminiClient(cmd.Cmd):
"cache_hits": 0,
}
self._connect_to_tofu_db()
self.tofu_store = TofuStore(self.config_dir)
self.cache = Cache()
def _connect_to_tofu_db(self):
db_path = os.path.join(self.config_dir, "tofu.db")
self.db_conn = sqlite3.connect(db_path)
self.db_cur = self.db_conn.cursor()
self.db_cur.execute("""CREATE TABLE IF NOT EXISTS cert_cache
(hostname text, address text, fingerprint text,
first_seen date, last_seen date, count integer)""")
def _go_to_gi(self, gi, update_hist=True, check_cache=True, handle=True):
"""This method might be considered "the heart of AV-98".
Everything involved in fetching a gemini resource happens here:
@ -632,7 +609,7 @@ you'll be able to transparently follow links to Gopherspace!""")
# Do TOFU
if self.options["tls_mode"] != "ca":
cert = s.getpeercert(binary_form=True)
self._validate_cert(address[4][0], host, cert)
self.tofu_store.validate_cert(address[4][0], host, cert)
# Remember that we showed the current cert to this domain...
if self.client_certs["active"]:
@ -705,129 +682,6 @@ you'll be able to transparently follow links to Gopherspace!""")
print("Giving up.")
raise UserAbortException()
def _validate_cert(self, address, host, cert):
"""
Validate a TLS certificate in TOFU mode.
If the cryptography module is installed:
- Check the certificate Common Name or SAN matches `host`
- Check the certificate's not valid before date is in the past
- Check the certificate's not valid after date is in the future
Whether the cryptography module is installed or not, check the
certificate's fingerprint against the TOFU database to see if we've
previously encountered a different certificate for this IP address and
hostname.
"""
now = datetime.datetime.utcnow()
if _HAS_CRYPTOGRAPHY:
# Using the cryptography module we can get detailed access
# to the properties of even self-signed certs, unlike in
# the standard ssl library...
c = x509.load_der_x509_certificate(cert, _BACKEND)
# Check certificate validity dates
if c.not_valid_before >= now:
raise CertificateError("Certificate not valid until: {}!".format(c.not_valid_before))
elif c.not_valid_after <= now:
raise CertificateError("Certificate expired as of: {})!".format(c.not_valid_after))
# Check certificate hostnames
names = []
common_name = c.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
if common_name:
names.append(common_name[0].value)
try:
names.extend([alt.value for alt in c.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value])
except x509.ExtensionNotFound:
pass
names = set(names)
for name in names:
try:
ssl._dnsname_match(name, host)
break
except CertificateError:
continue
else:
# If we didn't break out, none of the names were valid
raise CertificateError("Hostname does not match certificate common name or any alternative names.")
sha = hashlib.sha256()
sha.update(cert)
fingerprint = sha.hexdigest()
# Have we been here before?
self.db_cur.execute("""SELECT fingerprint, first_seen, last_seen, count
FROM cert_cache
WHERE hostname=? AND address=?""", (host, address))
cached_certs = self.db_cur.fetchall()
# If so, check for a match
if cached_certs:
max_count = 0
most_frequent_cert = None
for cached_fingerprint, first, last, count in cached_certs:
if count > max_count:
max_count = count
most_frequent_cert = cached_fingerprint
if fingerprint == cached_fingerprint:
# Matched!
ui_out.debug("TOFU: Accepting previously seen ({} times) certificate {}".format(count, fingerprint))
self.db_cur.execute("""UPDATE cert_cache
SET last_seen=?, count=?
WHERE hostname=? AND address=? AND fingerprint=?""",
(now, count+1, host, address, fingerprint))
self.db_conn.commit()
break
else:
if _HAS_CRYPTOGRAPHY:
# Load the most frequently seen certificate to see if it has
# expired
certdir = os.path.join(self.config_dir, "cert_cache")
with open(os.path.join(certdir, most_frequent_cert+".crt"), "rb") as fp:
previous_cert = fp.read()
previous_cert = x509.load_der_x509_certificate(previous_cert, _BACKEND)
previous_ttl = previous_cert.not_valid_after - now
print(previous_ttl)
ui_out.debug("TOFU: Unrecognised certificate {}! Raising the alarm...".format(fingerprint))
print("****************************************")
print("[SECURITY WARNING] Unrecognised certificate!")
print("The certificate presented for {} ({}) has never been seen before.".format(host, address))
print("This MIGHT be a Man-in-the-Middle attack.")
print("A different certificate has previously been seen {} times.".format(max_count))
if _HAS_CRYPTOGRAPHY:
if previous_ttl < datetime.timedelta():
print("That certificate has expired, which reduces suspicion somewhat.")
else:
print("That certificate is still valid for: {}".format(previous_ttl))
print("****************************************")
print("Attempt to verify the new certificate fingerprint out-of-band:")
print(fingerprint)
choice = input("Accept this new certificate? Y/N ").strip().lower()
if choice in ("y", "yes"):
self.db_cur.execute("""INSERT INTO cert_cache
VALUES (?, ?, ?, ?, ?, ?)""",
(host, address, fingerprint, now, now, 1))
self.db_conn.commit()
with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
fp.write(cert)
else:
raise Exception("TOFU Failure!")
# If not, cache this cert
else:
ui_out.debug("TOFU: Blindly trusting first ever certificate for this host!")
self.db_cur.execute("""INSERT INTO cert_cache
VALUES (?, ?, ?, ?, ?, ?)""",
(host, address, fingerprint, now, now, 1))
self.db_conn.commit()
certdir = os.path.join(self.config_dir, "cert_cache")
if not os.path.exists(certdir):
os.makedirs(certdir)
with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
fp.write(cert)
def _get_handler_cmd(self, mimetype):
# Now look for a handler for this mimetype
# Consider exact matches before wildcard matches
@ -1530,8 +1384,7 @@ current gemini browsing session."""
def do_quit(self, *args):
"""Exit AV-98."""
# Close TOFU DB
self.db_conn.commit()
self.db_conn.close()
self.tofu_store.close()
# Clean up after ourself
self.cache.empty()
if self.tmp_filename and os.path.exists(self.tmp_filename):

160
tofu.py Normal file
View File

@ -0,0 +1,160 @@
import datetime
import hashlib
import logging
import os
import os.path
import sqlite3
import ssl
import time
try:
from cryptography import x509
from cryptography.hazmat.backends import default_backend
_HAS_CRYPTOGRAPHY = True
_BACKEND = default_backend()
except ModuleNotFoundError:
_HAS_CRYPTOGRAPHY = False
ui_out = logging.getLogger("av98_logger")
class TofuStore:
def __init__(self, config_dir):
self.config_dir = config_dir
db_path = os.path.join(self.config_dir, "tofu.db")
self.db_conn = sqlite3.connect(db_path)
self.db_cur = self.db_conn.cursor()
self.db_cur.execute("""CREATE TABLE IF NOT EXISTS cert_cache
(hostname text, address text, fingerprint text,
first_seen date, last_seen date, count integer)""")
def close(self):
self.db_conn.commit()
self.db_conn.close()
def validate_cert(self, address, host, cert):
"""
Validate a TLS certificate in TOFU mode.
If the cryptography module is installed:
- Check the certificate Common Name or SAN matches `host`
- Check the certificate's not valid before date is in the past
- Check the certificate's not valid after date is in the future
Whether the cryptography module is installed or not, check the
certificate's fingerprint against the TOFU database to see if we've
previously encountered a different certificate for this IP address and
hostname.
"""
now = datetime.datetime.utcnow()
if _HAS_CRYPTOGRAPHY:
# Using the cryptography module we can get detailed access
# to the properties of even self-signed certs, unlike in
# the standard ssl library...
c = x509.load_der_x509_certificate(cert, _BACKEND)
# Check certificate validity dates
if c.not_valid_before >= now:
raise ssl.CertificateError("Certificate not valid until: {}!".format(c.not_valid_before))
elif c.not_valid_after <= now:
raise ssl.CertificateError("Certificate expired as of: {})!".format(c.not_valid_after))
# Check certificate hostnames
names = []
common_name = c.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
if common_name:
names.append(common_name[0].value)
try:
names.extend([alt.value for alt in c.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value])
except x509.ExtensionNotFound:
pass
names = set(names)
for name in names:
try:
ssl._dnsname_match(name, host)
break
except ssl.CertificateError:
continue
else:
# If we didn't break out, none of the names were valid
raise ssl.CertificateError("Hostname does not match certificate common name or any alternative names.")
sha = hashlib.sha256()
sha.update(cert)
fingerprint = sha.hexdigest()
# Have we been here before?
self.db_cur.execute("""SELECT fingerprint, first_seen, last_seen, count
FROM cert_cache
WHERE hostname=? AND address=?""", (host, address))
cached_certs = self.db_cur.fetchall()
# If so, check for a match
if cached_certs:
max_count = 0
most_frequent_cert = None
for cached_fingerprint, first, last, count in cached_certs:
if count > max_count:
max_count = count
most_frequent_cert = cached_fingerprint
if fingerprint == cached_fingerprint:
# Matched!
ui_out.debug("TOFU: Accepting previously seen ({} times) certificate {}".format(count, fingerprint))
self.db_cur.execute("""UPDATE cert_cache
SET last_seen=?, count=?
WHERE hostname=? AND address=? AND fingerprint=?""",
(now, count+1, host, address, fingerprint))
self.db_conn.commit()
break
else:
if _HAS_CRYPTOGRAPHY:
# Load the most frequently seen certificate to see if it has
# expired
certdir = os.path.join(self.config_dir, "cert_cache")
with open(os.path.join(certdir, most_frequent_cert+".crt"), "rb") as fp:
previous_cert = fp.read()
previous_cert = x509.load_der_x509_certificate(previous_cert, _BACKEND)
previous_ttl = previous_cert.not_valid_after - now
print(previous_ttl)
ui_out.debug("TOFU: Unrecognised certificate {}! Raising the alarm...".format(fingerprint))
print("****************************************")
print("[SECURITY WARNING] Unrecognised certificate!")
print("The certificate presented for {} ({}) has never been seen before.".format(host, address))
print("This MIGHT be a Man-in-the-Middle attack.")
print("A different certificate has previously been seen {} times.".format(max_count))
if _HAS_CRYPTOGRAPHY:
if previous_ttl < datetime.timedelta():
print("That certificate has expired, which reduces suspicion somewhat.")
else:
print("That certificate is still valid for: {}".format(previous_ttl))
print("****************************************")
print("Attempt to verify the new certificate fingerprint out-of-band:")
print(fingerprint)
choice = input("Accept this new certificate? Y/N ").strip().lower()
if choice in ("y", "yes"):
self.db_cur.execute("""INSERT INTO cert_cache
VALUES (?, ?, ?, ?, ?, ?)""",
(host, address, fingerprint, now, now, 1))
self.db_conn.commit()
with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
fp.write(cert)
else:
raise Exception("TOFU Failure!")
# If not, cache this cert
else:
ui_out.debug("TOFU: Blindly trusting first ever certificate for this host!")
self.db_cur.execute("""INSERT INTO cert_cache
VALUES (?, ?, ?, ?, ?, ?)""",
(host, address, fingerprint, now, now, 1))
self.db_conn.commit()
certdir = os.path.join(self.config_dir, "cert_cache")
if not os.path.exists(certdir):
os.makedirs(certdir)
with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
fp.write(cert)