#!/usr/bin/env python3 import os import json import time from typing import Optional from db import DB from post import Post from auth import CA from urllib.parse import unquote, urlparse class MessageBoard: def __init__(self, settings_path: str, env: dict) -> None: self.env: dict = env self.request_query: str = unquote(urlparse(env["GEMINI_URL"]).query) self.request_path: str = self.env["PATH_INFO"] with open(settings_path, "r") as settings_file: self.settings: dict = json.load(settings_file) self.ca: CA = CA( self.settings["server"]["hostname"], self.settings["auth"]["caCert"], self.settings["auth"]["caKey"], self.settings["auth"]["caCRL"], None, ) self.db: DB = DB(self.settings) def register_user(self) -> None: response_body: str user_key: str user_cert: str new_username: str = self.request_query user_data: tuple if new_username == "": print("10", "Enter the username you would like to register (please read the instructions first)") return None user_data = self.ca.generate_user_cert(new_username) if user_data == (): response_body = "Could not create user: an error occured while generating the certificate" else: user_key = user_data[0] user_cert = user_data[1] user_cert_hash = user_data[2] if self.db.add_user(username=new_username, user_cert=user_cert, cert_hash=user_cert_hash): response_body = "CLIENT CERT:\n{}\n\nKEY:\n{}".format( user_cert, user_key ) else: response_body = "Could not create user: user already exists\n" print("20", "text/plain") print(response_body) return None def auth_user(self, post: Optional[Post] = None) -> bool: client_hash: str client_cert: Optional[str] is_authorized: bool = False try: is_authorized = bool(int(self.env["TLS_CLIENT_AUTHORISED"])) except KeyError: print("60", "You need a certificate in order to do that.") return False except ValueError: # sometimes this is "True/False" instead of "1/0" (????) is_authorized = bool(self.env["TLS_CLIENT_AUTHORISED"]) if is_authorized != True: print("61", "You are not using a registered certificate.") return False client_hash = self.env["TLS_CLIENT_HASH"] if post is not None and post.cert_hash != client_hash: print("61", "You are not authorized to modify this post.") return False client_cert = self.db.hash_to_cert(client_hash) if client_cert is None: print("61", "Cert hash not in db.") return False if self.ca.verify_cert(client_cert) != True: print("61", "You are banned.") return False return True def fetch_post(self) -> None: body: str post: Optional[Post] post_path = self.request_path.strip("/") if self.request_query.split("=")[0] == "page": page_query: str = self.request_query.split("=")[1] page_num: Optional[int] = int(page_query) if page_query.isnumeric() else None post = self.db.get_post_by_path( path=post_path, page_num=page_num ) else: post = self.db.get_post_by_path(path=post_path) if post is None: print("51", 'Could not find post "{}"'.format(post_path)) return None else: body = post.render() print("20", "text/gemini") print(body) def handle_submission(self) -> None: post_settings: dict child_settings: dict username: str post_path: str = "/".join(self.request_path.strip("/").split("/")[:-1]) post: Optional[Post] = self.db.get_post_by_path( path=post_path ) if post is None: print("51", 'Could not submit: Could not find post "{}"'.format(post_path)) return None else: post_settings = self.settings["postTypes"][post.post_type] child_settings = self.settings["postTypes"][post_settings["childType"]] if post_settings["allowSubmission"] != True: print("20", "text/plain") print("That post cannot be submitted to.") return None if self.request_query != "": new_post_id = self.db.latest_post_id() + 1 if child_settings["titled"]: self.db.add_post( ip_address=self.env["REMOTE_ADDR"], post_type=post_settings["childType"], post_id=new_post_id, timestamp=time.time_ns(), path=str(new_post_id), title=self.request_query, username=self.env["REMOTE_USER"], cert_hash=self.env["TLS_CLIENT_HASH"], parent=post.post_id, ) else: self.db.add_post( ip_address=self.env["REMOTE_ADDR"], post_type=post_settings["childType"], post_id=new_post_id, timestamp=time.time_ns(), path=str(new_post_id), title=str(new_post_id), text=self.request_query, username=self.env["REMOTE_USER"], cert_hash=self.env["TLS_CLIENT_HASH"], parent=post.post_id, ) print("30", "/".join(self.env["GEMINI_URL"].split("/")[:-1])) return None else: if child_settings["titled"]: print("10", "Enter the title for your post: ") return None else: print("10", "Enter the text for your post: ") return None def handle_append(self) -> Optional[bool]: auth_status: bool post_settings: dict post_path: str = "/".join(self.request_path.strip("/").split("/")[:-1]) post: Optional[Post] = self.db.get_post_by_path( path=post_path ) if post is None: print("51", 'Could not append: Could not find post "{}"'.format(post_path)) return None else: auth_status = self.auth_user(post) if auth_status != True: return auth_status post_settings = self.settings["postTypes"][post.post_type] if self.request_query != "": self.db.append_to_post( post_id=post.post_id, text=self.request_query.replace(r"\n", "\n"), ) print("30", "/".join(self.env["GEMINI_URL"].split("/")[:-1])) return None else: if post_settings["allowAppend"] == False: print("20", "text/plain") print("This post cannot be appended to.") return None else: print("10", "Enter the text to append to your post: ") return None def handle_request(self) -> Optional[bool]: basename: str if self.request_path == "": print("30", "{}/boards\r\n".format(self.env["GEMINI_URL"])) return None if self.env["GEMINI_URL"][-1] == "/": print("30", self.env["GEMINI_URL"].strip("/")) return None if self.request_path == "/register": self.register_user() return None basename = self.request_path.split("/")[-1] if basename == "submit": auth_status = self.auth_user() if auth_status == True: self.handle_submission() return None elif basename == "append": auth_status = self.auth_user() if auth_status == True: self.handle_append() return None else: self.fetch_post() return None def main() -> None: os.chdir(os.path.dirname(os.path.realpath(__file__))) motm = MessageBoard("config/settings.json", dict(os.environ)) motm.handle_request()