OOPify GeminiItems - makes interface code a bit tidier and concentrates all URL wrangling in one place. I *think* this makes things clearer.
This commit is contained in:
parent
642117f553
commit
d8d59f51e4
160
av98.py
160
av98.py
|
@ -74,28 +74,10 @@ _ANSI_COLORS = {
|
|||
"black": "\x1b[0;30m",
|
||||
}
|
||||
|
||||
CRLF = '\r\n'
|
||||
|
||||
# Lightweight representation of an item in Geminispace
|
||||
GeminiItem = collections.namedtuple("GeminiItem",
|
||||
("scheme", "host", "port", "path", "name"))
|
||||
|
||||
def url_to_geminiitem(url, name=None):
|
||||
# urllibparse.urlparse can handle IPv6 addresses, but only if they
|
||||
# are formatted very carefully, in a way that users almost
|
||||
# certainly won't expect. So, catch them early and try to fix
|
||||
# them...
|
||||
if url.count(":") > 2: # Best way to detect them?
|
||||
url = fix_ipv6_url(url)
|
||||
# Prepend a gemini schema if none given
|
||||
if "://" not in url:
|
||||
url = "gemini://" + url
|
||||
u = urllib.parse.urlparse(url)
|
||||
# https://tools.ietf.org/html/rfc4266#section-2.1
|
||||
path = u.path
|
||||
return GeminiItem(u.scheme, u.hostname, u.port or 1965, path, name)
|
||||
|
||||
def fix_ipv6_url(url):
|
||||
if not url.count(":") > 2: # Best way to detect them?
|
||||
return url
|
||||
# If there's a pair of []s in there, it's probably fine as is.
|
||||
if "[" in url and "]" in url:
|
||||
return url
|
||||
|
@ -115,40 +97,80 @@ def fix_ipv6_url(url):
|
|||
return schema + "://" + schemaless
|
||||
return schemaless
|
||||
|
||||
def geminiitem_to_url(gi):
|
||||
if gi and gi.host:
|
||||
return ("%s://%s%s%s" % (
|
||||
gi.scheme,
|
||||
gi.host,
|
||||
"" if gi.port == 1965 else ":%d" % gi.port,
|
||||
gi.path if gi.path.startswith("/") else "/"+gi.path,
|
||||
))
|
||||
elif gi:
|
||||
return gi.path
|
||||
else:
|
||||
return ""
|
||||
standard_ports = {
|
||||
"gemini": 1965,
|
||||
"gopher": 70,
|
||||
"http": 80,
|
||||
"https": 443,
|
||||
}
|
||||
|
||||
def geminiitem_from_line(line, menu_gi):
|
||||
assert line.startswith("=>")
|
||||
assert line[2:].strip()
|
||||
bits = line[2:].strip().split(maxsplit=1)
|
||||
link = bits[0]
|
||||
name = bits[1] if len(bits) == 2 else link
|
||||
if "://" in link:
|
||||
return url_to_geminiitem(link, name)
|
||||
else:
|
||||
return GeminiItem("gemini", menu_gi.host, menu_gi.port, link, name)
|
||||
class GeminiItem():
|
||||
|
||||
def geminiitem_to_line(gi, name=""):
|
||||
name = ((name or gi.name) or geminiitem_to_url(gi))
|
||||
path = gi.path
|
||||
return "=> %s %s" % (geminiitem_to_url(gi), name)
|
||||
def __init__(self, url, name=""):
|
||||
if "://" not in url:
|
||||
url = "gemini://" + url
|
||||
self.url = fix_ipv6_url(url)
|
||||
self.name = name
|
||||
parsed = urllib.parse.urlparse(self.url)
|
||||
self.scheme = parsed.scheme
|
||||
self.host = parsed.hostname
|
||||
self.port = parsed.port or standard_ports[self.scheme]
|
||||
self.path = parsed.path
|
||||
|
||||
def root_url(self):
|
||||
return urllib.parse.urlunparse((self.scheme,
|
||||
# Don't include redundant ports in the netloc
|
||||
self.host if self.port == standard_ports[self.scheme] else self.host + ":" + str(self.port),
|
||||
"/", "", "", ""))
|
||||
|
||||
def up_url(self):
|
||||
pathbits = list(os.path.split(self.path))
|
||||
# Get rid of empty string from trailing /
|
||||
while not pathbits[-1]:
|
||||
pathbits.pop()
|
||||
# Don't try to go higher than root
|
||||
if len(pathbits) == 1:
|
||||
return self.url
|
||||
# Get rid of bottom component
|
||||
pathbits.pop()
|
||||
new_path = os.path.join(*pathbits)
|
||||
return urllib.parse.urlunparse((self.scheme,
|
||||
self.host if self.port == standard_ports[self.scheme] else self.host + ":" + str(self.port),
|
||||
new_path, "", "", ""))
|
||||
|
||||
def absolutise_url(self, relative_url):
|
||||
"""
|
||||
Convert a relative URL to an absolute URL by using the URL of this
|
||||
GeminiItem as a base.
|
||||
"""
|
||||
# Absolutise URL, which annoyingly needs a valid scheme...
|
||||
base_url = self.url.replace("gemini://", "https://")
|
||||
absolute = urllib.parse.urljoin(base_url, relative_url)
|
||||
absolute = absolute.replace("https://", "gemini://")
|
||||
return absolute
|
||||
|
||||
def to_map_line(self, name=None):
|
||||
if name or self.name:
|
||||
return "=> {} {}".format(self.url, name or self.name)
|
||||
else:
|
||||
return "=> {}".format(self.url)
|
||||
|
||||
@classmethod
|
||||
def from_map_line(cls, line, origin_gi):
|
||||
print(line)
|
||||
assert line.startswith("=>")
|
||||
assert line[2:].strip()
|
||||
bits = line[2:].strip().split(maxsplit=1)
|
||||
bits[0] = origin_gi.absolutise_url(bits[0])
|
||||
return cls(*bits)
|
||||
|
||||
CRLF = '\r\n'
|
||||
|
||||
# Cheap and cheerful URL detector
|
||||
def looks_like_url(word):
|
||||
return "." in word and word.startswith("gemini://")
|
||||
|
||||
# Decorators
|
||||
# GeminiClient Decorators
|
||||
def needs_gi(inner):
|
||||
def outer(self, *args, **kwargs):
|
||||
if not self.gi:
|
||||
|
@ -256,8 +278,8 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
|
|||
print("User input not supported.")
|
||||
# Redirects
|
||||
elif status.startswith("3"):
|
||||
self._debug("Following redirect to %s." % mime)
|
||||
new_gi = GeminiItem(gi.host, gi.port, mime, None)
|
||||
new_gi = GeminiItem(gi.absolutise_url(mime))
|
||||
self._debug("Following redirect to %s." % new_gi.url)
|
||||
self._go_to_gi(new_gi)
|
||||
return
|
||||
# Errors
|
||||
|
@ -362,9 +384,8 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
|
|||
# knowledge of earlier failures.
|
||||
raise err
|
||||
# Send request and wrap response in a file descriptor
|
||||
url = geminiitem_to_url(gi)
|
||||
self._debug("Sending %s<CRLF>" % url)
|
||||
s.sendall((url + CRLF).encode("UTF-8"))
|
||||
self._debug("Sending %s<CRLF>" % gi.url)
|
||||
s.sendall((gi.url + CRLF).encode("UTF-8"))
|
||||
return address, s.makefile(mode = "rb")
|
||||
|
||||
def _get_addresses(self, host, port):
|
||||
|
@ -414,7 +435,7 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
|
|||
for line in body.splitlines():
|
||||
if line.startswith("=>"):
|
||||
try:
|
||||
gi = geminiitem_from_line(line, menu_gi)
|
||||
gi = GeminiItem.from_map_line(line, menu_gi)
|
||||
self.index.append(gi)
|
||||
tmpf.write(self._format_geminiitem(len(self.index), gi) + "\n")
|
||||
except:
|
||||
|
@ -431,16 +452,9 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
|
|||
subprocess.call(shlex.split(cmd_str % self.idx_filename))
|
||||
|
||||
def _format_geminiitem(self, index, gi, url=False):
|
||||
line = "[%d] " % index
|
||||
# Add item name
|
||||
if gi.name:
|
||||
line += gi.name
|
||||
# Use URL in place of name if we didn't get here from a menu
|
||||
else:
|
||||
line += geminiitem_to_url(gi)
|
||||
# Add URL if requested
|
||||
line = "[%d] %s" % (index, gi.name or gi.url)
|
||||
if gi.name and url:
|
||||
line += " (%s)" % geminiitem_to_url(gi)
|
||||
line += " (%s)" % gi.url
|
||||
return line
|
||||
|
||||
def _show_lookup(self, offset=0, end=None, url=False):
|
||||
|
@ -590,9 +604,7 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
|
|||
self._go_to_gi(gi)
|
||||
# If this isn't a mark, treat it as a URL
|
||||
else:
|
||||
url = line
|
||||
gi = url_to_geminiitem(url)
|
||||
self._go_to_gi(gi)
|
||||
self._go_to_gi(GeminiItem(line))
|
||||
|
||||
@needs_gi
|
||||
def do_reload(self, *args):
|
||||
|
@ -602,11 +614,7 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
|
|||
@needs_gi
|
||||
def do_up(self, *args):
|
||||
"""Go up one directory in the path."""
|
||||
gi = self.gi
|
||||
pathbits = os.path.split(self.gi.path)
|
||||
new_path = os.path.join(*pathbits[0:-1])
|
||||
new_gi = GeminiItem(gi.host, gi.port, new_path, "1", gi.name)
|
||||
self._go_to_gi(new_gi)
|
||||
self._go_to_gi(GeminiItem(self.gi.up_url()))
|
||||
|
||||
def do_back(self, *args):
|
||||
"""Go back to the previous gemini item."""
|
||||
|
@ -636,9 +644,7 @@ Slow internet connection? Use 'set timeout' to be more patient.""")
|
|||
@needs_gi
|
||||
def do_root(self, *args):
|
||||
"""Go to root selector of the server hosting current item."""
|
||||
gi = GeminiItem(self.gi.scheme, self.gi.host, self.gi.port, "",
|
||||
"Root of %s" % self.gi.host)
|
||||
self._go_to_gi(gi)
|
||||
self._go_to_gi(GeminiItem(self.gi.root_url()))
|
||||
|
||||
def do_tour(self, line):
|
||||
"""Add index items as waypoints on a tour, which is basically a FIFO
|
||||
|
@ -665,7 +671,7 @@ Current tour can be listed with `tour ls` and scrubbed with `tour clear`."""
|
|||
elif line == "*":
|
||||
self.waypoints.extend(self.lookup)
|
||||
elif looks_like_url(line):
|
||||
self.waypoints.append(url_to_geminiitem(line))
|
||||
self.waypoints.append(GeminiItem(line))
|
||||
else:
|
||||
for index in line.split():
|
||||
try:
|
||||
|
@ -696,7 +702,7 @@ Think of it like marks in vi: 'mark a'='ma' and 'go a'=''a'."""
|
|||
line = line.strip()
|
||||
if not line:
|
||||
for mark, gi in self.marks.items():
|
||||
print("[%s] %s (%s)" % (mark, gi.name, geminiitem_to_url(gi)))
|
||||
print("[%s] %s (%s)" % (mark, gi.name, gi.url))
|
||||
elif line.isalpha() and len(line) == 1:
|
||||
self.marks[line] = self.gi
|
||||
else:
|
||||
|
@ -838,7 +844,7 @@ Use 'ls -l' to see URLs."""
|
|||
@needs_gi
|
||||
def do_url(self, *args):
|
||||
"""Print URL of most recently visited item."""
|
||||
print(geminiitem_to_url(self.gi))
|
||||
print(self.gi.url)
|
||||
|
||||
### Bookmarking stuff
|
||||
@needs_gi
|
||||
|
@ -847,7 +853,7 @@ Use 'ls -l' to see URLs."""
|
|||
Bookmarks are stored in the ~/.av98-bookmarks.txt file.
|
||||
Optionally, specify the new name for the bookmark."""
|
||||
with open(os.path.expanduser("~/.av98-bookmarks.txt"), "a") as fp:
|
||||
fp.write(geminiitem_to_line(self.gi, name=line))
|
||||
fp.write(self.gi.to_map_line(line))
|
||||
|
||||
def do_bookmarks(self, *args):
|
||||
"""Show the current bookmarks menu.
|
||||
|
|
Loading…
Reference in New Issue