migrating certificate management to netcache

This commit is contained in:
Lionel Dricot 2023-07-28 18:08:45 +02:00
parent eed83a7a95
commit dade6a3bda
2 changed files with 209 additions and 202 deletions

View File

@ -8,6 +8,7 @@ import codecs
import getpass
import socket
import ssl
import glob
from ssl import CertificateError
try:
import chardet
@ -22,6 +23,7 @@ cache_home = os.environ.get('XDG_CACHE_HOME') or\
#Debug:
_CACHE_PATH = "/home/ploum/dev/netcache/"
_DATA_DIR = "/home/ploum/dev/netcache/"
_CONFIG_DIR = "/home/ploum/dev/netcache/"
if not os.path.exists(_CACHE_PATH):
print("Creating cache directory {}".format(_CACHE_PATH))
os.makedirs(_CACHE_PATH)
@ -411,6 +413,150 @@ def _fetch_spartan(url,**kwargs):
cache = _fetch_spartan(redirect_url)
return cache
def _activate_client_cert(certfile, keyfile):
#TODO
#self.client_certs["active"] = (certfile, keyfile)
#self.active_cert_domains = []
#self.prompt = self.cert_prompt + "+" + os.path.basename(certfile).replace('.crt','') + "> " + "\001\x1b[0m\002"
pass
def _deactivate_client_cert():
#TODO
# if self.active_is_transient:
# for filename in self.client_certs["active"]:
# os.remove(filename)
# for domain in self.active_cert_domains:
# self.client_certs.pop(domain)
# self.client_certs["active"] = None
# self.active_cert_domains = []
# self.prompt = self.no_cert_prompt
# self.active_is_transient = False
pass
def _choose_client_cert():
"""
Interactively select a previously generated client certificate and
activate it.
"""
certdir = os.path.join(_CONFIG_DIR, "client_certs")
certs = glob.glob(os.path.join(certdir, "*.crt"))
if len(certs) == 0:
print("There are no previously generated certificates.")
return
certdir = {}
for n, cert in enumerate(certs):
certdir[str(n+1)] = (cert, os.path.splitext(cert)[0] + ".key")
print("{}. {}".format(n+1, os.path.splitext(os.path.basename(cert))[0]))
choice = input("> ").strip()
if choice in certdir:
certfile, keyfile = certdir[choice]
_activate_client_cert(certfile, keyfile)
else:
print("What?")
def _load_client_cert(self):
"""
Interactively load a TLS client certificate from the filesystem in PEM
format.
"""
print("Loading client certificate file, in PEM format (blank line to cancel)")
certfile = input("Certfile path: ").strip()
if not certfile:
print("Aborting.")
return
certfile = os.path.expanduser(certfile)
if not os.path.isfile(certfile):
print("Certificate file {} does not exist.".format(certfile))
return
print("Loading private key file, in PEM format (blank line to cancel)")
keyfile = input("Keyfile path: ").strip()
if not keyfile:
print("Aborting.")
return
keyfile = os.path.expanduser(keyfile)
if not os.path.isfile(keyfile):
print("Private key file {} does not exist.".format(keyfile))
return
_activate_client_cert(certfile, keyfile)
def _generate_client_cert(certdir, basename, transient=False):
"""
Use `openssl` binary to generate a client certificate (which may be
transient or persistent) and save the certificate and private key to the
specified directory with the specified basename.
"""
if not os.path.exists(certdir):
os.makedirs(certdir)
certfile = os.path.join(certdir, basename+".crt")
keyfile = os.path.join(certdir, basename+".key")
cmd = "openssl req -x509 -newkey rsa:2048 -days {} -nodes -keyout {} -out {}".format(1 if transient else 365, keyfile, certfile)
if transient:
cmd += " -subj '/CN={}'".format(basename)
os.system(cmd)
_activate_client_cert(certfile, keyfile)
def _generate_transient_cert_cert():
"""
Use `openssl` command to generate a new transient client certificate
with 24 hours of validity.
"""
certdir = os.path.join(_CONFIG_DIR, "transient_certs")
name = str(uuid.uuid4())
_generate_client_cert(certdir, name, transient=True)
#TODO
#self.active_is_transient = True
#self.transient_certs_created.append(name)
def _generate_persistent_client_cert():
"""
Interactively use `openssl` command to generate a new persistent client
certificate with one year of validity.
"""
certdir = os.path.join(_CONFIG_DIR, "client_certs")
print("What do you want to name this new certificate?")
print("Answering `mycert` will create `{0}/mycert.crt` and `{0}/mycert.key`".format(certdir))
name = input("> ")
if not name.strip():
print("Aborting.")
return
_generate_client_cert(certdir, name)
def _handle_cert_request(meta):
print("SERVER SAYS: ", meta)
# Present different messages for different 6x statuses, but
# handle them the same.
if status in ("64", "65"):
print("The server rejected your certificate because it is either expired or not yet valid.")
elif status == "63":
print("The server did not accept your certificate.")
print("You may need to e.g. coordinate with the admin to get your certificate fingerprint whitelisted.")
else:
print("The site {} is requesting a client certificate.".format(gi.host))
print("This will allow the site to recognise you across requests.")
# Give the user choices
print("What do you want to do?")
print("1. Give up.")
print("2. Generate a new transient certificate.")
print("3. Generate a new persistent certificate.")
print("4. Load a previously generated persistent.")
print("5. Load certificate from an external file.")
if self.sync_only:
choice = 1
else:
choice = input("> ").strip()
if choice == "2":
self._generate_transient_cert_cert()
elif choice == "3":
self._generate_persistent_client_cert()
elif choice == "4":
self._choose_client_cert()
elif choice == "5":
self._load_client_cert()
else:
print("Giving up.")
raise UserAbortException()
def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,**kwargs):
cache = None
url_parts = urllib.parse.urlparse(url)
@ -597,9 +743,9 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,**kwargs):
elif status.startswith("4") or status.startswith("5"):
raise RuntimeError(meta)
# Client cert
# elif status.startswith("6"):
# self._handle_cert_request(meta)
# return self._fetch_over_network(gi)
elif status.startswith("6"):
_handle_cert_request(meta)
_fetch_gemini(url)
# Invalid status
elif not status.startswith("2"):
raise RuntimeError("Server returned undefined status code %s!" % status)
@ -638,7 +784,9 @@ def fetch(url,**kwargs):
url = normalize_url(url)
path=None
print_error = "print_error" in kwargs.keys() and kwargs["print_error"]
if "://" in url:
if kwargs["offline"] and is_cache_valid(url):
path = get_cache_path(url)
elif "://" in url and not kwargs["offline"]:
try:
scheme = url.split("://")[0]
if scheme not in standard_ports:
@ -650,7 +798,7 @@ def fetch(url,**kwargs):
elif scheme == "finger":
path=_fetch_finger(url,**kwargs)
elif scheme == "gemini":
patch=_fetch_gemini(url,**kwargs)
path=_fetch_gemini(url,**kwargs)
else:
print("scheme %s not implemented yet")
except UserAbortException:
@ -692,7 +840,7 @@ def fetch(url,**kwargs):
print(traceback.format_exc())
return
else:
print("Not a supproted URL")
print("Not cached URL or not supported format (TODO)")
return path

View File

@ -606,115 +606,77 @@ class GeminiClient(cmd.Cmd):
# and not self.sync_only:
# print("Sorry, no support for {} links.".format(gi.scheme))
# return
url = gi.url
if not mode:
mode = gi.last_mode
# Obey permanent redirects
if gi.url in self.permanent_redirects:
new_gi = GeminiItem(self.permanent_redirects[gi.url], name=gi.name)
if url in self.permanent_redirects:
new_gi = GeminiItem(self.permanent_redirects[url], name=gi.name)
self._go_to_gi(new_gi,mode=mode)
return
# Use cache or mark as to_fetch if resource is not cached
# Why is this code useful ? It set the mimetype !
if self.offline_only:
if not netcache.is_cache_valid(gi.url):
self.get_list("to_fetch")
r = self.list_add_line("to_fetch",gi=gi,verbose=False)
if r:
print("%s not available, marked for syncing"%gi.url)
else:
print("%s already marked for syncing"%gi.url)
return
# check if local file exists.
if gi.local and not os.path.exists(gi.path):
print("Local file %s does not exist!" %gi.path)
return
elif not self.offline_only and not gi.local:
elif not gi.local:
params = {}
params["timeout"] = self.options["short_timeout"]
params["max_size"] = int(self.options["max_size_download"])*1000000
params["print_error"] = not self.sync_only
cachepath = netcache.fetch(gi.url,**params)
params["offline"] = self.offline_only
cachepath = netcache.fetch(url,**params)
# Use cache or mark as to_fetch if resource is not cached
if not cachepath:
self.get_list("to_fetch")
r = self.list_add_line("to_fetch",gi=gi,verbose=False)
if r:
print("%s not available, marked for syncing"%url)
else:
print("%s already marked for syncing"%url)
return
#Ok, we have a cached version
else:
# Pass file to handler, unless we were asked not to
display = handle and not self.sync_only
#TODO: take into account _RENDER_IMAGE
if display and self.options["download_images_first"] \
and not self.offline_only:
# Pass file to handler, unless we were asked not to
if netcache.is_cache_valid(gi.url) :
display = handle and not self.sync_only
#TODO: take into account _RENDER_IMAGE
if display and self.options["download_images_first"] \
and not self.offline_only:
# We download images first
#TODO: this should go into netcache
for image in gi.get_images(mode=mode):
if image and image.startswith("http"):
if not netcache.is_cache_valid(image):
width = term_width() - 1
toprint = "Downloading %s" %image
toprint = toprint[:width]
toprint += " "*(width-len(toprint))
print(toprint,end="\r")
img_gi = GeminiItem(image)
self._go_to_gi(img_gi, update_hist=False, check_cache=True, \
handle=False,limit_size=True)
if display and gi.display(mode=mode):
self.index = gi.get_links()
self.page_index = 0
self.index_index = -1
# Update state (external files are not added to history)
self.gi = gi
if update_hist and not self.sync_only:
self._update_history(gi)
elif display :
cmd_str = self._get_handler_cmd(ansirenderer.get_mime(gi.url))
try:
# get body (tmpfile) from gi !
run(cmd_str, parameter=gi.get_body(as_file=True), direct_output=True)
except FileNotFoundError:
print("Handler program %s not found!" % shlex.split(cmd_str)[0])
print("You can use the ! command to specify another handler program or pipeline.")
# We download images first
#TODO: this should go into netcache
for image in gi.get_images(mode=mode):
if image and image.startswith("http"):
if not netcache.is_cache_valid(image):
width = term_width() - 1
toprint = "Downloading %s" %image
toprint = toprint[:width]
toprint += " "*(width-len(toprint))
print(toprint,end="\r")
img_gi = GeminiItem(image)
self._go_to_gi(img_gi, update_hist=False, check_cache=True, \
handle=False,limit_size=True)
if display and gi.display(mode=mode):
self.index = gi.get_links()
self.page_index = 0
self.index_index = -1
# Update state (external files are not added to history)
self.gi = gi
if update_hist and not self.sync_only:
self._update_history(gi)
elif display :
cmd_str = self._get_handler_cmd(ansirenderer.get_mime(gi.url))
try:
# get body (tmpfile) from gi !
run(cmd_str, parameter=gi.get_body(as_file=True), direct_output=True)
except FileNotFoundError:
print("Handler program %s not found!" % shlex.split(cmd_str)[0])
print("You can use the ! command to specify another handler program or pipeline.")
def _handle_cert_request(self, meta):
print("SERVER SAYS: ", meta)
# Present different messages for different 6x statuses, but
# handle them the same.
if status in ("64", "65"):
print("The server rejected your certificate because it is either expired or not yet valid.")
elif status == "63":
print("The server did not accept your certificate.")
print("You may need to e.g. coordinate with the admin to get your certificate fingerprint whitelisted.")
else:
print("The site {} is requesting a client certificate.".format(gi.host))
print("This will allow the site to recognise you across requests.")
# Give the user choices
print("What do you want to do?")
print("1. Give up.")
print("2. Generate a new transient certificate.")
print("3. Generate a new persistent certificate.")
print("4. Load a previously generated persistent.")
print("5. Load certificate from an external file.")
if self.sync_only:
choice = 1
else:
choice = input("> ").strip()
if choice == "2":
self._generate_transient_cert_cert()
elif choice == "3":
self._generate_persistent_client_cert()
elif choice == "4":
self._choose_client_cert()
elif choice == "5":
self._load_client_cert()
else:
print("Giving up.")
raise UserAbortException()
def _validate_cert(self, address, host, cert):
"""
Validate a TLS certificate in TOFU mode.
@ -917,109 +879,6 @@ class GeminiClient(cmd.Cmd):
debug_text = "\x1b[0;32m[DEBUG] " + debug_text + "\x1b[0m"
print(debug_text)
def _load_client_cert(self):
"""
Interactively load a TLS client certificate from the filesystem in PEM
format.
"""
print("Loading client certificate file, in PEM format (blank line to cancel)")
certfile = input("Certfile path: ").strip()
if not certfile:
print("Aborting.")
return
certfile = os.path.expanduser(certfile)
if not os.path.isfile(certfile):
print("Certificate file {} does not exist.".format(certfile))
return
print("Loading private key file, in PEM format (blank line to cancel)")
keyfile = input("Keyfile path: ").strip()
if not keyfile:
print("Aborting.")
return
keyfile = os.path.expanduser(keyfile)
if not os.path.isfile(keyfile):
print("Private key file {} does not exist.".format(keyfile))
return
self._activate_client_cert(certfile, keyfile)
def _generate_transient_cert_cert(self):
"""
Use `openssl` command to generate a new transient client certificate
with 24 hours of validity.
"""
certdir = os.path.join(_CONFIG_DIR, "transient_certs")
name = str(uuid.uuid4())
self._generate_client_cert(certdir, name, transient=True)
self.active_is_transient = True
self.transient_certs_created.append(name)
def _generate_persistent_client_cert(self):
"""
Interactively use `openssl` command to generate a new persistent client
certificate with one year of validity.
"""
certdir = os.path.join(_CONFIG_DIR, "client_certs")
print("What do you want to name this new certificate?")
print("Answering `mycert` will create `{0}/mycert.crt` and `{0}/mycert.key`".format(certdir))
name = input("> ")
if not name.strip():
print("Aborting.")
return
self._generate_client_cert(certdir, name)
def _generate_client_cert(self, certdir, basename, transient=False):
"""
Use `openssl` binary to generate a client certificate (which may be
transient or persistent) and save the certificate and private key to the
specified directory with the specified basename.
"""
if not os.path.exists(certdir):
os.makedirs(certdir)
certfile = os.path.join(certdir, basename+".crt")
keyfile = os.path.join(certdir, basename+".key")
cmd = "openssl req -x509 -newkey rsa:2048 -days {} -nodes -keyout {} -out {}".format(1 if transient else 365, keyfile, certfile)
if transient:
cmd += " -subj '/CN={}'".format(basename)
os.system(cmd)
self._activate_client_cert(certfile, keyfile)
def _choose_client_cert(self):
"""
Interactively select a previously generated client certificate and
activate it.
"""
certdir = os.path.join(_CONFIG_DIR, "client_certs")
certs = glob.glob(os.path.join(certdir, "*.crt"))
if len(certs) == 0:
print("There are no previously generated certificates.")
return
certdir = {}
for n, cert in enumerate(certs):
certdir[str(n+1)] = (cert, os.path.splitext(cert)[0] + ".key")
print("{}. {}".format(n+1, os.path.splitext(os.path.basename(cert))[0]))
choice = input("> ").strip()
if choice in certdir:
certfile, keyfile = certdir[choice]
self._activate_client_cert(certfile, keyfile)
else:
print("What?")
def _activate_client_cert(self, certfile, keyfile):
self.client_certs["active"] = (certfile, keyfile)
self.active_cert_domains = []
self.prompt = self.cert_prompt + "+" + os.path.basename(certfile).replace('.crt','') + "> " + "\001\x1b[0m\002"
self._debug("Using ID {} / {}.".format(*self.client_certs["active"]))
def _deactivate_client_cert(self):
if self.active_is_transient:
for filename in self.client_certs["active"]:
os.remove(filename)
for domain in self.active_cert_domains:
self.client_certs.pop(domain)
self.client_certs["active"] = None
self.active_cert_domains = []
self.prompt = self.no_cert_prompt
self.active_is_transient = False
# Cmd implementation follows
@ -1155,13 +1014,13 @@ class GeminiClient(cmd.Cmd):
choice = input("> ").strip()
if choice == "1":
print("Deactivating client certificate.")
self._deactivate_client_cert()
netcache._deactivate_client_cert()
elif choice == "2":
self._generate_persistent_client_cert()
netcache._generate_persistent_client_cert()
elif choice == "3":
self._choose_client_cert()
netcache._choose_client_cert()
elif choice == "4":
self._load_client_cert()
netcache._load_client_cert()
else:
print("Aborting.")