Verify TLS certificates using different network perspectives
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

738 lines
24 KiB

#!/usr/bin/env python3
import sqlite3
import sys
import os
import getopt
import urllib
import io
import ipaddress
import socket
import hashlib
import codecs
import base64
import time
from datetime import datetime, timezone
import ssl
import OpenSSL
import cryptography.hazmat.primitives.serialization
import idna # punycode
import pycurl
# Set the database path here, or it will be created in the default location.
# When left empty, the candidates below are tried in order and the first
# match wins: current working directory, $XDG_DATA_HOME, ~/.local/share,
# and finally the directory containing the executable.
# NOTE(review): indentation was lost in this copy of the file, so it cannot
# be told from here whether the last three statements (expanduser, makedirs,
# filename append) belong inside `if not dbpath:` — confirm before setting
# dbpath by hand.
dbpath = ''
if not dbpath:
if os.path.exists('trust-seeker.db'):
# If there's a trust-seeker.db in the current working directory, use it.
dbpath = '.'
elif 'XDG_DATA_HOME' in os.environ:
dbpath = os.environ['XDG_DATA_HOME'] + '/trust-seeker'
elif os.path.isdir(os.path.expanduser('~/.local/share')):
# Left unexpanded here; the expanduser() call below resolves the '~'.
dbpath = '~/.local/share/trust-seeker'
else: # Save database in the same directory as the executable.
pathname = os.path.dirname(sys.argv[0])
basepath = os.path.abspath(os.path.realpath(pathname))
dbpath = basepath
# Up to this point dbpath names a directory; make sure it exists, then turn
# it into the path of the database file itself.
dbpath = os.path.expanduser(dbpath)
os.makedirs(dbpath, exist_ok=True)
dbpath += '/trust-seeker.db'
# Front end the script is serving: '' for the command line, otherwise
# 'gemini' or 'http' when started by a CGI-capable server.
remote_protocol = ''  # command line by default
# CGI
if 'REQUEST_METHOD' in os.environ:
    import cgi
    # CGI debugging
    #import cgitb; cgitb.enable()
    #print('Content-Type: text/html')
    #cgi.test()
    # REQUEST_SCHEME is a server extension rather than a standard CGI
    # meta-variable, so it may be missing; treat that case as HTTP instead
    # of failing with a KeyError.
    if os.environ.get('REQUEST_SCHEME') == 'gemini':
        remote_protocol = 'gemini'
    else:
        remote_protocol = 'http'
# Maps a pin status name to the integer used for it in pins.status queries.
pin_status: dict = dict(active=1, inactive=0, rejected=-1)
# Print to stderr (https://stackoverflow.com/a/14981125).
def eprint(*args, **kwargs):
    """Call print() with the given arguments, writing to standard error."""
    stream = sys.stderr
    print(*args, file=stream, **kwargs)
# Outputs response in the appropriate format
def respond(response: str, status='success', mime_type: str = 'text/plain'):
    """Print *response* for the active front end and terminate the process.

    Args:
        response: Text to output, or a list of pin rows. With the
            'text/plain' MIME type a list is rendered as `key=value` lines
            describing its first pin.
        status: A key of the status table below, or an integer status code
            (CLI, Gemini or HTTP) that is translated back to such a key.
        mime_type: MIME type announced to Gemini/HTTP clients.

    Raises:
        SystemExit: always; the exit code is the CLI code of the status.
    """
    # Status codes for CLI, Gemini and HTTP.
    # Inspired by Convergence's Notary Protocol:
    # https://github.com/moxie0/Convergence/wiki/Notary-Protocol
    status_codes: dict = {
        'success': [0, 20, 200],
        'mismatch': [1, 50, 409],  # fingerprint mismatch
        'no_info': [2, 51, 303],  # host not found in database
        'bad_input': [3, 59, 400],
        'internal_error': [4, 41, 503],
        # General error: an unrecognised integer status is passed through
        # unchanged as the Gemini/HTTP code.
        'error': [5, status, status],
    }
    if not isinstance(response, (str, list)):
        status = 'internal_error'
        response = 'Response type unknown'
    # If status is an integer, get its corresponding string value.
    if isinstance(status, int):
        status = next(
            (name for name, codes in status_codes.items() if status in codes),
            'error')
    status_code = status_codes[status][0]

    if mime_type == 'text/plain' and isinstance(response, list):
        lines = ['status=' + status]
        # Let user know if response contains multiple pins.
        # TODO: Add support for multiple pins.
        if len(response) > 1:
            lines.append('number_of_pins=' + str(len(response)))
        for pin in response[:1]:  # only the first pin is described for now
            fp = pin['fingerprint'].split(':')[1]
            base64_fingerprint = base64.b64encode(
                codecs.decode(fp, 'hex')).decode('ascii')
            lines += [
                'fingerprint=' + pin['fingerprint'],
                'fingerprint_base64=sha256:' + base64_fingerprint,
                'expires=' + str(pin['expires']),
                'tls_version=' + str(pin['tls_version']),
                'cipher_suite=' + pin['cipher_suite'],
                'first_seen=' + str(pin['first_seen']),
                'last_seen=' + str(pin['last_seen']),
                'seen_count=' + str(pin['seen_count']),
            ]
        response = os.linesep.join(lines)

    if remote_protocol == '':
        # No CGI: successes go to stdout, everything else to stderr.
        if status == 'success':
            print(response)
        else:
            eprint(response)
    else:
        # CGI: emit the protocol-specific header, then the body.
        if remote_protocol == 'gemini':
            remote_status_code = status_codes[status][1]
            print(str(remote_status_code) + ' ' + mime_type + '\r\n', end='')
        elif remote_protocol == 'http':
            remote_status_code = str(status_codes[status][2])
            print('Status: ' + remote_status_code
                  + '\r\nContent-Type: ' + mime_type + '\r\n\r\n', end='')
            # Note: setting the "Status" header won't work in Python's built-in CGIHTTPServer
            # https://stackoverflow.com/a/32079589
        print(response, end='')
    # sys.exit() rather than the exit() helper, which only exists when the
    # `site` module has been loaded.
    sys.exit(status_code)
# Split <hostname:port> into (<hostname>, <port>)
def split_host(host: str) -> tuple:
    """Split *host* at its last colon into (lower-cased hostname, port).

    Splitting at the last colon keeps the colons of an IPv6 hostname
    intact. Without any colon the hostname is empty and the whole input is
    returned as the port.
    """
    name, _, port = host.rpartition(':')
    return (name.lower(), port)
def normalize_host(host: str, exit_on_fail: bool = False) -> str:
    """Validate and canonicalise a `<hostname:port>` string.

    IPv6 hostnames are compressed and wrapped in brackets, other hostnames
    are passed through idna.decode() (punycode to IDN), and the port must
    be an integer between 1 and 65535. When the script runs behind CGI,
    local and private targets are refused.

    Args:
        host: Input of the form `<hostname:port>`.
        exit_on_fail: If true, a validation failure ends the process via
            respond(); otherwise the problem is printed to stderr.

    Returns:
        The normalised `hostname:port`, or False if validation failed and
        *exit_on_fail* is false.
    """
    def fail(message: str):
        # respond() terminates the process, so the lines after it only run
        # in the non-exiting mode.
        if exit_on_fail:
            respond(message, 'bad_input')
        eprint(message)
        return False

    # Host has the form <hostname:port>
    hostname, port = split_host(host)
    ip = None
    if hostname.count(':') >= 2:  # IPv6
        # Compress IPv6 address
        try:
            ip = ipaddress.IPv6Address(
                hostname.replace('[', '').replace(']', ''))
        except ValueError:
            return fail('Bad input: ' + host)
        hostname = '[' + ip.compressed + ']'
    else:
        try:
            # If hostname is punycode, convert it to IDN (UTF-8)
            hostname = idna.decode(hostname)
            # Also check the port for forbidden characters
            port = idna.decode(port)
        except ValueError:  # idna errors derive from UnicodeError
            return fail('Bad input: ' + host)
        try:
            ip = ipaddress.ip_address(hostname)
        except ValueError:
            ip = None  # a DNS name, not an address literal

    # Check port
    try:
        port = int(port)
    except ValueError:
        return fail('Bad input: ' + host)
    if not 1 <= port <= 65535:
        return fail('Port not in allowed range: ' + host)

    # Block remote queries of local/private targets.
    # TODO: names that merely resolve to a private address are not caught.
    if remote_protocol:
        if ip is not None:
            blocked = (ip.is_private or ip.is_loopback
                       or ip.is_link_local or ip.is_unspecified)
        else:
            blocked = hostname == 'localhost' or hostname.endswith('.localhost')
        if blocked:
            return fail("Can't query this host remotely: " + host)
    return hostname + ':' + str(port)
def create_database():
    """Create the SQLite database file and its schema if it is missing.

    Does nothing when `dbpath` already names a file. If the containing
    directory is not writable, reports an internal error through respond().
    """
    if os.path.isfile(dbpath):
        return  # DB already created
    dbdir = os.path.expanduser(os.path.dirname(dbpath))
    if not os.access(dbdir, os.W_OK):
        respond('No write permission for "' + dbdir + '".'
                + os.linesep + "Can't create database.", 'internal_error')

    # Statements are executed in this order.
    schema = (
        # Fingerprint = hash of SubjectPublicKeyInfo (SPKI), not of entire cert.
        '''
CREATE TABLE certs (
id INTEGER PRIMARY KEY,
cert BLOB UNIQUE,
fingerprint TEXT NOT NULL,
expires INTEGER NOT NULL
);''',
        'CREATE INDEX idx_certs_fp ON certs (fingerprint);',
        '''
CREATE TABLE pins (
status INTEGER NOT NULL DEFAULT 1,
host TEXT NOT NULL,
cert_id INTEGER NOT NULL,
first_seen INTEGER NOT NULL,
last_seen INTEGER NOT NULL,
seen_count INTEGER NOT NULL DEFAULT 1 CHECK(seen_count > 0),
tls_version REAL NOT NULL CHECK(tls_version >= 1),
cipher_suite TEXT NOT NULL,
PRIMARY KEY (host, cert_id),
FOREIGN KEY(cert_id) REFERENCES certs(id)
);''',
    )
    connection = sqlite3.connect(dbpath)
    cursor = connection.cursor()
    for statement in schema:
        cursor.execute(statement)
    connection.commit()
    cursor.close()
    connection.close()
def get_TLS_info(host: str, timeout: int = 2) -> dict:
    """Connect to *host* over TLS and describe its certificate and session.

    Args:
        host: Target as `<hostname:port>`; IPv6 hostnames may be bracketed.
        timeout: Connection timeout in seconds.

    Returns:
        A dict with the keys 'cert' (DER bytes), 'cipher_suite',
        'tls_version' (float), 'fingerprint' ('sha256:' plus the hex SHA-256
        of the SubjectPublicKeyInfo) and 'expires' (UNIX timestamp), or
        None if the connection could not be established.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    #context.verify_mode = ssl.CERT_REQUIRED
    hostname, port = split_host(host)
    # Name resolution does not accept the bracketed IPv6 notation.
    address = hostname.strip('[]')
    try:
        conn = socket.create_connection((address, port), timeout)
        sock = context.wrap_socket(conn, server_hostname=address)  # SNI
    except (OSError, ValueError):  # includes ssl.SSLError and timeouts
        eprint('Connection failed for ' + host)
        return None
    try:
        TLS_info = {
            'cert': sock.getpeercert(True),  # DER format
            'cipher_suite': sock.cipher()[0],
            # version() is the protocol actually negotiated, e.g. 'TLSv1.3';
            # cipher()[1] only names the protocol that defines the cipher.
            'tls_version': float(sock.version()[4:]),
        }
    finally:
        sock.close()
    # Get hash of the SubjectPublicKeyInfo (SPKI).
    # https://www.imperialviolet.org/2011/05/04/pinning.html
    # gemini://makeworld.space/gemlog/2020-07-03-tofu-rec.gmi
    serialization = cryptography.hazmat.primitives.serialization
    x509 = OpenSSL.crypto.load_certificate(
        OpenSSL.crypto.FILETYPE_ASN1, TLS_info['cert'])
    pubkey = x509.get_pubkey().to_cryptography_key()
    spki = pubkey.public_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PublicFormat.SubjectPublicKeyInfo)
    TLS_info['fingerprint'] = 'sha256:' + hashlib.sha256(spki).hexdigest()
    # Get cert expiration value as UNIX timestamp. The trailing 'Z' marks
    # notAfter as UTC, so it must not be converted as local time.
    not_after = datetime.strptime(
        x509.get_notAfter().decode('ascii'), '%Y%m%d%H%M%SZ')
    TLS_info['expires'] = int(
        not_after.replace(tzinfo=timezone.utc).timestamp())
    return TLS_info
# Check the database for this host
def find_pins(host: str, status: str = '') -> list:
    """Return the pins stored for *host*, joined with their certificates.

    Args:
        host: Normalised `<hostname:port>` to look up.
        status: Optional filter: 'active' (status > 0), 'inactive' or
            'rejected'. An empty string returns pins of any status.

    Returns:
        A list of sqlite3.Row objects ordered by `last_seen`, newest first.

    Raises:
        KeyError: if *status* is not a known status name.
    """
    # Built from explicit fragments so that token separation never depends
    # on the indentation of continuation lines.
    sql = ('SELECT * FROM certs c, pins p'
           ' WHERE p.cert_id = c.id AND p.host = ?')
    params = [host]
    if status == 'active':
        sql += ' AND p.status > 0'
    elif status:
        # status = inactive or rejected
        sql += ' AND p.status = ?'
        params.append(pin_status[status])
    sql += ' ORDER BY p.last_seen DESC;'

    connection = sqlite3.connect(dbpath)
    try:
        connection.row_factory = sqlite3.Row
        cursor = connection.cursor()
        cursor.execute(sql, params)
        pins = cursor.fetchall()
        cursor.close()
    finally:
        connection.close()
    return pins
# Get active certs
def get_certs(host: str) -> list:
    """Return the `cert` value of every active pin found for *host*."""
    return [row['cert'] for row in find_pins(host, 'active')]
# Export certs to PEM string
def export_certs(host: str) -> str:
    """Return the active certificates of *host* as concatenated PEM text.

    The result is an empty string when no active certificate is stored.
    """
    pem_blocks = [ssl.DER_cert_to_PEM_cert(der) for der in get_certs(host)]
    return ''.join(pem_blocks)
def verify_host(host: str, fingerprint: str):
    """Compare *fingerprint* with the active pins of *host* and respond.

    Responds 'success' with the matching pin, 'no_info' if no active pin
    is known, or 'mismatch' with the known pins. Every outcome ends in
    respond(). When called remotely, the host is imported at most once to
    obtain or refresh its pins before giving up.
    """
    def respond_if_pinned(candidates: list):
        # Search for a pin matching the user-provided fingerprint.
        for candidate in candidates:
            if candidate['fingerprint'] == fingerprint:
                respond([candidate], 'success')

    refreshed = False
    pins = find_pins(host, 'active')
    if not pins and remote_protocol:
        # If trust-seeker is called remotely, connect to the host and get its cert.
        import_host(host)
        refreshed = True
        pins = find_pins(host, 'active')
    if not pins:
        respond('No certs found for ' + host, 'no_info')

    respond_if_pinned(pins)
    # No matching pins.
    # Refresh the host's cert if we haven't already.
    if remote_protocol and not refreshed:
        import_host(host)
        pins = find_pins(host, 'active')
        respond_if_pinned(pins)
    # Again no matching pins.
    # Output whatever pins we have for this host.
    respond(pins, 'mismatch')
def import_host(host: str):
    """Fetch the certificate of *host* and record it as the active pin.

    The connection is retried once after two seconds. The certificate is
    stored if it is new, the (host, cert) pin is inserted or refreshed, and
    all other non-rejected pins of the host are made inactive. A warning is
    printed to stderr when the host had active pins and none of them carries
    the certificate just seen. Returns None in every case; nothing is
    written if the host cannot be reached.
    """
    # Get TLS info.
    TLS_info = get_TLS_info(host)
    if TLS_info is None:
        eprint('Retrying...')
        time.sleep(2)
        TLS_info = get_TLS_info(host)
        if TLS_info is None:
            return
    timestamp_now: int = int(datetime.now(timezone.utc).timestamp())
    pin_updated: bool = False

    connection = sqlite3.connect(dbpath)
    try:
        connection.row_factory = sqlite3.Row
        cursor = connection.cursor()
        cursor.execute('PRAGMA foreign_keys=ON;')

        # Check for mismatch.
        # If mismatch, display a warning and continue.
        cursor.execute(
            '''SELECT c.cert FROM certs c, pins p
               WHERE p.cert_id = c.id AND host = ? AND status > 0;''',
            [host])
        pins = cursor.fetchall()
        if pins and all(pin['cert'] != TLS_info['cert'] for pin in pins):
            eprint('Certificate changed for ' + host)

        # Check if the cert is already in the DB (for some other host).
        cursor.execute('SELECT * FROM certs WHERE cert = :cert;', TLS_info)
        cert = cursor.fetchone()
        if cert:
            # Update the corresponding pin for this (host, cert) tuple.
            cert_id = cert['id']
            cursor.execute(
                'SELECT seen_count FROM pins WHERE host = ? AND cert_id = ?;',
                (host, cert_id))
            pin = cursor.fetchone()
            if pin:
                # NOTE(review): this also reactivates a pin whose status
                # was -1 (rejected) — confirm that is intended.
                cursor.execute(
                    '''UPDATE pins
                       SET status = 1,
                           last_seen = :last_seen,
                           seen_count = :seen_count,
                           tls_version = :tls_version,
                           cipher_suite = :cipher_suite
                       WHERE cert_id = :cert_id AND host = :host;''',
                    {
                        'last_seen': timestamp_now,
                        'seen_count': pin['seen_count'] + 1,
                        'tls_version': TLS_info['tls_version'],
                        'cipher_suite': TLS_info['cipher_suite'],
                        'cert_id': cert_id,
                        'host': host,
                    })
                pin_updated = True
        else:
            # Cert not found, so add it.
            cursor.execute(
                '''INSERT INTO certs (cert, fingerprint, expires)
                   VALUES (:cert, :fingerprint, :expires);''',
                TLS_info)
            cert_id = cursor.lastrowid

        if not pin_updated:
            # Insert pin.
            cursor.execute(
                '''INSERT INTO pins
                   (status, host, cert_id, first_seen, last_seen,
                    seen_count, tls_version, cipher_suite)
                   VALUES (1, :host, :cert_id, :first_seen, :last_seen,
                           1, :tls_version, :cipher_suite);''',
                {
                    'host': host,
                    'cert_id': cert_id,
                    'first_seen': timestamp_now,
                    'last_seen': timestamp_now,
                    'tls_version': TLS_info['tls_version'],
                    'cipher_suite': TLS_info['cipher_suite'],
                })

        # Make other (non-rejected) pins for this host inactive.
        cursor.execute(
            '''UPDATE pins SET status = 0
               WHERE status != -1 AND cert_id != :cert_id AND host = :host;''',
            {'cert_id': cert_id, 'host': host})
        connection.commit()
        cursor.close()
    finally:
        # Reached on errors too; nothing is committed in that case.
        connection.close()
def import_file(hosts_file: str, default_port: str):
    """Import every host listed in *hosts_file*, one entry per line.

    Blank lines and lines starting with '#' are skipped. Entries without a
    colon get *default_port* appended. Entries rejected by normalize_host()
    are skipped, and there is a 0.3 second pause after each import.
    """
    with open(hosts_file, 'r') as f:
        for raw_line in f:
            entry = raw_line.strip()
            if not entry or entry.startswith('#'):  # empty line or comment
                continue
            if ':' not in entry:
                entry = entry + ':' + default_port
            host = normalize_host(entry)
            if not host:  # bad input
                continue
            print('Importing ' + host)
            import_host(host)
            time.sleep(0.3)
def check_input(argv: list) -> dict:
    """Parse the command line into a command dict for main().

    Args:
        argv: Command-line arguments without the program name.

    Returns:
        A dict with 'actions' (a list drawn from import, verify, lookup,
        ask) and, depending on the options given, 'host' (normalised) or
        'file', plus 'port', 'fingerprint', 'output' and
        'trust_seeker_url'.

    Every invalid combination is reported through respond(), which ends
    the process.
    """
    help_text = """Usage: trust-seeker [options...] <host[:port]>/<file>
--import Import host's cert / Import from file
--verify Verify host; --fingerprint is required
--fingerprint sha256:<hash> Provide cert fingerprint (SubjectPublicKeyInfo hash)
--port <port> Specify (default) port
--lookup Look up host in the trust seeker's database
--output[=certs] Specify what to output; only "certs" is accepted, for now
--ask <URL> Ask another trust seeker about a host; can be combined with --fingerprint"""
    command: dict = {
        'actions': []  # import, verify, lookup, ask
    }
    try:
        opts, args = getopt.gnu_getopt(argv, '', [
            'help',
            'import',
            'verify',
            'port=',
            'fingerprint=',
            'lookup',
            'output=',
            'ask=',
        ])
    except getopt.GetoptError:
        respond(help_text, 'bad_input')
    if not opts:
        respond(help_text, 'bad_input')

    if len(args) > 1:
        respond('Only one argument is accepted.', 'bad_input')
    elif len(args) == 1:
        target = args[0]
        filename, file_extension = os.path.splitext(target)
        # splitext() keeps the leading dot of the extension.
        if (file_extension in ('', '.txt') and ':' not in target
                and filename != 'localhost'):
            # Assume file.
            command['file'] = target
        else:
            # Assume host.
            command['host'] = target

    for opt, value in opts:
        if opt == '--help':
            respond(help_text, 'success')
        elif opt == '--import':
            command['actions'].append('import')
        elif opt == '--lookup':
            command['actions'].append('lookup')
        elif opt == '--output':
            if value == 'certs':
                command['output'] = value
            # TODO: specify exact output, for ex:
            # --lookup=spki:sha256:base64,tls_version
        elif opt == '--verify':
            command['actions'].append('verify')
        elif opt in ('-p', '--port'):
            command['port'] = value
        elif opt == '--fingerprint':
            command['fingerprint'] = value
        elif opt == '--ask':
            command['actions'].append('ask')
            command['trust_seeker_url'] = value

    if not command['actions']:
        respond('No action specified.', 'bad_input')
    # If --lookup is specified, ignore every other action
    if 'lookup' in command['actions']:
        command['actions'] = ['lookup']
    # If --ask is specified, ignore --import and --verify
    # (including repeated occurrences of those options).
    if 'ask' in command['actions']:
        command['actions'] = [action for action in command['actions']
                              if action not in ('import', 'verify')]

    if 'verify' in command['actions']:
        if 'fingerprint' not in command:
            respond("Can't verify without fingerprint.", 'bad_input')
        if 'file' in command:
            respond("Can't verify a file, only a host.", 'bad_input')
    # Every action needs a target, and only --import can work on a file.
    if 'host' not in command and 'file' not in command:
        respond('Please specify a host or a file.', 'bad_input')
    if 'file' in command and any(
            action != 'import' for action in command['actions']):
        respond('Only --import accepts a file; please specify a host.',
                'bad_input')
    if 'file' in command and 'port' not in command:
        respond('Please specify the default --port.', 'bad_input')

    if 'host' in command:
        if ':' not in command['host']:
            if 'port' in command:
                command['host'] += ':' + command['port']  # TODO: [IPv6]:port
            else:
                respond('Please specify the port.', 'bad_input')
        command['host'] = normalize_host(command['host'], exit_on_fail=True)
    return command
def check_cgi_input() -> dict:
    """Build the command dict for main() from the CGI request parameters.

    A request without a `host` parameter is answered with an HTML help
    page through respond(). With a `fingerprint` parameter the action is
    'verify', otherwise 'lookup'.
    """
    # Get URL parameters
    url_params = cgi.FieldStorage()
    if 'host' not in url_params:
        html_help_text = """<!DOCTYPE html>
<html lang="en">
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Trust Seeker</title>
<h1>Trust Seeker</h1>
<p>Parameters:<br>
• host=hostname:port <i>(required)</i><br>
• fingerprint=sha256:SPKI-hash <i>(optional)</i></p>
<p>Examples:<br>
• <a href="trust-seeker?host=rawtext.club:443">Request info about rawtext.club:443</a><br>
• <a href="trust-seeker?host=gemini.circumlunar.space:1965&fingerprint=sha256:1a03a15619200db4496494ec90381c1fe8bd9e0142260f6d8a3d962ed3cfc72f">Verify fingerprint of gemini.circumlunar.space:1965</a></p>
</html>"""
        respond(html_help_text, 'bad_input', 'text/html')  # does not return

    host = normalize_host(url_params['host'].value, exit_on_fail=True)
    if 'fingerprint' in url_params:
        return {
            'actions': ['verify'],
            'host': host,
            'fingerprint': url_params['fingerprint'].value,
        }
    return {'actions': ['lookup'], 'host': host}
def ask(trust_seeker_url: str, host: str, fingerprint: str = '', timeout: int = 2):
    """Query another trust seeker about *host* and relay its answer.

    Args:
        trust_seeker_url: gemini:// or https:// URL of the trust seeker.
        host: Normalised `<hostname:port>` to ask about.
        fingerprint: Optional fingerprint for the trust seeker to verify.
        timeout: Request timeout in seconds.

    The trust seeker's own certificate must already be imported: the
    request is pinned to the public keys of its active pins. Every outcome
    ends in respond(), which terminates the process.
    """
    # `import urllib` alone does not guarantee the submodule is loaded.
    import urllib.parse

    # TODO: add --timeout option
    parsed_url = urllib.parse.urlparse(trust_seeker_url)
    ts_scheme = parsed_url.scheme
    if ts_scheme not in ('gemini', 'https'):
        respond('For --ask, only gemini and https schemes are allowed.', 'bad_input')
    try:
        ts_port = parsed_url.port  # raises ValueError for an invalid port
    except ValueError:
        ts_port = None
        respond('Bad input: ' + trust_seeker_url, 'bad_input')
    ts_hostname = parsed_url.hostname
    if not ts_hostname:
        respond('Bad input: ' + trust_seeker_url, 'bad_input')
    if not ts_port:
        ts_port = 1965 if ts_scheme == 'gemini' else 443
    if ':' in ts_hostname:
        # urlparse strips the brackets of IPv6 literals; restore them.
        ts_hostname = '[' + ts_hostname + ']'
    # Use the same canonical form under which imported hosts are stored.
    ts_host = normalize_host(ts_hostname + ':' + str(ts_port), exit_on_fail=True)

    # Trust seekers' certificates must be in the database
    # before they can be queried.
    ts_pins = find_pins(ts_host, 'active')
    if not ts_pins:
        respond("`" + ts_host + "` can't be queried unless its certificate has been imported.",
                'no_info')

    query = {'host': host}
    if fingerprint:
        query['fingerprint'] = fingerprint
    url = trust_seeker_url + '?' + urllib.parse.urlencode(query)

    # Connect to trust seeker
    if ts_scheme == 'gemini':
        # TODO: add Gemini support
        respond('Gemini requests not yet implemented.', 'internal_error')

    # https: convert the fingerprints to base64, for curl:
    # https://curl.se/libcurl/c/CURLOPT_PINNEDPUBLICKEY.html
    # Several acceptable keys are separated by ';'.
    pinned_keys = ';'.join(
        'sha256//' + base64.b64encode(
            codecs.decode(pin['fingerprint'].split(':')[1], 'hex')).decode('ascii')
        for pin in ts_pins)
    buf = io.BytesIO()
    c = pycurl.Curl()
    try:
        c.setopt(c.URL, url)
        #c.setopt(c.VERBOSE, True)
        c.setopt(c.TIMEOUT, timeout)
        c.setopt(c.USERAGENT, '')
        c.setopt(c.SSLVERSION, c.SSLVERSION_TLSv1_2)  # TLS 1.2 or later
        c.setopt(c.SSL_VERIFYHOST, 0)
        c.setopt(c.PINNEDPUBLICKEY, pinned_keys)
        c.setopt(c.WRITEDATA, buf)
        try:
            c.perform()
        except pycurl.error:
            respond('Could not connect to ' + trust_seeker_url, 'internal_error')
        status = c.getinfo(pycurl.RESPONSE_CODE)
    finally:
        c.close()
    # Replies may contain non-ASCII text such as IDN hostnames.
    response = buf.getvalue().decode('utf-8', errors='replace')
    if response:
        respond(response, status)
    respond('No response from ' + ts_host, 'internal_error')
def main(argv: list):
    """Run the actions requested on the command line or through CGI.

    Args:
        argv: Command-line arguments without the program name; ignored
            when the script runs as a CGI program.
    """
    create_database()
    # CGI requests carry their input in URL parameters, not in argv.
    command: dict = check_cgi_input() if remote_protocol else check_input(argv)
    actions = command['actions']

    if 'lookup' in actions:
        host = command['host']
        if command.get('output') == 'certs':
            certs_string = export_certs(host)
            if certs_string == '':
                respond('No active certs found for ' + host, 'no_info')
            respond(certs_string, 'success')
        pins = find_pins(host, 'active')
        if not pins and remote_protocol:
            # If trust-seeker is called remotely, connect to the host and get its cert.
            import_host(host)
            pins = find_pins(host, 'active')
        if not pins:
            respond('No certs found for ' + host, 'no_info')
        respond(pins, 'success')

    if 'import' in actions:
        # TLSv1.3 is required for import operations, in order to put
        # the highest available TLS version in the 'tls_version' field.
        if not ssl.HAS_TLSv1_3:
            respond('TLS 1.3 not supported. Please update OpenSSL.', 'internal_error')
        if 'file' in command:
            print('Importing hosts...')
            import_file(command['file'], command['port'])
        else:
            print('Importing ' + command['host'])
            import_host(command['host'])

    if 'verify' in actions:
        #print('Verifying ' + command['host'])
        verify_host(command['host'], command['fingerprint'])

    if 'ask' in actions:
        #print('Asking `' + command['trust_seeker_url'] + '` about `' + command['host'] + '`')
        # An empty fingerprint makes ask() perform a plain lookup.
        ask(command['trust_seeker_url'], command['host'],
            command.get('fingerprint', ''))


if __name__ == "__main__":
    main(sys.argv[1:])