187 lines
7.3 KiB
Python
187 lines
7.3 KiB
Python
import os, mimetypes, subprocess, urllib.parse, time, tempfile
|
|
from jetforce.app.base import JetforceApplication, Response, Status
|
|
from pathlib import Path
|
|
from functools import wraps
|
|
mimetypes.init()
|
|
mimetypes.add_type("text/gemini",".gmi")
|
|
|
|
RECENT_ENTRIES_COUNT = 20
|
|
LINE_COUNT = lambda fn: int((subprocess.run(["/usr/bin/wc","-l",fn],stdout=subprocess.PIPE).stdout or b"-1 somethingswrong").decode("utf-8").split()[0])
|
|
|
|
class DefaultToGopher(JetforceApplication):
|
|
GOPHER_ROOT = Path("/var/gopher")
|
|
def default_callback(self, request, **_):
|
|
"""
|
|
Defaults to serving a file out of /var/gopher if one exists.
|
|
Since the app itself defines analogues for the gophermap CGI-esques that cosmic
|
|
itself uses, we'll only end up here if we're trying to serve some file that
|
|
exists on gopher (or if it's a bad request but that's the job of check_request).
|
|
|
|
Implementation loosely based on jetforce.app.static.StaticDirectoryApplication.serve_static_request.
|
|
"""
|
|
# make sure we can handle this request (since that's the only other reason we'd be here)
|
|
if (resp:=self.check_request(request)): return resp
|
|
# normalize the request path
|
|
filename = Path(os.path.normpath(urllib.parse.unquote(request.path.strip("/"))))
|
|
# cowardly refuse to serve file outside of gopher root
|
|
if filename.is_absolute() or str(filename).startswith(".."):
|
|
return Response(Status.BAD_REQUEST,"Detected path traversal")
|
|
# find filesystem path
|
|
fs_path = self.GOPHER_ROOT / filename
|
|
# check readability
|
|
try:
|
|
if not os.access(fs_path, os.R_OK):
|
|
return Response(Status.NOT_FOUND,"Not Found")
|
|
except OSError:
|
|
return Response(Status.NOT_FOUND)
|
|
if fs_path.is_file():
|
|
mimetype = mimetypes.guess_type(fs_path,strict=False)[0] or "application/octet-stream"
|
|
if str(fs_path).endswith("gophermap"): mimetype = "text/gophermap"
|
|
return Response(Status.SUCCESS,mimetype,self.load_file(fs_path))
|
|
else: # don't bother with directory listings
|
|
return Response(Status.NOT_FOUND,"Not Found")
|
|
def load_file(self,path):
|
|
with open(path) as f:
|
|
while (data:=f.read(2048)):
|
|
yield data
|
|
def check_request(self,request):
|
|
if any([
|
|
request.scheme!="gemini",
|
|
request.hostname!=request.environ["HOSTNAME"],
|
|
request.port and request.port!=request.environ["SERVER_PORT"]
|
|
]):
|
|
return Response(Status.PROXY_REQUEST_REFUSED,"Proxy Request Refused")
|
|
|
|
def abstract_listing_generator(listing_file,header_file=None,header_text=None,limit_line_count=False,filter_line=lambda raw_line: True,count_from_end=True):
|
|
"""
|
|
Handles listing pages.
|
|
`listing_file` is the file to read as the listing.
|
|
There are two ways to give a header to the listing:
|
|
- `header_file` is the filename of a file to send as the header.
|
|
- `header_text` is text to send as the header.
|
|
If they are both provided, `header_file` is sent first, then `header_text`.
|
|
`limit_line_count` is either False (no limit to line count) or the number of
|
|
lines to send at most.
|
|
`filter_line` is a function that returns False when a line should be skipped.
|
|
`count_from_end` affects the line number generation. If it is True, the line
|
|
numbers are generated in reverse order (like in most listings). Otherwise, the
|
|
line numbers are generated forwards (like in ship listings).
|
|
"""
|
|
if header_file is not None:
|
|
with open(header_file) as hdr_f:
|
|
yield hdr_f.read()
|
|
if header_text is not None:
|
|
yield header_text
|
|
yield "\n\n"
|
|
total_line_count = LINE_COUNT(listing_file)
|
|
with open(listing_file) as f:
|
|
line_count = 0
|
|
while (line_count:=line_count+1) and (line:=f.readline()):
|
|
if limit_line_count and line_count>limit_line_count: return
|
|
if not filter_line(line): continue
|
|
log_name, log_link = line[1:].strip().split("\t")
|
|
if count_from_end:
|
|
log_num = total_line_count-line_count
|
|
else:
|
|
log_num = line_count
|
|
log_link = urllib.parse.quote(log_link,safe='/')
|
|
yield f"=> {log_link} {log_num} >> {log_name}\n"
|
|
|
|
def listing_generator(**kwargs):
|
|
"""Use the normal listing file."""
|
|
return abstract_listing_generator("/var/gopher/listing.gophermap",**kwargs)
|
|
|
|
def reverse_listing_generator(**kwargs):
|
|
"""
|
|
Use the normal listing file but in reverse. Uses some trickery to (hopefully
|
|
efficiently) reverse the listing file.
|
|
"""
|
|
with tempfile.NamedTemporaryFile("r",encoding="utf-8") as f:
|
|
subprocess.run("tac /var/gopher/listing.gophermap > "+f.name,shell=True)
|
|
yield from abstract_listing_generator(f.name,count_from_end=False,**kwargs)
|
|
|
|
def count_entries(ship):
|
|
count = 0
|
|
with open("/var/gopher/listing.gophermap") as f:
|
|
for line in f:
|
|
if line.startswith("0"+ship): count+=1
|
|
return count
|
|
|
|
def wrap_response(f):
|
|
@wraps(f)
|
|
def __wrapper(*args,**kwargs):
|
|
rv = f(*args,**kwargs)
|
|
if type(rv)!=Response: return Response(Status.SUCCESS,"text/gemini",rv)
|
|
return rv
|
|
return __wrapper
|
|
|
|
def wrap_generator(g):
|
|
def __viewfunc(request,**kwargs):
|
|
return Response(Status.SUCCESS,"text/gemini",g(request,**kwargs))
|
|
__viewfunc.__name__ = g.__name__
|
|
return __viewfunc
|
|
|
|
app = DefaultToGopher()
|
|
|
|
@app.route("(?:/(?:index\.gmi)?)?")
|
|
@wrap_response
|
|
def index(request):
|
|
return listing_generator(header_file="/var/cosmic/templates/geminiintro.tmpl",limit_line_count=RECENT_ENTRIES_COUNT)
|
|
|
|
@app.route("/log(?:/(?:index\.gmi)?)?")
|
|
@wrap_response
|
|
def log(request):
|
|
return listing_generator(header_text="RS001 Log Entries (Newest First):")
|
|
|
|
@app.route("/ships(?:/(?:index\.gmi)?)?")
|
|
@wrap_generator
|
|
def ships(request):
|
|
yield "# Ships and Outposts\n\n"
|
|
for ship in sorted(os.listdir("/var/gopher")):
|
|
if not os.path.isdir(os.path.join("/var/gopher",ship)): continue
|
|
if ship in ("ships","log"): continue
|
|
entries = count_entries(ship)
|
|
if entries==0: continue
|
|
urlencoded = urllib.parse.quote(ship,safe='')
|
|
yield f"=> /ships/{urlencoded}/ {ship} ({entries!s})\n"
|
|
|
|
@app.route("/ships/(?P<ship>[^/]+)(?:/(?:index\.gmi)?)?")
|
|
@wrap_response
|
|
def ship(request,ship=None):
|
|
if ship is None: return Response(Status.BAD_REQUEST,"Bad Request")
|
|
ship_unquoted = urllib.parse.unquote(ship)
|
|
def __generator():
|
|
description = os.path.join("/var/gopher",ship_unquoted,".description")
|
|
if os.path.exists(description):
|
|
yield from app.load_file(description)
|
|
yield "\n"
|
|
if os.path.exists(os.path.join("/var/gopher",ship_unquoted,"ABOUT")):
|
|
yield f"=> /{ship}/ABOUT About {ship_unquoted}\n"
|
|
yield f"# {ship_unquoted} - Ship Log\n"
|
|
yield from reverse_listing_generator(filter_line=lambda line: line.startswith("0"+ship_unquoted))
|
|
yield '\n\n'
|
|
year = time.strftime("%Y")
|
|
author_file = os.path.join("/var/gopher",ship_unquoted,"AUTHOR")
|
|
if os.path.exists(author_file):
|
|
with open(author_file) as f:
|
|
author = f.readline().strip()
|
|
yield f"=> /{ship}/ABOUT © {year} {author}\n"
|
|
else:
|
|
username = (subprocess.run(["/usr/bin/stat","-c","%U",os.path.join("/var/gopher",ship_unquoted)],stdout=subprocess.PIPE).stdout or b"unknown").decode("utf-8").strip()
|
|
yield f"© {year} {username}\n"
|
|
license_file = os.path.join("/var/gopher",ship_unquoted,"LICENSE")
|
|
if os.path.exists(license_file):
|
|
with open(license_file) as f:
|
|
license_name = f.readline().strip()
|
|
yield f"=> /{ship}/LICENSE {license_name}\n"
|
|
else:
|
|
yield "All rights reserved.\n"
|
|
return __generator()
|
|
|
|
@app.route("/(?P<format>rss|atom).xml")
|
|
@wrap_generator
|
|
def feeds(request,format="atom"):
|
|
with open(os.path.join("/var/gopher",format+".xml")) as f:
|
|
for line in f:
|
|
yield line.replace("gopher://cosmic.voyage/0/","gemini://cosmic.voyage/").replace("<link>gopher://cosmic.voyage","<link>gemini://cosmic.voyage")
|