Initial code commit
This commit is contained in:
commit
80a1876be8
|
@ -0,0 +1,21 @@
|
|||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2021 Robert 'khuxkm' Miles, https://khuxkm.tilde.team <khuxkm@tilde.team>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
|
@ -0,0 +1,3 @@
|
|||
# cosmic-gemini
|
||||
|
||||
a proxy for cosmic.voyage access via gemini. run `server.py` to run the server.
|
|
@ -0,0 +1,186 @@
|
|||
import os, mimetypes, subprocess, urllib.parse, time
|
||||
from jetforce.app.base import JetforceApplication, Response, Status
|
||||
from pathlib import Path
|
||||
from functools import wraps
|
||||
mimetypes.init()
|
||||
mimetypes.add_type("text/gemini",".gmi")
|
||||
|
||||
RECENT_ENTRIES_COUNT = 20
|
||||
LINE_COUNT = lambda fn: int((subprocess.run(["/usr/bin/wc","-l",fn],stdout=subprocess.PIPE).stdout or b"-1 somethingswrong").decode("utf-8").split()[0])
|
||||
|
||||
class DefaultToGopher(JetforceApplication):
|
||||
GOPHER_ROOT = Path("/var/gopher")
|
||||
def default_callback(self, request, **_):
|
||||
"""
|
||||
Defaults to serving a file out of /var/gopher if one exists.
|
||||
Since the app itself defines analogues for the gophermap CGI-esques that cosmic
|
||||
itself uses, we'll only end up here if we're trying to serve some file that
|
||||
exists on gopher (or if it's a bad request but that's the job of check_request).
|
||||
|
||||
Implementation loosely based on jetforce.app.static.StaticDirectoryApplication.serve_static_request.
|
||||
"""
|
||||
# make sure we can handle this request (since that's the only other reason we'd be here)
|
||||
if (resp:=self.check_request(request)): return resp
|
||||
# normalize the request path
|
||||
filename = Path(os.path.normpath(urllib.parse.unquote(request.path.strip("/"))))
|
||||
# cowardly refuse to serve file outside of gopher root
|
||||
if filename.is_absolute() or str(filename).startswith(".."):
|
||||
return Response(Status.BAD_REQUEST,"Detected path traversal")
|
||||
# find filesystem path
|
||||
fs_path = self.GOPHER_ROOT / filename
|
||||
# check readability
|
||||
try:
|
||||
if not os.access(fs_path, os.R_OK):
|
||||
return Response(Status.NOT_FOUND,"Not Found")
|
||||
except OSError:
|
||||
return Response(Status.NOT_FOUND)
|
||||
if fs_path.is_file():
|
||||
mimetype = mimetypes.guess_type(fs_path,strict=False)[0] or "application/octet-stream"
|
||||
if str(fs_path).endswith("gophermap"): mimetype = "text/gophermap"
|
||||
return Response(Status.SUCCESS,mimetype,self.load_file(fs_path))
|
||||
else: # don't bother with directory listings
|
||||
return Response(Status.NOT_FOUND,"Not Found")
|
||||
def load_file(self,path):
|
||||
with open(path) as f:
|
||||
while (data:=f.read(2048)):
|
||||
yield data
|
||||
def check_request(self,request):
|
||||
if any([
|
||||
request.scheme!="gemini",
|
||||
request.hostname!=request.environ["HOSTNAME"],
|
||||
request.port and request.port!=request.environ["SERVER_PORT"]
|
||||
]):
|
||||
return Response(Status.PROXY_REQUEST_REFUSED,"Proxy Request Refused")
|
||||
|
||||
def abstract_listing_generator(listing_file,header_file=None,header_text=None,limit_line_count=False,filter_line=lambda raw_line: True,count_from_end=True):
|
||||
"""
|
||||
Handles listing pages.
|
||||
`listing_file` is the file to read as the listing.
|
||||
There are two ways to give a header to the listing:
|
||||
- `header_file` is the filename of a file to send as the header.
|
||||
- `header_text` is text to send as the header.
|
||||
If they are both provided, `header_file` is sent first, then `header_text`.
|
||||
`limit_line_count` is either False (no limit to line count) or the number of
|
||||
lines to send at most.
|
||||
`filter_line` is a function that returns False when a line should be skipped.
|
||||
`count_from_end` affects the line number generation. If it is True, the line
|
||||
numbers are generated in reverse order (like in most listings). Otherwise, the
|
||||
line numbers are generated forwards (like in ship listings).
|
||||
"""
|
||||
if header_file is not None:
|
||||
with open(header_file) as hdr_f:
|
||||
yield hdr_f.read()
|
||||
if header_text is not None:
|
||||
yield header_text
|
||||
yield "\n\n"
|
||||
total_line_count = LINE_COUNT(listing_file)
|
||||
with open(listing_file) as f:
|
||||
line_count = 0
|
||||
while (line_count:=line_count+1) and (line:=f.readline()):
|
||||
if limit_line_count and line_count>limit_line_count: return
|
||||
if not filter_line(line): continue
|
||||
log_name, log_link = line.strip().split("\t")
|
||||
if count_from_end:
|
||||
log_num = total_line_count-line_count
|
||||
else:
|
||||
log_num = line_count
|
||||
yield f"=> {log_link} {log_num} >> {log_name}\n"
|
||||
|
||||
def listing_generator(**kwargs):
|
||||
"""Use the normal listing file."""
|
||||
return abstract_listing_generator("/var/gopher/listing.gophermap",**kwargs)
|
||||
|
||||
def reverse_listing_generator(**kwargs):
|
||||
"""
|
||||
Use the normal listing file but in reverse. Uses some trickery to (hopefully
|
||||
efficiently) reverse the listing file.
|
||||
"""
|
||||
with tempfile.NamedTemporaryFile("r",encoding="utf-8") as f:
|
||||
subprocess.run("tac /var/gopher/listing.gophermap > "+f.name,shell=True)
|
||||
yield from abstract_listing_generator(f.name,count_from_end=False,**kwargs)
|
||||
|
||||
def count_entries(ship):
|
||||
count = 0
|
||||
with open("/var/gopher/listing.gophermap") as f:
|
||||
for line in f:
|
||||
if line.startswith("0"+ship): count+=1
|
||||
return count
|
||||
|
||||
def wrap_response(f):
|
||||
@wraps(f)
|
||||
def __wrapper(*args,**kwargs):
|
||||
rv = f(*args,**kwargs)
|
||||
if type(rv)!=Response: return Response(Status.SUCCESS,"text/gemini",rv)
|
||||
return rv
|
||||
return __wrapper
|
||||
|
||||
def wrap_generator(g):
|
||||
def __viewfunc(request,**kwargs):
|
||||
return Response(Status.SUCCESS,"text/gemini",g(request,**kwargs))
|
||||
__viewfunc.__name__ = g.__name__
|
||||
return __viewfunc
|
||||
|
||||
app = DefaultToGopher()
|
||||
|
||||
@wrap_response
|
||||
@app.route("/")
|
||||
@app.route("/index.gmi")
|
||||
def index(request):
|
||||
return listing_generator(header_file="/var/cosmic/templates/geminiintro.tmpl",limit_line_count=RECENT_ENTRIES_COUNT)
|
||||
|
||||
@wrap_response
|
||||
@app.route("/log(?:/(?:index\.gmi)?)?")
|
||||
def log(request):
|
||||
return listing_generator(header_text="RS001 Log Entries (Newest First):")
|
||||
|
||||
@wrap_generator
|
||||
@app.route("/ships(?:/(?:index\.gmi)?)?")
|
||||
def ships(request):
|
||||
yield "# Ships and Outposts\n\n"
|
||||
for ship in os.listdir("/var/gopher"):
|
||||
if not os.path.isdir(os.path.join("/var/gopher",ship)): continue
|
||||
if ship in ("ships","log"): continue
|
||||
entries = count_entries(ship)
|
||||
if entries==0: continue
|
||||
urlencoded = urllib.parse.quote(ship,safe='')
|
||||
yield f"=> /ships/{urlencoded}/ {ship} ({entries!s})\n"
|
||||
|
||||
@wrap_response
|
||||
@app.route("/ships/(?<ship>[^/]+)(?:/(?:index\.gmi)?)?")
|
||||
def ship(request,ship=None):
|
||||
if ship is None: return Response(Status.BAD_REQUEST,"Bad Request")
|
||||
ship_unquoted = urllib.parse.unquote(ship)
|
||||
def __generator():
|
||||
description = os.path.join("/var/gopher",ship_unquoted,".description")
|
||||
if os.path.exists(description):
|
||||
yield from app.load_file(description)
|
||||
yield "\n"
|
||||
if os.path.exists(os.path.join("/var/gopher",ship_unquoted,"ABOUT")):
|
||||
yield f"=> /{ship}/ABOUT About {ship_unquoted}\n"
|
||||
yield f"# {ship_unquoted} - Ship Log\n"
|
||||
yield from reverse_listing_generator(filter_line=lambda line: line.startswith("0"+ship_unquoted))
|
||||
yield '\n\n'
|
||||
year = time.strftime("%Y")
|
||||
author_file = os.path.join("/var/gopher",ship_unquoted,"AUTHOR")
|
||||
if os.path.exists(author_file):
|
||||
with open(author_file) as f:
|
||||
author = f.readline().strip()
|
||||
yield f"=> /{ship}/ABOUT © {year} {author}\n"
|
||||
else:
|
||||
username = (subprocess.run(["/usr/bin/stat","-c","%U",os.path.join("/var/gopher",ship_unquoted)],stdout=subprocess.PIPE).stdout or b"unknown").decode("utf-8").strip()
|
||||
yield f"© {year} {username}\n"
|
||||
license_file = os.path.join("/var/gopher",ship_unquoted,"LICENSE")
|
||||
if os.path.exists(license_file):
|
||||
with open(license_file) as f:
|
||||
license_name = f.readline().strip()
|
||||
yield f"=> /{ship}/LICENSE {license_name}\n"
|
||||
else:
|
||||
yield "All rights reserved.\n"
|
||||
return __generator
|
||||
|
||||
@wrap_generator
|
||||
@app.route("/(?P<format>rss|atom).xml")
|
||||
def feeds(request,format="atom"):
|
||||
with open(os.path.join("/var/gopher",format+".xml")):
|
||||
for line in f:
|
||||
yield line.replace("gopher://cosmic.voyage/0/","gemini://cosmic.voyage/").replace("<link>gopher://cosmic.voyage","<link>gemini://cosmic.voyage")
|
Loading…
Reference in New Issue