forked from solderpunk/AV-98
Compare commits
5 Commits
Author | SHA1 | Date
---|---|---
Lionel Dricot | 8b97acc5e0 |
Lionel Dricot | 963ac3d7a3 |
Austreelis | 6e215c0512 |
Austreelis | 370e7e4dc5 |
Austreelis | bc43e3150b |
ansicat.py (1349 changed lines)
File diff suppressed because it is too large
netcache.py (890 changed lines)
@@ -1,891 +1,3 @@
#!/usr/bin/env python3
import os
import sys
import urllib.parse
import argparse
import requests
import codecs
import getpass
import socket
import ssl
import glob
import datetime
import hashlib
import sqlite3
from ssl import CertificateError
import ansicat
import offutils
from offutils import _CACHE_PATH, _DATA_DIR, _CONFIG_DIR
import time
try:
    import chardet
    _HAS_CHARDET = True
except ModuleNotFoundError:
    _HAS_CHARDET = False

try:
    from cryptography import x509
    from cryptography.hazmat.backends import default_backend
    _HAS_CRYPTOGRAPHY = True
    _BACKEND = default_backend()
except (ModuleNotFoundError, ImportError):
    _HAS_CRYPTOGRAPHY = False

if not os.path.exists(_CACHE_PATH):
    print("Creating cache directory {}".format(_CACHE_PATH))
    os.makedirs(_CACHE_PATH)

# This list is also used as a list of supported protocols
standard_ports = {
    "gemini": 1965,
    "gopher": 70,
    "finger": 79,
    "http": 80,
    "https": 443,
    "spartan": 300,
}
default_protocol = "gemini"

CRLF = '\r\n'
DEFAULT_TIMEOUT = 10
_MAX_REDIRECTS = 5

# monkey-patch Gemini support in urllib.parse
# see https://github.com/python/cpython/blob/master/Lib/urllib/parse.py
urllib.parse.uses_relative.append("gemini")
urllib.parse.uses_netloc.append("gemini")
urllib.parse.uses_relative.append("spartan")
urllib.parse.uses_netloc.append("spartan")


class UserAbortException(Exception):
    pass

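# parse_mime() splits a MIME header into its type and an options dict, e.g.
# parse_mime("text/gemini; charset=utf-8") == ("text/gemini", {"charset": "utf-8"})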
def parse_mime(mime):
    options = {}
    if mime:
        if ";" in mime:
            splited = mime.split(";", maxsplit=1)
            mime = splited[0]
            if len(splited) >= 1:
                options_list = splited[1].split()
                for o in options_list:
                    spl = o.split("=", maxsplit=1)
                    # only keep options of the form key=value
                    if len(spl) == 2:
                        options[spl[0]] = spl[1]
    return mime, options

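# e.g. normalize_url("example.org/page.gmi") -> "gemini://example.org/page.gmi",
# while local paths and mailto: URLs are left untouched.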
def normalize_url(url):
    if "://" not in url and ("./" not in url and url[0] != "/"):
        if not url.startswith("mailto:"):
            url = "gemini://" + url
    return url

def cache_last_modified(url):
    if not url:
        return None
    path = get_cache_path(url)
    if path:
        return os.path.getmtime(path)
    else:
        print("ERROR: NO CACHE in cache_last_modified")
        return None

def is_cache_valid(url, validity=0):
    # Validity is the acceptable age for a cache
    # to be considered valid (in seconds).
    # If 0, then any cache is considered valid
    # (use validity = 1 if you want to refresh everything).
    if offutils.is_local(url):
        return True
    cache = get_cache_path(url)
    if cache:
        # If the path is too long, we return False: such paths are not
        # supported by the OS and cannot be used as a cache.
        if len(cache) > 259:
            print("We return False because path is too long")
            return False
        if os.path.exists(cache) and not os.path.isdir(cache):
            if validity > 0:
                last_modification = cache_last_modified(url)
                now = time.time()
                age = now - last_modification
                return age < validity
            else:
                return True
        else:
            # Cache has not been built
            return False
    else:
        # There's not even a cache!
        return False

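# Illustrative cache layout (with _CACHE_PATH the root of the offpunk cache):
#   gemini://example.org/foo/bar.gmi -> <_CACHE_PATH>/gemini/example.org/foo/bar.gmi
#   gemini://example.org/            -> <_CACHE_PATH>/gemini/example.org/index.gmi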
def get_cache_path(url):
    # Sometimes, cache_path became a folder! (which happens for index.html/index.gmi)
    # In that case, we need to reconstruct it.
    # First, we parse the URL
    if not url:
        return None
    parsed = urllib.parse.urlparse(url)
    if url[0] == "/" or url.startswith("./") or os.path.exists(url):
        scheme = "file"
    elif parsed.scheme:
        scheme = parsed.scheme
    else:
        scheme = default_protocol
    if scheme in ["file", "mailto", "list"]:
        local = True
        host = ""
        port = None
        # file:// is 7 chars long
        if url.startswith("file://"):
            path = url[7:]
        elif scheme == "mailto":
            path = parsed.path
        elif url.startswith("list://"):
            listdir = os.path.join(_DATA_DIR, "lists")
            listname = url[7:].lstrip("/")
            if listname in [""]:
                name = "My Lists"
                path = listdir
            else:
                name = listname
                path = os.path.join(listdir, "%s.gmi" % listname)
        else:
            path = url
    else:
        local = False
        # Convert unicode hostname to punycode using idna RFC3490
        host = parsed.hostname  # .encode("idna").decode()
        port = parsed.port or standard_ports.get(scheme, 0)
        # special gopher selector case
        if scheme == "gopher":
            if len(parsed.path) >= 2:
                itemtype = parsed.path[1]
                path = parsed.path[2:]
            else:
                itemtype = "1"
                path = ""
            if itemtype == "0":
                mime = "text/gemini"
            elif itemtype == "1":
                mime = "text/gopher"
            elif itemtype == "h":
                mime = "text/html"
            elif itemtype in ("9", "g", "I", "s"):
                mime = "binary"
            else:
                mime = "text/gopher"
        else:
            path = parsed.path
        if parsed.query:
            # We don't add the query if the path is too long, because paths
            # above 260 chars are not supported and crash Python.
            # Also, very long queries are usually useless stuff.
            if len(path + parsed.query) < 258:
                path += "/" + parsed.query

    # Now we have a partial path. Let's make it a full path.
    if local:
        cache_path = path
    elif scheme and host:
        cache_path = os.path.expanduser(_CACHE_PATH + scheme + "/" + host + path)
        # There's an OS limitation of 260 characters per path.
        # We thus cut the path enough to add the index afterward.
        cache_path = cache_path[:249]
        # FIXME: this is a gross hack to give a name to
        # index files. This will break if the index is not
        # index.gmi. I don't know how to know the real name
        # of the file. But first, we need to ensure that the domain name
        # ends with "/". Else, the cache will create a file, not a folder.
        if scheme.startswith("http"):
            index = "index.html"
        elif scheme == "finger":
            index = "index.txt"
        elif scheme == "gopher":
            index = "gophermap"
        else:
            index = "index.gmi"
        if path == "" or os.path.isdir(cache_path):
            if not cache_path.endswith("/"):
                cache_path += "/"
            if not url.endswith("/"):
                url += "/"
        if cache_path.endswith("/"):
            cache_path += index
        # Sometimes the index itself is a dir,
        # like when folder/index.gmi?param has been created
        # and we try to access folder.
        if os.path.isdir(cache_path):
            cache_path += "/" + index
    else:
        # URL is missing either a supported scheme or a valid host
        # print("Error: %s is not a supported url" % url)
        return None
    if len(cache_path) > 259:
        print("Path is too long. This is an OS limitation.\n\n")
        print(url)
        return None
    return cache_path

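# write_body() stores a fetched body at its cache path and returns that path, e.g.
# write_body("gemini://example.org/", "# Hello\n", "text/gemini; charset=utf-8")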
def write_body(url, body, mime=None):
    ## body is a copy of the raw gemtext
    ## write_body() also creates the cache!
    # DEFAULT GEMINI MIME
    mime, options = parse_mime(mime)
    cache_path = get_cache_path(url)
    if cache_path:
        if mime and mime.startswith("text/"):
            mode = "w"
        else:
            mode = "wb"
        cache_dir = os.path.dirname(cache_path)
        # If the subdirectory already exists as a file (not a folder),
        # we remove it (happens when accessing URL/subfolder before
        # URL/subfolder/file.gmi).
        # This causes loss of data in the cache;
        # the proper solution would be to save "subfolder" as "subfolder/index.gmi".
        # If the subdirectory doesn't exist, we recursively walk up
        # until something exists, to avoid a file blocking the creation of folders.
        root_dir = cache_dir
        while not os.path.exists(root_dir):
            root_dir = os.path.dirname(root_dir)
        if os.path.isfile(root_dir):
            os.remove(root_dir)
        os.makedirs(cache_dir, exist_ok=True)
        with open(cache_path, mode=mode) as f:
            f.write(body)
        return cache_path


def set_error(url, err):
    # If we get an error, we want to keep an existing cache,
    # but we need to touch it or to create an empty one
    # to avoid hitting the error at each refresh.
    cache = get_cache_path(url)
    if is_cache_valid(url):
        os.utime(cache)
    else:
        cache_dir = os.path.dirname(cache)
        root_dir = cache_dir
        while not os.path.exists(root_dir):
            root_dir = os.path.dirname(root_dir)
        if os.path.isfile(root_dir):
            os.remove(root_dir)
        os.makedirs(cache_dir, exist_ok=True)
        if os.path.isdir(cache_dir):
            with open(cache, "w") as c:
                c.write(str(datetime.datetime.now()) + "\n")
                c.write("ERROR while caching %s\n\n" % url)
                c.write("*****\n\n")
                c.write(str(type(err)) + " = " + str(err))
                # c.write("\n" + str(err.with_traceback(None)))
                c.write("\n*****\n\n")
                c.write('If you believe this error was temporary, type "reload".\n')
                c.write("The resource will be tentatively fetched during the next sync.\n")
    return cache

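# _fetch_http() streams the response when its length is unknown and aborts once
# more than max_size/2 bytes (max_size is in bytes) have been buffered.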
def _fetch_http(url, max_size=None, timeout=DEFAULT_TIMEOUT, accept_bad_ssl_certificates=False, **kwargs):
    def too_large_error(url, length, max_size):
        err = "Size of %s is %s Mo\n" % (url, length)
        err += "Offpunk only automatically downloads content under %s Mo\n" % (max_size / 1000000)
        err += "To retrieve this content anyway, type 'reload'."
        return set_error(url, err)
    if accept_bad_ssl_certificates:
        requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = 'ALL:@SECLEVEL=1'
        requests.packages.urllib3.disable_warnings()
        verify = False
    else:
        requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = 'ALL:@SECLEVEL=2'
        verify = True
    header = {}
    header["User-Agent"] = "Netcache"
    with requests.get(url, verify=verify, headers=header, stream=True, timeout=timeout) as response:
        if "content-type" in response.headers:
            mime = response.headers['content-type']
        else:
            mime = None
        if "content-length" in response.headers:
            length = int(response.headers['content-length'])
        else:
            length = 0
        if max_size and length > max_size:
            response.close()
            return too_large_error(url, str(length / 1000000), max_size)
        elif max_size and length == 0:
            body = b''
            downloaded = 0
            for r in response.iter_content():
                body += r
                # We halve max_size for streamed content
                # in order to catch runaway streams faster
                size = sys.getsizeof(body)
                max = max_size / 2
                current = round(size * 100 / max, 1)
                if current > downloaded:
                    downloaded = current
                    print(" -> Receiving stream: %s%% of allowed data" % downloaded, end='\r')
                # print("size: %s (%s%% of maxlength)" % (size, size / max_size))
                if size > max_size / 2:
                    response.close()
                    return too_large_error(url, "streaming", max_size)
            response.close()
        else:
            body = response.content
            response.close()
    if mime and "text/" in mime:
        body = body.decode("UTF-8", "replace")
    cache = write_body(url, body, mime)
    return cache

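# A gopher URL embeds a one-character item type as the first path element, e.g.
# gopher://host/0/file.txt: "0" = text, "1" = menu, "h" = HTML,
# "9"/"g"/"I"/"s" = binary flavours.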
def _fetch_gopher(url, timeout=DEFAULT_TIMEOUT, **kwargs):
    parsed = urllib.parse.urlparse(url)
    host = parsed.hostname
    port = parsed.port or 70
    if len(parsed.path) >= 2:
        itemtype = parsed.path[1]
        selector = parsed.path[2:]
    else:
        itemtype = "1"
        selector = ""
    addresses = socket.getaddrinfo(host, port, family=0, type=socket.SOCK_STREAM)
    err = None
    for address in addresses:
        s = socket.socket(address[0], address[1])
        s.settimeout(timeout)
        try:
            s.connect(address[4])
            break
        except OSError as e:
            err = e
    else:
        # We could not connect to any of the addresses: re-raise the last error
        raise err
    if parsed.query:
        request = selector + "\t" + parsed.query
    else:
        request = selector
    request += "\r\n"
    s.sendall(request.encode("UTF-8"))
    response = s.makefile("rb").read()
    # Transcode response into UTF-8
    # if itemtype in ("0","1","h"):
    if not itemtype in ("9", "g", "I", "s"):
        # Try the most common encodings
        for encoding in ("UTF-8", "ISO-8859-1"):
            try:
                response = response.decode(encoding)
                break
            except UnicodeDecodeError:
                pass
        else:
            # try to detect the encoding
            if _HAS_CHARDET:
                detected = chardet.detect(response)
                response = response.decode(detected["encoding"])
            else:
                raise UnicodeError("Could not decode gopher response")
    if itemtype == "0":
        mime = "text/gemini"
    elif itemtype == "1":
        mime = "text/gopher"
    elif itemtype == "h":
        mime = "text/html"
    elif itemtype in ("9", "g", "I", "s"):
        mime = None
    else:
        # by default, we should consider it Gopher
        mime = "text/gopher"
    cache = write_body(url, response, mime)
    return cache


def _fetch_finger(url, timeout=DEFAULT_TIMEOUT, **kwargs):
    parsed = urllib.parse.urlparse(url)
    host = parsed.hostname
    port = parsed.port or standard_ports["finger"]
    query = parsed.path.lstrip("/") + "\r\n"
    with socket.create_connection((host, port)) as sock:
        sock.settimeout(timeout)
        sock.send(query.encode())
        response = sock.makefile("rb").read().decode("UTF-8")
        cache = write_body(url, response, "text/plain")
    return cache


# Originally copied from the reference spartan client by Michael Lazar
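# A spartan request is a single line "<host> <path> <content-length>\r\n"
# followed by <content-length> bytes of body; the response header is
# "<status> <meta>" with 2 = success, 3 = redirect; anything else is an error.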
def _fetch_spartan(url, **kwargs):
    cache = None
    url_parts = urllib.parse.urlparse(url)
    host = url_parts.hostname
    port = url_parts.port or standard_ports["spartan"]
    path = url_parts.path or "/"
    query = url_parts.query
    redirect_url = None
    with socket.create_connection((host, port)) as sock:
        if query:
            data = urllib.parse.unquote_to_bytes(query)
        else:
            data = b""
        encoded_host = host.encode("idna")
        ascii_path = urllib.parse.unquote_to_bytes(path)
        encoded_path = urllib.parse.quote_from_bytes(ascii_path).encode("ascii")
        sock.send(b"%s %s %d\r\n" % (encoded_host, encoded_path, len(data)))
        if data:
            # send the request body announced by the content-length above
            sock.send(data)
        fp = sock.makefile("rb")
        response = fp.readline(4096).decode("ascii").strip("\r\n")
        parts = response.split(" ", maxsplit=1)
        code, meta = int(parts[0]), parts[1]
        if code == 2:
            body = fp.read()
            if meta.startswith("text"):
                body = body.decode("UTF-8")
            cache = write_body(url, body, meta)
        elif code == 3:
            redirect_url = url_parts._replace(path=meta).geturl()
        else:
            return set_error(url, "Spartan code %s: Error %s" % (code, meta))
    if redirect_url:
        cache = _fetch_spartan(redirect_url)
    return cache

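# TOFU = Trust On First Use: the first certificate seen for a (hostname, address)
# pair is recorded (fingerprint in tofu.db, DER body in cert_cache/<fingerprint>.crt)
# and later connections are checked against that record.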
def _validate_cert(address, host, cert, accept_bad_ssl=False, automatic_choice=None):
    """
    Validate a TLS certificate in TOFU mode.

    If the cryptography module is installed:
    - Check the certificate Common Name or SAN matches `host`
    - Check the certificate's not-valid-before date is in the past
    - Check the certificate's not-valid-after date is in the future

    Whether the cryptography module is installed or not, check the
    certificate's fingerprint against the TOFU database to see if we've
    previously encountered a different certificate for this IP address and
    hostname.
    """
    now = datetime.datetime.utcnow()
    if _HAS_CRYPTOGRAPHY:
        # Using the cryptography module we can get detailed access
        # to the properties of even self-signed certs, unlike in
        # the standard ssl library...
        c = x509.load_der_x509_certificate(cert, _BACKEND)
        # Check certificate validity dates (skipped when bad certs are accepted)
        if not accept_bad_ssl:
            if c.not_valid_before >= now:
                raise CertificateError("Certificate not valid until: {}!".format(c.not_valid_before))
            elif c.not_valid_after <= now:
                raise CertificateError("Certificate expired as of: {}!".format(c.not_valid_after))

        # Check certificate hostnames
        names = []
        common_name = c.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
        if common_name:
            names.append(common_name[0].value)
        try:
            names.extend([alt.value for alt in c.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value])
        except x509.ExtensionNotFound:
            pass
        names = set(names)
        for name in names:
            try:
                ssl._dnsname_match(str(name), host)
                break
            except CertificateError:
                continue
        else:
            # If we didn't break out, none of the names were valid
            raise CertificateError("Hostname does not match certificate common name or any alternative names.")

    sha = hashlib.sha256()
    sha.update(cert)
    fingerprint = sha.hexdigest()

    db_path = os.path.join(_CONFIG_DIR, "tofu.db")
    db_conn = sqlite3.connect(db_path)
    db_cur = db_conn.cursor()

    db_cur.execute("""CREATE TABLE IF NOT EXISTS cert_cache
        (hostname text, address text, fingerprint text,
        first_seen date, last_seen date, count integer)""")
    # Have we been here before?
    db_cur.execute("""SELECT fingerprint, first_seen, last_seen, count
        FROM cert_cache
        WHERE hostname=? AND address=?""", (host, address))
    cached_certs = db_cur.fetchall()

    # If so, check for a match
    if cached_certs:
        max_count = 0
        most_frequent_cert = None
        for cached_fingerprint, first, last, count in cached_certs:
            if count > max_count:
                max_count = count
                most_frequent_cert = cached_fingerprint
            if fingerprint == cached_fingerprint:
                # Matched!
                db_cur.execute("""UPDATE cert_cache
                    SET last_seen=?, count=?
                    WHERE hostname=? AND address=? AND fingerprint=?""",
                    (now, count + 1, host, address, fingerprint))
                db_conn.commit()
                break
        else:
            certdir = os.path.join(_CONFIG_DIR, "cert_cache")
            with open(os.path.join(certdir, most_frequent_cert + ".crt"), "rb") as fp:
                previous_cert = fp.read()
            if _HAS_CRYPTOGRAPHY:
                # Load the most frequently seen certificate to see if it has
                # expired
                previous_cert = x509.load_der_x509_certificate(previous_cert, _BACKEND)
                previous_ttl = previous_cert.not_valid_after - now
                print(previous_ttl)

            print("****************************************")
            print("[SECURITY WARNING] Unrecognised certificate!")
            print("The certificate presented for {} ({}) has never been seen before.".format(host, address))
            print("This MIGHT be a Man-in-the-Middle attack.")
            print("A different certificate has previously been seen {} times.".format(max_count))
            if _HAS_CRYPTOGRAPHY:
                if previous_ttl < datetime.timedelta():
                    print("That certificate has expired, which reduces suspicion somewhat.")
                else:
                    print("That certificate is still valid for: {}".format(previous_ttl))
            print("****************************************")
            print("Attempt to verify the new certificate fingerprint out-of-band:")
            print(fingerprint)
            if automatic_choice:
                choice = automatic_choice
            else:
                choice = input("Accept this new certificate? Y/N ").strip().lower()
            if choice in ("y", "yes"):
                db_cur.execute("""INSERT INTO cert_cache
                    VALUES (?, ?, ?, ?, ?, ?)""",
                    (host, address, fingerprint, now, now, 1))
                db_conn.commit()
                with open(os.path.join(certdir, fingerprint + ".crt"), "wb") as fp:
                    fp.write(cert)
            else:
                raise Exception("TOFU Failure!")

    # If not, cache this cert
    else:
        db_cur.execute("""INSERT INTO cert_cache
            VALUES (?, ?, ?, ?, ?, ?)""",
            (host, address, fingerprint, now, now, 1))
        db_conn.commit()
        certdir = os.path.join(_CONFIG_DIR, "cert_cache")
        if not os.path.exists(certdir):
            os.makedirs(certdir)
        with open(os.path.join(certdir, fingerprint + ".crt"), "wb") as fp:
            fp.write(cert)

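# A Gemini exchange is a single TLS request/response: the client sends
# "<absolute-URL>\r\n" and the server answers with a header line
# "<two-digit status> <meta>\r\n", optionally followed by a response body.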
def _fetch_gemini(url, timeout=DEFAULT_TIMEOUT, interactive=True, accept_bad_ssl_certificates=False,
                  **kwargs):
    cache = None
    url_parts = urllib.parse.urlparse(url)
    host = url_parts.hostname
    port = url_parts.port or standard_ports["gemini"]
    path = url_parts.path or "/"
    query = url_parts.query
    # In AV-98, this was the _send_request method:
    # send a selector to a given host and port and
    # return the resolved address and a binary file with the reply.
    host = host.encode("idna").decode()
    # Do DNS resolution
    # DNS lookup - will get IPv4 and IPv6 records if IPv6 is enabled
    if ":" in host:
        # This is likely a literal IPv6 address, so we can *only* ask for
        # IPv6 addresses or getaddrinfo will complain
        family_mask = socket.AF_INET6
    elif socket.has_ipv6:
        # Accept either IPv4 or IPv6 addresses
        family_mask = 0
    else:
        # IPv4 only
        family_mask = socket.AF_INET
    addresses = socket.getaddrinfo(host, port, family=family_mask,
                                   type=socket.SOCK_STREAM)
    # Sort addresses so IPv6 ones come first
    addresses.sort(key=lambda add: add[0] == socket.AF_INET6, reverse=True)
    ## Continuation of send_request
    # Prepare TLS context
    protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >= 6 else ssl.PROTOCOL_TLSv1_2
    context = ssl.SSLContext(protocol)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    # Impose minimum TLS version
    ## In 3.7 and above, this is easy...
    if sys.version_info.minor >= 7:
        context.minimum_version = ssl.TLSVersion.TLSv1_2
    ## Otherwise, it seems very hard...
    ## The below is less strict than it ought to be, but trying to disable
    ## TLS v1.1 here using ssl.OP_NO_TLSv1_1 produces unexpected failures
    ## with recent versions of OpenSSL. What a mess...
    else:
        context.options |= ssl.OP_NO_SSLv3
        context.options |= ssl.OP_NO_SSLv2
    # Try to enforce sensible ciphers
    try:
        context.set_ciphers("AESGCM+ECDHE:AESGCM+DHE:CHACHA20+ECDHE:CHACHA20+DHE:!DSS:!SHA1:!MD5:@STRENGTH")
    except ssl.SSLError:
        # Rely on the server to only support sensible things, I guess...
        pass
    # Connect to remote host by any address possible
    err = None
    for address in addresses:
        s = socket.socket(address[0], address[1])
        s.settimeout(timeout)
        s = context.wrap_socket(s, server_hostname=host)
        try:
            s.connect(address[4])
            break
        except OSError as e:
            err = e
    else:
        # If we couldn't connect to *any* of the addresses, just
        # bubble up the exception from the last attempt and deny
        # knowledge of earlier failures.
        raise err

    # Do TOFU
    cert = s.getpeercert(binary_form=True)
    # Remember that we showed the current cert to this domain...
    # TODO: accept badssl and automatic choice
    _validate_cert(address[4][0], host, cert, automatic_choice="y")
    # Send request and wrap response in a file descriptor
    url = urllib.parse.urlparse(url)
    new_netloc = host
    if port != standard_ports["gemini"]:
        new_netloc += ":" + str(port)
    url = urllib.parse.urlunparse(url._replace(netloc=new_netloc))
    s.sendall((url + CRLF).encode("UTF-8"))
    f = s.makefile(mode="rb")
    ## end of send_request in AV98
    # Spec dictates <META> should not exceed 1024 bytes,
    # so maximum valid header length is 1027 bytes.
    header = f.readline(1027)
    header = urllib.parse.unquote(header.decode("UTF-8"))
    if not header or header[-1] != '\n':
        raise RuntimeError("Received invalid header from server!")
    header = header.strip()
    # Validate header
    status, meta = header.split(maxsplit=1)
    if len(meta) > 1024 or len(status) != 2 or not status.isnumeric():
        f.close()
        raise RuntimeError("Received invalid header from server!")
    # Update redirect loop/maze escaping state
    if not status.startswith("3"):
        previous_redirectors = set()
        # TODO FIXME
    else:
        # we set a previous_redirectors anyway because refactoring is in progress
        previous_redirectors = set()
    # Handle non-SUCCESS headers, which don't have a response body
    # Inputs
    if status.startswith("1"):
        if interactive:
            print(meta)
            if status == "11":
                user_input = getpass.getpass("> ")
            else:
                # TODO: FIXME we should not ask for user input while non-interactive
                user_input = input("> ")
            # Resend the request with the user input as the URL query string
            return _fetch_gemini(url.split("?")[0] + "?" + urllib.parse.quote(user_input))
        else:
            return None
    # Redirects
    elif status.startswith("3"):
        newurl = urllib.parse.urljoin(url, meta)
        if newurl == url:
            raise RuntimeError("URL redirects to itself!")
        elif newurl in previous_redirectors:
            raise RuntimeError("Caught in redirect loop!")
        elif len(previous_redirectors) == _MAX_REDIRECTS:
            raise RuntimeError("Refusing to follow more than %d consecutive redirects!" % _MAX_REDIRECTS)
        # TODO: redirections handling should be refactored
        # elif "interactive" in options and not options["interactive"]:
        #     follow = self.automatic_choice
        # # Never follow cross-domain redirects without asking
        # elif new_gi.host.encode("idna") != gi.host.encode("idna"):
        #     follow = input("Follow cross-domain redirect to %s? (y/n) " % new_gi.url)
        # # Never follow cross-protocol redirects without asking
        # elif new_gi.scheme != gi.scheme:
        #     follow = input("Follow cross-protocol redirect to %s? (y/n) " % new_gi.url)
        # # Don't follow *any* redirect without asking if auto-follow is off
        # elif not self.options["auto_follow_redirects"]:
        #     follow = input("Follow redirect to %s? (y/n) " % new_gi.url)
        # # Otherwise, follow away
        else:
            follow = "yes"
        if follow.strip().lower() not in ("y", "yes"):
            raise UserAbortException()
        previous_redirectors.add(url)
        # if status == "31":
        #     # Permanent redirect
        #     self.permanent_redirects[gi.url] = new_gi.url
        return _fetch_gemini(newurl)
    # Errors
    elif status.startswith("4") or status.startswith("5"):
        raise RuntimeError(meta)
    # Client cert
    elif status.startswith("6"):
        print("Handling certificates for status 6X is not supported by offpunk\n")
        print("Please open a bug report")
        return None
    # Invalid status
    elif not status.startswith("2"):
        raise RuntimeError("Server returned undefined status code %s!" % status)
    # If we're here, this must be a success and there's a response body
    assert status.startswith("2")
    mime = meta
    # Read the response body over the network
    fbody = f.read()
    # DEFAULT GEMINI MIME
    if mime == "":
        mime = "text/gemini; charset=utf-8"
    shortmime, mime_options = parse_mime(mime)
    if "charset" in mime_options:
        try:
            codecs.lookup(mime_options["charset"])
        except LookupError:
            # raise RuntimeError("Header declared unknown encoding %s" % mime_options)
            # If the encoding is wrong, there's a high probability it's UTF-8 with a bad header
            mime_options["charset"] = "UTF-8"
    if shortmime.startswith("text/"):
        # Get the charset, defaulting to UTF-8 if none is declared
        encoding = mime_options.get("charset", "UTF-8")
        try:
            body = fbody.decode(encoding)
        except UnicodeError:
            raise RuntimeError("Could not decode response body using %s "
                               "encoding declared in header!" % encoding)
    else:
        body = fbody
    cache = write_body(url, body, mime)
    return cache

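# fetch() is the high-level entry point: it returns a local cache path, serving
# an existing valid cache when possible and otherwise dispatching on the URL
# scheme to one of the _fetch_* helpers above.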
def fetch(url, offline=False, download_image_first=True, images_mode="readable", validity=0, **kwargs):
    url = normalize_url(url)
    path = None
    print_error = "print_error" in kwargs.keys() and kwargs["print_error"]
    if is_cache_valid(url, validity=validity):
        path = get_cache_path(url)
    # If we are offline, any cache is better than nothing
    elif offline and is_cache_valid(url, validity=0):
        path = get_cache_path(url)
    elif "://" in url and not offline:
        try:
            scheme = url.split("://")[0]
            if scheme not in standard_ports:
                if print_error:
                    print("%s is not a supported protocol" % scheme)
                path = None
            elif scheme in ("http", "https"):
                path = _fetch_http(url, **kwargs)
            elif scheme == "gopher":
                path = _fetch_gopher(url, **kwargs)
            elif scheme == "finger":
                path = _fetch_finger(url, **kwargs)
            elif scheme == "gemini":
                path = _fetch_gemini(url, **kwargs)
            else:
                print("scheme %s not implemented yet" % scheme)
        except UserAbortException:
            return
        except Exception as err:
            cache = set_error(url, err)
            # Print an error message
            # (we fail silently when sync_only)
            if isinstance(err, socket.gaierror):
                if print_error:
                    print("ERROR: DNS error!")
            elif isinstance(err, ConnectionRefusedError):
                if print_error:
                    print("ERROR1: Connection refused!")
            elif isinstance(err, ConnectionResetError):
                if print_error:
                    print("ERROR2: Connection reset!")
            elif isinstance(err, (TimeoutError, socket.timeout)):
                if print_error:
                    print("""ERROR3: Connection timed out!
Slow internet connection? Use 'set timeout' to be more patient.""")
            elif isinstance(err, FileExistsError):
                if print_error:
                    print("""ERROR5: Trying to create a directory which already exists
in the cache:""")
                    print(err)
            elif isinstance(err, requests.exceptions.SSLError):
                if print_error:
                    print("""ERROR6: Bad SSL certificate:\n""")
                    print(err)
                    print("""\nIf you know what you are doing, you can try to accept bad certificates with the following command:\n""")
                    print("""set accept_bad_ssl_certificates True""")
            elif isinstance(err, requests.exceptions.ConnectionError):
                if print_error:
                    print("""ERROR7: Cannot connect to URL:\n""")
                    print(str(err))
            else:
                if print_error:
                    import traceback
                    print("ERROR4: " + str(type(err)) + " : " + str(err))
                    # print("\n" + str(err.with_traceback(None)))
                    print(traceback.format_exc())
            return cache
    # We download images contained in the document (from full mode)
    if not offline and download_image_first and images_mode:
        renderer = ansicat.renderer_from_file(path, url)
        if renderer:
            for image in renderer.get_images(mode=images_mode):
                # The image should exist, should be a URL (not a data: image)
                # and should not already be cached
                if image and not image.startswith("data:image/") and not is_cache_valid(image):
                    width = offutils.term_width() - 1
                    toprint = "Downloading %s" % image
                    toprint = toprint[:width]
                    toprint += " " * (width - len(toprint))
                    print(toprint, end="\r")
                    # download_image_first and images_mode are False/None
                    # to avoid recursive downloading, if that ever happens
                    fetch(image, offline=offline, download_image_first=False,
                          images_mode=None, validity=0, **kwargs)
    return path

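# Illustrative CLI usage:
#   netcache.py gemini://example.org          prints the fetched content
#   netcache.py --path gemini://example.org   prints the cache path instead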
def main():

    # Parse arguments
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--path", action="store_true",
                        help="return the path to the cache instead of the content of the cache")
    parser.add_argument("--offline", action="store_true",
                        help="do not attempt to download; return the cached version or an error")
    parser.add_argument("--max-size", type=int,
                        help="cancel download of items above that size (value in Mb).")
    parser.add_argument("--timeout", type=int,
                        help="time to wait before cancelling the connection (in seconds).")
    # No argument: write help
    parser.add_argument('url', metavar='URL', nargs='*',
                        help='download URL and return the content or the path to a cached version')
    # arg = URL: download and return the cached URI
    # --cache-validity : do not download if cache is valid
    # --validity : return the date of the cached version, Null if no version
    # --force-download : download and replace cache, even if valid
    args = parser.parse_args()

    param = {}

    for u in args.url:
        if args.offline:
            path = get_cache_path(u)
        else:
            print("Download URL: %s" % u)
            # --max-size is given in Mb while the fetchers compare raw bytes
            max_size = args.max_size * 1000000 if args.max_size else None
            path = fetch(u, max_size=max_size, timeout=args.timeout)
        if args.path:
            print(path)
        else:
            with open(path, "r") as f:
                print(f.read())


if __name__ == '__main__':
    from offpunk.netcache import main
    main()

offpunk.py (1918 changed lines)
File diff suppressed because it is too large
File diff suppressed because it is too large
@@ -0,0 +1,3 @@
from offpunk import main
main()

File diff suppressed because it is too large
@@ -0,0 +1,890 @@
#!/usr/bin/env python3
import os
import sys
import urllib.parse
import argparse
import requests
import codecs
import getpass
import socket
import ssl
import glob
import datetime
import hashlib
import sqlite3
from ssl import CertificateError
from offpunk import ansicat, utils
from offpunk.utils import _CACHE_PATH, _DATA_DIR, _CONFIG_DIR
import time
try:
    import chardet
    _HAS_CHARDET = True
except ModuleNotFoundError:
    _HAS_CHARDET = False

try:
    from cryptography import x509
    from cryptography.hazmat.backends import default_backend
    _HAS_CRYPTOGRAPHY = True
    _BACKEND = default_backend()
except (ModuleNotFoundError, ImportError):
    _HAS_CRYPTOGRAPHY = False

if not os.path.exists(_CACHE_PATH):
    print("Creating cache directory {}".format(_CACHE_PATH))
    os.makedirs(_CACHE_PATH)

# This list is also used as a list of supported protocols
standard_ports = {
    "gemini": 1965,
    "gopher": 70,
    "finger": 79,
    "http": 80,
    "https": 443,
    "spartan": 300,
}
default_protocol = "gemini"

CRLF = '\r\n'
DEFAULT_TIMEOUT = 10
_MAX_REDIRECTS = 5

# monkey-patch Gemini support in urllib.parse
# see https://github.com/python/cpython/blob/master/Lib/urllib/parse.py
urllib.parse.uses_relative.append("gemini")
urllib.parse.uses_netloc.append("gemini")
urllib.parse.uses_relative.append("spartan")
urllib.parse.uses_netloc.append("spartan")


class UserAbortException(Exception):
    pass

def parse_mime(mime):
    options = {}
    if mime:
        if ";" in mime:
            splited = mime.split(";", maxsplit=1)
            mime = splited[0]
            if len(splited) >= 1:
                options_list = splited[1].split()
                for o in options_list:
                    spl = o.split("=", maxsplit=1)
                    # only keep options of the form key=value
                    if len(spl) == 2:
                        options[spl[0]] = spl[1]
    return mime, options

def normalize_url(url):
    if "://" not in url and ("./" not in url and url[0] != "/"):
        if not url.startswith("mailto:"):
            url = "gemini://" + url
    return url


def cache_last_modified(url):
    if not url:
        return None
    path = get_cache_path(url)
    if path:
        return os.path.getmtime(path)
    else:
        print("ERROR: NO CACHE in cache_last_modified")
        return None

def is_cache_valid(url, validity=0):
    # Validity is the acceptable age for a cache
    # to be considered valid (in seconds).
    # If 0, then any cache is considered valid
    # (use validity = 1 if you want to refresh everything).
    if utils.is_local(url):
        return True
    cache = get_cache_path(url)
    if cache:
        # If the path is too long, we return False: such paths are not
        # supported by the OS and cannot be used as a cache.
        if len(cache) > 259:
            print("We return False because path is too long")
            return False
        if os.path.exists(cache) and not os.path.isdir(cache):
            if validity > 0:
                last_modification = cache_last_modified(url)
                now = time.time()
                age = now - last_modification
                return age < validity
            else:
                return True
        else:
            # Cache has not been built
            return False
    else:
        # There's not even a cache!
        return False

def get_cache_path(url):
    # Sometimes, cache_path became a folder! (which happens for index.html/index.gmi)
    # In that case, we need to reconstruct it.
    # First, we parse the URL
    if not url:
        return None
    parsed = urllib.parse.urlparse(url)
    if url[0] == "/" or url.startswith("./") or os.path.exists(url):
        scheme = "file"
    elif parsed.scheme:
        scheme = parsed.scheme
    else:
        scheme = default_protocol
    if scheme in ["file", "mailto", "list"]:
        local = True
        host = ""
        port = None
        # file:// is 7 chars long
        if url.startswith("file://"):
            path = url[7:]
        elif scheme == "mailto":
            path = parsed.path
        elif url.startswith("list://"):
            listdir = os.path.join(_DATA_DIR, "lists")
            listname = url[7:].lstrip("/")
            if listname in [""]:
                name = "My Lists"
                path = listdir
            else:
                name = listname
                path = os.path.join(listdir, "%s.gmi" % listname)
        else:
            path = url
    else:
        local = False
        # Convert unicode hostname to punycode using idna RFC3490
        host = parsed.hostname  # .encode("idna").decode()
        port = parsed.port or standard_ports.get(scheme, 0)
        # special gopher selector case
        if scheme == "gopher":
            if len(parsed.path) >= 2:
                itemtype = parsed.path[1]
                path = parsed.path[2:]
            else:
                itemtype = "1"
                path = ""
            if itemtype == "0":
                mime = "text/gemini"
            elif itemtype == "1":
                mime = "text/gopher"
            elif itemtype == "h":
                mime = "text/html"
            elif itemtype in ("9", "g", "I", "s"):
                mime = "binary"
            else:
                mime = "text/gopher"
        else:
            path = parsed.path
        if parsed.query:
            # We don't add the query if the path is too long, because paths
            # above 260 chars are not supported and crash Python.
            # Also, very long queries are usually useless stuff.
            if len(path + parsed.query) < 258:
                path += "/" + parsed.query

    # Now we have a partial path. Let's make it a full path.
    if local:
        cache_path = path
    elif scheme and host:
        cache_path = os.path.expanduser(_CACHE_PATH + scheme + "/" + host + path)
        # There's an OS limitation of 260 characters per path.
        # We thus cut the path enough to add the index afterward.
        cache_path = cache_path[:249]
        # FIXME: this is a gross hack to give a name to
        # index files. This will break if the index is not
        # index.gmi. I don't know how to know the real name
        # of the file. But first, we need to ensure that the domain name
        # ends with "/". Else, the cache will create a file, not a folder.
        if scheme.startswith("http"):
            index = "index.html"
        elif scheme == "finger":
            index = "index.txt"
        elif scheme == "gopher":
            index = "gophermap"
        else:
            index = "index.gmi"
        if path == "" or os.path.isdir(cache_path):
            if not cache_path.endswith("/"):
                cache_path += "/"
            if not url.endswith("/"):
                url += "/"
        if cache_path.endswith("/"):
            cache_path += index
        # Sometimes the index itself is a dir,
        # like when folder/index.gmi?param has been created
        # and we try to access folder.
        if os.path.isdir(cache_path):
            cache_path += "/" + index
    else:
        # URL is missing either a supported scheme or a valid host
        # print("Error: %s is not a supported url" % url)
        return None
    if len(cache_path) > 259:
        print("Path is too long. This is an OS limitation.\n\n")
        print(url)
        return None
    return cache_path

def write_body(url, body, mime=None):
    ## body is a copy of the raw gemtext
    ## write_body() also creates the cache!
    # DEFAULT GEMINI MIME
    mime, options = parse_mime(mime)
    cache_path = get_cache_path(url)
    if cache_path:
        if mime and mime.startswith("text/"):
            mode = "w"
        else:
            mode = "wb"
        cache_dir = os.path.dirname(cache_path)
        # If the subdirectory already exists as a file (not a folder),
        # we remove it (happens when accessing URL/subfolder before
        # URL/subfolder/file.gmi).
        # This causes loss of data in the cache;
        # the proper solution would be to save "subfolder" as "subfolder/index.gmi".
        # If the subdirectory doesn't exist, we recursively walk up
        # until something exists, to avoid a file blocking the creation of folders.
        root_dir = cache_dir
        while not os.path.exists(root_dir):
            root_dir = os.path.dirname(root_dir)
        if os.path.isfile(root_dir):
            os.remove(root_dir)
        os.makedirs(cache_dir, exist_ok=True)
        with open(cache_path, mode=mode) as f:
            f.write(body)
        return cache_path


def set_error(url, err):
    # If we get an error, we want to keep an existing cache,
    # but we need to touch it or to create an empty one
    # to avoid hitting the error at each refresh.
    cache = get_cache_path(url)
    if is_cache_valid(url):
        os.utime(cache)
    else:
        cache_dir = os.path.dirname(cache)
        root_dir = cache_dir
        while not os.path.exists(root_dir):
            root_dir = os.path.dirname(root_dir)
        if os.path.isfile(root_dir):
            os.remove(root_dir)
        os.makedirs(cache_dir, exist_ok=True)
        if os.path.isdir(cache_dir):
            with open(cache, "w") as c:
                c.write(str(datetime.datetime.now()) + "\n")
                c.write("ERROR while caching %s\n\n" % url)
                c.write("*****\n\n")
                c.write(str(type(err)) + " = " + str(err))
                # c.write("\n" + str(err.with_traceback(None)))
                c.write("\n*****\n\n")
                c.write('If you believe this error was temporary, type "reload".\n')
                c.write("The resource will be tentatively fetched during the next sync.\n")
    return cache

def _fetch_http(url, max_size=None, timeout=DEFAULT_TIMEOUT, accept_bad_ssl_certificates=False, **kwargs):
    def too_large_error(url, length, max_size):
        err = "Size of %s is %s Mo\n" % (url, length)
        err += "Offpunk only automatically downloads content under %s Mo\n" % (max_size / 1000000)
        err += "To retrieve this content anyway, type 'reload'."
        return set_error(url, err)
    if accept_bad_ssl_certificates:
        requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = 'ALL:@SECLEVEL=1'
        requests.packages.urllib3.disable_warnings()
        verify = False
    else:
        requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = 'ALL:@SECLEVEL=2'
        verify = True
    header = {}
    header["User-Agent"] = "Netcache"
    with requests.get(url, verify=verify, headers=header, stream=True, timeout=timeout) as response:
        if "content-type" in response.headers:
            mime = response.headers['content-type']
        else:
            mime = None
        if "content-length" in response.headers:
            length = int(response.headers['content-length'])
        else:
            length = 0
        if max_size and length > max_size:
            response.close()
            return too_large_error(url, str(length / 1000000), max_size)
        elif max_size and length == 0:
            body = b''
            downloaded = 0
            for r in response.iter_content():
                body += r
                # We halve max_size for streamed content
                # in order to catch runaway streams faster
                size = sys.getsizeof(body)
                max = max_size / 2
                current = round(size * 100 / max, 1)
                if current > downloaded:
                    downloaded = current
                    print(" -> Receiving stream: %s%% of allowed data" % downloaded, end='\r')
                # print("size: %s (%s%% of maxlength)" % (size, size / max_size))
                if size > max_size / 2:
                    response.close()
                    return too_large_error(url, "streaming", max_size)
            response.close()
        else:
            body = response.content
            response.close()
    if mime and "text/" in mime:
        body = body.decode("UTF-8", "replace")
    cache = write_body(url, body, mime)
    return cache

def _fetch_gopher(url, timeout=DEFAULT_TIMEOUT, **kwargs):
    parsed = urllib.parse.urlparse(url)
    host = parsed.hostname
    port = parsed.port or 70
    if len(parsed.path) >= 2:
        itemtype = parsed.path[1]
        selector = parsed.path[2:]
    else:
        itemtype = "1"
        selector = ""
    addresses = socket.getaddrinfo(host, port, family=0, type=socket.SOCK_STREAM)
    err = None
    for address in addresses:
        s = socket.socket(address[0], address[1])
        s.settimeout(timeout)
        try:
            s.connect(address[4])
            break
        except OSError as e:
            err = e
    else:
        # We could not connect to any of the addresses: re-raise the last error
        raise err
    if parsed.query:
        request = selector + "\t" + parsed.query
    else:
        request = selector
    request += "\r\n"
    s.sendall(request.encode("UTF-8"))
    response = s.makefile("rb").read()
    # Transcode response into UTF-8
    # if itemtype in ("0","1","h"):
    if not itemtype in ("9", "g", "I", "s"):
        # Try the most common encodings
        for encoding in ("UTF-8", "ISO-8859-1"):
            try:
                response = response.decode(encoding)
                break
            except UnicodeDecodeError:
                pass
        else:
            # try to detect the encoding
            if _HAS_CHARDET:
                detected = chardet.detect(response)
                response = response.decode(detected["encoding"])
            else:
                raise UnicodeError("Could not decode gopher response")
    if itemtype == "0":
        mime = "text/gemini"
    elif itemtype == "1":
        mime = "text/gopher"
    elif itemtype == "h":
        mime = "text/html"
    elif itemtype in ("9", "g", "I", "s"):
        mime = None
    else:
        # by default, we should consider it Gopher
        mime = "text/gopher"
    cache = write_body(url, response, mime)
    return cache


def _fetch_finger(url, timeout=DEFAULT_TIMEOUT, **kwargs):
    parsed = urllib.parse.urlparse(url)
    host = parsed.hostname
    port = parsed.port or standard_ports["finger"]
    query = parsed.path.lstrip("/") + "\r\n"
    with socket.create_connection((host, port)) as sock:
        sock.settimeout(timeout)
        sock.send(query.encode())
        response = sock.makefile("rb").read().decode("UTF-8")
        cache = write_body(url, response, "text/plain")
    return cache


# Originally copied from the reference spartan client by Michael Lazar
def _fetch_spartan(url, **kwargs):
    cache = None
    url_parts = urllib.parse.urlparse(url)
    host = url_parts.hostname
    port = url_parts.port or standard_ports["spartan"]
    path = url_parts.path or "/"
    query = url_parts.query
    redirect_url = None
    with socket.create_connection((host, port)) as sock:
        if query:
            data = urllib.parse.unquote_to_bytes(query)
        else:
            data = b""
        encoded_host = host.encode("idna")
        ascii_path = urllib.parse.unquote_to_bytes(path)
        encoded_path = urllib.parse.quote_from_bytes(ascii_path).encode("ascii")
        sock.send(b"%s %s %d\r\n" % (encoded_host, encoded_path, len(data)))
        if data:
            # send the request body announced by the content-length above
            sock.send(data)
        fp = sock.makefile("rb")
        response = fp.readline(4096).decode("ascii").strip("\r\n")
        parts = response.split(" ", maxsplit=1)
        code, meta = int(parts[0]), parts[1]
        if code == 2:
            body = fp.read()
            if meta.startswith("text"):
                body = body.decode("UTF-8")
            cache = write_body(url, body, meta)
        elif code == 3:
            redirect_url = url_parts._replace(path=meta).geturl()
        else:
            return set_error(url, "Spartan code %s: Error %s" % (code, meta))
    if redirect_url:
        cache = _fetch_spartan(redirect_url)
    return cache

def _validate_cert(address, host, cert, accept_bad_ssl=False, automatic_choice=None):
    """
    Validate a TLS certificate in TOFU mode.

    If the cryptography module is installed:
    - Check the certificate Common Name or SAN matches `host`
    - Check the certificate's not-valid-before date is in the past
    - Check the certificate's not-valid-after date is in the future

    Whether the cryptography module is installed or not, check the
    certificate's fingerprint against the TOFU database to see if we've
    previously encountered a different certificate for this IP address and
    hostname.
    """
    now = datetime.datetime.utcnow()
    if _HAS_CRYPTOGRAPHY:
        # Using the cryptography module we can get detailed access
        # to the properties of even self-signed certs, unlike in
        # the standard ssl library...
        c = x509.load_der_x509_certificate(cert, _BACKEND)
        # Check certificate validity dates (skipped when bad certs are accepted)
        if not accept_bad_ssl:
            if c.not_valid_before >= now:
                raise CertificateError("Certificate not valid until: {}!".format(c.not_valid_before))
            elif c.not_valid_after <= now:
                raise CertificateError("Certificate expired as of: {}!".format(c.not_valid_after))

        # Check certificate hostnames
        names = []
        common_name = c.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
        if common_name:
            names.append(common_name[0].value)
        try:
            names.extend([alt.value for alt in c.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value])
        except x509.ExtensionNotFound:
            pass
        names = set(names)
        for name in names:
            try:
                ssl._dnsname_match(str(name), host)
                break
            except CertificateError:
                continue
        else:
            # If we didn't break out, none of the names were valid
            raise CertificateError("Hostname does not match certificate common name or any alternative names.")

    sha = hashlib.sha256()
    sha.update(cert)
    fingerprint = sha.hexdigest()

    db_path = os.path.join(_CONFIG_DIR, "tofu.db")
    db_conn = sqlite3.connect(db_path)
    db_cur = db_conn.cursor()

    db_cur.execute("""CREATE TABLE IF NOT EXISTS cert_cache
        (hostname text, address text, fingerprint text,
        first_seen date, last_seen date, count integer)""")
    # Have we been here before?
    db_cur.execute("""SELECT fingerprint, first_seen, last_seen, count
        FROM cert_cache
        WHERE hostname=? AND address=?""", (host, address))
    cached_certs = db_cur.fetchall()

    # If so, check for a match
    if cached_certs:
        max_count = 0
        most_frequent_cert = None
        for cached_fingerprint, first, last, count in cached_certs:
            if count > max_count:
                max_count = count
                most_frequent_cert = cached_fingerprint
            if fingerprint == cached_fingerprint:
                # Matched!
                db_cur.execute("""UPDATE cert_cache
                    SET last_seen=?, count=?
                    WHERE hostname=? AND address=? AND fingerprint=?""",
                    (now, count + 1, host, address, fingerprint))
                db_conn.commit()
                break
        else:
            certdir = os.path.join(_CONFIG_DIR, "cert_cache")
            with open(os.path.join(certdir, most_frequent_cert + ".crt"), "rb") as fp:
                previous_cert = fp.read()
            if _HAS_CRYPTOGRAPHY:
                # Load the most frequently seen certificate to see if it has
                # expired
                previous_cert = x509.load_der_x509_certificate(previous_cert, _BACKEND)
                previous_ttl = previous_cert.not_valid_after - now
                print(previous_ttl)

            print("****************************************")
            print("[SECURITY WARNING] Unrecognised certificate!")
            print("The certificate presented for {} ({}) has never been seen before.".format(host, address))
            print("This MIGHT be a Man-in-the-Middle attack.")
            print("A different certificate has previously been seen {} times.".format(max_count))
            if _HAS_CRYPTOGRAPHY:
                if previous_ttl < datetime.timedelta():
                    print("That certificate has expired, which reduces suspicion somewhat.")
                else:
                    print("That certificate is still valid for: {}".format(previous_ttl))
            print("****************************************")
            print("Attempt to verify the new certificate fingerprint out-of-band:")
            print(fingerprint)
            if automatic_choice:
                choice = automatic_choice
            else:
                choice = input("Accept this new certificate? Y/N ").strip().lower()
            if choice in ("y", "yes"):
                db_cur.execute("""INSERT INTO cert_cache
                    VALUES (?, ?, ?, ?, ?, ?)""",
                    (host, address, fingerprint, now, now, 1))
                db_conn.commit()
                with open(os.path.join(certdir, fingerprint + ".crt"), "wb") as fp:
                    fp.write(cert)
            else:
                raise Exception("TOFU Failure!")

    # If not, cache this cert
    else:
        db_cur.execute("""INSERT INTO cert_cache
            VALUES (?, ?, ?, ?, ?, ?)""",
            (host, address, fingerprint, now, now, 1))
        db_conn.commit()
        certdir = os.path.join(_CONFIG_DIR, "cert_cache")
        if not os.path.exists(certdir):
            os.makedirs(certdir)
        with open(os.path.join(certdir, fingerprint + ".crt"), "wb") as fp:
            fp.write(cert)

def _fetch_gemini(url, timeout=DEFAULT_TIMEOUT, interactive=True, accept_bad_ssl_certificates=False,
                  **kwargs):
    cache = None
    url_parts = urllib.parse.urlparse(url)
    host = url_parts.hostname
    port = url_parts.port or standard_ports["gemini"]
    path = url_parts.path or "/"
    query = url_parts.query
    # In AV-98, this was the _send_request method:
    # send a selector to a given host and port and
    # return the resolved address and a binary file with the reply.
    host = host.encode("idna").decode()
    # Do DNS resolution
    # DNS lookup - will get IPv4 and IPv6 records if IPv6 is enabled
    if ":" in host:
        # This is likely a literal IPv6 address, so we can *only* ask for
        # IPv6 addresses or getaddrinfo will complain
        family_mask = socket.AF_INET6
    elif socket.has_ipv6:
        # Accept either IPv4 or IPv6 addresses
        family_mask = 0
    else:
        # IPv4 only
        family_mask = socket.AF_INET
    addresses = socket.getaddrinfo(host, port, family=family_mask,
                                   type=socket.SOCK_STREAM)
    # Sort addresses so IPv6 ones come first
    addresses.sort(key=lambda add: add[0] == socket.AF_INET6, reverse=True)
    ## Continuation of send_request
    # Prepare TLS context
    protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >= 6 else ssl.PROTOCOL_TLSv1_2
    context = ssl.SSLContext(protocol)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    # Impose minimum TLS version
    ## In 3.7 and above, this is easy...
    if sys.version_info.minor >= 7:
        context.minimum_version = ssl.TLSVersion.TLSv1_2
    ## Otherwise, it seems very hard...
    ## The below is less strict than it ought to be, but trying to disable
    ## TLS v1.1 here using ssl.OP_NO_TLSv1_1 produces unexpected failures
    ## with recent versions of OpenSSL. What a mess...
    else:
        context.options |= ssl.OP_NO_SSLv3
        context.options |= ssl.OP_NO_SSLv2
    # Try to enforce sensible ciphers
    try:
        context.set_ciphers("AESGCM+ECDHE:AESGCM+DHE:CHACHA20+ECDHE:CHACHA20+DHE:!DSS:!SHA1:!MD5:@STRENGTH")
    except ssl.SSLError:
        # Rely on the server to only support sensible things, I guess...
        pass
    # Connect to remote host by any address possible
    err = None
    for address in addresses:
        s = socket.socket(address[0], address[1])
        s.settimeout(timeout)
        s = context.wrap_socket(s, server_hostname=host)
        try:
            s.connect(address[4])
            break
        except OSError as e:
            err = e
    else:
        # If we couldn't connect to *any* of the addresses, just
        # bubble up the exception from the last attempt and deny
        # knowledge of earlier failures.
        raise err

    # Do TOFU
    cert = s.getpeercert(binary_form=True)
    # Remember that we showed the current cert to this domain...
    # TODO: accept badssl and automatic choice
    _validate_cert(address[4][0], host, cert, automatic_choice="y")
    # Send request and wrap response in a file descriptor
    url = urllib.parse.urlparse(url)
    new_netloc = host
    if port != standard_ports["gemini"]:
        new_netloc += ":" + str(port)
    url = urllib.parse.urlunparse(url._replace(netloc=new_netloc))
    s.sendall((url + CRLF).encode("UTF-8"))
    f = s.makefile(mode="rb")
    ## end of send_request in AV98
    # Spec dictates <META> should not exceed 1024 bytes,
    # so maximum valid header length is 1027 bytes.
    header = f.readline(1027)
    header = urllib.parse.unquote(header.decode("UTF-8"))
    if not header or header[-1] != '\n':
        raise RuntimeError("Received invalid header from server!")
    header = header.strip()
    # Validate header
    status, meta = header.split(maxsplit=1)
    if len(meta) > 1024 or len(status) != 2 or not status.isnumeric():
        f.close()
        raise RuntimeError("Received invalid header from server!")
    # Update redirect loop/maze escaping state
    if not status.startswith("3"):
        previous_redirectors = set()
        # TODO FIXME
    else:
        # we set a previous_redirectors anyway because refactoring is in progress
        previous_redirectors = set()
    # Handle non-SUCCESS headers, which don't have a response body
    # Inputs
    if status.startswith("1"):
        if interactive:
            print(meta)
            if status == "11":
                user_input = getpass.getpass("> ")
            else:
                # TODO: FIXME we should not ask for user input while non-interactive
                user_input = input("> ")
            # Resend the request with the user input as the URL query string
            return _fetch_gemini(url.split("?")[0] + "?" + urllib.parse.quote(user_input))
        else:
            return None
    # Redirects
    elif status.startswith("3"):
        newurl = urllib.parse.urljoin(url, meta)
        if newurl == url:
            raise RuntimeError("URL redirects to itself!")
|
||||
elif newurl in previous_redirectors:
|
||||
raise RuntimeError("Caught in redirect loop!")
|
||||
elif len(previous_redirectors) == _MAX_REDIRECTS:
|
||||
raise RuntimeError("Refusing to follow more than %d consecutive redirects!" % _MAX_REDIRECTS)
|
||||
# TODO: redirections handling should be refactored
|
||||
# elif "interactive" in options and not options["interactive"]:
|
||||
# follow = self.automatic_choice
|
||||
# # Never follow cross-domain redirects without asking
|
||||
# elif new_gi.host.encode("idna") != gi.host.encode("idna"):
|
||||
# follow = input("Follow cross-domain redirect to %s? (y/n) " % new_gi.url)
|
||||
# # Never follow cross-protocol redirects without asking
|
||||
# elif new_gi.scheme != gi.scheme:
|
||||
# follow = input("Follow cross-protocol redirect to %s? (y/n) " % new_gi.url)
|
||||
# # Don't follow *any* redirect without asking if auto-follow is off
|
||||
# elif not self.options["auto_follow_redirects"]:
|
||||
# follow = input("Follow redirect to %s? (y/n) " % new_gi.url)
|
||||
# # Otherwise, follow away
|
||||
else:
|
||||
follow = "yes"
|
||||
if follow.strip().lower() not in ("y", "yes"):
|
||||
raise UserAbortException()
|
||||
previous_redirectors.add(url)
|
||||
# if status == "31":
|
||||
# # Permanent redirect
|
||||
# self.permanent_redirects[gi.url] = new_gi.url
|
||||
return _fetch_gemini(newurl)
|
||||
# Errors
|
||||
elif status.startswith("4") or status.startswith("5"):
|
||||
raise RuntimeError(meta)
|
||||
# Client cert
|
||||
elif status.startswith("6"):
|
||||
print("Handling certificates for status 6X are not supported by offpunk\n")
|
||||
print("Please open a bug report")
|
||||
_fetch_gemini(url)
|
||||
# Invalid status
|
||||
elif not status.startswith("2"):
|
||||
raise RuntimeError("Server returned undefined status code %s!" % status)
|
||||
# If we're here, this must be a success and there's a response body
|
||||
assert status.startswith("2")
|
||||
mime = meta
|
||||
# Read the response body over the network
|
||||
fbody = f.read()
|
||||
# DEFAULT GEMINI MIME
|
||||
if mime == "":
|
||||
mime = "text/gemini; charset=utf-8"
|
||||
shortmime, mime_options = parse_mime(mime)
|
||||
if "charset" in mime_options:
|
||||
try:
|
||||
codecs.lookup(mime_options["charset"])
|
||||
except LookupError:
|
||||
#raise RuntimeError("Header declared unknown encoding %s" % mime_options)
|
||||
#If the encoding is wrong, there’s a high probably it’s UTF-8 with a bad header
|
||||
mime_options["charset"] = "UTF-8"
|
||||
if shortmime.startswith("text/"):
|
||||
#Get the charset and default to UTF-8 in none
|
||||
encoding = mime_options.get("charset", "UTF-8")
|
||||
try:
|
||||
body = fbody.decode(encoding)
|
||||
except UnicodeError:
|
||||
raise RuntimeError("Could not decode response body using %s\
|
||||
encoding declared in header!" % encoding)
|
||||
else:
|
||||
body = fbody
|
||||
cache = write_body(url,body,mime)
|
||||
return cache
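
# Illustrative sketch, not used by the code above: the Gemini header
# validation from _fetch_gemini in isolation. A response header is
# "<STATUS><SPACE><META>\r\n" with a two-digit status and META of at most
# 1024 bytes.
def _example_parse_gemini_header(header):
    status, meta = header.strip().split(maxsplit=1)
    if len(status) != 2 or not status.isnumeric() or len(meta) > 1024:
        raise ValueError("invalid Gemini header")
    return status, meta
# e.g. _example_parse_gemini_header("20 text/gemini; charset=utf-8")
#      -> ("20", "text/gemini; charset=utf-8")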


def fetch(url, offline=False, download_image_first=True, images_mode="readable", validity=0, **kwargs):
    url = normalize_url(url)
    path = None
    print_error = "print_error" in kwargs.keys() and kwargs["print_error"]
    if is_cache_valid(url, validity=validity):
        path = get_cache_path(url)
    #If we are offline, any cache is better than nothing
    elif offline and is_cache_valid(url, validity=0):
        path = get_cache_path(url)
    elif "://" in url and not offline:
        try:
            scheme = url.split("://")[0]
            if scheme not in standard_ports:
                if print_error:
                    print("%s is not a supported protocol" % scheme)
                path = None
            elif scheme in ("http", "https"):
                path = _fetch_http(url, **kwargs)
            elif scheme == "gopher":
                path = _fetch_gopher(url, **kwargs)
            elif scheme == "finger":
                path = _fetch_finger(url, **kwargs)
            elif scheme == "gemini":
                path = _fetch_gemini(url, **kwargs)
            else:
                # the missing "% scheme" was a bug: the placeholder was never filled
                print("scheme %s not implemented yet" % scheme)
        except UserAbortException:
            return
        except Exception as err:
            cache = set_error(url, err)
            # Print an error message
            # we fail silently when sync_only
            if isinstance(err, socket.gaierror):
                if print_error:
                    print("ERROR: DNS error!")
            elif isinstance(err, ConnectionRefusedError):
                if print_error:
                    print("ERROR1: Connection refused!")
            elif isinstance(err, ConnectionResetError):
                if print_error:
                    print("ERROR2: Connection reset!")
            elif isinstance(err, (TimeoutError, socket.timeout)):
                if print_error:
                    print("""ERROR3: Connection timed out!
Slow internet connection? Use 'set timeout' to be more patient.""")
            elif isinstance(err, FileExistsError):
                if print_error:
                    print("""ERROR5: Trying to create a directory which already exists
in the cache : """)
                    print(err)
            elif isinstance(err, requests.exceptions.SSLError):
                if print_error:
                    print("""ERROR6: Bad SSL certificate:\n""")
                    print(err)
                    print("""\n If you know what you are doing, you can try to accept bad certificates with the following command:\n""")
                    print("""set accept_bad_ssl_certificates True""")
            elif isinstance(err, requests.exceptions.ConnectionError):
                if print_error:
                    print("""ERROR7: Cannot connect to URL:\n""")
                    print(str(err))
            else:
                if print_error:
                    import traceback
                    print("ERROR4: " + str(type(err)) + " : " + str(err))
                    #print("\n" + str(err.with_traceback(None)))
                    print(traceback.format_exc())
            return cache
    # We download images contained in the document (from full mode)
    if not offline and download_image_first and images_mode:
        renderer = ansicat.renderer_from_file(path, url)
        if renderer:
            for image in renderer.get_images(mode=images_mode):
                #Image should exist, should be an url (not a data image)
                #and should not be already cached
                if image and not image.startswith("data:image/") and not is_cache_valid(image):
                    # "utils.term_width()" was a bug: this module imports offutils
                    width = offutils.term_width() - 1
                    toprint = "Downloading %s" % image
                    toprint = toprint[:width]
                    toprint += " " * (width - len(toprint))
                    print(toprint, end="\r")
                    #d_i_f and images_mode are False/None to avoid recursive downloading
                    #if that ever happens
                    fetch(image, offline=offline, download_image_first=False,
                          images_mode=None, validity=0, **kwargs)
    return path
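
# Usage sketch for fetch() (illustrative; the URL is hypothetical):
#   cached = fetch("gemini://example.org/", validity=3600, print_error=True)
# returns the path to the cached body, re-downloading only if the cached
# copy is older than one hour.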


def main():

    # Parse arguments
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--path", action="store_true",
                        help="return the path to the cache instead of the content of the cache")
    parser.add_argument("--offline", action="store_true",
                        help="do not attempt to download, return cached version or error")
    parser.add_argument("--max-size", type=int,
                        help="cancel download of items above that size (value in Mb)")
    parser.add_argument("--timeout", type=int,
                        help="time to wait before cancelling connection (in seconds)")
    # No argument: write help
    parser.add_argument('url', metavar='URL', nargs='*',
                        help='download URL and return the content or the path to a cached version')
    # arg = URL: download and return the cached URI
    # --cache-validity : do not download if cache is valid
    # --validity : return the date of the cached version, Null if no version
    # --force-download : download and replace cache, even if valid
    args = parser.parse_args()

    param = {}

    for u in args.url:
        if args.offline:
            path = get_cache_path(u)
        else:
            print("Download URL: %s" % u)
            path = fetch(u, max_size=args.max_size, timeout=args.timeout)
        if args.path:
            print(path)
        else:
            # the explicit f.close() was redundant inside the "with" block
            with open(path, "r") as f:
                print(f.read())


if __name__ == '__main__':
    main()
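
# Run as a script, netcache doubles as a small CLI using the arguments
# defined above, e.g.
#   python3 netcache.py --path gemini://example.org/   # print the cache path
#   python3 netcache.py gemini://example.org/          # print the cached body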
@@ -0,0 +1,281 @@
#!/usr/bin/env python3
#opnk stands for "Open like a PuNK".
#It will open any file or URL and display it nicely in less.
#If that is not possible, it will fall back to xdg-open
#URLs are retrieved through netcache
import os
import sys
import tempfile
import argparse
import shutil
import shlex  # used below to split handler commands; this import was missing
import time
import fnmatch
from offpunk import ansicat, netcache, utils
from offpunk.utils import run, term_width, mode_url, unmode_url, is_local

_HAS_XDGOPEN = shutil.which('xdg-open')
_GREP = "grep --color=auto"

less_version = 0
if not shutil.which("less"):
    print("Please install the pager \"less\" to run Offpunk.")
    print("If you wish to use another pager, send me an email!")
    print("(I’m really curious to hear about people not having \"less\" on their system.)")
    sys.exit()
output = run("less --version")
# We get the less version (which is the only integer on the first line)
words = output.split("\n")[0].split()
less_version = 0
for w in words:
    if w.isdigit():
        less_version = int(w)
# restoring the position only works with less 572 or newer
if less_version >= 572:
    _LESS_RESTORE_POSITION = True
else:
    _LESS_RESTORE_POSITION = False
#_DEFAULT_LESS = "less -EXFRfM -PMurl\ lines\ \%lt-\%lb/\%L\ \%Pb\%$ %s"
# -E : quit when reaching end of file (to behave like "cat")
# -F : quit if content fits the screen (behave like "cat")
# -X : does not clear the screen
# -R : interpret ANSI colors correctly
# -f : suppress warning for some contents
# -M : long prompt (to have info about where you are in the file)
# -W : hilite the new first line after a page skip (space)
# -i : ignore case in search
# -S : do not wrap long lines. Wrapping is done by offpunk; long lines
#      are there on purpose (such as in ascii art)
#--incsearch : incremental search, starting with version 581
if less_version >= 581:
    less_base = "less --incsearch --save-marks -~ -XRfMWiS"
elif less_version >= 572:
    less_base = "less --save-marks -XRfMWiS"
else:
    less_base = "less -XRfMWiS"
_DEFAULT_LESS = less_base + " \"+''\" %s"
_DEFAULT_CAT = less_base + " -EF %s"

def less_cmd(file, histfile=None, cat=False, grep=None):
    if histfile:
        env = {"LESSHISTFILE": histfile}
    else:
        env = {}
    if cat:
        cmd_str = _DEFAULT_CAT
    elif grep:
        grep_cmd = _GREP
        #case insensitive for lowercase search
        if grep.islower():
            grep_cmd += " -i"
        cmd_str = _DEFAULT_CAT + "|" + grep_cmd + " %s" % grep
    else:
        cmd_str = _DEFAULT_LESS
    run(cmd_str, parameter=file, direct_output=True, env=env)
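
# Usage sketch for less_cmd() (illustrative; paths are hypothetical):
#   less_cmd("/tmp/render.txt", histfile="/tmp/render.hist")  # page the file
#   less_cmd("/tmp/render.txt", cat=True)                     # dump like cat
#   less_cmd("/tmp/render.txt", cat=True, grep="gemini")      # filter lines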

class opencache():
    def __init__(self):
        # We keep a cache of the rendering of each file and, for each one,
        # a less_histfile containing the current position in the file
        self.temp_files = {}
        self.less_histfile = {}
        # This dictionary contains an url -> ansirenderer mapping. This allows
        # reusing a renderer when the same URL is visited several times during
        # the same session.
        # We save the time at which the renderer was created in renderer_time
        # so we can invalidate the renderer if a new version of the source
        # has been downloaded.
        self.rendererdic = {}
        self.renderer_time = {}
        self.mime_handlers = {}
        self.last_mode = {}
        self.last_width = term_width(absolute=True)

    def _get_handler_cmd(self, mimetype):
        # Now look for a handler for this mimetype
        # Consider exact matches before wildcard matches
        exact_matches = []
        wildcard_matches = []
        for handled_mime, cmd_str in self.mime_handlers.items():
            if "*" in handled_mime:
                wildcard_matches.append((handled_mime, cmd_str))
            else:
                exact_matches.append((handled_mime, cmd_str))
        for handled_mime, cmd_str in exact_matches + wildcard_matches:
            if fnmatch.fnmatch(mimetype, handled_mime):
                break
        else:
            # Use "xdg-open" as a last resort.
            if _HAS_XDGOPEN:
                cmd_str = "xdg-open %s"
            else:
                cmd_str = "echo \"Can’t find how to open \"%s"
                print("Please install xdg-open (usually from the xdg-util package)")
        return cmd_str
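
    # Illustrative note: the loop above relies on fnmatch, so a handler
    # registered for "image/*" matches any concrete image type:
    #   fnmatch.fnmatch("image/png", "image/*")   # True
    #   fnmatch.fnmatch("text/plain", "image/*")  # False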

    # Return the handler for a specific mimetype.
    # Return the whole dic if no specific mime provided
    def get_handlers(self, mime=None):
        if mime and mime in self.mime_handlers.keys():
            return self.mime_handlers[mime]
        elif mime:
            return None
        else:
            return self.mime_handlers

    def set_handler(self, mime, handler):
        previous = None
        if mime in self.mime_handlers.keys():
            previous = self.mime_handlers[mime]
        self.mime_handlers[mime] = handler
        if "%s" not in handler:
            print("WARNING: this handler has no %%s, no filename will be provided to the command")
            if previous:
                print("Previous handler was %s" % previous)

    def get_renderer(self, inpath, mode=None, theme=None):
        # We remove the ##offpunk_mode= from the URL
        # If mode is already set, we don’t use the part from the URL
        inpath, newmode = unmode_url(inpath)
        if not mode: mode = newmode
        # If we still don’t have a mode, we see if we used one before
        if not mode and inpath in self.last_mode.keys():
            mode = self.last_mode[inpath]
        elif not mode:
            #default mode is readable
            mode = "readable"
        renderer = None
        path = netcache.get_cache_path(inpath)
        if path:
            usecache = inpath in self.rendererdic.keys() and not is_local(inpath)
            #Screen size may have changed
            width = term_width(absolute=True)
            if usecache and self.last_width != width:
                self.cleanup()
                usecache = False
                self.last_width = width
            if usecache:
                if inpath in self.renderer_time.keys():
                    last_downloaded = netcache.cache_last_modified(inpath)
                    last_cached = self.renderer_time[inpath]
                    usecache = last_cached > last_downloaded
                else:
                    usecache = False
            if not usecache:
                renderer = ansicat.renderer_from_file(path, inpath, theme=theme)
                if renderer:
                    self.rendererdic[inpath] = renderer
                    self.renderer_time[inpath] = int(time.time())
            else:
                renderer = self.rendererdic[inpath]
        return renderer

    def get_temp_filename(self, url):
        if url in self.temp_files.keys():
            return self.temp_files[url]
        else:
            return None

    def opnk(self, inpath, mode=None, terminal=True, grep=None, theme=None, **kwargs):
        #Return True if inpath was opened in the terminal,
        #False otherwise.
        #if terminal = False, we don’t try to open in the terminal,
        #we immediately fall back to xdg-open.
        #netcache currently provides the path if it’s a file;
        #maybe this should be migrated here.
        if not utils.is_local(inpath):
            kwargs["images_mode"] = mode
            cachepath = netcache.fetch(inpath, **kwargs)
            if not cachepath:
                return False
        elif "://" in inpath:
            cachepath = netcache.fetch(inpath, **kwargs)
        elif os.path.exists(inpath):
            cachepath = inpath
        else:
            print("%s does not exist" % inpath)
            return
        renderer = self.get_renderer(inpath, mode=mode, theme=theme)
        if renderer and mode:
            renderer.set_mode(mode)
            self.last_mode[inpath] = mode
        if not mode and inpath in self.last_mode.keys():
            mode = self.last_mode[inpath]
            renderer.set_mode(mode)
        #we use the full moded url as key for the dictionary
        key = mode_url(inpath, mode)
        if terminal and renderer:
            #If this is an image and we have chafa/timg, we
            #don’t use less, we call it directly
            if renderer.has_direct_display():
                renderer.display(mode=mode, directdisplay=True)
                return True
            else:
                body = renderer.display(mode=mode)
                #Should we use the cache? Only if it is not local and there’s a cache
                usecache = key in self.temp_files and not is_local(inpath)
                if usecache:
                    #and the cache is still valid!
                    last_downloaded = netcache.cache_last_modified(inpath)
                    last_cached = os.path.getmtime(self.temp_files[key])
                    if last_downloaded > last_cached:
                        usecache = False
                        self.temp_files.pop(key)
                        self.less_histfile.pop(key)
                # We actually put the body in a tmpfile before giving it to less
                if not usecache:
                    tmpf = tempfile.NamedTemporaryFile("w", encoding="UTF-8", delete=False)
                    self.temp_files[key] = tmpf.name
                    tmpf.write(body)
                    tmpf.close()
                if key not in self.less_histfile:
                    firsttime = True
                    tmpf = tempfile.NamedTemporaryFile("w", encoding="UTF-8", delete=False)
                    self.less_histfile[key] = tmpf.name
                else:
                    #We don’t want to restore positions in lists
                    firsttime = is_local(inpath)
                less_cmd(self.temp_files[key], histfile=self.less_histfile[key], cat=firsttime, grep=grep)
                return True
        #maybe we have no renderer, or we want to skip it
        else:
            mimetype = ansicat.get_mime(cachepath)
            if mimetype == "mailto":
                resp = input("Send an email to %s Y/N? " % inpath)
                if resp.strip().lower() in ("y", "yes"):
                    if _HAS_XDGOPEN:
                        run("xdg-open mailto:%s", parameter=inpath, direct_output=True)
                    else:
                        print("Cannot find a mail client to send mail to %s" % inpath)
                        print("Please install xdg-open (usually from the xdg-util package)")
                        return
            else:
                cmd_str = self._get_handler_cmd(mimetype)
                try:
                    run(cmd_str, parameter=netcache.get_cache_path(inpath), direct_output=True)
                except FileNotFoundError:
                    print("Handler program %s not found!" % shlex.split(cmd_str)[0])
                    print("You can use the ! command to specify another handler program or pipeline.")
            return False
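
    # Usage sketch (illustrative; URL and file are hypothetical):
    #   oc = opencache()
    #   oc.opnk("gemini://example.org/")          # fetch, render, page in less
    #   oc.opnk("picture.png", terminal=False)    # hand straight to xdg-open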

    #We remove the renderers from the cache and we also delete temp files
    def cleanup(self):
        while len(self.temp_files) > 0:
            os.remove(self.temp_files.popitem()[1])
        while len(self.less_histfile) > 0:
            os.remove(self.less_histfile.popitem()[1])
        self.last_width = None
        self.rendererdic = {}
        self.renderer_time = {}
        self.last_mode = {}

def main():
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("content", metavar="INPUT", nargs="*",
                        default=sys.stdin, help="Path to the file or URL to open")
    args = parser.parse_args()
    cache = opencache()
    for f in args.content:
        cache.opnk(f)

if __name__ == "__main__":
    main()
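
# As a standalone tool, e.g.
#   python3 opnk.py document.gmi gemini://example.org/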
@@ -13,7 +13,7 @@ import shutil
import shlex
import urllib.parse
import urllib.parse
import cache_migration
from offpunk import cache_migration

CACHE_VERSION = 1

285
opnk.py

@@ -1,286 +1,3 @@
#!/usr/bin/env python3
#opnk stands for "Open like a PuNK".
#It will open any file or URL and display it nicely in less.
#If that is not possible, it will fall back to xdg-open
#URLs are retrieved through netcache
import os
import sys
import tempfile
import argparse
import netcache
import ansicat
import offutils
import shutil
import shlex  # used below to split handler commands; this import was missing
import time
import fnmatch
from offutils import run, term_width, mode_url, unmode_url, is_local

_HAS_XDGOPEN = shutil.which('xdg-open')
_GREP = "grep --color=auto"

less_version = 0
if not shutil.which("less"):
    print("Please install the pager \"less\" to run Offpunk.")
    print("If you wish to use another pager, send me an email!")
    print("(I’m really curious to hear about people not having \"less\" on their system.)")
    sys.exit()
output = run("less --version")
# We get the less version (which is the only integer on the first line)
words = output.split("\n")[0].split()
less_version = 0
for w in words:
    if w.isdigit():
        less_version = int(w)
# restoring the position only works with less 572 or newer
if less_version >= 572:
    _LESS_RESTORE_POSITION = True
else:
    _LESS_RESTORE_POSITION = False
#_DEFAULT_LESS = "less -EXFRfM -PMurl\ lines\ \%lt-\%lb/\%L\ \%Pb\%$ %s"
# -E : quit when reaching end of file (to behave like "cat")
# -F : quit if content fits the screen (behave like "cat")
# -X : does not clear the screen
# -R : interpret ANSI colors correctly
# -f : suppress warning for some contents
# -M : long prompt (to have info about where you are in the file)
# -W : hilite the new first line after a page skip (space)
# -i : ignore case in search
# -S : do not wrap long lines. Wrapping is done by offpunk; long lines
#      are there on purpose (such as in ascii art)
#--incsearch : incremental search, starting with version 581
if less_version >= 581:
    less_base = "less --incsearch --save-marks -~ -XRfMWiS"
elif less_version >= 572:
    less_base = "less --save-marks -XRfMWiS"
else:
    less_base = "less -XRfMWiS"
_DEFAULT_LESS = less_base + " \"+''\" %s"
_DEFAULT_CAT = less_base + " -EF %s"

def less_cmd(file, histfile=None, cat=False, grep=None):
    if histfile:
        env = {"LESSHISTFILE": histfile}
    else:
        env = {}
    if cat:
        cmd_str = _DEFAULT_CAT
    elif grep:
        grep_cmd = _GREP
        #case insensitive for lowercase search
        if grep.islower():
            grep_cmd += " -i"
        cmd_str = _DEFAULT_CAT + "|" + grep_cmd + " %s" % grep
    else:
        cmd_str = _DEFAULT_LESS
    run(cmd_str, parameter=file, direct_output=True, env=env)

class opencache():
    def __init__(self):
        # We keep a cache of the rendering of each file and, for each one,
        # a less_histfile containing the current position in the file
        self.temp_files = {}
        self.less_histfile = {}
        # This dictionary contains an url -> ansirenderer mapping. This allows
        # reusing a renderer when the same URL is visited several times during
        # the same session.
        # We save the time at which the renderer was created in renderer_time
        # so we can invalidate the renderer if a new version of the source
        # has been downloaded.
        self.rendererdic = {}
        self.renderer_time = {}
        self.mime_handlers = {}
        self.last_mode = {}
        self.last_width = term_width(absolute=True)

    def _get_handler_cmd(self, mimetype):
        # Now look for a handler for this mimetype
        # Consider exact matches before wildcard matches
        exact_matches = []
        wildcard_matches = []
        for handled_mime, cmd_str in self.mime_handlers.items():
            if "*" in handled_mime:
                wildcard_matches.append((handled_mime, cmd_str))
            else:
                exact_matches.append((handled_mime, cmd_str))
        for handled_mime, cmd_str in exact_matches + wildcard_matches:
            if fnmatch.fnmatch(mimetype, handled_mime):
                break
        else:
            # Use "xdg-open" as a last resort.
            if _HAS_XDGOPEN:
                cmd_str = "xdg-open %s"
            else:
                cmd_str = "echo \"Can’t find how to open \"%s"
                print("Please install xdg-open (usually from the xdg-util package)")
        return cmd_str

    # Return the handler for a specific mimetype.
    # Return the whole dic if no specific mime provided
    def get_handlers(self, mime=None):
        if mime and mime in self.mime_handlers.keys():
            return self.mime_handlers[mime]
        elif mime:
            return None
        else:
            return self.mime_handlers

    def set_handler(self, mime, handler):
        previous = None
        if mime in self.mime_handlers.keys():
            previous = self.mime_handlers[mime]
        self.mime_handlers[mime] = handler
        if "%s" not in handler:
            print("WARNING: this handler has no %%s, no filename will be provided to the command")
            if previous:
                print("Previous handler was %s" % previous)

    def get_renderer(self, inpath, mode=None, theme=None):
        # We remove the ##offpunk_mode= from the URL
        # If mode is already set, we don’t use the part from the URL
        inpath, newmode = unmode_url(inpath)
        if not mode: mode = newmode
        # If we still don’t have a mode, we see if we used one before
        if not mode and inpath in self.last_mode.keys():
            mode = self.last_mode[inpath]
        elif not mode:
            #default mode is readable
            mode = "readable"
        renderer = None
        path = netcache.get_cache_path(inpath)
        if path:
            usecache = inpath in self.rendererdic.keys() and not is_local(inpath)
            #Screen size may have changed
            width = term_width(absolute=True)
            if usecache and self.last_width != width:
                self.cleanup()
                usecache = False
                self.last_width = width
            if usecache:
                if inpath in self.renderer_time.keys():
                    last_downloaded = netcache.cache_last_modified(inpath)
                    last_cached = self.renderer_time[inpath]
                    usecache = last_cached > last_downloaded
                else:
                    usecache = False
            if not usecache:
                renderer = ansicat.renderer_from_file(path, inpath, theme=theme)
                if renderer:
                    self.rendererdic[inpath] = renderer
                    self.renderer_time[inpath] = int(time.time())
            else:
                renderer = self.rendererdic[inpath]
        return renderer

    def get_temp_filename(self, url):
        if url in self.temp_files.keys():
            return self.temp_files[url]
        else:
            return None

    def opnk(self, inpath, mode=None, terminal=True, grep=None, theme=None, **kwargs):
        #Return True if inpath was opened in the terminal,
        #False otherwise.
        #if terminal = False, we don’t try to open in the terminal,
        #we immediately fall back to xdg-open.
        #netcache currently provides the path if it’s a file;
        #maybe this should be migrated here.
        if not offutils.is_local(inpath):
            kwargs["images_mode"] = mode
            cachepath = netcache.fetch(inpath, **kwargs)
            if not cachepath:
                return False
        elif "://" in inpath:
            cachepath = netcache.fetch(inpath, **kwargs)
        elif inpath.startswith("mailto:"):
            cachepath = inpath
        elif os.path.exists(inpath):
            cachepath = inpath
        else:
            print("%s does not exist" % inpath)
            return
        renderer = self.get_renderer(inpath, mode=mode, theme=theme)
        if renderer and mode:
            renderer.set_mode(mode)
            self.last_mode[inpath] = mode
        if not mode and inpath in self.last_mode.keys():
            mode = self.last_mode[inpath]
            renderer.set_mode(mode)
        #we use the full moded url as key for the dictionary
        key = mode_url(inpath, mode)
        if terminal and renderer:
            #If this is an image and we have chafa/timg, we
            #don’t use less, we call it directly
            if renderer.has_direct_display():
                renderer.display(mode=mode, directdisplay=True)
                return True
            else:
                body = renderer.display(mode=mode)
                #Should we use the cache? Only if it is not local and there’s a cache
                usecache = key in self.temp_files and not is_local(inpath)
                if usecache:
                    #and the cache is still valid!
                    last_downloaded = netcache.cache_last_modified(inpath)
                    last_cached = os.path.getmtime(self.temp_files[key])
                    if last_downloaded > last_cached:
                        usecache = False
                        self.temp_files.pop(key)
                        self.less_histfile.pop(key)
                # We actually put the body in a tmpfile before giving it to less
                if not usecache:
                    tmpf = tempfile.NamedTemporaryFile("w", encoding="UTF-8", delete=False)
                    self.temp_files[key] = tmpf.name
                    tmpf.write(body)
                    tmpf.close()
                if key not in self.less_histfile:
                    firsttime = True
                    tmpf = tempfile.NamedTemporaryFile("w", encoding="UTF-8", delete=False)
                    self.less_histfile[key] = tmpf.name
                else:
                    #We don’t want to restore positions in lists
                    firsttime = is_local(inpath)
                less_cmd(self.temp_files[key], histfile=self.less_histfile[key], cat=firsttime, grep=grep)
                return True
        #maybe we have no renderer, or we want to skip it
        else:
            mimetype = ansicat.get_mime(cachepath)
            if mimetype == "mailto":
                mail = inpath[7:]
                resp = input("Send an email to %s Y/N? " % mail)
                if resp.strip().lower() in ("y", "yes"):
                    if _HAS_XDGOPEN:
                        run("xdg-open mailto:%s", parameter=mail, direct_output=True)
                    else:
                        print("Cannot find a mail client to send mail to %s" % inpath)
                        print("Please install xdg-open (usually from the xdg-util package)")
                        return
            else:
                cmd_str = self._get_handler_cmd(mimetype)
                try:
                    run(cmd_str, parameter=netcache.get_cache_path(inpath), direct_output=True)
                except FileNotFoundError:
                    print("Handler program %s not found!" % shlex.split(cmd_str)[0])
                    print("You can use the ! command to specify another handler program or pipeline.")
            return False

    #We remove the renderers from the cache and we also delete temp files
    def cleanup(self):
        while len(self.temp_files) > 0:
            os.remove(self.temp_files.popitem()[1])
        while len(self.less_histfile) > 0:
            os.remove(self.less_histfile.popitem()[1])
        self.last_width = None
        self.rendererdic = {}
        self.renderer_time = {}
        self.last_mode = {}

def main():
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("content", metavar="INPUT", nargs="*",
                        default=sys.stdin, help="Path to the file or URL to open")
    args = parser.parse_args()
    cache = opencache()
    for f in args.content:
        cache.opnk(f)

if __name__ == "__main__":
    from offpunk.opnk import main
    main()
@@ -45,9 +45,9 @@ Source = "https://git.sr.ht/~lioploum/offpunk"
[project.scripts]
offpunk = "offpunk:main"
netcache = "netcache:main"
ansicat = "ansicat:main"
opnk = "opnk:main"
netcache = "offpunk.netcache:main"
ansicat = "offpunk.ansicat:main"
opnk = "offpunk.opnk:main"

[tool.flit.sdist]
include = ["doc/", "man/", "CHANGELOG"]