921 lines
34 KiB
Python
Executable File
921 lines
34 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# Alphonse "containerised" Gemini client
|
|
# Dervied from AV-98 (https://github.com/solderpunk/AV-98),
|
|
# (C) 2020 Solderpunk <solderpunk@sdf.org>
|
|
|
|
import argparse
|
|
import cmd
|
|
import cgi
|
|
import codecs
|
|
import collections
|
|
import datetime
|
|
import fnmatch
|
|
import getpass
|
|
import glob
|
|
import hashlib
|
|
import io
|
|
import mimetypes
|
|
import os
|
|
import os.path
|
|
import random
|
|
import shlex
|
|
import shutil
|
|
import socket
|
|
import sqlite3
|
|
import ssl
|
|
from ssl import CertificateError
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
import urllib.parse
|
|
import uuid
|
|
import webbrowser
|
|
|
|
try:
|
|
import ansiwrap as textwrap
|
|
except ModuleNotFoundError:
|
|
import textwrap
|
|
|
|
try:
|
|
from cryptography import x509
|
|
from cryptography.hazmat.backends import default_backend
|
|
_HAS_CRYPTOGRAPHY = True
|
|
_BACKEND = default_backend()
|
|
except ModuleNotFoundError:
|
|
_HAS_CRYPTOGRAPHY = False
|
|
|
|
_VERSION = "1.0.2dev"
|
|
|
|
_MAX_REDIRECTS = 5
|
|
|
|
# Command abbreviations
|
|
_ABBREVS = {
|
|
"b": "back",
|
|
"fo": "forward",
|
|
"h": "history",
|
|
"hist": "history",
|
|
"l": "less",
|
|
"n": "next",
|
|
"p": "previous",
|
|
"prev": "previous",
|
|
"q": "quit",
|
|
"r": "reload",
|
|
"se": "search",
|
|
"/": "search",
|
|
"u": "up",
|
|
}
|
|
|
|
# monkey-patch Gemini support in urllib.parse
|
|
# see https://github.com/python/cpython/blob/master/Lib/urllib/parse.py
|
|
urllib.parse.uses_relative.append("gemini")
|
|
urllib.parse.uses_netloc.append("gemini")
|
|
|
|
|
|
def fix_ipv6_url(url):
|
|
if not url.count(":") > 2: # Best way to detect them?
|
|
return url
|
|
# If there's a pair of []s in there, it's probably fine as is.
|
|
if "[" in url and "]" in url:
|
|
return url
|
|
# Easiest case is a raw address, no schema, no path.
|
|
# Just wrap it in square brackets and whack a slash on the end
|
|
if "/" not in url:
|
|
return "[" + url + "]/"
|
|
# Now the trickier cases...
|
|
if "://" in url:
|
|
schema, schemaless = url.split("://")
|
|
else:
|
|
schema, schemaless = None, url
|
|
if "/" in schemaless:
|
|
netloc, rest = schemaless.split("/",1)
|
|
schemaless = "[" + netloc + "]" + "/" + rest
|
|
if schema:
|
|
return schema + "://" + schemaless
|
|
return schemaless
|
|
|
|
standard_ports = {
|
|
"gemini": 1965,
|
|
}
|
|
|
|
class GeminiItem():
|
|
|
|
def __init__(self, url, name=""):
|
|
if "://" not in url:
|
|
url = "gemini://" + url
|
|
self.url = fix_ipv6_url(url)
|
|
self.name = name
|
|
parsed = urllib.parse.urlparse(self.url)
|
|
self.scheme = parsed.scheme
|
|
self.host = parsed.hostname
|
|
self.port = parsed.port or standard_ports.get(self.scheme, 0)
|
|
self.path = parsed.path
|
|
|
|
def root(self):
|
|
return GeminiItem(self._derive_url("/"))
|
|
|
|
def up(self):
|
|
pathbits = list(os.path.split(self.path.rstrip('/')))
|
|
# Don't try to go higher than root
|
|
if len(pathbits) == 1:
|
|
return self
|
|
# Get rid of bottom component
|
|
pathbits.pop()
|
|
new_path = os.path.join(*pathbits)
|
|
return GeminiItem(self._derive_url(new_path))
|
|
|
|
def query(self, query):
|
|
query = urllib.parse.quote(query)
|
|
return GeminiItem(self._derive_url(query=query))
|
|
|
|
def _derive_url(self, path="", query=""):
|
|
"""
|
|
A thin wrapper around urlunparse which avoids inserting standard ports
|
|
into URLs just to keep things clean.
|
|
"""
|
|
return urllib.parse.urlunparse((self.scheme,
|
|
self.host if self.port == standard_ports[self.scheme] else self.host + ":" + str(self.port),
|
|
path or self.path, "", query, ""))
|
|
|
|
def absolutise_url(self, relative_url):
|
|
"""
|
|
Convert a relative URL to an absolute URL by using the URL of this
|
|
GeminiItem as a base.
|
|
"""
|
|
return urllib.parse.urljoin(self.url, relative_url)
|
|
|
|
def to_map_line(self, name=None):
|
|
if name or self.name:
|
|
return "=> {} {}\n".format(self.url, name or self.name)
|
|
else:
|
|
return "=> {}\n".format(self.url)
|
|
|
|
@classmethod
|
|
def from_map_line(cls, line, origin_gi):
|
|
assert line.startswith("=>")
|
|
assert line[2:].strip()
|
|
bits = line[2:].strip().split(maxsplit=1)
|
|
bits[0] = origin_gi.absolutise_url(bits[0])
|
|
return cls(*bits)
|
|
|
|
CRLF = '\r\n'
|
|
|
|
# Cheap and cheerful URL detector
|
|
def looks_like_url(word):
|
|
return "." in word and word.startswith("gemini://")
|
|
|
|
# GeminiClient Decorators
|
|
def needs_gi(inner):
|
|
def outer(self, *args, **kwargs):
|
|
if not self.gi:
|
|
print("You need to 'go' somewhere, first")
|
|
return None
|
|
else:
|
|
return inner(self, *args, **kwargs)
|
|
outer.__doc__ = inner.__doc__
|
|
return outer
|
|
|
|
class GeminiClient(cmd.Cmd):
|
|
|
|
def __init__(self, initial_url, keyfile, certfile):
|
|
cmd.Cmd.__init__(self)
|
|
|
|
if "://" not in initial_url:
|
|
initial_url = "gemini://" + initial_url
|
|
self.initial_url = initial_url
|
|
parsed = urllib.parse.urlparse(self.initial_url)
|
|
self.bound_host = parsed.hostname
|
|
self.uppermost_path = parsed.path
|
|
print(self.bound_host, self.uppermost_path)
|
|
# Find config directory
|
|
## Look for something pre-existing
|
|
for confdir in ("~/.alphonse/", "~/.config/alphonse/"):
|
|
confdir = os.path.expanduser(confdir)
|
|
if os.path.exists(confdir):
|
|
self.config_dir = confdir
|
|
break
|
|
## Otherwise, make one in .config if it exists
|
|
else:
|
|
if os.path.exists(os.path.expanduser("~/.config/")):
|
|
self.config_dir = os.path.expanduser("~/.config/alphonse/")
|
|
else:
|
|
self.config_dir = os.path.expanduser("~/.alphonse/")
|
|
print("Creating config directory {}".format(self.config_dir))
|
|
os.makedirs(self.config_dir)
|
|
|
|
self.no_cert_prompt = "\x1b[38;5;76m" + "Alphonse" + "\x1b[38;5;255m" + "> " + "\x1b[0m"
|
|
self.cert_prompt = "\x1b[38;5;202m" + "Alphonse" + "\x1b[38;5;255m" + "+cert> " + "\x1b[0m"
|
|
self.prompt = self.no_cert_prompt
|
|
self.gi = None
|
|
self.history = []
|
|
self.hist_index = 0
|
|
self.index = []
|
|
self.index_index = -1
|
|
self.lookup = self.index
|
|
self.page_index = 0
|
|
self.permanent_redirects = {}
|
|
self.previous_redirectors = set()
|
|
|
|
self.options = {
|
|
"debug" : True,
|
|
"ipv6" : True,
|
|
"timeout" : 10,
|
|
"width" : 80,
|
|
"auto_follow_redirects" : True,
|
|
"tls_mode" : "tofu",
|
|
}
|
|
|
|
# Prepare TLS context
|
|
protocol = ssl.PROTOCOL_TLS if sys.version_info.minor >=6 else ssl.PROTOCOL_TLSv1_2
|
|
self.tls_context = ssl.SSLContext(protocol)
|
|
# Use CAs or TOFU
|
|
if self.options["tls_mode"] == "ca":
|
|
self.tls_context.verify_mode = ssl.CERT_REQUIRED
|
|
self.tls_context.check_hostname = True
|
|
self.tls_context.load_default_certs()
|
|
else:
|
|
self.tls_context.check_hostname = False
|
|
self.tls_context.verify_mode = ssl.CERT_NONE
|
|
# Impose minimum TLS version
|
|
## In 3.7 and above, this is easy...
|
|
if sys.version_info.minor >= 7:
|
|
self.tls_context.minimum_version = ssl.TLSVersion.TLSv1_2
|
|
## Otherwise, it seems very hard...
|
|
## The below is less strict than it ought to be, but trying to disable
|
|
## TLS v1.1 here using ssl.OP_NO_TLSv1_1 produces unexpected failures
|
|
## with recent versions of OpenSSL. What a mess...
|
|
else:
|
|
self.tls_context.options |= ssl.OP_NO_SSLv3
|
|
self.tls_context.options |= ssl.OP_NO_SSLv2
|
|
# Try to enforce sensible ciphers
|
|
try:
|
|
self.tls_context.set_ciphers("AESGCM:AESGCM+ECDHE:AESGCM+DHE:CHACHA20+ECDHE:CHACHA20+DHE:!DSS:!SHA1:!MD5:@STRENGTH")
|
|
except ssl.SSLError:
|
|
# Rely on the server to only support sensible things, I guess...
|
|
pass
|
|
# Load client certificate
|
|
self.tls_context.load_cert_chain(certfile, keyfile)
|
|
|
|
# Set umask so that nothing we create can be read by anybody else.
|
|
# The certificate cache and TOFU database contain "browser history"
|
|
# type sensitivie information.
|
|
os.umask(0o077)
|
|
self._connect_to_tofu_db()
|
|
|
|
self.do_root()
|
|
|
|
def _connect_to_tofu_db(self):
|
|
|
|
db_path = os.path.join(self.config_dir, "tofu.db")
|
|
self.db_conn = sqlite3.connect(db_path)
|
|
self.db_cur = self.db_conn.cursor()
|
|
|
|
self.db_cur.execute("""CREATE TABLE IF NOT EXISTS cert_cache
|
|
(hostname text, address text, fingerprint text,
|
|
first_seen date, last_seen date, count integer)""")
|
|
|
|
def _go_to_gi(self, gi, update_hist=True, handle=True):
|
|
"""This method might be considered "the heart of Alphonse".
|
|
Everything involved in fetching a gemini resource happens here:
|
|
sending the request over the network, parsing the response if
|
|
its a menu, storing the response in a temporary file, choosing
|
|
and calling a handler program, and updating the history."""
|
|
if gi.host != self.bound_host:
|
|
print("Refusing to visit new domain: {}".format(gi.host))
|
|
return
|
|
if not gi.path.startswith(self.uppermost_path):
|
|
print("Refusing to leave path-prefix {}".format(self.uppermost_path))
|
|
return
|
|
if gi.scheme != "gemini":
|
|
print("Refusing to follow non-Gemini link.")
|
|
return
|
|
|
|
# Obey permanent redirects
|
|
if gi.url in self.permanent_redirects:
|
|
new_gi = GeminiItem(self.permanent_redirects[gi.url], name=gi.name)
|
|
self._go_to_gi(new_gi)
|
|
return
|
|
|
|
# Do everything which touches the network in one block,
|
|
# so we only need to catch exceptions once
|
|
try:
|
|
# Is this a local file?
|
|
if not gi.host:
|
|
address, f = None, open(gi.path, "rb")
|
|
else:
|
|
address, f = self._send_request(gi)
|
|
|
|
# Spec dictates <META> should not exceed 1024 bytes,
|
|
# so maximum valid header length is 1027 bytes.
|
|
header = f.readline(1027)
|
|
header = header.decode("UTF-8")
|
|
if not header or header[-1] != '\n':
|
|
raise RuntimeError("Received invalid header from server!")
|
|
header = header.strip()
|
|
self._debug("Response header: %s." % header)
|
|
|
|
# Catch network errors which may happen on initial connection
|
|
except Exception as err:
|
|
# Print an error message
|
|
if isinstance(err, socket.gaierror):
|
|
print("ERROR: DNS error!")
|
|
elif isinstance(err, ConnectionRefusedError):
|
|
print("ERROR: Connection refused!")
|
|
elif isinstance(err, ConnectionResetError):
|
|
print("ERROR: Connection reset!")
|
|
elif isinstance(err, (TimeoutError, socket.timeout)):
|
|
print("""ERROR: Connection timed out!
|
|
Slow internet connection? Use 'set timeout' to be more patient.""")
|
|
else:
|
|
print("ERROR: " + str(err))
|
|
return
|
|
|
|
# Validate header
|
|
status, meta = header.split(maxsplit=1)
|
|
if len(meta) > 1024 or len(status) != 2 or not status.isnumeric():
|
|
print("ERROR: Received invalid header from server!")
|
|
f.close()
|
|
return
|
|
|
|
# Update redirect loop/maze escaping state
|
|
if not status.startswith("3"):
|
|
self.previous_redirectors = set()
|
|
|
|
# Handle non-SUCCESS headers, which don't have a response body
|
|
# Inputs
|
|
if status.startswith("1"):
|
|
print(meta)
|
|
if status == "11":
|
|
user_input = getpass.getpass("> ")
|
|
else:
|
|
user_input = input("> ")
|
|
self._go_to_gi(gi.query(user_input))
|
|
return
|
|
# Redirects
|
|
elif status.startswith("3"):
|
|
new_gi = GeminiItem(gi.absolutise_url(meta))
|
|
if new_gi.url in self.previous_redirectors:
|
|
print("Error: caught in redirect loop!")
|
|
return
|
|
elif len(self.previous_redirectors) == _MAX_REDIRECTS:
|
|
print("Error: refusing to follow more than %d consecutive redirects!" % _MAX_REDIRECTS)
|
|
return
|
|
self._debug("Following redirect to %s." % new_gi.url)
|
|
self._debug("This is consecutive redirect number %d." % len(self.previous_redirectors))
|
|
self.previous_redirectors.add(gi.url)
|
|
if status == "31":
|
|
# Permanent redirect
|
|
self.permanent_redirects[gi.url] = new_gi.url
|
|
self._go_to_gi(new_gi)
|
|
return
|
|
# Errors
|
|
elif status.startswith("4") or status.startswith("5"):
|
|
print("Error: %s" % meta)
|
|
return
|
|
# Client cert
|
|
elif status.startswith("6"):
|
|
|
|
# Present different messages for different 6x statuses, but
|
|
# handle them the same.
|
|
if status in ("64", "65"):
|
|
print("The server rejected your certificate because it is either expired or not yet valid.")
|
|
elif status == "63":
|
|
print("The server did not accept your certificate.")
|
|
print("You may need to e.g. coordinate with the admin to get your certificate fingerprint whitelisted.")
|
|
else:
|
|
print("The site {} is requesting a client certificate.".format(gi.host))
|
|
print("This will allow the site to recognise you across requests.")
|
|
print("What do you want to do?")
|
|
print("1. Give up.")
|
|
print("2. Generate new certificate and retry the request.")
|
|
print("3. Load previously generated certificate from file.")
|
|
print("4. Load certificate from file and retry the request.")
|
|
choice = input("> ").strip()
|
|
if choice == "2":
|
|
self._generate_persistent_client_cert()
|
|
self._go_to_gi(gi, update_hist, handle)
|
|
elif choice == "3":
|
|
self._choose_client_cert()
|
|
self._go_to_gi(gi, update_hist, handle)
|
|
elif choice == "4":
|
|
self._load_client_cert()
|
|
self._go_to_gi(gi, update_hist, handle)
|
|
else:
|
|
print("Giving up.")
|
|
return
|
|
# Invalid status
|
|
elif not status.startswith("2"):
|
|
print("ERROR: Server returned undefined status code %s!" % status)
|
|
return
|
|
|
|
# If we're here, this must be a success and there's a response body
|
|
assert status.startswith("2")
|
|
|
|
mime = meta
|
|
if mime == "":
|
|
mime = "text/gemini; charset=utf-8"
|
|
mime, mime_options = cgi.parse_header(mime)
|
|
if "charset" in mime_options:
|
|
try:
|
|
codecs.lookup(mime_options["charset"])
|
|
except LookupError:
|
|
print("Header declared unknown encoding %s" % value)
|
|
return
|
|
|
|
if not mime.startswith("text/"):
|
|
print("Non-text MIME type declared!")
|
|
return
|
|
|
|
encoding = mime_options.get("charset", "UTF-8")
|
|
if mime.startswith("text/gemini"):
|
|
self.index = []
|
|
preformatted = False
|
|
|
|
for line in f:
|
|
try:
|
|
line = line.decode(encoding)
|
|
except UnicodeError:
|
|
print("Could not decode response body using %s encoding declared in header!" % encoding)
|
|
return
|
|
|
|
if mime.startswith("text/gemini"):
|
|
if line.startswith("```"):
|
|
preformatted = not preformatted
|
|
elif preformatted:
|
|
print(line, end="")
|
|
elif line.startswith("=>"):
|
|
try:
|
|
link_gi = GeminiItem.from_map_line(line, gi)
|
|
self.index.append(link_gi)
|
|
print(self._format_geminiitem(len(self.index), link_gi))
|
|
except:
|
|
self._debug("Skipping possible link: %s" % line)
|
|
elif line.startswith("* "):
|
|
line = line[1:].lstrip("\t ")
|
|
print(textwrap.fill(line, self.options["width"],
|
|
initial_indent = "• ", subsequent_indent=" "), end="")
|
|
elif line.startswith(">"):
|
|
line = line[1:].lstrip("\t ")
|
|
print(textwrap.fill(line, self.options["width"],
|
|
initial_indent = "> ", subsequent_indent="> "), end="")
|
|
elif line.startswith("###"):
|
|
line = line[3:].lstrip("\t ")
|
|
print("\x1b[4m" + line + "\x1b[0m", end="")
|
|
elif line.startswith("##"):
|
|
line = line[2:].lstrip("\t ")
|
|
print("\x1b[1m" + line + "\x1b[0m", end="")
|
|
elif line.startswith("#"):
|
|
line = line[1:].lstrip("\t ")
|
|
print("\x1b[1m\x1b[4m" + line + "\x1b[0m", end="")
|
|
else:
|
|
print(textwrap.fill(line, self.options["width"]))
|
|
|
|
else:
|
|
# Other text/
|
|
print(line)
|
|
|
|
# Update state
|
|
self.lookup = self.index
|
|
self.page_index = 0
|
|
self.index_index = -1
|
|
self.gi = gi
|
|
self.mime = mime
|
|
if update_hist:
|
|
self._update_history(gi)
|
|
|
|
def _send_request(self, gi):
|
|
"""Send a selector to a given host and port.
|
|
Returns the resolved address and binary file with the reply."""
|
|
# For Gemini requests, connect to the host and port specified in the URL
|
|
host, port = gi.host, gi.port
|
|
|
|
# Do DNS resolution
|
|
addresses = self._get_addresses(host, port)
|
|
|
|
# Connect to remote host by any address possible
|
|
err = None
|
|
for address in addresses:
|
|
self._debug("Connecting to: " + str(address[4]))
|
|
s = socket.socket(address[0], address[1])
|
|
s.settimeout(self.options["timeout"])
|
|
s = self.tls_context.wrap_socket(s, server_hostname = gi.host)
|
|
try:
|
|
s.connect(address[4])
|
|
break
|
|
except OSError as e:
|
|
err = e
|
|
else:
|
|
# If we couldn't connect to *any* of the addresses, just
|
|
# bubble up the exception from the last attempt and deny
|
|
# knowledge of earlier failures.
|
|
raise err
|
|
|
|
if sys.version_info.minor >=5:
|
|
self._debug("Established {} connection.".format(s.version()))
|
|
self._debug("Cipher is: {}.".format(s.cipher()))
|
|
|
|
# Do TOFU
|
|
if self.options["tls_mode"] == "tofu":
|
|
cert = s.getpeercert(binary_form=True)
|
|
self._validate_cert(address[4][0], host, cert)
|
|
|
|
# Send request and wrap response in a file descriptor
|
|
self._debug("Sending %s<CRLF>" % gi.url)
|
|
s.sendall((gi.url + CRLF).encode("UTF-8"))
|
|
return address, s.makefile(mode = "rb")
|
|
|
|
def _get_addresses(self, host, port):
|
|
# DNS lookup - will get IPv4 and IPv6 records if IPv6 is enabled
|
|
if ":" in host:
|
|
# This is likely a literal IPv6 address, so we can *only* ask for
|
|
# IPv6 addresses or getaddrinfo will complain
|
|
family_mask = socket.AF_INET6
|
|
elif socket.has_ipv6 and self.options["ipv6"]:
|
|
# Accept either IPv4 or IPv6 addresses
|
|
family_mask = 0
|
|
else:
|
|
# IPv4 only
|
|
family_mask = socket.AF_INET
|
|
addresses = socket.getaddrinfo(host, port, family=family_mask,
|
|
type=socket.SOCK_STREAM)
|
|
# Sort addresses so IPv6 ones come first
|
|
addresses.sort(key=lambda add: add[0] == socket.AF_INET6, reverse=True)
|
|
|
|
return addresses
|
|
|
|
def _validate_cert(self, address, host, cert):
|
|
"""
|
|
Validate a TLS certificate in TOFU mode.
|
|
|
|
If the cryptography module is installed:
|
|
- Check the certificate Common Name or SAN matches `host`
|
|
- Check the certificate's not valid before date is in the past
|
|
- Check the certificate's not valid after date is in the future
|
|
|
|
Whether the cryptography module is installed or not, check the
|
|
certificate's fingerprint against the TOFU database to see if we've
|
|
previously encountered a different certificate for this IP address and
|
|
hostname.
|
|
"""
|
|
now = datetime.datetime.utcnow()
|
|
if _HAS_CRYPTOGRAPHY:
|
|
# Using the cryptography module we can get detailed access
|
|
# to the properties of even self-signed certs, unlike in
|
|
# the standard ssl library...
|
|
c = x509.load_der_x509_certificate(cert, _BACKEND)
|
|
|
|
# Check certificate validity dates
|
|
if c.not_valid_before >= now:
|
|
raise CertificateError("Certificate not valid until: {}!".format(c.not_valid_before))
|
|
elif c.not_valid_after <= now:
|
|
raise CertificateError("Certificate expired as of: {})!".format(c.not_valid_after))
|
|
|
|
# Check certificate hostnames
|
|
names = []
|
|
common_name = c.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
|
|
if common_name:
|
|
names.append(common_name[0].value)
|
|
try:
|
|
names.extend([alt.value for alt in c.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value])
|
|
except x509.ExtensionNotFound:
|
|
pass
|
|
names = set(names)
|
|
for name in names:
|
|
try:
|
|
ssl._dnsname_match(name, host)
|
|
break
|
|
except CertificateError:
|
|
continue
|
|
else:
|
|
# If we didn't break out, none of the names were valid
|
|
raise CertificateError("Hostname does not match certificate common name or any alternative names.")
|
|
|
|
sha = hashlib.sha256()
|
|
sha.update(cert)
|
|
fingerprint = sha.hexdigest()
|
|
|
|
# Have we been here before?
|
|
self.db_cur.execute("""SELECT fingerprint, first_seen, last_seen, count
|
|
FROM cert_cache
|
|
WHERE hostname=? AND address=?""", (host, address))
|
|
cached_certs = self.db_cur.fetchall()
|
|
|
|
# If so, check for a match
|
|
if cached_certs:
|
|
max_count = 0
|
|
most_frequent_cert = None
|
|
for cached_fingerprint, first, last, count in cached_certs:
|
|
if count > max_count:
|
|
max_count = count
|
|
most_frequent_cert = cached_fingerprint
|
|
if fingerprint == cached_fingerprint:
|
|
# Matched!
|
|
self._debug("TOFU: Accepting previously seen ({} times) certificate {}".format(count, fingerprint))
|
|
self.db_cur.execute("""UPDATE cert_cache
|
|
SET last_seen=?, count=?
|
|
WHERE hostname=? AND address=? AND fingerprint=?""",
|
|
(now, count+1, host, address, fingerprint))
|
|
self.db_conn.commit()
|
|
break
|
|
else:
|
|
if _HAS_CRYPTOGRAPHY:
|
|
# Load the most frequently seen certificate to see if it has
|
|
# expired
|
|
certdir = os.path.join(self.config_dir, "cert_cache")
|
|
with open(os.path.join(certdir, most_frequent_cert+".crt"), "rb") as fp:
|
|
previous_cert = fp.read()
|
|
previous_cert = x509.load_der_x509_certificate(previous_cert, _BACKEND)
|
|
previous_ttl = previous_cert.not_valid_after - now
|
|
print(previous_ttl)
|
|
|
|
self._debug("TOFU: Unrecognised certificate {}! Raising the alarm...".format(fingerprint))
|
|
print("****************************************")
|
|
print("[SECURITY WARNING] Unrecognised certificate!")
|
|
print("The certificate presented for {} ({}) has never been seen before.".format(host, address))
|
|
print("This MIGHT be a Man-in-the-Middle attack.")
|
|
print("A different certificate has previously been seen {} times.".format(max_count))
|
|
if _HAS_CRYPTOGRAPHY:
|
|
if previous_ttl < datetime.timedelta():
|
|
print("That certificate has expired, which reduces suspicion somewhat.")
|
|
else:
|
|
print("That certificate is still valid for: {}".format(previous_ttl))
|
|
print("****************************************")
|
|
print("Attempt to verify the new certificate fingerprint out-of-band:")
|
|
print(fingerprint)
|
|
choice = input("Accept this new certificate? Y/N ").strip().lower()
|
|
if choice in ("y", "yes"):
|
|
self.db_cur.execute("""INSERT INTO cert_cache
|
|
VALUES (?, ?, ?, ?, ?, ?)""",
|
|
(host, address, fingerprint, now, now, 1))
|
|
self.db_conn.commit()
|
|
with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
|
|
fp.write(cert)
|
|
else:
|
|
raise Exception("TOFU Failure!")
|
|
|
|
# If not, cache this cert
|
|
else:
|
|
self._debug("TOFU: Blindly trusting first ever certificate for this host!")
|
|
self.db_cur.execute("""INSERT INTO cert_cache
|
|
VALUES (?, ?, ?, ?, ?, ?)""",
|
|
(host, address, fingerprint, now, now, 1))
|
|
self.db_conn.commit()
|
|
certdir = os.path.join(self.config_dir, "cert_cache")
|
|
if not os.path.exists(certdir):
|
|
os.makedirs(certdir)
|
|
with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
|
|
fp.write(cert)
|
|
|
|
def _format_geminiitem(self, index, gi, url=False):
|
|
line = "[%d] %s" % (index, gi.name or gi.url)
|
|
if gi.name and url:
|
|
line += " (%s)" % gi.url
|
|
return line
|
|
|
|
def _show_lookup(self, offset=0, end=None, url=False):
|
|
for n, gi in enumerate(self.lookup[offset:end]):
|
|
print(self._format_geminiitem(n+offset+1, gi, url))
|
|
|
|
def _update_history(self, gi):
|
|
# Don't duplicate
|
|
if self.history and self.history[self.hist_index] == gi:
|
|
return
|
|
self.history = self.history[0:self.hist_index+1]
|
|
self.history.append(gi)
|
|
self.hist_index = len(self.history) - 1
|
|
|
|
def _debug(self, debug_text):
|
|
if not self.options["debug"]:
|
|
return
|
|
debug_text = "\x1b[0;32m[DEBUG] " + debug_text + "\x1b[0m"
|
|
print(debug_text)
|
|
|
|
# Cmd implementation follows
|
|
|
|
def default(self, line):
|
|
if line.strip() == "EOF":
|
|
return self.onecmd("quit")
|
|
elif line.strip() == "..":
|
|
return self.do_up()
|
|
elif line.startswith("/"):
|
|
return self.do_search(line[1:])
|
|
|
|
# Expand abbreviated commands
|
|
first_word = line.split()[0].strip()
|
|
if first_word in _ABBREVS:
|
|
full_cmd = _ABBREVS[first_word]
|
|
expanded = line.replace(first_word, full_cmd, 1)
|
|
return self.onecmd(expanded)
|
|
|
|
# Try to parse numerical index for lookup table
|
|
try:
|
|
n = int(line.strip())
|
|
except ValueError:
|
|
print("What?")
|
|
return
|
|
|
|
try:
|
|
gi = self.lookup[n-1]
|
|
except IndexError:
|
|
print ("Index too high!")
|
|
return
|
|
|
|
self.index_index = n
|
|
self._go_to_gi(gi)
|
|
|
|
### Settings
|
|
def do_set(self, line):
|
|
"""View or set various options."""
|
|
if not line.strip():
|
|
# Show all current settings
|
|
for option in sorted(self.options.keys()):
|
|
print("%s %s" % (option, self.options[option]))
|
|
elif len(line.split()) == 1:
|
|
# Show current value of one specific setting
|
|
option = line.strip()
|
|
if option in self.options:
|
|
print("%s %s" % (option, self.options[option]))
|
|
else:
|
|
print("Unrecognised option %s" % option)
|
|
else:
|
|
# Set value of one specific setting
|
|
option, value = line.split(" ", 1)
|
|
if option not in self.options:
|
|
print("Unrecognised option %s" % option)
|
|
return
|
|
# Validate / convert values
|
|
if option == "tls_mode":
|
|
if value.lower() not in ("ca", "tofu", "insecure"):
|
|
print("""TLS mode must be "ca", "tofu" or "insecure"!""")
|
|
return
|
|
elif value.isnumeric():
|
|
value = int(value)
|
|
elif value.lower() == "false":
|
|
value = False
|
|
elif value.lower() == "true":
|
|
value = True
|
|
else:
|
|
try:
|
|
value = float(value)
|
|
except ValueError:
|
|
pass
|
|
self.options[option] = value
|
|
|
|
def do_abbrevs(self, *args):
|
|
"""Print all Alphonse command abbreviations."""
|
|
header = "Command Abbreviations:"
|
|
self.stdout.write("\n{}\n".format(str(header)))
|
|
if self.ruler:
|
|
self.stdout.write("{}\n".format(str(self.ruler * len(header))))
|
|
for k, v in _ABBREVS.items():
|
|
self.stdout.write("{:<7} {}\n".format(k, v))
|
|
self.stdout.write("\n")
|
|
|
|
@needs_gi
|
|
def do_reload(self, *args):
|
|
"""Reload the current URL."""
|
|
self._go_to_gi(self.gi)
|
|
|
|
@needs_gi
|
|
def do_up(self, *args):
|
|
"""Go up one directory in the path."""
|
|
self._go_to_gi(self.gi.up())
|
|
|
|
def do_back(self, *args):
|
|
"""Go back to the previous gemini item."""
|
|
if not self.history or self.hist_index == 0:
|
|
return
|
|
self.hist_index -= 1
|
|
gi = self.history[self.hist_index]
|
|
self._go_to_gi(gi, update_hist=False)
|
|
|
|
def do_forward(self, *args):
|
|
"""Go forward to the next gemini item."""
|
|
if not self.history or self.hist_index == len(self.history) - 1:
|
|
return
|
|
self.hist_index += 1
|
|
gi = self.history[self.hist_index]
|
|
self._go_to_gi(gi, update_hist=False)
|
|
|
|
def do_next(self, *args):
|
|
"""Go to next item after current in index."""
|
|
return self.onecmd(str(self.index_index+1))
|
|
|
|
def do_previous(self, *args):
|
|
"""Go to previous item before current in index."""
|
|
self.lookup = self.index
|
|
return self.onecmd(str(self.index_index-1))
|
|
|
|
def do_root(self, *args):
|
|
"""Go to root URL of the application."""
|
|
self._go_to_gi(GeminiItem(self.initial_url))
|
|
|
|
def do_version(self, line):
|
|
"""Display version information."""
|
|
print("Alphonse " + _VERSION)
|
|
|
|
### Stuff that modifies the lookup table
|
|
def do_ls(self, line):
|
|
"""List contents of current index.
|
|
Use 'ls -l' to see URLs."""
|
|
self.lookup = self.index
|
|
self._show_lookup(url = "-l" in line)
|
|
self.page_index = 0
|
|
|
|
def do_history(self, *args):
|
|
"""Display history."""
|
|
self.lookup = self.history
|
|
self._show_lookup(url=True)
|
|
self.page_index = 0
|
|
|
|
def do_search(self, searchterm):
|
|
"""Search index (case insensitive)."""
|
|
results = [
|
|
gi for gi in self.lookup if searchterm.lower() in gi.name.lower()]
|
|
if results:
|
|
self.lookup = results
|
|
self._show_lookup()
|
|
self.page_index = 0
|
|
else:
|
|
print("No results found.")
|
|
|
|
def emptyline(self):
|
|
"""Page through index ten lines at a time."""
|
|
i = self.page_index
|
|
if i > len(self.lookup):
|
|
return
|
|
self._show_lookup(offset=i, end=i+10)
|
|
self.page_index += 10
|
|
|
|
@needs_gi
|
|
def do_url(self, *args):
|
|
"""Print URL of most recently visited item."""
|
|
print(self.gi.url)
|
|
|
|
### Help
|
|
def do_help(self, arg):
|
|
"""ALARM! Recursion detected! ALARM! Prepare to eject!"""
|
|
if arg == "!":
|
|
print("! is an alias for 'shell'")
|
|
elif arg == "?":
|
|
print("? is an alias for 'help'")
|
|
else:
|
|
cmd.Cmd.do_help(self, arg)
|
|
|
|
### The end!
|
|
def do_quit(self, *args):
|
|
"""Exit Alphonse."""
|
|
# Close TOFU DB
|
|
self.db_conn.commit()
|
|
self.db_conn.close()
|
|
print()
|
|
print("Thank you for flying Alphonse!")
|
|
sys.exit()
|
|
|
|
do_exit = do_quit
|
|
|
|
# Main function
|
|
def main():
|
|
|
|
# Parse args
|
|
parser = argparse.ArgumentParser(description='A command line gemini client.')
|
|
parser.add_argument('--tls-cert', metavar='FILE', required=True,
|
|
help='TLS client certificate file')
|
|
parser.add_argument('--tls-key', metavar='FILE', required=True,
|
|
help='TLS client certificate private key file')
|
|
parser.add_argument('--version', action='store_true',
|
|
help='display version information and quit')
|
|
parser.add_argument('url', metavar='URL', help='start with this URL')
|
|
args = parser.parse_args()
|
|
|
|
# Handle --version
|
|
if args.version:
|
|
print("Alphonse " + _VERSION)
|
|
sys.exit()
|
|
|
|
# Instantiate client
|
|
gc = GeminiClient(args.url, args.tls_key, args.tls_cert)
|
|
|
|
# Process config file
|
|
rcfile = os.path.join(gc.config_dir, "alphonserc")
|
|
if os.path.exists(rcfile):
|
|
print("Using config %s" % rcfile)
|
|
with open(rcfile, "r") as fp:
|
|
for line in fp:
|
|
line = line.strip()
|
|
gc.cmdqueue.append(line)
|
|
|
|
# Say hi
|
|
print("Welcome to Alphonse!")
|
|
print("Enjoy your patrol through Geminispace...")
|
|
|
|
# Endless interpret loop
|
|
while True:
|
|
try:
|
|
gc.cmdloop()
|
|
except KeyboardInterrupt:
|
|
print("")
|
|
|
|
if __name__ == '__main__':
|
|
main()
|