motm/motm.py

249 lines
8.3 KiB
Python

#!/usr/bin/env python3
import os
import json
import time
from typing import Optional
from db import DB
from post import Post
from auth import CA
from urllib.parse import unquote, urlparse
class MessageBoard:
def __init__(self, settings_path: str, env: dict) -> None:
self.env: dict = env
self.request_query: str = unquote(urlparse(env["GEMINI_URL"]).query)
self.request_path: str = self.env["PATH_INFO"]
with open(settings_path, "r") as settings_file:
self.settings: dict = json.load(settings_file)
self.ca: CA = CA(
self.settings["server"]["hostname"],
self.settings["auth"]["caCert"],
self.settings["auth"]["caKey"],
self.settings["auth"]["caCRL"],
None,
)
self.db: DB = DB(self.settings)
def register_user(self) -> None:
response_body: str
user_key: str
user_cert: str
new_username: str = self.request_query
user_data: tuple
if new_username == "":
print("10", "Enter the username you would like to register (please read the instructions first)")
return None
user_data = self.ca.generate_user_cert(new_username)
if user_data == ():
response_body = "Could not create user: an error occured while generating the certificate"
else:
user_key = user_data[0]
user_cert = user_data[1]
user_cert_hash = user_data[2]
if self.db.add_user(username=new_username, user_cert=user_cert, cert_hash=user_cert_hash):
response_body = "CLIENT CERT:\n{}\n\nKEY:\n{}".format(
user_cert, user_key
)
else:
response_body = "Could not create user: user already exists\n"
print("20", "text/plain")
print(response_body)
return None
def auth_user(self, post: Optional[Post] = None) -> bool:
client_hash: str
client_cert: Optional[str]
is_authorized: bool = False
try:
is_authorized = bool(int(self.env["TLS_CLIENT_AUTHORISED"]))
except KeyError:
print("60", "You need a certificate in order to do that.")
return False
except ValueError: # sometimes this is "True/False" instead of "1/0" (????)
is_authorized = bool(self.env["TLS_CLIENT_AUTHORISED"])
if is_authorized != True:
print("61", "You are not using a registered certificate.")
return False
client_hash = self.env["TLS_CLIENT_HASH"]
if post is not None and post.cert_hash != client_hash:
print("61", "You are not authorized to modify this post.")
return False
client_cert = self.db.hash_to_cert(client_hash)
if client_cert is None:
print("61", "Cert hash not in db.")
return False
if self.ca.verify_cert(client_cert) != True:
print("61", "You are banned.")
return False
return True
def fetch_post(self) -> None:
body: str
post: Optional[Post]
post_path = self.request_path.strip("/")
if self.request_query.split("=")[0] == "page":
page_query: str = self.request_query.split("=")[1]
page_num: Optional[int] = int(page_query) if page_query.isnumeric() else None
post = self.db.get_post_by_path(
path=post_path, page_num=page_num
)
else:
post = self.db.get_post_by_path(path=post_path)
if post is None:
print("51", 'Could not find post "{}"'.format(post_path))
return None
else:
body = post.render()
print("20", "text/gemini")
print(body)
def handle_submission(self) -> None:
post_settings: dict
child_settings: dict
username: str
post_path: str = "/".join(self.request_path.strip("/").split("/")[:-1])
post: Optional[Post] = self.db.get_post_by_path(
path=post_path
)
if post is None:
print("51", 'Could not submit: Could not find post "{}"'.format(post_path))
return None
else:
post_settings = self.settings["postTypes"][post.post_type]
child_settings = self.settings["postTypes"][post_settings["childType"]]
if post_settings["allowSubmission"] != True:
print("20", "text/plain")
print("That post cannot be submitted to.")
return None
if self.request_query != "":
new_post_id = self.db.latest_post_id() + 1
if child_settings["titled"]:
self.db.add_post(
ip_address=self.env["REMOTE_ADDR"],
post_type=post_settings["childType"],
post_id=new_post_id,
timestamp=time.time_ns(),
path=str(new_post_id),
title=self.request_query,
username=self.env["REMOTE_USER"],
cert_hash=self.env["TLS_CLIENT_HASH"],
parent=post.post_id,
)
else:
self.db.add_post(
ip_address=self.env["REMOTE_ADDR"],
post_type=post_settings["childType"],
post_id=new_post_id,
timestamp=time.time_ns(),
path=str(new_post_id),
title=str(new_post_id),
text=self.request_query,
username=self.env["REMOTE_USER"],
cert_hash=self.env["TLS_CLIENT_HASH"],
parent=post.post_id,
)
print("30", "/".join(self.env["GEMINI_URL"].split("/")[:-1]))
return None
else:
if child_settings["titled"]:
print("10", "Enter the title for your post: ")
return None
else:
print("10", "Enter the text for your post: ")
return None
def handle_append(self) -> Optional[bool]:
auth_status: bool
post_settings: dict
post_path: str = "/".join(self.request_path.strip("/").split("/")[:-1])
post: Optional[Post] = self.db.get_post_by_path(
path=post_path
)
if post is None:
print("51", 'Could not append: Could not find post "{}"'.format(post_path))
return None
else:
auth_status = self.auth_user(post)
if auth_status != True:
return auth_status
post_settings = self.settings["postTypes"][post.post_type]
if self.request_query != "":
self.db.append_to_post(
post_id=post.post_id,
text=self.request_query.replace(r"\n", "\n"),
)
print("30", "/".join(self.env["GEMINI_URL"].split("/")[:-1]))
return None
else:
if post_settings["allowAppend"] == False:
print("20", "text/plain")
print("This post cannot be appended to.")
return None
else:
print("10", "Enter the text to append to your post: ")
return None
def handle_request(self) -> Optional[bool]:
basename: str
if self.request_path == "":
print("30", "{}/boards\r\n".format(self.env["GEMINI_URL"]))
return None
if self.env["GEMINI_URL"][-1] == "/":
print("30", self.env["GEMINI_URL"].strip("/"))
return None
if self.request_path == "/register":
self.register_user()
return None
basename = self.request_path.split("/")[-1]
if basename == "submit":
auth_status = self.auth_user()
if auth_status == True:
self.handle_submission()
return None
elif basename == "append":
auth_status = self.auth_user()
if auth_status == True:
self.handle_append()
return None
else:
self.fetch_post()
return None
def main() -> None:
os.chdir(os.path.dirname(os.path.realpath(__file__)))
motm = MessageBoard("config/settings.json", dict(os.environ))
motm.handle_request()