2023-07-04 11:55:01 +00:00
|
|
|
|
#!/bin/python
|
|
|
|
|
|
|
|
|
|
#This file contains some utilities common to offpunk, ansirenderer and netcache.
|
|
|
|
|
#Currently, there are the following utilities:
|
|
|
|
|
#
|
|
|
|
|
# run : run a shell command and get the results with some security
|
|
|
|
|
# term_width : get or set the width to display on the terminal
|
|
|
|
|
|
|
|
|
|
import os
|
|
|
|
|
import io
|
|
|
|
|
import subprocess
|
|
|
|
|
import shutil
|
|
|
|
|
import shlex
|
2023-08-12 22:07:07 +00:00
|
|
|
|
import urllib.parse
|
|
|
|
|
import urllib.parse
|
2023-11-08 15:45:27 +00:00
|
|
|
|
import netcache_migration
|
2023-12-01 23:11:34 +00:00
|
|
|
|
import netcache
|
2023-08-29 09:47:51 +00:00
|
|
|
|
|
|
|
|
|
# Version of the on-disk cache layout; compared against the ".version"
# file by upgrade_cache() below.
CACHE_VERSION = 1

# We upgrade the cache only once at startup, hence the UPGRADED variable
# This is only to avoid unnecessary checks each time the cache is accessed
UPGRADED=False
|
|
|
|
|
def upgrade_cache(cache_folder):
    """Bring the on-disk cache in cache_folder up to CACHE_VERSION.

    The current version is read from the ".version" file next to the
    cache.  Every missing migration step is then applied in order via
    netcache_migration.upgrade_to_<N>(cache_folder), and the version
    file is rewritten after each successful step so that an interrupted
    upgrade can resume where it stopped.
    """
    # The original assigned UPGRADED as a function local, so the
    # "upgrade only once" guard in xdg() never engaged; declare global.
    global UPGRADED
    #Let’s read current version of the cache
    version_path = cache_folder + ".version"
    current_version = 0
    if os.path.exists(version_path):
        current_str = None
        with open(version_path) as f:
            current_str = f.read()
        try:
            current_version = int(current_str)
        except (TypeError, ValueError):
            # Corrupt or unreadable version file: assume oldest layout.
            current_version = 0
    #Now, let’s upgrade the cache if needed
    while current_version < CACHE_VERSION:
        current_version += 1
        upgrade_func = getattr(netcache_migration,"upgrade_to_"+str(current_version))
        upgrade_func(cache_folder)
        with open(version_path,"w") as f:
            f.write(str(current_version))
    UPGRADED = True
|
|
|
|
|
|
|
|
|
|
#get xdg folder. Folder should be "cache", "data" or "config"
def xdg(folder="cache"):
    """Resolve the offpunk directory for "cache", "data" or "config".

    Implements the XDG base-directory lookup by hand to avoid a
    conflict with existing python-xdg libraries.  A pre-existing
    ~/.offpunk/ keeps precedence for backward compatibility.  The cache
    directory is always created, and the cache is upgraded once per
    session the first time it is requested.
    """
    home = os.path.expanduser("~")
    data_home = os.environ.get("XDG_DATA_HOME") or \
            os.path.join(home, ".local", "share")
    config_home = os.environ.get("XDG_CONFIG_HOME") or \
            os.path.join(home, ".config")
    config_dir = os.path.join(os.path.expanduser(config_home), "offpunk/")
    data_dir = os.path.join(os.path.expanduser(data_home), "offpunk/")
    legacy_dir = os.path.expanduser("~/.offpunk/")
    ## Look for pre-existing config directory, if any
    if os.path.exists(legacy_dir):
        config_dir = legacy_dir
        #if no XDG .local/share, we also keep data in the old config
        if not os.path.exists(data_home):
            data_dir = config_dir
    ## get the cache path from the OFFPUNK_CACHE_PATH environment variable
    # if OFFPUNK_CACHE_PATH is unset, default to ~/.cache/offpunk
    cache_home = os.environ.get("XDG_CACHE_HOME") or \
            os.path.join(home, ".cache")
    cache_dir = os.environ.get("OFFPUNK_CACHE_PATH",
            os.path.join(os.path.expanduser(cache_home), "offpunk/"))
    #Check that the cache path ends with "/"
    if not cache_dir.endswith("/"):
        cache_dir += "/"
    os.makedirs(cache_dir, exist_ok=True)
    if folder == "cache" and not UPGRADED:
        upgrade_cache(cache_dir)
    known = {"cache": cache_dir, "config": config_dir, "data": data_dir}
    if folder in known:
        return known[folder]
    print("No XDG folder for %s. Check your code."%folder)
    return None
|
2023-08-13 10:29:32 +00:00
|
|
|
|
|
2023-08-29 09:47:51 +00:00
|
|
|
|
|
|
|
|
|
|
2023-12-01 23:11:34 +00:00
|
|
|
|
#An IPV6 URL should be put between []
#We try to detect them has location with more than 2 ":"
def fix_ipv6_url(url):
    """Wrap a bare IPv6 host in brackets so urllib can parse the URL.

    A host is heuristically treated as IPv6 when it contains more than
    two ":" and is not bracketed yet.  Empty and mailto URLs are
    returned untouched.
    """
    if not url or url.startswith("mailto"):
        return url
    schema = None
    schemaless = url
    if "://" in url:
        schema, schemaless = url.split("://", maxsplit=1)

    def _bare_ipv6(host):
        # More than two colons and no brackets yet -> looks like IPv6.
        return host.count(":") > 2 and "[" not in host and "]" not in host

    if "/" in schemaless:
        host, tail = schemaless.split("/", 1)
        if _bare_ipv6(host):
            schemaless = "[" + host + "]" + "/" + tail
    elif _bare_ipv6(schemaless):
        schemaless = "[" + schemaless + "]/"
    if schema:
        return schema + "://" + schemaless
    return schemaless
|
2023-08-29 09:47:51 +00:00
|
|
|
|
|
2023-12-01 23:11:34 +00:00
|
|
|
|
# Cheap and cheerful URL detector
def looks_like_url(word):
    """Heuristically decide whether word is a URL offpunk can open.

    mailto: URLs need an "@"; local schemes ("file", "list") need a
    "/"; network schemes must be known to netcache and have a dotted
    host, "localhost", or a bracketed IPv6 host.  Returns False for
    anything urllib refuses to parse.

    (A trailing "return start and ..." statement in the original was
    unreachable — every branch above it already returned — and has
    been removed along with the unused locals.)
    """
    try:
        if not word.strip():
            return False
        url = fix_ipv6_url(word).strip()
        #sometimes, urllib crashed only when requesting the port
        port = urllib.parse.urlparse(url).port
        scheme = word.split("://")[0]
        if word.startswith("mailto:"):
            return "@" in word
        if scheme in ["file","list"]:
            # Local resources only need a path component.
            return "/" in word
        if scheme not in netcache.standard_ports:
            return False
        #IPv4 or hostname
        if "." in word or "localhost" in word:
            return True
        #IPv6
        return "[" in word and ":" in word and "]" in word
    except ValueError:
        return False
|
2023-08-29 09:47:51 +00:00
|
|
|
|
|
2023-08-13 10:29:32 +00:00
|
|
|
|
## Those two functions add/remove the mode to the
# URLs. This is a gross hack to remember the mode
def mode_url(url,mode):
    """Append "##offpunk_mode=<mode>" to url unless it is already tagged.

    "readable" is the default mode and is never encoded in the URL.
    The original guard checked for "##offpunk=", which is not a
    substring of the real marker "##offpunk_mode=", so re-tagging a URL
    appended a second marker; the guard now matches the real marker.
    """
    if mode and mode != "readable" and "##offpunk_mode=" not in url:
        url += "##offpunk_mode=" + mode
    return url
|
|
|
|
|
|
|
|
|
|
def unmode_url(url):
    """Split a mode-tagged URL into [url, mode]; mode is None if absent."""
    pieces = url.split("##offpunk_mode=")
    if len(pieces) > 1:
        return [pieces[0], pieces[1]]
    return [url, None]
|
|
|
|
|
|
2023-07-04 11:55:01 +00:00
|
|
|
|
# In terms of arguments, this can take an input file/string to be passed to
# stdin, a parameter to do (well-escaped) "%" replacement on the command, a
# flag requesting that the output go directly to the stdout, and a list of
# additional environment variables to set.
def run(cmd, *, input=None, parameter=None, direct_output=False, env={}):
    """Run a shell command and return its captured output.

    cmd may contain a single "%s" which is replaced with the
    shell-quoted parameter.  input may be a string (sent to stdin) or a
    file-like object (used as stdin directly).  When direct_output is
    False, stdout and stderr are captured and returned as a decoded
    string (raising subprocess.CalledProcessError on non-zero exit);
    otherwise the child writes straight to the terminal and None is
    returned.  env supplies extra environment variables for the child.
    """
    if parameter:
        cmd = cmd % shlex.quote(parameter)
    # Work on a copy: the original "e = os.environ" mutated the real
    # process environment for every subsequent command in this session.
    e = dict(os.environ)
    e.update(env)
    if isinstance(input, io.IOBase):
        # File-like input becomes the child's stdin directly.
        stdin = input
        input = None
    else:
        if input:
            input = input.encode()
        stdin = None
    if not direct_output:
        # subprocess.check_output() wouldn't allow us to pass stdin.
        result = subprocess.run(cmd, check=True, env=e, input=input,
                                shell=True, stdin=stdin, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
        return result.stdout.decode()
    else:
        subprocess.run(cmd, env=e, input=input, shell=True, stdin=stdin)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Default rendering width in columns, read and updated by term_width().
# (The original also carried a module-level "global TERM_WIDTH"
# statement, which is a no-op outside a function and was removed.)
TERM_WIDTH = 72
|
2023-07-04 11:55:01 +00:00
|
|
|
|
|
2023-08-17 13:38:47 +00:00
|
|
|
|
#if absolute, returns the real terminal width, not the text width
def term_width(new_width=None,absolute=False):
    """Get (and optionally set) the width used to render text.

    A truthy new_width becomes the new preferred text width.  With
    absolute=True the real terminal width is returned; otherwise the
    result is capped at the actual terminal width.
    """
    global TERM_WIDTH
    if new_width:
        TERM_WIDTH = new_width
    actual = shutil.get_terminal_size()[0]
    if absolute:
        return actual
    return min(TERM_WIDTH, actual)
|
2023-08-03 14:54:29 +00:00
|
|
|
|
|
2023-08-11 21:31:33 +00:00
|
|
|
|
def is_local(url):
    """Return True when url points to a resource that needs no network."""
    if not url:
        return True
    if "://" not in url:
        # No scheme separator: treat it as a local path.
        return True
    scheme, _ = url.split("://", maxsplit=1)
    return scheme in ["file","mail","list","mailto"]
|
2023-08-03 14:54:29 +00:00
|
|
|
|
|
2023-08-12 22:07:07 +00:00
|
|
|
|
|
|
|
|
|
# This method return the image URL or invent it if it’s a base64 inline image
# It returns [url,image_data] where image_data is None for normal image
def looks_like_base64(src,baseurl):
    """Return (imgurl, imgdata) for an image source attribute.

    Regular sources are resolved against baseurl with imgdata None.
    For inline "data:image/...;base64," sources, a name is invented
    from the first 20 characters of the payload plus the mime
    extension, and the raw base64 payload is returned as imgdata.
    Non-base64 data: images (e.g. svg) yield (None, None).
    """
    if not src or not src.startswith("data:image/"):
        # Normal image reference: resolve relative to the page.
        return urllib.parse.urljoin(baseurl, src), None
    if ";base64," not in src:
        #We can’t handle other data:image such as svg for now
        return None, None
    pieces = src.split(";base64,")
    #pieces[0] is something like data:image/jpg
    header = pieces[0]
    extension = header.split("/")[1] if "/" in header else "data"
    imgdata = pieces[1]
    invented = imgdata[:20] + "." + extension
    return urllib.parse.urljoin(baseurl, invented), imgdata
|