offpunk/offutils.py

227 lines
7.7 KiB
Python
Raw Permalink Normal View History

2023-07-04 11:55:01 +00:00
#!/bin/python
#This file contains some utilities common to offpunk, ansirenderer and netcache.
#Currently, there are the following utilities:
#
# run : run a shell command and get the results with some security
# term_width : get or set the width to display on the terminal
import os
import io
import subprocess
import shutil
import shlex
2023-08-12 22:07:07 +00:00
import urllib.parse
import urllib.parse
import netcache_migration
import netcache
2023-08-29 09:47:51 +00:00
CACHE_VERSION = 1
2023-07-04 11:55:01 +00:00
# We upgrade the cache only once at startup, hence the UPGRADED variable
# This is only to avoid unecessary checks each time the cache is accessed
UPGRADED=False
def upgrade_cache(cache_folder):
#Lets read current version of the cache
version_path = cache_folder + ".version"
current_version = 0
if os.path.exists(version_path):
current_str = None
with open(version_path) as f:
current_str = f.read()
f.close()
try:
current_version = int(current_str)
except:
current_version = 0
#Now, lets upgrade the cache if needed
while current_version < CACHE_VERSION:
current_version += 1
upgrade_func = getattr(netcache_migration,"upgrade_to_"+str(current_version))
upgrade_func(cache_folder)
with open(version_path,"w") as f:
f.write(str(current_version))
f.close()
UPGRADED=True
#get xdg folder. Folder should be "cache", "data" or "config"
def xdg(folder="cache"):
## Config directories
## We implement our own python-xdg to avoid conflict with existing libraries.
_home = os.path.expanduser('~')
data_home = os.environ.get('XDG_DATA_HOME') or \
os.path.join(_home,'.local','share')
config_home = os.environ.get('XDG_CONFIG_HOME') or \
os.path.join(_home,'.config')
_CONFIG_DIR = os.path.join(os.path.expanduser(config_home),"offpunk/")
_DATA_DIR = os.path.join(os.path.expanduser(data_home),"offpunk/")
_old_config = os.path.expanduser("~/.offpunk/")
## Look for pre-existing config directory, if any
if os.path.exists(_old_config):
_CONFIG_DIR = _old_config
#if no XDG .local/share and not XDG .config, we use the old config
if not os.path.exists(data_home) and os.path.exists(_old_config):
_DATA_DIR = _CONFIG_DIR
## get _CACHE_PATH from OFFPUNK_CACHE_PATH environment variable
# if OFFPUNK_CACHE_PATH empty, set default to ~/.cache/offpunk
cache_home = os.environ.get('XDG_CACHE_HOME') or\
os.path.join(_home,'.cache')
_CACHE_PATH = os.environ.get('OFFPUNK_CACHE_PATH', \
os.path.join(os.path.expanduser(cache_home),"offpunk/"))
2024-01-23 20:21:15 +00:00
#Check that the cache path ends with "/"
if not _CACHE_PATH.endswith("/"):
_CACHE_PATH += "/"
os.makedirs(_CACHE_PATH,exist_ok=True)
if folder == "cache" and not UPGRADED:
upgrade_cache(_CACHE_PATH)
if folder == "cache":
return _CACHE_PATH
elif folder == "config":
return _CONFIG_DIR
elif folder == "data":
return _DATA_DIR
else:
print("No XDG folder for %s. Check your code."%folder)
return None
2023-08-29 09:47:51 +00:00
#An IPV6 URL should be put between []
#We try to detect them has location with more than 2 ":"
def fix_ipv6_url(url):
if not url or url.startswith("mailto"):
return url
if "://" in url:
schema, schemaless = url.split("://",maxsplit=1)
else:
schema, schemaless = None, url
if "/" in schemaless:
netloc, rest = schemaless.split("/",1)
if netloc.count(":") > 2 and "[" not in netloc and "]" not in netloc:
schemaless = "[" + netloc + "]" + "/" + rest
2024-02-15 15:16:37 +00:00
elif schemaless.count(":") > 2 and "[" not in schemaless and "]" not in schemaless:
schemaless = "[" + schemaless + "]/"
if schema:
return schema + "://" + schemaless
return schemaless
2023-08-29 09:47:51 +00:00
# Cheap and cheerful URL detector
def looks_like_url(word):
try:
if not word.strip():
return False
url = fix_ipv6_url(word).strip()
parsed = urllib.parse.urlparse(url)
#sometimes, urllib crashed only when requesting the port
port = parsed.port
scheme = word.split("://")[0]
mailto = word.startswith("mailto:")
start = scheme in netcache.standard_ports
local = scheme in ["file","list"]
if mailto:
return "@" in word
elif not local:
2024-02-15 15:16:37 +00:00
if start:
#IPv4
if "." in word or "localhost" in word:
return True
#IPv6
elif "[" in word and ":" in word and "]" in word:
return True
else: return False
else: return False
return start and ("." in word or "localhost" in word or ":" in word)
else:
return "/" in word
except ValueError:
return False
2023-08-29 09:47:51 +00:00
## Those two functions add/remove the mode to the
# URLs. This is a gross hack to remember the mode
def mode_url(url,mode):
if mode and mode!= "readable" and "##offpunk=" not in url:
url += "##offpunk_mode=" + mode
return url
def unmode_url(url):
mode = None
splitted = url.split("##offpunk_mode=")
if len(splitted) > 1:
url = splitted[0]
mode = splitted[1]
return [url,mode]
2023-07-04 11:55:01 +00:00
# In terms of arguments, this can take an input file/string to be passed to
# stdin, a parameter to do (well-escaped) "%" replacement on the command, a
# flag requesting that the output go directly to the stdout, and a list of
# additional environment variables to set.
def run(cmd, *, input=None, parameter=None, direct_output=False, env={}):
if parameter:
cmd = cmd % shlex.quote(parameter)
e = os.environ
e.update(env)
if isinstance(input, io.IOBase):
stdin = input
input = None
else:
if input:
input = input.encode()
stdin = None
if not direct_output:
# subprocess.check_output() wouldn't allow us to pass stdin.
result = subprocess.run(cmd, check=True, env=e, input=input,
shell=True, stdin=stdin, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
return result.stdout.decode()
else:
subprocess.run(cmd, env=e, input=input, shell=True, stdin=stdin)
global TERM_WIDTH
2023-08-14 09:43:20 +00:00
TERM_WIDTH = 72
2023-07-04 11:55:01 +00:00
#if absolute, returns the real terminal width, not the text width
def term_width(new_width=None,absolute=False):
2023-07-04 11:55:01 +00:00
if new_width:
global TERM_WIDTH
TERM_WIDTH = new_width
cur = shutil.get_terminal_size()[0]
if absolute:
return cur
width = TERM_WIDTH
2023-07-04 11:55:01 +00:00
if cur < width:
width = cur
return width
2023-08-03 14:54:29 +00:00
def is_local(url):
2023-09-03 21:20:54 +00:00
if not url: return True
elif "://" in url:
scheme,path = url.split("://",maxsplit=1)
return scheme in ["file","mail","list","mailto"]
else:
return True
2023-08-03 14:54:29 +00:00
2023-08-12 22:07:07 +00:00
# This method return the image URL or invent it if its a base64 inline image
# It returns [url,image_data] where image_data is None for normal image
def looks_like_base64(src,baseurl):
imgdata = None
imgname = src
if src and src.startswith("data:image/"):
if ";base64," in src:
splitted = src.split(";base64,")
#splitted[0] is something like data:image/jpg
if "/" in splitted[0]:
extension = splitted[0].split("/")[1]
else:
extension = "data"
2023-08-12 22:07:07 +00:00
imgdata = splitted[1]
imgname = imgdata[:20] + "." + extension
imgurl = urllib.parse.urljoin(baseurl, imgname)
else:
#We cant handle other data:image such as svg for now
imgurl = None
2023-08-12 22:07:07 +00:00
else:
imgurl = urllib.parse.urljoin(baseurl, imgname)
return imgurl,imgdata