318 lines
12 KiB
Python
Executable File
318 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import time
|
|
import mimetypes
|
|
import os
|
|
import shlex
|
|
import subprocess
|
|
import socket
|
|
import socketserver
|
|
import ssl
|
|
import stat
|
|
import sys
|
|
import tempfile
|
|
import urllib.parse
|
|
|
|
# Hostname constant.  Nothing in the visible code reads it: the request
# handler compares against the literal "localhost" and the --host option.
HOST = "localhost"
|
|
|
|
def _format_filesize(size):
|
|
if size < 1024:
|
|
return "{:5.1f} B".format(size)
|
|
elif size < 1024**2:
|
|
return "{:5.1f} KiB".format(size / 1024.0)
|
|
elif size < 1024**3:
|
|
return "{:5.1f} MiB".format(size / 1024.0**2)
|
|
|
|
class GegobiHandler(socketserver.BaseRequestHandler):
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
socketserver.BaseRequestHandler.__init__(self, *args, **kwargs)
|
|
|
|
def setup(self):
|
|
"""
|
|
Wrap socket in SSL session.
|
|
"""
|
|
self.request = context.wrap_socket(self.request, server_side=True)
|
|
|
|
def handle(self):
|
|
# Parse request URL, make sure it's for a Gemini resource
|
|
self.parse_request()
|
|
if self.request_scheme != "gemini" or self.request_host not in ("localhost", self.server.args.host):
|
|
self.send_gemini_header(50, "This server does not proxy requests.")
|
|
return
|
|
# Perform redirects
|
|
if self.request_path in self.server.redirects:
|
|
self.send_gemini_header(31, self.server.redirects[self.request_path])
|
|
self.request.close()
|
|
return
|
|
# Resolve path to filesystem
|
|
## First, transform request_path to a relative path, so os.join doesn't
|
|
## break out of the base directory
|
|
while self.request_path.startswith(os.sep):
|
|
self.request_path = self.request_path[1:]
|
|
## Handle tilde paths
|
|
if self.server.args.tilde and self.request_path.startswith("~"):
|
|
bits = self.request_path.split(os.sep)
|
|
# Remove ~ to get username
|
|
bits[0] = bits[0][1:]
|
|
bits.insert(1, self.server.args.tilde)
|
|
bits.insert(0, os.sep + "home")
|
|
local_path = os.path.join(*bits)
|
|
## Standard path
|
|
else:
|
|
local_path = os.path.join(self.server.args.base, self.request_path)
|
|
## Make absolutely sure we're not anywhere we shouldn't be
|
|
if not local_path.startswith(self.server.args.base):
|
|
self.send_gemini_header(51, "Not found.")
|
|
return
|
|
# Handle not founds
|
|
if not os.path.exists(local_path):
|
|
self.send_gemini_header(51, "Not found.")
|
|
return
|
|
if ".." in local_path or local_path.endswith(".pem"):
|
|
self.send_gemini_header(51, "Not found.")
|
|
return
|
|
# Check for .nogegobi files
|
|
dir_ = os.path.dirname(local_path)
|
|
while True:
|
|
if os.path.exists(os.path.join(dir_,".nogegobi")):
|
|
self.send_gemini_header(51, "Not found.")
|
|
return
|
|
if dir_ == "/":
|
|
break
|
|
dir_ = os.path.split(dir_)[0]
|
|
# Check for world readability
|
|
st = os.stat(local_path)
|
|
if not st.st_mode & stat.S_IROTH:
|
|
self.send_gemini_header(51, "Not found.")
|
|
return
|
|
# Handle directories
|
|
if os.path.isdir(local_path):
|
|
# Redirect to add trailing slash so relative URLs work
|
|
if self.request_path and not self.request_path.endswith("/"):
|
|
self.send_gemini_header(31, self.request_url+"/")
|
|
return
|
|
# Check for gemini or gopher menus
|
|
geminimap = os.path.join(local_path, "index.gmi")
|
|
geminimap2 = os.path.join(local_path, "index.gemini")
|
|
gophermap = os.path.join(local_path, "gophermap")
|
|
indexgph = os.path.join(local_path, "index.gph")
|
|
if os.path.exists(geminimap):
|
|
self.handle_geminimap(geminimap)
|
|
elif os.path.exists(geminimap2):
|
|
self.handle_geminimap(geminimap2)
|
|
elif os.path.exists(gophermap):
|
|
self.handle_gophermap(gophermap)
|
|
elif os.path.exists(indexgph):
|
|
self.handle_gph(indexgph)
|
|
else:
|
|
self.generate_directory_listing(local_path)
|
|
# Handle files
|
|
else:
|
|
self.handle_file(local_path)
|
|
# Clean up
|
|
self.request.close()
|
|
|
|
def _send(self, string):
|
|
self.request.send(string.encode("UTF-8"))
|
|
|
|
def send_gemini_header(self, status, meta):
|
|
"""
|
|
Send a Gemini header, and close the connection if the status code does
|
|
not indicate success.
|
|
"""
|
|
self._send("{} {}\r\n".format(status, meta))
|
|
if status / 10 != 2:
|
|
self.request.close()
|
|
|
|
def parse_request(self):
|
|
"""
|
|
Read a URL from the Gemini client and parse it up into parts,
|
|
including separating out the Gopher item type.
|
|
"""
|
|
requested_url = self.request.recv(1024).decode("UTF-8").strip()
|
|
if "://" not in requested_url:
|
|
requested_url = "gemini://" + requested_url
|
|
self.request_url = requested_url
|
|
parsed = urllib.parse.urlparse(requested_url)
|
|
self.request_scheme = parsed.scheme
|
|
self.request_host = parsed.hostname
|
|
self.request_port = parsed.port or 1965
|
|
self.request_path = parsed.path[1:] if parsed.path.startswith("/") else parsed.path
|
|
self.request_path = urllib.parse.unquote(self.request_path)
|
|
self.request_query = parsed.query
|
|
|
|
def handle_geminimap(self, filename):
|
|
self.send_gemini_header(20, "text/gemini")
|
|
with open(filename,"r") as fp:
|
|
self._send(fp.read())
|
|
|
|
def handle_gophermap(self, filename):
|
|
self.send_gemini_header(20, "text/gemini")
|
|
with open(filename, "r") as fp:
|
|
for line in fp:
|
|
if "\t" in line:
|
|
itemtype = line[0]
|
|
parts = line[1:].strip().split("\t")
|
|
if itemtype == "i":
|
|
self._send(parts[0])
|
|
else:
|
|
if len(parts) == 2:
|
|
# Relative link to same server
|
|
name, link = parts
|
|
if itemtype == "h" and link.startswith("URL:"):
|
|
link = link[4:]
|
|
elif len(parts) == 4:
|
|
# External gopher link
|
|
name, path, host, port = parts
|
|
link = self._gopher_url(host, port, itemtype, path)
|
|
self._send("=> %s %s\r\n" % (link, name))
|
|
else:
|
|
self._send(line)
|
|
|
|
def handle_gph(self, filename):
|
|
self.send_gemini_header(20, "text/gemini")
|
|
with open(filename, "r") as fp:
|
|
for line in fp:
|
|
if line.startswith("[") and "]" in line and "|" in line:
|
|
line = line.strip()
|
|
# Menu item
|
|
# Ugly way to handle escaped pipes...
|
|
line = line.replace("\|","___GEGOBI_PIPE___")
|
|
itemtype, name, link, host, port = line[1:-1].split("|")
|
|
name = name.replace("___GEGOBI_PIPE___","|")
|
|
if not itemtype:
|
|
continue
|
|
elif itemtype == "i":
|
|
self._send(name)
|
|
continue
|
|
if host == "server":
|
|
# Link to same server
|
|
if itemtype == "h" and link.startswith("URL:"):
|
|
link = link[4:]
|
|
else:
|
|
# External gopher link
|
|
link = self._gopher_url(host, port, itemtype, path)
|
|
self._send("=> %s %s\r\n" % (link, name))
|
|
elif line.startswith("t"):
|
|
# Trim t and send as info line
|
|
self._send(line[1:])
|
|
else:
|
|
self._send(line)
|
|
|
|
def _gopher_url(self, host, port, itemtype, path):
|
|
path = itemtype + path
|
|
if port == "70":
|
|
netloc = host
|
|
else:
|
|
netloc = host + ":" + port
|
|
return urllib.parse.urlunparse(("gopher", netloc, path, "", "", ""))
|
|
|
|
def generate_directory_listing(self, directory):
|
|
self.send_gemini_header(20, "text/gemini")
|
|
self._send("[/{}]\r\n".format(self.request_path))
|
|
self._send("\r\n")
|
|
if directory != self.server.args.base:
|
|
up = self._get_up_url()
|
|
self._send("=> %s %s\r\n" % (urllib.parse.quote(up), ".."))
|
|
for f in os.listdir(directory):
|
|
# Only list world readable files
|
|
st = os.stat(os.path.join(directory,f))
|
|
if not st.st_mode & stat.S_IROTH:
|
|
continue
|
|
label = f.ljust(32)
|
|
label += time.ctime(st.st_mtime).ljust(30)
|
|
label += _format_filesize(st.st_size)
|
|
self._send("=> %s %s\r\n" % (urllib.parse.quote(f), label))
|
|
|
|
def _get_up_url(self):
|
|
# You'd think this would be simple...
|
|
path_to_split = "/" + self.request_path
|
|
if path_to_split.endswith("/"):
|
|
path_to_split = path_to_split[0:-1]
|
|
up, _ = os.path.split(path_to_split)
|
|
return up
|
|
path_bits = list(os.path.split(self.request_path))
|
|
while not path_bits[-1]:
|
|
path_bits.pop()
|
|
path_bits.pop()
|
|
if not path_bits:
|
|
return "/"
|
|
else:
|
|
return os.path.join(*path_bits)
|
|
|
|
def handle_file(self, filename):
|
|
"""
|
|
"""
|
|
# Guess/detect MIME type
|
|
mimetype, encoding = mimetypes.guess_type(filename)
|
|
if not mimetype:
|
|
out = subprocess.check_output(
|
|
shlex.split("file --brief --mime-type %s" % filename)
|
|
)
|
|
mimetype = out.decode("UTF-8").strip()
|
|
self.send_gemini_header(20, mimetype)
|
|
with open(filename,"rb") as fp:
|
|
self.request.send(fp.read())
|
|
|
|
if __name__ == "__main__":

    # Command line options, declared in the order they are registered.
    option_table = (
        ('--base', dict(type=str, nargs="?", default="/var/gopher",
                        help='Gopherhole base directory.')),
        ('--cert', dict(type=str, nargs="?", default="cert.pem",
                        help='TLS certificate file.')),
        ('--host', dict(type=str, required=True,
                        help='Hostname of Gemini server.')),
        ('--key', dict(type=str, nargs="?", default="key.pem",
                       help='TLS private key file.')),
        ('--local', dict(action="store_true",
                         help='Serve only on 127.0.0.1.')),
        ('--port', dict(type=int, nargs="?", default=1965,
                        help='TCP port to serve on.')),
        ('--redirects', dict(type=str, nargs="?", default="",
                             help='File to read redirect definitions from.')),
        ('--tilde', dict(type=str, nargs="?", default="",
                         help='Home subdirectory to map tilde URLs to.')),
    )
    cli = argparse.ArgumentParser(description=
"""GeGoBi is a tool to easily serve existing Gopher content via the
Gemini protocol as well, resulting in "Gemini-Gopher bi-hosting"
(or "GeGoBi").""")
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    args = cli.parse_args()

    # The base directory is used as an absolute path from here on; give up
    # straight away if it is missing.
    args.base = os.path.abspath(args.base)
    if not os.path.exists(args.base):
        print("Could not find base directory {}.".format(args.base))
        sys.exit(1)

    # Both halves of the TLS identity are required.
    if not os.path.exists(args.cert) or not os.path.exists(args.key):
        print("Could not find certificate file {} and/or key file {}.".format(
            args.cert, args.key))
        sys.exit(1)

    # Module level on purpose: the request handler's setup() looks up
    # `context` as a global.
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.load_cert_chain(certfile=args.cert, keyfile=args.key)

    socketserver.ThreadingTCPServer.allow_reuse_address = 1
    bind_address = "localhost" if args.local else ""
    gegobi = socketserver.ThreadingTCPServer((bind_address, args.port),
                                             GegobiHandler)
    gegobi.args = args

    # Redirect definitions: one "old new" pair per line; lines which do not
    # consist of exactly two whitespace separated fields are ignored.
    gegobi.redirects = {}
    if args.redirects:
        with open(args.redirects, "r") as redirect_file:
            for entry in redirect_file:
                fields = entry.split()
                if len(fields) == 2:
                    gegobi.redirects[fields[0]] = fields[1]

    # Serve until interrupted from the keyboard, then release the socket.
    try:
        gegobi.serve_forever()
    except KeyboardInterrupt:
        gegobi.shutdown()
        gegobi.server_close()
|
|
|