From 405aea50e4fe044b7507dc9007c6cba1433437ae Mon Sep 17 00:00:00 2001 From: khuxkm fbexl Date: Fri, 2 Jul 2021 02:32:19 +0000 Subject: [PATCH] Verify certs, and prevent link items with no first space from crashing Zenit --- vcert.py | 82 ++++++++++++++++++++++++++++++++++++++++++++++ zenit-reindexer.py | 7 +++- zenit.py | 9 +++-- 3 files changed, 95 insertions(+), 3 deletions(-) create mode 100644 vcert.py diff --git a/vcert.py b/vcert.py new file mode 100644 index 0000000..cb331d0 --- /dev/null +++ b/vcert.py @@ -0,0 +1,82 @@ +"""Certificate verification. Stolen from AV-98, whose license is reproduced below (and should be interpreted as pertaining to this code only): + +Copyright (c) 2020, Solderpunk and contributors. +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, +this list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, +this list of conditions and the following disclaimer in the documentation +and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +""" +import ssl +from ssl import CertificateError +try: + from cryptography import x509 + from cryptography.hazmat.backends import default_backend + _HAS_CRYPTOGRAPHY = True + _BACKEND = default_backend() + _WARN_NO_CRYPTOGRAPHY = False +except ModuleNotFoundError: + _HAS_CRYPTOGRAPHY = False + _WARN_NO_CRYPTOGRAPHY = True +from sys import stderr +import datetime + +def validate_cert(host, cert): + """Validate a TLS certificate in TOFU mode. + If the cryptography module is installed: + - Check the certificate Common Name or SAN matches `host` + - Check the certificate's not valid before date is in the past + - Check the certificate's not valid after date is in the future + """ + now = datetime.datetime.utcnow() + if _HAS_CRYPTOGRAPHY: + # Using the cryptography module we can get detailed access + # to the properties of even self-signed certs, unlike in + # the standard ssl library... + c = x509.load_der_x509_certificate(cert, _BACKEND) + # Check certificate validity dates + if c.not_valid_before >= now: + raise CertificateError("Certificate not valid until: {}!".format(c.not_valid_before)) + elif c.not_valid_after <= now: + raise CertificateError("Certificate expired as of: {}!".format(c.not_valid_after)) + # Check certificate hostnames + names = [] + common_name = c.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME) + if common_name: + names.append(common_name[0].value) + try: + names.extend([alt.value for alt in c.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value if type(alt.value)==str]) + except x509.ExtensionNotFound: + pass + names = set(names) + for name in names: + try: + ssl._dnsname_match(name, host) + break + except CertificateError: + continue + else: + # If we didn't break out, none of the names were valid + raise CertificateError("Hostname does not match certificate common name or any alternative names.") + else: + if _WARN_NO_CRYPTOGRAPHY: + print("WARNING: cryptography library not installed!",file=stderr) + print("All certificates will be treated as valid!",file=stderr) + _WARN_NO_CRYPTOGRAPHY = False # only warn once per run diff --git a/zenit-reindexer.py b/zenit-reindexer.py index 224426f..ecfe255 100644 --- a/zenit-reindexer.py +++ b/zenit-reindexer.py @@ -1,7 +1,7 @@ """Zenit - the Molniya indexer. Zenit was a series of military photoreconnaissance satellites launched by the Soviet Union between 1961 and 1994. In keeping with the Soviet spy satellite theme, I chose this name for the indexer.""" -import json, urllib.parse, traceback, sys, ssl, socket, string +import json, urllib.parse, traceback, sys, ssl, socket, string, vcert from config import * # stolen from AV-98 urllib.parse.uses_relative.append("gemini") @@ -103,7 +103,12 @@ def grab_content(url,redirect_num=0): ss = ctx.wrap_socket(s,server_hostname=parsed.hostname) try: ss.connect((parsed.hostname,parsed.port or 1965)) + vcert.validate_cert(parsed.hostname,ss.getpeercert(True)) except ConnectionRefusedError: + print("Connection refused!",file=sys.stderr) + return b'', 'application/octet-stream' + except ssl.CertificateError as e: + print(e.args[0],file=sys.stderr) return b'', 'application/octet-stream' ss.send((url.strip()+"\r\n").encode("UTF-8")) out = b"" diff --git a/zenit.py b/zenit.py index 61829a5..9ea697d 100644 --- a/zenit.py +++ b/zenit.py @@ -1,7 +1,7 @@ """Zenit - the Molniya indexer. Zenit was a series of military photoreconnaissance satellites launched by the Soviet Union between 1961 and 1994. In keeping with the Soviet spy satellite theme, I chose this name for the indexer.""" -import json, urllib.parse, traceback, sys, ssl, socket, string +import json, urllib.parse, traceback, sys, ssl, socket, string, vcert from config import * # stolen from AV-98 urllib.parse.uses_relative.append("gemini") @@ -103,7 +103,12 @@ def grab_content(url,redirect_num=0): ss = ctx.wrap_socket(s,server_hostname=parsed.hostname) try: ss.connect((parsed.hostname,parsed.port or 1965)) + vcert.validate_cert(parsed.hostname,ss.getpeercert(True)) except ConnectionRefusedError: + print("Connection refused!") + return b'', 'application/octet-stream' + except ssl.CertificateError as e: + print(f"Certificate error for domain {parsed.hostname}: {e.args[0]}") return b'', 'application/octet-stream' ss.send((url.strip()+"\r\n").encode("UTF-8")) out = b"" @@ -193,7 +198,7 @@ for link in links: links_to_orbit = False for line in response.splitlines(): if line.startswith("=>"): - parts = line.split(None,2) + parts = line.replace("=>","=> ").replace("=> ","=> ").split(None,2) for reqlink in REQUIRED_LINKS: links_to_orbit=links_to_orbit or parts[1].startswith(reqlink) assert links_to_orbit, "doesn't link back to orbit"