Merge branch 'master' of tildegit.org:solderpunk/AV-98

This commit is contained in:
Alexey Ryndin 2023-12-06 11:58:46 +03:00
commit 474787dab7
10 changed files with 2092 additions and 1677 deletions

1654
av98.py

File diff suppressed because it is too large Load Diff

26
pyproject.toml Executable file
View File

@ -0,0 +1,26 @@
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[project]
name = "AV-98"
dynamic = ["version"]
description = "Command line Gemini client"
authors = [{name="Solderpunk", email="solderpunk@posteo.net"}]
classifiers = [
"License :: OSI Approved :: BSD License",
"Programming Language :: Python :: 3 :: Only",
"Topic :: Communications",
"Intended Audience :: End Users/Desktop",
"Environment :: Console",
"Development Status :: 5 - Production/Stable",
]
[project.urls]
Homepage = "https://tildegit.org/solderpunk/AV-98/"
Issues = "https://tildegit.org/solderpunk/AV-98/issues"
[project.scripts]
av98 = "av98.main:main"
[project.optional-dependencies]
tofu = ["cryptography"]
colour = ["ansiwrap"]
[tool.setuptools.dynamic]
version = {attr = "av98.__version__"}

View File

@ -1,23 +0,0 @@
from setuptools import setup
setup(
name='AV-98',
version='1.0.2dev',
description="Command line Gemini client.",
author="Solderpunk",
author_email="solderpunk@sdf.org",
url='https://tildegit.org/solderpunk/AV-98/',
classifiers=[
'License :: OSI Approved :: BSD License',
'Programming Language :: Python :: 3 :: Only',
'Topic :: Communications',
'Intended Audience :: End Users/Desktop',
'Environment :: Console',
'Development Status :: 4 - Beta',
],
py_modules = ["av98"],
entry_points={
"console_scripts": ["av98=av98:main"]
},
install_requires=[],
)

1
src/av98/__init__.py Normal file
View File

@ -0,0 +1 @@
__version__ = "1.1.0dev"

76
src/av98/cache.py Normal file
View File

@ -0,0 +1,76 @@
_MAX_CACHE_SIZE = 10
_MAX_CACHE_AGE_SECS = 180
import logging
import os
import os.path
import shutil
import tempfile
import time
ui_out = logging.getLogger("av98_logger")
class Cache:
def __init__(self):
self.cache = {}
self.cache_timestamps = {}
self.tempdir = tempfile.TemporaryDirectory()
def check(self, url):
if url not in self.cache:
return False
now = time.time()
cached = self.cache_timestamps[url]
if now - cached > _MAX_CACHE_AGE_SECS:
ui_out.debug("Expiring old cached copy of resource.")
self._remove(url)
return False
ui_out.debug("Found cached copy of resource.")
return True
def _remove(self, url):
self.cache_timestamps.pop(url)
mime, filename = self.cache.pop(url)
os.unlink(filename)
self.validatecache()
def add(self, url, mime, filename):
# Copy client's buffer file to new cache file
tmpf = tempfile.NamedTemporaryFile(dir=self.tempdir.name, delete=False)
tmpf.close()
shutil.copyfile(filename, tmpf.name)
# Remember details
self.cache_timestamps[url] = time.time()
self.cache[url] = (mime, tmpf.name)
if len(self.cache) > _MAX_CACHE_SIZE:
self._trim()
self.validatecache()
def _trim(self):
# Order cache entries by age
lru = [(t, u) for (u, t) in self.cache_timestamps.items()]
lru.sort()
# Drop the oldest entry no matter what
_, url = lru[0]
ui_out.debug("Dropping cached copy of {} from full cache.".format(url))
self._remove(url)
# Drop other entries if they are older than the limit
now = time.time()
for cached, url in lru[1:]:
if now - cached > _MAX_CACHE_AGE_SECS:
ui_out.debug("Dropping cached copy of {} from full cache.".format(url))
self._remove(url)
else:
break
self.validatecache()
def get(self, url):
return self.cache[url]
def validatecache(self):
assert self.cache.keys() == self.cache_timestamps.keys()
for _, filename in self.cache.values():
assert os.path.isfile(filename)

233
src/av98/certmanager.py Normal file
View File

@ -0,0 +1,233 @@
import glob
import logging
import os
import os.path
import uuid
import av98.util as util
ui_out = logging.getLogger("av98_logger")
class ClientCertificateManager:
def __init__(self, config_dir):
self.config_dir = config_dir
self.client_certs = {
"active": None
}
self.active_cert_domains = []
self.active_is_transient = False
self.transient_certs_created = []
def cleanup(self):
for cert in self.transient_certs_created:
for ext in (".crt", ".key"):
certfile = os.path.join(self.config_dir, "transient_certs", cert+ext)
if os.path.exists(certfile):
os.remove(certfile)
def manage(self):
if self.client_certs["active"]:
print("Active certificate: {}".format(self.client_certs["active"][0]))
print("1. Deactivate client certificate.")
print("2. Generate new certificate.")
print("3. Load previously generated certificate.")
print("4. Load externally created client certificate from file.")
print("Enter blank line to exit certificate manager.")
choice = input("> ").strip()
if choice == "1":
print("Deactivating client certificate.")
self._deactivate_client_cert()
elif choice == "2":
self._generate_persistent_client_cert()
elif choice == "3":
self._choose_client_cert()
elif choice == "4":
self._load_client_cert()
else:
print("Aborting.")
def associate_client_cert(self, context, gi):
# Be careful with client certificates!
# Are we crossing a domain boundary?
if self.client_certs["active"] and gi.host not in self.active_cert_domains:
if self.active_is_transient:
if util.ask_yes_no("Permanently delete currently active transient certificate?"):
print("Destroying certificate.")
self._deactivate_client_cert()
else:
print("Staying here.")
return False
else:
if util.ask_yes_no("PRIVACY ALERT: Deactivate client cert before connecting to a new domain?"):
print("Deactivating certificate.")
self._deactivate_client_cert()
else:
print("Keeping certificate active for {}".format(gi.host))
self.active_cert_domains.append(gi.host)
self.client_certs[gi.host] = self.client_certs["active"]
# Suggest reactivating previous certs
if not self.client_certs["active"] and gi.host in self.client_certs:
if util.ask_yes_no("PRIVACY ALERT: Reactivate previously used client cert for {}?".format(gi.host)):
self._activate_client_cert(*self.client_certs[gi.host])
self.active_cert_domains.append(gi.host)
else:
print("Remaining unidentified.")
self.client_certs.pop(gi.host)
# Associate certs to context based on above
if self.client_certs["active"]:
certfile, keyfile = self.client_certs["active"]
context.load_cert_chain(certfile, keyfile)
return True
def is_cert_active(self):
return self.client_certs["active"] != None
def handle_cert_request(self, meta, status, host):
# Don't do client cert stuff in restricted mode, as in principle
# it could be used to fill up the disk by creating a whole lot of
# certificates
print("SERVER SAYS: ", meta)
# Present different messages for different 6x statuses, but
# handle them the same.
if status in ("64", "65"):
print("The server rejected your certificate because it is either expired or not yet valid.")
elif status == "63":
print("The server did not accept your certificate.")
print("You may need to e.g. coordinate with the admin to get your certificate fingerprint whitelisted.")
else:
print("The site {} is requesting a client certificate.".format(host))
print("This will allow the site to recognise you across requests.")
# Give the user choices
print("What do you want to do?")
print("1. Give up.")
print("2. Generate a new transient certificate.")
print("3. Generate a new persistent certificate.")
print("4. Load a previously generated certificate.")
print("5. Load a certificate from an external file.")
choice = input("> ").strip()
if choice == "2":
self._generate_transient_cert_cert()
elif choice == "3":
self._generate_persistent_client_cert()
elif choice == "4":
self._choose_client_cert()
elif choice == "5":
self._load_client_cert()
else:
print("Giving up.")
return False
if self.client_certs["active"]:
self.active_cert_domains.append(host)
self.client_certs[host] = self.client_certs["active"]
return True
def _load_client_cert(self):
"""
Interactively load a TLS client certificate from the filesystem in PEM
format.
"""
print("Loading client certificate file, in PEM format (blank line to cancel)")
certfile = input("Certfile path: ").strip()
if not certfile:
print("Aborting.")
return
certfile = os.path.expanduser(certfile)
if not os.path.isfile(certfile):
print("Certificate file {} does not exist.".format(certfile))
return
print("Loading private key file, in PEM format (blank line to cancel)")
keyfile = input("Keyfile path: ").strip()
if not keyfile:
print("Aborting.")
return
keyfile = os.path.expanduser(keyfile)
if not os.path.isfile(keyfile):
print("Private key file {} does not exist.".format(keyfile))
return
self._activate_client_cert(certfile, keyfile)
def _generate_transient_cert_cert(self):
"""
Use `openssl` command to generate a new transient client certificate
with 24 hours of validity.
"""
certdir = os.path.join(self.config_dir, "transient_certs")
name = str(uuid.uuid4())
self._generate_client_cert(certdir, name, transient=True)
self.active_is_transient = True
self.transient_certs_created.append(name)
def _generate_persistent_client_cert(self):
"""
Interactively use `openssl` command to generate a new persistent client
certificate with one year of validity.
"""
certdir = os.path.join(self.config_dir, "client_certs")
print("What do you want to name this new certificate?")
print("Answering `mycert` will create `{0}/mycert.crt` and `{0}/mycert.key`".format(certdir))
name = input("> ")
if not name.strip():
print("Aborting.")
return
self._generate_client_cert(certdir, name)
def _generate_client_cert(self, certdir, basename, transient=False):
"""
Use `openssl` binary to generate a client certificate (which may be
transient or persistent) and save the certificate and private key to the
specified directory with the specified basename.
"""
if not os.path.exists(certdir):
os.makedirs(certdir)
certfile = os.path.join(certdir, basename+".crt")
keyfile = os.path.join(certdir, basename+".key")
cmd = "openssl req -x509 -newkey rsa:2048 -days {} -nodes -keyout {} -out {}".format(1 if transient else 365, keyfile, certfile)
if transient:
cmd += " -subj '/CN={}'".format(basename)
os.system(cmd)
self._activate_client_cert(certfile, keyfile)
def _choose_client_cert(self):
"""
Interactively select a previously generated client certificate and
activate it.
"""
certdir = os.path.join(self.config_dir, "client_certs")
certs = glob.glob(os.path.join(certdir, "*.crt"))
if len(certs) == 0:
print("There are no previously generated certificates.")
return
certdir = {}
for n, cert in enumerate(certs):
certdir[str(n+1)] = (cert, os.path.splitext(cert)[0] + ".key")
print("{}. {}".format(n+1, os.path.splitext(os.path.basename(cert))[0]))
choice = input("> ").strip()
if choice in certdir:
certfile, keyfile = certdir[choice]
self._activate_client_cert(certfile, keyfile)
else:
print("What?")
def _activate_client_cert(self, certfile, keyfile):
self.client_certs["active"] = (certfile, keyfile)
self.active_cert_domains = []
ui_out.debug("Using ID {} / {}.".format(*self.client_certs["active"]))
def _deactivate_client_cert(self):
if self.active_is_transient:
for filename in self.client_certs["active"]:
os.remove(filename)
for domain in self.active_cert_domains:
self.client_certs.pop(domain)
self.client_certs["active"] = None
self.active_cert_domains = []
self.active_is_transient = False

1416
src/av98/client.py Executable file

File diff suppressed because it is too large Load Diff

130
src/av98/main.py Executable file
View File

@ -0,0 +1,130 @@
#!/usr/bin/env python3
# AV-98 Gemini client
# Dervied from VF-1 (https://github.com/solderpunk/VF-1),
# (C) 2019, 2020, 2023 Solderpunk <solderpunk@posteo.net>
# With contributions from:
# - danceka <hannu.hartikainen@gmail.com>
# - <jprjr@tilde.club>
# - <vee@vnsf.xyz>
# - Klaus Alexander Seistrup <klaus@seistrup.dk>
# - govynnus <govynnus@sdf.org>
# - Nik <nic@tilde.team>
# - <sario528@ctrl-c.club>
# - rmgr
# - Aleksey Ryndin
import argparse
import os.path
import shutil
import sys
from av98 import __version__
from av98.client import GeminiClient
def main():
# Parse args
parser = argparse.ArgumentParser(description='A command line gemini client.')
parser.add_argument('--bookmarks', action='store_true',
help='start with your list of bookmarks')
parser.add_argument('--dl', '--download', action='store_true',
help='download a single URL and quit')
parser.add_argument('-o', '--output', metavar='FILE',
help='filename to save --dl URL to')
parser.add_argument('--tls-cert', metavar='FILE', help='TLS client certificate file')
parser.add_argument('--tls-key', metavar='FILE', help='TLS client certificate private key file')
parser.add_argument('--restricted', action="store_true", help='Disallow shell, add, and save commands')
parser.add_argument('--version', action='store_true',
help='display version information and quit')
parser.add_argument('url', metavar='URL', nargs='*',
help='start with this URL')
args = parser.parse_args()
# Handle --version
if args.version:
print("AV-98 " + __version__)
sys.exit()
# Instantiate client
gc = GeminiClient(args.restricted)
# Activate client certs now in case they are needed for --download below
if args.tls_cert and args.tls_key:
gc.client_cert_manager._activate_client_cert(args.tls_cert, args.tls_key)
for url in args.url:
gi = GeminiItem(url)
gc.client_cert_manager.active_cert_domains.append(gi.host)
# Handle --download
if args.dl:
gc.onecmd("set debug True")
# Download
gi = GeminiItem(args.url[0])
gi, mime = gc._fetch_over_network(gi)
# Decide on a filename
if args.output:
filename = args.output
else:
if mime == "text/gemini":
# Parse gemtext in the hopes of getting a gi.name for the filename
gc.active_raw_file = gc.raw_file_buffer
gc._handle_gemtext(gi)
filename = gi.derive_filename(mime)
# Copy from temp file to pwd with a nice name
shutil.copyfile(gc.raw_file_buffer, filename)
size = os.path.getsize(filename)
# Notify user where the file ended up
print("Wrote %d byte %s response to %s." % (size, mime, filename))
gc.do_quit()
sys.exit()
# Process config file
rcfile = os.path.join(gc.config_dir, "av98rc")
if os.path.exists(rcfile):
print("Using config %s" % rcfile)
with open(rcfile, "r") as fp:
for line in fp:
line = line.strip()
if ((args.bookmarks or args.url) and
any((line.startswith(x) for x in ("go", "g", "tour", "t")))
):
if args.bookmarks:
print("Skipping rc command \"%s\" due to --bookmarks option." % line)
else:
print("Skipping rc command \"%s\" due to provided URLs." % line)
continue
gc.cmdqueue.append(line)
# Say hi
print("Welcome to AV-98!")
if args.restricted:
print("Restricted mode engaged!")
print("Enjoy your patrol through Geminispace...")
# Add commands to the queue based on command line arguments
if args.bookmarks:
gc.cmdqueue.append("bookmarks")
elif args.url:
if len(args.url) == 1:
gc.cmdqueue.append("go %s" % args.url[0])
else:
for url in args.url:
if not url.startswith("gemini://"):
url = "gemini://" + url
gc.cmdqueue.append("tour %s" % url)
gc.cmdqueue.append("tour")
# Endless interpret loop until user quits
while True:
try:
gc.cmdloop()
break
except KeyboardInterrupt:
print("")
# Say goodbye
print()
print("Thank you for patrolling with AV-98!")
sys.exit()
if __name__ == '__main__':
main()

160
src/av98/tofu.py Normal file
View File

@ -0,0 +1,160 @@
import datetime
import hashlib
import logging
import os
import os.path
import sqlite3
import ssl
import time
try:
from cryptography import x509
from cryptography.hazmat.backends import default_backend
_HAS_CRYPTOGRAPHY = True
_BACKEND = default_backend()
except ModuleNotFoundError:
_HAS_CRYPTOGRAPHY = False
ui_out = logging.getLogger("av98_logger")
class TofuStore:
def __init__(self, config_dir):
self.config_dir = config_dir
db_path = os.path.join(self.config_dir, "tofu.db")
self.db_conn = sqlite3.connect(db_path)
self.db_cur = self.db_conn.cursor()
self.db_cur.execute("""CREATE TABLE IF NOT EXISTS cert_cache
(hostname text, address text, fingerprint text,
first_seen date, last_seen date, count integer)""")
def close(self):
self.db_conn.commit()
self.db_conn.close()
def validate_cert(self, address, host, cert):
"""
Validate a TLS certificate in TOFU mode.
If the cryptography module is installed:
- Check the certificate Common Name or SAN matches `host`
- Check the certificate's not valid before date is in the past
- Check the certificate's not valid after date is in the future
Whether the cryptography module is installed or not, check the
certificate's fingerprint against the TOFU database to see if we've
previously encountered a different certificate for this IP address and
hostname.
"""
now = datetime.datetime.utcnow()
if _HAS_CRYPTOGRAPHY:
# Using the cryptography module we can get detailed access
# to the properties of even self-signed certs, unlike in
# the standard ssl library...
c = x509.load_der_x509_certificate(cert, _BACKEND)
# Check certificate validity dates
if c.not_valid_before >= now:
raise ssl.CertificateError("Certificate not valid until: {}!".format(c.not_valid_before))
elif c.not_valid_after <= now:
raise ssl.CertificateError("Certificate expired as of: {})!".format(c.not_valid_after))
# Check certificate hostnames
names = []
common_name = c.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
if common_name:
names.append(common_name[0].value)
try:
names.extend([alt.value for alt in c.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value])
except x509.ExtensionNotFound:
pass
names = set(names)
for name in names:
try:
ssl._dnsname_match(name, host)
break
except Exception:
continue
else:
# If we didn't break out, none of the names were valid
raise ssl.CertificateError("Hostname does not match certificate common name or any alternative names.")
sha = hashlib.sha256()
sha.update(cert)
fingerprint = sha.hexdigest()
# Have we been here before?
self.db_cur.execute("""SELECT fingerprint, first_seen, last_seen, count
FROM cert_cache
WHERE hostname=? AND address=?""", (host, address))
cached_certs = self.db_cur.fetchall()
# If so, check for a match
if cached_certs:
max_count = 0
most_frequent_cert = None
for cached_fingerprint, first, last, count in cached_certs:
if count > max_count:
max_count = count
most_frequent_cert = cached_fingerprint
if fingerprint == cached_fingerprint:
# Matched!
ui_out.debug("TOFU: Accepting previously seen ({} times) certificate {}".format(count, fingerprint))
self.db_cur.execute("""UPDATE cert_cache
SET last_seen=?, count=?
WHERE hostname=? AND address=? AND fingerprint=?""",
(now, count+1, host, address, fingerprint))
self.db_conn.commit()
break
else:
if _HAS_CRYPTOGRAPHY:
# Load the most frequently seen certificate to see if it has
# expired
certdir = os.path.join(self.config_dir, "cert_cache")
with open(os.path.join(certdir, most_frequent_cert+".crt"), "rb") as fp:
previous_cert = fp.read()
previous_cert = x509.load_der_x509_certificate(previous_cert, _BACKEND)
previous_ttl = previous_cert.not_valid_after - now
print(previous_ttl)
ui_out.debug("TOFU: Unrecognised certificate {}! Raising the alarm...".format(fingerprint))
print("****************************************")
print("[SECURITY WARNING] Unrecognised certificate!")
print("The certificate presented for {} ({}) has never been seen before.".format(host, address))
print("This MIGHT be a Man-in-the-Middle attack.")
print("A different certificate has previously been seen {} times.".format(max_count))
if _HAS_CRYPTOGRAPHY:
if previous_ttl < datetime.timedelta():
print("That certificate has expired, which reduces suspicion somewhat.")
else:
print("That certificate is still valid for: {}".format(previous_ttl))
print("****************************************")
print("Attempt to verify the new certificate fingerprint out-of-band:")
print(fingerprint)
choice = input("Accept this new certificate? Y/N ").strip().lower()
if choice in ("y", "yes"):
self.db_cur.execute("""INSERT INTO cert_cache
VALUES (?, ?, ?, ?, ?, ?)""",
(host, address, fingerprint, now, now, 1))
self.db_conn.commit()
with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
fp.write(cert)
else:
raise Exception("TOFU Failure!")
# If not, cache this cert
else:
ui_out.debug("TOFU: Blindly trusting first ever certificate for this host!")
self.db_cur.execute("""INSERT INTO cert_cache
VALUES (?, ?, ?, ?, ?, ?)""",
(host, address, fingerprint, now, now, 1))
self.db_conn.commit()
certdir = os.path.join(self.config_dir, "cert_cache")
if not os.path.exists(certdir):
os.makedirs(certdir)
with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
fp.write(cert)

50
src/av98/util.py Normal file
View File

@ -0,0 +1,50 @@
import os.path
# Cheap and cheerful URL detector
def looks_like_url(word):
return "." in word and word.startswith("gemini://")
def handle_filename_collisions(filename):
while os.path.exists(filename):
print("File %s already exists!" % filename)
filename = input("Choose a new one, or leave blank to abort: ")
return filename
def ask_yes_no(prompt, default=None):
print(prompt)
if default == True:
prompt = "(Y)/N: "
elif default == False:
prompt = "Y/(N): "
else:
prompt = "Y/N: "
while True:
resp = input(prompt)
if not resp.strip() and default != None:
return default
elif resp.strip().lower() in ("y", "yes"):
return True
elif resp.strip().lower() in ("n","no"):
return False
def fix_ipv6_url(url):
if not url.count(":") > 2: # Best way to detect them?
return url
# If there's a pair of []s in there, it's probably fine as is.
if "[" in url and "]" in url:
return url
# Easiest case is a raw address, no schema, no path.
# Just wrap it in square brackets and whack a slash on the end
if "/" not in url:
return "[" + url + "]/"
# Now the trickier cases...
if "://" in url:
schema, schemaless = url.split("://")
else:
schema, schemaless = None, url
if "/" in schemaless:
netloc, rest = schemaless.split("/",1)
schemaless = "[" + netloc + "]" + "/" + rest
if schema:
return schema + "://" + schemaless
return schemaless