real work has started with porting Gemini code

This commit is contained in:
Lionel Dricot 2023-07-09 11:16:19 +02:00
parent 0a385d5334
commit 6d43b13520
2 changed files with 240 additions and 308 deletions

View File

@ -4,6 +4,8 @@ import urllib.parse
import argparse
import requests
import socket
import ssl
from ssl import CertificateError
try:
import chardet
_HAS_CHARDET = True
@ -361,7 +363,7 @@ def _fetch_spartan(url):
cache = None
url_parts = urllib.parse.urlparse(url)
host = url_parts.hostname
port = url_parts.port or 300
port = url_parts.port or standard_ports["spartan"]
path = url_parts.path or "/"
query = url_parts.query
redirect_url = None
@ -393,6 +395,243 @@ def _fetch_spartan(url):
cache = _fetch_spartan(redirect_url)
return cache
def _fetch_gemini(url):
cache = None
url_parts = urllib.parse.urlparse(url)
host = url_parts.hostname
port = url_parts.port or standard_ports["gemini"]
path = url_parts.path or "/"
query = url_parts.query
# Be careful with client certificates!
# Are we crossing a domain boundary?
if self.active_cert_domains and host not in self.active_cert_domains:
if self.active_is_transient:
print("Permanently delete currently active transient certificate?")
resp = input("Y/N? ")
if resp.strip().lower() in ("y", "yes"):
print("Destroying certificate.")
self._deactivate_client_cert()
else:
print("Staying here.")
raise UserAbortException()
else:
print("PRIVACY ALERT: Deactivate client cert before connecting to a new domain?")
resp = input("Y/N? ")
if resp.strip().lower() in ("n", "no"):
print("Keeping certificate active for {}".format(host))
else:
print("Deactivating certificate.")
self._deactivate_client_cert()
# Suggest reactivating previous certs
if not self.client_certs["active"] and host in self.client_certs:
print("PRIVACY ALERT: Reactivate previously used client cert for {}?".format(host))
resp = input("Y/N? ")
if resp.strip().lower() in ("y", "yes"):
self._activate_client_cert(*self.client_certs[host])
else:
print("Remaining unidentified.")
self.client_certs.pop(host)
# In AV-98, this was the _send_request method
#Send a selector to a given host and port.
#Returns the resolved address and binary file with the reply."""
host = host.encode("idna").decode()
# Do DNS resolution
# DNS lookup - will get IPv4 and IPv6 records if IPv6 is enabled
if ":" in host:
# This is likely a literal IPv6 address, so we can *only* ask for
# IPv6 addresses or getaddrinfo will complain
family_mask = socket.AF_INET6
elif socket.has_ipv6:
# Accept either IPv4 or IPv6 addresses
family_mask = 0
else:
# IPv4 only
family_mask = socket.AF_INET
addresses = socket.getaddrinfo(host, port, family=family_mask,
type=socket.SOCK_STREAM)
# Sort addresses so IPv6 ones come first
addresses.sort(key=lambda add: add[0] == socket.AF_INET6, reverse=True)
## Continuation of send_request
# Prepare TLS context
protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >=6 else ssl.PROTOCOL_TLSv1_2
context = ssl.SSLContext(protocol)
# Use CAs or TOFU
if self.options["tls_mode"] == "ca":
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = True
context.load_default_certs()
else:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
# Impose minimum TLS version
## In 3.7 and above, this is easy...
if sys.version_info.minor >= 7:
context.minimum_version = ssl.TLSVersion.TLSv1_2
## Otherwise, it seems very hard...
## The below is less strict than it ought to be, but trying to disable
## TLS v1.1 here using ssl.OP_NO_TLSv1_1 produces unexpected failures
## with recent versions of OpenSSL. What a mess...
else:
context.options |= ssl.OP_NO_SSLv3
context.options |= ssl.OP_NO_SSLv2
# Try to enforce sensible ciphers
try:
context.set_ciphers("AESGCM+ECDHE:AESGCM+DHE:CHACHA20+ECDHE:CHACHA20+DHE:!DSS:!SHA1:!MD5:@STRENGTH")
except ssl.SSLError:
# Rely on the server to only support sensible things, I guess...
pass
# Load client certificate if needed
if self.client_certs["active"]:
certfile, keyfile = self.client_certs["active"]
context.load_cert_chain(certfile, keyfile)
# Connect to remote host by any address possible
err = None
for address in addresses:
self._debug("Connecting to: " + str(address[4]))
s = socket.socket(address[0], address[1])
if self.sync_only:
timeout = self.options["short_timeout"]
else:
timeout = self.options["timeout"]
s.settimeout(timeout)
s = context.wrap_socket(s, server_hostname = host)
try:
s.connect(address[4])
break
except OSError as e:
err = e
else:
# If we couldn't connect to *any* of the addresses, just
# bubble up the exception from the last attempt and deny
# knowledge of earlier failures.
raise err
if sys.version_info.minor >=5:
self._debug("Established {} connection.".format(s.version()))
self._debug("Cipher is: {}.".format(s.cipher()))
# Do TOFU
if self.options["tls_mode"] != "ca":
cert = s.getpeercert(binary_form=True)
self._validate_cert(address[4][0], host, cert)
# Remember that we showed the current cert to this domain...
if self.client_certs["active"]:
self.active_cert_domains.append(host)
self.client_certs[host] = self.client_certs["active"]
# Send request and wrap response in a file descriptor
url = urllib.parse.urlparse(gi.url)
new_netloc = host
if port != 1965:
new_netloc += ":" + str(port)
url = urllib.parse.urlunparse(url._replace(netloc=new_netloc))
self._debug("Sending %s<CRLF>" % url)
s.sendall((url + CRLF).encode("UTF-8"))
mf= s.makefile(mode = "rb")
return address, mf
##
## end of send_request
TODO :address, f = self._send_request(gi)
# Spec dictates <META> should not exceed 1024 bytes,
# so maximum valid header length is 1027 bytes.
header = f.readline(1027)
header = urllib.parse.unquote(header.decode("UTF-8"))
if not header or header[-1] != '\n':
raise RuntimeError("Received invalid header from server!")
header = header.strip()
self._debug("Response header: %s." % header)
# Validate header
status, meta = header.split(maxsplit=1)
if len(meta) > 1024 or len(status) != 2 or not status.isnumeric():
f.close()
raise RuntimeError("Received invalid header from server!")
# Update redirect loop/maze escaping state
if not status.startswith("3"):
self.previous_redirectors = set()
# Handle non-SUCCESS headers, which don't have a response body
# Inputs
if status.startswith("1"):
if self.sync_only:
return None
else:
print(meta)
if status == "11":
user_input = getpass.getpass("> ")
else:
user_input = input("> ")
return self._fetch_over_network(query(user_input))
# Redirects
elif status.startswith("3"):
new_gi = GeminiItem(gi.absolutise_url(meta))
if new_gi.url == gi.url:
raise RuntimeError("URL redirects to itself!")
elif new_gi.url in self.previous_redirectors:
raise RuntimeError("Caught in redirect loop!")
elif len(self.previous_redirectors) == _MAX_REDIRECTS:
raise RuntimeError("Refusing to follow more than %d consecutive redirects!" % _MAX_REDIRECTS)
elif self.sync_only:
follow = self.automatic_choice
# Never follow cross-domain redirects without asking
elif new_gi.host.encode("idna") != gi.host.encode("idna"):
follow = input("Follow cross-domain redirect to %s? (y/n) " % new_gi.url)
# Never follow cross-protocol redirects without asking
elif new_gi.scheme != gi.scheme:
follow = input("Follow cross-protocol redirect to %s? (y/n) " % new_gi.url)
# Don't follow *any* redirect without asking if auto-follow is off
elif not self.options["auto_follow_redirects"]:
follow = input("Follow redirect to %s? (y/n) " % new_gi.url)
# Otherwise, follow away
else:
follow = "yes"
if follow.strip().lower() not in ("y", "yes"):
raise UserAbortException()
self._debug("Following redirect to %s." % new_gi.url)
self._debug("This is consecutive redirect number %d." % len(self.previous_redirectors))
self.previous_redirectors.add(gi.url)
if status == "31":
# Permanent redirect
self.permanent_redirects[gi.url] = new_gi.url
return self._fetch_over_network(new_gi)
# Errors
elif status.startswith("4") or status.startswith("5"):
raise RuntimeError(meta)
# Client cert
elif status.startswith("6"):
self._handle_cert_request(meta)
return self._fetch_over_network(gi)
# Invalid status
elif not status.startswith("2"):
raise RuntimeError("Server returned undefined status code %s!" % status)
# If we're here, this must be a success and there's a response body
assert status.startswith("2")
mime = meta
# Read the response body over the network
fbody = f.read()
#DEFAULT GEMINIMIME
if mime == "":
mime = "text/gemini; charset=utf-8"
shortmime, mime_options = parse_mime(mime)
if "charset" in mime_options:
try:
codecs.lookup(mime_options["charset"])
except LookupError:
#raise RuntimeError("Header declared unknown encoding %s" % mime_options)
#If the encoding is wrong, theres a high probably its UTF-8 with a bad header
mime_options["charset"] = "UTF-8"
if shortmime.startswith("text/"):
#Get the charset and default to UTF-8 in none
encoding = mime_options.get("charset", "UTF-8")
try:
body = fbody.decode(encoding)
except UnicodeError:
raise RuntimeError("Could not decode response body using %s\
encoding declared in header!" % encoding)
else:
body = fbody
gi.write_body(body,mime)
return gi
def fetch(url):
url = normalize_url(url)

View File

@ -37,8 +37,6 @@ import shlex
import shutil
import socket
import sqlite3
import ssl
from ssl import CertificateError
import sys
import time
import urllib.parse
@ -631,7 +629,6 @@ class GeminiClient(cmd.Cmd):
self.options = {
"debug" : False,
"beta" : False,
"ipv6" : True,
"timeout" : 600,
"short_timeout" : 5,
"width" : 72,
@ -882,311 +879,7 @@ class GeminiClient(cmd.Cmd):
print("You can use the ! command to specify another handler program or pipeline.")
def _fetch_finger(self,gi,timeout=10):
if not looks_like_url(gi.url):
print("%s is not a valid url" %gi.url)
parsed = urllib.parse.urlparse(gi.url)
host = parsed.hostname
port = parsed.port or standard_ports["finger"]
query = parsed.path.lstrip("/") + "\r\n"
with socket.create_connection((host,port)) as sock:
sock.settimeout(timeout)
sock.send(query.encode())
response = sock.makefile("rb").read().decode("UTF-8")
gi.write_body(response,"text/plain")
return gi
# Copied from reference spartan client by Michael Lazar
def _fetch_spartan(self,gi):
url_parts = urllib.parse.urlparse(gi.url)
host = url_parts.hostname
port = url_parts.port or 300
path = url_parts.path or "/"
query = url_parts.query
redirect_url = None
with socket.create_connection((host,port)) as sock:
if query:
data = urllib.parse.unquote_to_bytes(query)
else:
data = b""
encoded_host = host.encode("idna")
ascii_path = urllib.parse.unquote_to_bytes(path)
encoded_path = urllib.parse.quote_from_bytes(ascii_path).encode("ascii")
sock.send(b"%s %s %d\r\n" % (encoded_host,encoded_path,len(data)))
fp = sock.makefile("rb")
response = fp.readline(4096).decode("ascii").strip("\r\n")
parts = response.split(" ",maxsplit=1)
code,meta = int(parts[0]),parts[1]
if code == 2:
body = fp.read()
if meta.startswith("text"):
body = body.decode("UTF-8")
gi.write_body(body,meta)
elif code == 3:
redirect_url = url_parts._replace(path=meta).geturl()
else:
gi.set_error("Spartan code %s: Error %s"%(code,meta))
if redirect_url:
gi = GeminiItem(redirect_url)
self._fetch_spartan(gi)
return gi
# fetch_over_network will modify with gi.write_body(body,mime)
# before returning the gi
def _fetch_over_network(self, gi):
# Be careful with client certificates!
# Are we crossing a domain boundary?
if self.active_cert_domains and gi.host not in self.active_cert_domains:
if self.active_is_transient:
print("Permanently delete currently active transient certificate?")
resp = input("Y/N? ")
if resp.strip().lower() in ("y", "yes"):
print("Destroying certificate.")
self._deactivate_client_cert()
else:
print("Staying here.")
raise UserAbortException()
else:
print("PRIVACY ALERT: Deactivate client cert before connecting to a new domain?")
resp = input("Y/N? ")
if resp.strip().lower() in ("n", "no"):
print("Keeping certificate active for {}".format(gi.host))
else:
print("Deactivating certificate.")
self._deactivate_client_cert()
# Suggest reactivating previous certs
if not self.client_certs["active"] and gi.host in self.client_certs:
print("PRIVACY ALERT: Reactivate previously used client cert for {}?".format(gi.host))
resp = input("Y/N? ")
if resp.strip().lower() in ("y", "yes"):
self._activate_client_cert(*self.client_certs[gi.host])
else:
print("Remaining unidentified.")
self.client_certs.pop(gi.host)
# Is this a local file?
if gi.local:
address, f = None, open(gi.path, "rb")
else:
address, f = self._send_request(gi)
# Spec dictates <META> should not exceed 1024 bytes,
# so maximum valid header length is 1027 bytes.
header = f.readline(1027)
header = urllib.parse.unquote(header.decode("UTF-8"))
if not header or header[-1] != '\n':
raise RuntimeError("Received invalid header from server!")
header = header.strip()
self._debug("Response header: %s." % header)
# Validate header
status, meta = header.split(maxsplit=1)
if len(meta) > 1024 or len(status) != 2 or not status.isnumeric():
f.close()
raise RuntimeError("Received invalid header from server!")
# Update redirect loop/maze escaping state
if not status.startswith("3"):
self.previous_redirectors = set()
# Handle non-SUCCESS headers, which don't have a response body
# Inputs
if status.startswith("1"):
if self.sync_only:
return None
else:
print(meta)
if status == "11":
user_input = getpass.getpass("> ")
else:
user_input = input("> ")
return self._fetch_over_network(gi.query(user_input))
# Redirects
elif status.startswith("3"):
new_gi = GeminiItem(gi.absolutise_url(meta))
if new_gi.url == gi.url:
raise RuntimeError("URL redirects to itself!")
elif new_gi.url in self.previous_redirectors:
raise RuntimeError("Caught in redirect loop!")
elif len(self.previous_redirectors) == _MAX_REDIRECTS:
raise RuntimeError("Refusing to follow more than %d consecutive redirects!" % _MAX_REDIRECTS)
elif self.sync_only:
follow = self.automatic_choice
# Never follow cross-domain redirects without asking
elif new_gi.host.encode("idna") != gi.host.encode("idna"):
follow = input("Follow cross-domain redirect to %s? (y/n) " % new_gi.url)
# Never follow cross-protocol redirects without asking
elif new_gi.scheme != gi.scheme:
follow = input("Follow cross-protocol redirect to %s? (y/n) " % new_gi.url)
# Don't follow *any* redirect without asking if auto-follow is off
elif not self.options["auto_follow_redirects"]:
follow = input("Follow redirect to %s? (y/n) " % new_gi.url)
# Otherwise, follow away
else:
follow = "yes"
if follow.strip().lower() not in ("y", "yes"):
raise UserAbortException()
self._debug("Following redirect to %s." % new_gi.url)
self._debug("This is consecutive redirect number %d." % len(self.previous_redirectors))
self.previous_redirectors.add(gi.url)
if status == "31":
# Permanent redirect
self.permanent_redirects[gi.url] = new_gi.url
return self._fetch_over_network(new_gi)
# Errors
elif status.startswith("4") or status.startswith("5"):
raise RuntimeError(meta)
# Client cert
elif status.startswith("6"):
self._handle_cert_request(meta)
return self._fetch_over_network(gi)
# Invalid status
elif not status.startswith("2"):
raise RuntimeError("Server returned undefined status code %s!" % status)
# If we're here, this must be a success and there's a response body
assert status.startswith("2")
mime = meta
# Read the response body over the network
fbody = f.read()
#DEFAULT GEMINIMIME
if mime == "":
mime = "text/gemini; charset=utf-8"
shortmime, mime_options = parse_mime(mime)
if "charset" in mime_options:
try:
codecs.lookup(mime_options["charset"])
except LookupError:
#raise RuntimeError("Header declared unknown encoding %s" % mime_options)
#If the encoding is wrong, theres a high probably its UTF-8 with a bad header
mime_options["charset"] = "UTF-8"
if shortmime.startswith("text/"):
#Get the charset and default to UTF-8 in none
encoding = mime_options.get("charset", "UTF-8")
try:
body = fbody.decode(encoding)
except UnicodeError:
raise RuntimeError("Could not decode response body using %s\
encoding declared in header!" % encoding)
else:
body = fbody
gi.write_body(body,mime)
return gi
def _send_request(self, gi):
"""Send a selector to a given host and port.
Returns the resolved address and binary file with the reply."""
host, port = gi.host, gi.port
host = host.encode("idna").decode()
# Do DNS resolution
addresses = self._get_addresses(host, port)
# Prepare TLS context
protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >=6 else ssl.PROTOCOL_TLSv1_2
context = ssl.SSLContext(protocol)
# Use CAs or TOFU
if self.options["tls_mode"] == "ca":
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = True
context.load_default_certs()
else:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
# Impose minimum TLS version
## In 3.7 and above, this is easy...
if sys.version_info.minor >= 7:
context.minimum_version = ssl.TLSVersion.TLSv1_2
## Otherwise, it seems very hard...
## The below is less strict than it ought to be, but trying to disable
## TLS v1.1 here using ssl.OP_NO_TLSv1_1 produces unexpected failures
## with recent versions of OpenSSL. What a mess...
else:
context.options |= ssl.OP_NO_SSLv3
context.options |= ssl.OP_NO_SSLv2
# Try to enforce sensible ciphers
try:
context.set_ciphers("AESGCM+ECDHE:AESGCM+DHE:CHACHA20+ECDHE:CHACHA20+DHE:!DSS:!SHA1:!MD5:@STRENGTH")
except ssl.SSLError:
# Rely on the server to only support sensible things, I guess...
pass
# Load client certificate if needed
if self.client_certs["active"]:
certfile, keyfile = self.client_certs["active"]
context.load_cert_chain(certfile, keyfile)
# Connect to remote host by any address possible
err = None
for address in addresses:
self._debug("Connecting to: " + str(address[4]))
s = socket.socket(address[0], address[1])
if self.sync_only:
timeout = self.options["short_timeout"]
else:
timeout = self.options["timeout"]
s.settimeout(timeout)
s = context.wrap_socket(s, server_hostname = host)
try:
s.connect(address[4])
break
except OSError as e:
err = e
else:
# If we couldn't connect to *any* of the addresses, just
# bubble up the exception from the last attempt and deny
# knowledge of earlier failures.
raise err
if sys.version_info.minor >=5:
self._debug("Established {} connection.".format(s.version()))
self._debug("Cipher is: {}.".format(s.cipher()))
# Do TOFU
if self.options["tls_mode"] != "ca":
cert = s.getpeercert(binary_form=True)
self._validate_cert(address[4][0], host, cert)
# Remember that we showed the current cert to this domain...
if self.client_certs["active"]:
self.active_cert_domains.append(host)
self.client_certs[host] = self.client_certs["active"]
# Send request and wrap response in a file descriptor
url = urllib.parse.urlparse(gi.url)
new_netloc = host
if port != 1965:
new_netloc += ":" + str(port)
url = urllib.parse.urlunparse(url._replace(netloc=new_netloc))
self._debug("Sending %s<CRLF>" % url)
s.sendall((url + CRLF).encode("UTF-8"))
mf= s.makefile(mode = "rb")
return address, mf
def _get_addresses(self, host, port):
# DNS lookup - will get IPv4 and IPv6 records if IPv6 is enabled
if ":" in host:
# This is likely a literal IPv6 address, so we can *only* ask for
# IPv6 addresses or getaddrinfo will complain
family_mask = socket.AF_INET6
elif socket.has_ipv6 and self.options["ipv6"]:
# Accept either IPv4 or IPv6 addresses
family_mask = 0
else:
# IPv4 only
family_mask = socket.AF_INET
addresses = socket.getaddrinfo(host, port, family=family_mask,
type=socket.SOCK_STREAM)
# Sort addresses so IPv6 ones come first
addresses.sort(key=lambda add: add[0] == socket.AF_INET6, reverse=True)
return addresses
def _handle_cert_request(self, meta):