# forked from solderpunk/AV-98
#!/bin/python
|
||
import argparse
import os
import socket
import ssl
import sys
import time
import urllib.parse
from ssl import CertificateError

import requests

try:
    import chardet
    _HAS_CHARDET = True
except ModuleNotFoundError:
    _HAS_CHARDET = False
|
||
|
||
_home = os.path.expanduser('~')
|
||
cache_home = os.environ.get('XDG_CACHE_HOME') or\
|
||
os.path.join(_home,'.cache')
|
||
#_CACHE_PATH = os.path.join(cache_home,"offpunk/")
|
||
#Debug:
|
||
_CACHE_PATH = "/home/ploum/dev/netcache/"
|
||
|
||
if not os.path.exists(_CACHE_PATH):
|
||
print("Creating cache directory {}".format(_CACHE_PATH))
|
||
os.makedirs(_CACHE_PATH)
|
||
|
||
# This list is also used as a list of supported protocols
|
||
standard_ports = {
|
||
"gemini" : 1965,
|
||
"gopher" : 70,
|
||
"finger" : 79,
|
||
"http" : 80,
|
||
"https" : 443,
|
||
"spartan": 300,
|
||
}
|
||
default_protocol = "gemini"
|
||
|
||
def parse_mime(mime):
|
||
options = {}
|
||
if mime:
|
||
if ";" in mime:
|
||
splited = mime.split(";",maxsplit=1)
|
||
mime = splited[0]
|
||
if len(splited) >= 1:
|
||
options_list = splited[1].split()
|
||
for o in options_list:
|
||
spl = o.split("=",maxsplit=1)
|
||
if len(spl) > 0:
|
||
options[spl[0]] = spl[1]
|
||
return mime, options
|
||
|
||
def normalize_url(url):
|
||
if "://" not in url and ("./" not in url and url[0] != "/"):
|
||
if not url.startswith("mailto:"):
|
||
url = "gemini://" + url
|
||
return url
|
||
|
||
|
||
def cache_last_modified(url):
|
||
path = get_cache_path(url)
|
||
if path:
|
||
return os.path.getmtime(path)
|
||
elif self.local:
|
||
return 0
|
||
else:
|
||
print("ERROR : NO CACHE in cache_last_modified")
|
||
return None
|
||
|
||
def is_cache_valid(url,validity=0):
|
||
# Validity is the acceptable time for
|
||
# a cache to be valid (in seconds)
|
||
# If 0, then any cache is considered as valid
|
||
# (use validity = 1 if you want to refresh everything)
|
||
cache = get_cache_path(url)
|
||
# TODO FIXME : detect if we are local
|
||
#if self.local:
|
||
# return os.path.exists(cache)
|
||
if cache :
|
||
# If path is too long, we always return True to avoid
|
||
# fetching it.
|
||
if len(cache) > 259:
|
||
print("We return False because path is too long")
|
||
return False
|
||
if os.path.exists(cache) and not os.path.isdir(cache):
|
||
if validity > 0 :
|
||
last_modification = cache_last_modified(url)
|
||
now = time.time()
|
||
age = now - last_modification
|
||
return age < validity
|
||
else:
|
||
return True
|
||
else:
|
||
#Cache has not been build
|
||
return False
|
||
else:
|
||
#There’s not even a cache!
|
||
return False
|
||
|
||
|
||
|
||
def get_cache_path(url):
|
||
#First, we parse the URL
|
||
parsed = urllib.parse.urlparse(url)
|
||
if url[0] == "/" or url.startswith("./"):
|
||
scheme = "file"
|
||
elif parsed.scheme:
|
||
scheme = parsed.scheme
|
||
else:
|
||
scheme = default_protocol
|
||
if scheme in ["file","mailto","list"]:
|
||
local = True
|
||
host = ""
|
||
port = None
|
||
# file:// is 7 char
|
||
if url.startswith("file://"):
|
||
path = self.url[7:]
|
||
elif scheme == "mailto":
|
||
path = parsed.path
|
||
elif url.startswith("list://"):
|
||
listdir = os.path.join(_DATA_DIR,"lists")
|
||
listname = url[7:].lstrip("/")
|
||
if listname in [""]:
|
||
name = "My Lists"
|
||
path = listdir
|
||
else:
|
||
name = listname
|
||
path = os.path.join(listdir, "%s.gmi"%listname)
|
||
else:
|
||
path = url
|
||
else:
|
||
local = False
|
||
# Convert unicode hostname to punycode using idna RFC3490
|
||
host = parsed.hostname #.encode("idna").decode()
|
||
port = parsed.port or standard_ports.get(scheme, 0)
|
||
# special gopher selector case
|
||
if scheme == "gopher":
|
||
if len(parsed.path) >= 2:
|
||
itemtype = parsed.path[1]
|
||
path = parsed.path[2:]
|
||
else:
|
||
itemtype = "1"
|
||
path = ""
|
||
if itemtype == "0":
|
||
mime = "text/gemini"
|
||
elif itemtype == "1":
|
||
mime = "text/gopher"
|
||
elif itemtype == "h":
|
||
mime = "text/html"
|
||
elif itemtype in ("9","g","I","s"):
|
||
mime = "binary"
|
||
else:
|
||
mime = "text/gopher"
|
||
else:
|
||
path = parsed.path
|
||
if parsed.query:
|
||
# we don’t add the query if path is too long because path above 260 char
|
||
# are not supported and crash python.
|
||
# Also, very long query are usually useless stuff
|
||
if len(path+parsed.query) < 258:
|
||
path += "/" + parsed.query
|
||
|
||
# Now, we have a partial path. Let’s make it full path.
|
||
if local:
|
||
cache_path = path
|
||
else:
|
||
cache_path = os.path.expanduser(_CACHE_PATH + scheme + "/" + host + path)
|
||
#There’s an OS limitation of 260 characters per path.
|
||
#We will thus cut the path enough to add the index afterward
|
||
cache_path = cache_path[:249]
|
||
# FIXME : this is a gross hack to give a name to
|
||
# index files. This will break if the index is not
|
||
# index.gmi. I don’t know how to know the real name
|
||
# of the file. But first, we need to ensure that the domain name
|
||
# finish by "/". Else, the cache will create a file, not a folder.
|
||
if scheme.startswith("http"):
|
||
index = "index.html"
|
||
elif scheme == "finger":
|
||
index = "index.txt"
|
||
elif scheme == "gopher":
|
||
index = "gophermap"
|
||
else:
|
||
index = "index.gmi"
|
||
if path == "" or os.path.isdir(cache_path):
|
||
if not cache_path.endswith("/"):
|
||
cache_path += "/"
|
||
if not url.endswith("/"):
|
||
url += "/"
|
||
if cache_path.endswith("/"):
|
||
cache_path += index
|
||
#sometimes, the index itself is a dir
|
||
#like when folder/index.gmi?param has been created
|
||
#and we try to access folder
|
||
if os.path.isdir(cache_path):
|
||
cache_path += "/" + index
|
||
return cache_path
|
||
|
||
def write_body(url,body,mime=None):
|
||
## body is a copy of the raw gemtext
|
||
## Write_body() also create the cache !
|
||
# DEFAULT GEMINI MIME
|
||
mime, options = parse_mime(mime)
|
||
cache_path = get_cache_path(url)
|
||
if cache_path:
|
||
if mime and mime.startswith("text/"):
|
||
mode = "w"
|
||
else:
|
||
mode = "wb"
|
||
cache_dir = os.path.dirname(cache_path)
|
||
# If the subdirectory already exists as a file (not a folder)
|
||
# We remove it (happens when accessing URL/subfolder before
|
||
# URL/subfolder/file.gmi.
|
||
# This causes loss of data in the cache
|
||
# proper solution would be to save "sufolder" as "sufolder/index.gmi"
|
||
# If the subdirectory doesn’t exist, we recursively try to find one
|
||
# until it exists to avoid a file blocking the creation of folders
|
||
root_dir = cache_dir
|
||
while not os.path.exists(root_dir):
|
||
root_dir = os.path.dirname(root_dir)
|
||
if os.path.isfile(root_dir):
|
||
os.remove(root_dir)
|
||
os.makedirs(cache_dir,exist_ok=True)
|
||
with open(cache_path, mode=mode) as f:
|
||
f.write(body)
|
||
f.close()
|
||
return cache_path
|
||
|
||
def _fetch_http(url,max_length=None):
|
||
def set_error(item,length,max_length):
|
||
err = "Size of %s is %s Mo\n"%(item.url,length)
|
||
err += "Offpunk only download automatically content under %s Mo\n" %(max_length/1000000)
|
||
err += "To retrieve this content anyway, type 'reload'."
|
||
item.set_error(err)
|
||
return item
|
||
header = {}
|
||
header["User-Agent"] = "Netcache"
|
||
parsed = urllib.parse.urlparse(url)
|
||
# Code to translate URLs to better frontends (think twitter.com -> nitter)
|
||
#if options["redirects"]:
|
||
# netloc = parsed.netloc
|
||
# if netloc.startswith("www."):
|
||
# netloc = netloc[4:]
|
||
# if netloc in self.redirects:
|
||
# if self.redirects[netloc] == "blocked":
|
||
# text = "This website has been blocked.\n"
|
||
# text += "Use the redirect command to unblock it."
|
||
# gi.write_body(text,"text/gemini")
|
||
# return gi
|
||
# else:
|
||
# parsed = parsed._replace(netloc = self.redirects[netloc])
|
||
url = urllib.parse.urlunparse(parsed)
|
||
with requests.get(url,headers=header, stream=True,timeout=5) as response:
|
||
#print("This is header for %s"%gi.url)
|
||
#print(response.headers)
|
||
if "content-type" in response.headers:
|
||
mime = response.headers['content-type']
|
||
else:
|
||
mime = None
|
||
if "content-length" in response.headers:
|
||
length = int(response.headers['content-length'])
|
||
else:
|
||
length = 0
|
||
if max_length and length > max_length:
|
||
response.close()
|
||
return set_error(gi,str(length/1000000),max_length)
|
||
elif max_length and length == 0:
|
||
body = b''
|
||
downloaded = 0
|
||
for r in response.iter_content():
|
||
body += r
|
||
#We divide max_size for streamed content
|
||
#in order to catch them faster
|
||
size = sys.getsizeof(body)
|
||
max = max_length/2
|
||
current = round(size*100/max,1)
|
||
if current > downloaded:
|
||
downloaded = current
|
||
print(" -> Receiving stream: %s%% of allowed data"%downloaded,end='\r')
|
||
#print("size: %s (%s\% of maxlenght)"%(size,size/max_length))
|
||
if size > max_length/2:
|
||
response.close()
|
||
return set_error(gi,"streaming",max_length)
|
||
response.close()
|
||
else:
|
||
body = response.content
|
||
response.close()
|
||
if mime and "text/" in mime:
|
||
body = body.decode("UTF-8","replace")
|
||
cache = write_body(url,body,mime)
|
||
return cache
|
||
|
||
def _fetch_gopher(url,timeout=10):
|
||
parsed =urllib.parse.urlparse(url)
|
||
host = parsed.hostname
|
||
port = parsed.port or 70
|
||
if len(parsed.path) >= 2:
|
||
itemtype = parsed.path[1]
|
||
selector = parsed.path[2:]
|
||
else:
|
||
itemtype = "1"
|
||
selector = ""
|
||
addresses = socket.getaddrinfo(host, port, family=0,type=socket.SOCK_STREAM)
|
||
s = socket.create_connection((host,port))
|
||
for address in addresses:
|
||
self._debug("Connecting to: " + str(address[4]))
|
||
s = socket.socket(address[0], address[1])
|
||
s.settimeout(timeout)
|
||
try:
|
||
s.connect(address[4])
|
||
break
|
||
except OSError as e:
|
||
err = e
|
||
if parsed.query:
|
||
request = selector + "\t" + parsed.query
|
||
else:
|
||
request = selector
|
||
request += "\r\n"
|
||
s.sendall(request.encode("UTF-8"))
|
||
response = s.makefile("rb").read()
|
||
# Transcode response into UTF-8
|
||
#if itemtype in ("0","1","h"):
|
||
if not itemtype in ("9","g","I","s"):
|
||
# Try most common encodings
|
||
for encoding in ("UTF-8", "ISO-8859-1"):
|
||
try:
|
||
response = response.decode("UTF-8")
|
||
break
|
||
except UnicodeDecodeError:
|
||
pass
|
||
else:
|
||
# try to find encoding
|
||
if _HAS_CHARDET:
|
||
detected = chardet.detect(response)
|
||
response = response.decode(detected["encoding"])
|
||
else:
|
||
raise UnicodeDecodeError
|
||
if itemtype == "0":
|
||
mime = "text/gemini"
|
||
elif itemtype == "1":
|
||
mime = "text/gopher"
|
||
elif itemtype == "h":
|
||
mime = "text/html"
|
||
elif itemtype in ("9","g","I","s"):
|
||
mime = None
|
||
else:
|
||
# by default, we should consider Gopher
|
||
mime = "text/gopher"
|
||
cache = write_body(response,mime)
|
||
return cache
|
||
|
||
def _fetch_finger(url,timeout=10):
|
||
parsed = urllib.parse.urlparse(url)
|
||
host = parsed.hostname
|
||
port = parsed.port or standard_ports["finger"]
|
||
query = parsed.path.lstrip("/") + "\r\n"
|
||
with socket.create_connection((host,port)) as sock:
|
||
sock.settimeout(timeout)
|
||
sock.send(query.encode())
|
||
response = sock.makefile("rb").read().decode("UTF-8")
|
||
cache = write_body(response,"text/plain")
|
||
return cache
|
||
|
||
# Originally copied from reference spartan client by Michael Lazar
|
||
def _fetch_spartan(url):
|
||
cache = None
|
||
url_parts = urllib.parse.urlparse(url)
|
||
host = url_parts.hostname
|
||
port = url_parts.port or standard_ports["spartan"]
|
||
path = url_parts.path or "/"
|
||
query = url_parts.query
|
||
redirect_url = None
|
||
with socket.create_connection((host,port)) as sock:
|
||
if query:
|
||
data = urllib.parse.unquote_to_bytes(query)
|
||
else:
|
||
data = b""
|
||
encoded_host = host.encode("idna")
|
||
ascii_path = urllib.parse.unquote_to_bytes(path)
|
||
encoded_path = urllib.parse.quote_from_bytes(ascii_path).encode("ascii")
|
||
sock.send(b"%s %s %d\r\n" % (encoded_host,encoded_path,len(data)))
|
||
fp = sock.makefile("rb")
|
||
response = fp.readline(4096).decode("ascii").strip("\r\n")
|
||
parts = response.split(" ",maxsplit=1)
|
||
code,meta = int(parts[0]),parts[1]
|
||
if code == 2:
|
||
body = fp.read()
|
||
if meta.startswith("text"):
|
||
body = body.decode("UTF-8")
|
||
cache = write_body(body,meta)
|
||
elif code == 3:
|
||
redirect_url = url_parts._replace(path=meta).geturl()
|
||
else:
|
||
#TODO:set error!
|
||
#gi.set_error("Spartan code %s: Error %s"%(code,meta))
|
||
print("TODO set_error")
|
||
if redirect_url:
|
||
cache = _fetch_spartan(redirect_url)
|
||
return cache
|
||
|
||
def _fetch_gemini(url):
|
||
cache = None
|
||
url_parts = urllib.parse.urlparse(url)
|
||
host = url_parts.hostname
|
||
port = url_parts.port or standard_ports["gemini"]
|
||
path = url_parts.path or "/"
|
||
query = url_parts.query
|
||
# Be careful with client certificates!
|
||
# Are we crossing a domain boundary?
|
||
if self.active_cert_domains and host not in self.active_cert_domains:
|
||
if self.active_is_transient:
|
||
print("Permanently delete currently active transient certificate?")
|
||
resp = input("Y/N? ")
|
||
if resp.strip().lower() in ("y", "yes"):
|
||
print("Destroying certificate.")
|
||
self._deactivate_client_cert()
|
||
else:
|
||
print("Staying here.")
|
||
raise UserAbortException()
|
||
else:
|
||
print("PRIVACY ALERT: Deactivate client cert before connecting to a new domain?")
|
||
resp = input("Y/N? ")
|
||
if resp.strip().lower() in ("n", "no"):
|
||
print("Keeping certificate active for {}".format(host))
|
||
else:
|
||
print("Deactivating certificate.")
|
||
self._deactivate_client_cert()
|
||
|
||
# Suggest reactivating previous certs
|
||
if not self.client_certs["active"] and host in self.client_certs:
|
||
print("PRIVACY ALERT: Reactivate previously used client cert for {}?".format(host))
|
||
resp = input("Y/N? ")
|
||
if resp.strip().lower() in ("y", "yes"):
|
||
self._activate_client_cert(*self.client_certs[host])
|
||
else:
|
||
print("Remaining unidentified.")
|
||
self.client_certs.pop(host)
|
||
|
||
# In AV-98, this was the _send_request method
|
||
#Send a selector to a given host and port.
|
||
#Returns the resolved address and binary file with the reply."""
|
||
host = host.encode("idna").decode()
|
||
# Do DNS resolution
|
||
# DNS lookup - will get IPv4 and IPv6 records if IPv6 is enabled
|
||
if ":" in host:
|
||
# This is likely a literal IPv6 address, so we can *only* ask for
|
||
# IPv6 addresses or getaddrinfo will complain
|
||
family_mask = socket.AF_INET6
|
||
elif socket.has_ipv6:
|
||
# Accept either IPv4 or IPv6 addresses
|
||
family_mask = 0
|
||
else:
|
||
# IPv4 only
|
||
family_mask = socket.AF_INET
|
||
addresses = socket.getaddrinfo(host, port, family=family_mask,
|
||
type=socket.SOCK_STREAM)
|
||
# Sort addresses so IPv6 ones come first
|
||
addresses.sort(key=lambda add: add[0] == socket.AF_INET6, reverse=True)
|
||
## Continuation of send_request
|
||
# Prepare TLS context
|
||
protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >=6 else ssl.PROTOCOL_TLSv1_2
|
||
context = ssl.SSLContext(protocol)
|
||
|
||
# Use CAs or TOFU
|
||
#TODO : should we care about this options?
|
||
#if self.options["tls_mode"] == "ca":
|
||
# context.verify_mode = ssl.CERT_REQUIRED
|
||
# context.check_hostname = True
|
||
# context.load_default_certs()
|
||
#else:
|
||
# context.check_hostname = False
|
||
# context.verify_mode = ssl.CERT_NONE
|
||
context.check_hostname=False
|
||
context.verify_mode = ssl.CERT_NONE
|
||
# Impose minimum TLS version
|
||
## In 3.7 and above, this is easy...
|
||
if sys.version_info.minor >= 7:
|
||
context.minimum_version = ssl.TLSVersion.TLSv1_2
|
||
## Otherwise, it seems very hard...
|
||
## The below is less strict than it ought to be, but trying to disable
|
||
## TLS v1.1 here using ssl.OP_NO_TLSv1_1 produces unexpected failures
|
||
## with recent versions of OpenSSL. What a mess...
|
||
else:
|
||
context.options |= ssl.OP_NO_SSLv3
|
||
context.options |= ssl.OP_NO_SSLv2
|
||
# Try to enforce sensible ciphers
|
||
try:
|
||
context.set_ciphers("AESGCM+ECDHE:AESGCM+DHE:CHACHA20+ECDHE:CHACHA20+DHE:!DSS:!SHA1:!MD5:@STRENGTH")
|
||
except ssl.SSLError:
|
||
# Rely on the server to only support sensible things, I guess...
|
||
pass
|
||
|
||
#TODO: I’m here in the refactor
|
||
# Load client certificate if needed
|
||
if self.client_certs["active"]:
|
||
certfile, keyfile = self.client_certs["active"]
|
||
context.load_cert_chain(certfile, keyfile)
|
||
|
||
# Connect to remote host by any address possible
|
||
err = None
|
||
for address in addresses:
|
||
self._debug("Connecting to: " + str(address[4]))
|
||
s = socket.socket(address[0], address[1])
|
||
if self.sync_only:
|
||
timeout = self.options["short_timeout"]
|
||
else:
|
||
timeout = self.options["timeout"]
|
||
s.settimeout(timeout)
|
||
s = context.wrap_socket(s, server_hostname = host)
|
||
try:
|
||
s.connect(address[4])
|
||
break
|
||
except OSError as e:
|
||
err = e
|
||
else:
|
||
# If we couldn't connect to *any* of the addresses, just
|
||
# bubble up the exception from the last attempt and deny
|
||
# knowledge of earlier failures.
|
||
raise err
|
||
if sys.version_info.minor >=5:
|
||
self._debug("Established {} connection.".format(s.version()))
|
||
self._debug("Cipher is: {}.".format(s.cipher()))
|
||
# Do TOFU
|
||
if self.options["tls_mode"] != "ca":
|
||
cert = s.getpeercert(binary_form=True)
|
||
self._validate_cert(address[4][0], host, cert)
|
||
# Remember that we showed the current cert to this domain...
|
||
if self.client_certs["active"]:
|
||
self.active_cert_domains.append(host)
|
||
self.client_certs[host] = self.client_certs["active"]
|
||
# Send request and wrap response in a file descriptor
|
||
url = urllib.parse.urlparse(gi.url)
|
||
new_netloc = host
|
||
if port != 1965:
|
||
new_netloc += ":" + str(port)
|
||
url = urllib.parse.urlunparse(url._replace(netloc=new_netloc))
|
||
self._debug("Sending %s<CRLF>" % url)
|
||
s.sendall((url + CRLF).encode("UTF-8"))
|
||
mf= s.makefile(mode = "rb")
|
||
return address, mf
|
||
##
|
||
## end of send_request
|
||
TODO :address, f = self._send_request(gi)
|
||
# Spec dictates <META> should not exceed 1024 bytes,
|
||
# so maximum valid header length is 1027 bytes.
|
||
header = f.readline(1027)
|
||
header = urllib.parse.unquote(header.decode("UTF-8"))
|
||
if not header or header[-1] != '\n':
|
||
raise RuntimeError("Received invalid header from server!")
|
||
header = header.strip()
|
||
self._debug("Response header: %s." % header)
|
||
# Validate header
|
||
status, meta = header.split(maxsplit=1)
|
||
if len(meta) > 1024 or len(status) != 2 or not status.isnumeric():
|
||
f.close()
|
||
raise RuntimeError("Received invalid header from server!")
|
||
# Update redirect loop/maze escaping state
|
||
if not status.startswith("3"):
|
||
self.previous_redirectors = set()
|
||
# Handle non-SUCCESS headers, which don't have a response body
|
||
# Inputs
|
||
if status.startswith("1"):
|
||
if self.sync_only:
|
||
return None
|
||
else:
|
||
print(meta)
|
||
if status == "11":
|
||
user_input = getpass.getpass("> ")
|
||
else:
|
||
user_input = input("> ")
|
||
return self._fetch_over_network(query(user_input))
|
||
# Redirects
|
||
elif status.startswith("3"):
|
||
new_gi = GeminiItem(gi.absolutise_url(meta))
|
||
if new_gi.url == gi.url:
|
||
raise RuntimeError("URL redirects to itself!")
|
||
elif new_gi.url in self.previous_redirectors:
|
||
raise RuntimeError("Caught in redirect loop!")
|
||
elif len(self.previous_redirectors) == _MAX_REDIRECTS:
|
||
raise RuntimeError("Refusing to follow more than %d consecutive redirects!" % _MAX_REDIRECTS)
|
||
elif self.sync_only:
|
||
follow = self.automatic_choice
|
||
# Never follow cross-domain redirects without asking
|
||
elif new_gi.host.encode("idna") != gi.host.encode("idna"):
|
||
follow = input("Follow cross-domain redirect to %s? (y/n) " % new_gi.url)
|
||
# Never follow cross-protocol redirects without asking
|
||
elif new_gi.scheme != gi.scheme:
|
||
follow = input("Follow cross-protocol redirect to %s? (y/n) " % new_gi.url)
|
||
# Don't follow *any* redirect without asking if auto-follow is off
|
||
elif not self.options["auto_follow_redirects"]:
|
||
follow = input("Follow redirect to %s? (y/n) " % new_gi.url)
|
||
# Otherwise, follow away
|
||
else:
|
||
follow = "yes"
|
||
if follow.strip().lower() not in ("y", "yes"):
|
||
raise UserAbortException()
|
||
self._debug("Following redirect to %s." % new_gi.url)
|
||
self._debug("This is consecutive redirect number %d." % len(self.previous_redirectors))
|
||
self.previous_redirectors.add(gi.url)
|
||
if status == "31":
|
||
# Permanent redirect
|
||
self.permanent_redirects[gi.url] = new_gi.url
|
||
return self._fetch_over_network(new_gi)
|
||
# Errors
|
||
elif status.startswith("4") or status.startswith("5"):
|
||
raise RuntimeError(meta)
|
||
# Client cert
|
||
elif status.startswith("6"):
|
||
self._handle_cert_request(meta)
|
||
return self._fetch_over_network(gi)
|
||
# Invalid status
|
||
elif not status.startswith("2"):
|
||
raise RuntimeError("Server returned undefined status code %s!" % status)
|
||
# If we're here, this must be a success and there's a response body
|
||
assert status.startswith("2")
|
||
mime = meta
|
||
# Read the response body over the network
|
||
fbody = f.read()
|
||
# DEFAULT GEMINI MIME
|
||
if mime == "":
|
||
mime = "text/gemini; charset=utf-8"
|
||
shortmime, mime_options = parse_mime(mime)
|
||
if "charset" in mime_options:
|
||
try:
|
||
codecs.lookup(mime_options["charset"])
|
||
except LookupError:
|
||
#raise RuntimeError("Header declared unknown encoding %s" % mime_options)
|
||
#If the encoding is wrong, there’s a high probably it’s UTF-8 with a bad header
|
||
mime_options["charset"] = "UTF-8"
|
||
if shortmime.startswith("text/"):
|
||
#Get the charset and default to UTF-8 in none
|
||
encoding = mime_options.get("charset", "UTF-8")
|
||
try:
|
||
body = fbody.decode(encoding)
|
||
except UnicodeError:
|
||
raise RuntimeError("Could not decode response body using %s\
|
||
encoding declared in header!" % encoding)
|
||
else:
|
||
body = fbody
|
||
gi.write_body(body,mime)
|
||
return gi
|
||
|
||
|
||
def fetch(url):
|
||
url = normalize_url(url)
|
||
path=None
|
||
if "://" in url
|
||
scheme = url.split("://")[0]
|
||
if scheme not in standard_ports:
|
||
print("%s is not a supported protocol"%scheme)
|
||
elif scheme in ("http","https"):
|
||
path=_fetch_http(url)
|
||
elif scheme == "gopher":
|
||
path=_fetch_gopher(url)
|
||
elif scheme == "finger":
|
||
path=_fetch_finger(url)
|
||
else:
|
||
print("scheme %s not implemented yet")
|
||
else:
|
||
print("Not a supproted URL")
|
||
return path
|
||
|
||
|
||
def main():
|
||
|
||
# Parse arguments
|
||
parser = argparse.ArgumentParser(description=__doc__)
|
||
|
||
# No argument: write help
|
||
parser.add_argument('url', metavar='URL', nargs='*',
|
||
help='download URL and returns the path to the cache of it')
|
||
# arg = URL: download and returns cached URI
|
||
# --cache-validity : do not download if cache is valid
|
||
# --offline : do not attempt to download, return Null if no cached version
|
||
# --validity : returns the date of the cached version, Null if no version
|
||
# --force-download : download and replace cache, even if valid
|
||
# --max-size-download : cancel download of items above that size. Returns Null.
|
||
args = parser.parse_args()
|
||
|
||
for u in args.url:
|
||
print("Download URL: %s" %u)
|
||
fetch(u)
|
||
|
||
|
||
|
||
if __name__== '__main__':
|
||
main()
|