motm/auth.py

269 lines
9.5 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import datetime
import base64
from typing import Union, Optional, Callable
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography import x509
from cryptography.x509.oid import NameOID
class CA:
hostname: str
cert: x509.Certificate
key: rsa.RSAPrivateKey
crl_path: str
def __init__(
self,
hostname: str,
cert_path: str,
key_path: str,
crl_path: str,
password: Optional[str] = None,
) -> None:
self.hostname = hostname
with open(cert_path, "rb") as cert_file:
self.cert = x509.load_pem_x509_certificate(cert_file.read())
with open(key_path, "rb") as key_file:
if password is not None:
self.key = serialization.load_pem_private_key(
key_file.read(), password=bytes(password, "utf-8")
)
else:
self.key = serialization.load_pem_private_key(
key_file.read(), password=None
)
self.crl_path = crl_path
def verify_cert(self, cert_str: str) -> bool:
# certificate expiration is checked by the server, so we only need to
# check to see if the cert has been revoked
crl: x509.CertificateRevocationList
cert: x509.Certificate = x509.load_pem_x509_certificate(bytes(cert_str, "utf-8"))
with open(self.crl_path, "rb") as crl_file:
crl = x509.load_pem_x509_crl(crl_file.read())
for revoked_cert in crl:
if revoked_cert.serial_number == cert.serial_number:
return False
return True
def revoke_cert(self, cert_str: str) -> None:
crl: x509.CertificateRevocationList
cert = x509.load_pem_x509_certificate(bytes(cert_str, "utf-8"))
revoked_cert_builder: x509.RevokedCertificateBuilder = x509.RevokedCertificateBuilder()
revoked_cert_builder = revoked_cert_builder.revocation_date(datetime.datetime.today())
revoked_cert_builder = revoked_cert_builder.serial_number(cert.serial_number)
revoked_cert: x509.RevokedCertificate = revoked_cert_builder.build()
one_day: datetime.timedelta = datetime.timedelta(days=1)
with open(self.crl_path, "rb") as crl_file:
crl = x509.load_pem_x509_crl(crl_file.read())
new_crl_builder: x509.CertificateRevocationListBuilder = (
x509.CertificateRevocationListBuilder()
)
new_crl_builder = new_crl_builder.issuer_name(
x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, self.hostname)])
)
new_crl_builder = new_crl_builder.last_update(datetime.datetime.today())
new_crl_builder = new_crl_builder.next_update(
datetime.datetime.today() + (one_day * 99999)
)
# add the existing revoked certs to the new crl
for existing_cert in crl:
new_crl_builder = new_crl_builder.add_revoked_certificate(existing_cert)
# add the new revoked cert to the crl
new_crl_builder = new_crl_builder.add_revoked_certificate(revoked_cert)
new_crl = new_crl_builder.sign(private_key=self.key, algorithm=hashes.SHA256())
with open(self.crl_path, "wb") as crl_file:
crl_file.write(new_crl.public_bytes(serialization.Encoding.PEM))
def unrevoke_cert (self, cert_str: str) -> None:
crl: x509.CertificateRevocationList
cert: x509.Certificate = x509.load_pem_x509_certificate(bytes(cert_str, "utf-8"))
one_day: datetime.timedelta = datetime.timedelta(days=1)
with open(self.crl_path, "rb") as crl_file:
crl = x509.load_pem_x509_crl(crl_file.read())
new_crl_builder: x509.CertificateRevocationListBuilder = (
x509.CertificateRevocationListBuilder()
)
new_crl_builder = new_crl_builder.issuer_name(
x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, self.hostname)])
)
new_crl_builder = new_crl_builder.last_update(datetime.datetime.today())
new_crl_builder = new_crl_builder.next_update(
datetime.datetime.today() + (one_day * 99999)
)
# create a new crl without the cert that has been un-revoked
for existing_cert in crl:
if existing_cert.serial_number != cert.serial_number:
new_crl_builder = new_crl_builder.add_revoked_certificate(existing_cert)
new_crl = new_crl_builder.sign(private_key=self.key, algorithm=hashes.SHA256())
with open(self.crl_path, "wb") as crl_file:
crl_file.write(new_crl.public_bytes(serialization.Encoding.PEM))
@classmethod
def new_ca(
cls, cert_dir: str, hostname: str, password: Optional[str] = None
) -> CA:
cert: x509.Certificate
key = rsa.generate_private_key(
public_exponent=65537,
key_size=4096,
)
crl: x509.CertificateRevocationList
cert_path: str = cert_dir + "/ca.cert"
key_path: str = cert_dir + "/ca.key"
crl_path: str = cert_dir + "/ca.crl"
pubkey: rsa.RSAPublicKey = key.public_key()
one_day: datetime.timedelta = datetime.timedelta(days=1)
cert_builder: x509.CertificateBuilder = x509.CertificateBuilder()
cert_builder = cert_builder.subject_name(
x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, hostname)])
)
cert_builder = cert_builder.issuer_name(
x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, hostname)])
)
cert_builder = cert_builder.not_valid_before(
datetime.datetime.today() - one_day
)
cert_builder = cert_builder.not_valid_after(
datetime.datetime.today() + (one_day * 99999)
)
cert_builder = cert_builder.serial_number(x509.random_serial_number())
cert_builder = cert_builder.public_key(pubkey)
cert_builder = cert_builder.add_extension(
x509.BasicConstraints(ca=True, path_length=1), critical=True
)
cert = cert_builder.sign(private_key=key, algorithm=hashes.SHA256())
crl_builder: x509.CertificateRevocationListBuilder = (
x509.CertificateRevocationListBuilder()
)
crl_builder = crl_builder.issuer_name(
x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, hostname)])
)
crl_builder = crl_builder.last_update(datetime.datetime.today())
crl_builder = crl_builder.next_update(
datetime.datetime.today() + (one_day * 99999)
)
crl = crl_builder.sign(private_key=key, algorithm=hashes.SHA256())
with open(key_path, "wb") as key_file:
if password is not None:
key_file.write(
key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.BestAvailableEncryption(
bytes(password, "utf-8")
),
)
)
else:
key_file.write(
key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
),
)
with open(cert_path, "wb") as cert_file:
cert_file.write(cert.public_bytes(serialization.Encoding.PEM))
with open(crl_path, "wb") as crl_file:
crl_file.write(crl.public_bytes(serialization.Encoding.PEM))
return cls(
hostname=hostname,
cert_path=cert_path,
key_path=key_path,
crl_path=crl_path,
password=password,
)
def generate_user_cert(self, username: str) -> tuple:
cert: x509.Certificate
key = rsa.generate_private_key(
public_exponent=65537,
key_size=4096,
)
key_str: str
cert_str: str
cert_hash: str
pubkey: rsa.RSAPublicKey = key.public_key()
one_day: datetime.timedelta = datetime.timedelta(days=1)
builder: x509.CertificateBuilder = x509.CertificateBuilder()
builder = builder.subject_name(
x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, username)])
)
builder = builder.issuer_name(
x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, self.hostname)])
)
builder = builder.not_valid_before(datetime.datetime.today() - one_day)
builder = builder.not_valid_after(datetime.datetime.today() + (one_day * 99999))
builder = builder.serial_number(x509.random_serial_number())
builder = builder.public_key(pubkey)
cert = builder.sign(private_key=self.key, algorithm=hashes.SHA256())
if not isinstance(cert, x509.Certificate):
return ()
key_str = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
).decode()
cert_str = cert.public_bytes(serialization.Encoding.PEM).decode()
cert_hash = cert.fingerprint(hashes.SHA256()).hex().upper()
cert_hash = "SHA256:" + cert_hash
return (key_str, cert_str, cert_hash)