Factor out client certificate management into its own class/file.

This commit is contained in:
Solderpunk 2023-11-18 14:22:32 +01:00
parent 607223c25a
commit de7e5dc254
2 changed files with 286 additions and 252 deletions

305
av98.py
View File

@ -19,7 +19,6 @@ import cgi
import codecs
import fnmatch
import getpass
import glob
import logging
import mimetypes
import os
@ -34,7 +33,6 @@ import tempfile
import time
import traceback
import urllib.parse
import uuid
import webbrowser
try:
@ -44,6 +42,8 @@ except ModuleNotFoundError:
from cache import Cache
from tofu import TofuStore
from clientcerts import ClientCertificateManager
_VERSION = "1.0.2dev"
_MAX_REDIRECTS = 5
@ -258,13 +258,6 @@ class GeminiClient(cmd.Cmd):
self.visited_hosts = set()
self.waypoints = []
self.client_certs = {
"active": None
}
self.active_cert_domains = []
self.active_is_transient = False
self.transient_certs_created = []
self.options = {
"debug" : False,
"ipv6" : True,
@ -294,6 +287,7 @@ class GeminiClient(cmd.Cmd):
}
self.tofu_store = TofuStore(self.config_dir)
self.client_cert_manager = ClientCertificateManager(self.config_dir)
self.cache = Cache()
ui_out.debug("Raw buffer: ", self.raw_file_buffer)
@ -459,7 +453,13 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
# Client cert
elif status.startswith("6"):
self._handle_cert_request(meta, status, gi.host)
if self.restricted:
print("The server is requesting a client certificate.")
print("These are not supported in restricted mode, sorry.")
raise UserAbortException()
if not self.client_cert_manager.handle_cert_request(meta, status, gi.host):
raise UserAbortException()
continue
# Invalid status
@ -509,37 +509,6 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
host, port = self.options["http_proxy"].rsplit(":",1)
ui_out.debug("Using http proxy: " + self.options["http_proxy"])
# Be careful with client certificates!
# Are we crossing a domain boundary?
if self.active_cert_domains and gi.host not in self.active_cert_domains:
if self.active_is_transient:
print("Permanently delete currently active transient certificate?")
resp = input("Y/N? ")
if resp.strip().lower() in ("y", "yes"):
print("Destroying certificate.")
self._deactivate_client_cert()
else:
print("Staying here.")
raise UserAbortException()
else:
print("PRIVACY ALERT: Deactivate client cert before connecting to a new domain?")
resp = input("Y/N? ")
if resp.strip().lower() in ("n", "no"):
print("Keeping certificate active for {}".format(gi.host))
else:
print("Deactivating certificate.")
self._deactivate_client_cert()
# Suggest reactivating previous certs
if not self.client_certs["active"] and gi.host in self.client_certs:
print("PRIVACY ALERT: Reactivate previously used client cert for {}?".format(gi.host))
resp = input("Y/N? ")
if resp.strip().lower() in ("y", "yes"):
self._activate_client_cert(*self.client_certs[gi.host])
else:
print("Remaining unidentified.")
self.client_certs.pop(gi.host)
# Do DNS resolution
try:
addresses = self._get_addresses(host, port)
@ -549,44 +518,9 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
raise err
# Prepare TLS context
def _newest_supported_protocol():
if sys.version_info >= (3, 10):
return ssl.PROTOCOL_TLS_CLIENT
elif sys.version_info >= (3, 6):
return ssl.PROTOCOL_TLS
else:
return ssl.PROTOCOL_TLSv1_2
context = ssl.SSLContext(_newest_supported_protocol())
# Use CAs or TOFU
if self.options["tls_mode"] == "ca":
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = True
context.load_default_certs()
else:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
# Impose minimum TLS version
## In 3.7 and above, this is easy...
if sys.version_info.minor >= 7:
context.minimum_version = ssl.TLSVersion.TLSv1_2
## Otherwise, it seems very hard...
## The below is less strict than it ought to be, but trying to disable
## TLS v1.1 here using ssl.OP_NO_TLSv1_1 produces unexpected failures
## with recent versions of OpenSSL. What a mess...
else:
context.options |= ssl.OP_NO_SSLv3
context.options |= ssl.OP_NO_SSLv2
# Try to enforce sensible ciphers
try:
context.set_ciphers("AESGCM+ECDHE:AESGCM+DHE:CHACHA20+ECDHE:CHACHA20+DHE:!DSS:!SHA1:!MD5:@STRENGTH")
except ssl.SSLError:
# Rely on the server to only support sensible things, I guess...
pass
# Load client certificate if needed
if self.client_certs["active"]:
certfile, keyfile = self.client_certs["active"]
context.load_cert_chain(certfile, keyfile)
context = self._prepare_SSL_context(self.options["tls_mode"])
if not self.client_cert_manager.associate_client_cert(context, gi):
raise UserAbortException()
# Connect to remote host by any address possible
err = None
@ -622,11 +556,6 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
cert = s.getpeercert(binary_form=True)
self.tofu_store.validate_cert(address[4][0], host, cert)
# Remember that we showed the current cert to this domain...
if self.client_certs["active"]:
self.active_cert_domains.append(gi.host)
self.client_certs[gi.host] = self.client_certs["active"]
# Send request and wrap response in a file descriptor
ui_out.debug("Sending %s<CRLF>" % gi.url)
s.sendall((gi.url + CRLF).encode("UTF-8"))
@ -708,47 +637,45 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
return addresses
def _handle_cert_request(self, meta, status, host):
# Don't do client cert stuff in restricted mode, as in principle
# it could be used to fill up the disk by creating a whole lot of
# certificates
if self.restricted:
print("The server is requesting a client certificate.")
print("These are not supported in restricted mode, sorry.")
raise UserAbortException()
print("SERVER SAYS: ", meta)
# Present different messages for different 6x statuses, but
# handle them the same.
if status in ("64", "65"):
print("The server rejected your certificate because it is either expired or not yet valid.")
elif status == "63":
print("The server did not accept your certificate.")
print("You may need to e.g. coordinate with the admin to get your certificate fingerprint whitelisted.")
def _prepare_SSL_context(self, cert_validation_mode="tofu"):
# Flail against version churn
if sys.version_info >= (3, 10):
_newest_supported_protocol = ssl.PROTOCOL_TLS_CLIENT
elif sys.version_info >= (3, 6):
_newest_supported_protocol = ssl.PROTOCOL_TLS
else:
print("The site {} is requesting a client certificate.".format(host))
print("This will allow the site to recognise you across requests.")
_newest_supported_protocol = ssl.PROTOCOL_TLSv1_2
context = ssl.SSLContext(_newest_supported_protocol)
# Give the user choices
print("What do you want to do?")
print("1. Give up.")
print("2. Generate a new transient certificate.")
print("3. Generate a new persistent certificate.")
print("4. Load a previously generated certificate.")
print("5. Load a certificate from an external file.")
choice = input("> ").strip()
if choice == "2":
self._generate_transient_cert_cert()
elif choice == "3":
self._generate_persistent_client_cert()
elif choice == "4":
self._choose_client_cert()
elif choice == "5":
self._load_client_cert()
# Use CAs or TOFU
if cert_validation_mode == "ca":
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = True
context.load_default_certs()
else:
print("Giving up.")
raise UserAbortException()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
# Impose minimum TLS version
## In 3.7 and above, this is easy...
if sys.version_info.minor >= 7:
context.minimum_version = ssl.TLSVersion.TLSv1_2
## Otherwise, it seems very hard...
## The below is less strict than it ought to be, but trying to disable
## TLS v1.1 here using ssl.OP_NO_TLSv1_1 produces unexpected failures
## with recent versions of OpenSSL. What a mess...
else:
context.options |= ssl.OP_NO_SSLv3
context.options |= ssl.OP_NO_SSLv2
# Try to enforce sensible ciphers
try:
context.set_ciphers("AESGCM+ECDHE:AESGCM+DHE:CHACHA20+ECDHE:CHACHA20+DHE:!DSS:!SHA1:!MD5:@STRENGTH")
except ssl.SSLError:
# Rely on the server to only support sensible things, I guess...
pass
return context
def _get_handler_cmd(self, mimetype):
# Now look for a handler for this mimetype
@ -848,110 +775,6 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
self.log["ipv6_requests"] += 1
self.log["ipv6_bytes_recvd"] += size
def _load_client_cert(self):
"""
Interactively load a TLS client certificate from the filesystem in PEM
format.
"""
print("Loading client certificate file, in PEM format (blank line to cancel)")
certfile = input("Certfile path: ").strip()
if not certfile:
print("Aborting.")
return
certfile = os.path.expanduser(certfile)
if not os.path.isfile(certfile):
print("Certificate file {} does not exist.".format(certfile))
return
print("Loading private key file, in PEM format (blank line to cancel)")
keyfile = input("Keyfile path: ").strip()
if not keyfile:
print("Aborting.")
return
keyfile = os.path.expanduser(keyfile)
if not os.path.isfile(keyfile):
print("Private key file {} does not exist.".format(keyfile))
return
self._activate_client_cert(certfile, keyfile)
def _generate_transient_cert_cert(self):
"""
Use `openssl` command to generate a new transient client certificate
with 24 hours of validity.
"""
certdir = os.path.join(self.config_dir, "transient_certs")
name = str(uuid.uuid4())
self._generate_client_cert(certdir, name, transient=True)
self.active_is_transient = True
self.transient_certs_created.append(name)
def _generate_persistent_client_cert(self):
"""
Interactively use `openssl` command to generate a new persistent client
certificate with one year of validity.
"""
certdir = os.path.join(self.config_dir, "client_certs")
print("What do you want to name this new certificate?")
print("Answering `mycert` will create `{0}/mycert.crt` and `{0}/mycert.key`".format(certdir))
name = input("> ")
if not name.strip():
print("Aborting.")
return
self._generate_client_cert(certdir, name)
def _generate_client_cert(self, certdir, basename, transient=False):
"""
Use `openssl` binary to generate a client certificate (which may be
transient or persistent) and save the certificate and private key to the
specified directory with the specified basename.
"""
if not os.path.exists(certdir):
os.makedirs(certdir)
certfile = os.path.join(certdir, basename+".crt")
keyfile = os.path.join(certdir, basename+".key")
cmd = "openssl req -x509 -newkey rsa:2048 -days {} -nodes -keyout {} -out {}".format(1 if transient else 365, keyfile, certfile)
if transient:
cmd += " -subj '/CN={}'".format(basename)
os.system(cmd)
self._activate_client_cert(certfile, keyfile)
def _choose_client_cert(self):
"""
Interactively select a previously generated client certificate and
activate it.
"""
certdir = os.path.join(self.config_dir, "client_certs")
certs = glob.glob(os.path.join(certdir, "*.crt"))
if len(certs) == 0:
print("There are no previously generated certificates.")
return
certdir = {}
for n, cert in enumerate(certs):
certdir[str(n+1)] = (cert, os.path.splitext(cert)[0] + ".key")
print("{}. {}".format(n+1, os.path.splitext(os.path.basename(cert))[0]))
choice = input("> ").strip()
if choice in certdir:
certfile, keyfile = certdir[choice]
self._activate_client_cert(certfile, keyfile)
else:
print("What?")
def _activate_client_cert(self, certfile, keyfile):
self.client_certs["active"] = (certfile, keyfile)
self.active_cert_domains = []
self.prompt = self.cert_prompt
ui_out.debug("Using ID {} / {}.".format(*self.client_certs["active"]))
def _deactivate_client_cert(self):
if self.active_is_transient:
for filename in self.client_certs["active"]:
os.remove(filename)
for domain in self.active_cert_domains:
self.client_certs.pop(domain)
self.client_certs["active"] = None
self.active_cert_domains = []
self.prompt = self.no_cert_prompt
self.active_is_transient = False
# Cmd implementation follows
def default(self, line):
@ -1047,25 +870,7 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
def do_cert(self, line):
"""Manage client certificates"""
print("Managing client certificates")
if self.client_certs["active"]:
print("Active certificate: {}".format(self.client_certs["active"][0]))
print("1. Deactivate client certificate.")
print("2. Generate new certificate.")
print("3. Load previously generated certificate.")
print("4. Load externally created client certificate from file.")
print("Enter blank line to exit certificate manager.")
choice = input("> ").strip()
if choice == "1":
print("Deactivating client certificate.")
self._deactivate_client_cert()
elif choice == "2":
self._generate_persistent_client_cert()
elif choice == "3":
self._choose_client_cert()
elif choice == "4":
self._load_client_cert()
else:
print("Aborting.")
self.client_cert_manager.manage()
@restricted
def do_handler(self, line):
@ -1460,12 +1265,8 @@ current gemini browsing session."""
# Clean up after ourself
os.unlink(self.raw_file_buffer)
os.unlink(self.rendered_file_buffer)
for cert in self.transient_certs_created:
for ext in (".crt", ".key"):
certfile = os.path.join(self.config_dir, "transient_certs", cert+ext)
if os.path.exists(certfile):
os.remove(certfile)
self.client_cert_manager.cleanup()
# Say goodbye
print()
print("Thank you for flying AV-98!")
sys.exit()
@ -1522,7 +1323,7 @@ def main():
# Act on args
if args.tls_cert:
# If tls_key is None, python will attempt to load the key from tls_cert.
gc._activate_client_cert(args.tls_cert, args.tls_key)
gc.client_cert_manager._activate_client_cert(args.tls_cert, args.tls_key)
if args.bookmarks:
gc.cmdqueue.append("bookmarks")
elif args.url:

233
clientcerts.py Normal file
View File

@ -0,0 +1,233 @@
import glob
import logging
import os
import os.path
import uuid
ui_out = logging.getLogger("av98_logger")
class ClientCertificateManager:
def __init__(self, config_dir):
self.config_dir = config_dir
self.client_certs = {
"active": None
}
self.active_cert_domains = []
self.active_is_transient = False
self.transient_certs_created = []
def cleanup(self):
for cert in self.transient_certs_created:
for ext in (".crt", ".key"):
certfile = os.path.join(self.config_dir, "transient_certs", cert+ext)
if os.path.exists(certfile):
os.remove(certfile)
def manage(self):
if self.client_certs["active"]:
print("Active certificate: {}".format(self.client_certs["active"][0]))
print("1. Deactivate client certificate.")
print("2. Generate new certificate.")
print("3. Load previously generated certificate.")
print("4. Load externally created client certificate from file.")
print("Enter blank line to exit certificate manager.")
choice = input("> ").strip()
if choice == "1":
print("Deactivating client certificate.")
self._deactivate_client_cert()
elif choice == "2":
self._generate_persistent_client_cert()
elif choice == "3":
self._choose_client_cert()
elif choice == "4":
self._load_client_cert()
else:
print("Aborting.")
def associate_client_cert(self, context, gi):
# Be careful with client certificates!
# Are we crossing a domain boundary?
if self.client_certs["active"] and gi.host not in self.active_cert_domains:
if self.active_is_transient:
print("Permanently delete currently active transient certificate?")
resp = input("Y/N? ")
if resp.strip().lower() in ("y", "yes"):
print("Destroying certificate.")
self._deactivate_client_cert()
else:
print("Staying here.")
return False
else:
print("PRIVACY ALERT: Deactivate client cert before connecting to a new domain?")
resp = input("Y/N? ")
if resp.strip().lower() in ("n", "no"):
print("Keeping certificate active for {}".format(gi.host))
self.active_cert_domains.append(gi.host)
self.client_certs[gi.host] = self.client_certs["active"]
else:
print("Deactivating certificate.")
self._deactivate_client_cert()
# Suggest reactivating previous certs
if not self.client_certs["active"] and gi.host in self.client_certs:
print("PRIVACY ALERT: Reactivate previously used client cert for {}?".format(gi.host))
resp = input("Y/N? ")
if resp.strip().lower() in ("y", "yes"):
self._activate_client_cert(*self.client_certs[gi.host])
else:
print("Remaining unidentified.")
self.client_certs.pop(gi.host)
# Associate certs to context based on above
if self.client_certs["active"]:
certfile, keyfile = self.client_certs["active"]
context.load_cert_chain(certfile, keyfile)
return True
def handle_cert_request(self, meta, status, host):
# Don't do client cert stuff in restricted mode, as in principle
# it could be used to fill up the disk by creating a whole lot of
# certificates
print("SERVER SAYS: ", meta)
# Present different messages for different 6x statuses, but
# handle them the same.
if status in ("64", "65"):
print("The server rejected your certificate because it is either expired or not yet valid.")
elif status == "63":
print("The server did not accept your certificate.")
print("You may need to e.g. coordinate with the admin to get your certificate fingerprint whitelisted.")
else:
print("The site {} is requesting a client certificate.".format(host))
print("This will allow the site to recognise you across requests.")
# Give the user choices
print("What do you want to do?")
print("1. Give up.")
print("2. Generate a new transient certificate.")
print("3. Generate a new persistent certificate.")
print("4. Load a previously generated certificate.")
print("5. Load a certificate from an external file.")
choice = input("> ").strip()
if choice == "2":
self._generate_transient_cert_cert()
elif choice == "3":
self._generate_persistent_client_cert()
elif choice == "4":
self._choose_client_cert()
elif choice == "5":
self._load_client_cert()
else:
print("Giving up.")
return False
if self.client_certs["active"]:
self.active_cert_domains.append(host)
self.client_certs[host] = self.client_certs["active"]
return True
def _load_client_cert(self):
"""
Interactively load a TLS client certificate from the filesystem in PEM
format.
"""
print("Loading client certificate file, in PEM format (blank line to cancel)")
certfile = input("Certfile path: ").strip()
if not certfile:
print("Aborting.")
return
certfile = os.path.expanduser(certfile)
if not os.path.isfile(certfile):
print("Certificate file {} does not exist.".format(certfile))
return
print("Loading private key file, in PEM format (blank line to cancel)")
keyfile = input("Keyfile path: ").strip()
if not keyfile:
print("Aborting.")
return
keyfile = os.path.expanduser(keyfile)
if not os.path.isfile(keyfile):
print("Private key file {} does not exist.".format(keyfile))
return
self._activate_client_cert(certfile, keyfile)
def _generate_transient_cert_cert(self):
"""
Use `openssl` command to generate a new transient client certificate
with 24 hours of validity.
"""
certdir = os.path.join(self.config_dir, "transient_certs")
name = str(uuid.uuid4())
self._generate_client_cert(certdir, name, transient=True)
self.active_is_transient = True
self.transient_certs_created.append(name)
def _generate_persistent_client_cert(self):
"""
Interactively use `openssl` command to generate a new persistent client
certificate with one year of validity.
"""
certdir = os.path.join(self.config_dir, "client_certs")
print("What do you want to name this new certificate?")
print("Answering `mycert` will create `{0}/mycert.crt` and `{0}/mycert.key`".format(certdir))
name = input("> ")
if not name.strip():
print("Aborting.")
return
self._generate_client_cert(certdir, name)
def _generate_client_cert(self, certdir, basename, transient=False):
"""
Use `openssl` binary to generate a client certificate (which may be
transient or persistent) and save the certificate and private key to the
specified directory with the specified basename.
"""
if not os.path.exists(certdir):
os.makedirs(certdir)
certfile = os.path.join(certdir, basename+".crt")
keyfile = os.path.join(certdir, basename+".key")
cmd = "openssl req -x509 -newkey rsa:2048 -days {} -nodes -keyout {} -out {}".format(1 if transient else 365, keyfile, certfile)
if transient:
cmd += " -subj '/CN={}'".format(basename)
os.system(cmd)
self._activate_client_cert(certfile, keyfile)
def _choose_client_cert(self):
"""
Interactively select a previously generated client certificate and
activate it.
"""
certdir = os.path.join(self.config_dir, "client_certs")
certs = glob.glob(os.path.join(certdir, "*.crt"))
if len(certs) == 0:
print("There are no previously generated certificates.")
return
certdir = {}
for n, cert in enumerate(certs):
certdir[str(n+1)] = (cert, os.path.splitext(cert)[0] + ".key")
print("{}. {}".format(n+1, os.path.splitext(os.path.basename(cert))[0]))
choice = input("> ").strip()
if choice in certdir:
certfile, keyfile = certdir[choice]
self._activate_client_cert(certfile, keyfile)
else:
print("What?")
def _activate_client_cert(self, certfile, keyfile):
self.client_certs["active"] = (certfile, keyfile)
self.active_cert_domains = []
ui_out.debug("Using ID {} / {}.".format(*self.client_certs["active"]))
def _deactivate_client_cert(self):
if self.active_is_transient:
for filename in self.client_certs["active"]:
os.remove(filename)
for domain in self.active_cert_domains:
self.client_certs.pop(domain)
self.client_certs["active"] = None
self.active_cert_domains = []
self.active_is_transient = False