mirror of https://xfnw.ttm.sh/git/oirc.git
101 lines
3.2 KiB
Python
101 lines
3.2 KiB
Python
# marx scanning lib
|
|
# https://github.com/jesopo/irctoolkit
|
|
# /freenode/vpncn/vpncn/scanner.py
|
|
#
|
|
# thanks jess :3
|
|
|
|
import asyncio, ssl, traceback
|
|
from typing import Dict, List, Optional, Pattern, Tuple
|
|
from dataclasses import dataclass
|
|
|
|
from async_timeout import timeout as timeout_
|
|
from OpenSSL import crypto
|
|
|
|
# from .config import CertPattern
|
|
|
|
|
|
@dataclass
class CertPattern:
    """A named group of compiled regexes to test certificate values against."""

    # Label reported in the scan result when one of the patterns matches.
    name: str
    # Compiled patterns; the scanner applies ``fullmatch`` to "key:value" strings.
    find: List[Pattern]
|
|
|
|
|
|
# Certificate name attributes to extract, as (X.509 short name, match-key suffix).
# The scanner prefixes the suffix with "s" (subject) or "i" (issuer), giving
# match keys such as "scn", "icn", "son" and "ion".
CERT_KEYS = [("CN", "cn"), ("O", "on")]

# Client-side TLS context used only to retrieve the peer certificate.
# PROTOCOL_TLS is deprecated since Python 3.10; PROTOCOL_TLS_CLIENT is the
# supported spelling but enables verification by default. Hosts are scanned by
# bare IP and their certificates are inspected rather than trusted, so
# verification is switched off explicitly (check_hostname must be cleared
# before verify_mode can be set to CERT_NONE).
TLS = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
TLS.check_hostname = False
TLS.verify_mode = ssl.CERT_NONE
|
|
|
|
|
|
def _bytes_dict(d: List[Tuple[bytes, bytes]]) -> Dict[str, str]:
|
|
return {k.decode("utf8"): v.decode("utf8") for k, v in d}
|
|
|
|
|
|
class CertScanner:
    """Fetch TLS certificates from a host and match their fields against patterns.

    For every port, the peer certificate is reduced to a list of
    ``"key:value"`` strings, where ``key`` is one of:

    * ``s<suffix>`` / ``i<suffix>`` -- subject / issuer attributes listed in
      ``CERT_KEYS`` (for example ``scn``, ``icn``, ``son``, ``ion``);
    * ``san`` -- each dNSName entry of the subjectAltName extension.

    Each string is tested with ``fullmatch`` against the supplied patterns.
    """

    def __init__(self, timeout: int = 5):
        """Create a scanner.

        Args:
            timeout: Seconds allowed per port for connecting, completing the
                TLS handshake and retrieving the certificate.
        """
        self._timeout = timeout

    @staticmethod
    def _read_der(data: bytes, pos: int) -> Tuple[int, bytes, int]:
        """Decode the DER element that starts at ``pos``.

        Only single-byte tags are supported, which is sufficient for the
        GeneralName choices found in subjectAltName.

        Returns:
            ``(tag, content, next_pos)``.

        Raises:
            ValueError: If the element is truncated or its length is malformed.
        """
        if pos + 2 > len(data):
            raise ValueError("truncated DER header")
        tag = data[pos]
        length = data[pos + 1]
        pos += 2
        if length & 0x80:
            # Long form: the low 7 bits give the number of length octets.
            size = length & 0x7F
            if size == 0 or pos + size > len(data):
                raise ValueError("malformed DER length")
            length = int.from_bytes(data[pos:pos + size], "big")
            pos += size
        end = pos + length
        if end > len(data):
            raise ValueError("truncated DER content")
        return tag, data[pos:end], end

    @classmethod
    def _san_dns_names(cls, data: bytes) -> List[str]:
        """Extract dNSName entries from a DER-encoded subjectAltName value.

        ``data`` is expected to be a SEQUENCE of GeneralName elements. Entries
        other than dNSName (context tag 2, encoded as ``0x82``) are skipped.
        Malformed input never raises: names decoded before the damage are
        returned.
        """
        names: List[str] = []
        try:
            tag, body, _ = cls._read_der(data, 0)
            if tag != 0x30:  # not a SEQUENCE
                return names
            pos = 0
            while pos < len(body):
                tag, content, pos = cls._read_der(body, pos)
                if tag == 0x82:
                    # IA5String is ASCII; latin-1 decodes any byte without raising.
                    names.append(content.decode("latin-1"))
        except ValueError:
            # Best effort: keep whatever was decoded before the malformed part.
            return names
        return names

    async def _values(self, ip: str, port: int) -> List[Tuple[str, str]]:
        """Connect to ``ip:port`` over TLS and return the certificate's match values.

        Returns:
            ``(key, value)`` tuples as described in the class docstring, or an
            empty list when the peer presented no certificate.

        Raises:
            Whatever connecting, the handshake or certificate parsing raise;
            the caller decides how to handle these.
        """
        _reader, writer = await asyncio.open_connection(ip, port, ssl=TLS)
        try:
            # Public accessor for the SSL object; asyncio's private transport
            # attributes changed across Python versions.
            ssl_object = writer.get_extra_info("ssl_object")
            cert = None if ssl_object is None else ssl_object.getpeercert(True)
        finally:
            # Always release the connection, even if fetching the cert failed.
            writer.close()
            await writer.wait_closed()

        if not cert:
            return []

        x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, cert)

        subject = _bytes_dict(x509.get_subject().get_components())
        issuer = _bytes_dict(x509.get_issuer().get_components())

        values: List[Tuple[str, str]] = []
        for cert_key, match_key in CERT_KEYS:
            if cert_key in subject:
                values.append((f"s{match_key}", subject[cert_key]))
            if cert_key in issuer:
                values.append((f"i{match_key}", issuer[cert_key]))

        for i in range(x509.get_extension_count()):
            ext = x509.get_extension(i)
            if ext.get_short_name() == b"subjectAltName":
                values.extend(
                    ("san", name) for name in self._san_dns_names(ext.get_data())
                )

        return values

    async def _match(
        self, ip: str, port: int, certs: List["CertPattern"]
    ) -> Optional[str]:
        """Check one port; return a description of the first match, else ``None``.

        Timeouts and connection errors are expected while scanning and are
        ignored silently. Any other failure is printed as a traceback and
        treated as "no match" so one bad port cannot abort the scan.
        """
        try:
            async with timeout_(self._timeout):
                values_t = await self._values(ip, port)
        except (asyncio.TimeoutError, ConnectionError):
            return None
        except Exception:
            traceback.print_exc()
            return None

        values = [f"{k}:{v}" for k, v in values_t]
        for cert in certs:
            for pattern in cert.find:
                for value in values:
                    if pattern.fullmatch(value):
                        return f"{value} (:{port} {cert.name})"
        return None

    async def scan(self, ip: str, bad: Dict[int, List["CertPattern"]]) -> Optional[str]:
        """Scan all ports in ``bad`` concurrently and return the first match.

        Args:
            ip: Host to connect to.
            bad: Mapping of port number to the patterns to test on that port.

        Returns:
            The first non-``None`` result produced by any port, or ``None``
            when no port matched (or ``bad`` is empty).
        """
        tasks = {
            asyncio.ensure_future(self._match(ip, port, certs))
            for port, certs in bad.items()
        }
        try:
            while tasks:
                # Rebinding ``tasks`` to the pending set drives the loop.
                finished, tasks = await asyncio.wait(
                    tasks, return_when=asyncio.FIRST_COMPLETED
                )
                for fin in finished:
                    result = fin.result()
                    if result is not None:
                        return result
            return None
        finally:
            # On a match, an error, or cancellation of scan() itself, stop the
            # remaining per-port tasks and wait for them to wind down.
            # Cancelling an already finished task is a no-op.
            for task in tasks:
                task.cancel()
            if tasks:
                await asyncio.wait(tasks)
|