249 lines
8.3 KiB
Python
249 lines
8.3 KiB
Python
#!/usr/bin/env python3
|
|
import os
|
|
import json
|
|
import time
|
|
from typing import Optional
|
|
from db import DB
|
|
from post import Post
|
|
from auth import CA
|
|
from urllib.parse import unquote, urlparse
|
|
|
|
class MessageBoard:
    """Handle a single Gemini message-board request.

    Request details are read from an environment mapping (``GEMINI_URL``,
    ``PATH_INFO``, ``TLS_CLIENT_AUTHORISED``, ``TLS_CLIENT_HASH``,
    ``REMOTE_ADDR``, ``REMOTE_USER``). Every response is written to standard
    output as a ``<status> <meta>`` line, followed by a body for ``20``
    responses.
    """

    # Non-numeric spellings of TLS_CLIENT_AUTHORISED that count as "authorised".
    _TRUE_WORDS = ("true", "yes", "on")

    def __init__(self, settings_path: str, env: dict) -> None:
        """Load the settings file and set up the CA and database helpers.

        Args:
            settings_path: Path of the JSON settings file.
            env: Request environment; must contain ``GEMINI_URL`` and
                ``PATH_INFO``.

        Raises:
            KeyError: If a required environment or settings key is missing.
            OSError: If the settings file cannot be opened.
        """
        self.env: dict = env

        # The query is percent-decoded once here; handlers use it verbatim.
        self.request_query: str = unquote(urlparse(env["GEMINI_URL"]).query)
        self.request_path: str = self.env["PATH_INFO"]

        with open(settings_path, "r", encoding="utf-8") as settings_file:
            self.settings: dict = json.load(settings_file)

        auth_settings = self.settings["auth"]
        self.ca: CA = CA(
            self.settings["server"]["hostname"],
            auth_settings["caCert"],
            auth_settings["caKey"],
            auth_settings["caCRL"],
            None,
        )

        self.db: DB = DB(self.settings)

    @classmethod
    def _flag_is_set(cls, raw_flag) -> bool:
        """Interpret an authorisation flag given as ``1``/``0`` or a word.

        Numeric strings follow ``bool(int(...))``. Any other value is accepted
        only if it spells an affirmative word, so ``"False"`` (a non-empty and
        therefore truthy string) is correctly rejected.
        """
        try:
            return bool(int(raw_flag))
        except ValueError:
            return str(raw_flag).strip().lower() in cls._TRUE_WORDS

    def _parent_url(self) -> str:
        """Return ``GEMINI_URL`` without its query and last path segment.

        The query is removed first so that a ``/`` inside the submitted text
        cannot be mistaken for a path separator.
        """
        url_without_query = self.env["GEMINI_URL"].split("?", 1)[0]
        return "/".join(url_without_query.split("/")[:-1])

    def register_user(self) -> None:
        """Register the username given in the query and print its credentials.

        With an empty query a ``10`` input prompt is printed instead. All
        other outcomes, including failures, are reported as a ``20``
        ``text/plain`` response.
        """
        new_username: str = self.request_query

        if new_username == "":
            print("10", "Enter the username you would like to register (please read the instructions first)")
            return None

        user_data = self.ca.generate_user_cert(new_username)

        response_body: str
        # An empty tuple signals failure; `not` additionally tolerates None
        # (assumption: the CA helper never returns other falsy values).
        if not user_data:
            response_body = "Could not create user: an error occured while generating the certificate"
        else:
            user_key, user_cert, user_cert_hash = user_data[0], user_data[1], user_data[2]
            if self.db.add_user(username=new_username, user_cert=user_cert, cert_hash=user_cert_hash):
                response_body = "CLIENT CERT:\n{}\n\nKEY:\n{}".format(user_cert, user_key)
            else:
                response_body = "Could not create user: user already exists\n"

        print("20", "text/plain")
        print(response_body)
        return None

    def auth_user(self, post: "Optional[Post]" = None) -> bool:
        """Check that the client certificate may perform a write action.

        Args:
            post: When given, the client's certificate hash must also match
                ``post.cert_hash``.

        Returns:
            True if every check passes. Otherwise a ``60``/``61`` response
            line is printed and False is returned.

        Raises:
            KeyError: If the client is flagged as authorised but
                ``TLS_CLIENT_HASH`` is missing from the environment.
        """
        raw_flag = self.env.get("TLS_CLIENT_AUTHORISED")
        if raw_flag is None:
            print("60", "You need a certificate in order to do that.")
            return False

        # The flag arrives as "1"/"0" or as "True"/"False"; parse it
        # explicitly rather than relying on string truthiness.
        if not self._flag_is_set(raw_flag):
            print("61", "You are not using a registered certificate.")
            return False

        client_hash: str = self.env["TLS_CLIENT_HASH"]
        if post is not None and post.cert_hash != client_hash:
            print("61", "You are not authorized to modify this post.")
            return False

        client_cert: Optional[str] = self.db.hash_to_cert(client_hash)
        if client_cert is None:
            print("61", "Cert hash not in db.")
            return False

        # Fail closed: anything other than a literal True counts as failure.
        if self.ca.verify_cert(client_cert) is not True:
            print("61", "You are banned.")
            return False

        return True

    def fetch_post(self) -> None:
        """Print the rendered post addressed by the request path.

        A ``page=<n>`` query selects a page; a missing or non-decimal page
        value is passed to the database as ``None``. Prints a ``51`` response
        when no post matches the path.
        """
        post: Optional[Post]
        post_path = self.request_path.strip("/")

        query_parts = self.request_query.split("=")
        if query_parts[0] == "page":
            # A bare "page" query has no value part; treat it as empty.
            page_query: str = query_parts[1] if len(query_parts) > 1 else ""
            # isdecimal() only accepts characters that int() can parse,
            # unlike isnumeric(), which also matches e.g. superscripts.
            page_num: Optional[int] = int(page_query) if page_query.isdecimal() else None
            post = self.db.get_post_by_path(path=post_path, page_num=page_num)
        else:
            post = self.db.get_post_by_path(path=post_path)

        if post is None:
            print("51", 'Could not find post "{}"'.format(post_path))
            return None

        print("20", "text/gemini")
        print(post.render())

    def handle_submission(self) -> None:
        """Create a child post under the parent named by the request path.

        The final path segment (the action name) is dropped to find the
        parent. With an empty query a ``10`` prompt is printed. Otherwise the
        query becomes the title (titled child types) or the text (untitled
        child types) of the new post, and the client is redirected to the
        parent URL.
        """
        post_path: str = "/".join(self.request_path.strip("/").split("/")[:-1])
        post: Optional[Post] = self.db.get_post_by_path(path=post_path)

        if post is None:
            print("51", 'Could not submit: Could not find post "{}"'.format(post_path))
            return None

        post_settings: dict = self.settings["postTypes"][post.post_type]
        if not post_settings["allowSubmission"]:
            print("20", "text/plain")
            print("That post cannot be submitted to.")
            return None

        # Looked up only after the permission check, so post types that
        # accept no submissions do not need a valid childType.
        child_type = post_settings["childType"]
        child_settings: dict = self.settings["postTypes"][child_type]

        if self.request_query == "":
            if child_settings["titled"]:
                print("10", "Enter the title for your post: ")
            else:
                print("10", "Enter the text for your post: ")
            return None

        new_post_id = self.db.latest_post_id() + 1
        new_post = {
            "ip_address": self.env["REMOTE_ADDR"],
            "post_type": child_type,
            "post_id": new_post_id,
            "timestamp": time.time_ns(),
            "path": str(new_post_id),
            "username": self.env["REMOTE_USER"],
            "cert_hash": self.env["TLS_CLIENT_HASH"],
            "parent": post.post_id,
        }
        if child_settings["titled"]:
            new_post["title"] = self.request_query
        else:
            # Untitled posts use their id as title and the query as body.
            new_post["title"] = str(new_post_id)
            new_post["text"] = self.request_query
        self.db.add_post(**new_post)

        print("30", self._parent_url())
        return None

    def handle_append(self) -> Optional[bool]:
        """Append the query text to the post named by the request path.

        Returns:
            False when the client is not allowed to modify the post (the
            refusal has already been printed), otherwise None.
        """
        post_path: str = "/".join(self.request_path.strip("/").split("/")[:-1])
        post: Optional[Post] = self.db.get_post_by_path(path=post_path)

        if post is None:
            print("51", 'Could not append: Could not find post "{}"'.format(post_path))
            return None

        auth_status: bool = self.auth_user(post)
        if not auth_status:
            return auth_status

        post_settings: dict = self.settings["postTypes"][post.post_type]
        # Enforce the post-type policy before writing anything, so a request
        # that already carries text cannot bypass it.
        if not post_settings["allowAppend"]:
            print("20", "text/plain")
            print("This post cannot be appended to.")
            return None

        if self.request_query == "":
            print("10", "Enter the text to append to your post: ")
            return None

        self.db.append_to_post(
            post_id=post.post_id,
            # A literal backslash-n typed by the user becomes a line break.
            text=self.request_query.replace(r"\n", "\n"),
        )
        print("30", self._parent_url())
        return None

    def handle_request(self) -> Optional[bool]:
        """Dispatch the request to the matching handler.

        An empty path redirects to ``/boards`` and a trailing slash is
        redirected away. ``/register`` registers a user. A final path segment
        of ``submit`` or ``append`` runs the matching write handler after
        authentication. Anything else is fetched as a post.
        """
        gemini_url: str = self.env["GEMINI_URL"]

        if self.request_path == "":
            print("30", "{}/boards\r\n".format(gemini_url))
            return None

        if gemini_url.endswith("/"):
            print("30", gemini_url.rstrip("/"))
            return None

        if self.request_path == "/register":
            self.register_user()
            return None

        basename: str = self.request_path.split("/")[-1]

        if basename == "submit":
            if self.auth_user():
                self.handle_submission()
        elif basename == "append":
            # Generic check first, so unauthenticated clients get 60/61
            # before any post lookup; handle_append then re-checks ownership.
            if self.auth_user():
                self.handle_append()
        else:
            self.fetch_post()
        return None
|
|
|
|
def main() -> None:
    """Serve one request using the process environment.

    The working directory is first switched to the directory containing this
    script, so the relative settings path resolves regardless of where the
    process was started.
    """
    script_dir = os.path.dirname(os.path.realpath(__file__))
    os.chdir(script_dir)

    request_env = dict(os.environ)
    board = MessageBoard("config/settings.json", request_env)
    board.handle_request()
|