From 402b6b1bd35ba65d570258b200c8696b184c0082 Mon Sep 17 00:00:00 2001 From: xfnw Date: Wed, 2 Dec 2020 14:51:54 -0500 Subject: [PATCH] include jess's marx scanners.py --- lib/marx.py | 113 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) create mode 100644 lib/marx.py diff --git a/lib/marx.py b/lib/marx.py new file mode 100644 index 0000000..00e419b --- /dev/null +++ b/lib/marx.py @@ -0,0 +1,113 @@ +# marx scanning lib +# https://github.com/jesopo/irctoolkit +# /freenode/vpncn/vpncn/scanner.py +# +# thanks jess :3 + +import asyncio, ssl, traceback +from typing import Dict, List, Optional, Pattern, Tuple +from dataclasses import dataclass + +from async_timeout import timeout as timeout_ +from OpenSSL import crypto + +#from .config import CertPattern + + +@dataclass +class CertPattern(object): + name: str + find: List[Pattern] + + + +CERT_KEYS = [ + ("CN", "cn"), + ("O", "on") +] + +TLS = ssl.SSLContext(ssl.PROTOCOL_TLS) + +def _bytes_dict(d: List[Tuple[bytes, bytes]]) -> Dict[str, str]: + return {k.decode("utf8"): v.decode("utf8") for k, v in d} + +class CertScanner(object): + def __init__(self, timeout: int = 5): + self._timeout = timeout + + async def _values(self, + ip: str, + port: int + ) -> List[Tuple[str, str]]: + reader, writer = await asyncio.open_connection(ip, port, ssl=TLS) + cert = writer.transport._ssl_protocol._sslpipe.ssl_object.getpeercert(True) + writer.close() + await writer.wait_closed() + + x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, cert) + + subject = _bytes_dict(x509.get_subject().get_components()) + issuer = _bytes_dict(x509.get_issuer().get_components()) + + values: List[Tuple[str, str]] = [] + for cert_key, match_key in CERT_KEYS: + if cert_key in subject: + values.append((f"s{match_key}", subject[cert_key])) + if cert_key in issuer: + values.append((f"i{match_key}", issuer[cert_key])) + + for i in range(x509.get_extension_count()): + ext = x509.get_extension(i) + if ext.get_short_name() == b"subjectAltName": + sans = ext.get_data()[4:].split(b"\x82\x18") + for san in sans: + values.append(("san", san.decode("latin-1"))) + + return values + + + async def _match(self, + ip: str, + port: int, + certs: List[CertPattern] + ) -> Optional[str]: + + try: + async with timeout_(self._timeout): + values_t = await self._values(ip, port) + except (asyncio.TimeoutError, + ConnectionError): + pass + except Exception as e: + traceback.print_exc() + else: + values = [f"{k}:{v}" for k, v in values_t] + for cert in certs: + for pattern in cert.find: + for value in values: + if pattern.fullmatch(value): + return f"{value} (:{port} {cert.name})" + return None + + async def scan(self, + ip: str, + bad: Dict[int, List[CertPattern]] + ) -> Optional[str]: + coros = [self._match(ip, p, c) for p, c in bad.items()] + tasks = set(asyncio.ensure_future(c) for c in coros) + while tasks: + finished, unfinished = await asyncio.wait( + tasks, return_when=asyncio.FIRST_COMPLETED + ) + for fin in finished: + result = fin.result() + if result is not None: + for task in unfinished: + task.cancel() + if unfinished: + await asyncio.wait(unfinished) + + return result + tasks = set(asyncio.ensure_future(f) for f in unfinished) + return None +