#!/usr/bin/env python3
"""netcache: download and cache network resources (gemini, gopher, finger,
spartan, http/https) and return the path to the local cached copy."""
import os
import sys
import urllib.parse
import argparse
import codecs
import getpass
import socket
import ssl
import time
import requests
from ssl import CertificateError
try:
    import chardet
    _HAS_CHARDET = True
except ModuleNotFoundError:
    _HAS_CHARDET = False
_home = os.path.expanduser('~')
cache_home = os.environ.get('XDG_CACHE_HOME') or\
             os.path.join(_home,'.cache')
data_home = os.environ.get('XDG_DATA_HOME') or\
            os.path.join(_home,'.local','share')
_CACHE_PATH = os.path.join(cache_home,"offpunk/")
_DATA_DIR = os.path.join(data_home,"offpunk/")
#Debug:
#_CACHE_PATH = "/home/ploum/dev/netcache/"
#_DATA_DIR = "/home/ploum/dev/netcache/"
if not os.path.exists(_CACHE_PATH):
    print("Creating cache directory {}".format(_CACHE_PATH))
    os.makedirs(_CACHE_PATH)
# This list is also used as a list of supported protocols
standard_ports = {
    "gemini" : 1965,
    "gopher" : 70,
    "finger" : 79,
    "http"   : 80,
    "https"  : 443,
    "spartan": 300,
}
default_protocol = "gemini"
CRLF = '\r\n'
DEFAULT_TIMEOUT = 10
_MAX_REDIRECTS = 5

# Raised when the user aborts a request (e.g. refuses to follow a redirect)
class UserAbortException(Exception):
    pass
# monkey-patch Gemini support in urllib.parse
# see https://github.com/python/cpython/blob/master/Lib/urllib/parse.py
urllib.parse.uses_relative.append("gemini")
urllib.parse.uses_netloc.append("gemini")
urllib.parse.uses_relative.append("spartan")
urllib.parse.uses_netloc.append("spartan")
def parse_mime(mime):
    options = {}
    if mime and ";" in mime:
        splited = mime.split(";",maxsplit=1)
        mime = splited[0]
        options_list = splited[1].split()
        for o in options_list:
            spl = o.split("=",maxsplit=1)
            if len(spl) == 2:
                options[spl[0]] = spl[1]
    return mime, options
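
# Illustrative example (not executed): parse_mime() splits a MIME header
# into the bare type and a dict of its parameters, e.g.:
#   parse_mime("text/gemini; charset=utf-8")
#   -> ("text/gemini", {"charset": "utf-8"})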
def normalize_url(url):
    if "://" not in url and ("./" not in url and url[0] != "/"):
        if not url.startswith("mailto:"):
            url = "gemini://" + url
    return url
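
# Illustrative example (not executed): bare hostnames default to gemini://,
# while mailto: and local paths are left untouched, e.g.:
#   normalize_url("example.org")   -> "gemini://example.org"
#   normalize_url("/tmp/file.gmi") -> "/tmp/file.gmi"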
def cache_last_modified(url):
    path = get_cache_path(url)
    if path:
        return os.path.getmtime(path)
    else:
        # TODO FIXME: handle local files (was "self.local" before refactoring)
        print("ERROR: NOCACHE in cache_last_modified")
        return None
def is_cache_valid(url,validity=0):
    # Validity is the acceptable time for
    # a cache to be valid (in seconds)
    # If 0, then any cache is considered as valid
    # (use validity = 1 if you want to refresh everything)
    cache = get_cache_path(url)
    # TODO FIXME : detect if we are local
    #if self.local:
    #    return os.path.exists(cache)
    if cache :
        # If the path is too long, we consider the cache invalid
        # (paths above 259 characters are not supported, see get_cache_path)
        if len(cache) > 259:
            print("We return False because path is too long")
            return False
        if os.path.exists(cache) and not os.path.isdir(cache):
            if validity > 0 :
                last_modification = cache_last_modified(url)
                now = time.time()
                age = now - last_modification
                return age < validity
            else:
                return True
        else:
            #Cache has not been built
            return False
    else:
        #There's not even a cache!
        return False
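
# Illustrative example (not executed): is_cache_valid() checks the age of the
# cached copy against a validity window given in seconds, e.g.:
#   is_cache_valid("gemini://example.org/")                # True if any cache exists
#   is_cache_valid("gemini://example.org/",validity=3600)  # True if less than 1h old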
def get_cache_path(url):
    # Sometimes, cache_path became a folder! (which happens for index.html/index.gmi)
    # In that case, we need to reconstruct it
    #First, we parse the URL
    parsed = urllib.parse.urlparse(url)
    if url[0] == "/" or url.startswith("./"):
        scheme = "file"
    elif parsed.scheme:
        scheme = parsed.scheme
    else:
        scheme = default_protocol
    if scheme in ["file","mailto","list"]:
        local = True
        host = ""
        port = None
        # file:// is 7 char
        if url.startswith("file://"):
            path = url[7:]
        elif scheme == "mailto":
            path = parsed.path
        elif url.startswith("list://"):
            listdir = os.path.join(_DATA_DIR,"lists")
            listname = url[7:].lstrip("/")
            if listname in [""]:
                name = "My Lists"
                path = listdir
            else:
                name = listname
                path = os.path.join(listdir, "%s.gmi"%listname)
        else:
            path = url
    else:
        local = False
        # Convert unicode hostname to punycode using idna RFC3490
        host = parsed.hostname #.encode("idna").decode()
        port = parsed.port or standard_ports.get(scheme, 0)
        # special gopher selector case
        if scheme == "gopher":
            if len(parsed.path) >= 2:
                itemtype = parsed.path[1]
                path = parsed.path[2:]
            else:
                itemtype = "1"
                path = ""
            if itemtype == "0":
                mime = "text/gemini"
            elif itemtype == "1":
                mime = "text/gopher"
            elif itemtype == "h":
                mime = "text/html"
            elif itemtype in ("9","g","I","s"):
                mime = "binary"
            else:
                mime = "text/gopher"
        else:
            path = parsed.path
        if parsed.query:
            # we don't add the query if the path is too long because paths
            # above 260 chars are not supported and crash python.
            # Also, very long queries are usually useless stuff
            if len(path+parsed.query) < 258:
                path += "/" + parsed.query
    # Now, we have a partial path. Let's make it a full path.
    if local:
        cache_path = path
    else:
        cache_path = os.path.expanduser(_CACHE_PATH + scheme + "/" + host + path)
        #There's an OS limitation of 260 characters per path.
        #We will thus cut the path enough to add the index afterward
        cache_path = cache_path[:249]
        # FIXME : this is a gross hack to give a name to
        # index files. This will break if the index is not
        # index.gmi. I don't know how to know the real name
        # of the file. But first, we need to ensure that the domain name
        # ends with "/". Else, the cache will create a file, not a folder.
        if scheme.startswith("http"):
            index = "index.html"
        elif scheme == "finger":
            index = "index.txt"
        elif scheme == "gopher":
            index = "gophermap"
        else:
            index = "index.gmi"
        if path == "" or os.path.isdir(cache_path):
            if not cache_path.endswith("/"):
                cache_path += "/"
            if not url.endswith("/"):
                url += "/"
        if cache_path.endswith("/"):
            cache_path += index
        #sometimes, the index itself is a dir
        #like when folder/index.gmi?param has been created
        #and we try to access folder
        if os.path.isdir(cache_path):
            cache_path += "/" + index
    return cache_path
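
# Illustrative example (not executed): assuming the default cache directory,
# a gemini capsule root maps to an index.gmi file inside the cache, e.g.:
#   get_cache_path("gemini://example.org/")
#   -> "~/.cache/offpunk/gemini/example.org/index.gmi" (expanded)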
def write_body(url,body,mime=None):
    ## body is a copy of the raw gemtext
    ## write_body() also creates the cache!
    #DEFAULT GEMINIMIME
    mime, options = parse_mime(mime)
    cache_path = get_cache_path(url)
    if cache_path:
        if mime and mime.startswith("text/"):
            mode = "w"
        else:
            mode = "wb"
        cache_dir = os.path.dirname(cache_path)
        # If the subdirectory already exists as a file (not a folder)
        # we remove it (happens when accessing URL/subfolder before
        # URL/subfolder/file.gmi).
        # This causes loss of data in the cache.
        # The proper solution would be to save "subfolder" as "subfolder/index.gmi".
        # If the subdirectory doesn't exist, we recursively walk up until we
        # find one that exists, to avoid a file blocking the creation of folders
        root_dir = cache_dir
        while not os.path.exists(root_dir):
            root_dir = os.path.dirname(root_dir)
        if os.path.isfile(root_dir):
            os.remove(root_dir)
        os.makedirs(cache_dir,exist_ok=True)
        with open(cache_path, mode=mode) as f:
            f.write(body)
        return cache_path
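
# Illustrative example (not executed): write_body() creates any missing
# parent folders and returns the path of the written cache file, e.g.:
#   write_body("gemini://example.org/page.gmi","# Hello\n","text/gemini")
#   -> "~/.cache/offpunk/gemini/example.org/page.gmi" (expanded)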
def _fetch_http(url,max_size=None,timeout=DEFAULT_TIMEOUT,**kwargs):
    def set_error(url,length,max_size):
        # TODO: errors used to be attached to a gi object; until that part
        # of the refactoring is done, we raise them instead.
        err = "Size of %s is %s Mo\n"%(url,length)
        err += "Offpunk only download automatically content under %s Mo\n" %(max_size/1000000)
        err += "To retrieve this content anyway, type 'reload'."
        raise RuntimeError(err)
    header = {}
    header["User-Agent"] = "Netcache"
    parsed = urllib.parse.urlparse(url)
    # Code to translate URLs to better frontends (think twitter.com -> nitter)
    #if options["redirects"]:
    #    netloc = parsed.netloc
    #    if netloc.startswith("www."):
    #        netloc = netloc[4:]
    #    if netloc in self.redirects:
    #        if self.redirects[netloc] == "blocked":
    #            text = "This website has been blocked.\n"
    #            text += "Use the redirect command to unblock it."
    #            gi.write_body(text,"text/gemini")
    #            return gi
    #        else:
    #            parsed = parsed._replace(netloc = self.redirects[netloc])
    url = urllib.parse.urlunparse(parsed)
    with requests.get(url,headers=header, stream=True,timeout=timeout) as response:
        #print("This is header for %s"%url)
        #print(response.headers)
        if "content-type" in response.headers:
            mime = response.headers['content-type']
        else:
            mime = None
        if "content-length" in response.headers:
            length = int(response.headers['content-length'])
        else:
            length = 0
        if max_size and length > max_size:
            response.close()
            set_error(url,str(round(length/1000000,1)),max_size)
        elif max_size and length == 0:
            body = b''
            downloaded = 0
            for r in response.iter_content():
                body += r
                #We divide max_size for streamed content
                #in order to catch them faster
                size = sys.getsizeof(body)
                max_length = max_size/2
                current = round(size*100/max_length,1)
                if current > downloaded:
                    downloaded = current
                    print("  -> Receiving stream: %s%% of allowed data"%downloaded,end='\r')
                #print("size: %s (%s%% of max_length)"%(size,size/max_size))
                if size > max_size/2:
                    response.close()
                    set_error(url,"streaming",max_size)
            response.close()
        else:
            body = response.content
            response.close()
    if mime and "text/" in mime:
        body = body.decode("UTF-8","replace")
    cache = write_body(url,body,mime)
    return cache
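
# Illustrative example (not executed): max_size guards against huge downloads,
# e.g. limiting an HTTP fetch to roughly 10 Mo:
#   _fetch_http("https://example.org/",max_size=10000000)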
def _fetch_gopher(url,timeout=DEFAULT_TIMEOUT,**kwargs):
    parsed = urllib.parse.urlparse(url)
    host = parsed.hostname
    port = parsed.port or 70
    if len(parsed.path) >= 2:
        itemtype = parsed.path[1]
        selector = parsed.path[2:]
    else:
        itemtype = "1"
        selector = ""
    addresses = socket.getaddrinfo(host, port, family=0,type=socket.SOCK_STREAM)
    err = None
    for address in addresses:
        s = socket.socket(address[0], address[1])
        s.settimeout(timeout)
        try:
            s.connect(address[4])
            break
        except OSError as e:
            err = e
    else:
        # If we couldn't connect to *any* of the addresses,
        # bubble up the exception from the last attempt
        raise err
    if parsed.query:
        request = selector + "\t" + parsed.query
    else:
        request = selector
    request += "\r\n"
    s.sendall(request.encode("UTF-8"))
    response = s.makefile("rb").read()
    # Transcode response into UTF-8
    #if itemtype in ("0","1","h"):
    if not itemtype in ("9","g","I","s"):
        # Try most common encodings
        for encoding in ("UTF-8", "ISO-8859-1"):
            try:
                response = response.decode(encoding)
                break
            except UnicodeDecodeError:
                pass
        else:
            # try to find encoding
            if _HAS_CHARDET:
                detected = chardet.detect(response)
                response = response.decode(detected["encoding"])
            else:
                raise UnicodeError("Could not decode gopher response")
    if itemtype == "0":
        mime = "text/gemini"
    elif itemtype == "1":
        mime = "text/gopher"
    elif itemtype == "h":
        mime = "text/html"
    elif itemtype in ("9","g","I","s"):
        mime = None
    else:
        # by default, we should consider Gopher
        mime = "text/gopher"
    cache = write_body(url,response,mime)
    return cache
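
# Illustrative example (not executed): the first character of a gopher path is
# the item type; the rest is the selector sent to the server, e.g.:
#   gopher://example.org/0/file.txt  -> itemtype "0", request "/file.txt\r\n"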
def _fetch_finger(url,timeout=DEFAULT_TIMEOUT,**kwargs):
    parsed = urllib.parse.urlparse(url)
    host = parsed.hostname
    port = parsed.port or standard_ports["finger"]
    query = parsed.path.lstrip("/") + "\r\n"
    with socket.create_connection((host,port)) as sock:
        sock.settimeout(timeout)
        sock.send(query.encode())
        response = sock.makefile("rb").read().decode("UTF-8")
        cache = write_body(url,response,"text/plain")
    return cache
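
# Illustrative example (not executed): the finger request is simply the user
# name followed by CRLF, e.g.:
#   finger://example.org/alice  -> sends "alice\r\n" to port 79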
# Originally copied from reference spartan client by Michael Lazar
def _fetch_spartan(url,**kwargs):
    cache = None
    url_parts = urllib.parse.urlparse(url)
    host = url_parts.hostname
    port = url_parts.port or standard_ports["spartan"]
    path = url_parts.path or "/"
    query = url_parts.query
    redirect_url = None
    with socket.create_connection((host,port)) as sock:
        if query:
            data = urllib.parse.unquote_to_bytes(query)
        else:
            data = b""
        encoded_host = host.encode("idna")
        ascii_path = urllib.parse.unquote_to_bytes(path)
        encoded_path = urllib.parse.quote_from_bytes(ascii_path).encode("ascii")
        sock.send(b"%s %s %d\r\n" % (encoded_host,encoded_path,len(data)))
        # send the request body announced by the length field
        if data:
            sock.send(data)
        fp = sock.makefile("rb")
        response = fp.readline(4096).decode("ascii").strip("\r\n")
        parts = response.split(" ",maxsplit=1)
        code,meta = int(parts[0]),parts[1]
        if code == 2:
            body = fp.read()
            if meta.startswith("text"):
                body = body.decode("UTF-8")
            cache = write_body(url,body,meta)
        elif code == 3:
            redirect_url = url_parts._replace(path=meta).geturl()
        else:
            #TODO: set the error on the cache instead of raising,
            #once errors are attached to the cache
            raise RuntimeError("Spartan code %s: Error %s"%(code,meta))
    if redirect_url:
        cache = _fetch_spartan(redirect_url)
    return cache
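
# Illustrative example (not executed): a spartan request line is
# "host path content-length" followed by CRLF and the request body, e.g.:
#   spartan://example.org/page  -> sends b"example.org /page 0\r\n"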
def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,**kwargs):
    cache = None
    url_parts = urllib.parse.urlparse(url)
    host = url_parts.hostname
    port = url_parts.port or standard_ports["gemini"]
    path = url_parts.path or "/"
    query = url_parts.query
    # Be careful with client certificates!
    # Are we crossing a domain boundary?
    # TODO : code should be adapted to netcache
    #    if self.active_cert_domains and host not in self.active_cert_domains:
    #        if self.active_is_transient:
    #            print("Permanently delete currently active transient certificate?")
    #            resp = input("Y/N? ")
    #            if resp.strip().lower() in ("y", "yes"):
    #                print("Destroying certificate.")
    #                self._deactivate_client_cert()
    #            else:
    #                print("Staying here.")
    #                raise UserAbortException()
    #        else:
    #            print("PRIVACY ALERT: Deactivate client cert before connecting to a new domain?")
    #            resp = input("Y/N? ")
    #            if resp.strip().lower() in ("n", "no"):
    #                print("Keeping certificate active for {}".format(host))
    #            else:
    #                print("Deactivating certificate.")
    #                self._deactivate_client_cert()
    #
    #    # Suggest reactivating previous certs
    #    if not self.client_certs["active"] and host in self.client_certs:
    #        print("PRIVACY ALERT: Reactivate previously used client cert for {}?".format(host))
    #        resp = input("Y/N? ")
    #        if resp.strip().lower() in ("y", "yes"):
    #            self._activate_client_cert(*self.client_certs[host])
    #        else:
    #            print("Remaining unidentified.")
    #            self.client_certs.pop(host)
    # In AV-98, this was the _send_request method
    # Send a selector to a given host and port.
    # Returns the resolved address and binary file with the reply.
    host = host.encode("idna").decode()
    # Do DNS resolution
    # DNS lookup - will get IPv4 and IPv6 records if IPv6 is enabled
    if ":" in host:
        # This is likely a literal IPv6 address, so we can *only* ask for
        # IPv6 addresses or getaddrinfo will complain
        family_mask = socket.AF_INET6
    elif socket.has_ipv6:
        # Accept either IPv4 or IPv6 addresses
        family_mask = 0
    else:
        # IPv4 only
        family_mask = socket.AF_INET
    addresses = socket.getaddrinfo(host, port, family=family_mask,
                                   type=socket.SOCK_STREAM)
    # Sort addresses so IPv6 ones come first
    addresses.sort(key=lambda add: add[0] == socket.AF_INET6, reverse=True)
    ## Continuation of send_request
    # Prepare TLS context
    protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >= 6 else ssl.PROTOCOL_TLSv1_2
    context = ssl.SSLContext(protocol)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    # Impose minimum TLS version
    ## In 3.7 and above, this is easy...
    if sys.version_info.minor >= 7:
        context.minimum_version = ssl.TLSVersion.TLSv1_2
    ## Otherwise, it seems very hard...
    ## The below is less strict than it ought to be, but trying to disable
    ## TLS v1.1 here using ssl.OP_NO_TLSv1_1 produces unexpected failures
    ## with recent versions of OpenSSL. What a mess...
    else:
        context.options |= ssl.OP_NO_SSLv3
        context.options |= ssl.OP_NO_SSLv2
    # Try to enforce sensible ciphers
    try:
        context.set_ciphers("AESGCM+ECDHE:AESGCM+DHE:CHACHA20+ECDHE:CHACHA20+DHE:!DSS:!SHA1:!MD5:@STRENGTH")
    except ssl.SSLError:
        # Rely on the server to only support sensible things, I guess...
        pass
    #TODO: certificate handling to refactor
    #    # Load client certificate if needed
    #    if self.client_certs["active"]:
    #        certfile, keyfile = self.client_certs["active"]
    #        context.load_cert_chain(certfile, keyfile)
    # Connect to remote host by any address possible
    err = None
    for address in addresses:
        s = socket.socket(address[0], address[1])
        s.settimeout(timeout)
        s = context.wrap_socket(s, server_hostname = host)
        try:
            s.connect(address[4])
            break
        except OSError as e:
            err = e
    else:
        # If we couldn't connect to *any* of the addresses, just
        # bubble up the exception from the last attempt and deny
        # knowledge of earlier failures.
        raise err
    # Do TOFU
    cert = s.getpeercert(binary_form=True)
    # TODO: another cert handling to refactor
    # Remember that we showed the current cert to this domain...
    #    self._validate_cert(address[4][0], host, cert)
    #    if self.client_certs["active"]:
    #        self.active_cert_domains.append(host)
    #        self.client_certs[host] = self.client_certs["active"]
    # Send request and wrap response in a file descriptor
    url = urllib.parse.urlparse(url)
    new_netloc = host
    if port != standard_ports["gemini"]:
        new_netloc += ":" + str(port)
    url = urllib.parse.urlunparse(url._replace(netloc=new_netloc))
    s.sendall((url + CRLF).encode("UTF-8"))
    f = s.makefile(mode = "rb")
    ## end of send_request in AV98
    # Spec dictates <META> should not exceed 1024 bytes,
    # so maximum valid header length is 1027 bytes.
    header = f.readline(1027)
    header = urllib.parse.unquote(header.decode("UTF-8"))
    if not header or header[-1] != '\n':
        raise RuntimeError("Received invalid header from server!")
    header = header.strip()
    # Validate header
    status, meta = header.split(maxsplit=1)
    if len(meta) > 1024 or len(status) != 2 or not status.isnumeric():
        f.close()
        raise RuntimeError("Received invalid header from server!")
    # Update redirect loop/maze escaping state
    if not status.startswith("3"):
        previous_redirectors = set()
    #TODO FIXME
    else:
        #we set a previous_redirectors anyway because refactoring in progress
        previous_redirectors = set()
    # Handle non-SUCCESS headers, which don't have a response body
    # Inputs
    if status.startswith("1"):
        print(meta)
        if status == "11":
            user_input = getpass.getpass("> ")
        else:
            user_input = input("> ")
        # Re-request the same resource with the user input as the query
        # (this used to be gi.query() before the refactoring)
        new_url = urllib.parse.urlunparse(url_parts._replace(query=urllib.parse.quote(user_input)))
        return _fetch_gemini(new_url)
    # Redirects
    elif status.startswith("3"):
        newurl = urllib.parse.urljoin(url,meta)
        if newurl == url:
            raise RuntimeError("URL redirects to itself!")
        elif newurl in previous_redirectors:
            raise RuntimeError("Caught in redirect loop!")
        elif len(previous_redirectors) == _MAX_REDIRECTS:
            raise RuntimeError("Refusing to follow more than %d consecutive redirects!" % _MAX_REDIRECTS)
        # TODO: redirections handling should be refactored
        #    elif "interactive" in options and not options["interactive"]:
        #        follow = self.automatic_choice
        #    # Never follow cross-domain redirects without asking
        #    elif new_gi.host.encode("idna") != gi.host.encode("idna"):
        #        follow = input("Follow cross-domain redirect to %s? (y/n) " % new_gi.url)
        #    # Never follow cross-protocol redirects without asking
        #    elif new_gi.scheme != gi.scheme:
        #        follow = input("Follow cross-protocol redirect to %s? (y/n) " % new_gi.url)
        #    # Don't follow *any* redirect without asking if auto-follow is off
        #    elif not self.options["auto_follow_redirects"]:
        #        follow = input("Follow redirect to %s? (y/n) " % new_gi.url)
        #    # Otherwise, follow away
        else:
            follow = "yes"
        if follow.strip().lower() not in ("y", "yes"):
            raise UserAbortException()
        previous_redirectors.add(url)
        #    if status == "31":
        #        # Permanent redirect
        #        self.permanent_redirects[gi.url] = new_gi.url
        return _fetch_gemini(newurl)
    # Errors
    elif status.startswith("4") or status.startswith("5"):
        raise RuntimeError(meta)
    # Client cert
    #    elif status.startswith("6"):
    #        self._handle_cert_request(meta)
    #        return self._fetch_over_network(gi)
    # Invalid status
    elif not status.startswith("2"):
        raise RuntimeError("Server returned undefined status code %s!" % status)
    # If we're here, this must be a success and there's a response body
    assert status.startswith("2")
    mime = meta
    # Read the response body over the network
    fbody = f.read()
    #DEFAULT GEMINIMIME
    if mime == "":
        mime = "text/gemini; charset=utf-8"
    shortmime, mime_options = parse_mime(mime)
    if "charset" in mime_options:
        try:
            codecs.lookup(mime_options["charset"])
        except LookupError:
            #raise RuntimeError("Header declared unknown encoding %s" % mime_options)
            #If the encoding is wrong, there's a high probability it's UTF-8 with a bad header
            mime_options["charset"] = "UTF-8"
    if shortmime.startswith("text/"):
        #Get the charset and default to UTF-8 if none is declared
        encoding = mime_options.get("charset", "UTF-8")
        try:
            body = fbody.decode(encoding)
        except UnicodeError:
            raise RuntimeError("Could not decode response body using %s encoding declared in header!" % encoding)
    else:
        body = fbody
    cache = write_body(url,body,mime)
    return cache
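
# Illustrative example (not executed): a gemini response starts with a
# "<STATUS> <META>" header line, e.g.:
#   "20 text/gemini; charset=utf-8\r\n"  -> success, body follows
#   "31 gemini://example.org/new\r\n"    -> redirect to <META>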
def fetch(url,**kwargs):
    url = normalize_url(url)
    path = None
    print_error = "print_error" in kwargs.keys() and kwargs["print_error"]
    if "://" in url:
        try:
            scheme = url.split("://")[0]
            if scheme not in standard_ports:
                print("%s is not a supported protocol"%scheme)
            elif scheme in ("http","https"):
                path = _fetch_http(url,**kwargs)
            elif scheme == "gopher":
                path = _fetch_gopher(url,**kwargs)
            elif scheme == "finger":
                path = _fetch_finger(url,**kwargs)
            elif scheme == "gemini":
                path = _fetch_gemini(url,**kwargs)
            else:
                print("scheme %s not implemented yet"%scheme)
        except UserAbortException:
            return
        except Exception as err:
            #TODO return the error !
            #gi.set_error(err)
            # Print an error message
            # we fail silently when sync_only
            if isinstance(err, socket.gaierror):
                if print_error:
                    print("ERROR: DNS error!")
            elif isinstance(err, ConnectionRefusedError):
                if print_error:
                    print("ERROR1: Connection refused!")
            elif isinstance(err, ConnectionResetError):
                if print_error:
                    print("ERROR2: Connection reset!")
            elif isinstance(err, (TimeoutError, socket.timeout)):
                if print_error:
                    print("""ERROR3: Connection timed out!
Slow internet connection? Use 'set timeout' to be more patient.""")
            elif isinstance(err, FileExistsError):
                if print_error:
                    print("""ERROR5: Trying to create a directory which already exists
in the cache : """)
                    print(err)
            elif isinstance(err, requests.exceptions.SSLError):
                if print_error:
                    print("""ERROR6: Bad SSL certificate:\n""")
                    print(err)
                    print("""\n If you know what you are doing, you can try to accept bad certificates with the following command:\n""")
                    print("""set accept_bad_ssl_certificates True""")
            else:
                if print_error:
                    import traceback
                    print("ERROR4: " + str(type(err)) + " : " + str(err))
                    print("\n" + str(err.with_traceback(None)))
                    print(traceback.format_exc())
            return
    else:
        print("Not a supported URL")
    return path
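
# Illustrative example (not executed): fetch() dispatches on the URL scheme
# and returns the path to the cached copy, e.g.:
#   fetch("gemini://example.org/",print_error=True)
#   -> "~/.cache/offpunk/gemini/example.org/index.gmi" (expanded)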
def main():
    # Parse arguments
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--path", action="store_true",
                        help="return path to the cache instead of the content of the cache")
    parser.add_argument("--offline", action="store_true",
                        help="Do not attempt to download, return cached version or error")
    parser.add_argument("--max-size", type=int,
                        help="Cancel download of items above that size (value in Mo).")
    parser.add_argument("--timeout", type=int,
                        help="Time to wait before cancelling connection (in seconds).")
    # No argument: write help
    parser.add_argument('url', metavar='URL', nargs='*',
                        help='download URL and returns the content or the path to a cached version')
    # arg = URL: download and returns cached URI
    # --cache-validity : do not download if cache is valid
    # --validity : returns the date of the cached version, Null if no version
    # --force-download : download and replace cache, even if valid
    args = parser.parse_args()
    # --max-size is given in Mo while fetch() compares bytes
    max_size = args.max_size*1000000 if args.max_size else None
    timeout = args.timeout or DEFAULT_TIMEOUT
    for u in args.url:
        if args.offline:
            path = get_cache_path(u)
        else:
            print("Download URL: %s" %u)
            path = fetch(u,max_size=max_size,timeout=timeout)
        if args.path:
            print(path)
        elif path:
            with open(path,"r") as f:
                print(f.read())

if __name__== '__main__':
    main()
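
# Illustrative CLI usage (assuming this file is executable as netcache.py):
#   $ ./netcache.py gemini://example.org/         # print the fetched content
#   $ ./netcache.py --path gemini://example.org/  # print the cache path only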