General tidy-up of entire av98.py file.

Mostly a matter of rearranging the order of methods to flow
sensibly, as well as writing or updating docstrings, getting rid of
old unused return values, and fixing a few very minor defects.
This commit is contained in:
Solderpunk 2023-11-18 15:59:36 +01:00
parent 247f01e3e7
commit 2a70985176
1 changed files with 218 additions and 151 deletions

369
av98.py
View File

@ -48,7 +48,6 @@ _VERSION = "1.0.2dev"
_MAX_REDIRECTS = 5
# Command abbreviations
_ABBREVS = {
"a": "add",
@ -83,7 +82,6 @@ _MIME_HANDLERS = {
"text/*": "cat %s",
}
# monkey-patch Gemini support in urllib.parse
# see https://github.com/python/cpython/blob/master/Lib/urllib/parse.py
urllib.parse.uses_relative.append("gemini")
@ -264,8 +262,8 @@ class GeminiClient(cmd.Cmd):
"timeout" : 10,
"width" : 80,
"auto_follow_redirects" : True,
"gopher_proxy" : None,
"tls_mode" : "tofu",
"gopher_proxy" : None,
"http_proxy": None,
"cache" : False
}
@ -294,11 +292,15 @@ class GeminiClient(cmd.Cmd):
ui_out.debug("Rendered buffer: ", self.rendered_file_buffer)
def _go_to_gi(self, gi, update_hist=True, check_cache=True):
"""This method might be considered "the heart of AV-98".
"""
This method might be considered "the heart of AV-98".
Everything involved in fetching a gemini resource happens here:
sending the request over the network, parsing the response if
it's a menu, storing the response in a temporary file, choosing
and calling a handler program, and updating the history."""
and calling a handler program, and updating the history.
Most navigation commands are just a thin wrapper around a call
to this.
"""
# Don't try to speak to servers running other protocols
if gi.scheme in ("http", "https"):
@ -342,7 +344,7 @@ you'll be able to transparently follow links to Gopherspace!""")
self._print_friendly_error(err)
return
# Render gemtext, update index
# Render gemtext, updating the index
if mime == "text/gemini":
self._handle_gemtext(gi)
self.active_rendered_file = self.rendered_file_buffer
@ -363,25 +365,10 @@ you'll be able to transparently follow links to Gopherspace!""")
if update_hist:
self._update_history(gi)
def _print_friendly_error(self, err):
"""
Report an exception to the user in a readable form.
DNS failures, refused or reset connections, timeouts, missing local
files and local directories each get a fixed message via ui_out.error.
Any other exception is reported as "ERROR: " followed by str(err).
A formatted traceback is also passed to ui_out.debug.
"""
# NOTE(review): indentation is not visible in this view, so it is unclear
# whether the traceback is logged for every error or only in the final
# fallback branch - confirm against the real file.
if isinstance(err, socket.gaierror):
ui_out.error("ERROR: DNS error!")
elif isinstance(err, ConnectionRefusedError):
ui_out.error("ERROR: Connection refused!")
elif isinstance(err, ConnectionResetError):
ui_out.error("ERROR: Connection reset!")
elif isinstance(err, (TimeoutError, socket.timeout)):
ui_out.error("""ERROR: Connection timed out!
Slow internet connection? Use 'set timeout' to be more patient.""")
elif isinstance(err, FileNotFoundError):
ui_out.error("ERROR: Local file not found!")
elif isinstance(err, IsADirectoryError):
ui_out.error("ERROR: Viewing local directories is not supported!")
else:
ui_out.error("ERROR: " + str(err))
ui_out.debug(traceback.format_exc())
def _handle_local_file(self, gi):
"""
Guess the MIME type of a local file, to determine the best handler.
"""
mime, noise = mimetypes.guess_type(gi.path)
if not mime:
if gi.path.endswith(".gmi"): # TODO: be better about this
@ -389,7 +376,10 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
return mime
def _fetch_over_network(self, gi, destination=None):
"""
Fetch the provided GeminiItem over the network and save the received
content to a file.
"""
previous_redirectors = set()
while True:
# Obey permanent redirects
@ -483,10 +473,10 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
raise RuntimeError("Header declared unknown encoding %s" % value)
# Save response body to disk
body, size = self._write_response_to_file(mime, mime_options, f, destination)
size = self._write_response_to_file(mime, mime_options, f, destination)
ui_out.debug("Wrote %d byte response to %s." % (size, destination))
# Maintain cache and log
# Maintain cache and update flight recorder
if self.options["cache"]:
self.cache.add(gi.url, mime, self.raw_file_buffer)
self._log_visit(gi, address, size)
@ -494,8 +484,15 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
return gi, mime
def _send_request(self, gi):
"""Send a selector to a given host and port.
Returns the resolved address and binary file with the reply."""
"""
Send a Gemini request to the appropriate host for the provided
GeminiItem. This is usually the GI's own host and port attributes,
but if it's a gopher:// or http(s):// item, a proxy might be used.
Returns the received response header, parsed into a status code
and meta, plus the address object that was connected to and a
file interface to the underlying network socket.
"""
# Figure out which host to connect to
if gi.scheme == "gemini":
@ -552,16 +549,16 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
ui_out.debug("Cipher is: {}.".format(s.cipher()))
# Do TOFU
if self.options["tls_mode"] != "ca":
if self.options["tls_mode"] == "tofu":
cert = s.getpeercert(binary_form=True)
self.tofu_store.validate_cert(address[4][0], host, cert)
# Send request and wrap response in a file descriptor
ui_out.debug("Sending %s<CRLF>" % gi.url)
s.sendall((gi.url + CRLF).encode("UTF-8"))
# Read back response
f = s.makefile(mode = "rb")
# Fetch response header
# Spec dictates <META> should not exceed 1024 bytes,
# so maximum valid header length is 1027 bytes.
header = f.readline(1027)
@ -571,7 +568,7 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
header = header.strip()
ui_out.debug("Response header: %s." % header)
# Validate header
# Validate response header
status, meta = header.split(maxsplit=1) if header[2:].strip() else (header[:2], "")
if len(meta) > 1024 or len(status) != 2 or not status.isnumeric():
f.close()
@ -579,46 +576,11 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
return status, meta, address, f
def _write_response_to_file(self, mime, mime_options, f, destination):
    """
    Download a response body and save it to disk.

    The body is read from f in 100 KiB chunks until f.read() returns an
    empty result.  Once more than one chunk has arrived, a spinner and a
    running total (derived from the chunk count) are printed on a single
    line that is overwritten using a carriage return.

    text/* bodies are decoded using the "charset" entry of mime_options
    (UTF-8 if absent) and written in text mode with that same encoding.
    Everything else is written unchanged in binary mode.  The file written
    is destination, or self.raw_file_buffer when destination is falsy.

    Returns a (body, size) tuple: body is the decoded str for text/*
    responses and the raw bytearray otherwise, and size is the value
    returned by the file's write() call.

    Raises RuntimeError if a text/* body cannot be decoded with the
    declared encoding.
    """
    spinner_seq = ["|", "/", "-", "\\"]
    # Read the response body over the network
    body = bytearray()
    chunk_count = 0
    while True:
        chunk = f.read(100*1024)
        chunk_count += 1
        if not chunk:
            break
        body.extend(chunk)
        if chunk_count > 1:
            spinner = spinner_seq[chunk_count % 4]
            if chunk_count < 10:
                print("{} Received {} KiB...".format(spinner, chunk_count*100), end="\r")
            else:
                print("{} Received {} MiB...".format(spinner, chunk_count/10.0), end="\r")
    # Save the result to a temporary file
    ## Determine file mode
    if mime.startswith("text/"):
        mode = "w"
        encoding = mime_options.get("charset", "UTF-8")
        try:
            body = body.decode(encoding)
        except UnicodeError as err:
            # Chain the original error so the offending bytes stay visible
            # in a debug traceback.
            raise RuntimeError("Could not decode response body using %s encoding declared in header!" % encoding) from err
    else:
        mode = "wb"
        encoding = None
    ## Write
    # The context manager closes the file even if write() raises.
    with open(destination or self.raw_file_buffer, mode=mode, encoding=encoding) as fp:
        size = fp.write(body)
    return body, size
def _get_addresses(self, host, port):
"""
Convert a host and port into an address object suitable for
instantiating a socket.
"""
# DNS lookup - will get IPv4 and IPv6 records if IPv6 is enabled
if ":" in host:
# This is likely a literal IPv6 address, so we can *only* ask for
@ -638,6 +600,9 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
return addresses
def _prepare_SSL_context(self, cert_validation_mode="tofu"):
"""
Specify a bunch of low level SSL settings.
"""
# Flail against version churn
if sys.version_info >= (3, 10):
_newest_supported_protocol = ssl.PROTOCOL_TLS_CLIENT
@ -677,30 +642,81 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
return context
def _get_handler_cmd(self, mimetype):
# Now look for a handler for this mimetype
# Consider exact matches before wildcard matches
exact_matches = []
wildcard_matches = []
for handled_mime, cmd_str in _MIME_HANDLERS.items():
if "*" in handled_mime:
wildcard_matches.append((handled_mime, cmd_str))
else:
exact_matches.append((handled_mime, cmd_str))
for handled_mime, cmd_str in exact_matches + wildcard_matches:
if fnmatch.fnmatch(mimetype, handled_mime):
def _write_response_to_file(self, mime, mime_options, f, destination):
"""
Given a file handler representing a network socket which will yield
the response body for a successful Gemini request, and the associated
MIME information, download the response body and save it in the
specified file. text/* responses which use an encoding other than
UTF-8 will be transcoded to UTF-8 before hitting the disk.
Returns the size in bytes of the downloaded response.
"""
# Read the response body over the network
spinner_seq = ["|", "/", "-", "\\"]
body = bytearray([])
chunk_count = 0
while True:
chunk = f.read(100*1024)
chunk_count += 1
if not chunk:
break
body.extend(chunk)
if chunk_count > 1:
spinner = spinner_seq[chunk_count % 4]
if chunk_count < 10:
print("{} Received {} KiB...".format(spinner, chunk_count*100), end="\r")
else:
print("{} Received {} MiB...".format(spinner, chunk_count/10.0), end="\r")
print(" "*80, end="\r") # Clean up prompt space
# Determine file mode
if mime.startswith("text/"):
mode = "w"
# Decode received bytes with response-specified encoding...
encoding = mime_options.get("charset", "UTF-8")
try:
body = body.decode(encoding)
except UnicodeError:
raise RuntimeError("Could not decode response body using %s encoding declared in header!" % encoding)
# ...but always save to disk in UTF-8
encoding = "UTF-8"
else:
# Use "xdg-open" as a last resort.
cmd_str = "xdg-open %s"
ui_out.debug("Using handler: %s" % cmd_str)
return cmd_str
mode = "wb"
encoding = None
# Write
fp = open(destination or self.raw_file_buffer, mode=mode, encoding=encoding)
size = fp.write(body)
fp.close()
return size
def _log_visit(self, gi, address, size):
    """
    Record one completed request in the session's "black box flight
    recorder".

    Nothing is recorded when address is falsy.  Otherwise the overall
    request and byte counters in self.log are bumped, the address is added
    to self.visited_hosts, and the IPv4 or IPv6 counters are bumped
    according to the address family stored in address[0].
    """
    if not address:
        return
    counters = self.log
    counters["requests"] += 1
    counters["bytes_recvd"] += size
    self.visited_hosts.add(address)
    # Per-family statistics; other address families only count in the totals.
    family = address[0]
    if family == socket.AF_INET:
        prefix = "ipv4"
    elif family == socket.AF_INET6:
        prefix = "ipv6"
    else:
        return
    counters[prefix + "_requests"] += 1
    counters[prefix + "_bytes_recvd"] += size
def _handle_gemtext(self, menu_gi):
"""Simultaneously parse and render a text/gemini document.
Parsing causes self.index to be populated with GeminiItems.
"""
Simultaneously parse and render a text/gemini document.
Parsing causes self.index to be populated with GeminiItems
representing the links in the document.
Rendering causes self.rendered_file_buffer to contain a rendered
view."""
view of the document.
"""
self.index = []
preformatted = False
@ -744,15 +760,41 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
self.index_index = -1
def _format_geminiitem(self, index, gi, url=False):
    """
    Build the one-line text shown for a numbered link.

    The line reads "[<index>] <label>", where the label is the item's name
    or, failing that, its URL.  For schemes other than gemini the scheme
    is shown after the index, and when url is true a named item also gets
    its URL appended in parentheses.
    """
    if gi.scheme == "gemini":
        scheme_tag = ""
    else:
        scheme_tag = " %s" % gi.scheme
    label = gi.name or gi.url
    rendered = "[%d%s] %s" % (index, scheme_tag, label)
    if gi.name and url:
        rendered = rendered + " (%s)" % gi.url
    return rendered
def _show_lookup(self, offset=0, end=None, url=False):
    """
    Print the entries of self.lookup from offset up to (not including)
    end, one formatted link line each, numbered from offset + 1.
    """
    visible = self.lookup[offset:end]
    for number, item in enumerate(visible, start=offset + 1):
        print(self._format_geminiitem(number, item, url))
def _get_handler_cmd(self, mimetype):
    """
    Choose the handler command for a MIME type.

    Patterns in _MIME_HANDLERS without a "*" are tried before patterns
    containing one, each group in the table's own order, and the first
    pattern that fnmatch accepts wins.  If nothing matches, "xdg-open %s"
    is used.

    Returns the command template, which still contains the '%s'
    placeholder for the name of the file holding the downloaded item.
    """
    exact = [(pattern, command) for pattern, command in _MIME_HANDLERS.items()
             if "*" not in pattern]
    wildcards = [(pattern, command) for pattern, command in _MIME_HANDLERS.items()
                 if "*" in pattern]
    chosen = next(
        (command for pattern, command in exact + wildcards
         if fnmatch.fnmatch(mimetype, pattern)),
        # Use "xdg-open" as a last resort.
        "xdg-open %s",
    )
    ui_out.debug("Using handler: %s" % chosen)
    return chosen
def _update_history(self, gi):
# Don't duplicate
@ -762,20 +804,34 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
self.history.append(gi)
self.hist_index = len(self.history) - 1
def _log_visit(self, gi, address, size):
    """
    Record one completed request in the session statistics.

    Nothing is recorded when address is falsy.  Otherwise the overall
    request and byte counters in self.log are bumped, the address is added
    to self.visited_hosts, and the IPv4 or IPv6 counters are bumped
    according to the address family stored in address[0].
    """
    if not address:
        return
    counters = self.log
    counters["requests"] += 1
    counters["bytes_recvd"] += size
    self.visited_hosts.add(address)
    # Per-family statistics; other address families only count in the totals.
    family = address[0]
    if family == socket.AF_INET:
        prefix = "ipv4"
    elif family == socket.AF_INET6:
        prefix = "ipv6"
    else:
        return
    counters[prefix + "_requests"] += 1
    counters[prefix + "_bytes_recvd"] += size
def _print_friendly_error(self, err):
"""
Report an exception to the user in a readable form.
DNS failures, refused or reset connections, timeouts, missing local
files and local directories each get a fixed message via ui_out.error.
Any other exception is reported as "ERROR: " followed by str(err).
A formatted traceback is also passed to ui_out.debug.
"""
# NOTE(review): indentation is not visible in this view, so it is unclear
# whether the traceback is logged for every error or only in the final
# fallback branch - confirm against the real file.
if isinstance(err, socket.gaierror):
ui_out.error("ERROR: DNS error!")
elif isinstance(err, ConnectionRefusedError):
ui_out.error("ERROR: Connection refused!")
elif isinstance(err, ConnectionResetError):
ui_out.error("ERROR: Connection reset!")
elif isinstance(err, (TimeoutError, socket.timeout)):
ui_out.error("""ERROR: Connection timed out!
Slow internet connection? Use 'set timeout' to be more patient.""")
elif isinstance(err, FileNotFoundError):
ui_out.error("ERROR: Local file not found!")
elif isinstance(err, IsADirectoryError):
ui_out.error("ERROR: Viewing local directories is not supported!")
else:
ui_out.error("ERROR: " + str(err))
ui_out.debug(traceback.format_exc())
def _show_lookup(self, offset=0, end=None, url=False):
    """
    Print the entries of self.lookup from offset up to (not including)
    end, one formatted link line each, numbered from offset + 1.
    """
    visible = self.lookup[offset:end]
    for number, item in enumerate(visible, start=offset + 1):
        print(self._format_geminiitem(number, item, url))
def _maintain_bookmarks(self):
"""
Update any bookmarks whose URLs we tried to fetch during the current
session and received a permanent redirect for, so they are fetched
directly at the new address in future.
"""
# Nothing to do if no bookmarks exist!
bm_file = os.path.join(self.config_dir, "bookmarks.gmi")
if not os.path.exists(bm_file):
@ -810,6 +866,11 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
# Cmd implementation follows
def default(self, line):
"""
This is called when none of the do_* methods match the user's
input. This is probably either an abbreviated command, or a numeric
index for the lookup table.
"""
if line.strip() == "EOF":
return self.onecmd("quit")
elif line.strip() == "..":
@ -831,16 +892,19 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
print("What?")
return
# Pick out a GeminiItem
try:
gi = self.lookup[n-1]
except IndexError:
print ("Index too high!")
return
# Go to selected item
self.index_index = n
self._go_to_gi(gi)
### Settings
@restricted
def do_set(self, line):
"""View or set various options."""
@ -898,12 +962,6 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
pass
self.options[option] = value
@restricted
def do_cert(self, line):
"""Manage client certificates"""
# Announce the mode, then hand control to the client certificate manager.
# The `line` argument is not used.
print("Managing client certificates")
self.client_cert_manager.manage()
@restricted
def do_handler(self, line):
"""View or set handler commands for different MIME types."""
@ -923,15 +981,11 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
if "%s" not in handler:
print("Are you sure you don't want to pass the filename to the handler?")
def do_abbrevs(self, *args):
    """Print all AV-98 command abbreviations."""
    # Layout: blank line, title, optional ruler as wide as the title, one
    # left-aligned "abbreviation  command" row per entry, blank line.
    title = "Command Abbreviations:"
    out = self.stdout
    out.write("\n{}\n".format(title))
    if self.ruler:
        underline = self.ruler * len(title)
        out.write("{}\n".format(underline))
    for short, full in _ABBREVS.items():
        out.write("{:<7} {}\n".format(short, full))
    out.write("\n")
@restricted
def do_cert(self, line):
"""Manage client certificates"""
# Announce the mode, then hand control to the client certificate manager.
# The `line` argument is not used.
print("Managing client certificates")
self.client_cert_manager.manage()
### Stuff for getting around
def do_go(self, line):
@ -961,9 +1015,15 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
"""Go up one directory in the path."""
self._go_to_gi(self.gi.up())
@needs_gi
def do_root(self, *args):
"""Go to root selector of the server hosting current item."""
# Ask the current item for its root item and navigate there; anything
# typed after the command is ignored.
self._go_to_gi(self.gi.root())
def do_back(self, *args):
"""Go back to the previous gemini item."""
if not self.history or self.hist_index == 0:
print("You are already at the end of your history.")
return
self.hist_index -= 1
gi = self.history[self.hist_index]
@ -972,6 +1032,7 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
def do_forward(self, *args):
"""Go forward to the next gemini item."""
if not self.history or self.hist_index == len(self.history) - 1:
print("You are already at the end of your history.")
return
self.hist_index += 1
gi = self.history[self.hist_index]
@ -986,10 +1047,10 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
self.lookup = self.index
return self.onecmd(str(self.index_index-1))
@needs_gi
def do_root(self, *args):
"""Go to root selector of the server hosting current item."""
# Ask the current item for its root item and navigate there; anything
# typed after the command is ignored.
self._go_to_gi(self.gi.root())
def do_gus(self, line):
    """Submit a search query to the Gemini search engine."""
    # Build the query item from the search endpoint and the text typed
    # after the command, then navigate to it like any other item.
    search_endpoint = GeminiItem("gemini://geminispace.info/search")
    search_request = search_endpoint.query(line)
    self._go_to_gi(search_request)
def do_tour(self, line):
"""Add index items as waypoints on a tour, which is basically a FIFO
@ -1059,10 +1120,6 @@ Think of it like marks in vi: 'mark a'='ma' and 'go a'=''a'."""
else:
print("Invalid mark, must be one letter")
def do_version(self, line):
"""Display version information."""
# Prints the program name followed by the module-level _VERSION string;
# the `line` argument is not used.
print("AV-98 " + _VERSION)
### Stuff that modifies the lookup table
def do_ls(self, line):
"""List contents of current index.
@ -1071,11 +1128,6 @@ Use 'ls -l' to see URLs."""
self._show_lookup(url = "-l" in line)
self.page_index = 0
def do_gus(self, line):
    """Submit a search query to the Gemini search engine."""
    # Build the query item from the search endpoint and the text typed
    # after the command, then navigate to it like any other item.
    search_endpoint = GeminiItem("gemini://geminispace.info/search")
    search_request = search_endpoint.query(line)
    self._go_to_gi(search_request)
def do_history(self, *args):
"""Display history."""
self.lookup = self.history
@ -1104,19 +1156,19 @@ Use 'ls -l' to see URLs."""
### Stuff that does something to most recently viewed item
@needs_gi
def do_cat(self, *args):
"""Run most recently visited item through "cat" command."""
"""Run most recently visited item through `cat` command."""
subprocess.call(shlex.split("cat %s" % self.active_rendered_file))
@needs_gi
def do_less(self, *args):
"""Run most recently visited item through "less" command."""
"""Run most recently visited item through `less` command."""
cmd_str = self._get_handler_cmd(self.mime)
cmd_str = cmd_str % self.active_rendered_file
subprocess.call("%s | less -R" % cmd_str, shell=True)
@needs_gi
def do_fold(self, *args):
"""Run most recently visited item through "fold" command."""
"""Run most recently visited item through `fold` command."""
cmd_str = self._get_handler_cmd(self.mime)
cmd_str = cmd_str % self.active_rendered_file
subprocess.call("%s | fold -w 70 -s" % cmd_str, shell=True)
@ -1124,16 +1176,16 @@ Use 'ls -l' to see URLs."""
@restricted
@needs_gi
def do_shell(self, line):
"""'cat' most recently visited item through a shell pipeline."""
"""`cat` most recently visited item through a shell pipeline."""
subprocess.call(("cat %s |" % self.active_rendered_file) + line, shell=True)
@restricted
@needs_gi
def do_save(self, line):
"""Save an item to the filesystem.
'save n filename' saves menu item n to the specified filename.
'save filename' saves the last viewed item to the specified filename.
'save n' saves menu item n to an automagic filename."""
`save n filename` saves menu item n to the specified filename.
`save filename` saves the last viewed item to the specified filename.
`save n` saves menu item n to an automagic filename."""
args = line.strip().split()
# First things first, figure out what our arguments are
@ -1209,6 +1261,7 @@ Use 'ls -l' to see URLs."""
print(self.gi.url)
### Bookmarking stuff
@restricted
@needs_gi
def do_add(self, line):
@ -1242,16 +1295,6 @@ Bookmarks are stored using the 'add' command."""
else:
self._go_to_gi(gi, update_hist=False)
### Help
def do_help(self, arg):
    """ALARM! Recursion detected! ALARM! Prepare to eject!"""
    # "!" and "?" are shorthand aliases rather than real commands, so they
    # are explained here; every other topic goes to the stock cmd.Cmd help.
    alias_notes = (
        ("!", "! is an alias for 'shell'"),
        ("?", "? is an alias for 'help'"),
    )
    for symbol, note in alias_notes:
        if arg == symbol:
            print(note)
            return
    cmd.Cmd.do_help(self, arg)
### Flight recorder
def do_blackbox(self, *args):
"""Display contents of flight recorder, showing statistics for the
@ -1289,6 +1332,30 @@ current gemini browsing session."""
for key, value in lines:
print(key.ljust(ljust+gap) + str(value).rjust(rjust))
### Help
def do_help(self, arg):
    """ALARM! Recursion detected! ALARM! Prepare to eject!"""
    # "!" and "?" are shorthand aliases rather than real commands, so they
    # are explained here; every other topic goes to the stock cmd.Cmd help.
    alias_notes = (
        ("!", "! is an alias for 'shell'"),
        ("?", "? is an alias for 'help'"),
    )
    for symbol, note in alias_notes:
        if arg == symbol:
            print(note)
            return
    cmd.Cmd.do_help(self, arg)
def do_abbrevs(self, *args):
    """Print all AV-98 command abbreviations."""
    # Layout: blank line, title, optional ruler as wide as the title, one
    # left-aligned "abbreviation  command" row per entry, blank line.
    title = "Command Abbreviations:"
    out = self.stdout
    out.write("\n{}\n".format(title))
    if self.ruler:
        underline = self.ruler * len(title)
        out.write("{}\n".format(underline))
    for short, full in _ABBREVS.items():
        out.write("{:<7} {}\n".format(short, full))
    out.write("\n")
def do_version(self, line):
"""Display version information."""
# Prints the program name followed by the module-level _VERSION string;
# the `line` argument is not used.
print("AV-98 " + _VERSION)
### The end!
def do_quit(self, *args):
"""Exit AV-98."""