435 lines
16 KiB
Python
435 lines
16 KiB
Python
import os
|
|
import sys
|
|
import time
|
|
import socket
|
|
import datetime
|
|
import pytz
|
|
import OpenSSL.SSL as ossl
|
|
import OpenSSL.crypto as ocrypt
|
|
|
|
from cryptography import x509
|
|
from cryptography.x509 import NameOID, ExtensionOID
|
|
from cryptography.hazmat.primitives import serialization, hashes
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
|
|
# _______
|
|
# |== []| misfin(b) protocol
|
|
# | ==== | implemented in one file
|
|
# '-------' lem (2023)
|
|
|
|
|
|
# ----------
|
|
# Certificate handling.
|
|
|
|
# A nice round number...
|
|
default_expiry = datetime.timedelta(days=32768)
|
|
|
|
class Identity:
|
|
""" An identified user, either local (i.e. with a private key) or a peer. """
|
|
def __init__(self, cert, private=None, password=None):
|
|
""" Load an Identity from certificate and key objects, or from PEM data. """
|
|
if isinstance(cert, bytes):
|
|
self._cert = x509.load_pem_x509_certificate(cert)
|
|
elif isinstance(cert, ocrypt.X509):
|
|
self._cert = cert.to_cryptography()
|
|
elif isinstance(cert, x509.Certificate):
|
|
self._cert = cert
|
|
else:
|
|
raise TypeError("Can't load certificate")
|
|
|
|
if isinstance(private, bytes):
|
|
self._private = serialization.load_pem_private_key(private, password=password)
|
|
elif isinstance(private, rsa.RSAPrivateKey) or private is None:
|
|
self._private = private
|
|
else:
|
|
raise TypeError("Can't load private key")
|
|
|
|
def _build_name(mailbox, blurb, additional_names=[]):
|
|
""" Builds an x509 Name with the right format for a Misfin certificate. """
|
|
mandatory = [x509.NameAttribute(NameOID.USER_ID, mailbox), x509.NameAttribute(NameOID.COMMON_NAME, blurb)]
|
|
return x509.Name(mandatory + additional_names)
|
|
|
|
def _build_key():
|
|
""" Common method for building a private key. """
|
|
return rsa.generate_private_key(public_exponent=65537, key_size=2048)
|
|
|
|
def _build_cert(pubkey, privkey, subject, issuer, hostname, is_ca, expires_in):
|
|
""" Common method for building and signing an x509 certificate. """
|
|
return x509.CertificateBuilder() \
|
|
.subject_name(subject) \
|
|
.issuer_name(issuer) \
|
|
.public_key(pubkey) \
|
|
.serial_number(x509.random_serial_number()) \
|
|
.not_valid_before(datetime.datetime.utcnow()) \
|
|
.not_valid_after(datetime.datetime.utcnow() + expires_in) \
|
|
.add_extension(x509.SubjectAlternativeName([x509.DNSName(hostname)]), critical=False) \
|
|
.add_extension(x509.BasicConstraints(ca=is_ca, path_length=None), critical=True) \
|
|
.sign(privkey, hashes.SHA256())
|
|
|
|
@classmethod
|
|
def new(cls, mailbox, blurb, hostname, is_ca=False, additional_names=[], expires_in=default_expiry):
|
|
""" Generate a new, self-signed identity. """
|
|
ob = cls.__new__(cls)
|
|
|
|
ob._private = Identity._build_key()
|
|
subject = Identity._build_name(mailbox, blurb, additional_names)
|
|
ob._cert = Identity._build_cert(ob._private.public_key(), ob._private, subject, subject, hostname, is_ca, expires_in)
|
|
|
|
return ob
|
|
|
|
@classmethod
|
|
def child_of(cls, parent, mailbox, blurb, additional_names=[], expires_in=default_expiry):
|
|
""" Generate a child certificate, signed by a parent certificate. """
|
|
if not parent.is_ca(): raise TypeError("Parent certificate can't be used to sign children")
|
|
if parent.is_peer(): raise TypeError("Parent certificate is missing a private key")
|
|
|
|
ob = cls.__new__(cls)
|
|
ob._private = Identity._build_key()
|
|
subject = Identity._build_name(mailbox, blurb, additional_names)
|
|
|
|
csr = x509.CertificateSigningRequestBuilder() \
|
|
.subject_name(subject) \
|
|
.sign(ob._private, hashes.SHA256())
|
|
|
|
ob._cert = Identity._build_cert(
|
|
csr.public_key(), parent._private,
|
|
subject, parent._cert.subject, parent.hostname(),
|
|
is_ca=False, expires_in=expires_in
|
|
)
|
|
|
|
return ob
|
|
|
|
def as_pem(self, encryption=serialization.NoEncryption()):
|
|
""" Serializes the Identity as PEM data. """
|
|
built = self._cert.public_bytes(serialization.Encoding.PEM)
|
|
if self._private is not None:
|
|
built += self._private.private_bytes(
|
|
encoding=serialization.Encoding.PEM,
|
|
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
|
encryption_algorithm=encryption
|
|
)
|
|
|
|
return built
|
|
|
|
# Ugly ugly ugly.
|
|
# Note that these are hardcoded to the first found result for their attribute.
|
|
# Misfin certs don't support multiple values for USER_ID and COMMON_NAME, and support for
|
|
# multiple hostnames is possible but not implemented.
|
|
def is_peer(self):
|
|
return self._private is None
|
|
def is_ca(self):
|
|
return self._cert.extensions.get_extension_for_oid(ExtensionOID.BASIC_CONSTRAINTS).value.ca
|
|
def hostname(self):
|
|
return self._cert.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value.get_values_for_type(x509.DNSName)[0] # ew
|
|
def blurb(self):
|
|
return self._cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
|
|
def mailbox(self):
|
|
return self._cert.subject.get_attributes_for_oid(NameOID.USER_ID)[0].value
|
|
def parent_blurb(self):
|
|
return self._cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
|
|
def parent_mailbox(self):
|
|
return self._cert.issuer.get_attributes_for_oid(NameOID.USER_ID)[0].value
|
|
|
|
# Built addresses.
|
|
def address(self): return self.mailbox() + "@" + self.hostname()
|
|
def parent_address(self): return self.parent_mailbox() + "@" + self.hostname()
|
|
|
|
# For TOFU.
|
|
def fingerprint(self, hash_method=hashes.SHA256()):
|
|
raw = self._cert.fingerprint(hash_method)
|
|
return ":".join("%02x" % b for b in raw)
|
|
|
|
|
|
# ----------
|
|
# Requests and responses.
|
|
|
|
|
|
class Request:
|
|
""" The basic unit of data transfer for Misfin. Here's some data, here's where it's going. """
|
|
|
|
def __init__(self, mailbox, host, message):
|
|
self.mailbox = mailbox
|
|
self.host = host
|
|
self.message = message
|
|
|
|
@classmethod
|
|
def from_incoming(cls, req):
|
|
""" Create a Request object from the client's greeting. """
|
|
ob = cls.__new__(cls)
|
|
|
|
# Auto-convert from a bytes object, makes socket code a little cleaner
|
|
if isinstance(req, bytes): req = req.decode("utf-8")
|
|
|
|
# Maybe this isn't even a Misfin request...
|
|
if not req.startswith("misfin://"): raise TypeError("Not a Misfin request")
|
|
req = req.removeprefix("misfin://")
|
|
|
|
# Make sure we have the whole request, and save any body that might have made it through
|
|
if "\r\n" not in req: raise ValueError("Incomplete request")
|
|
header, _ = req.split("\r\n", 1)
|
|
|
|
try:
|
|
# Split up the relevant bits of the header
|
|
dest, ob.message= header.split(" ", 1)
|
|
ob.mailbox, ob.host = dest.split("@", 1)
|
|
return ob
|
|
except:
|
|
raise ValueError("Malformed request")
|
|
|
|
def build(self):
|
|
""" Builds the Misfin request. """
|
|
return "misfin://{}@{} {}\r\n".format(self.mailbox, self.host, self.message)
|
|
|
|
# A Misfin server response - either a go ahead, or some flavor of error.
|
|
class Response:
|
|
""" Tells the client what to do - either a go ahead, or some flavor of error. """
|
|
|
|
# Handy error messages for a server to send.
|
|
# Note that 20, 30, and 31 shouldn't use these messages, but they are included
|
|
# here for completeness
|
|
meta_tags = {
|
|
20: "message accepted",
|
|
|
|
30: "mailbox changed, look here",
|
|
31: "mailbox changed, look here (permanent)",
|
|
|
|
40: "temporary error",
|
|
41: "server is unavailable",
|
|
42: "cgi error",
|
|
43: "proxying error",
|
|
44: "slow down",
|
|
45: "mailbox full",
|
|
|
|
50: "permanent error",
|
|
51: "mailbox doesn't exist",
|
|
52: "mailbox has been removed",
|
|
53: "that domain isn't served here",
|
|
59: "bad request",
|
|
|
|
60: "certificate required",
|
|
61: "you can't send mail there",
|
|
62: "your certificate is invalid",
|
|
63: "you're lying about your certificate",
|
|
64: "prove it"
|
|
}
|
|
|
|
@classmethod
|
|
def of(cls, status, meta=None):
|
|
""" Build a Response object for a status code. """
|
|
ob = cls.__new__(cls)
|
|
ob.status = str(status)
|
|
if meta is None: ob.meta = Response.meta_tags[status]
|
|
else: ob.meta = meta
|
|
return ob
|
|
|
|
# Some shortcuts for responses that actually use the meta tag
|
|
def delivered(fingerprint):
|
|
return Response.of(20, fingerprint)
|
|
|
|
def redirect(to):
|
|
return Response.of(30, to)
|
|
|
|
def redirect_forever(to):
|
|
return Response.of(31, to)
|
|
|
|
@classmethod
|
|
def from_server(cls, resp):
|
|
""" Creates a Response object from the server's response. """
|
|
ob = cls.__new__(cls)
|
|
|
|
# Auto-convert from a bytes object, makes socket code a little cleaner
|
|
if isinstance(resp, bytes): resp = resp.decode("utf-8")
|
|
|
|
try:
|
|
ob.status, ob.meta = resp.split(" ", 1)
|
|
return ob
|
|
except:
|
|
raise ValueError("Malformed response")
|
|
|
|
def build(self):
|
|
return "{} {}\r\n".format(self.status, self.meta)
|
|
|
|
def __str__(self):
|
|
return "{} {}".format(self.status, self.meta)
|
|
|
|
def was_successful(self): return self.status[0] == "2"
|
|
def send_max(self): return int(self.meta)
|
|
|
|
def was_redirect(self): return self.status[0] == "3"
|
|
def was_temporary_error(self): return self.status[0] == "4"
|
|
def was_permanent_error(self): return self.status[0] == "5"
|
|
def was_certificate_error(self): return self.status[0] == "6"
|
|
|
|
|
|
# ----------
|
|
# Sending, receiving, and TLS.
|
|
|
|
|
|
# Default port to communicate over.
|
|
default_port = 1958
|
|
|
|
# Maximum amount of data to accept - by default, anyway.
|
|
max_request_size = 2048
|
|
|
|
def _validate_nothing(conn, cert, err, depth, rtrn):
|
|
""" Callback that lets us steal certificate verification from OpenSSL. """
|
|
""" This is !!!DANGEROUS!!! but necessary to allow us to accept self-signed certs. """
|
|
return True
|
|
|
|
def send_as(sender, request, port=default_port, check_valid_method=_validate_nothing):
|
|
""" Sends a Misfin message as a user. """
|
|
|
|
# For some reason, this block doesn't survive being moved to a separate function, so it's
|
|
# repeated below in an ugly way.
|
|
context = ossl.Context( ossl.TLS_CLIENT_METHOD )
|
|
context.set_verify( ossl.VERIFY_PEER | ossl.VERIFY_FAIL_IF_NO_PEER_CERT, callback=check_valid_method)
|
|
context.use_certificate( ocrypt.X509.from_cryptography(sender._cert) )
|
|
context.use_privatekey( ocrypt.PKey.from_cryptography_key(sender._private) )
|
|
sock = ossl.Connection(context, socket.socket(socket.AF_INET, socket.SOCK_STREAM))
|
|
|
|
sock.connect((request.host, port))
|
|
sock.set_connect_state()
|
|
sock.do_handshake()
|
|
|
|
# Send our message and see if the destination accepts.
|
|
sock.write(bytes(request.build(), "utf-8"))
|
|
response = Response.from_server(sock.read(max_request_size))
|
|
return response
|
|
|
|
# Skadoodle
|
|
sock.shutdown()
|
|
sock.close()
|
|
return True
|
|
|
|
def write_message(message, blurb, address):
|
|
tz = pytz.timezone('America/New_York')
|
|
now = datetime.datetime.now(tz)
|
|
with open("messages.gmi","a") as file:
|
|
file.write("### message from {} ({}):\n@ {}\n{}\n\n".format(blurb, address, now.strftime("%Y-%m-%dT%H:%M:%S"), message))
|
|
|
|
def _allow_anything(server, peer, request):
|
|
""" Callback that accepts any message to the server's mailbox. """
|
|
""" SCARY! Only use for testing. """
|
|
print("Incoming from {} ({})".format(peer.blurb(), peer.address()))
|
|
print("Fingerprint is {}".format(peer.fingerprint()))
|
|
print("Message: {}".format(request.message))
|
|
|
|
if request.mailbox == server.mailbox():
|
|
write_message(request.message, peer.blurb(), peer.address())
|
|
return Response.delivered(server.fingerprint())
|
|
else:
|
|
print("...but we aren't {}, we're {}".format(request.mailbox, server.mailbox()))
|
|
return Response.of(51)
|
|
|
|
def receive_from(connection, server, peer, is_allowed_method):
|
|
""" Receives a Misfin message from a client. """
|
|
# Do we want to receive this message?
|
|
try:
|
|
request = Request.from_incoming(connection.read(max_request_size))
|
|
response = is_allowed_method(server, peer, request)
|
|
connection.write(bytes(response.build(), 'utf-8'))
|
|
|
|
except Exception as err:
|
|
# Something fucked up, be nice and tell the client.
|
|
connection.write(bytes(Response.of(40).build(), "utf-8"))
|
|
raise err
|
|
|
|
# Skadoodle
|
|
connection.shutdown()
|
|
connection.close()
|
|
|
|
return request
|
|
|
|
def receive_forever(server, is_allowed_method=_allow_anything, check_valid_method=_validate_nothing, port=default_port):
|
|
""" Receives Misfin messages, forever and ever. """
|
|
# See above.
|
|
context = ossl.Context( ossl.TLS_SERVER_METHOD )
|
|
context.set_verify( ossl.VERIFY_PEER | ossl.VERIFY_FAIL_IF_NO_PEER_CERT, callback=check_valid_method)
|
|
context.use_certificate( ocrypt.X509.from_cryptography(server._cert) )
|
|
context.use_privatekey( ocrypt.PKey.from_cryptography_key(server._private) )
|
|
sock = ossl.Connection(context, socket.socket(socket.AF_INET, socket.SOCK_STREAM))
|
|
|
|
sock.bind((server.hostname(), port))
|
|
sock.listen(3)
|
|
|
|
while True:
|
|
print("")
|
|
try:
|
|
# Set up a connection...
|
|
connection, addr = sock.accept()
|
|
connection.set_accept_state()
|
|
connection.do_handshake()
|
|
|
|
# ...and do something about it
|
|
peer = Identity(connection.get_peer_certificate())
|
|
receive_from(connection, server, peer, is_allowed_method)
|
|
|
|
except ossl.Error as err:
|
|
print("Client disconnected before finishing.")
|
|
|
|
except Exception as err:
|
|
print(err)
|
|
print("Aborting receive due to exception.")
|
|
|
|
|
|
# ----------
|
|
# Stupid simple command-line interface.
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
# I wasn't kidding.
|
|
def print_usage():
|
|
print("usage: python -m misfin_b [make-cert mailbox blurb hostname output.who]")
|
|
print("usage: [cert-from parent.who mailbox blurb output.who]")
|
|
print("usage: [send-as identity.who destination 'message']")
|
|
print("usage: [receive-as identity.who]")
|
|
sys.exit(-1)
|
|
|
|
try:
|
|
command = sys.argv[1]
|
|
|
|
if command == "make-cert":
|
|
mailbox, blurb, hostname, output = sys.argv[2:]
|
|
|
|
ident = Identity.new(mailbox, blurb, hostname, is_ca=True)
|
|
with open(output, "wb") as dest: dest.write(ident.as_pem())
|
|
|
|
print("Generated cert for {} ({}) - saved to {}".format(ident.blurb(), ident.address(), output))
|
|
|
|
elif command == "cert-from":
|
|
parent, mailbox, blurb, output = sys.argv[2:]
|
|
|
|
loaded_pem = open(parent, "rb").read()
|
|
parent_ident = Identity(loaded_pem, loaded_pem)
|
|
ident = Identity.child_of(parent_ident, mailbox, blurb)
|
|
with open(output, "wb") as dest: dest.write(ident.as_pem())
|
|
|
|
print("Generated cert for {} ({}), child of {} ({}) - saved to {}".format(ident.blurb(), ident.address(), ident.parent_blurb(), ident.parent_address(), output))
|
|
|
|
elif command == "send-as":
|
|
sender, destination, message = sys.argv[2:]
|
|
mailbox, host = destination.split("@", 1)
|
|
|
|
loaded_pem = open(sender, "rb").read()
|
|
ident = Identity(loaded_pem, loaded_pem)
|
|
|
|
msg = Request(mailbox, host, message)
|
|
if len(msg.build()) > max_request_size:
|
|
raise ValueError("Message is too long - needs to fit in {} bytes but totals {}".format(max_request_size, len(msg.build())))
|
|
|
|
print(send_as(ident, msg))
|
|
|
|
elif command == "receive-as":
|
|
loaded_pem = open(sys.argv[2], "rb").read()
|
|
ident = Identity(loaded_pem, loaded_pem)
|
|
|
|
print("Receiving for {} ({})".format(ident.blurb(), ident.address()))
|
|
receive_forever(ident)
|
|
|
|
except Exception as err:
|
|
# Hehe
|
|
# raise err
|
|
print(err)
|
|
print_usage()
|