#!/usr/bin/env python3 from __future__ import annotations import datetime import base64 from typing import Union, Optional, Callable from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives.asymmetric import rsa from cryptography import x509 from cryptography.x509.oid import NameOID class CA: hostname: str cert: x509.Certificate key: rsa.RSAPrivateKey crl_path: str def __init__( self, hostname: str, cert_path: str, key_path: str, crl_path: str, password: Optional[str] = None, ) -> None: self.hostname = hostname with open(cert_path, "rb") as cert_file: self.cert = x509.load_pem_x509_certificate(cert_file.read()) with open(key_path, "rb") as key_file: if password is not None: self.key = serialization.load_pem_private_key( key_file.read(), password=bytes(password, "utf-8") ) else: self.key = serialization.load_pem_private_key( key_file.read(), password=None ) self.crl_path = crl_path def verify_cert(self, cert_str: str) -> bool: # certificate expiration is checked by the server, so we only need to # check to see if the cert has been revoked crl: x509.CertificateRevocationList cert: x509.Certificate = x509.load_pem_x509_certificate(bytes(cert_str, "utf-8")) with open(self.crl_path, "rb") as crl_file: crl = x509.load_pem_x509_crl(crl_file.read()) for revoked_cert in crl: if revoked_cert.serial_number == cert.serial_number: return False return True def revoke_cert(self, cert_str: str) -> None: crl: x509.CertificateRevocationList cert = x509.load_pem_x509_certificate(bytes(cert_str, "utf-8")) revoked_cert_builder: x509.RevokedCertificateBuilder = x509.RevokedCertificateBuilder() revoked_cert_builder = revoked_cert_builder.revocation_date(datetime.datetime.today()) revoked_cert_builder = revoked_cert_builder.serial_number(cert.serial_number) revoked_cert: x509.RevokedCertificate = revoked_cert_builder.build() one_day: datetime.timedelta = datetime.timedelta(days=1) with open(self.crl_path, "rb") as crl_file: crl = x509.load_pem_x509_crl(crl_file.read()) new_crl_builder: x509.CertificateRevocationListBuilder = ( x509.CertificateRevocationListBuilder() ) new_crl_builder = new_crl_builder.issuer_name( x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, self.hostname)]) ) new_crl_builder = new_crl_builder.last_update(datetime.datetime.today()) new_crl_builder = new_crl_builder.next_update( datetime.datetime.today() + (one_day * 99999) ) # add the existing revoked certs to the new crl for existing_cert in crl: new_crl_builder = new_crl_builder.add_revoked_certificate(existing_cert) # add the new revoked cert to the crl new_crl_builder = new_crl_builder.add_revoked_certificate(revoked_cert) new_crl = new_crl_builder.sign(private_key=self.key, algorithm=hashes.SHA256()) with open(self.crl_path, "wb") as crl_file: crl_file.write(new_crl.public_bytes(serialization.Encoding.PEM)) def unrevoke_cert (self, cert_str: str) -> None: crl: x509.CertificateRevocationList cert: x509.Certificate = x509.load_pem_x509_certificate(bytes(cert_str, "utf-8")) one_day: datetime.timedelta = datetime.timedelta(days=1) with open(self.crl_path, "rb") as crl_file: crl = x509.load_pem_x509_crl(crl_file.read()) new_crl_builder: x509.CertificateRevocationListBuilder = ( x509.CertificateRevocationListBuilder() ) new_crl_builder = new_crl_builder.issuer_name( x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, self.hostname)]) ) new_crl_builder = new_crl_builder.last_update(datetime.datetime.today()) new_crl_builder = new_crl_builder.next_update( datetime.datetime.today() + (one_day * 99999) ) # create a new crl without the cert that has been un-revoked for existing_cert in crl: if existing_cert.serial_number != cert.serial_number: new_crl_builder = new_crl_builder.add_revoked_certificate(existing_cert) new_crl = new_crl_builder.sign(private_key=self.key, algorithm=hashes.SHA256()) with open(self.crl_path, "wb") as crl_file: crl_file.write(new_crl.public_bytes(serialization.Encoding.PEM)) @classmethod def new_ca( cls, cert_dir: str, hostname: str, password: Optional[str] = None ) -> CA: cert: x509.Certificate key = rsa.generate_private_key( public_exponent=65537, key_size=4096, ) crl: x509.CertificateRevocationList cert_path: str = cert_dir + "/ca.cert" key_path: str = cert_dir + "/ca.key" crl_path: str = cert_dir + "/ca.crl" pubkey: rsa.RSAPublicKey = key.public_key() one_day: datetime.timedelta = datetime.timedelta(days=1) cert_builder: x509.CertificateBuilder = x509.CertificateBuilder() cert_builder = cert_builder.subject_name( x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, hostname)]) ) cert_builder = cert_builder.issuer_name( x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, hostname)]) ) cert_builder = cert_builder.not_valid_before( datetime.datetime.today() - one_day ) cert_builder = cert_builder.not_valid_after( datetime.datetime.today() + (one_day * 99999) ) cert_builder = cert_builder.serial_number(x509.random_serial_number()) cert_builder = cert_builder.public_key(pubkey) cert_builder = cert_builder.add_extension( x509.BasicConstraints(ca=True, path_length=1), critical=True ) cert = cert_builder.sign(private_key=key, algorithm=hashes.SHA256()) crl_builder: x509.CertificateRevocationListBuilder = ( x509.CertificateRevocationListBuilder() ) crl_builder = crl_builder.issuer_name( x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, hostname)]) ) crl_builder = crl_builder.last_update(datetime.datetime.today()) crl_builder = crl_builder.next_update( datetime.datetime.today() + (one_day * 99999) ) crl = crl_builder.sign(private_key=key, algorithm=hashes.SHA256()) with open(key_path, "wb") as key_file: if password is not None: key_file.write( key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.BestAvailableEncryption( bytes(password, "utf-8") ), ) ) else: key_file.write( key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ), ) with open(cert_path, "wb") as cert_file: cert_file.write(cert.public_bytes(serialization.Encoding.PEM)) with open(crl_path, "wb") as crl_file: crl_file.write(crl.public_bytes(serialization.Encoding.PEM)) return cls( hostname=hostname, cert_path=cert_path, key_path=key_path, crl_path=crl_path, password=password, ) def generate_user_cert(self, username: str) -> tuple: cert: x509.Certificate key = rsa.generate_private_key( public_exponent=65537, key_size=4096, ) key_str: str cert_str: str cert_hash: str pubkey: rsa.RSAPublicKey = key.public_key() one_day: datetime.timedelta = datetime.timedelta(days=1) builder: x509.CertificateBuilder = x509.CertificateBuilder() builder = builder.subject_name( x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, username)]) ) builder = builder.issuer_name( x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, self.hostname)]) ) builder = builder.not_valid_before(datetime.datetime.today() - one_day) builder = builder.not_valid_after(datetime.datetime.today() + (one_day * 99999)) builder = builder.serial_number(x509.random_serial_number()) builder = builder.public_key(pubkey) cert = builder.sign(private_key=self.key, algorithm=hashes.SHA256()) if not isinstance(cert, x509.Certificate): return () key_str = key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ).decode() cert_str = cert.public_bytes(serialization.Encoding.PEM).decode() cert_hash = cert.fingerprint(hashes.SHA256()).hex().upper() cert_hash = "SHA256:" + cert_hash return (key_str, cert_str, cert_hash)