2021-10-07 00:28:42 +00:00
import os , mimetypes , subprocess , urllib . parse , time , tempfile
2021-10-02 06:19:14 +00:00
from jetforce . app . base import JetforceApplication , Response , Status
from pathlib import Path
from functools import wraps
mimetypes . init ( )
mimetypes . add_type ( " text/gemini " , " .gmi " )
2021-10-13 10:13:20 +00:00
# The amount of recent entries to show on the homepage
2021-10-02 06:19:14 +00:00
RECENT_ENTRIES_COUNT = 20
2021-10-13 10:13:20 +00:00
# Ships whose descriptions need to be quoted in a pre text block. Key is ship name, value is alt text.
DESC_PRE_SHIPS = {
2023-02-27 13:57:15 +00:00
" anon.hmm.st " : """ An ascii banner split in two halves. The left half reads " anon.hmm.st " in a figlet font. The right half lists the server as AnonServ 0.9.infinity, the readable frontends as QEC, Gemini, Gopher, and Web, as well as listing Email as a write-only frontend. The description under these listings reads " ' Sup losers, it ' s your boy Phantom Override back at it again. They got us once but they ' ll never find us out amongst the stars. Lose lose. " At the bottom of the banner it lists the email address for submissions, which is anonhmmst@cosmic.voyage. """ ,
" anon.penet.fi " : """ An ascii banner split in two uneven halves. The left half reads " anon.penet.fi " in a cool figlet-esque font. The right half reads " Back from the grave after 400 years, you can ' t find us but keep on tryin ' . An anonymous QEC retransmit service: put it here, it never ever goes away. Just send us your stuff on 182.2 KHz. Use QEC: AnonPenetFi@cosmic.voyage. How did we get the name? Very easily! Where are we? Here, there, everywhere. Greets snowcrash, acidburn, lordnikon. countzero, get lost. Special to exclusive services: do you think this is some kind of game? Traffic analysis, for babies. You will never find us but keep trying. Busy time: 39882 failed attacks, and counting! You can ' t stop the signal " . """
2021-10-13 10:13:20 +00:00
}
# Utility lambda to get line count
2021-10-02 06:19:14 +00:00
LINE_COUNT = lambda fn : int ( ( subprocess . run ( [ " /usr/bin/wc " , " -l " , fn ] , stdout = subprocess . PIPE ) . stdout or b " -1 somethingswrong " ) . decode ( " utf-8 " ) . split ( ) [ 0 ] )
class DefaultToGopher ( JetforceApplication ) :
2023-02-27 13:57:15 +00:00
GOPHER_ROOT = Path ( " /var/gopher " )
def default_callback ( self , request , * * _ ) :
"""
Defaults to serving a file out of / var / gopher if one exists .
Since the app itself defines analogues for the gophermap CGI - esques that cosmic
itself uses , we ' ll only end up here if we ' re trying to serve some file that
exists on gopher ( or if it ' s a bad request but that ' s the job of check_request ) .
Implementation loosely based on jetforce . app . static . StaticDirectoryApplication . serve_static_request .
"""
# make sure we can handle this request (since that's the only other reason we'd be here)
if ( resp := self . check_request ( request ) ) : return resp
# normalize the request path
filename = Path ( os . path . normpath ( urllib . parse . unquote ( request . path . strip ( " / " ) ) ) )
# cowardly refuse to serve file outside of gopher root
if filename . is_absolute ( ) or str ( filename ) . startswith ( " .. " ) :
return Response ( Status . BAD_REQUEST , " Detected path traversal " )
# find filesystem path
fs_path = self . GOPHER_ROOT / filename
# check readability
try :
if not os . access ( fs_path , os . R_OK ) :
return Response ( Status . NOT_FOUND , " Not Found " )
except OSError :
return Response ( Status . NOT_FOUND )
if fs_path . is_file ( ) :
mimetype = mimetypes . guess_type ( fs_path , strict = False ) [ 0 ] or " application/octet-stream "
if str ( fs_path ) . endswith ( " gophermap " ) : mimetype = " text/gophermap "
if any ( [ str ( fs_path ) . endswith ( x ) for x in ( " LICENSE " , " AUTHOR " , " .description " ) ] ) : mimetype = " text/plain "
return Response ( Status . SUCCESS , mimetype , self . load_file ( fs_path ) )
else : # don't bother with directory listings
return Response ( Status . NOT_FOUND , " Not Found " )
def load_file ( self , path ) :
with open ( path ) as f :
while ( data := f . read ( 2048 ) ) :
yield data
def check_request ( self , request ) :
if any ( [
request . scheme != " gemini " ,
request . hostname != request . environ [ " HOSTNAME " ] ,
request . port and request . port != request . environ [ " SERVER_PORT " ]
] ) :
return Response ( Status . PROXY_REQUEST_REFUSED , " Proxy Request Refused " )
2021-10-02 06:19:14 +00:00
def abstract_listing_generator ( listing_file , header_file = None , header_text = None , limit_line_count = False , filter_line = lambda raw_line : True , count_from_end = True ) :
2023-02-27 13:57:15 +00:00
"""
Handles listing pages .
` listing_file ` is the file to read as the listing .
There are two ways to give a header to the listing :
- ` header_file ` is the filename of a file to send as the header .
- ` header_text ` is text to send as the header .
If they are both provided , ` header_file ` is sent first , then ` header_text ` .
` limit_line_count ` is either False ( no limit to line count ) or the number of
lines to send at most .
` filter_line ` is a function that returns False when a line should be skipped .
` count_from_end ` affects the line number generation . If it is True , the line
numbers are generated in reverse order ( like in most listings ) . Otherwise , the
line numbers are generated forwards ( like in ship listings ) .
"""
if header_file is not None :
with open ( header_file ) as hdr_f :
yield hdr_f . read ( )
if header_text is not None :
yield header_text
yield " \n \n "
total_line_count = LINE_COUNT ( listing_file )
with open ( listing_file ) as f :
line_count = 0
while ( line_count := line_count + 1 ) and ( line := f . readline ( ) ) :
if limit_line_count and line_count > limit_line_count : return
if not filter_line ( line ) : continue
log_name , log_link = line [ 1 : ] . strip ( ) . split ( " \t " )
if count_from_end :
log_num = total_line_count - line_count
2023-03-17 07:42:49 +00:00
log_num + = 1 # account for numbers starting at 1 and not 0
2023-02-27 13:57:15 +00:00
else :
log_num = line_count
log_link = urllib . parse . quote ( log_link , safe = ' / ' )
yield f " => { log_link } { log_num } >> { log_name } \n "
2021-10-02 06:19:14 +00:00
def listing_generator ( * * kwargs ) :
2023-02-27 13:57:15 +00:00
""" Use the normal listing file. """
return abstract_listing_generator ( " /var/gopher/listing.gophermap " , * * kwargs )
2021-10-02 06:19:14 +00:00
def reverse_listing_generator ( * * kwargs ) :
2023-02-27 13:57:15 +00:00
"""
Use the normal listing file but in reverse . Uses some trickery to ( hopefully
efficiently ) reverse the listing file .
"""
with tempfile . NamedTemporaryFile ( " r " , encoding = " utf-8 " ) as f :
subprocess . run ( " tac /var/gopher/listing.gophermap > " + f . name , shell = True )
yield from abstract_listing_generator ( f . name , count_from_end = False , * * kwargs )
2021-10-02 06:19:14 +00:00
def count_entries ( ship ) :
2023-02-27 13:57:15 +00:00
count = 0
with open ( " /var/gopher/listing.gophermap " ) as f :
for line in f :
if line . startswith ( " 0 " + ship ) : count + = 1
return count
2021-10-02 06:19:14 +00:00
2021-10-07 00:43:50 +00:00
def wrap_response ( mimetype = " text/gemini " ) :
2023-02-27 13:57:15 +00:00
def __wrapfunc ( f ) :
@wraps ( f )
def __wrapper ( * args , * * kwargs ) :
rv = f ( * args , * * kwargs )
if type ( rv ) != Response : return Response ( Status . SUCCESS , " text/gemini " , rv )
return rv
return __wrapper
return __wrapfunc
2021-10-02 06:19:14 +00:00
app = DefaultToGopher ( )
2021-10-06 23:57:26 +00:00
@app.route ( " (?:/(?:index \ .gmi)?)? " )
2021-10-07 00:37:52 +00:00
@wrap_response ( )
2021-10-02 06:19:14 +00:00
def index ( request ) :
2023-02-27 13:57:15 +00:00
return listing_generator ( header_file = " /var/cosmic/templates/geminiintro.tmpl " , limit_line_count = RECENT_ENTRIES_COUNT )
2021-10-02 06:19:14 +00:00
@app.route ( " /log(?:/(?:index \ .gmi)?)? " )
2021-10-07 00:37:52 +00:00
@wrap_response ( )
2021-10-02 06:19:14 +00:00
def log ( request ) :
2023-02-27 13:57:15 +00:00
return listing_generator ( header_text = " RS001 Log Entries (Newest First): " )
2021-10-02 06:19:14 +00:00
@app.route ( " /ships(?:/(?:index \ .gmi)?)? " )
2021-10-07 00:37:52 +00:00
@wrap_response ( )
2021-10-02 06:19:14 +00:00
def ships ( request ) :
2023-02-27 13:57:15 +00:00
yield " # Ships and Outposts \n \n "
for ship in sorted ( os . listdir ( " /var/gopher " ) ) :
if not os . path . isdir ( os . path . join ( " /var/gopher " , ship ) ) : continue
if ship in ( " ships " , " log " ) : continue
entries = count_entries ( ship )
if entries == 0 : continue
urlencoded = urllib . parse . quote ( ship , safe = ' ' )
yield f " => /ships/ { urlencoded } / { ship } ( { entries !s} ) \n "
2021-10-02 06:19:14 +00:00
2021-10-06 23:34:15 +00:00
@app.route ( " /ships/(?P<ship>[^/]+)(?:/(?:index \ .gmi)?)? " )
2021-10-07 00:37:52 +00:00
@wrap_response ( )
2021-10-02 06:19:14 +00:00
def ship ( request , ship = None ) :
2023-02-27 13:57:15 +00:00
global DESC_PRE_SHIPS
if ship is None : return Response ( Status . BAD_REQUEST , " Bad Request " )
ship_unquoted = urllib . parse . unquote ( ship )
urlencoded = urllib . parse . quote ( ship , safe = ' ' )
if not os . path . isdir ( os . path . join ( " /var/gopher " , ship_unquoted ) ) : return Response ( Status . NOT_FOUND , " Not Found " )
def __generator ( ) :
description = os . path . join ( " /var/gopher " , ship_unquoted , " .description " )
if os . path . exists ( description ) :
if ship_unquoted in DESC_PRE_SHIPS :
yield " ``` "
yield DESC_PRE_SHIPS [ ship_unquoted ]
yield " \n "
yield from app . load_file ( description )
if ship_unquoted in DESC_PRE_SHIPS :
yield " ``` "
yield " \n "
if os . path . exists ( os . path . join ( " /var/gopher " , ship_unquoted , " AUTHOR " ) ) :
yield f " => / { urlencoded } /AUTHOR Author { ship_unquoted } \n "
yield f " # { ship_unquoted } - Ship Log \n "
yield from reverse_listing_generator ( filter_line = lambda line : line . startswith ( " 0 " + ship_unquoted ) )
yield ' \n \n '
year = time . strftime ( " % Y " )
author_file = os . path . join ( " /var/gopher " , ship_unquoted , " AUTHOR " )
if os . path . exists ( author_file ) :
with open ( author_file ) as f :
author = f . readline ( ) . strip ( )
yield f " => / { urlencoded } /AUTHOR © { year } { author } \n "
else :
2023-03-17 08:00:41 +00:00
username = ( Path ( " /var/gopher " ) / ship_unquoted ) . owner ( )
2023-02-27 13:57:15 +00:00
yield f " © { year } { username } \n "
license_file = os . path . join ( " /var/gopher " , ship_unquoted , " LICENSE " )
if os . path . exists ( license_file ) :
with open ( license_file ) as f :
license_name = f . readline ( ) . strip ( )
yield f " => / { urlencoded } /LICENSE { license_name } \n "
else :
yield " All rights reserved. \n "
return __generator ( )
2021-10-02 06:19:14 +00:00
@app.route ( " /(?P<format>rss|atom).xml " )
def feeds ( request , format = " atom " ) :
2023-02-27 13:57:15 +00:00
def __generator ( ) :
with open ( os . path . join ( " /var/gopher " , format + " .xml " ) ) as f :
for line in f :
yield line . replace ( " gopher://cosmic.voyage/0/ " , " gemini://cosmic.voyage/ " ) . replace ( " <link>gopher://cosmic.voyage " , " <link>gemini://cosmic.voyage " )
return Response ( Status . SUCCESS , f " application/ { format } +xml " , __generator ( ) )