netcache now works for gemini

This commit is contained in:
Lionel Dricot 2023-07-18 00:39:06 +02:00
parent a7c8ed33d5
commit b745b04f7a
2 changed files with 121 additions and 133 deletions

View File

@ -1,8 +1,11 @@
#!/bin/python
import os
import sys
import urllib.parse
import argparse
import requests
import codecs
import getpass
import socket
import ssl
from ssl import CertificateError
@ -34,6 +37,10 @@ standard_ports = {
}
default_protocol = "gemini"
CRLF = '\r\n'
DEFAULT_TIMEOUT = 10
_MAX_REDIRECTS = 5
def parse_mime(mime):
options = {}
if mime:
@ -343,7 +350,7 @@ def _fetch_gopher(url,timeout=10):
else:
# by default, we should consider Gopher
mime = "text/gopher"
cache = write_body(response,mime)
cache = write_body(url,response,mime)
return cache
def _fetch_finger(url,timeout=10):
@ -384,7 +391,7 @@ def _fetch_spartan(url):
body = fp.read()
if meta.startswith("text"):
body = body.decode("UTF-8")
cache = write_body(body,meta)
cache = write_body(url,body,meta)
elif code == 3:
redirect_url = url_parts._replace(path=meta).geturl()
else:
@ -395,7 +402,7 @@ def _fetch_spartan(url):
cache = _fetch_spartan(redirect_url)
return cache
def _fetch_gemini(url):
def _fetch_gemini(url,options={}):
cache = None
url_parts = urllib.parse.urlparse(url)
host = url_parts.hostname
@ -404,34 +411,35 @@ def _fetch_gemini(url):
query = url_parts.query
# Be careful with client certificates!
# Are we crossing a domain boundary?
if self.active_cert_domains and host not in self.active_cert_domains:
if self.active_is_transient:
print("Permanently delete currently active transient certificate?")
resp = input("Y/N? ")
if resp.strip().lower() in ("y", "yes"):
print("Destroying certificate.")
self._deactivate_client_cert()
else:
print("Staying here.")
raise UserAbortException()
else:
print("PRIVACY ALERT: Deactivate client cert before connecting to a new domain?")
resp = input("Y/N? ")
if resp.strip().lower() in ("n", "no"):
print("Keeping certificate active for {}".format(host))
else:
print("Deactivating certificate.")
self._deactivate_client_cert()
# Suggest reactivating previous certs
if not self.client_certs["active"] and host in self.client_certs:
print("PRIVACY ALERT: Reactivate previously used client cert for {}?".format(host))
resp = input("Y/N? ")
if resp.strip().lower() in ("y", "yes"):
self._activate_client_cert(*self.client_certs[host])
else:
print("Remaining unidentified.")
self.client_certs.pop(host)
# TODO :code should be adapted to netcache
# if self.active_cert_domains and host not in self.active_cert_domains:
# if self.active_is_transient:
# print("Permanently delete currently active transient certificate?")
# resp = input("Y/N? ")
# if resp.strip().lower() in ("y", "yes"):
# print("Destroying certificate.")
# self._deactivate_client_cert()
# else:
# print("Staying here.")
# raise UserAbortException()
# else:
# print("PRIVACY ALERT: Deactivate client cert before connecting to a new domain?")
# resp = input("Y/N? ")
# if resp.strip().lower() in ("n", "no"):
# print("Keeping certificate active for {}".format(host))
# else:
# print("Deactivating certificate.")
# self._deactivate_client_cert()
#
# # Suggest reactivating previous certs
# if not self.client_certs["active"] and host in self.client_certs:
# print("PRIVACY ALERT: Reactivate previously used client cert for {}?".format(host))
# resp = input("Y/N? ")
# if resp.strip().lower() in ("y", "yes"):
# self._activate_client_cert(*self.client_certs[host])
# else:
# print("Remaining unidentified.")
# self.client_certs.pop(host)
# In AV-98, this was the _send_request method
#Send a selector to a given host and port.
@ -457,16 +465,6 @@ def _fetch_gemini(url):
# Prepare TLS context
protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >=6 else ssl.PROTOCOL_TLSv1_2
context = ssl.SSLContext(protocol)
# Use CAs or TOFU
#TODO : should we care about this options?
#if self.options["tls_mode"] == "ca":
# context.verify_mode = ssl.CERT_REQUIRED
# context.check_hostname = True
# context.load_default_certs()
#else:
# context.check_hostname = False
# context.verify_mode = ssl.CERT_NONE
context.check_hostname=False
context.verify_mode = ssl.CERT_NONE
# Impose minimum TLS version
@ -487,57 +485,50 @@ def _fetch_gemini(url):
# Rely on the server to only support sensible things, I guess...
pass
#TODO: Im here in the refactor
# Load client certificate if needed
if self.client_certs["active"]:
certfile, keyfile = self.client_certs["active"]
context.load_cert_chain(certfile, keyfile)
#TODO: certificate handling to refactor
# # Load client certificate if needed
# if self.client_certs["active"]:
# certfile, keyfile = self.client_certs["active"]
# context.load_cert_chain(certfile, keyfile)
# Connect to remote host by any address possible
err = None
for address in addresses:
self._debug("Connecting to: " + str(address[4]))
s = socket.socket(address[0], address[1])
if self.sync_only:
timeout = self.options["short_timeout"]
else:
timeout = self.options["timeout"]
s.settimeout(timeout)
s = context.wrap_socket(s, server_hostname = host)
try:
s.connect(address[4])
break
except OSError as e:
err = e
err = None
for address in addresses:
s = socket.socket(address[0], address[1])
if "timeout" in options:
timeout = options["timeout"]
else:
# If we couldn't connect to *any* of the addresses, just
# bubble up the exception from the last attempt and deny
# knowledge of earlier failures.
raise err
if sys.version_info.minor >=5:
self._debug("Established {} connection.".format(s.version()))
self._debug("Cipher is: {}.".format(s.cipher()))
# Do TOFU
if self.options["tls_mode"] != "ca":
cert = s.getpeercert(binary_form=True)
self._validate_cert(address[4][0], host, cert)
# Remember that we showed the current cert to this domain...
if self.client_certs["active"]:
self.active_cert_domains.append(host)
self.client_certs[host] = self.client_certs["active"]
# Send request and wrap response in a file descriptor
url = urllib.parse.urlparse(gi.url)
new_netloc = host
if port != 1965:
new_netloc += ":" + str(port)
url = urllib.parse.urlunparse(url._replace(netloc=new_netloc))
self._debug("Sending %s<CRLF>" % url)
s.sendall((url + CRLF).encode("UTF-8"))
mf= s.makefile(mode = "rb")
return address, mf
##
## end of send_request
TODO :address, f = self._send_request(gi)
timeout = DEFAULT_TIMEOUT
s.settimeout(timeout)
s = context.wrap_socket(s, server_hostname = host)
try:
s.connect(address[4])
break
except OSError as e:
err = e
else:
# If we couldn't connect to *any* of the addresses, just
# bubble up the exception from the last attempt and deny
# knowledge of earlier failures.
raise err
# Do TOFU
cert = s.getpeercert(binary_form=True)
# TODO: another cert handling to refactor
# Remember that we showed the current cert to this domain...
# self._validate_cert(address[4][0], host, cert)
# if self.client_certs["active"]:
# self.active_cert_domains.append(host)
# self.client_certs[host] = self.client_certs["active"]
# Send request and wrap response in a file descriptor
url = urllib.parse.urlparse(url)
new_netloc = host
if port != standard_ports["gemini"]:
new_netloc += ":" + str(port)
url = urllib.parse.urlunparse(url._replace(netloc=new_netloc))
s.sendall((url + CRLF).encode("UTF-8"))
f= s.makefile(mode = "rb")
## end of send_request in AV98
# Spec dictates <META> should not exceed 1024 bytes,
# so maximum valid header length is 1027 bytes.
header = f.readline(1027)
@ -545,7 +536,6 @@ def _fetch_gemini(url):
if not header or header[-1] != '\n':
raise RuntimeError("Received invalid header from server!")
header = header.strip()
self._debug("Response header: %s." % header)
# Validate header
status, meta = header.split(maxsplit=1)
if len(meta) > 1024 or len(status) != 2 or not status.isnumeric():
@ -553,62 +543,63 @@ def _fetch_gemini(url):
raise RuntimeError("Received invalid header from server!")
# Update redirect loop/maze escaping state
if not status.startswith("3"):
self.previous_redirectors = set()
previous_redirectors = set()
#TODO FIXME
else:
#we set a previous_redirectors anyway because refactoring in progress
previous_redirectors = set()
# Handle non-SUCCESS headers, which don't have a response body
# Inputs
if status.startswith("1"):
if self.sync_only:
return None
print(meta)
if status == "11":
user_input = getpass.getpass("> ")
else:
print(meta)
if status == "11":
user_input = getpass.getpass("> ")
else:
user_input = input("> ")
return self._fetch_over_network(query(user_input))
user_input = input("> ")
return _fetch_gemini(query(user_input))
# Redirects
elif status.startswith("3"):
new_gi = GeminiItem(gi.absolutise_url(meta))
if new_gi.url == gi.url:
newurl = urllib.parse.urljoin(url,meta)
if newurl == url:
raise RuntimeError("URL redirects to itself!")
elif new_gi.url in self.previous_redirectors:
elif newurl in previous_redirectors:
raise RuntimeError("Caught in redirect loop!")
elif len(self.previous_redirectors) == _MAX_REDIRECTS:
elif len(previous_redirectors) == _MAX_REDIRECTS:
raise RuntimeError("Refusing to follow more than %d consecutive redirects!" % _MAX_REDIRECTS)
elif self.sync_only:
follow = self.automatic_choice
# Never follow cross-domain redirects without asking
elif new_gi.host.encode("idna") != gi.host.encode("idna"):
follow = input("Follow cross-domain redirect to %s? (y/n) " % new_gi.url)
# Never follow cross-protocol redirects without asking
elif new_gi.scheme != gi.scheme:
follow = input("Follow cross-protocol redirect to %s? (y/n) " % new_gi.url)
# Don't follow *any* redirect without asking if auto-follow is off
elif not self.options["auto_follow_redirects"]:
follow = input("Follow redirect to %s? (y/n) " % new_gi.url)
# Otherwise, follow away
# TODO: redirections handling should be refactored
# elif "interactive" in options and not options["interactive"]:
# follow = self.automatic_choice
# # Never follow cross-domain redirects without asking
# elif new_gi.host.encode("idna") != gi.host.encode("idna"):
# follow = input("Follow cross-domain redirect to %s? (y/n) " % new_gi.url)
# # Never follow cross-protocol redirects without asking
# elif new_gi.scheme != gi.scheme:
# follow = input("Follow cross-protocol redirect to %s? (y/n) " % new_gi.url)
# # Don't follow *any* redirect without asking if auto-follow is off
# elif not self.options["auto_follow_redirects"]:
# follow = input("Follow redirect to %s? (y/n) " % new_gi.url)
# # Otherwise, follow away
else:
follow = "yes"
if follow.strip().lower() not in ("y", "yes"):
raise UserAbortException()
self._debug("Following redirect to %s." % new_gi.url)
self._debug("This is consecutive redirect number %d." % len(self.previous_redirectors))
self.previous_redirectors.add(gi.url)
if status == "31":
# Permanent redirect
self.permanent_redirects[gi.url] = new_gi.url
return self._fetch_over_network(new_gi)
previous_redirectors.add(url)
# if status == "31":
# # Permanent redirect
# self.permanent_redirects[gi.url] = new_gi.url
return _fetch_gemini(newurl)
# Errors
elif status.startswith("4") or status.startswith("5"):
raise RuntimeError(meta)
# Client cert
elif status.startswith("6"):
self._handle_cert_request(meta)
return self._fetch_over_network(gi)
# elif status.startswith("6"):
# self._handle_cert_request(meta)
# return self._fetch_over_network(gi)
# Invalid status
elif not status.startswith("2"):
raise RuntimeError("Server returned undefined status code %s!" % status)
# If we're here, this must be a success and there's a response body
print("status :%s"%status)
assert status.startswith("2")
mime = meta
# Read the response body over the network
@ -634,14 +625,14 @@ def _fetch_gemini(url):
encoding declared in header!" % encoding)
else:
body = fbody
gi.write_body(body,mime)
return gi
cache = write_body(url,body,mime)
return cache
def fetch(url):
url = normalize_url(url)
path=None
if "://" in url
if "://" in url:
scheme = url.split("://")[0]
if scheme not in standard_ports:
print("%s is not a supported protocol"%scheme)
@ -651,6 +642,8 @@ def fetch(url):
path=_fetch_gopher(url)
elif scheme == "finger":
path=_fetch_finger(url)
elif scheme == "gemini":
patch=_fetch_gemini(url)
else:
print("scheme %s not implemented yet")
else:

View File

@ -21,10 +21,8 @@ __version__ = "1.9.2"
import argparse
import cmd
import codecs
import datetime
import fnmatch
import getpass
import glob
import hashlib
import io
@ -89,7 +87,6 @@ if os.path.exists(_old_config):
#if no XDG .local/share and not XDG .config, we use the old config
if not os.path.exists(data_home) and os.path.exists(_old_config):
_DATA_DIR = _CONFIG_DIR
_MAX_REDIRECTS = 5
_MAX_CACHE_SIZE = 10
_MAX_CACHE_AGE_SECS = 180
@ -548,7 +545,6 @@ class GeminiItem():
def to_map_line(self):
return "=> {} {}\n".format(self.url_mode(), self.get_page_title())
CRLF = '\r\n'
# Cheap and cheerful URL detector
def looks_like_url(word):
@ -611,7 +607,6 @@ class GeminiClient(cmd.Cmd):
self.marks = {}
self.page_index = 0
self.permanent_redirects = {}
self.previous_redirectors = set()
# Sync-only mode is restriced by design
self.visited_hosts = set()
self.offline_only = False