forked from solderpunk/AV-98
Compare commits
5 Commits
Author | SHA1 | Date |
---|---|---|
Lionel Dricot | 8b97acc5e0 | |
Lionel Dricot | 963ac3d7a3 | |
Austreelis | 6e215c0512 | |
Austreelis | 370e7e4dc5 | |
Austreelis | bc43e3150b |
89
CHANGELOG
89
CHANGELOG
|
@ -1,101 +1,14 @@
|
|||
# Offpunk History
|
||||
|
||||
## 2.3 - Unreleased
|
||||
- offpunk/netcache: fix IPv6 as an URL (bug #40)
|
||||
- ansicat: display empty files (instead of opening them with xdg-open)
|
||||
- fix escape sequence warning in python 3.12 (by Étienne Mollier) (Debian #1064209)
|
||||
- ansicat : fix crash when feedparser is crashing on bad RSS
|
||||
- netcache: fix spartan protocol error
|
||||
|
||||
## 2.2 - February 13th 2023
|
||||
- cache folder is now configurable through $OFFPUNK_CACHE_PATH environment variable (by prx)
|
||||
- offpunk: adding an URL to a list now update the view mode if url already present
|
||||
- netcache: solve an infinite gemini loop with code 6X (see also bug #31)
|
||||
- ansicat: added support for <video> HTML-element
|
||||
- ansicat: if chafa fails to load an image, fallback to timg if available
|
||||
- offpunk: add list autocompletion to "tour"
|
||||
- offpunk: removed "blackbox", which has not been used nor maintained
|
||||
- offpunk: "gus" was broken, it is functionnal again
|
||||
- opnk/offpunk: more informative prompt in less
|
||||
- ansicat: added support for HTML description elements <dt> and <dd> (by Bert Livens)
|
||||
- opnk: added "--mode" command-line argument (bug #39)
|
||||
- offpunk: support for "preformatted" theming (bug #38)
|
||||
- opnk/netcache: added "--cache-validity" command-line argument (bug #37)
|
||||
- ansicat: consider files as XML, not SVG, if they don’t have .svg extension
|
||||
- offpunk: fix "view link" crashing with link to empty files
|
||||
|
||||
## 2.1 - December 15th 2023
|
||||
- freshly updated gemtext/rss links are highlighted ("new_link" theme option)
|
||||
- offpunk : new "copy title" and "copy link" function
|
||||
- offpunk : new "view XX" feature where XX is a number to view information about a link
|
||||
- ansicat: added "--mode" option
|
||||
- redirections are now reflected in links and the cache (bug #28)
|
||||
- ansicat: avoid a crash when urllib.parse.urljoin fails
|
||||
- offpunk: Fix a crash when gus is called without parameters (Von Hohenheiden)
|
||||
- ansicat: fixed a crash when parsing wrong hidden_url in gemini (bug #32)
|
||||
- offpunk: offpunk --version doesn’t create the cache anymore (bug #27)
|
||||
- ansicat: fix a crash with HTML without title (bug #33)
|
||||
- netcache: gemini socket code can crash when IPv6 is disabled (mailing-list)
|
||||
|
||||
## 2.0 - November 16th 2023
|
||||
Changes since 1.10
|
||||
- IMPORTANT: Licence has been changed to AGPL for ideological reasons
|
||||
- IMPORTANT: Contact adress has been changed to offpunk2 on the same domain (because of spam)
|
||||
- IMPORTANT: code has been splitted into several differents files.
|
||||
- IMPORTANT: migrating from flit to hatchling (patch by Jean Abou Samra)
|
||||
Major features:
|
||||
- New command-line tool: "netcache"
|
||||
- New command-line tool: "ansicat"
|
||||
- New command-line tool: "opnk"
|
||||
- "theme" command allows customization of the colours
|
||||
- "--config-file" allows to start offpunk with custom config (#16)
|
||||
- "view source" to view the source code of a page
|
||||
- introduced the "default_protocol" options (default to gemini)
|
||||
Improvments:
|
||||
- Reading position is saved in less for the whole session
|
||||
- Rendering is cached for the session, allowing faster browsing of a page already visited
|
||||
- "redirect" supports domains starting with "*" to also block all subdomins
|
||||
- "--images-mode" allow to choose at startup which images should be dowloaded (none,readable,full)
|
||||
- Support for embedded multi-format rendering (such as RSS feeds with html elements)
|
||||
- The cache is now automatically upgraded if needed (see .version in your cache)
|
||||
- Images of html files are now downloaded with the html (slower sync but better reading experience)
|
||||
- "--sync" can optionnaly take some lists as arguments, in order to make for specific sync
|
||||
- initial tentative to support podcasts in RSS/Atom feeds
|
||||
Other notable changes from 1.X:
|
||||
- "accept_bad_ssl_certificates" now more agressive for http and really accepts them all
|
||||
- Gopher-only: we don’t support naming a page after the name of the incoming link
|
||||
- Gemini-only: support for client generated certificates has been removed
|
||||
- "file" is now marked as a dependency (thank Guillaume Loret)
|
||||
|
||||
## 2.0 (beta3 - final 2.0) - Released as 2.0
|
||||
Changes since beta2:
|
||||
- bug #25 : makes python-requests optional again
|
||||
- --disable-http had no effect: reimplemented
|
||||
- introduced the "default_protocol" options (default to gemini) to enter URLs without the :// part (fixes bug #21)
|
||||
|
||||
## 2.0-beta2 - November 8th 2023
|
||||
## 2.0-beta2 - unreleased
|
||||
Changes since beta1
|
||||
- IMPORTANT: migrating from flit to hatchling (patch by Jean Abou Samra)
|
||||
- "--sync" can optionnaly take some lists as arguments, in order to make for specific sync
|
||||
- "view source" to view the source code of a page
|
||||
- initial tentative to support podcasts in RSS/Atom feeds
|
||||
- new PlaintextRenderer which display .txt files without any margin/color/linebreaks
|
||||
- default URL blocked list is now its own file to make contributions easier
|
||||
- prompt color is now part of the theme
|
||||
- improves handling of base64 images
|
||||
- fixes gophermap being considered as gemtext files
|
||||
- fixes opening mailto links
|
||||
- fixes existing non-html ressources marked a to_fetch even when not needed (simple and/or confusion)
|
||||
- fixes a crash with RSS feeds without <link> element
|
||||
- fixes a crash with data:image/svg+xml links
|
||||
- fixes a bug in HTML renderer where some hX element were not closed properly
|
||||
- fixes input in Gemini while online
|
||||
- fixes a crash with invalid URL
|
||||
- fixes a crash while parsing invalid dates in RSS
|
||||
- fixes hang/crash when meeting the ";" itemtype in gopher
|
||||
- attempt at hiding XMLparsedAsHTMLWarning from BS4 library
|
||||
- chafa now used by default everywhere if version > 1.10
|
||||
- ignoring encoding error in ansicat
|
||||
|
||||
## 2.0-beta1 - September 05th 2023
|
||||
This is an an experimental release. Bug reports and feedbacks are welcome on the offpunk-devel list.
|
||||
|
|
47
README.md
47
README.md
|
@ -13,13 +13,13 @@ Offpunk is a fork of the original [AV-98](https://tildegit.org/solderpunk/AV-98)
|
|||
|
||||
## How to use
|
||||
|
||||
Offpunk is a set of python files. Installation is optional, you can simply git clone the project and run "./offpunk.py" or "python3 offpunk.py" in a terminal.
|
||||
Offpunk is a single python file. Installation is optional, you can simply download and run "./offpunk.py" or "python3 offpunk.py" in a terminal.
|
||||
|
||||
You use the `go` command to visit a URL, e.g. `go gemini.circumlunar.space`. (gemini:// is assumed if no protocol is specified. Supported protocols are gemini, gopher, finger, http, https, mailto, spartan and file. Default protocol is configurable).
|
||||
You use the `go` command to visit a URL, e.g. `go gemini.circumlunar.space`. (gemini:// is assumed if no protocol is specified. Supported protocols are gemini, gopher, finger, http, https, mailto, spartan and file).
|
||||
|
||||
Links in pages are assigned numerical indices. Just type an index to follow that link. If page is too long to fit on your screen, the content is displayed in the less pager. Type `q` to quit and go back to Offpunk prompt. Type `view` or `v` to display it again. (`view full` or `v full` allows to see the full html page instead of the article view. `v feed` try to display the linked RSS feed and `v feeds` displays a list of available feeds. This only applies to html pages. `v source` allows you to see the source code of a page and `v normal` to go back to normal view)
|
||||
Links in pages are assigned numerical indices. Just type an index to follow that link. If page is too long to fit on your screen, the content is displayed in the less pager (by default). Type `q` to quit and go back to Offpunk prompt. Type `view` or `v` to display it again. (`view full` or `v full` allows to see the full html page instead of the article view. `v feed` try to display the linked RSS feed and `v feeds` displays a list of available feeds. This only applies to html pages)
|
||||
|
||||
Use `add` to add a page to your bookmarks and `bookmarks` or `bm` to show your bookmarks (you can create multiple bookmarks lists, edit and remove them. See the `list` manual with `help list`).
|
||||
Use `add` to add a capsule to your bookmarks and `bookmarks` or `bm` to show your bookmarks (you can create multiple bookmarks lists, edit and remove them. See the `list` manual with `help list`).
|
||||
|
||||
Use `offline` to only browse cached content and `online` to go back online. While offline, the `reload` command will force a re-fetch during the next synchronisation.
|
||||
|
||||
|
@ -35,10 +35,6 @@ For example, running
|
|||
|
||||
will refresh your bookmarks if those are at least 12h old. If cache-validity is not set or set to 0, any cache is considered good and only content never cached before will be fetched. `--assume-yes` will automatically accept SSL certificates with errors instead of refusing them.
|
||||
|
||||
Sync can be applied to only a subset of list.
|
||||
|
||||
`offpunk --sync bookmarks tour to_fetch --cache-validity 3600`
|
||||
|
||||
Offpunk can also be configured as a browser by other tool. If you want to use offpunk directly with a given URL, simply type:
|
||||
|
||||
`offpunk URL`
|
||||
|
@ -57,16 +53,19 @@ Questions can be asked on the users mailing list:
|
|||
|
||||
## Dependencies
|
||||
|
||||
Offpunk has few "strict dependencies", i.e. it should run and work without anything
|
||||
Offpunk has no "strict dependencies", i.e. it should run and work without anything
|
||||
else beyond the Python standard library and the "less" pager. However, it will "opportunistically
|
||||
import" a few other libraries if they are available to offer an improved
|
||||
experience or some other features such as HTTP/HTML or image support.
|
||||
experience or some other features. Python libraries requests, bs4 and readability are required for http/html support. Images are displayed if chafa or timg are presents (python-pil is needed for chafa version before 1.10). When displaying only a picture (not inline), rendering will be pixel perfect in compatible terminals (such as Kitty) if chafa is at least version 1.8 or if timg is used.
|
||||
|
||||
To avoid using unstable or too recent libraries, the rule of thumb is that a library should be packaged in Debian/Ubuntu. Keep in mind that Offpunk is mainly tested will all libraries installed. If you encounter a crash without one optional dependencies, please report it. Patches and contributions to remove dependencies or support alternatives are highly appreciated.
|
||||
|
||||
* [List of existing Offpunk packages (Repology)](https://repology.org/project/offpunk/versions)
|
||||
* PIP: [requirements file to install dependencies with pip](requirements.txt)
|
||||
* Debian Unstable: [Official Package by Étienne Mollier](https://packages.debian.org/sid/offpunk)
|
||||
* Ubuntu/Debian: [command to install dependencies on Ubuntu/Debian without pip](ubuntu_dependencies.txt)
|
||||
* Arch: [AUR package for Arch Linux, maintained by kseistrup](https://aur.archlinux.org/packages/offpunk-git)
|
||||
* [Nix](https://nixos.org/): [package](https://github.com/NixOS/nixpkgs/blob/master/pkgs/applications/networking/browsers/offpunk/default.nix), maintained by [DamienCassou](https://github.com/DamienCassou)
|
||||
* Alpine Linux: [package maintained by mio](https://pkgs.alpinelinux.org/packages?name=offpunk)
|
||||
* Please contribute packages for other systems, there’s a [mailing-list dedicated to packaging](https://lists.sr.ht/~lioploum/offpunk-packagers).
|
||||
|
||||
Run command `version` in offpunk to see if you are missing some dependencies.
|
||||
|
@ -82,22 +81,17 @@ Dependencies to enable web browsing (packagers may put those in an offpunk-web m
|
|||
* [BeautifulSoup4](https://www.crummy.com/software/BeautifulSoup) and [Readability](https://github.com/buriy/python-readability) are both needed to render HTML. Without them, HTML will not be rendered or be sent to an external parser like Lynx. (apt-get install python3-bs4 python3-readability or pip3 install readability-lxml)
|
||||
* [Python-feedparser](https://github.com/kurtmckee/feedparser) will allow parsing of RSS/Atom feeds and thus subscriptions to them. (apt-get install python3-feedparser)
|
||||
* [Chafa](https://hpjansson.org/chafa/) allows to display pictures in your console. Install it and browse to an HTML page with picture to see the magic.
|
||||
|
||||
Gopher dependencies:
|
||||
* [Python-chardet](https://github.com/chardet/chardet) is used to detect the character encoding on Gopher (and may be used more in the future)
|
||||
|
||||
Older dependencies which are only needed on older systems (where chafa < 1.10)
|
||||
* [Timg](https://github.com/hzeller/timg) is a slower alternative to chafa for inline images. Might be deprecated in the future.
|
||||
* [Python-pil](http://python-pillow.github.io/) is required to only display the first frame of animated gif with chafa if chafa version is lower than 1.10. Might be deprecated in the future.
|
||||
* [Timg](https://github.com/hzeller/timg) is a slower alternative to chafa for inline images. But it has better rendering when displaying only the image. Install both to get the best of both world but if you need to choose one, choose Chafa.
|
||||
* [Python-pil](http://python-pillow.github.io/) is required to only display the first frame of animated gif with chafa if chafa version is lower than 1.10.
|
||||
|
||||
Nice to have (packagers should may make those optional):
|
||||
* [Xsel](http://www.vergenet.net/~conrad/software/xsel/) allows to `go` to the URL copied in the clipboard without having to paste it (both X and traditional clipboards are supported). Also needed to use the `copy` command. (apt-get install xsel)
|
||||
* [Python-setproctitle](https://github.com/dvarrazzo/py-setproctitle) will change the process name from "python" to "offpunk". Useful to kill it without killing every python service.
|
||||
* [Python-chardet](https://github.com/chardet/chardet) is used to detect the character encoding on Gopher (and may be used more in the future)
|
||||
|
||||
## Features
|
||||
|
||||
* Browse https/gemini/gopher without leaving your keyboard and without distractions
|
||||
* Customize your experience with the `theme` command.
|
||||
* Browse https/gemini/gopher/spartan without leaving your keyboard and without distractions
|
||||
* Built-in documentation: type `help` to get the list of command or a specific help about a command.
|
||||
* Offline mode to browse cached content without a connection. Requested elements are automatically fetched during the next synchronization and are added to your tour.
|
||||
* HTML pages are prettified to focus on content. Read without being disturbed or see the full page with `view full`.
|
||||
|
@ -108,15 +102,16 @@ Nice to have (packagers should may make those optional):
|
|||
* Ability to specify external handler programs for different MIME types (use `handler`)
|
||||
* Enhanced privacy with `redirect` which allows to block a http domain or to redirect all request to a privacy friendly frontent (such as nitter for twitter).
|
||||
* Non-interactive cache-building with configurable depth through the --sync command. The cache can easily be used by other software.
|
||||
* `netcache`, a standalone CLI tool to retrieve the cached version of a network ressource.
|
||||
* `ansicat`, a standalone CLI tool to render HTML/Gemtext/image in a terminal.
|
||||
* `opnk`, a standalone CLI tool to open any kind of ressources (local or network) and display it in your terminal or, if not possible, fallback to `xdg-open`.
|
||||
* IPv6 support
|
||||
* Supports any character encoding recognised by Python
|
||||
* Cryptography : TOFU or CA server certificate validation
|
||||
* Cryptography : Extensive client certificate support if an `openssl` binary is available
|
||||
|
||||
## RC files
|
||||
|
||||
You can use an RC file to automatically run any sequence of valid Offpunk
|
||||
commands upon start up. This can be used to make settings controlled with the
|
||||
`set`, `handler` or `themes` commands persistent. You can also put a `go` command in
|
||||
`set` or `handler` commanders persistent. You can also put a `go` command in
|
||||
your RC file to visit a "homepage" automatically on startup, or to pre-prepare
|
||||
a `tour` of your favourite Gemini sites or `offline` to go offline by default.
|
||||
|
||||
|
@ -126,7 +121,5 @@ The RC file should be called `offpunkrc` and goes in $XDG_CONFIG_DIR/offpunk (or
|
|||
|
||||
The offline content is stored in ~/.cache/offpunk/ as plain .gmi/.html files. The structure of the Gemini-space is tentatively recreated. One key element of the design is to avoid any database. The cache can thus be modified by hand, content can be removed, used or added by software other than offpunk.
|
||||
|
||||
The cache can be accessed/built with the `netcache` tool. See `netcache -h` for more informations.
|
||||
|
||||
There’s no feature to automatically trim the cache. But any part of the cache can safely be removed manually as there are no databases or complex synchronisation.
|
||||
There’s no feature to automatically trim the cache. But part of the cache can safely be removed manually.
|
||||
|
||||
|
|
1521
ansicat.py
1521
ansicat.py
File diff suppressed because it is too large
Load Diff
|
@ -54,8 +54,6 @@ either thanks to the MIME type,
|
|||
or from the file being rendered itself.
|
||||
.It Fl \-mime Ar MIME
|
||||
MIME type of the content to parse.
|
||||
.It Fl \-mode Ar MODE
|
||||
MODE to use to render to choose between normal (default), full or source
|
||||
.It Fl \-url Ar URL ...
|
||||
original URL of the content.
|
||||
.El
|
||||
|
|
|
@ -27,15 +27,6 @@ otherwise it would always refresh it from the version available online.
|
|||
It is also useful for mapping a given URL to its location in the cache,
|
||||
independently of whether it has been downloaded first.
|
||||
.Pp
|
||||
Default cache path is
|
||||
.Pa ~/.cache/offpunk .
|
||||
Set
|
||||
.Ev OFFPUNK_CACHE_PATH
|
||||
environment variable to use another location.
|
||||
.Bd -literal
|
||||
OFFPUNK_CACHE_PATH=/home/ploum/custom-cache netcache.py gemini://some.url
|
||||
.Ed
|
||||
.Pp
|
||||
.Xr Offpunk 1
|
||||
is a command-line browser and feed reader dedicated to browsing the Web,
|
||||
Gemini, Gopher and Spartan.
|
||||
|
@ -56,8 +47,6 @@ The value is expressed in megabytes.
|
|||
.It Fl \-timeout Ar TIMEOUT
|
||||
time to wait before cancelling connection.
|
||||
The value is expressed in seconds.
|
||||
.It Fl \-cache-validity CACHE_VALIDITY
|
||||
Maximum age (in second) of the cached version before redownloading a new version.
|
||||
.El
|
||||
.
|
||||
.Sh EXIT STATUS
|
||||
|
|
|
@ -37,10 +37,6 @@ path to the file or URL to open.
|
|||
.Bl -tag -width Ds -offset indent
|
||||
.It Fl h , \-help
|
||||
Show a help message and exit
|
||||
.It Fl \-mode Ar MODE
|
||||
MODE to use to render to choose between normal (default), full or source
|
||||
.It Fl \-cache-validity CACHE_VALIDITY
|
||||
Maximum age (in second) of the cached version before redownloading a new version.
|
||||
.El
|
||||
.
|
||||
.Sh EXIT STATUS
|
||||
|
|
914
netcache.py
914
netcache.py
|
@ -1,913 +1,3 @@
|
|||
#!/usr/bin/env python3
|
||||
import os
|
||||
import sys
|
||||
import urllib.parse
|
||||
import argparse
|
||||
import codecs
|
||||
import getpass
|
||||
import socket
|
||||
import ssl
|
||||
import glob
|
||||
import datetime
|
||||
import hashlib
|
||||
import sqlite3
|
||||
from ssl import CertificateError
|
||||
import ansicat
|
||||
import offutils
|
||||
from offutils import xdg
|
||||
import time
|
||||
try:
|
||||
import chardet
|
||||
_HAS_CHARDET = True
|
||||
except ModuleNotFoundError:
|
||||
_HAS_CHARDET = False
|
||||
|
||||
try:
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
_HAS_CRYPTOGRAPHY = True
|
||||
_BACKEND = default_backend()
|
||||
except(ModuleNotFoundError,ImportError):
|
||||
_HAS_CRYPTOGRAPHY = False
|
||||
try:
|
||||
import requests
|
||||
_DO_HTTP = True
|
||||
except (ModuleNotFoundError,ImportError):
|
||||
_DO_HTTP = False
|
||||
|
||||
# This list is also used as a list of supported protocols
|
||||
standard_ports = {
|
||||
"gemini" : 1965,
|
||||
"gopher" : 70,
|
||||
"finger" : 79,
|
||||
"http" : 80,
|
||||
"https" : 443,
|
||||
"spartan": 300,
|
||||
}
|
||||
default_protocol = "gemini"
|
||||
|
||||
CRLF = '\r\n'
|
||||
DEFAULT_TIMEOUT = 10
|
||||
_MAX_REDIRECTS = 5
|
||||
|
||||
# monkey-patch Gemini support in urllib.parse
|
||||
# see https://github.com/python/cpython/blob/master/Lib/urllib/parse.py
|
||||
urllib.parse.uses_relative.append("gemini")
|
||||
urllib.parse.uses_netloc.append("gemini")
|
||||
urllib.parse.uses_relative.append("spartan")
|
||||
urllib.parse.uses_netloc.append("spartan")
|
||||
|
||||
|
||||
class UserAbortException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def parse_mime(mime):
|
||||
options = {}
|
||||
if mime:
|
||||
if ";" in mime:
|
||||
splited = mime.split(";",maxsplit=1)
|
||||
mime = splited[0]
|
||||
if len(splited) >= 1:
|
||||
options_list = splited[1].split()
|
||||
for o in options_list:
|
||||
spl = o.split("=",maxsplit=1)
|
||||
if len(spl) > 0:
|
||||
options[spl[0]] = spl[1]
|
||||
return mime, options
|
||||
|
||||
def normalize_url(url):
|
||||
if "://" not in url and ("./" not in url and url[0] != "/"):
|
||||
if not url.startswith("mailto:"):
|
||||
url = "gemini://" + url
|
||||
return url
|
||||
|
||||
|
||||
def cache_last_modified(url):
|
||||
if not url:
|
||||
return None
|
||||
path = get_cache_path(url)
|
||||
if path and os.path.isfile(path):
|
||||
return os.path.getmtime(path)
|
||||
else:
|
||||
return None
|
||||
|
||||
def is_cache_valid(url,validity=0):
|
||||
# Validity is the acceptable time for
|
||||
# a cache to be valid (in seconds)
|
||||
# If 0, then any cache is considered as valid
|
||||
# (use validity = 1 if you want to refresh everything)
|
||||
if offutils.is_local(url):
|
||||
return True
|
||||
cache = get_cache_path(url)
|
||||
if cache :
|
||||
# If path is too long, we always return True to avoid
|
||||
# fetching it.
|
||||
if len(cache) > 259:
|
||||
print("We return False because path is too long")
|
||||
return False
|
||||
if os.path.exists(cache) and not os.path.isdir(cache):
|
||||
if validity > 0 :
|
||||
last_modification = cache_last_modified(url)
|
||||
now = time.time()
|
||||
age = now - last_modification
|
||||
return age < validity
|
||||
else:
|
||||
return True
|
||||
else:
|
||||
#Cache has not been build
|
||||
return False
|
||||
else:
|
||||
#There’s not even a cache!
|
||||
return False
|
||||
|
||||
def get_cache_path(url,add_index=True):
|
||||
# Sometimes, cache_path became a folder! (which happens for index.html/index.gmi)
|
||||
# In that case, we need to reconstruct it
|
||||
# if add_index=False, we don’t add that "index.gmi" at the ends of the cache_path
|
||||
#First, we parse the URL
|
||||
if not url:
|
||||
return None
|
||||
parsed = urllib.parse.urlparse(url)
|
||||
if url[0] == "/" or url.startswith("./") or os.path.exists(url):
|
||||
scheme = "file"
|
||||
elif parsed.scheme:
|
||||
scheme = parsed.scheme
|
||||
else:
|
||||
scheme = default_protocol
|
||||
if scheme in ["file","mailto","list"]:
|
||||
local = True
|
||||
host = ""
|
||||
port = None
|
||||
# file:// is 7 char
|
||||
if url.startswith("file://"):
|
||||
path = url[7:]
|
||||
elif scheme == "mailto":
|
||||
path = parsed.path
|
||||
elif url.startswith("list://"):
|
||||
listdir = os.path.join(xdg("data"),"lists")
|
||||
listname = url[7:].lstrip("/")
|
||||
if listname in [""]:
|
||||
name = "My Lists"
|
||||
path = listdir
|
||||
else:
|
||||
name = listname
|
||||
path = os.path.join(listdir, "%s.gmi"%listname)
|
||||
else:
|
||||
path = url
|
||||
else:
|
||||
local = False
|
||||
# Convert unicode hostname to punycode using idna RFC3490
|
||||
host = parsed.hostname #.encode("idna").decode()
|
||||
port = parsed.port or standard_ports.get(scheme, 0)
|
||||
# special gopher selector case
|
||||
if scheme == "gopher":
|
||||
if len(parsed.path) >= 2:
|
||||
itemtype = parsed.path[1]
|
||||
path = parsed.path[2:]
|
||||
else:
|
||||
itemtype = "1"
|
||||
path = ""
|
||||
if itemtype == "0":
|
||||
mime = "text/gemini"
|
||||
elif itemtype == "1":
|
||||
mime = "text/gopher"
|
||||
elif itemtype == "h":
|
||||
mime = "text/html"
|
||||
elif itemtype in ("9","g","I","s",";"):
|
||||
mime = "binary"
|
||||
else:
|
||||
mime = "text/gopher"
|
||||
else:
|
||||
path = parsed.path
|
||||
if parsed.query:
|
||||
# we don’t add the query if path is too long because path above 260 char
|
||||
# are not supported and crash python.
|
||||
# Also, very long query are usually useless stuff
|
||||
if len(path+parsed.query) < 258:
|
||||
path += "/" + parsed.query
|
||||
|
||||
# Now, we have a partial path. Let’s make it full path.
|
||||
if local:
|
||||
cache_path = path
|
||||
elif scheme and host:
|
||||
cache_path = os.path.expanduser(xdg("cache") + scheme + "/" + host + path)
|
||||
#There’s an OS limitation of 260 characters per path.
|
||||
#We will thus cut the path enough to add the index afterward
|
||||
cache_path = cache_path[:249]
|
||||
# this is a gross hack to give a name to
|
||||
# index files. This will break if the index is not
|
||||
# index.gmi. I don’t know how to know the real name
|
||||
# of the file. But first, we need to ensure that the domain name
|
||||
# finish by "/". Else, the cache will create a file, not a folder.
|
||||
if scheme.startswith("http"):
|
||||
index = "index.html"
|
||||
elif scheme == "finger":
|
||||
index = "index.txt"
|
||||
elif scheme == "gopher":
|
||||
index = "gophermap"
|
||||
else:
|
||||
index = "index.gmi"
|
||||
if path == "" or os.path.isdir(cache_path):
|
||||
if not cache_path.endswith("/"):
|
||||
cache_path += "/"
|
||||
if not url.endswith("/"):
|
||||
url += "/"
|
||||
if add_index and cache_path.endswith("/"):
|
||||
cache_path += index
|
||||
#sometimes, the index itself is a dir
|
||||
#like when folder/index.gmi?param has been created
|
||||
#and we try to access folder
|
||||
if add_index and os.path.isdir(cache_path):
|
||||
cache_path += "/" + index
|
||||
else:
|
||||
#URL is missing either a supported scheme or a valid host
|
||||
#print("Error: %s is not a supported url"%url)
|
||||
return None
|
||||
if len(cache_path) > 259:
|
||||
print("Path is too long. This is an OS limitation.\n\n")
|
||||
print(url)
|
||||
return None
|
||||
return cache_path
|
||||
|
||||
def write_body(url,body,mime=None):
|
||||
## body is a copy of the raw gemtext
|
||||
## Write_body() also create the cache !
|
||||
# DEFAULT GEMINI MIME
|
||||
mime, options = parse_mime(mime)
|
||||
cache_path = get_cache_path(url)
|
||||
if cache_path:
|
||||
if mime and mime.startswith("text/"):
|
||||
mode = "w"
|
||||
else:
|
||||
mode = "wb"
|
||||
cache_dir = os.path.dirname(cache_path)
|
||||
# If the subdirectory already exists as a file (not a folder)
|
||||
# We remove it (happens when accessing URL/subfolder before
|
||||
# URL/subfolder/file.gmi.
|
||||
# This causes loss of data in the cache
|
||||
# proper solution would be to save "sufolder" as "sufolder/index.gmi"
|
||||
# If the subdirectory doesn’t exist, we recursively try to find one
|
||||
# until it exists to avoid a file blocking the creation of folders
|
||||
root_dir = cache_dir
|
||||
while not os.path.exists(root_dir):
|
||||
root_dir = os.path.dirname(root_dir)
|
||||
if os.path.isfile(root_dir):
|
||||
os.remove(root_dir)
|
||||
os.makedirs(cache_dir,exist_ok=True)
|
||||
with open(cache_path, mode=mode) as f:
|
||||
f.write(body)
|
||||
f.close()
|
||||
return cache_path
|
||||
|
||||
|
||||
def set_error(url,err):
|
||||
# If we get an error, we want to keep an existing cache
|
||||
# but we need to touch it or to create an empty one
|
||||
# to avoid hitting the error at each refresh
|
||||
cache = get_cache_path(url)
|
||||
if is_cache_valid(url):
|
||||
os.utime(cache)
|
||||
elif cache:
|
||||
cache_dir = os.path.dirname(cache)
|
||||
root_dir = cache_dir
|
||||
while not os.path.exists(root_dir):
|
||||
root_dir = os.path.dirname(root_dir)
|
||||
if os.path.isfile(root_dir):
|
||||
os.remove(root_dir)
|
||||
os.makedirs(cache_dir,exist_ok=True)
|
||||
if os.path.isdir(cache_dir):
|
||||
with open(cache, "w") as c:
|
||||
c.write(str(datetime.datetime.now())+"\n")
|
||||
c.write("ERROR while caching %s\n\n" %url)
|
||||
c.write("*****\n\n")
|
||||
c.write(str(type(err)) + " = " + str(err))
|
||||
#cache.write("\n" + str(err.with_traceback(None)))
|
||||
c.write("\n*****\n\n")
|
||||
c.write("If you believe this error was temporary, type ""reload"".\n")
|
||||
c.write("The ressource will be tentatively fetched during next sync.\n")
|
||||
c.close()
|
||||
return cache
|
||||
|
||||
def _fetch_http(url,max_size=None,timeout=DEFAULT_TIMEOUT,accept_bad_ssl_certificates=False,**kwargs):
|
||||
if not _DO_HTTP: return None
|
||||
def too_large_error(url,length,max_size):
|
||||
err = "Size of %s is %s Mo\n"%(url,length)
|
||||
err += "Offpunk only download automatically content under %s Mo\n" %(max_size/1000000)
|
||||
err += "To retrieve this content anyway, type 'reload'."
|
||||
return set_error(url,err)
|
||||
if accept_bad_ssl_certificates:
|
||||
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = 'ALL:@SECLEVEL=1'
|
||||
requests.packages.urllib3.disable_warnings()
|
||||
verify=False
|
||||
else:
|
||||
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = 'ALL:@SECLEVEL=2'
|
||||
verify=True
|
||||
header = {}
|
||||
header["User-Agent"] = "Netcache"
|
||||
with requests.get(url,verify=verify,headers=header, stream=True,timeout=DEFAULT_TIMEOUT) as response:
|
||||
if "content-type" in response.headers:
|
||||
mime = response.headers['content-type']
|
||||
else:
|
||||
mime = None
|
||||
if "content-length" in response.headers:
|
||||
length = int(response.headers['content-length'])
|
||||
else:
|
||||
length = 0
|
||||
if max_size and length > max_size:
|
||||
response.close()
|
||||
return too_large_error(url,str(length/100),max_size)
|
||||
elif max_size and length == 0:
|
||||
body = b''
|
||||
downloaded = 0
|
||||
for r in response.iter_content():
|
||||
body += r
|
||||
#We divide max_size for streamed content
|
||||
#in order to catch them faster
|
||||
size = sys.getsizeof(body)
|
||||
max = max_size/2
|
||||
current = round(size*100/max,1)
|
||||
if current > downloaded:
|
||||
downloaded = current
|
||||
print(" -> Receiving stream: %s%% of allowed data"%downloaded,end='\r')
|
||||
#print("size: %s (%s\% of maxlenght)"%(size,size/max_size))
|
||||
if size > max_size/2:
|
||||
response.close()
|
||||
return too_large_error(url,"streaming",max_size)
|
||||
response.close()
|
||||
else:
|
||||
body = response.content
|
||||
response.close()
|
||||
if mime and "text/" in mime:
|
||||
body = body.decode("UTF-8","replace")
|
||||
cache = write_body(url,body,mime)
|
||||
return cache
|
||||
|
||||
def _fetch_gopher(url,timeout=DEFAULT_TIMEOUT,**kwargs):
|
||||
parsed =urllib.parse.urlparse(url)
|
||||
host = parsed.hostname
|
||||
port = parsed.port or 70
|
||||
if len(parsed.path) >= 2:
|
||||
itemtype = parsed.path[1]
|
||||
selector = parsed.path[2:]
|
||||
else:
|
||||
itemtype = "1"
|
||||
selector = ""
|
||||
addresses = socket.getaddrinfo(host, port, family=0,type=socket.SOCK_STREAM)
|
||||
s = socket.create_connection((host,port))
|
||||
for address in addresses:
|
||||
s = socket.socket(address[0], address[1])
|
||||
s.settimeout(timeout)
|
||||
try:
|
||||
s.connect(address[4])
|
||||
break
|
||||
except OSError as e:
|
||||
err = e
|
||||
if parsed.query:
|
||||
request = selector + "\t" + parsed.query
|
||||
else:
|
||||
request = selector
|
||||
request += "\r\n"
|
||||
s.sendall(request.encode("UTF-8"))
|
||||
response1 = s.makefile("rb")
|
||||
response = response1.read()
|
||||
# Transcode response into UTF-8
|
||||
#if itemtype in ("0","1","h"):
|
||||
if not itemtype in ("9","g","I","s",";"):
|
||||
# Try most common encodings
|
||||
for encoding in ("UTF-8", "ISO-8859-1"):
|
||||
try:
|
||||
response = response.decode("UTF-8")
|
||||
break
|
||||
except UnicodeDecodeError:
|
||||
pass
|
||||
else:
|
||||
# try to find encoding
|
||||
if _HAS_CHARDET:
|
||||
detected = chardet.detect(response)
|
||||
response = response.decode(detected["encoding"])
|
||||
else:
|
||||
raise UnicodeDecodeError
|
||||
if itemtype == "0":
|
||||
mime = "text/gemini"
|
||||
elif itemtype == "1":
|
||||
mime = "text/gopher"
|
||||
elif itemtype == "h":
|
||||
mime = "text/html"
|
||||
elif itemtype in ("9","g","I","s",";"):
|
||||
mime = None
|
||||
else:
|
||||
# by default, we should consider Gopher
|
||||
mime = "text/gopher"
|
||||
cache = write_body(url,response,mime)
|
||||
return cache
|
||||
|
||||
def _fetch_finger(url,timeout=DEFAULT_TIMEOUT,**kwargs):
|
||||
parsed = urllib.parse.urlparse(url)
|
||||
host = parsed.hostname
|
||||
port = parsed.port or standard_ports["finger"]
|
||||
query = parsed.path.lstrip("/") + "\r\n"
|
||||
with socket.create_connection((host,port)) as sock:
|
||||
sock.settimeout(timeout)
|
||||
sock.send(query.encode())
|
||||
response = sock.makefile("rb").read().decode("UTF-8")
|
||||
cache = write_body(response,"text/plain")
|
||||
return cache
|
||||
|
||||
# Originally copied from reference spartan client by Michael Lazar
|
||||
def _fetch_spartan(url,**kwargs):
|
||||
cache = None
|
||||
url_parts = urllib.parse.urlparse(url)
|
||||
host = url_parts.hostname
|
||||
port = url_parts.port or standard_ports["spartan"]
|
||||
path = url_parts.path or "/"
|
||||
query = url_parts.query
|
||||
redirect_url = None
|
||||
with socket.create_connection((host,port)) as sock:
|
||||
if query:
|
||||
data = urllib.parse.unquote_to_bytes(query)
|
||||
else:
|
||||
data = b""
|
||||
encoded_host = host.encode("idna")
|
||||
ascii_path = urllib.parse.unquote_to_bytes(path)
|
||||
encoded_path = urllib.parse.quote_from_bytes(ascii_path).encode("ascii")
|
||||
sock.send(b"%s %s %d\r\n" % (encoded_host,encoded_path,len(data)))
|
||||
fp = sock.makefile("rb")
|
||||
response = fp.readline(4096).decode("ascii").strip("\r\n")
|
||||
parts = response.split(" ",maxsplit=1)
|
||||
code,meta = int(parts[0]),parts[1]
|
||||
if code == 2:
|
||||
body = fp.read()
|
||||
if meta.startswith("text"):
|
||||
body = body.decode("UTF-8")
|
||||
cache = write_body(url,body,meta)
|
||||
elif code == 3:
|
||||
redirect_url = url_parts._replace(path=meta).geturl()
|
||||
else:
|
||||
return set_error(url,"Spartan code %s: Error %s"%(code,meta))
|
||||
if redirect_url:
|
||||
cache = _fetch_spartan(redirect_url)
|
||||
return cache
|
||||
|
||||
def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=None):
|
||||
"""
|
||||
Validate a TLS certificate in TOFU mode.
|
||||
|
||||
If the cryptography module is installed:
|
||||
- Check the certificate Common Name or SAN matches `host`
|
||||
- Check the certificate's not valid before date is in the past
|
||||
- Check the certificate's not valid after date is in the future
|
||||
|
||||
Whether the cryptography module is installed or not, check the
|
||||
certificate's fingerprint against the TOFU database to see if we've
|
||||
previously encountered a different certificate for this IP address and
|
||||
hostname.
|
||||
"""
|
||||
now = datetime.datetime.utcnow()
|
||||
if _HAS_CRYPTOGRAPHY:
|
||||
# Using the cryptography module we can get detailed access
|
||||
# to the properties of even self-signed certs, unlike in
|
||||
# the standard ssl library...
|
||||
c = x509.load_der_x509_certificate(cert, _BACKEND)
|
||||
# Check certificate validity dates
|
||||
if accept_bad_ssl:
|
||||
if c.not_valid_before >= now:
|
||||
raise CertificateError("Certificate not valid until: {}!".format(c.not_valid_before))
|
||||
elif c.not_valid_after <= now:
|
||||
raise CertificateError("Certificate expired as of: {})!".format(c.not_valid_after))
|
||||
|
||||
# Check certificate hostnames
|
||||
names = []
|
||||
common_name = c.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
|
||||
if common_name:
|
||||
names.append(common_name[0].value)
|
||||
try:
|
||||
names.extend([alt.value for alt in c.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value])
|
||||
except x509.ExtensionNotFound:
|
||||
pass
|
||||
names = set(names)
|
||||
for name in names:
|
||||
try:
|
||||
ssl._dnsname_match(str(name), host)
|
||||
break
|
||||
except CertificateError:
|
||||
continue
|
||||
else:
|
||||
# If we didn't break out, none of the names were valid
|
||||
raise CertificateError("Hostname does not match certificate common name or any alternative names.")
|
||||
|
||||
sha = hashlib.sha256()
|
||||
sha.update(cert)
|
||||
fingerprint = sha.hexdigest()
|
||||
|
||||
db_path = os.path.join(xdg("config"), "tofu.db")
|
||||
db_conn = sqlite3.connect(db_path)
|
||||
db_cur = db_conn.cursor()
|
||||
|
||||
db_cur.execute("""CREATE TABLE IF NOT EXISTS cert_cache
|
||||
(hostname text, address text, fingerprint text,
|
||||
first_seen date, last_seen date, count integer)""")
|
||||
# Have we been here before?
|
||||
db_cur.execute("""SELECT fingerprint, first_seen, last_seen, count
|
||||
FROM cert_cache
|
||||
WHERE hostname=? AND address=?""", (host, address))
|
||||
cached_certs = db_cur.fetchall()
|
||||
|
||||
# If so, check for a match
|
||||
if cached_certs:
|
||||
max_count = 0
|
||||
most_frequent_cert = None
|
||||
for cached_fingerprint, first, last, count in cached_certs:
|
||||
if count > max_count:
|
||||
max_count = count
|
||||
most_frequent_cert = cached_fingerprint
|
||||
if fingerprint == cached_fingerprint:
|
||||
# Matched!
|
||||
db_cur.execute("""UPDATE cert_cache
|
||||
SET last_seen=?, count=?
|
||||
WHERE hostname=? AND address=? AND fingerprint=?""",
|
||||
(now, count+1, host, address, fingerprint))
|
||||
db_conn.commit()
|
||||
break
|
||||
else:
|
||||
certdir = os.path.join(xdg("config"), "cert_cache")
|
||||
with open(os.path.join(certdir, most_frequent_cert+".crt"), "rb") as fp:
|
||||
previous_cert = fp.read()
|
||||
if _HAS_CRYPTOGRAPHY:
|
||||
# Load the most frequently seen certificate to see if it has
|
||||
# expired
|
||||
previous_cert = x509.load_der_x509_certificate(previous_cert, _BACKEND)
|
||||
previous_ttl = previous_cert.not_valid_after - now
|
||||
print(previous_ttl)
|
||||
|
||||
print("****************************************")
|
||||
print("[SECURITY WARNING] Unrecognised certificate!")
|
||||
print("The certificate presented for {} ({}) has never been seen before.".format(host, address))
|
||||
print("This MIGHT be a Man-in-the-Middle attack.")
|
||||
print("A different certificate has previously been seen {} times.".format(max_count))
|
||||
if _HAS_CRYPTOGRAPHY:
|
||||
if previous_ttl < datetime.timedelta():
|
||||
print("That certificate has expired, which reduces suspicion somewhat.")
|
||||
else:
|
||||
print("That certificate is still valid for: {}".format(previous_ttl))
|
||||
print("****************************************")
|
||||
print("Attempt to verify the new certificate fingerprint out-of-band:")
|
||||
print(fingerprint)
|
||||
if automatic_choice:
|
||||
choice = automatic_choice
|
||||
else:
|
||||
choice = input("Accept this new certificate? Y/N ").strip().lower()
|
||||
if choice in ("y", "yes"):
|
||||
db_cur.execute("""INSERT INTO cert_cache
|
||||
VALUES (?, ?, ?, ?, ?, ?)""",
|
||||
(host, address, fingerprint, now, now, 1))
|
||||
db_conn.commit()
|
||||
with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
|
||||
fp.write(cert)
|
||||
else:
|
||||
raise Exception("TOFU Failure!")
|
||||
|
||||
# If not, cache this cert
|
||||
else:
|
||||
db_cur.execute("""INSERT INTO cert_cache
|
||||
VALUES (?, ?, ?, ?, ?, ?)""",
|
||||
(host, address, fingerprint, now, now, 1))
|
||||
db_conn.commit()
|
||||
certdir = os.path.join(xdg("config"), "cert_cache")
|
||||
if not os.path.exists(certdir):
|
||||
os.makedirs(certdir)
|
||||
with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
|
||||
fp.write(cert)
|
||||
|
||||
def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_certificates=False,\
|
||||
**kwargs):
|
||||
cache = None
|
||||
newurl = url
|
||||
url_parts = urllib.parse.urlparse(url)
|
||||
host = url_parts.hostname
|
||||
port = url_parts.port or standard_ports["gemini"]
|
||||
path = url_parts.path or "/"
|
||||
query = url_parts.query
|
||||
# In AV-98, this was the _send_request method
|
||||
#Send a selector to a given host and port.
|
||||
#Returns the resolved address and binary file with the reply."""
|
||||
host = host.encode("idna").decode()
|
||||
# Do DNS resolution
|
||||
# DNS lookup - will get IPv4 and IPv6 records if IPv6 is enabled
|
||||
if ":" in host:
|
||||
# This is likely a literal IPv6 address, so we can *only* ask for
|
||||
# IPv6 addresses or getaddrinfo will complain
|
||||
family_mask = socket.AF_INET6
|
||||
elif socket.has_ipv6:
|
||||
# Accept either IPv4 or IPv6 addresses
|
||||
family_mask = 0
|
||||
else:
|
||||
# IPv4 only
|
||||
family_mask = socket.AF_INET
|
||||
addresses = socket.getaddrinfo(host, port, family=family_mask,
|
||||
type=socket.SOCK_STREAM)
|
||||
# Sort addresses so IPv6 ones come first
|
||||
addresses.sort(key=lambda add: add[0] == socket.AF_INET6, reverse=True)
|
||||
## Continuation of send_request
|
||||
# Prepare TLS context
|
||||
protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >=6 else ssl.PROTOCOL_TLSv1_2
|
||||
context = ssl.SSLContext(protocol)
|
||||
context.check_hostname=False
|
||||
context.verify_mode = ssl.CERT_NONE
|
||||
# Impose minimum TLS version
|
||||
## In 3.7 and above, this is easy...
|
||||
if sys.version_info.minor >= 7:
|
||||
context.minimum_version = ssl.TLSVersion.TLSv1_2
|
||||
## Otherwise, it seems very hard...
|
||||
## The below is less strict than it ought to be, but trying to disable
|
||||
## TLS v1.1 here using ssl.OP_NO_TLSv1_1 produces unexpected failures
|
||||
## with recent versions of OpenSSL. What a mess...
|
||||
else:
|
||||
context.options |= ssl.OP_NO_SSLv3
|
||||
context.options |= ssl.OP_NO_SSLv2
|
||||
# Try to enforce sensible ciphers
|
||||
try:
|
||||
context.set_ciphers("AESGCM+ECDHE:AESGCM+DHE:CHACHA20+ECDHE:CHACHA20+DHE:!DSS:!SHA1:!MD5:@STRENGTH")
|
||||
except ssl.SSLError:
|
||||
# Rely on the server to only support sensible things, I guess...
|
||||
pass
|
||||
# Connect to remote host by any address possible
|
||||
err = None
|
||||
for address in addresses:
|
||||
try:
|
||||
s = socket.socket(address[0], address[1])
|
||||
s.settimeout(timeout)
|
||||
s = context.wrap_socket(s, server_hostname = host)
|
||||
s.connect(address[4])
|
||||
break
|
||||
except OSError as e:
|
||||
err = e
|
||||
else:
|
||||
# If we couldn't connect to *any* of the addresses, just
|
||||
# bubble up the exception from the last attempt and deny
|
||||
# knowledge of earlier failures.
|
||||
raise err
|
||||
|
||||
# Do TOFU
|
||||
cert = s.getpeercert(binary_form=True)
|
||||
# Remember that we showed the current cert to this domain...
|
||||
#TODO : accept badssl and automatic choice
|
||||
_validate_cert(address[4][0], host, cert,automatic_choice="y")
|
||||
# Send request and wrap response in a file descriptor
|
||||
url = urllib.parse.urlparse(url)
|
||||
new_netloc = host
|
||||
#Handle IPV6 hostname
|
||||
if ":" in new_netloc:
|
||||
new_netloc = "[" + new_netloc + "]"
|
||||
if port != standard_ports["gemini"]:
|
||||
new_netloc += ":" + str(port)
|
||||
url = urllib.parse.urlunparse(url._replace(netloc=new_netloc))
|
||||
s.sendall((url + CRLF).encode("UTF-8"))
|
||||
f= s.makefile(mode = "rb")
|
||||
## end of send_request in AV98
|
||||
# Spec dictates <META> should not exceed 1024 bytes,
|
||||
# so maximum valid header length is 1027 bytes.
|
||||
header = f.readline(1027)
|
||||
header = urllib.parse.unquote(header.decode("UTF-8"))
|
||||
if not header or header[-1] != '\n':
|
||||
raise RuntimeError("Received invalid header from server!")
|
||||
header = header.strip()
|
||||
# Validate header
|
||||
status, meta = header.split(maxsplit=1)
|
||||
if len(meta) > 1024 or len(status) != 2 or not status.isnumeric():
|
||||
f.close()
|
||||
raise RuntimeError("Received invalid header from server!")
|
||||
# Update redirect loop/maze escaping state
|
||||
if not status.startswith("3"):
|
||||
previous_redirectors = set()
|
||||
#TODO FIXME
|
||||
else:
|
||||
#we set a previous_redirectors anyway because refactoring in progress
|
||||
previous_redirectors = set()
|
||||
# Handle non-SUCCESS headers, which don't have a response body
|
||||
# Inputs
|
||||
if status.startswith("1"):
|
||||
if interactive:
|
||||
print(meta)
|
||||
if status == "11":
|
||||
user_input = getpass.getpass("> ")
|
||||
else:
|
||||
#TODO:FIXME we should not ask for user input while non-interactive
|
||||
user_input = input("> ")
|
||||
newurl = url.split("?")[0]
|
||||
return _fetch_gemini(newurl+"?"+user_input)
|
||||
else:
|
||||
return None,None
|
||||
# Redirects
|
||||
elif status.startswith("3"):
|
||||
newurl = urllib.parse.urljoin(url,meta)
|
||||
if newurl == url:
|
||||
raise RuntimeError("URL redirects to itself!")
|
||||
elif newurl in previous_redirectors:
|
||||
raise RuntimeError("Caught in redirect loop!")
|
||||
elif len(previous_redirectors) == _MAX_REDIRECTS:
|
||||
raise RuntimeError("Refusing to follow more than %d consecutive redirects!" % _MAX_REDIRECTS)
|
||||
# TODO: redirections handling should be refactored
|
||||
# elif "interactive" in options and not options["interactive"]:
|
||||
# follow = self.automatic_choice
|
||||
# # Never follow cross-domain redirects without asking
|
||||
# elif new_gi.host.encode("idna") != gi.host.encode("idna"):
|
||||
# follow = input("Follow cross-domain redirect to %s? (y/n) " % new_gi.url)
|
||||
# # Never follow cross-protocol redirects without asking
|
||||
# elif new_gi.scheme != gi.scheme:
|
||||
# follow = input("Follow cross-protocol redirect to %s? (y/n) " % new_gi.url)
|
||||
# # Don't follow *any* redirect without asking if auto-follow is off
|
||||
# elif not self.options["auto_follow_redirects"]:
|
||||
# follow = input("Follow redirect to %s? (y/n) " % new_gi.url)
|
||||
# # Otherwise, follow away
|
||||
else:
|
||||
follow = "yes"
|
||||
if follow.strip().lower() not in ("y", "yes"):
|
||||
raise UserAbortException()
|
||||
previous_redirectors.add(url)
|
||||
# if status == "31":
|
||||
# # Permanent redirect
|
||||
# self.permanent_redirects[gi.url] = new_gi.url
|
||||
return _fetch_gemini(newurl)
|
||||
# Errors
|
||||
elif status.startswith("4") or status.startswith("5"):
|
||||
raise RuntimeError(meta)
|
||||
# Client cert
|
||||
elif status.startswith("6"):
|
||||
error = "Handling certificates for status 6X are not supported by offpunk\n"
|
||||
error += "See bug #31 for discussion about the problem"
|
||||
raise RuntimeError(error)
|
||||
# Invalid status
|
||||
elif not status.startswith("2"):
|
||||
raise RuntimeError("Server returned undefined status code %s!" % status)
|
||||
# If we're here, this must be a success and there's a response body
|
||||
assert status.startswith("2")
|
||||
mime = meta
|
||||
# Read the response body over the network
|
||||
fbody = f.read()
|
||||
# DEFAULT GEMINI MIME
|
||||
if mime == "":
|
||||
mime = "text/gemini; charset=utf-8"
|
||||
shortmime, mime_options = parse_mime(mime)
|
||||
if "charset" in mime_options:
|
||||
try:
|
||||
codecs.lookup(mime_options["charset"])
|
||||
except LookupError:
|
||||
#raise RuntimeError("Header declared unknown encoding %s" % mime_options)
|
||||
#If the encoding is wrong, there’s a high probably it’s UTF-8 with a bad header
|
||||
mime_options["charset"] = "UTF-8"
|
||||
if shortmime.startswith("text/"):
|
||||
#Get the charset and default to UTF-8 in none
|
||||
encoding = mime_options.get("charset", "UTF-8")
|
||||
try:
|
||||
body = fbody.decode(encoding)
|
||||
except UnicodeError:
|
||||
raise RuntimeError("Could not decode response body using %s\
|
||||
encoding declared in header!" % encoding)
|
||||
else:
|
||||
body = fbody
|
||||
cache = write_body(url,body,mime)
|
||||
return cache,newurl
|
||||
|
||||
|
||||
def fetch(url,offline=False,download_image_first=True,images_mode="readable",validity=0,**kwargs):
|
||||
url = normalize_url(url)
|
||||
newurl = url
|
||||
path=None
|
||||
print_error = "print_error" in kwargs.keys() and kwargs["print_error"]
|
||||
#Firt, we look if we have a valid cache, even if offline
|
||||
#If we are offline, any cache is better than nothing
|
||||
if is_cache_valid(url,validity=validity) or (offline and is_cache_valid(url,validity=0)):
|
||||
path = get_cache_path(url)
|
||||
#if the cache is a folder, we should add a "/" at the end of the URL
|
||||
if not url.endswith("/") and os.path.isdir(get_cache_path(url,add_index=False)) :
|
||||
newurl = url+"/"
|
||||
elif offline and is_cache_valid(url,validity=0):
|
||||
path = get_cache_path(url)
|
||||
elif "://" in url and not offline:
|
||||
try:
|
||||
scheme = url.split("://")[0]
|
||||
if scheme not in standard_ports:
|
||||
if print_error:
|
||||
print("%s is not a supported protocol"%scheme)
|
||||
path = None
|
||||
elif scheme in ("http","https"):
|
||||
if _DO_HTTP:
|
||||
path=_fetch_http(url,**kwargs)
|
||||
else:
|
||||
print("HTTP requires python-requests")
|
||||
elif scheme == "gopher":
|
||||
path=_fetch_gopher(url,**kwargs)
|
||||
elif scheme == "finger":
|
||||
path=_fetch_finger(url,**kwargs)
|
||||
elif scheme == "gemini":
|
||||
path,newurl=_fetch_gemini(url,**kwargs)
|
||||
elif scheme == "spartan":
|
||||
path,newurl=_fetch_spartan(url,**kwargs)
|
||||
else:
|
||||
print("scheme %s not implemented yet"%scheme)
|
||||
except UserAbortException:
|
||||
return None, newurl
|
||||
except Exception as err:
|
||||
cache = set_error(url, err)
|
||||
# Print an error message
|
||||
# we fail silently when sync_only
|
||||
if isinstance(err, socket.gaierror):
|
||||
if print_error:
|
||||
print("ERROR: DNS error!")
|
||||
elif isinstance(err, ConnectionRefusedError):
|
||||
if print_error:
|
||||
print("ERROR1: Connection refused!")
|
||||
elif isinstance(err, ConnectionResetError):
|
||||
if print_error:
|
||||
print("ERROR2: Connection reset!")
|
||||
elif isinstance(err, (TimeoutError, socket.timeout)):
|
||||
if print_error:
|
||||
print("""ERROR3: Connection timed out!
|
||||
Slow internet connection? Use 'set timeout' to be more patient.""")
|
||||
elif isinstance(err, FileExistsError):
|
||||
if print_error:
|
||||
print("""ERROR5: Trying to create a directory which already exists
|
||||
in the cache : """)
|
||||
print(err)
|
||||
elif _DO_HTTP and isinstance(err,requests.exceptions.SSLError):
|
||||
if print_error:
|
||||
print("""ERROR6: Bad SSL certificate:\n""")
|
||||
print(err)
|
||||
print("""\n If you know what you are doing, you can try to accept bad certificates with the following command:\n""")
|
||||
print("""set accept_bad_ssl_certificates True""")
|
||||
elif _DO_HTTP and isinstance(err,requests.exceptions.ConnectionError):
|
||||
if print_error:
|
||||
print("""ERROR7: Cannot connect to URL:\n""")
|
||||
print(str(err))
|
||||
else:
|
||||
if print_error:
|
||||
import traceback
|
||||
print("ERROR4: " + str(type(err)) + " : " + str(err))
|
||||
#print("\n" + str(err.with_traceback(None)))
|
||||
print(traceback.format_exc())
|
||||
return cache, newurl
|
||||
# We download images contained in the document (from full mode)
|
||||
if not offline and download_image_first and images_mode:
|
||||
renderer = ansicat.renderer_from_file(path,newurl)
|
||||
if renderer:
|
||||
for image in renderer.get_images(mode=images_mode):
|
||||
#Image should exist, should be an url (not a data image)
|
||||
#and should not be already cached
|
||||
if image and not image.startswith("data:image/") and not is_cache_valid(image):
|
||||
width = offutils.term_width() - 1
|
||||
toprint = "Downloading %s" %image
|
||||
toprint = toprint[:width]
|
||||
toprint += " "*(width-len(toprint))
|
||||
print(toprint,end="\r")
|
||||
#d_i_f and images_mode are False/None to avoid recursive downloading
|
||||
#if that ever happen
|
||||
fetch(image,offline=offline,download_image_first=False,\
|
||||
images_mode=None,validity=0,**kwargs)
|
||||
return path, newurl
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
descri="Netcache is a command-line tool to retrieve, cache and access networked content.\n\
|
||||
By default, netcache will returns a cached version of a given URL, downloading it \
|
||||
only if not existing. A validity duration, in seconds, can also be given so that \
|
||||
netcache downloads the content only if the existing cache is older than the validity."
|
||||
# Parse arguments
|
||||
parser = argparse.ArgumentParser(prog="netcache",description=descri)
|
||||
parser.add_argument("--path", action="store_true",
|
||||
help="return path to the cache instead of the content of the cache")
|
||||
parser.add_argument("--offline", action="store_true",
|
||||
help="Do not attempt to download, return cached version or error")
|
||||
parser.add_argument("--max-size", type=int,
|
||||
help="Cancel download of items above that size (value in Mb).")
|
||||
parser.add_argument("--timeout", type=int,
|
||||
help="Time to wait before cancelling connection (in second).")
|
||||
parser.add_argument("--cache-validity",type=int, default=0,
|
||||
help="maximum age, in second, of the cached version before \
|
||||
redownloading a new version")
|
||||
# No argument: write help
|
||||
parser.add_argument('url', metavar='URL', nargs='*',
|
||||
help='download URL and returns the content or the path to a cached version')
|
||||
# --validity : returns the date of the cached version, Null if no version
|
||||
# --force-download : download and replace cache, even if valid
|
||||
args = parser.parse_args()
|
||||
|
||||
param = {}
|
||||
|
||||
for u in args.url:
|
||||
if args.offline:
|
||||
path = get_cache_path(u)
|
||||
else:
|
||||
path,url = fetch(u,max_size=args.max_size,timeout=args.timeout,\
|
||||
validity=args.cache_validity)
|
||||
if args.path:
|
||||
print(path)
|
||||
else:
|
||||
with open(path,"r") as f:
|
||||
print(f.read())
|
||||
f.close()
|
||||
|
||||
|
||||
if __name__== '__main__':
|
||||
main()
|
||||
from offpunk.netcache import main
|
||||
main()
|
||||
|
|
|
@ -1,33 +0,0 @@
|
|||
# The following are the default redirections from Offpunk
|
||||
# Those are by default because they should make sens with offpunk
|
||||
|
||||
redirects = {
|
||||
"*twitter.com" : "nitter.net",
|
||||
"youtube.com" : "yewtu.be",
|
||||
"youtu.be" : "yewtu.be",
|
||||
"*reddit.com" : "teddit.net",
|
||||
"*medium.com" : "scribe.rip",
|
||||
}
|
||||
|
||||
|
||||
#following are blocked URLs. Visiting them with offpunk doesn’t make sense.
|
||||
#Blocking them will save a lot of bandwith
|
||||
|
||||
blocked = {
|
||||
"*facebook.com",
|
||||
"*facebook.net",
|
||||
"*fbcdn.net",
|
||||
"*linkedin.com",
|
||||
"*licdn.com",
|
||||
"*admanager.google.com",
|
||||
"*google-health-ads.blogspot.com",
|
||||
"*firebase.google.com",
|
||||
"*google-webfonts-helper.herokuapp.com",
|
||||
"*tiktok.com" ,
|
||||
"*doubleclick.net",
|
||||
"*google-analytics.com" ,
|
||||
"*ads.yahoo.com",
|
||||
"*advertising.amazon.com",
|
||||
"*advertising.theguardian.com",
|
||||
"*advertise.newrepublic.com",
|
||||
}
|
1911
offpunk.py
1911
offpunk.py
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,3 @@
|
|||
from offpunk import main
|
||||
main()
|
||||
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,890 @@
|
|||
#!/usr/bin/env python3
|
||||
import os
|
||||
import sys
|
||||
import urllib.parse
|
||||
import argparse
|
||||
import requests
|
||||
import codecs
|
||||
import getpass
|
||||
import socket
|
||||
import ssl
|
||||
import glob
|
||||
import datetime
|
||||
import hashlib
|
||||
import sqlite3
|
||||
from ssl import CertificateError
|
||||
from offpunk import ansicat, utils
|
||||
from offpunk.utils import _CACHE_PATH,_DATA_DIR,_CONFIG_DIR
|
||||
import time
|
||||
try:
|
||||
import chardet
|
||||
_HAS_CHARDET = True
|
||||
except ModuleNotFoundError:
|
||||
_HAS_CHARDET = False
|
||||
|
||||
try:
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
_HAS_CRYPTOGRAPHY = True
|
||||
_BACKEND = default_backend()
|
||||
except(ModuleNotFoundError,ImportError):
|
||||
_HAS_CRYPTOGRAPHY = False
|
||||
|
||||
if not os.path.exists(_CACHE_PATH):
|
||||
print("Creating cache directory {}".format(_CACHE_PATH))
|
||||
os.makedirs(_CACHE_PATH)
|
||||
|
||||
# This list is also used as a list of supported protocols
|
||||
standard_ports = {
|
||||
"gemini" : 1965,
|
||||
"gopher" : 70,
|
||||
"finger" : 79,
|
||||
"http" : 80,
|
||||
"https" : 443,
|
||||
"spartan": 300,
|
||||
}
|
||||
default_protocol = "gemini"
|
||||
|
||||
CRLF = '\r\n'
|
||||
DEFAULT_TIMEOUT = 10
|
||||
_MAX_REDIRECTS = 5
|
||||
|
||||
# monkey-patch Gemini support in urllib.parse
|
||||
# see https://github.com/python/cpython/blob/master/Lib/urllib/parse.py
|
||||
urllib.parse.uses_relative.append("gemini")
|
||||
urllib.parse.uses_netloc.append("gemini")
|
||||
urllib.parse.uses_relative.append("spartan")
|
||||
urllib.parse.uses_netloc.append("spartan")
|
||||
|
||||
|
||||
class UserAbortException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def parse_mime(mime):
|
||||
options = {}
|
||||
if mime:
|
||||
if ";" in mime:
|
||||
splited = mime.split(";",maxsplit=1)
|
||||
mime = splited[0]
|
||||
if len(splited) >= 1:
|
||||
options_list = splited[1].split()
|
||||
for o in options_list:
|
||||
spl = o.split("=",maxsplit=1)
|
||||
if len(spl) > 0:
|
||||
options[spl[0]] = spl[1]
|
||||
return mime, options
|
||||
|
||||
def normalize_url(url):
|
||||
if "://" not in url and ("./" not in url and url[0] != "/"):
|
||||
if not url.startswith("mailto:"):
|
||||
url = "gemini://" + url
|
||||
return url
|
||||
|
||||
|
||||
def cache_last_modified(url):
|
||||
if not url:
|
||||
return None
|
||||
path = get_cache_path(url)
|
||||
if path:
|
||||
return os.path.getmtime(path)
|
||||
else:
|
||||
print("ERROR : NO CACHE in cache_last_modified")
|
||||
return None
|
||||
|
||||
def is_cache_valid(url,validity=0):
|
||||
# Validity is the acceptable time for
|
||||
# a cache to be valid (in seconds)
|
||||
# If 0, then any cache is considered as valid
|
||||
# (use validity = 1 if you want to refresh everything)
|
||||
if utils.is_local(url):
|
||||
return True
|
||||
cache = get_cache_path(url)
|
||||
if cache :
|
||||
# If path is too long, we always return True to avoid
|
||||
# fetching it.
|
||||
if len(cache) > 259:
|
||||
print("We return False because path is too long")
|
||||
return False
|
||||
if os.path.exists(cache) and not os.path.isdir(cache):
|
||||
if validity > 0 :
|
||||
last_modification = cache_last_modified(url)
|
||||
now = time.time()
|
||||
age = now - last_modification
|
||||
return age < validity
|
||||
else:
|
||||
return True
|
||||
else:
|
||||
#Cache has not been build
|
||||
return False
|
||||
else:
|
||||
#There’s not even a cache!
|
||||
return False
|
||||
|
||||
def get_cache_path(url):
|
||||
# Sometimes, cache_path became a folder! (which happens for index.html/index.gmi)
|
||||
# In that case, we need to reconstruct it
|
||||
#First, we parse the URL
|
||||
if not url:
|
||||
return None
|
||||
parsed = urllib.parse.urlparse(url)
|
||||
if url[0] == "/" or url.startswith("./") or os.path.exists(url):
|
||||
scheme = "file"
|
||||
elif parsed.scheme:
|
||||
scheme = parsed.scheme
|
||||
else:
|
||||
scheme = default_protocol
|
||||
if scheme in ["file","mailto","list"]:
|
||||
local = True
|
||||
host = ""
|
||||
port = None
|
||||
# file:// is 7 char
|
||||
if url.startswith("file://"):
|
||||
path = url[7:]
|
||||
elif scheme == "mailto":
|
||||
path = parsed.path
|
||||
elif url.startswith("list://"):
|
||||
listdir = os.path.join(_DATA_DIR,"lists")
|
||||
listname = url[7:].lstrip("/")
|
||||
if listname in [""]:
|
||||
name = "My Lists"
|
||||
path = listdir
|
||||
else:
|
||||
name = listname
|
||||
path = os.path.join(listdir, "%s.gmi"%listname)
|
||||
else:
|
||||
path = url
|
||||
else:
|
||||
local = False
|
||||
# Convert unicode hostname to punycode using idna RFC3490
|
||||
host = parsed.hostname #.encode("idna").decode()
|
||||
port = parsed.port or standard_ports.get(scheme, 0)
|
||||
# special gopher selector case
|
||||
if scheme == "gopher":
|
||||
if len(parsed.path) >= 2:
|
||||
itemtype = parsed.path[1]
|
||||
path = parsed.path[2:]
|
||||
else:
|
||||
itemtype = "1"
|
||||
path = ""
|
||||
if itemtype == "0":
|
||||
mime = "text/gemini"
|
||||
elif itemtype == "1":
|
||||
mime = "text/gopher"
|
||||
elif itemtype == "h":
|
||||
mime = "text/html"
|
||||
elif itemtype in ("9","g","I","s"):
|
||||
mime = "binary"
|
||||
else:
|
||||
mime = "text/gopher"
|
||||
else:
|
||||
path = parsed.path
|
||||
if parsed.query:
|
||||
# we don’t add the query if path is too long because path above 260 char
|
||||
# are not supported and crash python.
|
||||
# Also, very long query are usually useless stuff
|
||||
if len(path+parsed.query) < 258:
|
||||
path += "/" + parsed.query
|
||||
|
||||
# Now, we have a partial path. Let’s make it full path.
|
||||
if local:
|
||||
cache_path = path
|
||||
elif scheme and host:
|
||||
cache_path = os.path.expanduser(_CACHE_PATH + scheme + "/" + host + path)
|
||||
#There’s an OS limitation of 260 characters per path.
|
||||
#We will thus cut the path enough to add the index afterward
|
||||
cache_path = cache_path[:249]
|
||||
# FIXME : this is a gross hack to give a name to
|
||||
# index files. This will break if the index is not
|
||||
# index.gmi. I don’t know how to know the real name
|
||||
# of the file. But first, we need to ensure that the domain name
|
||||
# finish by "/". Else, the cache will create a file, not a folder.
|
||||
if scheme.startswith("http"):
|
||||
index = "index.html"
|
||||
elif scheme == "finger":
|
||||
index = "index.txt"
|
||||
elif scheme == "gopher":
|
||||
index = "gophermap"
|
||||
else:
|
||||
index = "index.gmi"
|
||||
if path == "" or os.path.isdir(cache_path):
|
||||
if not cache_path.endswith("/"):
|
||||
cache_path += "/"
|
||||
if not url.endswith("/"):
|
||||
url += "/"
|
||||
if cache_path.endswith("/"):
|
||||
cache_path += index
|
||||
#sometimes, the index itself is a dir
|
||||
#like when folder/index.gmi?param has been created
|
||||
#and we try to access folder
|
||||
if os.path.isdir(cache_path):
|
||||
cache_path += "/" + index
|
||||
else:
|
||||
#URL is missing either a supported scheme or a valid host
|
||||
#print("Error: %s is not a supported url"%url)
|
||||
return None
|
||||
if len(cache_path) > 259:
|
||||
print("Path is too long. This is an OS limitation.\n\n")
|
||||
print(url)
|
||||
return None
|
||||
return cache_path
|
||||
|
||||
def write_body(url,body,mime=None):
|
||||
## body is a copy of the raw gemtext
|
||||
## Write_body() also create the cache !
|
||||
# DEFAULT GEMINI MIME
|
||||
mime, options = parse_mime(mime)
|
||||
cache_path = get_cache_path(url)
|
||||
if cache_path:
|
||||
if mime and mime.startswith("text/"):
|
||||
mode = "w"
|
||||
else:
|
||||
mode = "wb"
|
||||
cache_dir = os.path.dirname(cache_path)
|
||||
# If the subdirectory already exists as a file (not a folder)
|
||||
# We remove it (happens when accessing URL/subfolder before
|
||||
# URL/subfolder/file.gmi.
|
||||
# This causes loss of data in the cache
|
||||
# proper solution would be to save "sufolder" as "sufolder/index.gmi"
|
||||
# If the subdirectory doesn’t exist, we recursively try to find one
|
||||
# until it exists to avoid a file blocking the creation of folders
|
||||
root_dir = cache_dir
|
||||
while not os.path.exists(root_dir):
|
||||
root_dir = os.path.dirname(root_dir)
|
||||
if os.path.isfile(root_dir):
|
||||
os.remove(root_dir)
|
||||
os.makedirs(cache_dir,exist_ok=True)
|
||||
with open(cache_path, mode=mode) as f:
|
||||
f.write(body)
|
||||
f.close()
|
||||
return cache_path
|
||||
|
||||
|
||||
def set_error(url,err):
|
||||
# If we get an error, we want to keep an existing cache
|
||||
# but we need to touch it or to create an empty one
|
||||
# to avoid hitting the error at each refresh
|
||||
cache = get_cache_path(url)
|
||||
if is_cache_valid(url):
|
||||
os.utime(cache)
|
||||
else:
|
||||
cache_dir = os.path.dirname(cache)
|
||||
root_dir = cache_dir
|
||||
while not os.path.exists(root_dir):
|
||||
root_dir = os.path.dirname(root_dir)
|
||||
if os.path.isfile(root_dir):
|
||||
os.remove(root_dir)
|
||||
os.makedirs(cache_dir,exist_ok=True)
|
||||
if os.path.isdir(cache_dir):
|
||||
with open(cache, "w") as c:
|
||||
c.write(str(datetime.datetime.now())+"\n")
|
||||
c.write("ERROR while caching %s\n\n" %url)
|
||||
c.write("*****\n\n")
|
||||
c.write(str(type(err)) + " = " + str(err))
|
||||
#cache.write("\n" + str(err.with_traceback(None)))
|
||||
c.write("\n*****\n\n")
|
||||
c.write("If you believe this error was temporary, type ""reload"".\n")
|
||||
c.write("The ressource will be tentatively fetched during next sync.\n")
|
||||
c.close()
|
||||
return cache
|
||||
|
||||
def _fetch_http(url,max_size=None,timeout=DEFAULT_TIMEOUT,accept_bad_ssl_certificates=False,**kwargs):
|
||||
def too_large_error(url,length,max_size):
|
||||
err = "Size of %s is %s Mo\n"%(url,length)
|
||||
err += "Offpunk only download automatically content under %s Mo\n" %(max_size/1000000)
|
||||
err += "To retrieve this content anyway, type 'reload'."
|
||||
return set_error(url,err)
|
||||
if accept_bad_ssl_certificates:
|
||||
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = 'ALL:@SECLEVEL=1'
|
||||
requests.packages.urllib3.disable_warnings()
|
||||
verify=False
|
||||
else:
|
||||
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = 'ALL:@SECLEVEL=2'
|
||||
verify=True
|
||||
header = {}
|
||||
header["User-Agent"] = "Netcache"
|
||||
with requests.get(url,verify=verify,headers=header, stream=True,timeout=DEFAULT_TIMEOUT) as response:
|
||||
if "content-type" in response.headers:
|
||||
mime = response.headers['content-type']
|
||||
else:
|
||||
mime = None
|
||||
if "content-length" in response.headers:
|
||||
length = int(response.headers['content-length'])
|
||||
else:
|
||||
length = 0
|
||||
if max_size and length > max_size:
|
||||
response.close()
|
||||
return too_large_error(url,str(length/100),max_size)
|
||||
elif max_size and length == 0:
|
||||
body = b''
|
||||
downloaded = 0
|
||||
for r in response.iter_content():
|
||||
body += r
|
||||
#We divide max_size for streamed content
|
||||
#in order to catch them faster
|
||||
size = sys.getsizeof(body)
|
||||
max = max_size/2
|
||||
current = round(size*100/max,1)
|
||||
if current > downloaded:
|
||||
downloaded = current
|
||||
print(" -> Receiving stream: %s%% of allowed data"%downloaded,end='\r')
|
||||
#print("size: %s (%s\% of maxlenght)"%(size,size/max_size))
|
||||
if size > max_size/2:
|
||||
response.close()
|
||||
return too_large_error(url,"streaming",max_size)
|
||||
response.close()
|
||||
else:
|
||||
body = response.content
|
||||
response.close()
|
||||
if mime and "text/" in mime:
|
||||
body = body.decode("UTF-8","replace")
|
||||
cache = write_body(url,body,mime)
|
||||
return cache
|
||||
|
||||
def _fetch_gopher(url,timeout=DEFAULT_TIMEOUT,**kwargs):
|
||||
parsed =urllib.parse.urlparse(url)
|
||||
host = parsed.hostname
|
||||
port = parsed.port or 70
|
||||
if len(parsed.path) >= 2:
|
||||
itemtype = parsed.path[1]
|
||||
selector = parsed.path[2:]
|
||||
else:
|
||||
itemtype = "1"
|
||||
selector = ""
|
||||
addresses = socket.getaddrinfo(host, port, family=0,type=socket.SOCK_STREAM)
|
||||
s = socket.create_connection((host,port))
|
||||
for address in addresses:
|
||||
s = socket.socket(address[0], address[1])
|
||||
s.settimeout(timeout)
|
||||
try:
|
||||
s.connect(address[4])
|
||||
break
|
||||
except OSError as e:
|
||||
err = e
|
||||
if parsed.query:
|
||||
request = selector + "\t" + parsed.query
|
||||
else:
|
||||
request = selector
|
||||
request += "\r\n"
|
||||
s.sendall(request.encode("UTF-8"))
|
||||
response = s.makefile("rb").read()
|
||||
# Transcode response into UTF-8
|
||||
#if itemtype in ("0","1","h"):
|
||||
if not itemtype in ("9","g","I","s"):
|
||||
# Try most common encodings
|
||||
for encoding in ("UTF-8", "ISO-8859-1"):
|
||||
try:
|
||||
response = response.decode("UTF-8")
|
||||
break
|
||||
except UnicodeDecodeError:
|
||||
pass
|
||||
else:
|
||||
# try to find encoding
|
||||
if _HAS_CHARDET:
|
||||
detected = chardet.detect(response)
|
||||
response = response.decode(detected["encoding"])
|
||||
else:
|
||||
raise UnicodeDecodeError
|
||||
if itemtype == "0":
|
||||
mime = "text/gemini"
|
||||
elif itemtype == "1":
|
||||
mime = "text/gopher"
|
||||
elif itemtype == "h":
|
||||
mime = "text/html"
|
||||
elif itemtype in ("9","g","I","s"):
|
||||
mime = None
|
||||
else:
|
||||
# by default, we should consider Gopher
|
||||
mime = "text/gopher"
|
||||
cache = write_body(url,response,mime)
|
||||
return cache
|
||||
|
||||
def _fetch_finger(url,timeout=DEFAULT_TIMEOUT,**kwargs):
|
||||
parsed = urllib.parse.urlparse(url)
|
||||
host = parsed.hostname
|
||||
port = parsed.port or standard_ports["finger"]
|
||||
query = parsed.path.lstrip("/") + "\r\n"
|
||||
with socket.create_connection((host,port)) as sock:
|
||||
sock.settimeout(timeout)
|
||||
sock.send(query.encode())
|
||||
response = sock.makefile("rb").read().decode("UTF-8")
|
||||
cache = write_body(response,"text/plain")
|
||||
return cache
|
||||
|
||||
# Originally copied from reference spartan client by Michael Lazar
|
||||
def _fetch_spartan(url,**kwargs):
|
||||
cache = None
|
||||
url_parts = urllib.parse.urlparse(url)
|
||||
host = url_parts.hostname
|
||||
port = url_parts.port or standard_ports["spartan"]
|
||||
path = url_parts.path or "/"
|
||||
query = url_parts.query
|
||||
redirect_url = None
|
||||
with socket.create_connection((host,port)) as sock:
|
||||
if query:
|
||||
data = urllib.parse.unquote_to_bytes(query)
|
||||
else:
|
||||
data = b""
|
||||
encoded_host = host.encode("idna")
|
||||
ascii_path = urllib.parse.unquote_to_bytes(path)
|
||||
encoded_path = urllib.parse.quote_from_bytes(ascii_path).encode("ascii")
|
||||
sock.send(b"%s %s %d\r\n" % (encoded_host,encoded_path,len(data)))
|
||||
fp = sock.makefile("rb")
|
||||
response = fp.readline(4096).decode("ascii").strip("\r\n")
|
||||
parts = response.split(" ",maxsplit=1)
|
||||
code,meta = int(parts[0]),parts[1]
|
||||
if code == 2:
|
||||
body = fp.read()
|
||||
if meta.startswith("text"):
|
||||
body = body.decode("UTF-8")
|
||||
cache = write_body(url,body,meta)
|
||||
elif code == 3:
|
||||
redirect_url = url_parts._replace(path=meta).geturl()
|
||||
else:
|
||||
return set_error(url,"Spartan code %s: Error %s"%(code,meta))
|
||||
if redirect_url:
|
||||
cache = _fetch_spartan(redirect_url)
|
||||
return cache
|
||||
|
||||
def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=None):
|
||||
"""
|
||||
Validate a TLS certificate in TOFU mode.
|
||||
|
||||
If the cryptography module is installed:
|
||||
- Check the certificate Common Name or SAN matches `host`
|
||||
- Check the certificate's not valid before date is in the past
|
||||
- Check the certificate's not valid after date is in the future
|
||||
|
||||
Whether the cryptography module is installed or not, check the
|
||||
certificate's fingerprint against the TOFU database to see if we've
|
||||
previously encountered a different certificate for this IP address and
|
||||
hostname.
|
||||
"""
|
||||
now = datetime.datetime.utcnow()
|
||||
if _HAS_CRYPTOGRAPHY:
|
||||
# Using the cryptography module we can get detailed access
|
||||
# to the properties of even self-signed certs, unlike in
|
||||
# the standard ssl library...
|
||||
c = x509.load_der_x509_certificate(cert, _BACKEND)
|
||||
# Check certificate validity dates
|
||||
if accept_bad_ssl:
|
||||
if c.not_valid_before >= now:
|
||||
raise CertificateError("Certificate not valid until: {}!".format(c.not_valid_before))
|
||||
elif c.not_valid_after <= now:
|
||||
raise CertificateError("Certificate expired as of: {})!".format(c.not_valid_after))
|
||||
|
||||
# Check certificate hostnames
|
||||
names = []
|
||||
common_name = c.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
|
||||
if common_name:
|
||||
names.append(common_name[0].value)
|
||||
try:
|
||||
names.extend([alt.value for alt in c.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value])
|
||||
except x509.ExtensionNotFound:
|
||||
pass
|
||||
names = set(names)
|
||||
for name in names:
|
||||
try:
|
||||
ssl._dnsname_match(str(name), host)
|
||||
break
|
||||
except CertificateError:
|
||||
continue
|
||||
else:
|
||||
# If we didn't break out, none of the names were valid
|
||||
raise CertificateError("Hostname does not match certificate common name or any alternative names.")
|
||||
|
||||
sha = hashlib.sha256()
|
||||
sha.update(cert)
|
||||
fingerprint = sha.hexdigest()
|
||||
|
||||
db_path = os.path.join(_CONFIG_DIR, "tofu.db")
|
||||
db_conn = sqlite3.connect(db_path)
|
||||
db_cur = db_conn.cursor()
|
||||
|
||||
db_cur.execute("""CREATE TABLE IF NOT EXISTS cert_cache
|
||||
(hostname text, address text, fingerprint text,
|
||||
first_seen date, last_seen date, count integer)""")
|
||||
# Have we been here before?
|
||||
db_cur.execute("""SELECT fingerprint, first_seen, last_seen, count
|
||||
FROM cert_cache
|
||||
WHERE hostname=? AND address=?""", (host, address))
|
||||
cached_certs = db_cur.fetchall()
|
||||
|
||||
# If so, check for a match
|
||||
if cached_certs:
|
||||
max_count = 0
|
||||
most_frequent_cert = None
|
||||
for cached_fingerprint, first, last, count in cached_certs:
|
||||
if count > max_count:
|
||||
max_count = count
|
||||
most_frequent_cert = cached_fingerprint
|
||||
if fingerprint == cached_fingerprint:
|
||||
# Matched!
|
||||
db_cur.execute("""UPDATE cert_cache
|
||||
SET last_seen=?, count=?
|
||||
WHERE hostname=? AND address=? AND fingerprint=?""",
|
||||
(now, count+1, host, address, fingerprint))
|
||||
db_conn.commit()
|
||||
break
|
||||
else:
|
||||
certdir = os.path.join(_CONFIG_DIR, "cert_cache")
|
||||
with open(os.path.join(certdir, most_frequent_cert+".crt"), "rb") as fp:
|
||||
previous_cert = fp.read()
|
||||
if _HAS_CRYPTOGRAPHY:
|
||||
# Load the most frequently seen certificate to see if it has
|
||||
# expired
|
||||
previous_cert = x509.load_der_x509_certificate(previous_cert, _BACKEND)
|
||||
previous_ttl = previous_cert.not_valid_after - now
|
||||
print(previous_ttl)
|
||||
|
||||
print("****************************************")
|
||||
print("[SECURITY WARNING] Unrecognised certificate!")
|
||||
print("The certificate presented for {} ({}) has never been seen before.".format(host, address))
|
||||
print("This MIGHT be a Man-in-the-Middle attack.")
|
||||
print("A different certificate has previously been seen {} times.".format(max_count))
|
||||
if _HAS_CRYPTOGRAPHY:
|
||||
if previous_ttl < datetime.timedelta():
|
||||
print("That certificate has expired, which reduces suspicion somewhat.")
|
||||
else:
|
||||
print("That certificate is still valid for: {}".format(previous_ttl))
|
||||
print("****************************************")
|
||||
print("Attempt to verify the new certificate fingerprint out-of-band:")
|
||||
print(fingerprint)
|
||||
if automatic_choice:
|
||||
choice = automatic_choice
|
||||
else:
|
||||
choice = input("Accept this new certificate? Y/N ").strip().lower()
|
||||
if choice in ("y", "yes"):
|
||||
db_cur.execute("""INSERT INTO cert_cache
|
||||
VALUES (?, ?, ?, ?, ?, ?)""",
|
||||
(host, address, fingerprint, now, now, 1))
|
||||
db_conn.commit()
|
||||
with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
|
||||
fp.write(cert)
|
||||
else:
|
||||
raise Exception("TOFU Failure!")
|
||||
|
||||
# If not, cache this cert
|
||||
else:
|
||||
db_cur.execute("""INSERT INTO cert_cache
|
||||
VALUES (?, ?, ?, ?, ?, ?)""",
|
||||
(host, address, fingerprint, now, now, 1))
|
||||
db_conn.commit()
|
||||
certdir = os.path.join(_CONFIG_DIR, "cert_cache")
|
||||
if not os.path.exists(certdir):
|
||||
os.makedirs(certdir)
|
||||
with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
|
||||
fp.write(cert)
|
||||
|
||||
def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_certificates=False,\
|
||||
**kwargs):
|
||||
cache = None
|
||||
url_parts = urllib.parse.urlparse(url)
|
||||
host = url_parts.hostname
|
||||
port = url_parts.port or standard_ports["gemini"]
|
||||
path = url_parts.path or "/"
|
||||
query = url_parts.query
|
||||
# In AV-98, this was the _send_request method
|
||||
#Send a selector to a given host and port.
|
||||
#Returns the resolved address and binary file with the reply."""
|
||||
host = host.encode("idna").decode()
|
||||
# Do DNS resolution
|
||||
# DNS lookup - will get IPv4 and IPv6 records if IPv6 is enabled
|
||||
if ":" in host:
|
||||
# This is likely a literal IPv6 address, so we can *only* ask for
|
||||
# IPv6 addresses or getaddrinfo will complain
|
||||
family_mask = socket.AF_INET6
|
||||
elif socket.has_ipv6:
|
||||
# Accept either IPv4 or IPv6 addresses
|
||||
family_mask = 0
|
||||
else:
|
||||
# IPv4 only
|
||||
family_mask = socket.AF_INET
|
||||
addresses = socket.getaddrinfo(host, port, family=family_mask,
|
||||
type=socket.SOCK_STREAM)
|
||||
# Sort addresses so IPv6 ones come first
|
||||
addresses.sort(key=lambda add: add[0] == socket.AF_INET6, reverse=True)
|
||||
## Continuation of send_request
|
||||
# Prepare TLS context
|
||||
protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >=6 else ssl.PROTOCOL_TLSv1_2
|
||||
context = ssl.SSLContext(protocol)
|
||||
context.check_hostname=False
|
||||
context.verify_mode = ssl.CERT_NONE
|
||||
# Impose minimum TLS version
|
||||
## In 3.7 and above, this is easy...
|
||||
if sys.version_info.minor >= 7:
|
||||
context.minimum_version = ssl.TLSVersion.TLSv1_2
|
||||
## Otherwise, it seems very hard...
|
||||
## The below is less strict than it ought to be, but trying to disable
|
||||
## TLS v1.1 here using ssl.OP_NO_TLSv1_1 produces unexpected failures
|
||||
## with recent versions of OpenSSL. What a mess...
|
||||
else:
|
||||
context.options |= ssl.OP_NO_SSLv3
|
||||
context.options |= ssl.OP_NO_SSLv2
|
||||
# Try to enforce sensible ciphers
|
||||
try:
|
||||
context.set_ciphers("AESGCM+ECDHE:AESGCM+DHE:CHACHA20+ECDHE:CHACHA20+DHE:!DSS:!SHA1:!MD5:@STRENGTH")
|
||||
except ssl.SSLError:
|
||||
# Rely on the server to only support sensible things, I guess...
|
||||
pass
|
||||
# Connect to remote host by any address possible
|
||||
err = None
|
||||
for address in addresses:
|
||||
s = socket.socket(address[0], address[1])
|
||||
s.settimeout(timeout)
|
||||
s = context.wrap_socket(s, server_hostname = host)
|
||||
try:
|
||||
s.connect(address[4])
|
||||
break
|
||||
except OSError as e:
|
||||
err = e
|
||||
else:
|
||||
# If we couldn't connect to *any* of the addresses, just
|
||||
# bubble up the exception from the last attempt and deny
|
||||
# knowledge of earlier failures.
|
||||
raise err
|
||||
|
||||
# Do TOFU
|
||||
cert = s.getpeercert(binary_form=True)
|
||||
# Remember that we showed the current cert to this domain...
|
||||
#TODO : accept badssl and automatic choice
|
||||
_validate_cert(address[4][0], host, cert,automatic_choice="y")
|
||||
# Send request and wrap response in a file descriptor
|
||||
url = urllib.parse.urlparse(url)
|
||||
new_netloc = host
|
||||
if port != standard_ports["gemini"]:
|
||||
new_netloc += ":" + str(port)
|
||||
url = urllib.parse.urlunparse(url._replace(netloc=new_netloc))
|
||||
s.sendall((url + CRLF).encode("UTF-8"))
|
||||
f= s.makefile(mode = "rb")
|
||||
## end of send_request in AV98
|
||||
# Spec dictates <META> should not exceed 1024 bytes,
|
||||
# so maximum valid header length is 1027 bytes.
|
||||
header = f.readline(1027)
|
||||
header = urllib.parse.unquote(header.decode("UTF-8"))
|
||||
if not header or header[-1] != '\n':
|
||||
raise RuntimeError("Received invalid header from server!")
|
||||
header = header.strip()
|
||||
# Validate header
|
||||
status, meta = header.split(maxsplit=1)
|
||||
if len(meta) > 1024 or len(status) != 2 or not status.isnumeric():
|
||||
f.close()
|
||||
raise RuntimeError("Received invalid header from server!")
|
||||
# Update redirect loop/maze escaping state
|
||||
if not status.startswith("3"):
|
||||
previous_redirectors = set()
|
||||
#TODO FIXME
|
||||
else:
|
||||
#we set a previous_redirectors anyway because refactoring in progress
|
||||
previous_redirectors = set()
|
||||
# Handle non-SUCCESS headers, which don't have a response body
|
||||
# Inputs
|
||||
if status.startswith("1"):
|
||||
if interactive:
|
||||
print(meta)
|
||||
if status == "11":
|
||||
user_input = getpass.getpass("> ")
|
||||
else:
|
||||
#TODO:FIXME we should not ask for user input while non-interactive
|
||||
user_input = input("> ")
|
||||
return _fetch_gemini(query(user_input))
|
||||
else:
|
||||
return None
|
||||
# Redirects
|
||||
elif status.startswith("3"):
|
||||
newurl = urllib.parse.urljoin(url,meta)
|
||||
if newurl == url:
|
||||
raise RuntimeError("URL redirects to itself!")
|
||||
elif newurl in previous_redirectors:
|
||||
raise RuntimeError("Caught in redirect loop!")
|
||||
elif len(previous_redirectors) == _MAX_REDIRECTS:
|
||||
raise RuntimeError("Refusing to follow more than %d consecutive redirects!" % _MAX_REDIRECTS)
|
||||
# TODO: redirections handling should be refactored
|
||||
# elif "interactive" in options and not options["interactive"]:
|
||||
# follow = self.automatic_choice
|
||||
# # Never follow cross-domain redirects without asking
|
||||
# elif new_gi.host.encode("idna") != gi.host.encode("idna"):
|
||||
# follow = input("Follow cross-domain redirect to %s? (y/n) " % new_gi.url)
|
||||
# # Never follow cross-protocol redirects without asking
|
||||
# elif new_gi.scheme != gi.scheme:
|
||||
# follow = input("Follow cross-protocol redirect to %s? (y/n) " % new_gi.url)
|
||||
# # Don't follow *any* redirect without asking if auto-follow is off
|
||||
# elif not self.options["auto_follow_redirects"]:
|
||||
# follow = input("Follow redirect to %s? (y/n) " % new_gi.url)
|
||||
# # Otherwise, follow away
|
||||
else:
|
||||
follow = "yes"
|
||||
if follow.strip().lower() not in ("y", "yes"):
|
||||
raise UserAbortException()
|
||||
previous_redirectors.add(url)
|
||||
# if status == "31":
|
||||
# # Permanent redirect
|
||||
# self.permanent_redirects[gi.url] = new_gi.url
|
||||
return _fetch_gemini(newurl)
|
||||
# Errors
|
||||
elif status.startswith("4") or status.startswith("5"):
|
||||
raise RuntimeError(meta)
|
||||
# Client cert
|
||||
elif status.startswith("6"):
|
||||
print("Handling certificates for status 6X are not supported by offpunk\n")
|
||||
print("Please open a bug report")
|
||||
_fetch_gemini(url)
|
||||
# Invalid status
|
||||
elif not status.startswith("2"):
|
||||
raise RuntimeError("Server returned undefined status code %s!" % status)
|
||||
# If we're here, this must be a success and there's a response body
|
||||
assert status.startswith("2")
|
||||
mime = meta
|
||||
# Read the response body over the network
|
||||
fbody = f.read()
|
||||
# DEFAULT GEMINI MIME
|
||||
if mime == "":
|
||||
mime = "text/gemini; charset=utf-8"
|
||||
shortmime, mime_options = parse_mime(mime)
|
||||
if "charset" in mime_options:
|
||||
try:
|
||||
codecs.lookup(mime_options["charset"])
|
||||
except LookupError:
|
||||
#raise RuntimeError("Header declared unknown encoding %s" % mime_options)
|
||||
#If the encoding is wrong, there’s a high probably it’s UTF-8 with a bad header
|
||||
mime_options["charset"] = "UTF-8"
|
||||
if shortmime.startswith("text/"):
|
||||
#Get the charset and default to UTF-8 in none
|
||||
encoding = mime_options.get("charset", "UTF-8")
|
||||
try:
|
||||
body = fbody.decode(encoding)
|
||||
except UnicodeError:
|
||||
raise RuntimeError("Could not decode response body using %s\
|
||||
encoding declared in header!" % encoding)
|
||||
else:
|
||||
body = fbody
|
||||
cache = write_body(url,body,mime)
|
||||
return cache
|
||||
|
||||
|
||||
def fetch(url,offline=False,download_image_first=True,images_mode="readable",validity=0,**kwargs):
|
||||
url = normalize_url(url)
|
||||
path=None
|
||||
print_error = "print_error" in kwargs.keys() and kwargs["print_error"]
|
||||
if is_cache_valid(url,validity=validity):
|
||||
path = get_cache_path(url)
|
||||
#If we are offline, any cache is better than nothing
|
||||
elif offline and is_cache_valid(url,validity=0):
|
||||
path = get_cache_path(url)
|
||||
elif "://" in url and not offline:
|
||||
try:
|
||||
scheme = url.split("://")[0]
|
||||
if scheme not in standard_ports:
|
||||
if print_error:
|
||||
print("%s is not a supported protocol"%scheme)
|
||||
path = None
|
||||
elif scheme in ("http","https"):
|
||||
path=_fetch_http(url,**kwargs)
|
||||
elif scheme == "gopher":
|
||||
path=_fetch_gopher(url,**kwargs)
|
||||
elif scheme == "finger":
|
||||
path=_fetch_finger(url,**kwargs)
|
||||
elif scheme == "gemini":
|
||||
path=_fetch_gemini(url,**kwargs)
|
||||
else:
|
||||
print("scheme %s not implemented yet")
|
||||
except UserAbortException:
|
||||
return
|
||||
except Exception as err:
|
||||
cache = set_error(url, err)
|
||||
# Print an error message
|
||||
# we fail silently when sync_only
|
||||
if isinstance(err, socket.gaierror):
|
||||
if print_error:
|
||||
print("ERROR: DNS error!")
|
||||
elif isinstance(err, ConnectionRefusedError):
|
||||
if print_error:
|
||||
print("ERROR1: Connection refused!")
|
||||
elif isinstance(err, ConnectionResetError):
|
||||
if print_error:
|
||||
print("ERROR2: Connection reset!")
|
||||
elif isinstance(err, (TimeoutError, socket.timeout)):
|
||||
if print_error:
|
||||
print("""ERROR3: Connection timed out!
|
||||
Slow internet connection? Use 'set timeout' to be more patient.""")
|
||||
elif isinstance(err, FileExistsError):
|
||||
if print_error:
|
||||
print("""ERROR5: Trying to create a directory which already exists
|
||||
in the cache : """)
|
||||
print(err)
|
||||
elif isinstance(err,requests.exceptions.SSLError):
|
||||
if print_error:
|
||||
print("""ERROR6: Bad SSL certificate:\n""")
|
||||
print(err)
|
||||
print("""\n If you know what you are doing, you can try to accept bad certificates with the following command:\n""")
|
||||
print("""set accept_bad_ssl_certificates True""")
|
||||
elif isinstance(err,requests.exceptions.ConnectionError):
|
||||
if print_error:
|
||||
print("""ERROR7: Cannot connect to URL:\n""")
|
||||
print(str(err))
|
||||
else:
|
||||
if print_error:
|
||||
import traceback
|
||||
print("ERROR4: " + str(type(err)) + " : " + str(err))
|
||||
#print("\n" + str(err.with_traceback(None)))
|
||||
print(traceback.format_exc())
|
||||
return cache
|
||||
# We download images contained in the document (from full mode)
|
||||
if not offline and download_image_first and images_mode:
|
||||
renderer = ansicat.renderer_from_file(path,url)
|
||||
if renderer:
|
||||
for image in renderer.get_images(mode=images_mode):
|
||||
#Image should exist, should be an url (not a data image)
|
||||
#and should not be already cached
|
||||
if image and not image.startswith("data:image/") and not is_cache_valid(image):
|
||||
width = utils.term_width() - 1
|
||||
toprint = "Downloading %s" %image
|
||||
toprint = toprint[:width]
|
||||
toprint += " "*(width-len(toprint))
|
||||
print(toprint,end="\r")
|
||||
#d_i_f and images_mode are False/None to avoid recursive downloading
|
||||
#if that ever happen
|
||||
fetch(image,offline=offline,download_image_first=False,\
|
||||
images_mode=None,validity=0,**kwargs)
|
||||
return path
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
# Parse arguments
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--path", action="store_true",
|
||||
help="return path to the cache instead of the content of the cache")
|
||||
parser.add_argument("--offline", action="store_true",
|
||||
help="Do not attempt to download, return cached version or error")
|
||||
parser.add_argument("--max-size", type=int,
|
||||
help="Cancel download of items above that size (value in Mb).")
|
||||
parser.add_argument("--timeout", type=int,
|
||||
help="Time to wait before cancelling connection (in second).")
|
||||
# No argument: write help
|
||||
parser.add_argument('url', metavar='URL', nargs='*',
|
||||
help='download URL and returns the content or the path to a cached version')
|
||||
# arg = URL: download and returns cached URI
|
||||
# --cache-validity : do not download if cache is valid
|
||||
# --validity : returns the date of the cached version, Null if no version
|
||||
# --force-download : download and replace cache, even if valid
|
||||
args = parser.parse_args()
|
||||
|
||||
param = {}
|
||||
|
||||
for u in args.url:
|
||||
if args.offline:
|
||||
path = get_cache_path(u)
|
||||
else:
|
||||
print("Download URL: %s" %u)
|
||||
path = fetch(u,max_size=args.max_size,timeout=args.timeout)
|
||||
if args.path:
|
||||
print(path)
|
||||
else:
|
||||
with open(path,"r") as f:
|
||||
print(f.read())
|
||||
f.close()
|
||||
|
||||
|
||||
if __name__== '__main__':
|
||||
main()
|
|
@ -0,0 +1,281 @@
|
|||
#!/usr/bin/env python3
|
||||
#opnk stand for "Open like a PuNK".
|
||||
#It will open any file or URL and display it nicely in less.
|
||||
#If not possible, it will fallback to xdg-open
|
||||
#URL are retrieved through netcache
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import argparse
|
||||
import shutil
|
||||
import time
|
||||
import fnmatch
|
||||
from offpunk import ansicat, netcache, utils
|
||||
from offpunk.utils import run,term_width,mode_url,unmode_url,is_local
|
||||
|
||||
_HAS_XDGOPEN = shutil.which('xdg-open')
|
||||
_GREP = "grep --color=auto"
|
||||
|
||||
less_version = 0
|
||||
if not shutil.which("less"):
|
||||
print("Please install the pager \"less\" to run Offpunk.")
|
||||
print("If you wish to use another pager, send me an email !")
|
||||
print("(I’m really curious to hear about people not having \"less\" on their system.)")
|
||||
sys.exit()
|
||||
output = run("less --version")
|
||||
# We get less Version (which is the only integer on the first line)
|
||||
words = output.split("\n")[0].split()
|
||||
less_version = 0
|
||||
for w in words:
|
||||
if w.isdigit():
|
||||
less_version = int(w)
|
||||
# restoring position only works for version of less > 572
|
||||
if less_version >= 572:
|
||||
_LESS_RESTORE_POSITION = True
|
||||
else:
|
||||
_LESS_RESTORE_POSITION = False
|
||||
#_DEFAULT_LESS = "less -EXFRfM -PMurl\ lines\ \%lt-\%lb/\%L\ \%Pb\%$ %s"
|
||||
# -E : quit when reaching end of file (to behave like "cat")
|
||||
# -F : quit if content fits the screen (behave like "cat")
|
||||
# -X : does not clear the screen
|
||||
# -R : interpret ANSI colors correctly
|
||||
# -f : suppress warning for some contents
|
||||
# -M : long prompt (to have info about where you are in the file)
|
||||
# -W : hilite the new first line after a page skip (space)
|
||||
# -i : ignore case in search
|
||||
# -S : do not wrap long lines. Wrapping is done by offpunk, longlines
|
||||
# are there on purpose (surch in asciiart)
|
||||
#--incsearch : incremental search starting rev581
|
||||
if less_version >= 581:
|
||||
less_base = "less --incsearch --save-marks -~ -XRfMWiS"
|
||||
elif less_version >= 572:
|
||||
less_base = "less --save-marks -XRfMWiS"
|
||||
else:
|
||||
less_base = "less -XRfMWiS"
|
||||
_DEFAULT_LESS = less_base + " \"+''\" %s"
|
||||
_DEFAULT_CAT = less_base + " -EF %s"
|
||||
|
||||
def less_cmd(file, histfile=None,cat=False,grep=None):
|
||||
if histfile:
|
||||
env = {"LESSHISTFILE": histfile}
|
||||
else:
|
||||
env = {}
|
||||
if cat:
|
||||
cmd_str = _DEFAULT_CAT
|
||||
elif grep:
|
||||
grep_cmd = _GREP
|
||||
#case insensitive for lowercase search
|
||||
if grep.islower():
|
||||
grep_cmd += " -i"
|
||||
cmd_str = _DEFAULT_CAT + "|" + grep_cmd + " %s"%grep
|
||||
else:
|
||||
cmd_str = _DEFAULT_LESS
|
||||
run(cmd_str, parameter=file, direct_output=True, env=env)
|
||||
|
||||
class opencache():
|
||||
def __init__(self):
|
||||
# We have a cache of the rendering of file and, for each one,
|
||||
# a less_histfile containing the current position in the file
|
||||
self.temp_files = {}
|
||||
self.less_histfile = {}
|
||||
# This dictionary contains an url -> ansirenderer mapping. This allows
|
||||
# to reuse a renderer when visiting several times the same URL during
|
||||
# the same session
|
||||
# We save the time at which the renderer was created in renderer_time
|
||||
# This way, we can invalidate the renderer if a new version of the source
|
||||
# has been downloaded
|
||||
self.rendererdic = {}
|
||||
self.renderer_time = {}
|
||||
self.mime_handlers = {}
|
||||
self.last_mode = {}
|
||||
self.last_width = term_width(absolute=True)
|
||||
|
||||
def _get_handler_cmd(self, mimetype):
|
||||
# Now look for a handler for this mimetype
|
||||
# Consider exact matches before wildcard matches
|
||||
exact_matches = []
|
||||
wildcard_matches = []
|
||||
for handled_mime, cmd_str in self.mime_handlers.items():
|
||||
if "*" in handled_mime:
|
||||
wildcard_matches.append((handled_mime, cmd_str))
|
||||
else:
|
||||
exact_matches.append((handled_mime, cmd_str))
|
||||
for handled_mime, cmd_str in exact_matches + wildcard_matches:
|
||||
if fnmatch.fnmatch(mimetype, handled_mime):
|
||||
break
|
||||
else:
|
||||
# Use "xdg-open" as a last resort.
|
||||
if _HAS_XDGOPEN:
|
||||
cmd_str = "xdg-open %s"
|
||||
else:
|
||||
cmd_str = "echo \"Can’t find how to open \"%s"
|
||||
print("Please install xdg-open (usually from xdg-util package)")
|
||||
return cmd_str
|
||||
|
||||
# Return the handler for a specific mimetype.
|
||||
# Return the whole dic if no specific mime provided
|
||||
def get_handlers(self,mime=None):
|
||||
if mime and mime in self.mime_handlers.keys():
|
||||
return self.mime_handlers[mime]
|
||||
elif mime:
|
||||
return None
|
||||
else:
|
||||
return self.mime_handlers
|
||||
|
||||
def set_handler(self,mime,handler):
|
||||
previous = None
|
||||
if mime in self.mime_handlers.keys():
|
||||
previous = self.mime_handlers[mime]
|
||||
self.mime_handlers[mime] = handler
|
||||
if "%s" not in handler:
|
||||
print("WARNING: this handler has no %%s, no filename will be provided to the command")
|
||||
if previous:
|
||||
print("Previous handler was %s"%previous)
|
||||
|
||||
def get_renderer(self,inpath,mode=None,theme=None):
|
||||
# We remove the ##offpunk_mode= from the URL
|
||||
# If mode is already set, we don’t use the part from the URL
|
||||
inpath,newmode = unmode_url(inpath)
|
||||
if not mode: mode = newmode
|
||||
# If we still doesn’t have a mode, we see if we used one before
|
||||
if not mode and inpath in self.last_mode.keys():
|
||||
mode = self.last_mode[inpath]
|
||||
elif not mode:
|
||||
#default mode is readable
|
||||
mode = "readable"
|
||||
renderer = None
|
||||
path = netcache.get_cache_path(inpath)
|
||||
if path:
|
||||
usecache = inpath in self.rendererdic.keys() and not is_local(inpath)
|
||||
#Screen size may have changed
|
||||
width = term_width(absolute=True)
|
||||
if usecache and self.last_width != width:
|
||||
self.cleanup()
|
||||
usecache = False
|
||||
self.last_width = width
|
||||
if usecache:
|
||||
if inpath in self.renderer_time.keys():
|
||||
last_downloaded = netcache.cache_last_modified(inpath)
|
||||
last_cached = self.renderer_time[inpath]
|
||||
usecache = last_cached > last_downloaded
|
||||
else:
|
||||
usecache = False
|
||||
if not usecache:
|
||||
renderer = ansicat.renderer_from_file(path,inpath,theme=theme)
|
||||
if renderer:
|
||||
self.rendererdic[inpath] = renderer
|
||||
self.renderer_time[inpath] = int(time.time())
|
||||
else:
|
||||
renderer = self.rendererdic[inpath]
|
||||
return renderer
|
||||
|
||||
def get_temp_filename(self,url):
|
||||
if url in self.temp_files.keys():
|
||||
return self.temp_files[url]
|
||||
else:
|
||||
return None
|
||||
|
||||
def opnk(self,inpath,mode=None,terminal=True,grep=None,theme=None,**kwargs):
|
||||
#Return True if inpath opened in Terminal
|
||||
# False otherwise
|
||||
#if terminal = False, we don’t try to open in the terminal,
|
||||
#we immediately fallback to xdg-open.
|
||||
#netcache currently provide the path if it’s a file.
|
||||
#may this should be migrated here.
|
||||
if not utils.is_local(inpath):
|
||||
kwargs["images_mode"] = mode
|
||||
cachepath = netcache.fetch(inpath,**kwargs)
|
||||
if not cachepath:
|
||||
return False
|
||||
elif "://" in inpath:
|
||||
cachepath = netcache.fetch(inpath,**kwargs)
|
||||
elif os.path.exists(inpath):
|
||||
cachepath = inpath
|
||||
else:
|
||||
print("%s does not exist"%inpath)
|
||||
return
|
||||
renderer = self.get_renderer(inpath,mode=mode,theme=theme)
|
||||
if renderer and mode:
|
||||
renderer.set_mode(mode)
|
||||
self.last_mode[inpath] = mode
|
||||
if not mode and inpath in self.last_mode.keys():
|
||||
mode = self.last_mode[inpath]
|
||||
renderer.set_mode(mode)
|
||||
#we use the full moded url as key for the dictionary
|
||||
key = mode_url(inpath,mode)
|
||||
if terminal and renderer:
|
||||
#If this is an image and we have chafa/timg, we
|
||||
#don’t use less, we call it directly
|
||||
if renderer.has_direct_display():
|
||||
renderer.display(mode=mode,directdisplay=True)
|
||||
return True
|
||||
else:
|
||||
body = renderer.display(mode=mode)
|
||||
#Should we use the cache ? only if it is not local and there’s a cache
|
||||
usecache = key in self.temp_files and not is_local(inpath)
|
||||
if usecache:
|
||||
#and the cache is still valid!
|
||||
last_downloaded = netcache.cache_last_modified(inpath)
|
||||
last_cached = os.path.getmtime(self.temp_files[key])
|
||||
if last_downloaded > last_cached:
|
||||
usecache = False
|
||||
self.temp_files.pop(key)
|
||||
self.less_histfile.pop(key)
|
||||
# We actually put the body in a tmpfile before giving it to less
|
||||
if not usecache:
|
||||
tmpf = tempfile.NamedTemporaryFile("w", encoding="UTF-8", delete=False)
|
||||
self.temp_files[key] = tmpf.name
|
||||
tmpf.write(body)
|
||||
tmpf.close()
|
||||
if key not in self.less_histfile:
|
||||
firsttime = True
|
||||
tmpf = tempfile.NamedTemporaryFile("w", encoding="UTF-8", delete=False)
|
||||
self.less_histfile[key] = tmpf.name
|
||||
else:
|
||||
#We don’t want to restore positions in lists
|
||||
firsttime = is_local(inpath)
|
||||
less_cmd(self.temp_files[key], histfile=self.less_histfile[key],cat=firsttime,grep=grep)
|
||||
return True
|
||||
#maybe, we have no renderer. Or we want to skip it.
|
||||
else:
|
||||
mimetype = ansicat.get_mime(cachepath)
|
||||
if mimetype == "mailto":
|
||||
resp = input("Send an email to %s Y/N? " %inpath)
|
||||
if resp.strip().lower() in ("y", "yes"):
|
||||
if _HAS_XDGOPEN :
|
||||
run("xdg-open mailto:%s", parameter=inpath ,direct_output=True)
|
||||
else:
|
||||
print("Cannot find a mail client to send mail to %s" %inpath)
|
||||
print("Please install xdg-open (usually from xdg-util package)")
|
||||
return
|
||||
else:
|
||||
cmd_str = self._get_handler_cmd(mimetype)
|
||||
try:
|
||||
run(cmd_str, parameter=netcache.get_cache_path(inpath), direct_output=True)
|
||||
except FileNotFoundError:
|
||||
print("Handler program %s not found!" % shlex.split(cmd_str)[0])
|
||||
print("You can use the ! command to specify another handler program or pipeline.")
|
||||
return False
|
||||
|
||||
#We remove the renderers from the cache and we also delete temp files
|
||||
def cleanup(self):
|
||||
while len(self.temp_files) > 0:
|
||||
os.remove(self.temp_files.popitem()[1])
|
||||
while len(self.less_histfile) > 0:
|
||||
os.remove(self.less_histfile.popitem()[1])
|
||||
self.last_width = None
|
||||
self.rendererdic = {}
|
||||
self.renderer_time = {}
|
||||
self.last_mode = {}
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("content",metavar="INPUT", nargs="*",
|
||||
default=sys.stdin, help="Path to the file or URL to open")
|
||||
args = parser.parse_args()
|
||||
cache = opencache()
|
||||
for f in args.content:
|
||||
cache.opnk(f)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -37,7 +37,6 @@ offpunk1 = {
|
|||
"subtitle" : ["blue"],
|
||||
"subsubtitle" : ["blue","faint"], #fallback to subtitle if none
|
||||
"link" : ["blue","faint"],
|
||||
"new_link": ["bold"],
|
||||
"oneline_link": [], #for gopher/gemini. fallback to link if none
|
||||
"image_link" : ["yellow","faint"],
|
||||
"preformatted": ["faint"],
|
|
@ -0,0 +1,152 @@
|
|||
#!/bin/python
|
||||
|
||||
#This file contains some utilities common to offpunk, ansirenderer and netcache.
|
||||
#Currently, there are the following utilities:
|
||||
#
|
||||
# run : run a shell command and get the results with some security
|
||||
# term_width : get or set the width to display on the terminal
|
||||
|
||||
import os
|
||||
import io
|
||||
import subprocess
|
||||
import shutil
|
||||
import shlex
|
||||
import urllib.parse
|
||||
import urllib.parse
|
||||
from offpunk import cache_migration
|
||||
|
||||
CACHE_VERSION = 1
|
||||
|
||||
## Config directories
|
||||
## We implement our own python-xdg to avoid conflict with existing libraries.
|
||||
_home = os.path.expanduser('~')
|
||||
data_home = os.environ.get('XDG_DATA_HOME') or \
|
||||
os.path.join(_home,'.local','share')
|
||||
config_home = os.environ.get('XDG_CONFIG_HOME') or \
|
||||
os.path.join(_home,'.config')
|
||||
_CONFIG_DIR = os.path.join(os.path.expanduser(config_home),"offpunk/")
|
||||
_DATA_DIR = os.path.join(os.path.expanduser(data_home),"offpunk/")
|
||||
_old_config = os.path.expanduser("~/.offpunk/")
|
||||
## Look for pre-existing config directory, if any
|
||||
if os.path.exists(_old_config):
|
||||
_CONFIG_DIR = _old_config
|
||||
#if no XDG .local/share and not XDG .config, we use the old config
|
||||
if not os.path.exists(data_home) and os.path.exists(_old_config):
|
||||
_DATA_DIR = _CONFIG_DIR
|
||||
cache_home = os.environ.get('XDG_CACHE_HOME') or\
|
||||
os.path.join(_home,'.cache')
|
||||
_CACHE_PATH = os.path.join(os.path.expanduser(cache_home),"offpunk/")
|
||||
os.makedirs(_CACHE_PATH,exist_ok=True)
|
||||
|
||||
#Let’s read current version of the cache
|
||||
version_path = _CACHE_PATH + ".version"
|
||||
current_version = 0
|
||||
if os.path.exists(version_path):
|
||||
current_str = None
|
||||
with open(version_path) as f:
|
||||
current_str = f.read()
|
||||
f.close()
|
||||
try:
|
||||
current_version = int(current_str)
|
||||
except:
|
||||
current_version = 0
|
||||
|
||||
#Now, let’s upgrade the cache if needed
|
||||
while current_version < CACHE_VERSION:
|
||||
current_version += 1
|
||||
upgrade_func = getattr(cache_migration,"upgrade_to_"+str(current_version))
|
||||
upgrade_func(_CACHE_PATH)
|
||||
with open(version_path,"w") as f:
|
||||
f.write(str(current_version))
|
||||
f.close()
|
||||
|
||||
|
||||
|
||||
## Those two functions add/remove the mode to the
|
||||
# URLs. This is a gross hack to remember the mode
|
||||
def mode_url(url,mode):
|
||||
if mode and mode!= "readable" and "##offpunk=" not in url:
|
||||
url += "##offpunk_mode=" + mode
|
||||
return url
|
||||
|
||||
def unmode_url(url):
|
||||
mode = None
|
||||
splitted = url.split("##offpunk_mode=")
|
||||
if len(splitted) > 1:
|
||||
url = splitted[0]
|
||||
mode = splitted[1]
|
||||
return [url,mode]
|
||||
|
||||
# In terms of arguments, this can take an input file/string to be passed to
|
||||
# stdin, a parameter to do (well-escaped) "%" replacement on the command, a
|
||||
# flag requesting that the output go directly to the stdout, and a list of
|
||||
# additional environment variables to set.
|
||||
def run(cmd, *, input=None, parameter=None, direct_output=False, env={}):
|
||||
if parameter:
|
||||
cmd = cmd % shlex.quote(parameter)
|
||||
e = os.environ
|
||||
e.update(env)
|
||||
if isinstance(input, io.IOBase):
|
||||
stdin = input
|
||||
input = None
|
||||
else:
|
||||
if input:
|
||||
input = input.encode()
|
||||
stdin = None
|
||||
if not direct_output:
|
||||
# subprocess.check_output() wouldn't allow us to pass stdin.
|
||||
result = subprocess.run(cmd, check=True, env=e, input=input,
|
||||
shell=True, stdin=stdin, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT)
|
||||
return result.stdout.decode()
|
||||
else:
|
||||
subprocess.run(cmd, env=e, input=input, shell=True, stdin=stdin)
|
||||
|
||||
|
||||
global TERM_WIDTH
|
||||
TERM_WIDTH = 72
|
||||
|
||||
#if absolute, returns the real terminal width, not the text width
|
||||
def term_width(new_width=None,absolute=False):
|
||||
if new_width:
|
||||
global TERM_WIDTH
|
||||
TERM_WIDTH = new_width
|
||||
cur = shutil.get_terminal_size()[0]
|
||||
if absolute:
|
||||
return cur
|
||||
width = TERM_WIDTH
|
||||
if cur < width:
|
||||
width = cur
|
||||
return width
|
||||
|
||||
def is_local(url):
|
||||
if not url: return True
|
||||
elif "://" in url:
|
||||
scheme,path = url.split("://",maxsplit=1)
|
||||
return scheme in ["file","mail","list","mailto"]
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
# This method return the image URL or invent it if it’s a base64 inline image
|
||||
# It returns [url,image_data] where image_data is None for normal image
|
||||
def looks_like_base64(src,baseurl):
|
||||
imgdata = None
|
||||
imgname = src
|
||||
if src and src.startswith("data:image/"):
|
||||
if ";base64," in src:
|
||||
splitted = src.split(";base64,")
|
||||
#splitted[0] is something like data:image/jpg
|
||||
if "/" in splitted[0]:
|
||||
extension = splitted[0].split("/")[1]
|
||||
else:
|
||||
extension = "data"
|
||||
imgdata = splitted[1]
|
||||
imgname = imgdata[:20] + "." + extension
|
||||
imgurl = urllib.parse.urljoin(baseurl, imgname)
|
||||
else:
|
||||
#We can’t handle other data:image such as svg for now
|
||||
imgurl = None
|
||||
else:
|
||||
imgurl = urllib.parse.urljoin(baseurl, imgname)
|
||||
return imgurl,imgdata
|
226
offutils.py
226
offutils.py
|
@ -1,226 +0,0 @@
|
|||
#!/bin/python
|
||||
|
||||
#This file contains some utilities common to offpunk, ansirenderer and netcache.
|
||||
#Currently, there are the following utilities:
|
||||
#
|
||||
# run : run a shell command and get the results with some security
|
||||
# term_width : get or set the width to display on the terminal
|
||||
|
||||
import os
|
||||
import io
|
||||
import subprocess
|
||||
import shutil
|
||||
import shlex
|
||||
import urllib.parse
|
||||
import urllib.parse
|
||||
import netcache_migration
|
||||
import netcache
|
||||
|
||||
CACHE_VERSION = 1
|
||||
|
||||
# We upgrade the cache only once at startup, hence the UPGRADED variable
|
||||
# This is only to avoid unecessary checks each time the cache is accessed
|
||||
UPGRADED=False
|
||||
def upgrade_cache(cache_folder):
|
||||
#Let’s read current version of the cache
|
||||
version_path = cache_folder + ".version"
|
||||
current_version = 0
|
||||
if os.path.exists(version_path):
|
||||
current_str = None
|
||||
with open(version_path) as f:
|
||||
current_str = f.read()
|
||||
f.close()
|
||||
try:
|
||||
current_version = int(current_str)
|
||||
except:
|
||||
current_version = 0
|
||||
#Now, let’s upgrade the cache if needed
|
||||
while current_version < CACHE_VERSION:
|
||||
current_version += 1
|
||||
upgrade_func = getattr(netcache_migration,"upgrade_to_"+str(current_version))
|
||||
upgrade_func(cache_folder)
|
||||
with open(version_path,"w") as f:
|
||||
f.write(str(current_version))
|
||||
f.close()
|
||||
UPGRADED=True
|
||||
|
||||
#get xdg folder. Folder should be "cache", "data" or "config"
|
||||
def xdg(folder="cache"):
|
||||
## Config directories
|
||||
## We implement our own python-xdg to avoid conflict with existing libraries.
|
||||
_home = os.path.expanduser('~')
|
||||
data_home = os.environ.get('XDG_DATA_HOME') or \
|
||||
os.path.join(_home,'.local','share')
|
||||
config_home = os.environ.get('XDG_CONFIG_HOME') or \
|
||||
os.path.join(_home,'.config')
|
||||
_CONFIG_DIR = os.path.join(os.path.expanduser(config_home),"offpunk/")
|
||||
_DATA_DIR = os.path.join(os.path.expanduser(data_home),"offpunk/")
|
||||
_old_config = os.path.expanduser("~/.offpunk/")
|
||||
## Look for pre-existing config directory, if any
|
||||
if os.path.exists(_old_config):
|
||||
_CONFIG_DIR = _old_config
|
||||
#if no XDG .local/share and not XDG .config, we use the old config
|
||||
if not os.path.exists(data_home) and os.path.exists(_old_config):
|
||||
_DATA_DIR = _CONFIG_DIR
|
||||
## get _CACHE_PATH from OFFPUNK_CACHE_PATH environment variable
|
||||
# if OFFPUNK_CACHE_PATH empty, set default to ~/.cache/offpunk
|
||||
cache_home = os.environ.get('XDG_CACHE_HOME') or\
|
||||
os.path.join(_home,'.cache')
|
||||
_CACHE_PATH = os.environ.get('OFFPUNK_CACHE_PATH', \
|
||||
os.path.join(os.path.expanduser(cache_home),"offpunk/"))
|
||||
#Check that the cache path ends with "/"
|
||||
if not _CACHE_PATH.endswith("/"):
|
||||
_CACHE_PATH += "/"
|
||||
os.makedirs(_CACHE_PATH,exist_ok=True)
|
||||
if folder == "cache" and not UPGRADED:
|
||||
upgrade_cache(_CACHE_PATH)
|
||||
if folder == "cache":
|
||||
return _CACHE_PATH
|
||||
elif folder == "config":
|
||||
return _CONFIG_DIR
|
||||
elif folder == "data":
|
||||
return _DATA_DIR
|
||||
else:
|
||||
print("No XDG folder for %s. Check your code."%folder)
|
||||
return None
|
||||
|
||||
|
||||
|
||||
#An IPV6 URL should be put between []
|
||||
#We try to detect them has location with more than 2 ":"
|
||||
def fix_ipv6_url(url):
|
||||
if not url or url.startswith("mailto"):
|
||||
return url
|
||||
if "://" in url:
|
||||
schema, schemaless = url.split("://",maxsplit=1)
|
||||
else:
|
||||
schema, schemaless = None, url
|
||||
if "/" in schemaless:
|
||||
netloc, rest = schemaless.split("/",1)
|
||||
if netloc.count(":") > 2 and "[" not in netloc and "]" not in netloc:
|
||||
schemaless = "[" + netloc + "]" + "/" + rest
|
||||
elif schemaless.count(":") > 2 and "[" not in schemaless and "]" not in schemaless:
|
||||
schemaless = "[" + schemaless + "]/"
|
||||
if schema:
|
||||
return schema + "://" + schemaless
|
||||
return schemaless
|
||||
|
||||
# Cheap and cheerful URL detector
|
||||
def looks_like_url(word):
|
||||
try:
|
||||
if not word.strip():
|
||||
return False
|
||||
url = fix_ipv6_url(word).strip()
|
||||
parsed = urllib.parse.urlparse(url)
|
||||
#sometimes, urllib crashed only when requesting the port
|
||||
port = parsed.port
|
||||
scheme = word.split("://")[0]
|
||||
mailto = word.startswith("mailto:")
|
||||
start = scheme in netcache.standard_ports
|
||||
local = scheme in ["file","list"]
|
||||
if mailto:
|
||||
return "@" in word
|
||||
elif not local:
|
||||
if start:
|
||||
#IPv4
|
||||
if "." in word or "localhost" in word:
|
||||
return True
|
||||
#IPv6
|
||||
elif "[" in word and ":" in word and "]" in word:
|
||||
return True
|
||||
else: return False
|
||||
else: return False
|
||||
return start and ("." in word or "localhost" in word or ":" in word)
|
||||
else:
|
||||
return "/" in word
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
## Those two functions add/remove the mode to the
|
||||
# URLs. This is a gross hack to remember the mode
|
||||
def mode_url(url,mode):
|
||||
if mode and mode!= "readable" and "##offpunk=" not in url:
|
||||
url += "##offpunk_mode=" + mode
|
||||
return url
|
||||
|
||||
def unmode_url(url):
|
||||
mode = None
|
||||
splitted = url.split("##offpunk_mode=")
|
||||
if len(splitted) > 1:
|
||||
url = splitted[0]
|
||||
mode = splitted[1]
|
||||
return [url,mode]
|
||||
|
||||
# In terms of arguments, this can take an input file/string to be passed to
|
||||
# stdin, a parameter to do (well-escaped) "%" replacement on the command, a
|
||||
# flag requesting that the output go directly to the stdout, and a list of
|
||||
# additional environment variables to set.
|
||||
def run(cmd, *, input=None, parameter=None, direct_output=False, env={}):
|
||||
if parameter:
|
||||
cmd = cmd % shlex.quote(parameter)
|
||||
e = os.environ
|
||||
e.update(env)
|
||||
if isinstance(input, io.IOBase):
|
||||
stdin = input
|
||||
input = None
|
||||
else:
|
||||
if input:
|
||||
input = input.encode()
|
||||
stdin = None
|
||||
if not direct_output:
|
||||
# subprocess.check_output() wouldn't allow us to pass stdin.
|
||||
result = subprocess.run(cmd, check=True, env=e, input=input,
|
||||
shell=True, stdin=stdin, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT)
|
||||
return result.stdout.decode()
|
||||
else:
|
||||
subprocess.run(cmd, env=e, input=input, shell=True, stdin=stdin)
|
||||
|
||||
|
||||
global TERM_WIDTH
|
||||
TERM_WIDTH = 72
|
||||
|
||||
#if absolute, returns the real terminal width, not the text width
|
||||
def term_width(new_width=None,absolute=False):
|
||||
if new_width:
|
||||
global TERM_WIDTH
|
||||
TERM_WIDTH = new_width
|
||||
cur = shutil.get_terminal_size()[0]
|
||||
if absolute:
|
||||
return cur
|
||||
width = TERM_WIDTH
|
||||
if cur < width:
|
||||
width = cur
|
||||
return width
|
||||
|
||||
def is_local(url):
|
||||
if not url: return True
|
||||
elif "://" in url:
|
||||
scheme,path = url.split("://",maxsplit=1)
|
||||
return scheme in ["file","mail","list","mailto"]
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
# This method return the image URL or invent it if it’s a base64 inline image
|
||||
# It returns [url,image_data] where image_data is None for normal image
|
||||
def looks_like_base64(src,baseurl):
|
||||
imgdata = None
|
||||
imgname = src
|
||||
if src and src.startswith("data:image/"):
|
||||
if ";base64," in src:
|
||||
splitted = src.split(";base64,")
|
||||
#splitted[0] is something like data:image/jpg
|
||||
if "/" in splitted[0]:
|
||||
extension = splitted[0].split("/")[1]
|
||||
else:
|
||||
extension = "data"
|
||||
imgdata = splitted[1]
|
||||
imgname = imgdata[:20] + "." + extension
|
||||
imgurl = urllib.parse.urljoin(baseurl, imgname)
|
||||
else:
|
||||
#We can’t handle other data:image such as svg for now
|
||||
imgurl = None
|
||||
else:
|
||||
imgurl = urllib.parse.urljoin(baseurl, imgname)
|
||||
return imgurl,imgdata
|
298
opnk.py
298
opnk.py
|
@ -1,297 +1,3 @@
|
|||
#!/usr/bin/env python3
|
||||
#opnk stand for "Open like a PuNK".
|
||||
#It will open any file or URL and display it nicely in less.
|
||||
#If not possible, it will fallback to xdg-open
|
||||
#URL are retrieved through netcache
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import argparse
|
||||
import netcache
|
||||
import ansicat
|
||||
import offutils
|
||||
import shutil
|
||||
import time
|
||||
import fnmatch
|
||||
from offutils import run,term_width,mode_url,unmode_url,is_local
|
||||
|
||||
_HAS_XDGOPEN = shutil.which('xdg-open')
|
||||
_GREP = "grep --color=auto"
|
||||
|
||||
less_version = 0
|
||||
if not shutil.which("less"):
|
||||
print("Please install the pager \"less\" to run Offpunk.")
|
||||
print("If you wish to use another pager, send me an email !")
|
||||
print("(I’m really curious to hear about people not having \"less\" on their system.)")
|
||||
sys.exit()
|
||||
output = run("less --version")
|
||||
# We get less Version (which is the only integer on the first line)
|
||||
words = output.split("\n")[0].split()
|
||||
less_version = 0
|
||||
for w in words:
|
||||
if w.isdigit():
|
||||
less_version = int(w)
|
||||
# restoring position only works for version of less > 572
|
||||
if less_version >= 572:
|
||||
_LESS_RESTORE_POSITION = True
|
||||
else:
|
||||
_LESS_RESTORE_POSITION = False
|
||||
#_DEFAULT_LESS = "less -EXFRfM -PMurl\ lines\ \%lt-\%lb/\%L\ \%Pb\%$ %s"
|
||||
# -E : quit when reaching end of file (to behave like "cat")
|
||||
# -F : quit if content fits the screen (behave like "cat")
|
||||
# -X : does not clear the screen
|
||||
# -R : interpret ANSI colors correctly
|
||||
# -f : suppress warning for some contents
|
||||
# -M : long prompt (to have info about where you are in the file)
|
||||
# -W : hilite the new first line after a page skip (space)
|
||||
# -i : ignore case in search
|
||||
# -S : do not wrap long lines. Wrapping is done by offpunk, longlines
|
||||
# are there on purpose (surch in asciiart)
|
||||
#--incsearch : incremental search starting rev581
|
||||
def less_cmd(file, histfile=None,cat=False,grep=None):
|
||||
less_prompt = "page %%d/%%D- lines %%lb/%%L - %%Pb\\%%"
|
||||
if less_version >= 581:
|
||||
less_base = "less --incsearch --save-marks -~ -XRfWiS -P \"%s\""%less_prompt
|
||||
elif less_version >= 572:
|
||||
less_base = "less --save-marks -XRfMWiS"
|
||||
else:
|
||||
less_base = "less -XRfMWiS"
|
||||
_DEFAULT_LESS = less_base + " \"+''\" %s"
|
||||
_DEFAULT_CAT = less_base + " -EF %s"
|
||||
if histfile:
|
||||
env = {"LESSHISTFILE": histfile}
|
||||
else:
|
||||
env = {}
|
||||
if cat:
|
||||
cmd_str = _DEFAULT_CAT
|
||||
elif grep:
|
||||
grep_cmd = _GREP
|
||||
#case insensitive for lowercase search
|
||||
if grep.islower():
|
||||
grep_cmd += " -i"
|
||||
cmd_str = _DEFAULT_CAT + "|" + grep_cmd + " %s"%grep
|
||||
else:
|
||||
cmd_str = _DEFAULT_LESS
|
||||
run(cmd_str, parameter=file, direct_output=True, env=env)
|
||||
|
||||
class opencache():
|
||||
def __init__(self):
|
||||
# We have a cache of the rendering of file and, for each one,
|
||||
# a less_histfile containing the current position in the file
|
||||
self.temp_files = {}
|
||||
self.less_histfile = {}
|
||||
# This dictionary contains an url -> ansirenderer mapping. This allows
|
||||
# to reuse a renderer when visiting several times the same URL during
|
||||
# the same session
|
||||
# We save the time at which the renderer was created in renderer_time
|
||||
# This way, we can invalidate the renderer if a new version of the source
|
||||
# has been downloaded
|
||||
self.rendererdic = {}
|
||||
self.renderer_time = {}
|
||||
self.mime_handlers = {}
|
||||
self.last_mode = {}
|
||||
self.last_width = term_width(absolute=True)
|
||||
|
||||
def _get_handler_cmd(self, mimetype):
|
||||
# Now look for a handler for this mimetype
|
||||
# Consider exact matches before wildcard matches
|
||||
exact_matches = []
|
||||
wildcard_matches = []
|
||||
for handled_mime, cmd_str in self.mime_handlers.items():
|
||||
if "*" in handled_mime:
|
||||
wildcard_matches.append((handled_mime, cmd_str))
|
||||
else:
|
||||
exact_matches.append((handled_mime, cmd_str))
|
||||
for handled_mime, cmd_str in exact_matches + wildcard_matches:
|
||||
if fnmatch.fnmatch(mimetype, handled_mime):
|
||||
break
|
||||
else:
|
||||
# Use "xdg-open" as a last resort.
|
||||
if _HAS_XDGOPEN:
|
||||
cmd_str = "xdg-open %s"
|
||||
else:
|
||||
cmd_str = "echo \"Can’t find how to open \"%s"
|
||||
print("Please install xdg-open (usually from xdg-util package)")
|
||||
return cmd_str
|
||||
|
||||
# Return the handler for a specific mimetype.
|
||||
# Return the whole dic if no specific mime provided
|
||||
def get_handlers(self,mime=None):
|
||||
if mime and mime in self.mime_handlers.keys():
|
||||
return self.mime_handlers[mime]
|
||||
elif mime:
|
||||
return None
|
||||
else:
|
||||
return self.mime_handlers
|
||||
|
||||
def set_handler(self,mime,handler):
|
||||
previous = None
|
||||
if mime in self.mime_handlers.keys():
|
||||
previous = self.mime_handlers[mime]
|
||||
self.mime_handlers[mime] = handler
|
||||
if "%s" not in handler:
|
||||
print("WARNING: this handler has no %%s, no filename will be provided to the command")
|
||||
if previous:
|
||||
print("Previous handler was %s"%previous)
|
||||
|
||||
def get_renderer(self,inpath,mode=None,theme=None):
|
||||
# We remove the ##offpunk_mode= from the URL
|
||||
# If mode is already set, we don’t use the part from the URL
|
||||
inpath,newmode = unmode_url(inpath)
|
||||
if not mode: mode = newmode
|
||||
# If we still doesn’t have a mode, we see if we used one before
|
||||
if not mode and inpath in self.last_mode.keys():
|
||||
mode = self.last_mode[inpath]
|
||||
elif not mode:
|
||||
#default mode is readable
|
||||
mode = "readable"
|
||||
renderer = None
|
||||
path = netcache.get_cache_path(inpath)
|
||||
if path:
|
||||
usecache = inpath in self.rendererdic.keys() and not is_local(inpath)
|
||||
#Screen size may have changed
|
||||
width = term_width(absolute=True)
|
||||
if usecache and self.last_width != width:
|
||||
self.cleanup()
|
||||
usecache = False
|
||||
self.last_width = width
|
||||
if usecache:
|
||||
if inpath in self.renderer_time.keys():
|
||||
last_downloaded = netcache.cache_last_modified(inpath)
|
||||
last_cached = self.renderer_time[inpath]
|
||||
usecache = last_cached > last_downloaded
|
||||
else:
|
||||
usecache = False
|
||||
if not usecache:
|
||||
renderer = ansicat.renderer_from_file(path,url=inpath,theme=theme)
|
||||
if renderer:
|
||||
self.rendererdic[inpath] = renderer
|
||||
self.renderer_time[inpath] = int(time.time())
|
||||
else:
|
||||
renderer = self.rendererdic[inpath]
|
||||
return renderer
|
||||
|
||||
def get_temp_filename(self,url):
|
||||
if url in self.temp_files.keys():
|
||||
return self.temp_files[url]
|
||||
else:
|
||||
return None
|
||||
|
||||
def opnk(self,inpath,mode=None,terminal=True,grep=None,theme=None,**kwargs):
|
||||
#Return True if inpath opened in Terminal
|
||||
# False otherwise
|
||||
# also returns the url in case it has been modified
|
||||
#if terminal = False, we don’t try to open in the terminal,
|
||||
#we immediately fallback to xdg-open.
|
||||
#netcache currently provide the path if it’s a file.
|
||||
if not offutils.is_local(inpath):
|
||||
kwargs["images_mode"] = mode
|
||||
cachepath,inpath = netcache.fetch(inpath,**kwargs)
|
||||
if not cachepath:
|
||||
return False, inpath
|
||||
# folowing line is for :// which are locals (file,list)
|
||||
elif "://" in inpath:
|
||||
cachepath,inpath = netcache.fetch(inpath,**kwargs)
|
||||
elif inpath.startswith("mailto:"):
|
||||
cachepath = inpath
|
||||
elif os.path.exists(inpath):
|
||||
cachepath = inpath
|
||||
else:
|
||||
print("%s does not exist"%inpath)
|
||||
return False, inpath
|
||||
renderer = self.get_renderer(inpath,mode=mode,theme=theme)
|
||||
if renderer and mode:
|
||||
renderer.set_mode(mode)
|
||||
self.last_mode[inpath] = mode
|
||||
if not mode and inpath in self.last_mode.keys():
|
||||
mode = self.last_mode[inpath]
|
||||
renderer.set_mode(mode)
|
||||
#we use the full moded url as key for the dictionary
|
||||
key = mode_url(inpath,mode)
|
||||
if terminal and renderer:
|
||||
#If this is an image and we have chafa/timg, we
|
||||
#don’t use less, we call it directly
|
||||
if renderer.has_direct_display():
|
||||
renderer.display(mode=mode,directdisplay=True)
|
||||
return True, inpath
|
||||
else:
|
||||
body = renderer.display(mode=mode)
|
||||
#Should we use the cache ? only if it is not local and there’s a cache
|
||||
usecache = key in self.temp_files and not is_local(inpath)
|
||||
if usecache:
|
||||
#and the cache is still valid!
|
||||
last_downloaded = netcache.cache_last_modified(inpath)
|
||||
last_cached = os.path.getmtime(self.temp_files[key])
|
||||
if last_downloaded > last_cached:
|
||||
usecache = False
|
||||
self.temp_files.pop(key)
|
||||
self.less_histfile.pop(key)
|
||||
# We actually put the body in a tmpfile before giving it to less
|
||||
if not usecache:
|
||||
tmpf = tempfile.NamedTemporaryFile("w", encoding="UTF-8", delete=False)
|
||||
self.temp_files[key] = tmpf.name
|
||||
tmpf.write(body)
|
||||
tmpf.close()
|
||||
if key not in self.less_histfile:
|
||||
firsttime = True
|
||||
tmpf = tempfile.NamedTemporaryFile("w", encoding="UTF-8", delete=False)
|
||||
self.less_histfile[key] = tmpf.name
|
||||
else:
|
||||
#We don’t want to restore positions in lists
|
||||
firsttime = is_local(inpath)
|
||||
less_cmd(self.temp_files[key], histfile=self.less_histfile[key],cat=firsttime,grep=grep)
|
||||
return True, inpath
|
||||
#maybe, we have no renderer. Or we want to skip it.
|
||||
else:
|
||||
mimetype = ansicat.get_mime(cachepath)
|
||||
if mimetype == "mailto":
|
||||
mail = inpath[7:]
|
||||
resp = input("Send an email to %s Y/N? " %mail)
|
||||
if resp.strip().lower() in ("y", "yes"):
|
||||
if _HAS_XDGOPEN :
|
||||
run("xdg-open mailto:%s", parameter=mail,direct_output=True)
|
||||
else:
|
||||
print("Cannot find a mail client to send mail to %s" %inpath)
|
||||
print("Please install xdg-open (usually from xdg-util package)")
|
||||
return False, inpath
|
||||
else:
|
||||
cmd_str = self._get_handler_cmd(mimetype)
|
||||
try:
|
||||
run(cmd_str, parameter=netcache.get_cache_path(inpath), direct_output=True)
|
||||
except FileNotFoundError:
|
||||
print("Handler program %s not found!" % shlex.split(cmd_str)[0])
|
||||
print("You can use the ! command to specify another handler program or pipeline.")
|
||||
return False, inpath
|
||||
|
||||
#We remove the renderers from the cache and we also delete temp files
|
||||
def cleanup(self):
|
||||
while len(self.temp_files) > 0:
|
||||
os.remove(self.temp_files.popitem()[1])
|
||||
while len(self.less_histfile) > 0:
|
||||
os.remove(self.less_histfile.popitem()[1])
|
||||
self.last_width = None
|
||||
self.rendererdic = {}
|
||||
self.renderer_time = {}
|
||||
self.last_mode = {}
|
||||
|
||||
def main():
|
||||
descri = "opnk is an universal open command tool that will try to display any file \
|
||||
in the pager less after rendering its content with ansicat. If that fails, \
|
||||
opnk will fallback to opening the file with xdg-open. If given an URL as input \
|
||||
instead of a path, opnk will rely on netcache to get the networked content."
|
||||
parser = argparse.ArgumentParser(prog="opnk",description=descri)
|
||||
parser.add_argument("--mode", metavar="MODE",
|
||||
help="Which mode should be used to render: normal (default), full or source.\
|
||||
With HTML, the normal mode try to extract the article.")
|
||||
parser.add_argument("content",metavar="INPUT", nargs="*",
|
||||
default=sys.stdin, help="Path to the file or URL to open")
|
||||
parser.add_argument("--cache-validity",type=int, default=0,
|
||||
help="maximum age, in second, of the cached version before \
|
||||
redownloading a new version")
|
||||
args = parser.parse_args()
|
||||
cache = opencache()
|
||||
for f in args.content:
|
||||
cache.opnk(f,mode=args.mode,validity=args.cache_validity)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
from offpunk.opnk import main
|
||||
main()
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
requires = ["flit_core >=3.2,<4"]
|
||||
build-backend = "flit_core.buildapi"
|
||||
|
||||
[project]
|
||||
name = "offpunk"
|
||||
|
@ -35,6 +35,8 @@ html = ["bs4", "readability-lxml"]
|
|||
http = ["requests"]
|
||||
process-title = ["setproctitle"]
|
||||
rss = ["feedparser"]
|
||||
timg = ["timg>=1.3.2"]
|
||||
file = ["file"]
|
||||
|
||||
[project.urls]
|
||||
Homepage = "https://sr.ht/~lioploum/offpunk/"
|
||||
|
@ -43,16 +45,9 @@ Source = "https://git.sr.ht/~lioploum/offpunk"
|
|||
|
||||
[project.scripts]
|
||||
offpunk = "offpunk:main"
|
||||
netcache = "netcache:main"
|
||||
ansicat = "ansicat:main"
|
||||
opnk = "opnk:main"
|
||||
netcache = "offpunk.netcache:main"
|
||||
ansicat = "offpunk.ansicat:main"
|
||||
opnk = "offpunk.opnk:main"
|
||||
|
||||
[tool.hatch.version]
|
||||
path = "offpunk.py" # read __version__
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
only-include = [
|
||||
"ansicat.py", "netcache_migration.py", "netcache.py",
|
||||
"offblocklist.py", "offpunk.py", "offthemes.py",
|
||||
"offutils.py", "opnk.py",
|
||||
]
|
||||
[tool.flit.sdist]
|
||||
include = ["doc/", "man/", "CHANGELOG"]
|
||||
|
|
Loading…
Reference in New Issue