Restore debugging output from cache using logging module.

This commit is contained in:
Solderpunk 2023-11-12 15:14:05 +01:00
parent e678bca089
commit 79a6187eac
2 changed files with 35 additions and 26 deletions

50
av98.py
View File

@ -24,6 +24,7 @@ import getpass
import glob
import hashlib
import io
import logging
import mimetypes
import os
import os.path
@ -96,11 +97,15 @@ _MIME_HANDLERS = {
"text/*": "cat %s",
}
# monkey-patch Gemini support in urllib.parse
# see https://github.com/python/cpython/blob/master/Lib/urllib/parse.py
urllib.parse.uses_relative.append("gemini")
urllib.parse.uses_netloc.append("gemini")
ui_out = logging.getLogger("av98_logger")
ui_handler = logging.StreamHandler()
ui_out.addHandler(ui_handler)
def fix_ipv6_url(url):
if not url.count(":") > 2: # Best way to detect them?
@ -438,7 +443,7 @@ you'll be able to transparently follow links to Gopherspace!""")
if not header or header[-1] != '\n':
raise RuntimeError("Received invalid header from server!")
header = header.strip()
self._debug("Response header: %s." % header)
ui_out.debug("Response header: %s." % header)
# Validate header
status, meta = header.split(maxsplit=1) if header[2:].strip() else (header[:2], "")
@ -483,8 +488,8 @@ you'll be able to transparently follow links to Gopherspace!""")
follow = "yes"
if follow.strip().lower() not in ("y", "yes"):
raise UserAbortException()
self._debug("Following redirect to %s." % new_gi.url)
self._debug("This is consecutive redirect number %d." % len(self.previous_redirectors))
ui_out.debug("Following redirect to %s." % new_gi.url)
ui_out.debug("This is consecutive redirect number %d." % len(self.previous_redirectors))
self.previous_redirectors.add(gi.url)
if status == "31":
# Permanent redirect
@ -537,7 +542,7 @@ you'll be able to transparently follow links to Gopherspace!""")
size = tmpf.write(body)
tmpf.close()
self.tmp_filename = tmpf.name
self._debug("Wrote %d byte response to %s." % (size, self.tmp_filename))
ui_out.debug("Wrote %d byte response to %s." % (size, self.tmp_filename))
# Maintain cache and log
if self.options["cache"]:
@ -555,10 +560,10 @@ you'll be able to transparently follow links to Gopherspace!""")
elif gi.scheme == "gopher":
# For Gopher requests, use the configured proxy
host, port = self.options["gopher_proxy"].rsplit(":", 1)
self._debug("Using gopher proxy: " + self.options["gopher_proxy"])
ui_out.debug("Using gopher proxy: " + self.options["gopher_proxy"])
elif gi.scheme in ("http", "https"):
host, port = self.options["http_proxy"].rsplit(":",1)
self._debug("Using http proxy: " + self.options["http_proxy"])
ui_out.debug("Using http proxy: " + self.options["http_proxy"])
# Do DNS resolution
addresses = self._get_addresses(host, port)
@ -605,7 +610,7 @@ you'll be able to transparently follow links to Gopherspace!""")
# Connect to remote host by any address possible
err = None
for address in addresses:
self._debug("Connecting to: " + str(address[4]))
ui_out.debug("Connecting to: " + str(address[4]))
s = socket.socket(address[0], address[1])
s.settimeout(self.options["timeout"])
s = context.wrap_socket(s, server_hostname = gi.host)
@ -621,8 +626,8 @@ you'll be able to transparently follow links to Gopherspace!""")
raise err
if sys.version_info.minor >=5:
self._debug("Established {} connection.".format(s.version()))
self._debug("Cipher is: {}.".format(s.cipher()))
ui_out.debug("Established {} connection.".format(s.version()))
ui_out.debug("Cipher is: {}.".format(s.cipher()))
# Do TOFU
if self.options["tls_mode"] != "ca":
@ -635,7 +640,7 @@ you'll be able to transparently follow links to Gopherspace!""")
self.client_certs[gi.host] = self.client_certs["active"]
# Send request and wrap response in a file descriptor
self._debug("Sending %s<CRLF>" % gi.url)
ui_out.debug("Sending %s<CRLF>" % gi.url)
s.sendall((gi.url + CRLF).encode("UTF-8"))
return address, s.makefile(mode = "rb")
@ -767,7 +772,7 @@ you'll be able to transparently follow links to Gopherspace!""")
most_frequent_cert = cached_fingerprint
if fingerprint == cached_fingerprint:
# Matched!
self._debug("TOFU: Accepting previously seen ({} times) certificate {}".format(count, fingerprint))
ui_out.debug("TOFU: Accepting previously seen ({} times) certificate {}".format(count, fingerprint))
self.db_cur.execute("""UPDATE cert_cache
SET last_seen=?, count=?
WHERE hostname=? AND address=? AND fingerprint=?""",
@ -785,7 +790,7 @@ you'll be able to transparently follow links to Gopherspace!""")
previous_ttl = previous_cert.not_valid_after - now
print(previous_ttl)
self._debug("TOFU: Unrecognised certificate {}! Raising the alarm...".format(fingerprint))
ui_out.debug("TOFU: Unrecognised certificate {}! Raising the alarm...".format(fingerprint))
print("****************************************")
print("[SECURITY WARNING] Unrecognised certificate!")
print("The certificate presented for {} ({}) has never been seen before.".format(host, address))
@ -812,7 +817,7 @@ you'll be able to transparently follow links to Gopherspace!""")
# If not, cache this cert
else:
self._debug("TOFU: Blindly trusting first ever certificate for this host!")
ui_out.debug("TOFU: Blindly trusting first ever certificate for this host!")
self.db_cur.execute("""INSERT INTO cert_cache
VALUES (?, ?, ?, ?, ?, ?)""",
(host, address, fingerprint, now, now, 1))
@ -839,7 +844,7 @@ you'll be able to transparently follow links to Gopherspace!""")
else:
# Use "xdg-open" as a last resort.
cmd_str = "xdg-open %s"
self._debug("Using handler: %s" % cmd_str)
ui_out.debug("Using handler: %s" % cmd_str)
return cmd_str
def _handle_gemtext(self, body, menu_gi, display=True):
@ -860,7 +865,7 @@ you'll be able to transparently follow links to Gopherspace!""")
self.index.append(gi)
tmpf.write(self._format_geminiitem(len(self.index), gi) + "\n")
except:
self._debug("Skipping possible link: %s" % line)
ui_out.debug("Skipping possible link: %s" % line)
elif line.startswith("* "):
line = line[1:].lstrip("\t ")
tmpf.write(textwrap.fill(line, self.options["width"],
@ -928,12 +933,6 @@ you'll be able to transparently follow links to Gopherspace!""")
else:
return self.tmp_filename
def _debug(self, debug_text):
if not self.options["debug"]:
return
debug_text = "\x1b[0;32m[DEBUG] " + debug_text + "\x1b[0m"
print(debug_text)
def _load_client_cert(self):
"""
Interactively load a TLS client certificate from the filesystem in PEM
@ -1025,7 +1024,7 @@ you'll be able to transparently follow links to Gopherspace!""")
self.client_certs["active"] = (certfile, keyfile)
self.active_cert_domains = []
self.prompt = self.cert_prompt
self._debug("Using ID {} / {}.".format(*self.client_certs["active"]))
ui_out.debug("Using ID {} / {}.".format(*self.client_certs["active"]))
def _deactivate_client_cert(self):
if self.active_is_transient:
@ -1092,6 +1091,13 @@ you'll be able to transparently follow links to Gopherspace!""")
if option not in self.options:
print("Unrecognised option %s" % option)
return
# Enable/disable debugging output
if option == "debug":
if value.lower() == "true":
ui_out.setLevel(logging.DEBUG)
elif value.lower() == "false":
ui_out.setLevel(logging.INFO)
# Validate / convert values
if option == "gopher_proxy":
if ":" not in value:

View File

@ -1,10 +1,13 @@
_MAX_CACHE_SIZE = 10
_MAX_CACHE_AGE_SECS = 180
import logging
import os
import os.path
import time
ui_out = logging.getLogger("av98_logger")
class Cache:
def __init__(self):
@ -18,10 +21,10 @@ class Cache:
now = time.time()
cached = self.cache_timestamps[url]
if now - cached > _MAX_CACHE_AGE_SECS:
# self._debug("Expiring old cached copy of resource.")
ui_out.debug("Expiring old cached copy of resource.")
self._remove(url)
return False
# self._debug("Found cached copy of resource.")
ui_out.debug("Found cached copy of resource.")
return True
def _remove(self, url):
@ -44,13 +47,13 @@ class Cache:
lru.sort()
# Drop the oldest entry no matter what
_, url = lru[0]
# self._debug("Dropping cached copy of {} from full cache.".format(url))
ui_out.debug("Dropping cached copy of {} from full cache.".format(url))
self._remove(url)
# Drop other entries if they are older than the limit
now = time.time()
for cached, url in lru[1:]:
if now - cached > _MAX_CACHE_AGE_SECS:
# self._debug("Dropping cached copy of {} from full cache.".format(url))
ui_out.debug("Dropping cached copy of {} from full cache.".format(url))
self._remove(url)
else:
break