Add support for transient client certificates.

This commit is contained in:
Solderpunk 2020-05-11 22:22:24 +02:00
parent cf92e12653
commit 676ab85a9e
1 changed files with 63 additions and 20 deletions

83
av98.py
View File

@ -22,13 +22,13 @@ import random
import shlex import shlex
import shutil import shutil
import socket import socket
import ssl
import subprocess import subprocess
import sys import sys
import tempfile import tempfile
import urllib.parse
import ssl
import sys
import time import time
import urllib.parse
import uuid
import webbrowser import webbrowser
try: try:
@ -243,6 +243,8 @@ class GeminiClient(cmd.Cmd):
"active": None "active": None
} }
self.active_cert_domains = [] self.active_cert_domains = []
self.active_is_transient = False
self.transient_certs_created = []
self.options = { self.options = {
"debug" : False, "debug" : False,
@ -286,18 +288,14 @@ class GeminiClient(cmd.Cmd):
self._go_to_gi(new_gi) self._go_to_gi(new_gi)
return return
# Be careful with client certificates # Be careful with client certificates!
# Are we crossing a domain boundary?
if self.active_cert_domains and gi.host not in self.active_cert_domains: if self.active_cert_domains and gi.host not in self.active_cert_domains:
if self.active_is_transient: if self.active_is_transient:
print("Permanently delete currently active transient certificate?") print("Permanently delete currently active transient certificate?")
resp = input("Y/N? ") resp = input("Y/N? ")
if resp.strip().lower() in ("y", "yes"): if resp.strip().lower() in ("y", "yes"):
print("Destroying certificate.") print("Destroying certificate.")
#for filename in self.client_certs["active"]:
# permadelete(filename)
# TODO - kill 'em on exit, too!
for domain in self.active_cert_domains:
self.client_certs.pop(domain)
self._deactivate_client_cert() self._deactivate_client_cert()
else: else:
print("Staying here.") print("Staying here.")
@ -397,10 +395,19 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
return return
# Client cert # Client cert
elif status.startswith("6"): elif status.startswith("6"):
# We don't do transient certs yet # Transient certs are a special case
if status == "61": if status == "61":
print("Transient client certificates not yet supported.") print("The server is asking to start a transient client certificate session.")
return print("What do you want to do?")
print("1. Start a transient session.")
print("2. Refuse.")
choice = input("> ").strip()
if choice.strip() == "1":
self._generate_transient_cert_cert()
self._go_to_gi(gi, update_hist, handle)
return
else:
return
# Present different messages for different 6x statuses, but # Present different messages for different 6x statuses, but
# handle them the same. # handle them the same.
@ -421,7 +428,7 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
self._load_client_cert() self._load_client_cert()
self._go_to_gi(gi, update_hist, handle) self._go_to_gi(gi, update_hist, handle)
elif choice == "3": elif choice == "3":
self._generate_client_cert() self._generate_persistent_client_cert()
self._go_to_gi(gi, update_hist, handle) self._go_to_gi(gi, update_hist, handle)
else: else:
print("Giving up.") print("Giving up.")
@ -433,6 +440,17 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
# If we're here, this must be a success and there's a response body # If we're here, this must be a success and there's a response body
assert status.startswith("2") assert status.startswith("2")
# Can we terminate a transient client session?
if status == "21":
# Make sure we're actually in such a session
if self.active_is_transient:
self._deactivate_client_cert()
print("INFO: Server terminated transient client certificate session.")
else:
# Huh, that's weird
self._debug("Server issues a 21 but we're not in transient session?")
mime = meta mime = meta
if mime == "": if mime == "":
mime = "text/gemini; charset=utf-8" mime = "text/gemini; charset=utf-8"
@ -706,19 +724,32 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
return return
self._activate_client_cert(certfile, keyfile) self._activate_client_cert(certfile, keyfile)
def _generate_client_cert(self): def _generate_transient_cert_cert(self):
certdir = os.path.join(self.config_dir, "transient_certs")
name = str(uuid.uuid4())
self._generate_client_cert(certdir, name, prompt=False)
self.active_is_transient = True
self.transient_certs_created.append(name)
def _generate_persistent_client_cert(self):
print("What do you want to name this new certificate?") print("What do you want to name this new certificate?")
print("Answering `mycert` will create `~/.av98/certs/mycert.crt` and `~/.av98/certs/mycert.key`") print("Answering `mycert` will create `~/.av98/certs/mycert.crt` and `~/.av98/certs/mycert.key`")
name = input() name = input()
if not name.strip(): if not name.strip():
print("Aborting.") print("Aborting.")
return return
certdir = os.path.expanduser("~/.av98/certs") certdir = os.path.join(self.config_dir, "certs")
self._generate_client_cert(self, certdir, name)
def _generate_client_cert(self, certdir, basename, prompt=True):
if not os.path.exists(certdir): if not os.path.exists(certdir):
os.makedirs(certdir) os.makedirs(certdir)
certfile = os.path.join(certdir, name+".crt") certfile = os.path.join(certdir, basename+".crt")
keyfile = os.path.join(certdir, name+".key") keyfile = os.path.join(certdir, basename+".key")
os.system("openssl req -x509 -newkey rsa:2048 -days 365 -nodes -keyout {} -out {}".format(keyfile, certfile)) cmd = "openssl req -x509 -newkey rsa:2048 -days 365 -nodes -keyout {} -out {}".format(keyfile, certfile)
if not prompt:
cmd += " -subj='/CN={}'".format(basename)
os.system(cmd)
self._activate_client_cert(certfile, keyfile) self._activate_client_cert(certfile, keyfile)
def _activate_client_cert(self, certfile, keyfile): def _activate_client_cert(self, certfile, keyfile):
@ -728,9 +759,15 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
self._debug("Using ID {} / {}.".format(*self.client_certs["active"])) self._debug("Using ID {} / {}.".format(*self.client_certs["active"]))
def _deactivate_client_cert(self): def _deactivate_client_cert(self):
if self.active_is_transient:
for filename in self.client_certs["active"]:
os.remove(filename)
for domain in self.active_cert_domains:
self.client_certs.pop(domain)
self.client_certs["active"] = None self.client_certs["active"] = None
self.active_cert_domains = [] self.active_cert_domains = []
self.prompt = self.no_cert_prompt self.prompt = self.no_cert_prompt
self.active_is_transient = False
# Cmd implementation follows # Cmd implementation follows
@ -1119,7 +1156,7 @@ Use 'ls -l' to see URLs."""
def do_add(self, line): def do_add(self, line):
"""Add the current URL to the bookmarks menu. """Add the current URL to the bookmarks menu.
Optionally, specify the new name for the bookmark.""" Optionally, specify the new name for the bookmark."""
with open(os.path.join(self.config_dir, "bookmarks.txt"), "a") as fp: with open(os.path.join(self.config_dir, "bookmarks.gmi"), "a") as fp:
fp.write(self.gi.to_map_line(line)) fp.write(self.gi.to_map_line(line))
def do_bookmarks(self, line): def do_bookmarks(self, line):
@ -1127,7 +1164,7 @@ Optionally, specify the new name for the bookmark."""
'bookmarks' shows all bookmarks. 'bookmarks' shows all bookmarks.
'bookmarks n' navigates immediately to item n in the bookmark menu. 'bookmarks n' navigates immediately to item n in the bookmark menu.
Bookmarks are stored using the 'add' command.""" Bookmarks are stored using the 'add' command."""
bm_file = os.path.join(self.config_dir, "bookmarks.txt") bm_file = os.path.join(self.config_dir, "bookmarks.gmi")
if not os.path.exists(bm_file): if not os.path.exists(bm_file):
print("You need to 'add' some bookmarks, first!") print("You need to 'add' some bookmarks, first!")
return return
@ -1193,6 +1230,12 @@ current gemini browsing session."""
os.unlink(self.tmp_filename) os.unlink(self.tmp_filename)
if self.idx_filename: if self.idx_filename:
os.unlink(self.idx_filename) os.unlink(self.idx_filename)
for cert in self.transient_certs_created:
for ext in (".crt", ".key"):
certfile = os.path.join(self.config_dir, "transient_certs", cert+ext)
if os.path.exists(certfile):
os.remove(certfile)
print() print()
print("Thank you for flying AV-98!") print("Thank you for flying AV-98!")
sys.exit() sys.exit()