Compare commits

..

No commits in common. "master" and "fix-cert-request" have entirely different histories.

10 changed files with 1677 additions and 2142 deletions

1654
av98.py Executable file

File diff suppressed because it is too large Load Diff

View File

@ -1,26 +0,0 @@
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[project]
name = "AV-98"
dynamic = ["version"]
description = "Command line Gemini client"
authors = [{name="Solderpunk", email="solderpunk@posteo.net"}]
classifiers = [
"License :: OSI Approved :: BSD License",
"Programming Language :: Python :: 3 :: Only",
"Topic :: Communications",
"Intended Audience :: End Users/Desktop",
"Environment :: Console",
"Development Status :: 5 - Production/Stable",
]
[project.urls]
Homepage = "https://tildegit.org/solderpunk/AV-98/"
Issues = "https://tildegit.org/solderpunk/AV-98/issues"
[project.scripts]
av98 = "av98.main:main"
[project.optional-dependencies]
tofu = ["cryptography"]
colour = ["ansiwrap"]
[tool.setuptools.dynamic]
version = {attr = "av98.__version__"}

23
setup.py Executable file
View File

@ -0,0 +1,23 @@
from setuptools import setup
setup(
name='AV-98',
version='1.0.2dev',
description="Command line Gemini client.",
author="Solderpunk",
author_email="solderpunk@sdf.org",
url='https://tildegit.org/solderpunk/AV-98/',
classifiers=[
'License :: OSI Approved :: BSD License',
'Programming Language :: Python :: 3 :: Only',
'Topic :: Communications',
'Intended Audience :: End Users/Desktop',
'Environment :: Console',
'Development Status :: 4 - Beta',
],
py_modules = ["av98"],
entry_points={
"console_scripts": ["av98=av98:main"]
},
install_requires=[],
)

View File

@ -1 +0,0 @@
__version__ = "1.1.0dev"

View File

@ -1,76 +0,0 @@
_MAX_CACHE_SIZE = 10
_MAX_CACHE_AGE_SECS = 180
import logging
import os
import os.path
import shutil
import tempfile
import time
ui_out = logging.getLogger("av98_logger")
class Cache:
def __init__(self):
self.cache = {}
self.cache_timestamps = {}
self.tempdir = tempfile.TemporaryDirectory()
def check(self, url):
if url not in self.cache:
return False
now = time.time()
cached = self.cache_timestamps[url]
if now - cached > _MAX_CACHE_AGE_SECS:
ui_out.debug("Expiring old cached copy of resource.")
self._remove(url)
return False
ui_out.debug("Found cached copy of resource.")
return True
def _remove(self, url):
self.cache_timestamps.pop(url)
mime, filename = self.cache.pop(url)
os.unlink(filename)
self.validatecache()
def add(self, url, mime, filename):
# Copy client's buffer file to new cache file
tmpf = tempfile.NamedTemporaryFile(dir=self.tempdir.name, delete=False)
tmpf.close()
shutil.copyfile(filename, tmpf.name)
# Remember details
self.cache_timestamps[url] = time.time()
self.cache[url] = (mime, tmpf.name)
if len(self.cache) > _MAX_CACHE_SIZE:
self._trim()
self.validatecache()
def _trim(self):
# Order cache entries by age
lru = [(t, u) for (u, t) in self.cache_timestamps.items()]
lru.sort()
# Drop the oldest entry no matter what
_, url = lru[0]
ui_out.debug("Dropping cached copy of {} from full cache.".format(url))
self._remove(url)
# Drop other entries if they are older than the limit
now = time.time()
for cached, url in lru[1:]:
if now - cached > _MAX_CACHE_AGE_SECS:
ui_out.debug("Dropping cached copy of {} from full cache.".format(url))
self._remove(url)
else:
break
self.validatecache()
def get(self, url):
return self.cache[url]
def validatecache(self):
assert self.cache.keys() == self.cache_timestamps.keys()
for _, filename in self.cache.values():
assert os.path.isfile(filename)

View File

@ -1,233 +0,0 @@
import glob
import logging
import os
import os.path
import uuid
import av98.util as util
ui_out = logging.getLogger("av98_logger")
class ClientCertificateManager:
def __init__(self, config_dir):
self.config_dir = config_dir
self.client_certs = {
"active": None
}
self.active_cert_domains = []
self.active_is_transient = False
self.transient_certs_created = []
def cleanup(self):
for cert in self.transient_certs_created:
for ext in (".crt", ".key"):
certfile = os.path.join(self.config_dir, "transient_certs", cert+ext)
if os.path.exists(certfile):
os.remove(certfile)
def manage(self):
if self.client_certs["active"]:
print("Active certificate: {}".format(self.client_certs["active"][0]))
print("1. Deactivate client certificate.")
print("2. Generate new certificate.")
print("3. Load previously generated certificate.")
print("4. Load externally created client certificate from file.")
print("Enter blank line to exit certificate manager.")
choice = input("> ").strip()
if choice == "1":
print("Deactivating client certificate.")
self._deactivate_client_cert()
elif choice == "2":
self._generate_persistent_client_cert()
elif choice == "3":
self._choose_client_cert()
elif choice == "4":
self._load_client_cert()
else:
print("Aborting.")
def associate_client_cert(self, context, gi):
# Be careful with client certificates!
# Are we crossing a domain boundary?
if self.client_certs["active"] and gi.host not in self.active_cert_domains:
if self.active_is_transient:
if util.ask_yes_no("Permanently delete currently active transient certificate?"):
print("Destroying certificate.")
self._deactivate_client_cert()
else:
print("Staying here.")
return False
else:
if util.ask_yes_no("PRIVACY ALERT: Deactivate client cert before connecting to a new domain?"):
print("Deactivating certificate.")
self._deactivate_client_cert()
else:
print("Keeping certificate active for {}".format(gi.host))
self.active_cert_domains.append(gi.host)
self.client_certs[gi.host] = self.client_certs["active"]
# Suggest reactivating previous certs
if not self.client_certs["active"] and gi.host in self.client_certs:
if util.ask_yes_no("PRIVACY ALERT: Reactivate previously used client cert for {}?".format(gi.host)):
self._activate_client_cert(*self.client_certs[gi.host])
self.active_cert_domains.append(gi.host)
else:
print("Remaining unidentified.")
self.client_certs.pop(gi.host)
# Associate certs to context based on above
if self.client_certs["active"]:
certfile, keyfile = self.client_certs["active"]
context.load_cert_chain(certfile, keyfile)
return True
def is_cert_active(self):
return self.client_certs["active"] != None
def handle_cert_request(self, meta, status, host):
# Don't do client cert stuff in restricted mode, as in principle
# it could be used to fill up the disk by creating a whole lot of
# certificates
print("SERVER SAYS: ", meta)
# Present different messages for different 6x statuses, but
# handle them the same.
if status in ("64", "65"):
print("The server rejected your certificate because it is either expired or not yet valid.")
elif status == "63":
print("The server did not accept your certificate.")
print("You may need to e.g. coordinate with the admin to get your certificate fingerprint whitelisted.")
else:
print("The site {} is requesting a client certificate.".format(host))
print("This will allow the site to recognise you across requests.")
# Give the user choices
print("What do you want to do?")
print("1. Give up.")
print("2. Generate a new transient certificate.")
print("3. Generate a new persistent certificate.")
print("4. Load a previously generated certificate.")
print("5. Load a certificate from an external file.")
choice = input("> ").strip()
if choice == "2":
self._generate_transient_cert_cert()
elif choice == "3":
self._generate_persistent_client_cert()
elif choice == "4":
self._choose_client_cert()
elif choice == "5":
self._load_client_cert()
else:
print("Giving up.")
return False
if self.client_certs["active"]:
self.active_cert_domains.append(host)
self.client_certs[host] = self.client_certs["active"]
return True
def _load_client_cert(self):
"""
Interactively load a TLS client certificate from the filesystem in PEM
format.
"""
print("Loading client certificate file, in PEM format (blank line to cancel)")
certfile = input("Certfile path: ").strip()
if not certfile:
print("Aborting.")
return
certfile = os.path.expanduser(certfile)
if not os.path.isfile(certfile):
print("Certificate file {} does not exist.".format(certfile))
return
print("Loading private key file, in PEM format (blank line to cancel)")
keyfile = input("Keyfile path: ").strip()
if not keyfile:
print("Aborting.")
return
keyfile = os.path.expanduser(keyfile)
if not os.path.isfile(keyfile):
print("Private key file {} does not exist.".format(keyfile))
return
self._activate_client_cert(certfile, keyfile)
def _generate_transient_cert_cert(self):
"""
Use `openssl` command to generate a new transient client certificate
with 24 hours of validity.
"""
certdir = os.path.join(self.config_dir, "transient_certs")
name = str(uuid.uuid4())
self._generate_client_cert(certdir, name, transient=True)
self.active_is_transient = True
self.transient_certs_created.append(name)
def _generate_persistent_client_cert(self):
"""
Interactively use `openssl` command to generate a new persistent client
certificate with one year of validity.
"""
certdir = os.path.join(self.config_dir, "client_certs")
print("What do you want to name this new certificate?")
print("Answering `mycert` will create `{0}/mycert.crt` and `{0}/mycert.key`".format(certdir))
name = input("> ")
if not name.strip():
print("Aborting.")
return
self._generate_client_cert(certdir, name)
def _generate_client_cert(self, certdir, basename, transient=False):
"""
Use `openssl` binary to generate a client certificate (which may be
transient or persistent) and save the certificate and private key to the
specified directory with the specified basename.
"""
if not os.path.exists(certdir):
os.makedirs(certdir)
certfile = os.path.join(certdir, basename+".crt")
keyfile = os.path.join(certdir, basename+".key")
cmd = "openssl req -x509 -newkey rsa:2048 -days {} -nodes -keyout {} -out {}".format(1 if transient else 365, keyfile, certfile)
if transient:
cmd += " -subj '/CN={}'".format(basename)
os.system(cmd)
self._activate_client_cert(certfile, keyfile)
def _choose_client_cert(self):
"""
Interactively select a previously generated client certificate and
activate it.
"""
certdir = os.path.join(self.config_dir, "client_certs")
certs = glob.glob(os.path.join(certdir, "*.crt"))
if len(certs) == 0:
print("There are no previously generated certificates.")
return
certdir = {}
for n, cert in enumerate(certs):
certdir[str(n+1)] = (cert, os.path.splitext(cert)[0] + ".key")
print("{}. {}".format(n+1, os.path.splitext(os.path.basename(cert))[0]))
choice = input("> ").strip()
if choice in certdir:
certfile, keyfile = certdir[choice]
self._activate_client_cert(certfile, keyfile)
else:
print("What?")
def _activate_client_cert(self, certfile, keyfile):
self.client_certs["active"] = (certfile, keyfile)
self.active_cert_domains = []
ui_out.debug("Using ID {} / {}.".format(*self.client_certs["active"]))
def _deactivate_client_cert(self):
if self.active_is_transient:
for filename in self.client_certs["active"]:
os.remove(filename)
for domain in self.active_cert_domains:
self.client_certs.pop(domain)
self.client_certs["active"] = None
self.active_cert_domains = []
self.active_is_transient = False

File diff suppressed because it is too large Load Diff

View File

@ -1,130 +0,0 @@
#!/usr/bin/env python3
# AV-98 Gemini client
# Dervied from VF-1 (https://github.com/solderpunk/VF-1),
# (C) 2019, 2020, 2023 Solderpunk <solderpunk@posteo.net>
# With contributions from:
# - danceka <hannu.hartikainen@gmail.com>
# - <jprjr@tilde.club>
# - <vee@vnsf.xyz>
# - Klaus Alexander Seistrup <klaus@seistrup.dk>
# - govynnus <govynnus@sdf.org>
# - Nik <nic@tilde.team>
# - <sario528@ctrl-c.club>
# - rmgr
# - Aleksey Ryndin
import argparse
import os.path
import shutil
import sys
from av98 import __version__
from av98.client import GeminiClient
def main():
# Parse args
parser = argparse.ArgumentParser(description='A command line gemini client.')
parser.add_argument('--bookmarks', action='store_true',
help='start with your list of bookmarks')
parser.add_argument('--dl', '--download', action='store_true',
help='download a single URL and quit')
parser.add_argument('-o', '--output', metavar='FILE',
help='filename to save --dl URL to')
parser.add_argument('--tls-cert', metavar='FILE', help='TLS client certificate file')
parser.add_argument('--tls-key', metavar='FILE', help='TLS client certificate private key file')
parser.add_argument('--restricted', action="store_true", help='Disallow shell, add, and save commands')
parser.add_argument('--version', action='store_true',
help='display version information and quit')
parser.add_argument('url', metavar='URL', nargs='*',
help='start with this URL')
args = parser.parse_args()
# Handle --version
if args.version:
print("AV-98 " + __version__)
sys.exit()
# Instantiate client
gc = GeminiClient(args.restricted)
# Activate client certs now in case they are needed for --download below
if args.tls_cert and args.tls_key:
gc.client_cert_manager._activate_client_cert(args.tls_cert, args.tls_key)
for url in args.url:
gi = GeminiItem(url)
gc.client_cert_manager.active_cert_domains.append(gi.host)
# Handle --download
if args.dl:
gc.onecmd("set debug True")
# Download
gi = GeminiItem(args.url[0])
gi, mime = gc._fetch_over_network(gi)
# Decide on a filename
if args.output:
filename = args.output
else:
if mime == "text/gemini":
# Parse gemtext in the hopes of getting a gi.name for the filename
gc.active_raw_file = gc.raw_file_buffer
gc._handle_gemtext(gi)
filename = gi.derive_filename(mime)
# Copy from temp file to pwd with a nice name
shutil.copyfile(gc.raw_file_buffer, filename)
size = os.path.getsize(filename)
# Notify user where the file ended up
print("Wrote %d byte %s response to %s." % (size, mime, filename))
gc.do_quit()
sys.exit()
# Process config file
rcfile = os.path.join(gc.config_dir, "av98rc")
if os.path.exists(rcfile):
print("Using config %s" % rcfile)
with open(rcfile, "r") as fp:
for line in fp:
line = line.strip()
if ((args.bookmarks or args.url) and
any((line.startswith(x) for x in ("go", "g", "tour", "t")))
):
if args.bookmarks:
print("Skipping rc command \"%s\" due to --bookmarks option." % line)
else:
print("Skipping rc command \"%s\" due to provided URLs." % line)
continue
gc.cmdqueue.append(line)
# Say hi
print("Welcome to AV-98!")
if args.restricted:
print("Restricted mode engaged!")
print("Enjoy your patrol through Geminispace...")
# Add commands to the queue based on command line arguments
if args.bookmarks:
gc.cmdqueue.append("bookmarks")
elif args.url:
if len(args.url) == 1:
gc.cmdqueue.append("go %s" % args.url[0])
else:
for url in args.url:
if not url.startswith("gemini://"):
url = "gemini://" + url
gc.cmdqueue.append("tour %s" % url)
gc.cmdqueue.append("tour")
# Endless interpret loop until user quits
while True:
try:
gc.cmdloop()
break
except KeyboardInterrupt:
print("")
# Say goodbye
print()
print("Thank you for patrolling with AV-98!")
sys.exit()
if __name__ == '__main__':
main()

View File

@ -1,210 +0,0 @@
import datetime
import hashlib
import logging
import os
import os.path
import sqlite3
import ssl
import time
try:
from cryptography import x509
from cryptography.hazmat.backends import default_backend
_HAS_CRYPTOGRAPHY = True
_BACKEND = default_backend()
except ModuleNotFoundError:
_HAS_CRYPTOGRAPHY = False
import av98.util as util
ui_out = logging.getLogger("av98_logger")
class TofuStore:
def __init__(self, config_dir):
self.config_dir = config_dir
self.certdir = os.path.join(config_dir, "cert_cache")
if not os.path.exists(self.certdir):
os.makedirs(self.certdir)
db_path = os.path.join(self.config_dir, "tofu.db")
self.db_conn = sqlite3.connect(db_path)
self.db_cur = self.db_conn.cursor()
self.create_db()
self.update_db()
def create_db(self):
self.db_cur.execute("""CREATE TABLE IF NOT EXISTS cert_cache
(hostname text, port integer, address text, fingerprint text,
first_seen date, last_seen date, count integer)""")
def update_db(self):
# Update 1 - check for port column
try:
self.db_cur.execute("""SELECT port FROM cert_cache where 1=0""")
has_port = True
except sqlite3.OperationalError:
has_port = False
if not has_port:
self.db_cur.execute("""ALTER TABLE cert_cache ADD COLUMN port integer""")
self.db_cur.execute("""UPDATE cert_cache SET port= 1965 WHERE count > 0""")
def close(self):
self.db_conn.commit()
self.db_conn.close()
def validate_cert(self, address, port, host, cert):
"""
Validate a TLS certificate in TOFU mode.
If the cryptography module is installed:
- Check the certificate Common Name or SAN matches `host`
- Check the certificate's not valid before date is in the past
- Check the certificate's not valid after date is in the future
Whether the cryptography module is installed or not, check the
certificate's fingerprint against the TOFU database to see if we've
previously encountered a different certificate for this hostname and
port
"""
now = datetime.datetime.utcnow()
# Do 'advanced' checks if Cryptography library is installed
if _HAS_CRYPTOGRAPHY:
self.check_cert_expiry_and_names(cert, host, now)
# Compute SHA256 fingerprint
sha = hashlib.sha256()
sha.update(cert)
fingerprint = sha.hexdigest()
# Have we been here before?
self.db_cur.execute("""SELECT fingerprint, address, first_seen, last_seen, count
FROM cert_cache WHERE hostname=? AND port=?""", (host, port))
cached_certs = self.db_cur.fetchall()
# If not, cache this first cert and we're done
if not cached_certs:
ui_out.debug("TOFU: Blindly trusting first ever certificate for this host!")
self.cache_new_cert(cert, host, port, address, fingerprint, now)
return
# If we have, check the received cert against the cache
if self.find_cert_in_cache(host, port, fingerprint, cached_certs, now):
return
# Handle an unrecognised cert
ui_out.debug("TOFU: Unrecognised certificate {}! Raising the alarm...".format(fingerprint))
## Find the most recently seen previous cert for reporting
most_recent = None
for cached_fingerprint, cached_address, first, last, count in cached_certs:
if not most_recent or last > most_recent:
most_recent = last
most_recent_cert = cached_fingerprint
most_recent_address = cached_address
most_recent_count = count
## Report the situation
print("****************************************")
print("[SECURITY WARNING] Unrecognised certificate!")
print("The certificate presented for {}:{} ({}) has never been seen before.".format(host, port, address))
print("This MIGHT be a Man-in-the-Middle attack.")
print("A different certificate has previously been seen {} times.".format(most_recent_count))
if _HAS_CRYPTOGRAPHY:
previous_ttl = self.get_cached_cert_expiry(most_recent_cert) - now
if previous_ttl < datetime.timedelta():
print("That certificate has expired, which reduces suspicion somewhat.")
else:
print("That certificate is still valid for: {}".format(previous_ttl))
if most_recent_address == address:
print("The new certificate is being served from the same IP address as the previous one.")
else:
print("The new certificate is being served from a DIFFERNET IP address as the previous one.")
print("****************************************")
print("Attempt to verify the new certificate fingerprint out-of-band:")
print(fingerprint)
## Ask the question
if util.ask_yes_no("Accept this new certificate?"):
self.cache_new_cert(cert, host, port, address, fingerprint, now)
else:
raise Exception("TOFU Failure!")
def cache_new_cert(self, cert, host, port, address, fingerprint, now):
"""
Accept a new certificate for a given host/port combo.
"""
# Save cert to disk
with open(os.path.join(self.certdir, fingerprint+".crt"), "wb") as fp:
fp.write(cert)
# Record in DB
self.db_cur.execute("""INSERT INTO cert_cache
(hostname, port, address, fingerprint, first_seen, last_seen, count)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(host, port, address, fingerprint, now, now, 1))
self.db_conn.commit()
def check_cert_expiry_and_names(self, cert, host, now):
"""
- Check the certificate Common Name or SAN matches `host`
- Check the certificate's not valid before date is in the past
- Check the certificate's not valid after date is in the future
"""
c = x509.load_der_x509_certificate(cert, _BACKEND)
# Check certificate validity dates
if c.not_valid_before >= now:
raise ssl.CertificateError("Certificate not valid until: {}!".format(c.not_valid_before))
elif c.not_valid_after <= now:
raise ssl.CertificateError("Certificate expired as of: {})!".format(c.not_valid_after))
# Check certificate hostnames
names = []
common_name = c.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
if common_name:
names.append(common_name[0].value)
try:
names.extend([alt.value for alt in c.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value])
except x509.ExtensionNotFound:
pass
names = set(names)
for name in names:
try:
ssl._dnsname_match(name, host)
break
except Exception:
continue
else:
# If we didn't break out, none of the names were valid
raise ssl.CertificateError("Hostname does not match certificate common name or any alternative names.")
def find_cert_in_cache(self, host, port, fingerprint, cached_certs, now):
"""
Try to find a cached certificate for the given host:port matching the
given fingerprint. If one is found, update the "last seen" DB value.
"""
for cached_fingerprint, cached_address, first, last, count in cached_certs:
if fingerprint == cached_fingerprint:
# Matched!
ui_out.debug("TOFU: Accepting previously seen ({} times) certificate {}".format(count, fingerprint))
self.db_cur.execute("""UPDATE cert_cache
SET last_seen=?, count=?
WHERE hostname=? AND port=? AND fingerprint=?""",
(now, count+1, host, port, fingerprint))
self.db_conn.commit()
return True
return False
def get_cached_cert_expiry(self, fingerprint):
"""
Parse the stored certificate with a given fingerprint and return its
expiry date.
"""
with open(os.path.join(self.certdir, fingerprint+".crt"), "rb") as fp:
previous_cert = fp.read()
previous_cert = x509.load_der_x509_certificate(previous_cert, _BACKEND)
return previous_cert.not_valid_after

View File

@ -1,50 +0,0 @@
import os.path
# Cheap and cheerful URL detector
def looks_like_url(word):
return "." in word and word.startswith("gemini://")
def handle_filename_collisions(filename):
while os.path.exists(filename):
print("File %s already exists!" % filename)
filename = input("Choose a new one, or leave blank to abort: ")
return filename
def ask_yes_no(prompt, default=None):
print(prompt)
if default == True:
prompt = "(Y)/N: "
elif default == False:
prompt = "Y/(N): "
else:
prompt = "Y/N: "
while True:
resp = input(prompt)
if not resp.strip() and default != None:
return default
elif resp.strip().lower() in ("y", "yes"):
return True
elif resp.strip().lower() in ("n","no"):
return False
def fix_ipv6_url(url):
if not url.count(":") > 2: # Best way to detect them?
return url
# If there's a pair of []s in there, it's probably fine as is.
if "[" in url and "]" in url:
return url
# Easiest case is a raw address, no schema, no path.
# Just wrap it in square brackets and whack a slash on the end
if "/" not in url:
return "[" + url + "]/"
# Now the trickier cases...
if "://" in url:
schema, schemaless = url.split("://")
else:
schema, schemaless = None, url
if "/" in schemaless:
netloc, rest = schemaless.split("/",1)
schemaless = "[" + netloc + "]" + "/" + rest
if schema:
return schema + "://" + schemaless
return schemaless