Compare commits

...

5 Commits

Author SHA1 Message Date
Lionel Dricot 8b97acc5e0 relative import back to offpunk 2023-09-16 10:53:15 +02:00
Lionel Dricot 963ac3d7a3 merging refactor 2023-09-16 10:32:26 +02:00
Austreelis 6e215c0512 Use relative imports when importing offpunk's own modules
This should allow running offpunk locally from any
directory.
2023-09-09 22:43:45 +02:00
Austreelis 370e7e4dc5 Add scripts to run tools locally
This adds a script for each executable, and a __main__.py for offpunk
itself.  This intends to "fix" (albeit unperfectly, see below) how
46f61bf forbade locally running executables without first installing
offpunk in your site packages (or hacking on PYTHONPATH).

Of those, most are "glue script", files in the top-level directory
and outside offpunk's module, which are purely there to allow running
the `netcache`, `ansicat` and `opnk` without any other setup than a
working python and a `git clone`.  Notably, they will not be included in
the source distribution (sdist) archive built by flit.

Sadly the `offpunk` executable doesn't get the same treatment, because
having both an `offpunk.py` script and an `offpunk` python package
is ambiguous, and flit disallows that, avoiding to package anything.
So instead it now has a pretty bare __main__.py, which really works the
same way as the "glue scripts", except this one does get packaged.
This means everything but `offpunk` may be run like `./netcache.py`,
making this setup a bit inconsistent (which may or may not be an issue).
Running `python -m <tool>` works for everything though, so there *is* a
consistent way to run them all, and in the darkness, bind them.
2023-09-08 17:21:11 +02:00
Austreelis bc43e3150b Put everything in a common offpunk package
This moves around and renames a bunch of code, as an effort toward
fixing the flit build process.  Simply put, all source code has been
moved in an `offpunk` directory, and renamed in ways that (hopefully)
make sense.  The `pyproject.toml` file has been updated, too.

This changes how to import code from offpunk modules: old top-level
modules are now under `offpunk` (`import netcache` ->
`from offpunk import netcache`), and "off" prefixes have been stripped
(`from offutils import run` -> `from offpunk.utils import run`).

Note file mode x (execute) was removed on all touched files (if it was
set before), meaning they cannot be executed exactly like before this
change.  Running scripts directly (`./offpunk/netcache.py`) is broken
because python doesn't populate parent modules of top-level modules.
Running `python -m offpunk.netcache` is broken because running inner
python modules as top-level this way *does* populate parent modules
(assuming my understanding is correct, but it may very well not be), but
it can somehow trigger undefined behavior (!?).  Running
`python -m offpunk` still works though.

Signed-off-by: Austreelis <dev@austreelis.net>
2023-09-08 17:21:07 +02:00
13 changed files with 4410 additions and 4446 deletions

1351
ansicat.py

File diff suppressed because it is too large Load Diff

View File

@ -1,891 +1,3 @@
#!/usr/bin/env python3
import os
import sys
import urllib.parse
import argparse
import requests
import codecs
import getpass
import socket
import ssl
import glob
import datetime
import hashlib
import sqlite3
from ssl import CertificateError
import ansicat
import offutils
from offutils import _CACHE_PATH,_DATA_DIR,_CONFIG_DIR
import time
try:
import chardet
_HAS_CHARDET = True
except ModuleNotFoundError:
_HAS_CHARDET = False
try:
from cryptography import x509
from cryptography.hazmat.backends import default_backend
_HAS_CRYPTOGRAPHY = True
_BACKEND = default_backend()
except(ModuleNotFoundError,ImportError):
_HAS_CRYPTOGRAPHY = False
if not os.path.exists(_CACHE_PATH):
print("Creating cache directory {}".format(_CACHE_PATH))
os.makedirs(_CACHE_PATH)
# This list is also used as a list of supported protocols
standard_ports = {
"gemini" : 1965,
"gopher" : 70,
"finger" : 79,
"http" : 80,
"https" : 443,
"spartan": 300,
}
default_protocol = "gemini"
CRLF = '\r\n'
DEFAULT_TIMEOUT = 10
_MAX_REDIRECTS = 5
# monkey-patch Gemini support in urllib.parse
# see https://github.com/python/cpython/blob/master/Lib/urllib/parse.py
urllib.parse.uses_relative.append("gemini")
urllib.parse.uses_netloc.append("gemini")
urllib.parse.uses_relative.append("spartan")
urllib.parse.uses_netloc.append("spartan")
class UserAbortException(Exception):
pass
def parse_mime(mime):
options = {}
if mime:
if ";" in mime:
splited = mime.split(";",maxsplit=1)
mime = splited[0]
if len(splited) >= 1:
options_list = splited[1].split()
for o in options_list:
spl = o.split("=",maxsplit=1)
if len(spl) > 0:
options[spl[0]] = spl[1]
return mime, options
def normalize_url(url):
if "://" not in url and ("./" not in url and url[0] != "/"):
if not url.startswith("mailto:"):
url = "gemini://" + url
return url
def cache_last_modified(url):
if not url:
return None
path = get_cache_path(url)
if path:
return os.path.getmtime(path)
else:
print("ERROR :NOCACHE in cache_last_modified")
return None
def is_cache_valid(url,validity=0):
# Validity is the acceptable time for
# a cache to be valid (in seconds)
# If 0, then any cache is considered as valid
# (use validity = 1 if you want to refresh everything)
if offutils.is_local(url):
return True
cache = get_cache_path(url)
if cache :
# If path is too long, we always return True to avoid
# fetching it.
if len(cache) > 259:
print("We return False because path is too long")
return False
if os.path.exists(cache) and not os.path.isdir(cache):
if validity > 0 :
last_modification = cache_last_modified(url)
now = time.time()
age = now - last_modification
return age < validity
else:
return True
else:
#Cache has not been build
return False
else:
#Theres not even a cache!
return False
def get_cache_path(url):
# Sometimes, cache_path became a folder! (which happens for index.html/index.gmi)
# In that case, we need to reconstruct it
#First, we parse the URL
if not url:
return None
parsed = urllib.parse.urlparse(url)
if url[0] == "/" or url.startswith("./") or os.path.exists(url):
scheme = "file"
elif parsed.scheme:
scheme = parsed.scheme
else:
scheme = default_protocol
if scheme in ["file","mailto","list"]:
local = True
host = ""
port = None
# file:// is 7 char
if url.startswith("file://"):
path = url[7:]
elif scheme == "mailto":
path = parsed.path
elif url.startswith("list://"):
listdir = os.path.join(_DATA_DIR,"lists")
listname = url[7:].lstrip("/")
if listname in [""]:
name = "My Lists"
path = listdir
else:
name = listname
path = os.path.join(listdir, "%s.gmi"%listname)
else:
path = url
else:
local = False
# Convert unicode hostname to punycode using idna RFC3490
host = parsed.hostname #.encode("idna").decode()
port = parsed.port or standard_ports.get(scheme, 0)
# special gopher selector case
if scheme == "gopher":
if len(parsed.path) >= 2:
itemtype = parsed.path[1]
path = parsed.path[2:]
else:
itemtype = "1"
path = ""
if itemtype == "0":
mime = "text/gemini"
elif itemtype == "1":
mime = "text/gopher"
elif itemtype == "h":
mime = "text/html"
elif itemtype in ("9","g","I","s"):
mime = "binary"
else:
mime = "text/gopher"
else:
path = parsed.path
if parsed.query:
# we dont add the query if path is too long because path above 260 char
# are not supported and crash python.
# Also, very long query are usually useless stuff
if len(path+parsed.query) < 258:
path += "/" + parsed.query
# Now, we have a partial path. Lets make it full path.
if local:
cache_path = path
elif scheme and host:
cache_path = os.path.expanduser(_CACHE_PATH + scheme + "/" + host + path)
#Theres an OSlimitation of 260 characters per path.
#We will thus cut the path enough to add the index afterward
cache_path = cache_path[:249]
# FIXME : this is a gross hack to give a name to
# index files. This will break if the index is not
# index.gmi. I dont know how to know the real name
# of the file. But first, we need to ensure that the domain name
# finish by "/". Else, the cache will create a file, not a folder.
if scheme.startswith("http"):
index = "index.html"
elif scheme == "finger":
index = "index.txt"
elif scheme == "gopher":
index = "gophermap"
else:
index = "index.gmi"
if path == "" or os.path.isdir(cache_path):
if not cache_path.endswith("/"):
cache_path += "/"
if not url.endswith("/"):
url += "/"
if cache_path.endswith("/"):
cache_path += index
#sometimes, the index itself is a dir
#like when folder/index.gmi?param has been created
#and we try to access folder
if os.path.isdir(cache_path):
cache_path += "/" + index
else:
#URL is missing either a supported scheme or a valid host
#print("Error: %s is not a supported url"%url)
return None
if len(cache_path) > 259:
print("Path is too long. This is an OS limitation.\n\n")
print(url)
return None
return cache_path
def write_body(url,body,mime=None):
## body is a copy of the raw gemtext
## Write_body() also create the cache !
#DEFAULT GEMINIMIME
mime, options = parse_mime(mime)
cache_path = get_cache_path(url)
if cache_path:
if mime and mime.startswith("text/"):
mode = "w"
else:
mode = "wb"
cache_dir = os.path.dirname(cache_path)
# If the subdirectory already exists as a file (not a folder)
# We remove it (happens when accessing URL/subfolder before
# URL/subfolder/file.gmi.
# This causes loss of data in the cache
# proper solution would be to save "sufolder" as "sufolder/index.gmi"
# If the subdirectory doesnt exist, we recursively try to find one
# until it exists to avoid a file blocking the creation of folders
root_dir = cache_dir
while not os.path.exists(root_dir):
root_dir = os.path.dirname(root_dir)
if os.path.isfile(root_dir):
os.remove(root_dir)
os.makedirs(cache_dir,exist_ok=True)
with open(cache_path, mode=mode) as f:
f.write(body)
f.close()
return cache_path
def set_error(url,err):
# If we get an error, we want to keep an existing cache
# but we need to touch it or to create an empty one
# to avoid hitting the error at each refresh
cache = get_cache_path(url)
if is_cache_valid(url):
os.utime(cache)
else:
cache_dir = os.path.dirname(cache)
root_dir = cache_dir
while not os.path.exists(root_dir):
root_dir = os.path.dirname(root_dir)
if os.path.isfile(root_dir):
os.remove(root_dir)
os.makedirs(cache_dir,exist_ok=True)
if os.path.isdir(cache_dir):
with open(cache, "w") as c:
c.write(str(datetime.datetime.now())+"\n")
c.write("ERROR while caching %s\n\n" %url)
c.write("*****\n\n")
c.write(str(type(err)) + " = " + str(err))
#cache.write("\n" + str(err.with_traceback(None)))
c.write("\n*****\n\n")
c.write("If you believe this error was temporary, type ""reload"".\n")
c.write("The ressource will be tentatively fetched during next sync.\n")
c.close()
return cache
def _fetch_http(url,max_size=None,timeout=DEFAULT_TIMEOUT,accept_bad_ssl_certificates=False,**kwargs):
def too_large_error(url,length,max_size):
err = "Size of %s is %s Mo\n"%(url,length)
err += "Offpunk only download automatically content under %s Mo\n" %(max_size/1000000)
err += "To retrieve this content anyway, type 'reload'."
return set_error(url,err)
if accept_bad_ssl_certificates:
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = 'ALL:@SECLEVEL=1'
requests.packages.urllib3.disable_warnings()
verify=False
else:
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = 'ALL:@SECLEVEL=2'
verify=True
header = {}
header["User-Agent"] = "Netcache"
with requests.get(url,verify=verify,headers=header, stream=True,timeout=DEFAULT_TIMEOUT) as response:
if "content-type" in response.headers:
mime = response.headers['content-type']
else:
mime = None
if "content-length" in response.headers:
length = int(response.headers['content-length'])
else:
length = 0
if max_size and length > max_size:
response.close()
return too_large_error(url,str(length/100),max_size)
elif max_size and length == 0:
body = b''
downloaded = 0
for r in response.iter_content():
body += r
#We divide max_size for streamed content
#in order to catch them faster
size = sys.getsizeof(body)
max = max_size/2
current = round(size*100/max,1)
if current > downloaded:
downloaded = current
print(" -> Receiving stream: %s%% of allowed data"%downloaded,end='\r')
#print("size: %s (%s\% of maxlenght)"%(size,size/max_size))
if size > max_size/2:
response.close()
return too_large_error(url,"streaming",max_size)
response.close()
else:
body = response.content
response.close()
if mime and "text/" in mime:
body = body.decode("UTF-8","replace")
cache = write_body(url,body,mime)
return cache
def _fetch_gopher(url,timeout=DEFAULT_TIMEOUT,**kwargs):
parsed =urllib.parse.urlparse(url)
host = parsed.hostname
port = parsed.port or 70
if len(parsed.path) >= 2:
itemtype = parsed.path[1]
selector = parsed.path[2:]
else:
itemtype = "1"
selector = ""
addresses = socket.getaddrinfo(host, port, family=0,type=socket.SOCK_STREAM)
s = socket.create_connection((host,port))
for address in addresses:
s = socket.socket(address[0], address[1])
s.settimeout(timeout)
try:
s.connect(address[4])
break
except OSError as e:
err = e
if parsed.query:
request = selector + "\t" + parsed.query
else:
request = selector
request += "\r\n"
s.sendall(request.encode("UTF-8"))
response = s.makefile("rb").read()
# Transcode response into UTF-8
#if itemtype in ("0","1","h"):
if not itemtype in ("9","g","I","s"):
# Try most common encodings
for encoding in ("UTF-8", "ISO-8859-1"):
try:
response = response.decode("UTF-8")
break
except UnicodeDecodeError:
pass
else:
# try to find encoding
if _HAS_CHARDET:
detected = chardet.detect(response)
response = response.decode(detected["encoding"])
else:
raise UnicodeDecodeError
if itemtype == "0":
mime = "text/gemini"
elif itemtype == "1":
mime = "text/gopher"
elif itemtype == "h":
mime = "text/html"
elif itemtype in ("9","g","I","s"):
mime = None
else:
# by default, we should consider Gopher
mime = "text/gopher"
cache = write_body(url,response,mime)
return cache
def _fetch_finger(url,timeout=DEFAULT_TIMEOUT,**kwargs):
parsed = urllib.parse.urlparse(url)
host = parsed.hostname
port = parsed.port or standard_ports["finger"]
query = parsed.path.lstrip("/") + "\r\n"
with socket.create_connection((host,port)) as sock:
sock.settimeout(timeout)
sock.send(query.encode())
response = sock.makefile("rb").read().decode("UTF-8")
cache = write_body(response,"text/plain")
return cache
# Originally copied from reference spartan client by Michael Lazar
def _fetch_spartan(url,**kwargs):
cache = None
url_parts = urllib.parse.urlparse(url)
host = url_parts.hostname
port = url_parts.port or standard_ports["spartan"]
path = url_parts.path or "/"
query = url_parts.query
redirect_url = None
with socket.create_connection((host,port)) as sock:
if query:
data = urllib.parse.unquote_to_bytes(query)
else:
data = b""
encoded_host = host.encode("idna")
ascii_path = urllib.parse.unquote_to_bytes(path)
encoded_path = urllib.parse.quote_from_bytes(ascii_path).encode("ascii")
sock.send(b"%s %s %d\r\n" % (encoded_host,encoded_path,len(data)))
fp = sock.makefile("rb")
response = fp.readline(4096).decode("ascii").strip("\r\n")
parts = response.split(" ",maxsplit=1)
code,meta = int(parts[0]),parts[1]
if code == 2:
body = fp.read()
if meta.startswith("text"):
body = body.decode("UTF-8")
cache = write_body(url,body,meta)
elif code == 3:
redirect_url = url_parts._replace(path=meta).geturl()
else:
return set_error(url,"Spartan code %s: Error %s"%(code,meta))
if redirect_url:
cache = _fetch_spartan(redirect_url)
return cache
def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=None):
"""
Validate a TLS certificate in TOFU mode.
If the cryptography module is installed:
- Check the certificate Common Name or SAN matches `host`
- Check the certificate's not valid before date is in the past
- Check the certificate's not valid after date is in the future
Whether the cryptography module is installed or not, check the
certificate's fingerprint against the TOFU database to see if we've
previously encountered a different certificate for this IP address and
hostname.
"""
now = datetime.datetime.utcnow()
if _HAS_CRYPTOGRAPHY:
# Using the cryptography module we can get detailed access
# to the properties of even self-signed certs, unlike in
# the standard ssl library...
c = x509.load_der_x509_certificate(cert, _BACKEND)
# Check certificate validity dates
if accept_bad_ssl:
if c.not_valid_before >= now:
raise CertificateError("Certificate not valid until: {}!".format(c.not_valid_before))
elif c.not_valid_after <= now:
raise CertificateError("Certificate expired as of: {})!".format(c.not_valid_after))
# Check certificate hostnames
names = []
common_name = c.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
if common_name:
names.append(common_name[0].value)
try:
names.extend([alt.value for alt in c.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value])
except x509.ExtensionNotFound:
pass
names = set(names)
for name in names:
try:
ssl._dnsname_match(str(name), host)
break
except CertificateError:
continue
else:
# If we didn't break out, none of the names were valid
raise CertificateError("Hostname does not match certificate common name or any alternative names.")
sha = hashlib.sha256()
sha.update(cert)
fingerprint = sha.hexdigest()
db_path = os.path.join(_CONFIG_DIR, "tofu.db")
db_conn = sqlite3.connect(db_path)
db_cur = db_conn.cursor()
db_cur.execute("""CREATE TABLE IF NOT EXISTS cert_cache
(hostname text, address text, fingerprint text,
first_seen date, last_seen date, count integer)""")
# Have we been here before?
db_cur.execute("""SELECT fingerprint, first_seen, last_seen, count
FROM cert_cache
WHERE hostname=? AND address=?""", (host, address))
cached_certs = db_cur.fetchall()
# If so, check for a match
if cached_certs:
max_count = 0
most_frequent_cert = None
for cached_fingerprint, first, last, count in cached_certs:
if count > max_count:
max_count = count
most_frequent_cert = cached_fingerprint
if fingerprint == cached_fingerprint:
# Matched!
db_cur.execute("""UPDATE cert_cache
SET last_seen=?, count=?
WHERE hostname=? AND address=? AND fingerprint=?""",
(now, count+1, host, address, fingerprint))
db_conn.commit()
break
else:
certdir = os.path.join(_CONFIG_DIR, "cert_cache")
with open(os.path.join(certdir, most_frequent_cert+".crt"), "rb") as fp:
previous_cert = fp.read()
if _HAS_CRYPTOGRAPHY:
# Load the most frequently seen certificate to see if it has
# expired
previous_cert = x509.load_der_x509_certificate(previous_cert, _BACKEND)
previous_ttl = previous_cert.not_valid_after - now
print(previous_ttl)
print("****************************************")
print("[SECURITY WARNING] Unrecognised certificate!")
print("The certificate presented for {} ({}) has never been seen before.".format(host, address))
print("This MIGHT be a Man-in-the-Middle attack.")
print("A different certificate has previously been seen {} times.".format(max_count))
if _HAS_CRYPTOGRAPHY:
if previous_ttl < datetime.timedelta():
print("That certificate has expired, which reduces suspicion somewhat.")
else:
print("That certificate is still valid for: {}".format(previous_ttl))
print("****************************************")
print("Attempt to verify the new certificate fingerprint out-of-band:")
print(fingerprint)
if automatic_choice:
choice = automatic_choice
else:
choice = input("Accept this new certificate? Y/N ").strip().lower()
if choice in ("y", "yes"):
db_cur.execute("""INSERT INTO cert_cache
VALUES (?, ?, ?, ?, ?, ?)""",
(host, address, fingerprint, now, now, 1))
db_conn.commit()
with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
fp.write(cert)
else:
raise Exception("TOFU Failure!")
# If not, cache this cert
else:
db_cur.execute("""INSERT INTO cert_cache
VALUES (?, ?, ?, ?, ?, ?)""",
(host, address, fingerprint, now, now, 1))
db_conn.commit()
certdir = os.path.join(_CONFIG_DIR, "cert_cache")
if not os.path.exists(certdir):
os.makedirs(certdir)
with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
fp.write(cert)
def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_certificates=False,\
**kwargs):
cache = None
url_parts = urllib.parse.urlparse(url)
host = url_parts.hostname
port = url_parts.port or standard_ports["gemini"]
path = url_parts.path or "/"
query = url_parts.query
# In AV-98, this was the _send_request method
#Send a selector to a given host and port.
#Returns the resolved address and binary file with the reply."""
host = host.encode("idna").decode()
# Do DNS resolution
# DNS lookup - will get IPv4 and IPv6 records if IPv6 is enabled
if ":" in host:
# This is likely a literal IPv6 address, so we can *only* ask for
# IPv6 addresses or getaddrinfo will complain
family_mask = socket.AF_INET6
elif socket.has_ipv6:
# Accept either IPv4 or IPv6 addresses
family_mask = 0
else:
# IPv4 only
family_mask = socket.AF_INET
addresses = socket.getaddrinfo(host, port, family=family_mask,
type=socket.SOCK_STREAM)
# Sort addresses so IPv6 ones come first
addresses.sort(key=lambda add: add[0] == socket.AF_INET6, reverse=True)
## Continuation of send_request
# Prepare TLS context
protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >=6 else ssl.PROTOCOL_TLSv1_2
context = ssl.SSLContext(protocol)
context.check_hostname=False
context.verify_mode = ssl.CERT_NONE
# Impose minimum TLS version
## In 3.7 and above, this is easy...
if sys.version_info.minor >= 7:
context.minimum_version = ssl.TLSVersion.TLSv1_2
## Otherwise, it seems very hard...
## The below is less strict than it ought to be, but trying to disable
## TLS v1.1 here using ssl.OP_NO_TLSv1_1 produces unexpected failures
## with recent versions of OpenSSL. What a mess...
else:
context.options |= ssl.OP_NO_SSLv3
context.options |= ssl.OP_NO_SSLv2
# Try to enforce sensible ciphers
try:
context.set_ciphers("AESGCM+ECDHE:AESGCM+DHE:CHACHA20+ECDHE:CHACHA20+DHE:!DSS:!SHA1:!MD5:@STRENGTH")
except ssl.SSLError:
# Rely on the server to only support sensible things, I guess...
pass
# Connect to remote host by any address possible
err = None
for address in addresses:
s = socket.socket(address[0], address[1])
s.settimeout(timeout)
s = context.wrap_socket(s, server_hostname = host)
try:
s.connect(address[4])
break
except OSError as e:
err = e
else:
# If we couldn't connect to *any* of the addresses, just
# bubble up the exception from the last attempt and deny
# knowledge of earlier failures.
raise err
# Do TOFU
cert = s.getpeercert(binary_form=True)
# Remember that we showed the current cert to this domain...
#TODO : accept badssl and automatic choice
_validate_cert(address[4][0], host, cert,automatic_choice="y")
# Send request and wrap response in a file descriptor
url = urllib.parse.urlparse(url)
new_netloc = host
if port != standard_ports["gemini"]:
new_netloc += ":" + str(port)
url = urllib.parse.urlunparse(url._replace(netloc=new_netloc))
s.sendall((url + CRLF).encode("UTF-8"))
f= s.makefile(mode = "rb")
## end of send_request in AV98
# Spec dictates <META> should not exceed 1024 bytes,
# so maximum valid header length is 1027 bytes.
header = f.readline(1027)
header = urllib.parse.unquote(header.decode("UTF-8"))
if not header or header[-1] != '\n':
raise RuntimeError("Received invalid header from server!")
header = header.strip()
# Validate header
status, meta = header.split(maxsplit=1)
if len(meta) > 1024 or len(status) != 2 or not status.isnumeric():
f.close()
raise RuntimeError("Received invalid header from server!")
# Update redirect loop/maze escaping state
if not status.startswith("3"):
previous_redirectors = set()
#TODO FIXME
else:
#we set a previous_redirectors anyway because refactoring in progress
previous_redirectors = set()
# Handle non-SUCCESS headers, which don't have a response body
# Inputs
if status.startswith("1"):
if interactive:
print(meta)
if status == "11":
user_input = getpass.getpass("> ")
else:
#TODO:FIXME we should not ask for user input while non-interactive
user_input = input("> ")
return _fetch_gemini(query(user_input))
else:
return None
# Redirects
elif status.startswith("3"):
newurl = urllib.parse.urljoin(url,meta)
if newurl == url:
raise RuntimeError("URL redirects to itself!")
elif newurl in previous_redirectors:
raise RuntimeError("Caught in redirect loop!")
elif len(previous_redirectors) == _MAX_REDIRECTS:
raise RuntimeError("Refusing to follow more than %d consecutive redirects!" % _MAX_REDIRECTS)
# TODO: redirections handling should be refactored
# elif "interactive" in options and not options["interactive"]:
# follow = self.automatic_choice
# # Never follow cross-domain redirects without asking
# elif new_gi.host.encode("idna") != gi.host.encode("idna"):
# follow = input("Follow cross-domain redirect to %s? (y/n) " % new_gi.url)
# # Never follow cross-protocol redirects without asking
# elif new_gi.scheme != gi.scheme:
# follow = input("Follow cross-protocol redirect to %s? (y/n) " % new_gi.url)
# # Don't follow *any* redirect without asking if auto-follow is off
# elif not self.options["auto_follow_redirects"]:
# follow = input("Follow redirect to %s? (y/n) " % new_gi.url)
# # Otherwise, follow away
else:
follow = "yes"
if follow.strip().lower() not in ("y", "yes"):
raise UserAbortException()
previous_redirectors.add(url)
# if status == "31":
# # Permanent redirect
# self.permanent_redirects[gi.url] = new_gi.url
return _fetch_gemini(newurl)
# Errors
elif status.startswith("4") or status.startswith("5"):
raise RuntimeError(meta)
# Client cert
elif status.startswith("6"):
print("Handling certificates for status 6X are not supported by offpunk\n")
print("Please open a bug report")
_fetch_gemini(url)
# Invalid status
elif not status.startswith("2"):
raise RuntimeError("Server returned undefined status code %s!" % status)
# If we're here, this must be a success and there's a response body
assert status.startswith("2")
mime = meta
# Read the response body over the network
fbody = f.read()
#DEFAULT GEMINIMIME
if mime == "":
mime = "text/gemini; charset=utf-8"
shortmime, mime_options = parse_mime(mime)
if "charset" in mime_options:
try:
codecs.lookup(mime_options["charset"])
except LookupError:
#raise RuntimeError("Header declared unknown encoding %s" % mime_options)
#If the encoding is wrong, theres a high probably its UTF-8 with a bad header
mime_options["charset"] = "UTF-8"
if shortmime.startswith("text/"):
#Get the charset and default to UTF-8 in none
encoding = mime_options.get("charset", "UTF-8")
try:
body = fbody.decode(encoding)
except UnicodeError:
raise RuntimeError("Could not decode response body using %s\
encoding declared in header!" % encoding)
else:
body = fbody
cache = write_body(url,body,mime)
return cache
def fetch(url,offline=False,download_image_first=True,images_mode="readable",validity=0,**kwargs):
url = normalize_url(url)
path=None
print_error = "print_error" in kwargs.keys() and kwargs["print_error"]
if is_cache_valid(url,validity=validity):
path = get_cache_path(url)
#If we are offline, any cache is better than nothing
elif offline and is_cache_valid(url,validity=0):
path = get_cache_path(url)
elif "://" in url and not offline:
try:
scheme = url.split("://")[0]
if scheme not in standard_ports:
if print_error:
print("%s is not a supported protocol"%scheme)
path = None
elif scheme in ("http","https"):
path=_fetch_http(url,**kwargs)
elif scheme == "gopher":
path=_fetch_gopher(url,**kwargs)
elif scheme == "finger":
path=_fetch_finger(url,**kwargs)
elif scheme == "gemini":
path=_fetch_gemini(url,**kwargs)
else:
print("scheme %s not implemented yet")
except UserAbortException:
return
except Exception as err:
cache = set_error(url, err)
# Print an error message
# we fail silently when sync_only
if isinstance(err, socket.gaierror):
if print_error:
print("ERROR: DNS error!")
elif isinstance(err, ConnectionRefusedError):
if print_error:
print("ERROR1: Connection refused!")
elif isinstance(err, ConnectionResetError):
if print_error:
print("ERROR2: Connection reset!")
elif isinstance(err, (TimeoutError, socket.timeout)):
if print_error:
print("""ERROR3: Connection timed out!
Slow internet connection? Use 'set timeout' to be more patient.""")
elif isinstance(err, FileExistsError):
if print_error:
print("""ERROR5: Trying to create a directory which already exists
in the cache : """)
print(err)
elif isinstance(err,requests.exceptions.SSLError):
if print_error:
print("""ERROR6: Bad SSL certificate:\n""")
print(err)
print("""\n If you know what you are doing, you can try to accept bad certificates with the following command:\n""")
print("""set accept_bad_ssl_certificates True""")
elif isinstance(err,requests.exceptions.ConnectionError):
if print_error:
print("""ERROR7: Cannot connect to URL:\n""")
print(str(err))
else:
if print_error:
import traceback
print("ERROR4: " + str(type(err)) + " : " + str(err))
#print("\n" + str(err.with_traceback(None)))
print(traceback.format_exc())
return cache
# We download images contained in the document (from full mode)
if not offline and download_image_first and images_mode:
renderer = ansicat.renderer_from_file(path,url)
if renderer:
for image in renderer.get_images(mode=images_mode):
#Image should exist, should be an url (not a data image)
#and should not be already cached
if image and not image.startswith("data:image/") and not is_cache_valid(image):
width = offutils.term_width() - 1
toprint = "Downloading %s" %image
toprint = toprint[:width]
toprint += " "*(width-len(toprint))
print(toprint,end="\r")
#d_i_f and images_mode are False/None to avoid recursive downloading
#if that ever happen
fetch(image,offline=offline,download_image_first=False,\
images_mode=None,validity=0,**kwargs)
return path
def main():
# Parse arguments
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--path", action="store_true",
help="return path to the cache instead of the content of the cache")
parser.add_argument("--offline", action="store_true",
help="Do not attempt to download, return cached version or error")
parser.add_argument("--max-size", type=int,
help="Cancel download of items above that size (value in Mb).")
parser.add_argument("--timeout", type=int,
help="Time to wait before cancelling connection (in second).")
# No argument: write help
parser.add_argument('url', metavar='URL', nargs='*',
help='download URL and returns the content or the path to a cached version')
# arg = URL: download and returns cached URI
# --cache-validity : do not download if cache is valid
# --validity : returns the date of the cached version, Null if no version
# --force-download : download and replace cache, even if valid
args = parser.parse_args()
param = {}
for u in args.url:
if args.offline:
path = get_cache_path(u)
else:
print("Download URL: %s" %u)
path = fetch(u,max_size=args.max_size,timeout=args.timeout)
if args.path:
print(path)
else:
with open(path,"r") as f:
print(f.read())
f.close()
if __name__== '__main__':
main()
from offpunk.netcache import main
main()

1920
offpunk.py

File diff suppressed because it is too large Load Diff

1916
offpunk/__init__.py Normal file

File diff suppressed because it is too large Load Diff

3
offpunk/__main__.py Normal file
View File

@ -0,0 +1,3 @@
from offpunk import main
main()

1308
offpunk/ansicat.py Normal file

File diff suppressed because it is too large Load Diff

890
offpunk/netcache.py Normal file
View File

@ -0,0 +1,890 @@
#!/usr/bin/env python3
import os
import sys
import urllib.parse
import argparse
import requests
import codecs
import getpass
import socket
import ssl
import glob
import datetime
import hashlib
import sqlite3
from ssl import CertificateError
from offpunk import ansicat, utils
from offpunk.utils import _CACHE_PATH,_DATA_DIR,_CONFIG_DIR
import time
try:
import chardet
_HAS_CHARDET = True
except ModuleNotFoundError:
_HAS_CHARDET = False
try:
from cryptography import x509
from cryptography.hazmat.backends import default_backend
_HAS_CRYPTOGRAPHY = True
_BACKEND = default_backend()
except(ModuleNotFoundError,ImportError):
_HAS_CRYPTOGRAPHY = False
if not os.path.exists(_CACHE_PATH):
print("Creating cache directory {}".format(_CACHE_PATH))
os.makedirs(_CACHE_PATH)
# This list is also used as a list of supported protocols
standard_ports = {
"gemini" : 1965,
"gopher" : 70,
"finger" : 79,
"http" : 80,
"https" : 443,
"spartan": 300,
}
default_protocol = "gemini"
CRLF = '\r\n'
DEFAULT_TIMEOUT = 10
_MAX_REDIRECTS = 5
# monkey-patch Gemini support in urllib.parse
# see https://github.com/python/cpython/blob/master/Lib/urllib/parse.py
urllib.parse.uses_relative.append("gemini")
urllib.parse.uses_netloc.append("gemini")
urllib.parse.uses_relative.append("spartan")
urllib.parse.uses_netloc.append("spartan")
class UserAbortException(Exception):
pass
def parse_mime(mime):
options = {}
if mime:
if ";" in mime:
splited = mime.split(";",maxsplit=1)
mime = splited[0]
if len(splited) >= 1:
options_list = splited[1].split()
for o in options_list:
spl = o.split("=",maxsplit=1)
if len(spl) > 0:
options[spl[0]] = spl[1]
return mime, options
def normalize_url(url):
if "://" not in url and ("./" not in url and url[0] != "/"):
if not url.startswith("mailto:"):
url = "gemini://" + url
return url
def cache_last_modified(url):
if not url:
return None
path = get_cache_path(url)
if path:
return os.path.getmtime(path)
else:
print("ERROR :NOCACHE in cache_last_modified")
return None
def is_cache_valid(url,validity=0):
# Validity is the acceptable time for
# a cache to be valid (in seconds)
# If 0, then any cache is considered as valid
# (use validity = 1 if you want to refresh everything)
if utils.is_local(url):
return True
cache = get_cache_path(url)
if cache :
# If path is too long, we always return True to avoid
# fetching it.
if len(cache) > 259:
print("We return False because path is too long")
return False
if os.path.exists(cache) and not os.path.isdir(cache):
if validity > 0 :
last_modification = cache_last_modified(url)
now = time.time()
age = now - last_modification
return age < validity
else:
return True
else:
#Cache has not been build
return False
else:
#Theres not even a cache!
return False
def get_cache_path(url):
# Sometimes, cache_path became a folder! (which happens for index.html/index.gmi)
# In that case, we need to reconstruct it
#First, we parse the URL
if not url:
return None
parsed = urllib.parse.urlparse(url)
if url[0] == "/" or url.startswith("./") or os.path.exists(url):
scheme = "file"
elif parsed.scheme:
scheme = parsed.scheme
else:
scheme = default_protocol
if scheme in ["file","mailto","list"]:
local = True
host = ""
port = None
# file:// is 7 char
if url.startswith("file://"):
path = url[7:]
elif scheme == "mailto":
path = parsed.path
elif url.startswith("list://"):
listdir = os.path.join(_DATA_DIR,"lists")
listname = url[7:].lstrip("/")
if listname in [""]:
name = "My Lists"
path = listdir
else:
name = listname
path = os.path.join(listdir, "%s.gmi"%listname)
else:
path = url
else:
local = False
# Convert unicode hostname to punycode using idna RFC3490
host = parsed.hostname #.encode("idna").decode()
port = parsed.port or standard_ports.get(scheme, 0)
# special gopher selector case
if scheme == "gopher":
if len(parsed.path) >= 2:
itemtype = parsed.path[1]
path = parsed.path[2:]
else:
itemtype = "1"
path = ""
if itemtype == "0":
mime = "text/gemini"
elif itemtype == "1":
mime = "text/gopher"
elif itemtype == "h":
mime = "text/html"
elif itemtype in ("9","g","I","s"):
mime = "binary"
else:
mime = "text/gopher"
else:
path = parsed.path
if parsed.query:
# we dont add the query if path is too long because path above 260 char
# are not supported and crash python.
# Also, very long query are usually useless stuff
if len(path+parsed.query) < 258:
path += "/" + parsed.query
# Now, we have a partial path. Lets make it full path.
if local:
cache_path = path
elif scheme and host:
cache_path = os.path.expanduser(_CACHE_PATH + scheme + "/" + host + path)
#Theres an OSlimitation of 260 characters per path.
#We will thus cut the path enough to add the index afterward
cache_path = cache_path[:249]
# FIXME : this is a gross hack to give a name to
# index files. This will break if the index is not
# index.gmi. I dont know how to know the real name
# of the file. But first, we need to ensure that the domain name
# finish by "/". Else, the cache will create a file, not a folder.
if scheme.startswith("http"):
index = "index.html"
elif scheme == "finger":
index = "index.txt"
elif scheme == "gopher":
index = "gophermap"
else:
index = "index.gmi"
if path == "" or os.path.isdir(cache_path):
if not cache_path.endswith("/"):
cache_path += "/"
if not url.endswith("/"):
url += "/"
if cache_path.endswith("/"):
cache_path += index
#sometimes, the index itself is a dir
#like when folder/index.gmi?param has been created
#and we try to access folder
if os.path.isdir(cache_path):
cache_path += "/" + index
else:
#URL is missing either a supported scheme or a valid host
#print("Error: %s is not a supported url"%url)
return None
if len(cache_path) > 259:
print("Path is too long. This is an OS limitation.\n\n")
print(url)
return None
return cache_path
def write_body(url,body,mime=None):
## body is a copy of the raw gemtext
## Write_body() also create the cache !
#DEFAULT GEMINIMIME
mime, options = parse_mime(mime)
cache_path = get_cache_path(url)
if cache_path:
if mime and mime.startswith("text/"):
mode = "w"
else:
mode = "wb"
cache_dir = os.path.dirname(cache_path)
# If the subdirectory already exists as a file (not a folder)
# We remove it (happens when accessing URL/subfolder before
# URL/subfolder/file.gmi.
# This causes loss of data in the cache
# proper solution would be to save "sufolder" as "sufolder/index.gmi"
# If the subdirectory doesnt exist, we recursively try to find one
# until it exists to avoid a file blocking the creation of folders
root_dir = cache_dir
while not os.path.exists(root_dir):
root_dir = os.path.dirname(root_dir)
if os.path.isfile(root_dir):
os.remove(root_dir)
os.makedirs(cache_dir,exist_ok=True)
with open(cache_path, mode=mode) as f:
f.write(body)
f.close()
return cache_path
def set_error(url,err):
# If we get an error, we want to keep an existing cache
# but we need to touch it or to create an empty one
# to avoid hitting the error at each refresh
cache = get_cache_path(url)
if is_cache_valid(url):
os.utime(cache)
else:
cache_dir = os.path.dirname(cache)
root_dir = cache_dir
while not os.path.exists(root_dir):
root_dir = os.path.dirname(root_dir)
if os.path.isfile(root_dir):
os.remove(root_dir)
os.makedirs(cache_dir,exist_ok=True)
if os.path.isdir(cache_dir):
with open(cache, "w") as c:
c.write(str(datetime.datetime.now())+"\n")
c.write("ERROR while caching %s\n\n" %url)
c.write("*****\n\n")
c.write(str(type(err)) + " = " + str(err))
#cache.write("\n" + str(err.with_traceback(None)))
c.write("\n*****\n\n")
c.write("If you believe this error was temporary, type ""reload"".\n")
c.write("The ressource will be tentatively fetched during next sync.\n")
c.close()
return cache
def _fetch_http(url,max_size=None,timeout=DEFAULT_TIMEOUT,accept_bad_ssl_certificates=False,**kwargs):
def too_large_error(url,length,max_size):
err = "Size of %s is %s Mo\n"%(url,length)
err += "Offpunk only download automatically content under %s Mo\n" %(max_size/1000000)
err += "To retrieve this content anyway, type 'reload'."
return set_error(url,err)
if accept_bad_ssl_certificates:
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = 'ALL:@SECLEVEL=1'
requests.packages.urllib3.disable_warnings()
verify=False
else:
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = 'ALL:@SECLEVEL=2'
verify=True
header = {}
header["User-Agent"] = "Netcache"
with requests.get(url,verify=verify,headers=header, stream=True,timeout=DEFAULT_TIMEOUT) as response:
if "content-type" in response.headers:
mime = response.headers['content-type']
else:
mime = None
if "content-length" in response.headers:
length = int(response.headers['content-length'])
else:
length = 0
if max_size and length > max_size:
response.close()
return too_large_error(url,str(length/100),max_size)
elif max_size and length == 0:
body = b''
downloaded = 0
for r in response.iter_content():
body += r
#We divide max_size for streamed content
#in order to catch them faster
size = sys.getsizeof(body)
max = max_size/2
current = round(size*100/max,1)
if current > downloaded:
downloaded = current
print(" -> Receiving stream: %s%% of allowed data"%downloaded,end='\r')
#print("size: %s (%s\% of maxlenght)"%(size,size/max_size))
if size > max_size/2:
response.close()
return too_large_error(url,"streaming",max_size)
response.close()
else:
body = response.content
response.close()
if mime and "text/" in mime:
body = body.decode("UTF-8","replace")
cache = write_body(url,body,mime)
return cache
def _fetch_gopher(url,timeout=DEFAULT_TIMEOUT,**kwargs):
parsed =urllib.parse.urlparse(url)
host = parsed.hostname
port = parsed.port or 70
if len(parsed.path) >= 2:
itemtype = parsed.path[1]
selector = parsed.path[2:]
else:
itemtype = "1"
selector = ""
addresses = socket.getaddrinfo(host, port, family=0,type=socket.SOCK_STREAM)
s = socket.create_connection((host,port))
for address in addresses:
s = socket.socket(address[0], address[1])
s.settimeout(timeout)
try:
s.connect(address[4])
break
except OSError as e:
err = e
if parsed.query:
request = selector + "\t" + parsed.query
else:
request = selector
request += "\r\n"
s.sendall(request.encode("UTF-8"))
response = s.makefile("rb").read()
# Transcode response into UTF-8
#if itemtype in ("0","1","h"):
if not itemtype in ("9","g","I","s"):
# Try most common encodings
for encoding in ("UTF-8", "ISO-8859-1"):
try:
response = response.decode("UTF-8")
break
except UnicodeDecodeError:
pass
else:
# try to find encoding
if _HAS_CHARDET:
detected = chardet.detect(response)
response = response.decode(detected["encoding"])
else:
raise UnicodeDecodeError
if itemtype == "0":
mime = "text/gemini"
elif itemtype == "1":
mime = "text/gopher"
elif itemtype == "h":
mime = "text/html"
elif itemtype in ("9","g","I","s"):
mime = None
else:
# by default, we should consider Gopher
mime = "text/gopher"
cache = write_body(url,response,mime)
return cache
def _fetch_finger(url,timeout=DEFAULT_TIMEOUT,**kwargs):
parsed = urllib.parse.urlparse(url)
host = parsed.hostname
port = parsed.port or standard_ports["finger"]
query = parsed.path.lstrip("/") + "\r\n"
with socket.create_connection((host,port)) as sock:
sock.settimeout(timeout)
sock.send(query.encode())
response = sock.makefile("rb").read().decode("UTF-8")
cache = write_body(response,"text/plain")
return cache
# Originally copied from reference spartan client by Michael Lazar
def _fetch_spartan(url,**kwargs):
cache = None
url_parts = urllib.parse.urlparse(url)
host = url_parts.hostname
port = url_parts.port or standard_ports["spartan"]
path = url_parts.path or "/"
query = url_parts.query
redirect_url = None
with socket.create_connection((host,port)) as sock:
if query:
data = urllib.parse.unquote_to_bytes(query)
else:
data = b""
encoded_host = host.encode("idna")
ascii_path = urllib.parse.unquote_to_bytes(path)
encoded_path = urllib.parse.quote_from_bytes(ascii_path).encode("ascii")
sock.send(b"%s %s %d\r\n" % (encoded_host,encoded_path,len(data)))
fp = sock.makefile("rb")
response = fp.readline(4096).decode("ascii").strip("\r\n")
parts = response.split(" ",maxsplit=1)
code,meta = int(parts[0]),parts[1]
if code == 2:
body = fp.read()
if meta.startswith("text"):
body = body.decode("UTF-8")
cache = write_body(url,body,meta)
elif code == 3:
redirect_url = url_parts._replace(path=meta).geturl()
else:
return set_error(url,"Spartan code %s: Error %s"%(code,meta))
if redirect_url:
cache = _fetch_spartan(redirect_url)
return cache
def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=None):
"""
Validate a TLS certificate in TOFU mode.
If the cryptography module is installed:
- Check the certificate Common Name or SAN matches `host`
- Check the certificate's not valid before date is in the past
- Check the certificate's not valid after date is in the future
Whether the cryptography module is installed or not, check the
certificate's fingerprint against the TOFU database to see if we've
previously encountered a different certificate for this IP address and
hostname.
"""
now = datetime.datetime.utcnow()
if _HAS_CRYPTOGRAPHY:
# Using the cryptography module we can get detailed access
# to the properties of even self-signed certs, unlike in
# the standard ssl library...
c = x509.load_der_x509_certificate(cert, _BACKEND)
# Check certificate validity dates
if accept_bad_ssl:
if c.not_valid_before >= now:
raise CertificateError("Certificate not valid until: {}!".format(c.not_valid_before))
elif c.not_valid_after <= now:
raise CertificateError("Certificate expired as of: {})!".format(c.not_valid_after))
# Check certificate hostnames
names = []
common_name = c.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
if common_name:
names.append(common_name[0].value)
try:
names.extend([alt.value for alt in c.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value])
except x509.ExtensionNotFound:
pass
names = set(names)
for name in names:
try:
ssl._dnsname_match(str(name), host)
break
except CertificateError:
continue
else:
# If we didn't break out, none of the names were valid
raise CertificateError("Hostname does not match certificate common name or any alternative names.")
sha = hashlib.sha256()
sha.update(cert)
fingerprint = sha.hexdigest()
db_path = os.path.join(_CONFIG_DIR, "tofu.db")
db_conn = sqlite3.connect(db_path)
db_cur = db_conn.cursor()
db_cur.execute("""CREATE TABLE IF NOT EXISTS cert_cache
(hostname text, address text, fingerprint text,
first_seen date, last_seen date, count integer)""")
# Have we been here before?
db_cur.execute("""SELECT fingerprint, first_seen, last_seen, count
FROM cert_cache
WHERE hostname=? AND address=?""", (host, address))
cached_certs = db_cur.fetchall()
# If so, check for a match
if cached_certs:
max_count = 0
most_frequent_cert = None
for cached_fingerprint, first, last, count in cached_certs:
if count > max_count:
max_count = count
most_frequent_cert = cached_fingerprint
if fingerprint == cached_fingerprint:
# Matched!
db_cur.execute("""UPDATE cert_cache
SET last_seen=?, count=?
WHERE hostname=? AND address=? AND fingerprint=?""",
(now, count+1, host, address, fingerprint))
db_conn.commit()
break
else:
certdir = os.path.join(_CONFIG_DIR, "cert_cache")
with open(os.path.join(certdir, most_frequent_cert+".crt"), "rb") as fp:
previous_cert = fp.read()
if _HAS_CRYPTOGRAPHY:
# Load the most frequently seen certificate to see if it has
# expired
previous_cert = x509.load_der_x509_certificate(previous_cert, _BACKEND)
previous_ttl = previous_cert.not_valid_after - now
print(previous_ttl)
print("****************************************")
print("[SECURITY WARNING] Unrecognised certificate!")
print("The certificate presented for {} ({}) has never been seen before.".format(host, address))
print("This MIGHT be a Man-in-the-Middle attack.")
print("A different certificate has previously been seen {} times.".format(max_count))
if _HAS_CRYPTOGRAPHY:
if previous_ttl < datetime.timedelta():
print("That certificate has expired, which reduces suspicion somewhat.")
else:
print("That certificate is still valid for: {}".format(previous_ttl))
print("****************************************")
print("Attempt to verify the new certificate fingerprint out-of-band:")
print(fingerprint)
if automatic_choice:
choice = automatic_choice
else:
choice = input("Accept this new certificate? Y/N ").strip().lower()
if choice in ("y", "yes"):
db_cur.execute("""INSERT INTO cert_cache
VALUES (?, ?, ?, ?, ?, ?)""",
(host, address, fingerprint, now, now, 1))
db_conn.commit()
with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
fp.write(cert)
else:
raise Exception("TOFU Failure!")
# If not, cache this cert
else:
db_cur.execute("""INSERT INTO cert_cache
VALUES (?, ?, ?, ?, ?, ?)""",
(host, address, fingerprint, now, now, 1))
db_conn.commit()
certdir = os.path.join(_CONFIG_DIR, "cert_cache")
if not os.path.exists(certdir):
os.makedirs(certdir)
with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
fp.write(cert)
def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_certificates=False,\
**kwargs):
cache = None
url_parts = urllib.parse.urlparse(url)
host = url_parts.hostname
port = url_parts.port or standard_ports["gemini"]
path = url_parts.path or "/"
query = url_parts.query
# In AV-98, this was the _send_request method
#Send a selector to a given host and port.
#Returns the resolved address and binary file with the reply."""
host = host.encode("idna").decode()
# Do DNS resolution
# DNS lookup - will get IPv4 and IPv6 records if IPv6 is enabled
if ":" in host:
# This is likely a literal IPv6 address, so we can *only* ask for
# IPv6 addresses or getaddrinfo will complain
family_mask = socket.AF_INET6
elif socket.has_ipv6:
# Accept either IPv4 or IPv6 addresses
family_mask = 0
else:
# IPv4 only
family_mask = socket.AF_INET
addresses = socket.getaddrinfo(host, port, family=family_mask,
type=socket.SOCK_STREAM)
# Sort addresses so IPv6 ones come first
addresses.sort(key=lambda add: add[0] == socket.AF_INET6, reverse=True)
## Continuation of send_request
# Prepare TLS context
protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >=6 else ssl.PROTOCOL_TLSv1_2
context = ssl.SSLContext(protocol)
context.check_hostname=False
context.verify_mode = ssl.CERT_NONE
# Impose minimum TLS version
## In 3.7 and above, this is easy...
if sys.version_info.minor >= 7:
context.minimum_version = ssl.TLSVersion.TLSv1_2
## Otherwise, it seems very hard...
## The below is less strict than it ought to be, but trying to disable
## TLS v1.1 here using ssl.OP_NO_TLSv1_1 produces unexpected failures
## with recent versions of OpenSSL. What a mess...
else:
context.options |= ssl.OP_NO_SSLv3
context.options |= ssl.OP_NO_SSLv2
# Try to enforce sensible ciphers
try:
context.set_ciphers("AESGCM+ECDHE:AESGCM+DHE:CHACHA20+ECDHE:CHACHA20+DHE:!DSS:!SHA1:!MD5:@STRENGTH")
except ssl.SSLError:
# Rely on the server to only support sensible things, I guess...
pass
# Connect to remote host by any address possible
err = None
for address in addresses:
s = socket.socket(address[0], address[1])
s.settimeout(timeout)
s = context.wrap_socket(s, server_hostname = host)
try:
s.connect(address[4])
break
except OSError as e:
err = e
else:
# If we couldn't connect to *any* of the addresses, just
# bubble up the exception from the last attempt and deny
# knowledge of earlier failures.
raise err
# Do TOFU
cert = s.getpeercert(binary_form=True)
# Remember that we showed the current cert to this domain...
#TODO : accept badssl and automatic choice
_validate_cert(address[4][0], host, cert,automatic_choice="y")
# Send request and wrap response in a file descriptor
url = urllib.parse.urlparse(url)
new_netloc = host
if port != standard_ports["gemini"]:
new_netloc += ":" + str(port)
url = urllib.parse.urlunparse(url._replace(netloc=new_netloc))
s.sendall((url + CRLF).encode("UTF-8"))
f= s.makefile(mode = "rb")
## end of send_request in AV98
# Spec dictates <META> should not exceed 1024 bytes,
# so maximum valid header length is 1027 bytes.
header = f.readline(1027)
header = urllib.parse.unquote(header.decode("UTF-8"))
if not header or header[-1] != '\n':
raise RuntimeError("Received invalid header from server!")
header = header.strip()
# Validate header
status, meta = header.split(maxsplit=1)
if len(meta) > 1024 or len(status) != 2 or not status.isnumeric():
f.close()
raise RuntimeError("Received invalid header from server!")
# Update redirect loop/maze escaping state
if not status.startswith("3"):
previous_redirectors = set()
#TODO FIXME
else:
#we set a previous_redirectors anyway because refactoring in progress
previous_redirectors = set()
# Handle non-SUCCESS headers, which don't have a response body
# Inputs
if status.startswith("1"):
if interactive:
print(meta)
if status == "11":
user_input = getpass.getpass("> ")
else:
#TODO:FIXME we should not ask for user input while non-interactive
user_input = input("> ")
return _fetch_gemini(query(user_input))
else:
return None
# Redirects
elif status.startswith("3"):
newurl = urllib.parse.urljoin(url,meta)
if newurl == url:
raise RuntimeError("URL redirects to itself!")
elif newurl in previous_redirectors:
raise RuntimeError("Caught in redirect loop!")
elif len(previous_redirectors) == _MAX_REDIRECTS:
raise RuntimeError("Refusing to follow more than %d consecutive redirects!" % _MAX_REDIRECTS)
# TODO: redirections handling should be refactored
# elif "interactive" in options and not options["interactive"]:
# follow = self.automatic_choice
# # Never follow cross-domain redirects without asking
# elif new_gi.host.encode("idna") != gi.host.encode("idna"):
# follow = input("Follow cross-domain redirect to %s? (y/n) " % new_gi.url)
# # Never follow cross-protocol redirects without asking
# elif new_gi.scheme != gi.scheme:
# follow = input("Follow cross-protocol redirect to %s? (y/n) " % new_gi.url)
# # Don't follow *any* redirect without asking if auto-follow is off
# elif not self.options["auto_follow_redirects"]:
# follow = input("Follow redirect to %s? (y/n) " % new_gi.url)
# # Otherwise, follow away
else:
follow = "yes"
if follow.strip().lower() not in ("y", "yes"):
raise UserAbortException()
previous_redirectors.add(url)
# if status == "31":
# # Permanent redirect
# self.permanent_redirects[gi.url] = new_gi.url
return _fetch_gemini(newurl)
# Errors
elif status.startswith("4") or status.startswith("5"):
raise RuntimeError(meta)
# Client cert
elif status.startswith("6"):
print("Handling certificates for status 6X are not supported by offpunk\n")
print("Please open a bug report")
_fetch_gemini(url)
# Invalid status
elif not status.startswith("2"):
raise RuntimeError("Server returned undefined status code %s!" % status)
# If we're here, this must be a success and there's a response body
assert status.startswith("2")
mime = meta
# Read the response body over the network
fbody = f.read()
#DEFAULT GEMINIMIME
if mime == "":
mime = "text/gemini; charset=utf-8"
shortmime, mime_options = parse_mime(mime)
if "charset" in mime_options:
try:
codecs.lookup(mime_options["charset"])
except LookupError:
#raise RuntimeError("Header declared unknown encoding %s" % mime_options)
#If the encoding is wrong, theres a high probably its UTF-8 with a bad header
mime_options["charset"] = "UTF-8"
if shortmime.startswith("text/"):
#Get the charset and default to UTF-8 in none
encoding = mime_options.get("charset", "UTF-8")
try:
body = fbody.decode(encoding)
except UnicodeError:
raise RuntimeError("Could not decode response body using %s\
encoding declared in header!" % encoding)
else:
body = fbody
cache = write_body(url,body,mime)
return cache
def fetch(url,offline=False,download_image_first=True,images_mode="readable",validity=0,**kwargs):
url = normalize_url(url)
path=None
print_error = "print_error" in kwargs.keys() and kwargs["print_error"]
if is_cache_valid(url,validity=validity):
path = get_cache_path(url)
#If we are offline, any cache is better than nothing
elif offline and is_cache_valid(url,validity=0):
path = get_cache_path(url)
elif "://" in url and not offline:
try:
scheme = url.split("://")[0]
if scheme not in standard_ports:
if print_error:
print("%s is not a supported protocol"%scheme)
path = None
elif scheme in ("http","https"):
path=_fetch_http(url,**kwargs)
elif scheme == "gopher":
path=_fetch_gopher(url,**kwargs)
elif scheme == "finger":
path=_fetch_finger(url,**kwargs)
elif scheme == "gemini":
path=_fetch_gemini(url,**kwargs)
else:
print("scheme %s not implemented yet")
except UserAbortException:
return
except Exception as err:
cache = set_error(url, err)
# Print an error message
# we fail silently when sync_only
if isinstance(err, socket.gaierror):
if print_error:
print("ERROR: DNS error!")
elif isinstance(err, ConnectionRefusedError):
if print_error:
print("ERROR1: Connection refused!")
elif isinstance(err, ConnectionResetError):
if print_error:
print("ERROR2: Connection reset!")
elif isinstance(err, (TimeoutError, socket.timeout)):
if print_error:
print("""ERROR3: Connection timed out!
Slow internet connection? Use 'set timeout' to be more patient.""")
elif isinstance(err, FileExistsError):
if print_error:
print("""ERROR5: Trying to create a directory which already exists
in the cache : """)
print(err)
elif isinstance(err,requests.exceptions.SSLError):
if print_error:
print("""ERROR6: Bad SSL certificate:\n""")
print(err)
print("""\n If you know what you are doing, you can try to accept bad certificates with the following command:\n""")
print("""set accept_bad_ssl_certificates True""")
elif isinstance(err,requests.exceptions.ConnectionError):
if print_error:
print("""ERROR7: Cannot connect to URL:\n""")
print(str(err))
else:
if print_error:
import traceback
print("ERROR4: " + str(type(err)) + " : " + str(err))
#print("\n" + str(err.with_traceback(None)))
print(traceback.format_exc())
return cache
# We download images contained in the document (from full mode)
if not offline and download_image_first and images_mode:
renderer = ansicat.renderer_from_file(path,url)
if renderer:
for image in renderer.get_images(mode=images_mode):
#Image should exist, should be an url (not a data image)
#and should not be already cached
if image and not image.startswith("data:image/") and not is_cache_valid(image):
width = utils.term_width() - 1
toprint = "Downloading %s" %image
toprint = toprint[:width]
toprint += " "*(width-len(toprint))
print(toprint,end="\r")
#d_i_f and images_mode are False/None to avoid recursive downloading
#if that ever happen
fetch(image,offline=offline,download_image_first=False,\
images_mode=None,validity=0,**kwargs)
return path
def main():
# Parse arguments
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--path", action="store_true",
help="return path to the cache instead of the content of the cache")
parser.add_argument("--offline", action="store_true",
help="Do not attempt to download, return cached version or error")
parser.add_argument("--max-size", type=int,
help="Cancel download of items above that size (value in Mb).")
parser.add_argument("--timeout", type=int,
help="Time to wait before cancelling connection (in second).")
# No argument: write help
parser.add_argument('url', metavar='URL', nargs='*',
help='download URL and returns the content or the path to a cached version')
# arg = URL: download and returns cached URI
# --cache-validity : do not download if cache is valid
# --validity : returns the date of the cached version, Null if no version
# --force-download : download and replace cache, even if valid
args = parser.parse_args()
param = {}
for u in args.url:
if args.offline:
path = get_cache_path(u)
else:
print("Download URL: %s" %u)
path = fetch(u,max_size=args.max_size,timeout=args.timeout)
if args.path:
print(path)
else:
with open(path,"r") as f:
print(f.read())
f.close()
if __name__== '__main__':
main()

281
offpunk/opnk.py Normal file
View File

@ -0,0 +1,281 @@
#!/usr/bin/env python3
#opnk stand for "Open like a PuNK".
#It will open any file or URL and display it nicely in less.
#If not possible, it will fallback to xdg-open
#URL are retrieved through netcache
import os
import sys
import tempfile
import argparse
import shutil
import time
import fnmatch
from offpunk import ansicat, netcache, utils
from offpunk.utils import run,term_width,mode_url,unmode_url,is_local
_HAS_XDGOPEN = shutil.which('xdg-open')
_GREP = "grep --color=auto"
less_version = 0
if not shutil.which("less"):
print("Please install the pager \"less\" to run Offpunk.")
print("If you wish to use another pager, send me an email!")
print("(Im really curious to hear about people not having \"less\" on their system.)")
sys.exit()
output = run("less --version")
# We get less Version (which is the only integer on the first line)
words = output.split("\n")[0].split()
less_version = 0
for w in words:
if w.isdigit():
less_version = int(w)
# restoring position only works for version of less > 572
if less_version >= 572:
_LESS_RESTORE_POSITION = True
else:
_LESS_RESTORE_POSITION = False
#_DEFAULT_LESS = "less -EXFRfM -PMurl\ lines\ \%lt-\%lb/\%L\ \%Pb\%$ %s"
# -E : quit when reaching end of file (to behave like "cat")
# -F : quit if content fits the screen (behave like "cat")
# -X : does not clear the screen
# -R : interpret ANSI colors correctly
# -f : suppress warning for some contents
# -M : long prompt (to have info about where you are in the file)
# -W : hilite the new first line after a page skip (space)
# -i : ignore case in search
# -S : do not wrap long lines. Wrapping is done by offpunk, longlines
# are there on purpose (surch in asciiart)
#--incsearch : incremental search starting rev581
if less_version >= 581:
less_base = "less --incsearch --save-marks -~ -XRfMWiS"
elif less_version >= 572:
less_base = "less --save-marks -XRfMWiS"
else:
less_base = "less -XRfMWiS"
_DEFAULT_LESS = less_base + " \"+''\" %s"
_DEFAULT_CAT = less_base + " -EF %s"
def less_cmd(file, histfile=None,cat=False,grep=None):
if histfile:
env = {"LESSHISTFILE": histfile}
else:
env = {}
if cat:
cmd_str = _DEFAULT_CAT
elif grep:
grep_cmd = _GREP
#case insensitive for lowercase search
if grep.islower():
grep_cmd += " -i"
cmd_str = _DEFAULT_CAT + "|" + grep_cmd + " %s"%grep
else:
cmd_str = _DEFAULT_LESS
run(cmd_str, parameter=file, direct_output=True, env=env)
class opencache():
def __init__(self):
# We have a cache of the rendering of file and, for each one,
# a less_histfile containing the current position in the file
self.temp_files = {}
self.less_histfile = {}
# This dictionary contains an url -> ansirenderer mapping. This allows
# to reuse a renderer when visiting several times the same URL during
# the same session
# We save the time at which the renderer was created in renderer_time
# This way, we can invalidate the renderer if a new version of the source
# has been downloaded
self.rendererdic = {}
self.renderer_time = {}
self.mime_handlers = {}
self.last_mode = {}
self.last_width = term_width(absolute=True)
def _get_handler_cmd(self, mimetype):
# Now look for a handler for this mimetype
# Consider exact matches before wildcard matches
exact_matches = []
wildcard_matches = []
for handled_mime, cmd_str in self.mime_handlers.items():
if "*" in handled_mime:
wildcard_matches.append((handled_mime, cmd_str))
else:
exact_matches.append((handled_mime, cmd_str))
for handled_mime, cmd_str in exact_matches + wildcard_matches:
if fnmatch.fnmatch(mimetype, handled_mime):
break
else:
# Use "xdg-open" as a last resort.
if _HAS_XDGOPEN:
cmd_str = "xdg-open %s"
else:
cmd_str = "echo \"Cant find how to open \"%s"
print("Please install xdg-open (usually from xdg-util package)")
return cmd_str
# Return the handler for a specific mimetype.
# Return the whole dic if no specific mime provided
def get_handlers(self,mime=None):
if mime and mime in self.mime_handlers.keys():
return self.mime_handlers[mime]
elif mime:
return None
else:
return self.mime_handlers
def set_handler(self,mime,handler):
previous = None
if mime in self.mime_handlers.keys():
previous = self.mime_handlers[mime]
self.mime_handlers[mime] = handler
if "%s" not in handler:
print("WARNING: this handler has no %%s, no filename will be provided to the command")
if previous:
print("Previous handler was %s"%previous)
def get_renderer(self,inpath,mode=None,theme=None):
# We remove the ##offpunk_mode= from the URL
# If mode is already set, we dont use the part from the URL
inpath,newmode = unmode_url(inpath)
if not mode: mode = newmode
# If we still doesnt have a mode, we see if we used one before
if not mode and inpath in self.last_mode.keys():
mode = self.last_mode[inpath]
elif not mode:
#default mode is readable
mode = "readable"
renderer = None
path = netcache.get_cache_path(inpath)
if path:
usecache = inpath in self.rendererdic.keys() and not is_local(inpath)
#Screen size may have changed
width = term_width(absolute=True)
if usecache and self.last_width != width:
self.cleanup()
usecache = False
self.last_width = width
if usecache:
if inpath in self.renderer_time.keys():
last_downloaded = netcache.cache_last_modified(inpath)
last_cached = self.renderer_time[inpath]
usecache = last_cached > last_downloaded
else:
usecache = False
if not usecache:
renderer = ansicat.renderer_from_file(path,inpath,theme=theme)
if renderer:
self.rendererdic[inpath] = renderer
self.renderer_time[inpath] = int(time.time())
else:
renderer = self.rendererdic[inpath]
return renderer
def get_temp_filename(self,url):
if url in self.temp_files.keys():
return self.temp_files[url]
else:
return None
def opnk(self,inpath,mode=None,terminal=True,grep=None,theme=None,**kwargs):
#Return True if inpath opened in Terminal
# False otherwise
#if terminal = False, we dont try to open in the terminal,
#we immediately fallback to xdg-open.
#netcache currently provide the path if its a file.
#may this should be migrated here.
if not utils.is_local(inpath):
kwargs["images_mode"] = mode
cachepath = netcache.fetch(inpath,**kwargs)
if not cachepath:
return False
elif "://" in inpath:
cachepath = netcache.fetch(inpath,**kwargs)
elif os.path.exists(inpath):
cachepath = inpath
else:
print("%s does not exist"%inpath)
return
renderer = self.get_renderer(inpath,mode=mode,theme=theme)
if renderer and mode:
renderer.set_mode(mode)
self.last_mode[inpath] = mode
if not mode and inpath in self.last_mode.keys():
mode = self.last_mode[inpath]
renderer.set_mode(mode)
#we use the full moded url as key for the dictionary
key = mode_url(inpath,mode)
if terminal and renderer:
#If this is an image and we have chafa/timg, we
#dont use less, we call it directly
if renderer.has_direct_display():
renderer.display(mode=mode,directdisplay=True)
return True
else:
body = renderer.display(mode=mode)
#Should we use the cache? only if it is not local and theres a cache
usecache = key in self.temp_files and not is_local(inpath)
if usecache:
#and the cache is still valid!
last_downloaded = netcache.cache_last_modified(inpath)
last_cached = os.path.getmtime(self.temp_files[key])
if last_downloaded > last_cached:
usecache = False
self.temp_files.pop(key)
self.less_histfile.pop(key)
# We actually put the body in a tmpfile before giving it to less
if not usecache:
tmpf = tempfile.NamedTemporaryFile("w", encoding="UTF-8", delete=False)
self.temp_files[key] = tmpf.name
tmpf.write(body)
tmpf.close()
if key not in self.less_histfile:
firsttime = True
tmpf = tempfile.NamedTemporaryFile("w", encoding="UTF-8", delete=False)
self.less_histfile[key] = tmpf.name
else:
#We dont want to restore positions in lists
firsttime = is_local(inpath)
less_cmd(self.temp_files[key], histfile=self.less_histfile[key],cat=firsttime,grep=grep)
return True
#maybe, we have no renderer. Or we want to skip it.
else:
mimetype = ansicat.get_mime(cachepath)
if mimetype == "mailto":
resp = input("Send an email to %s Y/N? " %inpath)
if resp.strip().lower() in ("y", "yes"):
if _HAS_XDGOPEN :
run("xdg-open mailto:%s", parameter=inpath ,direct_output=True)
else:
print("Cannot find a mail client to send mail to %s" %inpath)
print("Please install xdg-open (usually from xdg-util package)")
return
else:
cmd_str = self._get_handler_cmd(mimetype)
try:
run(cmd_str, parameter=netcache.get_cache_path(inpath), direct_output=True)
except FileNotFoundError:
print("Handler program %s not found!" % shlex.split(cmd_str)[0])
print("You can use the ! command to specify another handler program or pipeline.")
return False
#We remove the renderers from the cache and we also delete temp files
def cleanup(self):
while len(self.temp_files) > 0:
os.remove(self.temp_files.popitem()[1])
while len(self.less_histfile) > 0:
os.remove(self.less_histfile.popitem()[1])
self.last_width = None
self.rendererdic = {}
self.renderer_time = {}
self.last_mode = {}
def main():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("content",metavar="INPUT", nargs="*",
default=sys.stdin, help="Path to the file or URL to open")
args = parser.parse_args()
cache = opencache()
for f in args.content:
cache.opnk(f)
if __name__ == "__main__":
main()

View File

@ -13,7 +13,7 @@ import shutil
import shlex
import urllib.parse
import urllib.parse
import cache_migration
from offpunk import cache_migration
CACHE_VERSION = 1

287
opnk.py
View File

@ -1,286 +1,3 @@
#!/usr/bin/env python3
#opnk stand for "Open like a PuNK".
#It will open any file or URL and display it nicely in less.
#If not possible, it will fallback to xdg-open
#URL are retrieved through netcache
import os
import sys
import tempfile
import argparse
import netcache
import ansicat
import offutils
import shutil
import time
import fnmatch
from offutils import run,term_width,mode_url,unmode_url,is_local
_HAS_XDGOPEN = shutil.which('xdg-open')
_GREP = "grep --color=auto"
less_version = 0
if not shutil.which("less"):
print("Please install the pager \"less\" to run Offpunk.")
print("If you wish to use another pager, send me an email!")
print("(Im really curious to hear about people not having \"less\" on their system.)")
sys.exit()
output = run("less --version")
# We get less Version (which is the only integer on the first line)
words = output.split("\n")[0].split()
less_version = 0
for w in words:
if w.isdigit():
less_version = int(w)
# restoring position only works for version of less > 572
if less_version >= 572:
_LESS_RESTORE_POSITION = True
else:
_LESS_RESTORE_POSITION = False
#_DEFAULT_LESS = "less -EXFRfM -PMurl\ lines\ \%lt-\%lb/\%L\ \%Pb\%$ %s"
# -E : quit when reaching end of file (to behave like "cat")
# -F : quit if content fits the screen (behave like "cat")
# -X : does not clear the screen
# -R : interpret ANSI colors correctly
# -f : suppress warning for some contents
# -M : long prompt (to have info about where you are in the file)
# -W : hilite the new first line after a page skip (space)
# -i : ignore case in search
# -S : do not wrap long lines. Wrapping is done by offpunk, longlines
# are there on purpose (surch in asciiart)
#--incsearch : incremental search starting rev581
if less_version >= 581:
less_base = "less --incsearch --save-marks -~ -XRfMWiS"
elif less_version >= 572:
less_base = "less --save-marks -XRfMWiS"
else:
less_base = "less -XRfMWiS"
_DEFAULT_LESS = less_base + " \"+''\" %s"
_DEFAULT_CAT = less_base + " -EF %s"
def less_cmd(file, histfile=None,cat=False,grep=None):
if histfile:
env = {"LESSHISTFILE": histfile}
else:
env = {}
if cat:
cmd_str = _DEFAULT_CAT
elif grep:
grep_cmd = _GREP
#case insensitive for lowercase search
if grep.islower():
grep_cmd += " -i"
cmd_str = _DEFAULT_CAT + "|" + grep_cmd + " %s"%grep
else:
cmd_str = _DEFAULT_LESS
run(cmd_str, parameter=file, direct_output=True, env=env)
class opencache():
def __init__(self):
# We have a cache of the rendering of file and, for each one,
# a less_histfile containing the current position in the file
self.temp_files = {}
self.less_histfile = {}
# This dictionary contains an url -> ansirenderer mapping. This allows
# to reuse a renderer when visiting several times the same URL during
# the same session
# We save the time at which the renderer was created in renderer_time
# This way, we can invalidate the renderer if a new version of the source
# has been downloaded
self.rendererdic = {}
self.renderer_time = {}
self.mime_handlers = {}
self.last_mode = {}
self.last_width = term_width(absolute=True)
def _get_handler_cmd(self, mimetype):
# Now look for a handler for this mimetype
# Consider exact matches before wildcard matches
exact_matches = []
wildcard_matches = []
for handled_mime, cmd_str in self.mime_handlers.items():
if "*" in handled_mime:
wildcard_matches.append((handled_mime, cmd_str))
else:
exact_matches.append((handled_mime, cmd_str))
for handled_mime, cmd_str in exact_matches + wildcard_matches:
if fnmatch.fnmatch(mimetype, handled_mime):
break
else:
# Use "xdg-open" as a last resort.
if _HAS_XDGOPEN:
cmd_str = "xdg-open %s"
else:
cmd_str = "echo \"Cant find how to open \"%s"
print("Please install xdg-open (usually from xdg-util package)")
return cmd_str
# Return the handler for a specific mimetype.
# Return the whole dic if no specific mime provided
def get_handlers(self,mime=None):
if mime and mime in self.mime_handlers.keys():
return self.mime_handlers[mime]
elif mime:
return None
else:
return self.mime_handlers
def set_handler(self,mime,handler):
previous = None
if mime in self.mime_handlers.keys():
previous = self.mime_handlers[mime]
self.mime_handlers[mime] = handler
if "%s" not in handler:
print("WARNING: this handler has no %%s, no filename will be provided to the command")
if previous:
print("Previous handler was %s"%previous)
def get_renderer(self,inpath,mode=None,theme=None):
# We remove the ##offpunk_mode= from the URL
# If mode is already set, we dont use the part from the URL
inpath,newmode = unmode_url(inpath)
if not mode: mode = newmode
# If we still doesnt have a mode, we see if we used one before
if not mode and inpath in self.last_mode.keys():
mode = self.last_mode[inpath]
elif not mode:
#default mode is readable
mode = "readable"
renderer = None
path = netcache.get_cache_path(inpath)
if path:
usecache = inpath in self.rendererdic.keys() and not is_local(inpath)
#Screen size may have changed
width = term_width(absolute=True)
if usecache and self.last_width != width:
self.cleanup()
usecache = False
self.last_width = width
if usecache:
if inpath in self.renderer_time.keys():
last_downloaded = netcache.cache_last_modified(inpath)
last_cached = self.renderer_time[inpath]
usecache = last_cached > last_downloaded
else:
usecache = False
if not usecache:
renderer = ansicat.renderer_from_file(path,inpath,theme=theme)
if renderer:
self.rendererdic[inpath] = renderer
self.renderer_time[inpath] = int(time.time())
else:
renderer = self.rendererdic[inpath]
return renderer
def get_temp_filename(self,url):
if url in self.temp_files.keys():
return self.temp_files[url]
else:
return None
def opnk(self,inpath,mode=None,terminal=True,grep=None,theme=None,**kwargs):
#Return True if inpath opened in Terminal
# False otherwise
#if terminal = False, we dont try to open in the terminal,
#we immediately fallback to xdg-open.
#netcache currently provide the path if its a file.
#may this should be migrated here.
if not offutils.is_local(inpath):
kwargs["images_mode"] = mode
cachepath = netcache.fetch(inpath,**kwargs)
if not cachepath:
return False
elif "://" in inpath:
cachepath = netcache.fetch(inpath,**kwargs)
elif inpath.startswith("mailto:"):
cachepath = inpath
elif os.path.exists(inpath):
cachepath = inpath
else:
print("%s does not exist"%inpath)
return
renderer = self.get_renderer(inpath,mode=mode,theme=theme)
if renderer and mode:
renderer.set_mode(mode)
self.last_mode[inpath] = mode
if not mode and inpath in self.last_mode.keys():
mode = self.last_mode[inpath]
renderer.set_mode(mode)
#we use the full moded url as key for the dictionary
key = mode_url(inpath,mode)
if terminal and renderer:
#If this is an image and we have chafa/timg, we
#dont use less, we call it directly
if renderer.has_direct_display():
renderer.display(mode=mode,directdisplay=True)
return True
else:
body = renderer.display(mode=mode)
#Should we use the cache? only if it is not local and theres a cache
usecache = key in self.temp_files and not is_local(inpath)
if usecache:
#and the cache is still valid!
last_downloaded = netcache.cache_last_modified(inpath)
last_cached = os.path.getmtime(self.temp_files[key])
if last_downloaded > last_cached:
usecache = False
self.temp_files.pop(key)
self.less_histfile.pop(key)
# We actually put the body in a tmpfile before giving it to less
if not usecache:
tmpf = tempfile.NamedTemporaryFile("w", encoding="UTF-8", delete=False)
self.temp_files[key] = tmpf.name
tmpf.write(body)
tmpf.close()
if key not in self.less_histfile:
firsttime = True
tmpf = tempfile.NamedTemporaryFile("w", encoding="UTF-8", delete=False)
self.less_histfile[key] = tmpf.name
else:
#We dont want to restore positions in lists
firsttime = is_local(inpath)
less_cmd(self.temp_files[key], histfile=self.less_histfile[key],cat=firsttime,grep=grep)
return True
#maybe, we have no renderer. Or we want to skip it.
else:
mimetype = ansicat.get_mime(cachepath)
if mimetype == "mailto":
mail = inpath[7:]
resp = input("Send an email to %s Y/N? " %mail)
if resp.strip().lower() in ("y", "yes"):
if _HAS_XDGOPEN :
run("xdg-open mailto:%s", parameter=mail,direct_output=True)
else:
print("Cannot find a mail client to send mail to %s" %inpath)
print("Please install xdg-open (usually from xdg-util package)")
return
else:
cmd_str = self._get_handler_cmd(mimetype)
try:
run(cmd_str, parameter=netcache.get_cache_path(inpath), direct_output=True)
except FileNotFoundError:
print("Handler program %s not found!" % shlex.split(cmd_str)[0])
print("You can use the ! command to specify another handler program or pipeline.")
return False
#We remove the renderers from the cache and we also delete temp files
def cleanup(self):
while len(self.temp_files) > 0:
os.remove(self.temp_files.popitem()[1])
while len(self.less_histfile) > 0:
os.remove(self.less_histfile.popitem()[1])
self.last_width = None
self.rendererdic = {}
self.renderer_time = {}
self.last_mode = {}
def main():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("content",metavar="INPUT", nargs="*",
default=sys.stdin, help="Path to the file or URL to open")
args = parser.parse_args()
cache = opencache()
for f in args.content:
cache.opnk(f)
if __name__ == "__main__":
main()
from offpunk.opnk import main
main()

View File

@ -45,9 +45,9 @@ Source = "https://git.sr.ht/~lioploum/offpunk"
[project.scripts]
offpunk = "offpunk:main"
netcache = "netcache:main"
ansicat = "ansicat:main"
opnk = "opnk:main"
netcache = "offpunk.netcache:main"
ansicat = "offpunk.ansicat:main"
opnk = "offpunk.opnk:main"
[tool.flit.sdist]
include = ["doc/", "man/", "CHANGELOG"]