retab python to spaces

This commit is contained in:
James Tomasino 2023-02-27 13:57:15 +00:00
parent ceff29fc12
commit 604c8987b5
1 changed files with 153 additions and 153 deletions

306
app.py
View File

@ -10,194 +10,194 @@ RECENT_ENTRIES_COUNT = 20
# Ships whose descriptions need to be quoted in a pre text block. Key is ship name, value is alt text. # Ships whose descriptions need to be quoted in a pre text block. Key is ship name, value is alt text.
DESC_PRE_SHIPS = { DESC_PRE_SHIPS = {
"anon.hmm.st": """An ascii banner split in two halves. The left half reads "anon.hmm.st" in a figlet font. The right half lists the server as AnonServ 0.9.infinity, the readable frontends as QEC, Gemini, Gopher, and Web, as well as listing Email as a write-only frontend. The description under these listings reads "'Sup losers, it's your boy Phantom Override back at it again. They got us once but they'll never find us out amongst the stars. Lose lose." At the bottom of the banner it lists the email address for submissions, which is anonhmmst@cosmic.voyage.""", "anon.hmm.st": """An ascii banner split in two halves. The left half reads "anon.hmm.st" in a figlet font. The right half lists the server as AnonServ 0.9.infinity, the readable frontends as QEC, Gemini, Gopher, and Web, as well as listing Email as a write-only frontend. The description under these listings reads "'Sup losers, it's your boy Phantom Override back at it again. They got us once but they'll never find us out amongst the stars. Lose lose." At the bottom of the banner it lists the email address for submissions, which is anonhmmst@cosmic.voyage.""",
"anon.penet.fi": """An ascii banner split in two uneven halves. The left half reads "anon.penet.fi" in a cool figlet-esque font. The right half reads "Back from the grave after 400 years, you can't find us but keep on tryin'. An anonymous QEC retransmit service: put it here, it never ever goes away. Just send us your stuff on 182.2 KHz. Use QEC: AnonPenetFi@cosmic.voyage. How did we get the name? Very easily! Where are we? Here, there, everywhere. Greets snowcrash, acidburn, lordnikon. countzero, get lost. Special to exclusive services: do you think this is some kind of game? Traffic analysis, for babies. You will never find us but keep trying. Busy time: 39882 failed attacks, and counting! You can't stop the signal".""" "anon.penet.fi": """An ascii banner split in two uneven halves. The left half reads "anon.penet.fi" in a cool figlet-esque font. The right half reads "Back from the grave after 400 years, you can't find us but keep on tryin'. An anonymous QEC retransmit service: put it here, it never ever goes away. Just send us your stuff on 182.2 KHz. Use QEC: AnonPenetFi@cosmic.voyage. How did we get the name? Very easily! Where are we? Here, there, everywhere. Greets snowcrash, acidburn, lordnikon. countzero, get lost. Special to exclusive services: do you think this is some kind of game? Traffic analysis, for babies. You will never find us but keep trying. Busy time: 39882 failed attacks, and counting! You can't stop the signal"."""
} }
# Utility lambda to get line count # Utility lambda to get line count
LINE_COUNT = lambda fn: int((subprocess.run(["/usr/bin/wc","-l",fn],stdout=subprocess.PIPE).stdout or b"-1 somethingswrong").decode("utf-8").split()[0]) LINE_COUNT = lambda fn: int((subprocess.run(["/usr/bin/wc","-l",fn],stdout=subprocess.PIPE).stdout or b"-1 somethingswrong").decode("utf-8").split()[0])
class DefaultToGopher(JetforceApplication): class DefaultToGopher(JetforceApplication):
GOPHER_ROOT = Path("/var/gopher") GOPHER_ROOT = Path("/var/gopher")
def default_callback(self, request, **_): def default_callback(self, request, **_):
""" """
Defaults to serving a file out of /var/gopher if one exists. Defaults to serving a file out of /var/gopher if one exists.
Since the app itself defines analogues for the gophermap CGI-esques that cosmic Since the app itself defines analogues for the gophermap CGI-esques that cosmic
itself uses, we'll only end up here if we're trying to serve some file that itself uses, we'll only end up here if we're trying to serve some file that
exists on gopher (or if it's a bad request but that's the job of check_request). exists on gopher (or if it's a bad request but that's the job of check_request).
Implementation loosely based on jetforce.app.static.StaticDirectoryApplication.serve_static_request. Implementation loosely based on jetforce.app.static.StaticDirectoryApplication.serve_static_request.
""" """
# make sure we can handle this request (since that's the only other reason we'd be here) # make sure we can handle this request (since that's the only other reason we'd be here)
if (resp:=self.check_request(request)): return resp if (resp:=self.check_request(request)): return resp
# normalize the request path # normalize the request path
filename = Path(os.path.normpath(urllib.parse.unquote(request.path.strip("/")))) filename = Path(os.path.normpath(urllib.parse.unquote(request.path.strip("/"))))
# cowardly refuse to serve file outside of gopher root # cowardly refuse to serve file outside of gopher root
if filename.is_absolute() or str(filename).startswith(".."): if filename.is_absolute() or str(filename).startswith(".."):
return Response(Status.BAD_REQUEST,"Detected path traversal") return Response(Status.BAD_REQUEST,"Detected path traversal")
# find filesystem path # find filesystem path
fs_path = self.GOPHER_ROOT / filename fs_path = self.GOPHER_ROOT / filename
# check readability # check readability
try: try:
if not os.access(fs_path, os.R_OK): if not os.access(fs_path, os.R_OK):
return Response(Status.NOT_FOUND,"Not Found") return Response(Status.NOT_FOUND,"Not Found")
except OSError: except OSError:
return Response(Status.NOT_FOUND) return Response(Status.NOT_FOUND)
if fs_path.is_file(): if fs_path.is_file():
mimetype = mimetypes.guess_type(fs_path,strict=False)[0] or "application/octet-stream" mimetype = mimetypes.guess_type(fs_path,strict=False)[0] or "application/octet-stream"
if str(fs_path).endswith("gophermap"): mimetype = "text/gophermap" if str(fs_path).endswith("gophermap"): mimetype = "text/gophermap"
if any([str(fs_path).endswith(x) for x in ("LICENSE","AUTHOR",".description")]): mimetype = "text/plain" if any([str(fs_path).endswith(x) for x in ("LICENSE","AUTHOR",".description")]): mimetype = "text/plain"
return Response(Status.SUCCESS,mimetype,self.load_file(fs_path)) return Response(Status.SUCCESS,mimetype,self.load_file(fs_path))
else: # don't bother with directory listings else: # don't bother with directory listings
return Response(Status.NOT_FOUND,"Not Found") return Response(Status.NOT_FOUND,"Not Found")
def load_file(self,path): def load_file(self,path):
with open(path) as f: with open(path) as f:
while (data:=f.read(2048)): while (data:=f.read(2048)):
yield data yield data
def check_request(self,request): def check_request(self,request):
if any([ if any([
request.scheme!="gemini", request.scheme!="gemini",
request.hostname!=request.environ["HOSTNAME"], request.hostname!=request.environ["HOSTNAME"],
request.port and request.port!=request.environ["SERVER_PORT"] request.port and request.port!=request.environ["SERVER_PORT"]
]): ]):
return Response(Status.PROXY_REQUEST_REFUSED,"Proxy Request Refused") return Response(Status.PROXY_REQUEST_REFUSED,"Proxy Request Refused")
def abstract_listing_generator(listing_file,header_file=None,header_text=None,limit_line_count=False,filter_line=lambda raw_line: True,count_from_end=True): def abstract_listing_generator(listing_file,header_file=None,header_text=None,limit_line_count=False,filter_line=lambda raw_line: True,count_from_end=True):
""" """
Handles listing pages. Handles listing pages.
`listing_file` is the file to read as the listing. `listing_file` is the file to read as the listing.
There are two ways to give a header to the listing: There are two ways to give a header to the listing:
- `header_file` is the filename of a file to send as the header. - `header_file` is the filename of a file to send as the header.
- `header_text` is text to send as the header. - `header_text` is text to send as the header.
If they are both provided, `header_file` is sent first, then `header_text`. If they are both provided, `header_file` is sent first, then `header_text`.
`limit_line_count` is either False (no limit to line count) or the number of `limit_line_count` is either False (no limit to line count) or the number of
lines to send at most. lines to send at most.
`filter_line` is a function that returns False when a line should be skipped. `filter_line` is a function that returns False when a line should be skipped.
`count_from_end` affects the line number generation. If it is True, the line `count_from_end` affects the line number generation. If it is True, the line
numbers are generated in reverse order (like in most listings). Otherwise, the numbers are generated in reverse order (like in most listings). Otherwise, the
line numbers are generated forwards (like in ship listings). line numbers are generated forwards (like in ship listings).
""" """
if header_file is not None: if header_file is not None:
with open(header_file) as hdr_f: with open(header_file) as hdr_f:
yield hdr_f.read() yield hdr_f.read()
if header_text is not None: if header_text is not None:
yield header_text yield header_text
yield "\n\n" yield "\n\n"
total_line_count = LINE_COUNT(listing_file) total_line_count = LINE_COUNT(listing_file)
with open(listing_file) as f: with open(listing_file) as f:
line_count = 0 line_count = 0
while (line_count:=line_count+1) and (line:=f.readline()): while (line_count:=line_count+1) and (line:=f.readline()):
if limit_line_count and line_count>limit_line_count: return if limit_line_count and line_count>limit_line_count: return
if not filter_line(line): continue if not filter_line(line): continue
log_name, log_link = line[1:].strip().split("\t") log_name, log_link = line[1:].strip().split("\t")
if count_from_end: if count_from_end:
log_num = total_line_count-line_count log_num = total_line_count-line_count
else: else:
log_num = line_count log_num = line_count
log_num += 1 # account for numbers starting at 1 and not 0 log_num += 1 # account for numbers starting at 1 and not 0
log_link = urllib.parse.quote(log_link,safe='/') log_link = urllib.parse.quote(log_link,safe='/')
yield f"=> {log_link} {log_num} >> {log_name}\n" yield f"=> {log_link} {log_num} >> {log_name}\n"
def listing_generator(**kwargs): def listing_generator(**kwargs):
"""Use the normal listing file.""" """Use the normal listing file."""
return abstract_listing_generator("/var/gopher/listing.gophermap",**kwargs) return abstract_listing_generator("/var/gopher/listing.gophermap",**kwargs)
def reverse_listing_generator(**kwargs): def reverse_listing_generator(**kwargs):
""" """
Use the normal listing file but in reverse. Uses some trickery to (hopefully Use the normal listing file but in reverse. Uses some trickery to (hopefully
efficiently) reverse the listing file. efficiently) reverse the listing file.
""" """
with tempfile.NamedTemporaryFile("r",encoding="utf-8") as f: with tempfile.NamedTemporaryFile("r",encoding="utf-8") as f:
subprocess.run("tac /var/gopher/listing.gophermap > "+f.name,shell=True) subprocess.run("tac /var/gopher/listing.gophermap > "+f.name,shell=True)
yield from abstract_listing_generator(f.name,count_from_end=False,**kwargs) yield from abstract_listing_generator(f.name,count_from_end=False,**kwargs)
def count_entries(ship): def count_entries(ship):
count = 0 count = 0
with open("/var/gopher/listing.gophermap") as f: with open("/var/gopher/listing.gophermap") as f:
for line in f: for line in f:
if line.startswith("0"+ship): count+=1 if line.startswith("0"+ship): count+=1
return count return count
def wrap_response(mimetype="text/gemini"): def wrap_response(mimetype="text/gemini"):
def __wrapfunc(f): def __wrapfunc(f):
@wraps(f) @wraps(f)
def __wrapper(*args,**kwargs): def __wrapper(*args,**kwargs):
rv = f(*args,**kwargs) rv = f(*args,**kwargs)
if type(rv)!=Response: return Response(Status.SUCCESS,"text/gemini",rv) if type(rv)!=Response: return Response(Status.SUCCESS,"text/gemini",rv)
return rv return rv
return __wrapper return __wrapper
return __wrapfunc return __wrapfunc
app = DefaultToGopher() app = DefaultToGopher()
@app.route("(?:/(?:index\.gmi)?)?") @app.route("(?:/(?:index\.gmi)?)?")
@wrap_response() @wrap_response()
def index(request): def index(request):
return listing_generator(header_file="/var/cosmic/templates/geminiintro.tmpl",limit_line_count=RECENT_ENTRIES_COUNT) return listing_generator(header_file="/var/cosmic/templates/geminiintro.tmpl",limit_line_count=RECENT_ENTRIES_COUNT)
@app.route("/log(?:/(?:index\.gmi)?)?") @app.route("/log(?:/(?:index\.gmi)?)?")
@wrap_response() @wrap_response()
def log(request): def log(request):
return listing_generator(header_text="RS001 Log Entries (Newest First):") return listing_generator(header_text="RS001 Log Entries (Newest First):")
@app.route("/ships(?:/(?:index\.gmi)?)?") @app.route("/ships(?:/(?:index\.gmi)?)?")
@wrap_response() @wrap_response()
def ships(request): def ships(request):
yield "# Ships and Outposts\n\n" yield "# Ships and Outposts\n\n"
for ship in sorted(os.listdir("/var/gopher")): for ship in sorted(os.listdir("/var/gopher")):
if not os.path.isdir(os.path.join("/var/gopher",ship)): continue if not os.path.isdir(os.path.join("/var/gopher",ship)): continue
if ship in ("ships","log"): continue if ship in ("ships","log"): continue
entries = count_entries(ship) entries = count_entries(ship)
if entries==0: continue if entries==0: continue
urlencoded = urllib.parse.quote(ship,safe='') urlencoded = urllib.parse.quote(ship,safe='')
yield f"=> /ships/{urlencoded}/ {ship} ({entries!s})\n" yield f"=> /ships/{urlencoded}/ {ship} ({entries!s})\n"
@app.route("/ships/(?P<ship>[^/]+)(?:/(?:index\.gmi)?)?") @app.route("/ships/(?P<ship>[^/]+)(?:/(?:index\.gmi)?)?")
@wrap_response() @wrap_response()
def ship(request,ship=None): def ship(request,ship=None):
global DESC_PRE_SHIPS global DESC_PRE_SHIPS
if ship is None: return Response(Status.BAD_REQUEST,"Bad Request") if ship is None: return Response(Status.BAD_REQUEST,"Bad Request")
ship_unquoted = urllib.parse.unquote(ship) ship_unquoted = urllib.parse.unquote(ship)
urlencoded = urllib.parse.quote(ship,safe='') urlencoded = urllib.parse.quote(ship,safe='')
if not os.path.isdir(os.path.join("/var/gopher",ship_unquoted)): return Response(Status.NOT_FOUND,"Not Found") if not os.path.isdir(os.path.join("/var/gopher",ship_unquoted)): return Response(Status.NOT_FOUND,"Not Found")
def __generator(): def __generator():
description = os.path.join("/var/gopher",ship_unquoted,".description") description = os.path.join("/var/gopher",ship_unquoted,".description")
if os.path.exists(description): if os.path.exists(description):
if ship_unquoted in DESC_PRE_SHIPS: if ship_unquoted in DESC_PRE_SHIPS:
yield "```" yield "```"
yield DESC_PRE_SHIPS[ship_unquoted] yield DESC_PRE_SHIPS[ship_unquoted]
yield "\n" yield "\n"
yield from app.load_file(description) yield from app.load_file(description)
if ship_unquoted in DESC_PRE_SHIPS: if ship_unquoted in DESC_PRE_SHIPS:
yield "```" yield "```"
yield "\n" yield "\n"
if os.path.exists(os.path.join("/var/gopher",ship_unquoted,"AUTHOR")): if os.path.exists(os.path.join("/var/gopher",ship_unquoted,"AUTHOR")):
yield f"=> /{urlencoded}/AUTHOR Author {ship_unquoted}\n" yield f"=> /{urlencoded}/AUTHOR Author {ship_unquoted}\n"
yield f"# {ship_unquoted} - Ship Log\n" yield f"# {ship_unquoted} - Ship Log\n"
yield from reverse_listing_generator(filter_line=lambda line: line.startswith("0"+ship_unquoted)) yield from reverse_listing_generator(filter_line=lambda line: line.startswith("0"+ship_unquoted))
yield '\n\n' yield '\n\n'
year = time.strftime("%Y") year = time.strftime("%Y")
author_file = os.path.join("/var/gopher",ship_unquoted,"AUTHOR") author_file = os.path.join("/var/gopher",ship_unquoted,"AUTHOR")
if os.path.exists(author_file): if os.path.exists(author_file):
with open(author_file) as f: with open(author_file) as f:
author = f.readline().strip() author = f.readline().strip()
yield f"=> /{urlencoded}/AUTHOR © {year} {author}\n" yield f"=> /{urlencoded}/AUTHOR © {year} {author}\n"
else: else:
username = (subprocess.run(["/usr/bin/stat","-c","%U",os.path.join("/var/gopher",ship_unquoted)],stdout=subprocess.PIPE).stdout or b"unknown").decode("utf-8").strip() username = (subprocess.run(["/usr/bin/stat","-c","%U",os.path.join("/var/gopher",ship_unquoted)],stdout=subprocess.PIPE).stdout or b"unknown").decode("utf-8").strip()
yield f"© {year} {username}\n" yield f"© {year} {username}\n"
license_file = os.path.join("/var/gopher",ship_unquoted,"LICENSE") license_file = os.path.join("/var/gopher",ship_unquoted,"LICENSE")
if os.path.exists(license_file): if os.path.exists(license_file):
with open(license_file) as f: with open(license_file) as f:
license_name = f.readline().strip() license_name = f.readline().strip()
yield f"=> /{urlencoded}/LICENSE {license_name}\n" yield f"=> /{urlencoded}/LICENSE {license_name}\n"
else: else:
yield "All rights reserved.\n" yield "All rights reserved.\n"
return __generator() return __generator()
@app.route("/(?P<format>rss|atom).xml") @app.route("/(?P<format>rss|atom).xml")
def feeds(request,format="atom"): def feeds(request,format="atom"):
def __generator(): def __generator():
with open(os.path.join("/var/gopher",format+".xml")) as f: with open(os.path.join("/var/gopher",format+".xml")) as f:
for line in f: for line in f:
yield line.replace("gopher://cosmic.voyage/0/","gemini://cosmic.voyage/").replace("<link>gopher://cosmic.voyage","<link>gemini://cosmic.voyage") yield line.replace("gopher://cosmic.voyage/0/","gemini://cosmic.voyage/").replace("<link>gopher://cosmic.voyage","<link>gemini://cosmic.voyage")
return Response(Status.SUCCESS,f"application/{format}+xml",__generator()) return Response(Status.SUCCESS,f"application/{format}+xml",__generator())