Refactoring to isolate workers better (trying `multiprocessing`)

Each worker now loads its own extension modules, which will make it possible to start and stop workers independently. `multiprocessing` would be nice, but incoming connections still need to be handed over to the worker process somehow; pickling the connection doesn't seem to work.
Parent: 3fc06fae08
Commit: 28d6cb6148
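
Pickling aside, one possible direction (not part of this commit; the names below are illustrative) is to duplicate the accepted socket's file descriptor into the worker process with `multiprocessing.reduction.send_handle()`/`recv_handle()` and rebuild a socket around it in the child. The TLS handshake would then have to happen in the worker, since the OpenSSL session object itself cannot be pickled.

```python
# Minimal sketch, assuming plain TCP for clarity; a real server would still
# need to perform the TLS handshake in the worker after the descriptor arrives.
import socket
from multiprocessing import Pipe, Process
from multiprocessing.reduction import recv_handle, send_handle

def worker(conn):
    fd = recv_handle(conn)              # receive the duplicated descriptor
    client = socket.socket(fileno=fd)   # rebuild a socket object around it
    client.sendall(b'20 text/gemini\r\nHandled in a worker process.\r\n')
    client.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    proc = Process(target=worker, args=(child_conn,))
    proc.start()

    listener = socket.create_server(('localhost', 1965))
    client, _addr = listener.accept()
    send_handle(parent_conn, client.fileno(), proc.pid)  # hand the fd to the worker
    client.close()                                       # parent's copy is no longer needed
    listener.close()
    proc.join()
```
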
@@ -485,9 +485,9 @@ from .gemini import Server, Cache
from .markdown import to_gemtext as markdown_to_gemtext


__version__ = '0.4.1'
__version__ = '0.5.0'

__all__ = [
    'Config', 'Capsule', 'Cache',
    'Config', 'Cache',
    'get_mime_type', 'markdown_to_gemtext'
]
@@ -508,7 +508,6 @@ class Config:
    """

    def __init__(self, config_path):
        self.debug_memtrace = False
        self.ini = configparser.ConfigParser()
        if os.path.exists(config_path):
            self.ini.read(config_path)
@@ -556,6 +555,9 @@ class Config:
    def max_upload_size(self):
        return self.ini.getint('titan', 'upload_limit', fallback=10 * 1024 * 1024)

    def require_upload_identity(self):
        return self.ini.getboolean('titan', 'require_identity', fallback=True)

    def section(self, name):
        """
        Find a section in the config INI file.
@@ -607,137 +609,16 @@ class Capsule:
        cfg (Config): Server configuration.
    """

    _capsule = None

    def __init__(self, cfg):
        Capsule._capsule = self
        self.cfg = cfg
        self.sv = Server(
            cfg.hostnames(),
            cfg.certs_dir() / 'cert.pem',
            cfg.certs_dir() / 'key.pem',
            address=cfg.address(),
            port=cfg.port(),
            session_id=f'GmCapsule:{cfg.port()}'.encode('utf-8'),
            max_upload_size=cfg.max_upload_size(),
            num_threads=cfg.num_threads()
        )
        # Modules define the entrypoints.
        self.load_modules()

    @staticmethod
    def config():
        """
        Returns:
            Config: Server configuration.
        """
        return Capsule._capsule.cfg

    def add(self, path, entrypoint, hostname=None, protocol='gemini'):
        """
        Register a URL entry point.

        Extension modules must call this to become visible in the server's
        path hierarchy. Entry points are looked up in the order the modules
        were loaded, with earlier modules getting precedence.

        Args:
            path (str): URL path. Must begin with a slash (``/``). Asterisk
                wildcards (``*``) are supported. Note that if the path
                ``/*`` is registered, it will match any requested URL.
            entrypoint (callable): Function or other callable object that
                gets called when a request is processed with a matching
                URL path. A :class:`~gmcapsule.gemini.Request` is passed in as the
                only argument.
            hostname (str): Hostname for the entry point. If omitted,
                the entry point applies to all configured hostnames.
            protocol (str): Protocol for the entry point.
        """
        if hostname:
            self.sv.add_entrypoint(protocol, hostname, path, entrypoint)
        else:
            for hostname in self.cfg.hostnames():
                if not hostname:
                    raise Exception(f'invalid hostname: "{hostname}"')
                self.sv.add_entrypoint(protocol, hostname, path, entrypoint)

    def add_cache(self, cache):
        """
        Install a cache.

        All installed caches will attempt to save and load content until one
        succeeds. The caches installed first get precedence.

        Args:
            cache (Cache): Cache instance.
        """
        self.sv.add_cache(cache)

    def load_modules(self):
        # The configuration can override default priorities.
        mod_priority = {}
        if 'priority' in self.cfg.ini:
            for name, priority in self.cfg.section('priority').items():
                mod_priority[name] = int(priority)

        # We will load all recognized modules.
        name_pattern = re.compile(r'([0-9][0-9])_(.*)\.py')
        dirs = []
        for user_dir in self.cfg.mod_dirs():
            if user_dir not in dirs:
                dirs.append(user_dir)
        dirs += [Path(__file__).parent.resolve() / 'modules']
        mods = []
        for mdir in dirs:
            for mod_file in sorted(os.listdir(mdir)):
                m = name_pattern.match(mod_file)
                if m:
                    path = (mdir / mod_file).resolve()
                    name = m.group(2)
                    loader = importlib.machinery.SourceFileLoader(name, str(path))
                    spec = importlib.util.spec_from_loader(name, loader)
                    mod = importlib.util.module_from_spec(spec)
                    loader.exec_module(mod)
                    if name in mod_priority:
                        priority = mod_priority[name]
                    else:
                        priority = int(m.group(1))
                    mods.append((priority, name, mod))

        # Initialize in priority order.
        for _, _, mod in sorted(mods):
            print(f'Init:', mod.__doc__)
            mod.init(self)

    def shutdown_event(self):
        """
        Returns:
            threading.Event: Event that is set when the server is
                shutting down. Background workers must wait on this and stop
                when the event is set.
        """
        return self.sv.shutdown_event

    def call_entrypoint(self, request):
        """
        Calls the registered entry point for a request.

        Args:
            request (Request): Request object.

        Returns:
            Tuple with (response, cache). The response can be binary data, text,
            tuple with status and meta string, or tuple with status, meta, and body.
            The cache is None if the data was not read from a cache.
        """
        return self.sv.call_entrypoint(request)
        self.sv = Server(cfg)

    def run(self):
        """
        Start worker threads and begin accepting incoming connections. The
        server will run until stopped with a KeyboardInterrupt (^C).
        """
        self.sv.run(memtrace=self.cfg.debug_memtrace)
        self.sv.run()


def get_mime_type(path):
@@ -795,8 +676,6 @@ def run_server():
    args = argp.parse_args()

    cfg = Config(args.config_file)
    cfg.debug_memtrace = args.trace_malloc

    try:
        capsule = Capsule(cfg)
        capsule.run()

@@ -2,14 +2,15 @@
# License: BSD-2-Clause

import fnmatch
import gc
import hashlib
import queue
import importlib
import os.path
import select
import socket
import threading
import multiprocessing as mp
import re
import time
from pathlib import Path
from urllib.parse import urlparse

import OpenSSL.crypto
@@ -144,56 +145,6 @@ def report_error(stream, code, msg):
    safe_close(stream)


memtrace_lock = threading.Lock()


def display_memtop(snapshot, prev_snapshot, key_type='lineno', limit=1000):
    import tracemalloc
    import linecache
    filters = (
        tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
        tracemalloc.Filter(False, "<unknown>"),
        tracemalloc.Filter(False, "*/linecache.py"),
        tracemalloc.Filter(False, "*/tracemalloc.py")
    )
    snapshot = snapshot.filter_traces(filters)
    if prev_snapshot:
        prev_snapshot = prev_snapshot.filter_traces(filters)
        top_stats = snapshot.compare_to(prev_snapshot, key_type)
        top_type = 'delta'
        limit = 200
    else:
        top_stats = snapshot.statistics('traceback') #key_type)
        top_type = 'malloc'

    with memtrace_lock:
        print("\n\nTop %s %s" % (limit, top_type))
        for index, stat in enumerate(top_stats[:limit], 1):
            if prev_snapshot:
                frame = stat.traceback[0]
                if stat.size_diff <= 0:
                    continue
                print("#%s: \x1b[1m%.1f\x1b[0m KiB (%+.1f KiB) count=%d (%+d)"
                      % (index,
                         stat.size / 1024, stat.size_diff / 1024, stat.count, stat.count_diff))
            else:
                print("#%s: \x1b[1m%.1f\x1b[0m KiB count=%d"
                      % (index, stat.size / 1024, stat.count))
            for frame in stat.traceback:
                line = linecache.getline(frame.filename, frame.lineno).strip()
                if 'python3.' in frame.filename: continue
                if line:
                    print('\x1b[0;31m %35s:%-5s ' % (frame.filename[-35:], str(frame.lineno) + ':'))
                    print('\x1b[0;36m %s\x1b[0m' % line)

        other = top_stats[limit:]
        if other:
            size = sum(stat.size for stat in other)
            print("%s other: %.1f KiB" % (len(other), size / 1024))
        total = sum(stat.size for stat in top_stats)
        print("Total size: %.1f KiB\n\n" % (total / 1024))


class Identity:
    """
    Client certificate.
@@ -339,39 +290,204 @@ class Cache:
        return None, None


class Worker(threading.Thread):
    """Thread that processes incoming requests from clients."""
class WorkerContext:
    def __init__(self, cfg, shutdown_event):
        self.cfg = cfg
        self.shutdown = shutdown_event
        self.hostnames = cfg.hostnames()
        self.entrypoints = {'gemini': {}, 'titan': {}}
        for proto in ['gemini', 'titan']:
            self.entrypoints[proto] = {}
            for hostname in self.hostnames:
                self.entrypoints[proto][hostname] = []
        self.caches = []
        self.is_quiet = False

    def __init__(self, id, server):
        super().__init__()
    def set_quiet(self, is_quiet):
        self.is_quiet = is_quiet

    def config(self):
        return self.cfg

    def print(self, *args):
        if not self.is_quiet:
            print(*args)

    def add_entrypoint(self, protocol, hostname, path_pattern, entrypoint):
        self.entrypoints[protocol][hostname].append((path_pattern, entrypoint))

    def __setitem__(self, key, value):
        for hostname in self.hostnames:
            self.add_entrypoint('gemini', hostname, key, value)

    def add_cache(self, cache):
        """
        Install a cache.

        All installed caches will attempt to save and load content until one
        succeeds. The caches installed first get precedence.

        Args:
            cache (Cache): Cache instance.
        """
        self.caches.append(cache)

    def add(self, path, entrypoint, hostname=None, protocol='gemini'):
        """
        Register a URL entry point.

        Extension modules must call this to become visible in the server's
        path hierarchy. Entry points are looked up in the order the modules
        were loaded, with earlier modules getting precedence.

        Args:
            path (str): URL path. Must begin with a slash (``/``). Asterisk
                wildcards (``*``) are supported. Note that if the path
                ``/*`` is registered, it will match any requested URL.
            entrypoint (callable): Function or other callable object that
                gets called when a request is processed with a matching
                URL path. A :class:`~gmcapsule.gemini.Request` is passed in as the
                only argument.
            hostname (str): Hostname for the entry point. If omitted,
                the entry point applies to all configured hostnames.
            protocol (str): Protocol for the entry point.
        """
        if hostname:
            self.add_entrypoint(protocol, hostname, path, entrypoint)
        else:
            for hostname in self.cfg.hostnames():
                if not hostname:
                    raise Exception(f'invalid hostname: "{hostname}"')
                self.add_entrypoint(protocol, hostname, path, entrypoint)

    def load_modules(self):
        # The configuration can override default priorities.
        mod_priority = {}
        if 'priority' in self.cfg.ini:
            for name, priority in self.cfg.section('priority').items():
                mod_priority[name] = int(priority)

        # We will load all recognized modules.
        name_pattern = re.compile(r'([0-9][0-9])_(.*)\.py')
        dirs = []
        for user_dir in self.cfg.mod_dirs():
            if user_dir not in dirs:
                dirs.append(user_dir)
        dirs += [Path(__file__).parent.resolve() / 'modules']
        mods = []
        for mdir in dirs:
            for mod_file in sorted(os.listdir(mdir)):
                m = name_pattern.match(mod_file)
                if m:
                    path = (mdir / mod_file).resolve()
                    name = m.group(2)
                    loader = importlib.machinery.SourceFileLoader(name, str(path))
                    spec = importlib.util.spec_from_loader(name, loader)
                    mod = importlib.util.module_from_spec(spec)
                    loader.exec_module(mod)
                    if name in mod_priority:
                        priority = mod_priority[name]
                    else:
                        priority = int(m.group(1))
                    mods.append((priority, name, mod))

        # Initialize in priority order.
        for _, name, mod in sorted(mods):
            self.print(f'Init:', mod.__doc__ if mod.__doc__ else name)
            mod.init(self)

    def shutdown_event(self):
        """
        Returns:
            threading.Event: Event that is set when the server is
                shutting down. Background workers must wait on this and stop
                when the event is set.
        """
        return self.shutdown

    def call_entrypoint(self, request):
        """
        Calls the registered entry point for a request.

        Args:
            request (Request): Request object.

        Returns:
            Tuple with (response, cache). The response can be binary data, text,
            tuple with status and meta string, or tuple with status, meta, and body.
            The cache is None if the data was not read from a cache.
        """
        entrypoint = self.find_entrypoint(request.scheme, request.hostname, request.path)

        caches = self.caches if (request.scheme == 'gemini' and
                                 not request.identity and
                                 not request.query) else []
        from_cache = None

        if entrypoint:
            # Check the caches first.
            for cache in caches:
                media, content = cache.try_load(request.hostname + request.path)
                if not media is None:
                    response = 20, media, content
                    if hasattr(content, '__len__'):
                        self.print('%d bytes from cache, %s' % (len(content), media))
                    else:
                        self.print('stream from cache,', media)
                    return response, cache

            # Process the request normally if there is nothing cached.
            if not from_cache:
                try:
                    return entrypoint(request), None
                except Exception as x:
                    import traceback
                    traceback.print_exception(x)
                    raise GeminiError(40, 'Temporary failure')

        raise GeminiError(50, 'Permanent failure')


class Worker(mp.Process):
    """Process that handles incoming requests from clients."""

    def __init__(self, id, cfg, work_queue, shutdown_event):
        super().__init__(target=Worker._run, args=(self,))
        self.id = id
        self.server = server
        self.jobs = server.work_queue
        self.cfg = cfg
        self.port = cfg.port()
        self.context = WorkerContext(self.cfg, shutdown_event)
        self.context.set_quiet(id > 0)
        self.jobs = work_queue

    def run(self):
        while True:
            stream, from_addr = self.jobs.get()
            if stream is None:
                break

            try:
                self.process_request(stream, from_addr)
            except OpenSSL.SSL.SysCallError as error:
                self.log(f'OpenSSL error: ' + str(error))
            except AbortedIOError as error:
                self.log(f'Send aborted: ' + str(error))
            except Exception as error:
                self.log(f'Problem: ' + str(error))
                # Some unexpected problem...
                #import traceback
                #traceback.print_exc()
                # try:
                #     report_error(stream, 42, str(error))
                # except:
                #     pass

            safe_close(stream)
            stream, from_addr = None, None
    def _run(self):
        try:
            # Extensions are initialized in the worker process.
            self.context.load_modules()
            self.context.set_quiet(False)
            while True:
                stream, from_addr = self.jobs.get()
                if stream is None:
                    break
                try:
                    self.process_request(stream, from_addr)
                except OpenSSL.SSL.SysCallError as error:
                    self.log(f'OpenSSL error: ' + str(error))
                except AbortedIOError as error:
                    self.log(f'Send aborted: ' + str(error))
                except Exception as error:
                    self.log(f'Problem: ' + str(error))
                    # Some unexpected problem...
                    #import traceback
                    #traceback.print_exc()
                    # try:
                    #     report_error(stream, 42, str(error))
                    # except:
                    #     pass
                safe_close(stream)
                stream, from_addr = None, None
        except KeyboardInterrupt:
            pass

    def log(self, *args):
        print(time.strftime('%Y-%m-%d %H:%M:%S'), f'[{self.id}]', '--', *args)

@@ -418,7 +534,7 @@ class Worker(threading.Thread):
        identity = Identity(cl_cert) if cl_cert else None

        if request.startswith('titan:'):
            if identity is None and self.server.require_upload_identity:
            if identity is None and self.cfg.require_upload_identity():
                report_error(stream, 60, "Client certificate required for upload")
                return

@@ -433,7 +549,8 @@ class Worker(threading.Thread):
                elif p.startswith('mime='):
                    req_mime = p[5:]
            self.log(f'Receiving Titan content: {expected_size}')
            if expected_size > self.server.max_upload_size and self.server.max_upload_size > 0:
            max_upload_size = self.cfg.max_upload_size()
            if expected_size > max_upload_size and max_upload_size > 0:
                report_error(stream, 59, "Maximum content length exceeded")
                return
            while len(data) < expected_size:
@@ -458,7 +575,7 @@ class Worker(threading.Thread):
            path = '/'
        hostname = url.hostname

        if url.port != None and url.port != self.server.port:
        if url.port != None and url.port != self.port:
            report_error(stream, 59, "Invalid port number")
            return
        if not stream.get_servername():
@@ -481,7 +598,7 @@ class Worker(threading.Thread):
            content_mime=req_mime,
            content=data if len(data) else None
        )
        response, from_cache = self.server.call_entrypoint(request)
        response, from_cache = self.context.call_entrypoint(request)

        # Determine status code, meta line, and body content.
        if type(response) == tuple:
@@ -521,24 +638,25 @@ class Worker(threading.Thread):


class Server:
    def __init__(self, hostname_or_hostnames, cert_path, key_path,
                 address='localhost', port=1965,
                 cache=None, session_id=None, max_upload_size=0, num_threads=1,
                 require_upload_identity=True):
    def __init__(self, cfg):
        mp.set_start_method('spawn')

        hostname_or_hostnames = cfg.hostnames()
        cert_path = cfg.certs_dir() / 'cert.pem'
        key_path = cfg.certs_dir() / 'key.pem'
        address = cfg.address()
        port = cfg.port()
        session_id = f'GmCapsule:{cfg.port()}'.encode('utf-8')
        num_threads = cfg.num_threads()

        self.hostnames = [hostname_or_hostnames] \
            if type(hostname_or_hostnames) == str else hostname_or_hostnames
        self.address = address
        self.port = port
        self.entrypoints = {'gemini': {}, 'titan': {}}
        for proto in ['gemini', 'titan']:
            self.entrypoints[proto] = {}
            for hostname in self.hostnames:
                self.entrypoints[proto][hostname] = []
        self.caches = []
        if cache:
            self.caches.append(cache)
        self.max_upload_size = max_upload_size
        self.require_upload_identity = require_upload_identity
        #if cache:
        #    self.caches.append(cache)
        #self.max_upload_size = max_upload_size
        #self.require_upload_identity = require_upload_identity

        if not os.path.exists(cert_path):
            raise Exception("certificate file not found: " + str(cert_path))
@@ -555,32 +673,17 @@ class Server:
        self.context.set_session_id(session_id)

        # Spawn the worker threads.
        self.shutdown_event = threading.Event()
        self.shutdown_event = mp.Event()
        self.workers = []
        self.work_queue = queue.Queue()
        self.work_queue = mp.Queue()
        for worker_id in range(max(num_threads, 1)):
            worker = Worker(worker_id, self)
            worker = Worker(worker_id, cfg, self.work_queue, self.shutdown_event)
            self.workers.append(worker)

        self.sock = None
        self.sv_conn = None

    def add_cache(self, cache):
        self.caches.append(cache)

    def add_entrypoint(self, protocol, hostname, path_pattern, entrypoint):
        self.entrypoints[protocol][hostname].append((path_pattern, entrypoint))

    def __setitem__(self, key, value):
        for hostname in self.hostnames:
            self.add_entrypoint('gemini', hostname, key, value)

    def run(self, memtrace=False):
        self.memtrace = memtrace
        if self.memtrace:
            import tracemalloc
            tracemalloc.start(10)

    def run(self):
        attempts = 60
        print(f'Opening port {self.port}...')
        while True:
@@ -599,9 +702,9 @@ class Server:
            print('...')
        print(f'Server started on port {self.port}')

        MULTITHREAD = True
        MULTIPROCESS = True

        if MULTITHREAD:
        if MULTIPROCESS:
            for worker in self.workers:
                worker.start()
            print(len(self.workers), 'worker(s) started')
@@ -615,7 +718,7 @@ class Server:
                stream._socket.settimeout(10)
                self.work_queue.put((stream, from_addr))

                if not MULTITHREAD:
                if not MULTIPROCESS:
                    self.work_queue.put((None, None)) # single iteration only
                    self.workers[0].run()

@@ -629,22 +732,6 @@ class Server:
                #traceback.print_exc()
                print(ex)

            if self.memtrace:
                old_snapshot = snapshot
                gc.collect()
                snapshot = tracemalloc.take_snapshot()
                filters = (
                    tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
                    tracemalloc.Filter(False, "<unknown>"),
                    tracemalloc.Filter(False, "*/linecache.py"),
                    tracemalloc.Filter(False, "*/tracemalloc.py"),
                    tracemalloc.Filter(False, "*/mimetypes.py"),
                    tracemalloc.Filter(False, "*/fnmatch.py")
                )
                snapshot = snapshot.filter_traces(filters)
                top_stats = snapshot.statistics('lineno')
                display_memtop(snapshot, None) #old_snapshot)

        # Close the server socket.
        self.sv_conn = None
        self.sock.close()
@@ -652,7 +739,7 @@ class Server:

        # Stop all workers.
        self.shutdown_event.set()
        if MULTITHREAD:
        if MULTIPROCESS:
            for i in range(len(self.workers)):
                self.work_queue.put((None, None))
            for worker in self.workers:
@@ -678,34 +765,3 @@ class Server:
                return None
        return None

    def call_entrypoint(self, request):
        entrypoint = self.find_entrypoint(request.scheme, request.hostname, request.path)

        caches = self.caches if (request.scheme == 'gemini' and
                                 not request.identity and
                                 not request.query) else []
        from_cache = None

        if entrypoint:
            # Check the caches first.
            for cache in caches:
                media, content = cache.try_load(request.hostname + request.path)
                if not media is None:
                    response = 20, media, content
                    if hasattr(content, '__len__'):
                        print('%d bytes from cache, %s' % (len(content), media))
                    else:
                        print('stream from cache,', media)
                    return response, cache

            # Process the request normally if there is nothing cached.
            if not from_cache:
                try:
                    return entrypoint(request), None
                except Exception as x:
                    import traceback
                    traceback.print_exception(x)
                    raise GeminiError(40, 'Temporary failure')

        raise GeminiError(50, 'Permanent failure')

@@ -7,8 +7,8 @@ import re


class PathRewriteHandler:
    def __init__(self, capsule, rewritten_path):
        self.capsule = capsule
    def __init__(self, context, rewritten_path):
        self.context = context
        self.rewritten_path = rewritten_path

    def __call__(self, req):
@@ -23,8 +23,8 @@ class PathRewriteHandler:
        if req.num_rewrites == 100:
            return 40, "Stuck in rewrite loop: " + req.url()

        print("[rewrite]", old_path, "->", req.path)
        return self.capsule.call_entrypoint(req)[0]
        self.context.print("[rewrite]", old_path, "->", req.path)
        return self.context.call_entrypoint(req)[0]


class Responder:
@@ -38,8 +38,8 @@ class Responder:


class Rewriter:
    def __init__(self, capsule, protocol, host, src_path, dst_path, status):
        self.capsule = capsule
    def __init__(self, context, protocol, host, src_path, dst_path, status):
        self.context = context
        self.protocol = protocol
        self.host = host
        self.src_path = src_path
@@ -52,7 +52,7 @@ class Rewriter:
        if self.dst_path:
            new_path = self.src_path.sub(self.dst_path, path)
            if new_path != path:
                return PathRewriteHandler(self.capsule, new_path)
                return PathRewriteHandler(self.context, new_path)

        elif self.status:
            m = self.src_path.match(path)
@@ -63,14 +63,14 @@ class Rewriter:
                if cap:
                    status = status.replace(f'\\{i}', cap)
            code, meta = status.split()
            print("[rewrite]", code, meta)
            self.context.print("[rewrite]", code, meta)
            return Responder(int(code), meta)

        return None


def init(capsule):
    cfg = capsule.config()
def init(context):
    cfg = context.config()
    for section in cfg.prefixed_sections('rewrite.').values():
        protocol = section.get('protocol', None)
        host = section.get('host', cfg.hostnames()[0])
@@ -78,7 +78,7 @@ def init(capsule):
        dst_path = section.get('repl', None)
        status = section.get('status', None)
        for proto in [protocol] if protocol else ['gemini', 'titan']:
            capsule.add(Rewriter(capsule, proto, host, src_path, dst_path, status),
            context.add(Rewriter(context, proto, host, src_path, dst_path, status),
                        None, # `Rewriter` will return a suitable handler callback.
                        host,
                        proto)

@@ -14,7 +14,7 @@ import time
import urllib
from pathlib import Path

from gmcapsule import Cache, Capsule, markdown_to_gemtext
from gmcapsule import Cache, markdown_to_gemtext

pjoin = os.path.join


@@ -69,6 +69,7 @@ class GitViewCache(Cache):
        return True


CONFIG = None
GIT = '/usr/bin/git'
HOSTNAME = 'localhost'
NUM_COMMITS_FRONT = 8
@@ -95,7 +96,7 @@ def preformat(raw, alt_text=''):

def repositories():
    roots = []
    for name, cfg in Capsule.config().prefixed_sections('gitview.').items():
    for name, cfg in CONFIG.prefixed_sections('gitview.').items():
        url = cfg['url_root']
        if not url.startswith('/'): url = '/' + url
        if not url.endswith('/'): url += '/'
@@ -342,7 +343,7 @@ def handle_request(gemini_request):
        email_subject = urllib.parse.quote(f"{req.cfg['title']} commit {hash}")
        email_body = urllib.parse.quote("=> gemini://%s:%d%scommits/%s" %
                                        (HOSTNAME,
                                         Capsule.config().port(),
                                         CONFIG.port(),
                                         req.url_root + req.ubranch,
                                         full_hash)
                                        )
@@ -465,8 +466,12 @@ def main_page(req):
    return page


def init(capsule):
    cfg = capsule.config()
def init(context):
    cfg = context.config()

    global CONFIG
    CONFIG = cfg

    try:
        mod_cfg = cfg.section('gitview')
@@ -479,13 +484,13 @@ def init(capsule):
        HOSTNAME = cfg.hostnames()[0]

        if 'cache_path' in mod_cfg:
            capsule.add_cache(GitViewCache(HOSTNAME, mod_cfg['cache_path']))
            context.add_cache(GitViewCache(HOSTNAME, mod_cfg['cache_path']))

        for name, url_root, _ in repositories():
            print(f' Adding repository "{name}"...')
            capsule.add('/', main_page, hostname=HOSTNAME)
            capsule.add(url_root[:-1], redirect_to_default, hostname=HOSTNAME)
            capsule.add(url_root + '*', handle_request, hostname=HOSTNAME)
            context.print(f' Adding repository "{name}"...')
            context.add('/', main_page, hostname=HOSTNAME)
            context.add(url_root[:-1], redirect_to_default, hostname=HOSTNAME)
            context.add(url_root + '*', handle_request, hostname=HOSTNAME)

    except KeyError:
        # GitView not configured.

@@ -10,11 +10,11 @@ import subprocess
import urllib.parse

import gmcapsule
from gmcapsule import Capsule


class CgiContext:
    def __init__(self, url_path, args, work_dir=None):
    def __init__(self, port, url_path, args, work_dir=None):
        self.port = port
        self.args = args
        self.base_path = url_path
        if self.base_path.endswith('/*'):
@@ -35,7 +35,7 @@ class CgiContext:
        env_vars['SERVER_SOFTWARE'] = 'GmCapsule/' + gmcapsule.__version__
        env_vars['SERVER_PROTOCOL'] = req.scheme.upper()
        env_vars['SERVER_NAME'] = req.hostname
        env_vars['SERVER_PORT'] = str(Capsule.config().port())
        env_vars['SERVER_PORT'] = str(self.port)
        env_vars[req.scheme.upper() + '_URL'] = f"{req.scheme}://{req.hostname}{req.path}" + (
            '?' + req.query if req.query != None else '')
        env_vars[req.scheme.upper() + '_URL_PATH'] = req.path
@@ -89,9 +89,10 @@ class CgiContext:


class CgiTreeMapper:
    def __init__(self, protocol, host, root_dir):
    def __init__(self, protocol, host, port, root_dir):
        self.protocol = protocol
        self.host = host
        self.port = port
        self.root_dir = pathlib.Path(root_dir)

    def __call__(self, url_path):
@@ -103,18 +104,18 @@ class CgiTreeMapper:
        if os.path.isdir(fn):
            return None
        if os.access(fn, os.X_OK):
            return CgiContext(url_path, [fn], work_dir=os.path.dirname(fn))
            return CgiContext(self.port, url_path, [fn], work_dir=os.path.dirname(fn))
        return None


# # NOTE: This require restarting the server when binaries are added/removed.
# def add_cgibin_entrypoints_recursively(capsule, host, base, cur_dir=None):
# def add_cgibin_entrypoints_recursively(context, host, base, cur_dir=None):
#     if cur_dir is None:
#         cur_dir = base
#     for name in os.listdir(cur_dir):
#         fn = cur_dir / name
#         if os.path.isdir(fn):
#             add_cgibin_entrypoints_recursively(capsule, host, base, fn)
#             add_cgibin_entrypoints_recursively(context, host, base, fn)
#         elif os.access(fn, os.X_OK):
#             protocol = 'gemini'
#             if name.endswith(',titan'):
@@ -125,31 +126,31 @@ class CgiTreeMapper:
#             if protocol == 'titan':
#                 path = path[:-6]
#             print(f'  {protocol}://{host}{path} ->', args)
#             capsule.add(path, CgiContext(path, args, work_dir=cur_dir), host, protocol)
#             context.add(path, CgiContext(path, args, work_dir=cur_dir), host, protocol)


def init(capsule):
    cfg = Capsule.config()
def init(context):
    cfg = context.config()
    default_host = cfg.hostnames()[0]

    # Custom entrypoints for specific URLs.
    for section in Capsule.config().prefixed_sections('cgi.').values():
    for section in cfg.prefixed_sections('cgi.').values():
        protocol = section.get('protocol', fallback='gemini')
        host = section.get('host', fallback=default_host)
        work_dir = section.get('cwd', fallback=None)
        args = shlex.split(section.get('command'))
        for path in shlex.split(section.get('path', fallback='/*')):
            print(f'  {protocol}://{host}{path} ->', args)
            capsule.add(path, CgiContext(path, args, work_dir),
            context.print(f'  {protocol}://{host}{path} ->', args)
            context.add(path, CgiContext(cfg.port(), path, args, work_dir),
                        host, protocol)

    # Automatic entrypoints for all executables.
    bin_root = Capsule.config().ini.get('cgi', 'bin_root', fallback=None)
    bin_root = cfg.ini.get('cgi', 'bin_root', fallback=None)
    if bin_root != None:
        bin_root = pathlib.Path(bin_root).resolve()
        for host in Capsule.config().hostnames():
        for host in cfg.hostnames():
            host_bin_root = bin_root / host
            for protocol in ['gemini', 'titan']:
                capsule.add(
                    CgiTreeMapper(protocol, host, host_bin_root), None,
                context.add(
                    CgiTreeMapper(protocol, host, cfg.port(), host_bin_root), None,
                    host, protocol)

@@ -7,18 +7,18 @@ import fnmatch
import os.path
import string

from gmcapsule import Capsule, get_mime_type
from gmcapsule import get_mime_type
from pathlib import Path

META = '.meta'
CONFIG = None


def check_meta_rules(path, hostname):
    cfg = Capsule.config()
    root = cfg.root_dir() / hostname
    root = CONFIG.root_dir() / hostname
    dir = Path(path).parent
    while True:
        if not str(dir).startswith(str(cfg.root_dir())):
        if not str(dir).startswith(str(CONFIG.root_dir())):
            break
        if (dir / META).exists():
            for rule in open(dir / META, 'rt').readlines():
@@ -41,7 +41,7 @@ def serve_file(req):
    if req.scheme != 'gemini':
        return 59, "Only Gemini requests allowed"

    cfg = Capsule.config()
    cfg = CONFIG
    if req.path == '':
        return 31, '/'

@@ -71,8 +71,10 @@ def serve_file(req):
    return status, meta, (open(path, 'rb') if status == 20 else None)


def init(capsule):
    cfg = capsule.config()
def init(context):
    cfg = context.config()
    global CONFIG
    CONFIG = cfg
    if 'static' in cfg.ini and 'root' in cfg.section('static'):
        print(' Content directory:', cfg.root_dir() / '{hostname}')
        capsule.add('/*', serve_file)
        context.print(' Content directory:', cfg.root_dir() / '{hostname}')
        context.add('/*', serve_file)

gmcapsuled
@@ -5,25 +5,21 @@
# License: BSD-2-Clause

import argparse

import gmcapsule
from pathlib import Path

VERSION = gmcapsule.__version__

print(f"GmCapsule v{VERSION}")
if __name__ == '__main__':
    print(f"GmCapsule v{VERSION}")

argp = argparse.ArgumentParser(description='GmCapsule is an extensible server for Gemini and Titan.')
argp.add_argument('-c', '--config',
                  dest='config_file',
                  default=Path.home() / '.gmcapsulerc',
                  help='Configuration file to load at startup')
argp.add_argument('--trace-malloc',
                  action='store_true',
                  help='Enable memory allocation tracing (for debugging)')
args = argp.parse_args()
    argp = argparse.ArgumentParser(description='GmCapsule is an extensible server for Gemini and Titan.')
    argp.add_argument('-c', '--config',
                      dest='config_file',
                      default=Path.home() / '.gmcapsulerc',
                      help='Configuration file to load at startup')
    args = argp.parse_args()

cfg = gmcapsule.Config(args.config_file)
cfg.debug_memtrace = args.trace_malloc
capsule = gmcapsule.Capsule(cfg)
capsule.run()
    cfg = gmcapsule.Config(args.config_file)
    capsule = gmcapsule.Capsule(cfg)
    capsule.run()