Compare commits
1 Commits
Author | SHA1 | Date |
---|---|---|
dluciv | 5afb946160 |
|
@ -1,26 +0,0 @@
|
||||||
[build-system]
|
|
||||||
requires = ["setuptools>=61.0"]
|
|
||||||
build-backend = "setuptools.build_meta"
|
|
||||||
[project]
|
|
||||||
name = "AV-98"
|
|
||||||
dynamic = ["version"]
|
|
||||||
description = "Command line Gemini client"
|
|
||||||
authors = [{name="Solderpunk", email="solderpunk@posteo.net"}]
|
|
||||||
classifiers = [
|
|
||||||
"License :: OSI Approved :: BSD License",
|
|
||||||
"Programming Language :: Python :: 3 :: Only",
|
|
||||||
"Topic :: Communications",
|
|
||||||
"Intended Audience :: End Users/Desktop",
|
|
||||||
"Environment :: Console",
|
|
||||||
"Development Status :: 5 - Production/Stable",
|
|
||||||
]
|
|
||||||
[project.urls]
|
|
||||||
Homepage = "https://tildegit.org/solderpunk/AV-98/"
|
|
||||||
Issues = "https://tildegit.org/solderpunk/AV-98/issues"
|
|
||||||
[project.scripts]
|
|
||||||
av98 = "av98.main:main"
|
|
||||||
[project.optional-dependencies]
|
|
||||||
tofu = ["cryptography"]
|
|
||||||
colour = ["ansiwrap"]
|
|
||||||
[tool.setuptools.dynamic]
|
|
||||||
version = {attr = "av98.__version__"}
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
from setuptools import setup
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name='AV-98',
|
||||||
|
version='1.0.2dev',
|
||||||
|
description="Command line Gemini client.",
|
||||||
|
author="Solderpunk",
|
||||||
|
author_email="solderpunk@sdf.org",
|
||||||
|
url='https://tildegit.org/solderpunk/AV-98/',
|
||||||
|
classifiers=[
|
||||||
|
'License :: OSI Approved :: BSD License',
|
||||||
|
'Programming Language :: Python :: 3 :: Only',
|
||||||
|
'Topic :: Communications',
|
||||||
|
'Intended Audience :: End Users/Desktop',
|
||||||
|
'Environment :: Console',
|
||||||
|
'Development Status :: 4 - Beta',
|
||||||
|
],
|
||||||
|
py_modules = ["av98"],
|
||||||
|
entry_points={
|
||||||
|
"console_scripts": ["av98=av98:main"]
|
||||||
|
},
|
||||||
|
install_requires=[],
|
||||||
|
)
|
|
@ -1 +0,0 @@
|
||||||
__version__ = "1.1.0dev"
|
|
|
@ -1,76 +0,0 @@
|
||||||
_MAX_CACHE_SIZE = 10
|
|
||||||
_MAX_CACHE_AGE_SECS = 180
|
|
||||||
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
import os.path
|
|
||||||
import shutil
|
|
||||||
import tempfile
|
|
||||||
import time
|
|
||||||
|
|
||||||
ui_out = logging.getLogger("av98_logger")
|
|
||||||
|
|
||||||
class Cache:
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
|
|
||||||
self.cache = {}
|
|
||||||
self.cache_timestamps = {}
|
|
||||||
self.tempdir = tempfile.TemporaryDirectory()
|
|
||||||
|
|
||||||
def check(self, url):
|
|
||||||
if url not in self.cache:
|
|
||||||
return False
|
|
||||||
now = time.time()
|
|
||||||
cached = self.cache_timestamps[url]
|
|
||||||
if now - cached > _MAX_CACHE_AGE_SECS:
|
|
||||||
ui_out.debug("Expiring old cached copy of resource.")
|
|
||||||
self._remove(url)
|
|
||||||
return False
|
|
||||||
ui_out.debug("Found cached copy of resource.")
|
|
||||||
return True
|
|
||||||
|
|
||||||
def _remove(self, url):
|
|
||||||
self.cache_timestamps.pop(url)
|
|
||||||
mime, filename = self.cache.pop(url)
|
|
||||||
os.unlink(filename)
|
|
||||||
self.validatecache()
|
|
||||||
|
|
||||||
def add(self, url, mime, filename):
|
|
||||||
# Copy client's buffer file to new cache file
|
|
||||||
tmpf = tempfile.NamedTemporaryFile(dir=self.tempdir.name, delete=False)
|
|
||||||
tmpf.close()
|
|
||||||
shutil.copyfile(filename, tmpf.name)
|
|
||||||
# Remember details
|
|
||||||
self.cache_timestamps[url] = time.time()
|
|
||||||
self.cache[url] = (mime, tmpf.name)
|
|
||||||
if len(self.cache) > _MAX_CACHE_SIZE:
|
|
||||||
self._trim()
|
|
||||||
self.validatecache()
|
|
||||||
|
|
||||||
def _trim(self):
|
|
||||||
# Order cache entries by age
|
|
||||||
lru = [(t, u) for (u, t) in self.cache_timestamps.items()]
|
|
||||||
lru.sort()
|
|
||||||
# Drop the oldest entry no matter what
|
|
||||||
_, url = lru[0]
|
|
||||||
ui_out.debug("Dropping cached copy of {} from full cache.".format(url))
|
|
||||||
self._remove(url)
|
|
||||||
# Drop other entries if they are older than the limit
|
|
||||||
now = time.time()
|
|
||||||
for cached, url in lru[1:]:
|
|
||||||
if now - cached > _MAX_CACHE_AGE_SECS:
|
|
||||||
ui_out.debug("Dropping cached copy of {} from full cache.".format(url))
|
|
||||||
self._remove(url)
|
|
||||||
else:
|
|
||||||
break
|
|
||||||
self.validatecache()
|
|
||||||
|
|
||||||
def get(self, url):
|
|
||||||
return self.cache[url]
|
|
||||||
|
|
||||||
def validatecache(self):
|
|
||||||
assert self.cache.keys() == self.cache_timestamps.keys()
|
|
||||||
for _, filename in self.cache.values():
|
|
||||||
assert os.path.isfile(filename)
|
|
||||||
|
|
|
@ -1,233 +0,0 @@
|
||||||
import glob
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
import os.path
|
|
||||||
import uuid
|
|
||||||
|
|
||||||
import av98.util as util
|
|
||||||
|
|
||||||
ui_out = logging.getLogger("av98_logger")
|
|
||||||
|
|
||||||
class ClientCertificateManager:
|
|
||||||
|
|
||||||
def __init__(self, config_dir):
|
|
||||||
|
|
||||||
self.config_dir = config_dir
|
|
||||||
self.client_certs = {
|
|
||||||
"active": None
|
|
||||||
}
|
|
||||||
self.active_cert_domains = []
|
|
||||||
self.active_is_transient = False
|
|
||||||
self.transient_certs_created = []
|
|
||||||
|
|
||||||
def cleanup(self):
|
|
||||||
for cert in self.transient_certs_created:
|
|
||||||
for ext in (".crt", ".key"):
|
|
||||||
certfile = os.path.join(self.config_dir, "transient_certs", cert+ext)
|
|
||||||
if os.path.exists(certfile):
|
|
||||||
os.remove(certfile)
|
|
||||||
|
|
||||||
def manage(self):
|
|
||||||
if self.client_certs["active"]:
|
|
||||||
print("Active certificate: {}".format(self.client_certs["active"][0]))
|
|
||||||
print("1. Deactivate client certificate.")
|
|
||||||
print("2. Generate new certificate.")
|
|
||||||
print("3. Load previously generated certificate.")
|
|
||||||
print("4. Load externally created client certificate from file.")
|
|
||||||
print("Enter blank line to exit certificate manager.")
|
|
||||||
choice = input("> ").strip()
|
|
||||||
if choice == "1":
|
|
||||||
print("Deactivating client certificate.")
|
|
||||||
self._deactivate_client_cert()
|
|
||||||
elif choice == "2":
|
|
||||||
self._generate_persistent_client_cert()
|
|
||||||
elif choice == "3":
|
|
||||||
self._choose_client_cert()
|
|
||||||
elif choice == "4":
|
|
||||||
self._load_client_cert()
|
|
||||||
else:
|
|
||||||
print("Aborting.")
|
|
||||||
|
|
||||||
def associate_client_cert(self, context, gi):
|
|
||||||
# Be careful with client certificates!
|
|
||||||
# Are we crossing a domain boundary?
|
|
||||||
if self.client_certs["active"] and gi.host not in self.active_cert_domains:
|
|
||||||
if self.active_is_transient:
|
|
||||||
if util.ask_yes_no("Permanently delete currently active transient certificate?"):
|
|
||||||
print("Destroying certificate.")
|
|
||||||
self._deactivate_client_cert()
|
|
||||||
else:
|
|
||||||
print("Staying here.")
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
if util.ask_yes_no("PRIVACY ALERT: Deactivate client cert before connecting to a new domain?"):
|
|
||||||
print("Deactivating certificate.")
|
|
||||||
self._deactivate_client_cert()
|
|
||||||
else:
|
|
||||||
print("Keeping certificate active for {}".format(gi.host))
|
|
||||||
self.active_cert_domains.append(gi.host)
|
|
||||||
self.client_certs[gi.host] = self.client_certs["active"]
|
|
||||||
|
|
||||||
# Suggest reactivating previous certs
|
|
||||||
if not self.client_certs["active"] and gi.host in self.client_certs:
|
|
||||||
if util.ask_yes_no("PRIVACY ALERT: Reactivate previously used client cert for {}?".format(gi.host)):
|
|
||||||
self._activate_client_cert(*self.client_certs[gi.host])
|
|
||||||
self.active_cert_domains.append(gi.host)
|
|
||||||
else:
|
|
||||||
print("Remaining unidentified.")
|
|
||||||
self.client_certs.pop(gi.host)
|
|
||||||
|
|
||||||
# Associate certs to context based on above
|
|
||||||
if self.client_certs["active"]:
|
|
||||||
certfile, keyfile = self.client_certs["active"]
|
|
||||||
context.load_cert_chain(certfile, keyfile)
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
def is_cert_active(self):
|
|
||||||
return self.client_certs["active"] != None
|
|
||||||
|
|
||||||
def handle_cert_request(self, meta, status, host):
|
|
||||||
|
|
||||||
# Don't do client cert stuff in restricted mode, as in principle
|
|
||||||
# it could be used to fill up the disk by creating a whole lot of
|
|
||||||
# certificates
|
|
||||||
print("SERVER SAYS: ", meta)
|
|
||||||
# Present different messages for different 6x statuses, but
|
|
||||||
# handle them the same.
|
|
||||||
if status in ("64", "65"):
|
|
||||||
print("The server rejected your certificate because it is either expired or not yet valid.")
|
|
||||||
elif status == "63":
|
|
||||||
print("The server did not accept your certificate.")
|
|
||||||
print("You may need to e.g. coordinate with the admin to get your certificate fingerprint whitelisted.")
|
|
||||||
else:
|
|
||||||
print("The site {} is requesting a client certificate.".format(host))
|
|
||||||
print("This will allow the site to recognise you across requests.")
|
|
||||||
|
|
||||||
# Give the user choices
|
|
||||||
print("What do you want to do?")
|
|
||||||
print("1. Give up.")
|
|
||||||
print("2. Generate a new transient certificate.")
|
|
||||||
print("3. Generate a new persistent certificate.")
|
|
||||||
print("4. Load a previously generated certificate.")
|
|
||||||
print("5. Load a certificate from an external file.")
|
|
||||||
choice = input("> ").strip()
|
|
||||||
if choice == "2":
|
|
||||||
self._generate_transient_cert_cert()
|
|
||||||
elif choice == "3":
|
|
||||||
self._generate_persistent_client_cert()
|
|
||||||
elif choice == "4":
|
|
||||||
self._choose_client_cert()
|
|
||||||
elif choice == "5":
|
|
||||||
self._load_client_cert()
|
|
||||||
else:
|
|
||||||
print("Giving up.")
|
|
||||||
return False
|
|
||||||
|
|
||||||
if self.client_certs["active"]:
|
|
||||||
self.active_cert_domains.append(host)
|
|
||||||
self.client_certs[host] = self.client_certs["active"]
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
def _load_client_cert(self):
|
|
||||||
"""
|
|
||||||
Interactively load a TLS client certificate from the filesystem in PEM
|
|
||||||
format.
|
|
||||||
"""
|
|
||||||
print("Loading client certificate file, in PEM format (blank line to cancel)")
|
|
||||||
certfile = input("Certfile path: ").strip()
|
|
||||||
if not certfile:
|
|
||||||
print("Aborting.")
|
|
||||||
return
|
|
||||||
certfile = os.path.expanduser(certfile)
|
|
||||||
if not os.path.isfile(certfile):
|
|
||||||
print("Certificate file {} does not exist.".format(certfile))
|
|
||||||
return
|
|
||||||
print("Loading private key file, in PEM format (blank line to cancel)")
|
|
||||||
keyfile = input("Keyfile path: ").strip()
|
|
||||||
if not keyfile:
|
|
||||||
print("Aborting.")
|
|
||||||
return
|
|
||||||
keyfile = os.path.expanduser(keyfile)
|
|
||||||
if not os.path.isfile(keyfile):
|
|
||||||
print("Private key file {} does not exist.".format(keyfile))
|
|
||||||
return
|
|
||||||
self._activate_client_cert(certfile, keyfile)
|
|
||||||
|
|
||||||
def _generate_transient_cert_cert(self):
|
|
||||||
"""
|
|
||||||
Use `openssl` command to generate a new transient client certificate
|
|
||||||
with 24 hours of validity.
|
|
||||||
"""
|
|
||||||
certdir = os.path.join(self.config_dir, "transient_certs")
|
|
||||||
name = str(uuid.uuid4())
|
|
||||||
self._generate_client_cert(certdir, name, transient=True)
|
|
||||||
self.active_is_transient = True
|
|
||||||
self.transient_certs_created.append(name)
|
|
||||||
|
|
||||||
def _generate_persistent_client_cert(self):
|
|
||||||
"""
|
|
||||||
Interactively use `openssl` command to generate a new persistent client
|
|
||||||
certificate with one year of validity.
|
|
||||||
"""
|
|
||||||
certdir = os.path.join(self.config_dir, "client_certs")
|
|
||||||
print("What do you want to name this new certificate?")
|
|
||||||
print("Answering `mycert` will create `{0}/mycert.crt` and `{0}/mycert.key`".format(certdir))
|
|
||||||
name = input("> ")
|
|
||||||
if not name.strip():
|
|
||||||
print("Aborting.")
|
|
||||||
return
|
|
||||||
self._generate_client_cert(certdir, name)
|
|
||||||
|
|
||||||
def _generate_client_cert(self, certdir, basename, transient=False):
|
|
||||||
"""
|
|
||||||
Use `openssl` binary to generate a client certificate (which may be
|
|
||||||
transient or persistent) and save the certificate and private key to the
|
|
||||||
specified directory with the specified basename.
|
|
||||||
"""
|
|
||||||
if not os.path.exists(certdir):
|
|
||||||
os.makedirs(certdir)
|
|
||||||
certfile = os.path.join(certdir, basename+".crt")
|
|
||||||
keyfile = os.path.join(certdir, basename+".key")
|
|
||||||
cmd = "openssl req -x509 -newkey rsa:2048 -days {} -nodes -keyout {} -out {}".format(1 if transient else 365, keyfile, certfile)
|
|
||||||
if transient:
|
|
||||||
cmd += " -subj '/CN={}'".format(basename)
|
|
||||||
os.system(cmd)
|
|
||||||
self._activate_client_cert(certfile, keyfile)
|
|
||||||
|
|
||||||
def _choose_client_cert(self):
|
|
||||||
"""
|
|
||||||
Interactively select a previously generated client certificate and
|
|
||||||
activate it.
|
|
||||||
"""
|
|
||||||
certdir = os.path.join(self.config_dir, "client_certs")
|
|
||||||
certs = glob.glob(os.path.join(certdir, "*.crt"))
|
|
||||||
if len(certs) == 0:
|
|
||||||
print("There are no previously generated certificates.")
|
|
||||||
return
|
|
||||||
certdir = {}
|
|
||||||
for n, cert in enumerate(certs):
|
|
||||||
certdir[str(n+1)] = (cert, os.path.splitext(cert)[0] + ".key")
|
|
||||||
print("{}. {}".format(n+1, os.path.splitext(os.path.basename(cert))[0]))
|
|
||||||
choice = input("> ").strip()
|
|
||||||
if choice in certdir:
|
|
||||||
certfile, keyfile = certdir[choice]
|
|
||||||
self._activate_client_cert(certfile, keyfile)
|
|
||||||
else:
|
|
||||||
print("What?")
|
|
||||||
|
|
||||||
def _activate_client_cert(self, certfile, keyfile):
|
|
||||||
self.client_certs["active"] = (certfile, keyfile)
|
|
||||||
self.active_cert_domains = []
|
|
||||||
ui_out.debug("Using ID {} / {}.".format(*self.client_certs["active"]))
|
|
||||||
|
|
||||||
def _deactivate_client_cert(self):
|
|
||||||
if self.active_is_transient:
|
|
||||||
for filename in self.client_certs["active"]:
|
|
||||||
os.remove(filename)
|
|
||||||
for domain in self.active_cert_domains:
|
|
||||||
self.client_certs.pop(domain)
|
|
||||||
self.client_certs["active"] = None
|
|
||||||
self.active_cert_domains = []
|
|
||||||
self.active_is_transient = False
|
|
1416
src/av98/client.py
1416
src/av98/client.py
File diff suppressed because it is too large
Load Diff
130
src/av98/main.py
130
src/av98/main.py
|
@ -1,130 +0,0 @@
|
||||||
#!/usr/bin/env python3
|
|
||||||
# AV-98 Gemini client
|
|
||||||
# Dervied from VF-1 (https://github.com/solderpunk/VF-1),
|
|
||||||
# (C) 2019, 2020, 2023 Solderpunk <solderpunk@posteo.net>
|
|
||||||
# With contributions from:
|
|
||||||
# - danceka <hannu.hartikainen@gmail.com>
|
|
||||||
# - <jprjr@tilde.club>
|
|
||||||
# - <vee@vnsf.xyz>
|
|
||||||
# - Klaus Alexander Seistrup <klaus@seistrup.dk>
|
|
||||||
# - govynnus <govynnus@sdf.org>
|
|
||||||
# - Nik <nic@tilde.team>
|
|
||||||
# - <sario528@ctrl-c.club>
|
|
||||||
# - rmgr
|
|
||||||
# - Aleksey Ryndin
|
|
||||||
import argparse
|
|
||||||
import os.path
|
|
||||||
import shutil
|
|
||||||
import sys
|
|
||||||
|
|
||||||
from av98 import __version__
|
|
||||||
from av98.client import GeminiClient
|
|
||||||
|
|
||||||
def main():
|
|
||||||
|
|
||||||
# Parse args
|
|
||||||
parser = argparse.ArgumentParser(description='A command line gemini client.')
|
|
||||||
parser.add_argument('--bookmarks', action='store_true',
|
|
||||||
help='start with your list of bookmarks')
|
|
||||||
parser.add_argument('--dl', '--download', action='store_true',
|
|
||||||
help='download a single URL and quit')
|
|
||||||
parser.add_argument('-o', '--output', metavar='FILE',
|
|
||||||
help='filename to save --dl URL to')
|
|
||||||
parser.add_argument('--tls-cert', metavar='FILE', help='TLS client certificate file')
|
|
||||||
parser.add_argument('--tls-key', metavar='FILE', help='TLS client certificate private key file')
|
|
||||||
parser.add_argument('--restricted', action="store_true", help='Disallow shell, add, and save commands')
|
|
||||||
parser.add_argument('--version', action='store_true',
|
|
||||||
help='display version information and quit')
|
|
||||||
parser.add_argument('url', metavar='URL', nargs='*',
|
|
||||||
help='start with this URL')
|
|
||||||
args = parser.parse_args()
|
|
||||||
|
|
||||||
# Handle --version
|
|
||||||
if args.version:
|
|
||||||
print("AV-98 " + __version__)
|
|
||||||
sys.exit()
|
|
||||||
|
|
||||||
# Instantiate client
|
|
||||||
gc = GeminiClient(args.restricted)
|
|
||||||
|
|
||||||
# Activate client certs now in case they are needed for --download below
|
|
||||||
if args.tls_cert and args.tls_key:
|
|
||||||
gc.client_cert_manager._activate_client_cert(args.tls_cert, args.tls_key)
|
|
||||||
for url in args.url:
|
|
||||||
gi = GeminiItem(url)
|
|
||||||
gc.client_cert_manager.active_cert_domains.append(gi.host)
|
|
||||||
|
|
||||||
# Handle --download
|
|
||||||
if args.dl:
|
|
||||||
gc.onecmd("set debug True")
|
|
||||||
# Download
|
|
||||||
gi = GeminiItem(args.url[0])
|
|
||||||
gi, mime = gc._fetch_over_network(gi)
|
|
||||||
# Decide on a filename
|
|
||||||
if args.output:
|
|
||||||
filename = args.output
|
|
||||||
else:
|
|
||||||
if mime == "text/gemini":
|
|
||||||
# Parse gemtext in the hopes of getting a gi.name for the filename
|
|
||||||
gc.active_raw_file = gc.raw_file_buffer
|
|
||||||
gc._handle_gemtext(gi)
|
|
||||||
filename = gi.derive_filename(mime)
|
|
||||||
# Copy from temp file to pwd with a nice name
|
|
||||||
shutil.copyfile(gc.raw_file_buffer, filename)
|
|
||||||
size = os.path.getsize(filename)
|
|
||||||
# Notify user where the file ended up
|
|
||||||
print("Wrote %d byte %s response to %s." % (size, mime, filename))
|
|
||||||
gc.do_quit()
|
|
||||||
sys.exit()
|
|
||||||
|
|
||||||
# Process config file
|
|
||||||
rcfile = os.path.join(gc.config_dir, "av98rc")
|
|
||||||
if os.path.exists(rcfile):
|
|
||||||
print("Using config %s" % rcfile)
|
|
||||||
with open(rcfile, "r") as fp:
|
|
||||||
for line in fp:
|
|
||||||
line = line.strip()
|
|
||||||
if ((args.bookmarks or args.url) and
|
|
||||||
any((line.startswith(x) for x in ("go", "g", "tour", "t")))
|
|
||||||
):
|
|
||||||
if args.bookmarks:
|
|
||||||
print("Skipping rc command \"%s\" due to --bookmarks option." % line)
|
|
||||||
else:
|
|
||||||
print("Skipping rc command \"%s\" due to provided URLs." % line)
|
|
||||||
continue
|
|
||||||
gc.cmdqueue.append(line)
|
|
||||||
|
|
||||||
# Say hi
|
|
||||||
print("Welcome to AV-98!")
|
|
||||||
if args.restricted:
|
|
||||||
print("Restricted mode engaged!")
|
|
||||||
print("Enjoy your patrol through Geminispace...")
|
|
||||||
|
|
||||||
# Add commands to the queue based on command line arguments
|
|
||||||
if args.bookmarks:
|
|
||||||
gc.cmdqueue.append("bookmarks")
|
|
||||||
elif args.url:
|
|
||||||
if len(args.url) == 1:
|
|
||||||
gc.cmdqueue.append("go %s" % args.url[0])
|
|
||||||
else:
|
|
||||||
for url in args.url:
|
|
||||||
if not url.startswith("gemini://"):
|
|
||||||
url = "gemini://" + url
|
|
||||||
gc.cmdqueue.append("tour %s" % url)
|
|
||||||
gc.cmdqueue.append("tour")
|
|
||||||
|
|
||||||
# Endless interpret loop until user quits
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
gc.cmdloop()
|
|
||||||
break
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
print("")
|
|
||||||
|
|
||||||
# Say goodbye
|
|
||||||
print()
|
|
||||||
print("Thank you for patrolling with AV-98!")
|
|
||||||
sys.exit()
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
main()
|
|
210
src/av98/tofu.py
210
src/av98/tofu.py
|
@ -1,210 +0,0 @@
|
||||||
import datetime
|
|
||||||
import hashlib
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
import os.path
|
|
||||||
import sqlite3
|
|
||||||
import ssl
|
|
||||||
import time
|
|
||||||
|
|
||||||
try:
|
|
||||||
from cryptography import x509
|
|
||||||
from cryptography.hazmat.backends import default_backend
|
|
||||||
_HAS_CRYPTOGRAPHY = True
|
|
||||||
_BACKEND = default_backend()
|
|
||||||
except ModuleNotFoundError:
|
|
||||||
_HAS_CRYPTOGRAPHY = False
|
|
||||||
|
|
||||||
import av98.util as util
|
|
||||||
ui_out = logging.getLogger("av98_logger")
|
|
||||||
|
|
||||||
class TofuStore:
|
|
||||||
|
|
||||||
def __init__(self, config_dir):
|
|
||||||
|
|
||||||
self.config_dir = config_dir
|
|
||||||
self.certdir = os.path.join(config_dir, "cert_cache")
|
|
||||||
if not os.path.exists(self.certdir):
|
|
||||||
os.makedirs(self.certdir)
|
|
||||||
db_path = os.path.join(self.config_dir, "tofu.db")
|
|
||||||
self.db_conn = sqlite3.connect(db_path)
|
|
||||||
self.db_cur = self.db_conn.cursor()
|
|
||||||
|
|
||||||
self.create_db()
|
|
||||||
self.update_db()
|
|
||||||
|
|
||||||
def create_db(self):
|
|
||||||
self.db_cur.execute("""CREATE TABLE IF NOT EXISTS cert_cache
|
|
||||||
(hostname text, port integer, address text, fingerprint text,
|
|
||||||
first_seen date, last_seen date, count integer)""")
|
|
||||||
|
|
||||||
def update_db(self):
|
|
||||||
# Update 1 - check for port column
|
|
||||||
try:
|
|
||||||
self.db_cur.execute("""SELECT port FROM cert_cache where 1=0""")
|
|
||||||
has_port = True
|
|
||||||
except sqlite3.OperationalError:
|
|
||||||
has_port = False
|
|
||||||
if not has_port:
|
|
||||||
self.db_cur.execute("""ALTER TABLE cert_cache ADD COLUMN port integer""")
|
|
||||||
self.db_cur.execute("""UPDATE cert_cache SET port= 1965 WHERE count > 0""")
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
|
|
||||||
self.db_conn.commit()
|
|
||||||
self.db_conn.close()
|
|
||||||
|
|
||||||
def validate_cert(self, address, port, host, cert):
|
|
||||||
"""
|
|
||||||
Validate a TLS certificate in TOFU mode.
|
|
||||||
|
|
||||||
If the cryptography module is installed:
|
|
||||||
- Check the certificate Common Name or SAN matches `host`
|
|
||||||
- Check the certificate's not valid before date is in the past
|
|
||||||
- Check the certificate's not valid after date is in the future
|
|
||||||
|
|
||||||
Whether the cryptography module is installed or not, check the
|
|
||||||
certificate's fingerprint against the TOFU database to see if we've
|
|
||||||
previously encountered a different certificate for this hostname and
|
|
||||||
port
|
|
||||||
"""
|
|
||||||
|
|
||||||
now = datetime.datetime.utcnow()
|
|
||||||
|
|
||||||
# Do 'advanced' checks if Cryptography library is installed
|
|
||||||
if _HAS_CRYPTOGRAPHY:
|
|
||||||
self.check_cert_expiry_and_names(cert, host, now)
|
|
||||||
|
|
||||||
# Compute SHA256 fingerprint
|
|
||||||
sha = hashlib.sha256()
|
|
||||||
sha.update(cert)
|
|
||||||
fingerprint = sha.hexdigest()
|
|
||||||
|
|
||||||
# Have we been here before?
|
|
||||||
self.db_cur.execute("""SELECT fingerprint, address, first_seen, last_seen, count
|
|
||||||
FROM cert_cache WHERE hostname=? AND port=?""", (host, port))
|
|
||||||
cached_certs = self.db_cur.fetchall()
|
|
||||||
|
|
||||||
# If not, cache this first cert and we're done
|
|
||||||
if not cached_certs:
|
|
||||||
ui_out.debug("TOFU: Blindly trusting first ever certificate for this host!")
|
|
||||||
self.cache_new_cert(cert, host, port, address, fingerprint, now)
|
|
||||||
return
|
|
||||||
|
|
||||||
# If we have, check the received cert against the cache
|
|
||||||
if self.find_cert_in_cache(host, port, fingerprint, cached_certs, now):
|
|
||||||
return
|
|
||||||
|
|
||||||
# Handle an unrecognised cert
|
|
||||||
ui_out.debug("TOFU: Unrecognised certificate {}! Raising the alarm...".format(fingerprint))
|
|
||||||
|
|
||||||
## Find the most recently seen previous cert for reporting
|
|
||||||
most_recent = None
|
|
||||||
for cached_fingerprint, cached_address, first, last, count in cached_certs:
|
|
||||||
if not most_recent or last > most_recent:
|
|
||||||
most_recent = last
|
|
||||||
most_recent_cert = cached_fingerprint
|
|
||||||
most_recent_address = cached_address
|
|
||||||
most_recent_count = count
|
|
||||||
|
|
||||||
## Report the situation
|
|
||||||
print("****************************************")
|
|
||||||
print("[SECURITY WARNING] Unrecognised certificate!")
|
|
||||||
print("The certificate presented for {}:{} ({}) has never been seen before.".format(host, port, address))
|
|
||||||
print("This MIGHT be a Man-in-the-Middle attack.")
|
|
||||||
print("A different certificate has previously been seen {} times.".format(most_recent_count))
|
|
||||||
if _HAS_CRYPTOGRAPHY:
|
|
||||||
previous_ttl = self.get_cached_cert_expiry(most_recent_cert) - now
|
|
||||||
if previous_ttl < datetime.timedelta():
|
|
||||||
print("That certificate has expired, which reduces suspicion somewhat.")
|
|
||||||
else:
|
|
||||||
print("That certificate is still valid for: {}".format(previous_ttl))
|
|
||||||
if most_recent_address == address:
|
|
||||||
print("The new certificate is being served from the same IP address as the previous one.")
|
|
||||||
else:
|
|
||||||
print("The new certificate is being served from a DIFFERNET IP address as the previous one.")
|
|
||||||
print("****************************************")
|
|
||||||
print("Attempt to verify the new certificate fingerprint out-of-band:")
|
|
||||||
print(fingerprint)
|
|
||||||
|
|
||||||
## Ask the question
|
|
||||||
if util.ask_yes_no("Accept this new certificate?"):
|
|
||||||
self.cache_new_cert(cert, host, port, address, fingerprint, now)
|
|
||||||
else:
|
|
||||||
raise Exception("TOFU Failure!")
|
|
||||||
|
|
||||||
def cache_new_cert(self, cert, host, port, address, fingerprint, now):
|
|
||||||
"""
|
|
||||||
Accept a new certificate for a given host/port combo.
|
|
||||||
"""
|
|
||||||
# Save cert to disk
|
|
||||||
with open(os.path.join(self.certdir, fingerprint+".crt"), "wb") as fp:
|
|
||||||
fp.write(cert)
|
|
||||||
|
|
||||||
# Record in DB
|
|
||||||
self.db_cur.execute("""INSERT INTO cert_cache
|
|
||||||
(hostname, port, address, fingerprint, first_seen, last_seen, count)
|
|
||||||
VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
|
||||||
(host, port, address, fingerprint, now, now, 1))
|
|
||||||
self.db_conn.commit()
|
|
||||||
|
|
||||||
def check_cert_expiry_and_names(self, cert, host, now):
|
|
||||||
"""
|
|
||||||
- Check the certificate Common Name or SAN matches `host`
|
|
||||||
- Check the certificate's not valid before date is in the past
|
|
||||||
- Check the certificate's not valid after date is in the future
|
|
||||||
"""
|
|
||||||
c = x509.load_der_x509_certificate(cert, _BACKEND)
|
|
||||||
|
|
||||||
# Check certificate validity dates
|
|
||||||
if c.not_valid_before >= now:
|
|
||||||
raise ssl.CertificateError("Certificate not valid until: {}!".format(c.not_valid_before))
|
|
||||||
elif c.not_valid_after <= now:
|
|
||||||
raise ssl.CertificateError("Certificate expired as of: {})!".format(c.not_valid_after))
|
|
||||||
|
|
||||||
# Check certificate hostnames
|
|
||||||
names = []
|
|
||||||
common_name = c.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
|
|
||||||
if common_name:
|
|
||||||
names.append(common_name[0].value)
|
|
||||||
try:
|
|
||||||
names.extend([alt.value for alt in c.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value])
|
|
||||||
except x509.ExtensionNotFound:
|
|
||||||
pass
|
|
||||||
names = set(names)
|
|
||||||
for name in names:
|
|
||||||
try:
|
|
||||||
ssl._dnsname_match(name, host)
|
|
||||||
break
|
|
||||||
except Exception:
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
# If we didn't break out, none of the names were valid
|
|
||||||
raise ssl.CertificateError("Hostname does not match certificate common name or any alternative names.")
|
|
||||||
|
|
||||||
def find_cert_in_cache(self, host, port, fingerprint, cached_certs, now):
|
|
||||||
"""
|
|
||||||
Try to find a cached certificate for the given host:port matching the
|
|
||||||
given fingerprint. If one is found, update the "last seen" DB value.
|
|
||||||
"""
|
|
||||||
for cached_fingerprint, cached_address, first, last, count in cached_certs:
|
|
||||||
if fingerprint == cached_fingerprint:
|
|
||||||
# Matched!
|
|
||||||
ui_out.debug("TOFU: Accepting previously seen ({} times) certificate {}".format(count, fingerprint))
|
|
||||||
self.db_cur.execute("""UPDATE cert_cache
|
|
||||||
SET last_seen=?, count=?
|
|
||||||
WHERE hostname=? AND port=? AND fingerprint=?""",
|
|
||||||
(now, count+1, host, port, fingerprint))
|
|
||||||
self.db_conn.commit()
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def get_cached_cert_expiry(self, fingerprint):
|
|
||||||
"""
|
|
||||||
Parse the stored certificate with a given fingerprint and return its
|
|
||||||
expiry date.
|
|
||||||
"""
|
|
||||||
with open(os.path.join(self.certdir, fingerprint+".crt"), "rb") as fp:
|
|
||||||
previous_cert = fp.read()
|
|
||||||
previous_cert = x509.load_der_x509_certificate(previous_cert, _BACKEND)
|
|
||||||
return previous_cert.not_valid_after
|
|
|
@ -1,50 +0,0 @@
|
||||||
import os.path
|
|
||||||
|
|
||||||
# Cheap and cheerful URL detector
|
|
||||||
def looks_like_url(word):
|
|
||||||
return "." in word and word.startswith("gemini://")
|
|
||||||
|
|
||||||
def handle_filename_collisions(filename):
|
|
||||||
while os.path.exists(filename):
|
|
||||||
print("File %s already exists!" % filename)
|
|
||||||
filename = input("Choose a new one, or leave blank to abort: ")
|
|
||||||
return filename
|
|
||||||
|
|
||||||
def ask_yes_no(prompt, default=None):
|
|
||||||
print(prompt)
|
|
||||||
if default == True:
|
|
||||||
prompt = "(Y)/N: "
|
|
||||||
elif default == False:
|
|
||||||
prompt = "Y/(N): "
|
|
||||||
else:
|
|
||||||
prompt = "Y/N: "
|
|
||||||
while True:
|
|
||||||
resp = input(prompt)
|
|
||||||
if not resp.strip() and default != None:
|
|
||||||
return default
|
|
||||||
elif resp.strip().lower() in ("y", "yes"):
|
|
||||||
return True
|
|
||||||
elif resp.strip().lower() in ("n","no"):
|
|
||||||
return False
|
|
||||||
|
|
||||||
def fix_ipv6_url(url):
|
|
||||||
if not url.count(":") > 2: # Best way to detect them?
|
|
||||||
return url
|
|
||||||
# If there's a pair of []s in there, it's probably fine as is.
|
|
||||||
if "[" in url and "]" in url:
|
|
||||||
return url
|
|
||||||
# Easiest case is a raw address, no schema, no path.
|
|
||||||
# Just wrap it in square brackets and whack a slash on the end
|
|
||||||
if "/" not in url:
|
|
||||||
return "[" + url + "]/"
|
|
||||||
# Now the trickier cases...
|
|
||||||
if "://" in url:
|
|
||||||
schema, schemaless = url.split("://")
|
|
||||||
else:
|
|
||||||
schema, schemaless = None, url
|
|
||||||
if "/" in schemaless:
|
|
||||||
netloc, rest = schemaless.split("/",1)
|
|
||||||
schemaless = "[" + netloc + "]" + "/" + rest
|
|
||||||
if schema:
|
|
||||||
return schema + "://" + schemaless
|
|
||||||
return schemaless
|
|
Loading…
Reference in New Issue