Use multiple processes to handle requests

Normal Python threading is subject to the Global Interpreter Lock,
which means only one thread executes Python code at a time. I/O
operations can still run concurrently, but when request handling
requires long-running Python code, only one request is processed
at any given time, slowing everything down.

This commit adds a configurable number of RequestHandlers that
offload request processing to separate processes. Several changes
were needed to pass data between the main process and the workers,
mostly to ensure that everything is pickleable.

The main process still uses multiple threads to handle incoming
connections and do I/O with the clients.
Jaakko Keränen 2023-06-17 10:24:30 +03:00
parent d17e4e2f7b
commit e54db584a4
6 changed files with 414 additions and 259 deletions
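To orient readers of the diff below: the new architecture keeps client I/O in parser threads inside the main process and ships parsed requests to handler processes over multiprocessing queues. A minimal, hypothetical sketch of that pattern (illustrative names only, not code from this commit):

import multiprocessing as mp

def cpu_heavy(n):
    # Stand-in for request processing that would hold the GIL in a
    # thread, but runs truly in parallel in a separate process.
    return sum(i * i for i in range(n))

def handler(jobs, results):
    # Handler process: consume jobs until a sentinel arrives.
    while True:
        job_id, payload = jobs.get()
        if job_id is None:              # sentinel: shut down
            break
        results.put((job_id, cpu_heavy(payload)))  # payload must be pickleable

if __name__ == '__main__':
    jobs, results = mp.Queue(), mp.Queue()
    procs = [mp.Process(target=handler, args=(jobs, results)) for _ in range(2)]
    for p in procs:
        p.start()
    for job_id in range(4):
        jobs.put((job_id, 200_000))
    for _ in range(4):
        print(results.get())            # (job_id, result); completion order varies
    for _ in procs:
        jobs.put((None, None))          # one sentinel per handler process
    for p in procs:
        p.join()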

View File

@@ -51,7 +51,7 @@ The log can be viewed via journalctl (or syslog):
* Extension modules can register new protocols in addition to the built-in Gemini and Titan.
* SIGHUP causes the configuration file to be reloaded and workers to be restarted. The listening socket remains open, so the socket and TLS parameters cannot be changed.
* API change: Extension modules get initialized separately in each worker thread. Instead of a `Capsule`, the extension module `init` method is passed a `WorkerContext`. `Capsule` is no longer available as global state.
* API change: Extension modules get initialized separately in each worker thread. Instead of a `Capsule`, the extension module `init` method is passed a `Context`. `Capsule` is no longer available as global state.
### v0.4

View File

@@ -10,9 +10,9 @@ These classes and functions are available to extension modules by importing
Config
Capsule
Cache
Context
gemini.Request
gemini.Identity
WorkerContext
Classes
@@ -34,6 +34,11 @@ Cache
.. autoclass:: gmcapsule.Cache
:members:
Context
-------
.. autoclass:: gmcapsule.Context
:members:
Request
-------
.. autoclass:: gmcapsule.gemini.Request
@@ -44,11 +49,6 @@ Identity
.. autoclass:: gmcapsule.gemini.Identity
:members:
WorkerContext
-------------
.. autoclass:: gmcapsule.WorkerContext
:members:
Functions
*********

View File

@@ -6,8 +6,9 @@
;address = 0.0.0.0
;port = 1965
;certs = .certs
;modules =
;modules =
;threads = 5
;processes = 2
[static]
root = .
@@ -15,6 +16,9 @@ root = .
[titan]
;upload_limit = 10485760
[cgi]
bin_root = ./cgi-bin
[cgi.booster]
protocol = titan
host = localhost
@@ -25,11 +29,15 @@ command = /usr/bin/python3 ../booster/booster.py
path = /cgienv
command = printenv
[rewrite.test]
path = ^/altenv$
status = 30 gemini://localhost/cgienv${QUERY_STRING}
;--------------------------------------------------------------------------
;[gitview]
;git = /usr/bin/git
;cache_path = /Users/jaakko/Library/Caches/gmgitview
[gitview]
git = /usr/bin/git
cache_path = /Users/jaakko/Library/Caches/gmgitview
[gitview.lagrange]
title = Lagrange
@@ -47,3 +55,12 @@ clone_url = https://git.skyjake.fi/gemini/gitview.git
path = /Users/jaakko/src/gmgitview
url_root = gmgitview
default_branch = main
[gitview.bubble]
title = Bubble
brief = Bulletin Boards for Gemini
clone_url = https://git.skyjake.fi/gemini/bubble.git
path = /Users/jaakko/src/bubble
url_root = bubble
default_branch = main
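For reference, the new concurrency settings uncommented, with their defaults (values are only an illustration; per the Config accessors further below, threads falls back to 5 and processes to 2, and setting processes to 0 disables the handler processes entirely):

[server]
; Parser threads in the main process:
threads = 5
; Request handler processes (0 disables offloading):
processes = 2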

View File

@@ -428,7 +428,7 @@ Each extension module is required to have an initialization function:
:param context: Worker context. The extension can use this to access
configuration parameters, install caches, and register entry
points and custom scheme handlers.
:type context: gmcapsule.WorkerContext
:type context: gmcapsule.Context
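As a hedged illustration of the revised signature, a minimal extension module could look like this (the /hello path and handler are hypothetical, and add() is assumed to be the entry-point registration method referred to above):

def hello(req):
    # An entry point may return bytes, a string, a (status, meta)
    # tuple, or a (status, meta, body) tuple.
    return 20, 'text/gemini; charset=utf-8', '# Hello\n'

def init(context):
    # `context` is a gmcapsule.Context; there is no global Capsule anymore.
    context.add('/hello', hello)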
Requests
@@ -481,13 +481,13 @@ import shlex
import subprocess
from pathlib import Path
from .gemini import Server, Cache, WorkerContext
from .gemini import Server, Cache, Context
from .markdown import to_gemtext as markdown_to_gemtext
__version__ = '0.5.0'
__all__ = [
'Config', 'Cache', 'WorkerContext',
'Config', 'Cache', 'Context',
'get_mime_type', 'markdown_to_gemtext'
]
@@ -557,6 +557,9 @@ class Config:
def num_threads(self):
return self.ini.getint('server', 'threads', fallback=5)
def num_processes(self):
return self.ini.getint('server', 'processes', fallback=2)
def max_upload_size(self):
return self.ini.getint('titan', 'upload_limit', fallback=10 * 1024 * 1024)

View File

@@ -7,7 +7,7 @@ import importlib
import os.path
import select
import socket
#import multiprocessing as mp
import multiprocessing as mp
import threading
import queue
import re
@@ -74,6 +74,10 @@ def safe_recv(stream, max_len, stall_timeout=10):
return data
def is_bytes(data):
return type(data) == bytes or type(data) == bytearray
def safe_sendall(stream, data, stall_timeout=30):
"""
Send data over an SSL connection, accounting for stalls and retries
@@ -86,46 +90,55 @@ def safe_sendall(stream, data, stall_timeout=30):
stall_timeout (float): Number of seconds to wait until
terminating a stalled send.
"""
if type(data) == bytes or type(data) == bytearray:
streaming = False
else:
streaming = True
try:
if is_bytes(data):
streaming = False
else:
streaming = True
if isinstance(data, Path):
print('opening', data)
data = open(data, 'rb')
# We may need to retry sending with the exact same buffer,
# so keep it around until successful.
BUF_LEN = 32768
if streaming:
send_buf = data.read(BUF_LEN)
else:
send_buf = data[:BUF_LEN]
# We may need to retry sending with the exact same buffer,
# so keep it around until successful.
BUF_LEN = 32768
if streaming:
send_buf = data.read(BUF_LEN)
else:
send_buf = data[:BUF_LEN]
last_time = time.time()
pos = 0
while len(send_buf) > 0:
try:
if time.time() - last_time > stall_timeout:
raise AbortedIOError('stalled')
sent = stream.send(send_buf)
if sent < 0:
raise AbortedIOError('failed to send')
pos += sent
if streaming:
send_buf = send_buf[sent:]
if len(send_buf) < BUF_LEN / 2:
send_buf += data.read(BUF_LEN)
else:
send_buf = data[pos : pos + BUF_LEN]
if sent > 0:
last_time = time.time()
else:
last_time = time.time()
pos = 0
while len(send_buf) > 0:
try:
if time.time() - last_time > stall_timeout:
raise AbortedIOError('stalled')
sent = stream.send(send_buf)
if sent < 0:
raise AbortedIOError('failed to send')
pos += sent
if streaming:
send_buf = send_buf[sent:]
if len(send_buf) < BUF_LEN / 2:
send_buf += data.read(BUF_LEN)
else:
send_buf = data[pos : pos + BUF_LEN]
if sent > 0:
last_time = time.time()
else:
wait_for_write(stream, stall_timeout)
except OpenSSL.SSL.WantReadError:
pass
except OpenSSL.SSL.WantWriteError:
# Wait until the socket is ready for writing.
wait_for_write(stream, stall_timeout)
except OpenSSL.SSL.WantReadError:
pass
except OpenSSL.SSL.WantWriteError:
# Wait until the socket is ready for writing.
wait_for_write(stream, stall_timeout)
except OpenSSL.SSL.WantX509LookupError:
pass
except OpenSSL.SSL.WantX509LookupError:
pass
finally:
# Close resources and handles.
if data and hasattr(data, 'close'):
data.close()
def safe_close(stream):
@@ -148,6 +161,19 @@ def report_error(stream, code, msg):
safe_close(stream)
def cert_subject(cert):
comps = {}
for name, value in cert.get_subject().get_components():
comps[name.decode()] = value.decode()
return comps
def cert_issuer(cert):
comps = {}
for name, value in cert.get_issuer().get_components():
comps[name.decode()] = value.decode()
return comps
class Identity:
"""
Client certificate.
@@ -156,19 +182,21 @@ class Identity:
just for the public key.
Attributes:
cert (OpenSSL.SSL.X509): Certificate.
pubkey (OpenSSL.SSL.PKey): Public key.
cert (bytes): Certificate (DER format).
pubkey (bytes): Public key (DER format).
fp_cert (str): SHA-256 hash of the certificate.
fp_pubkey (str): SHA-256 hash of the public key.
"""
def __init__(self, cert):
self.cert = cert
self._subject = cert_subject(cert)
self._issuer = cert_issuer(cert)
self.cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)
m = hashlib.sha256()
m.update(crypto.dump_certificate(crypto.FILETYPE_ASN1, self.cert))
m.update(self.cert)
self.fp_cert = m.hexdigest()
self.pubkey = self.cert.get_pubkey()
self.pubkey = crypto.dump_publickey(crypto.FILETYPE_ASN1, cert.get_pubkey())
m = hashlib.sha256()
m.update(crypto.dump_publickey(crypto.FILETYPE_ASN1, self.pubkey))
m.update(self.pubkey)
self.fp_pubkey = m.hexdigest()
def __str__(self):
@@ -177,22 +205,16 @@ class Identity:
def subject(self):
"""
Returns:
dict: Name components of the certificate subject, e.g.: ``{'CN': 'Name'}``
dict: Name components of the certificate subject, e.g.: ``{'CN': 'Name'}``.
"""
comps = {}
for name, value in self.cert.get_subject().get_components():
comps[name.decode()] = value.decode()
return comps
return self._subject
def issuer(self):
"""
Returns:
dict: Name components of the certificate issuer, e.g.: ``{'CN': 'Name'}``
dict: Name components of the certificate issuer, e.g.: ``{'CN': 'Name'}``.
"""
comps = {}
for name, value in self.cert.get_issuer().get_components():
comps[name.decode()] = value.decode()
return comps
return self._issuer
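The reworked Identity keeps only DER bytes and precomputed dicts, which is what makes it safe to pickle and send to a handler process; pyOpenSSL handle objects would not survive the trip. A standalone sketch of the difference (the self-signed certificate is generated purely for demonstration):

import pickle
from OpenSSL import crypto

key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
cert = crypto.X509()
cert.get_subject().CN = 'demo'
cert.set_issuer(cert.get_subject())
cert.set_serial_number(1)
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(3600)
cert.set_pubkey(key)
cert.sign(key, 'sha256')

der = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)
pickle.dumps(der)       # fine: DER is plain bytes
try:
    pickle.dumps(cert)  # the X509 wrapper object cannot be pickled
except TypeError as err:
    print(err)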
class Request:
@@ -220,7 +242,8 @@ class Request:
May be ``None``.
"""
def __init__(self, identity=None, scheme='gemini', hostname='', path='', query=None,
remote_address=None, content_token=None, content_mime=None, content=None):
remote_address=None, content_token=None, content_mime=None, content=None,
worker_id=None):
self.remote_address = remote_address
self.scheme = scheme
self.identity = identity
@@ -230,6 +253,7 @@ class Request:
self.content_token = content_token
self.content_mime = content_mime
self.content = content
self.worker_id = worker_id
def url(self):
return f'{self.scheme}://{self.hostname}{self.path}{"?" + self.query if self.query else ""}'
@@ -297,10 +321,128 @@ class Cache:
return None, None
class WorkerContext:
def __init__(self, cfg, shutdown_event):
def handle_gemini_or_titan_request(request_data):
worker = request_data.worker
stream = request_data.stream
data = request_data.buffered_data
from_addr = request_data.from_addr
identity = request_data.identity
request = request_data.request
expected_size = 0
req_token = None
req_mime = None
if request.startswith('titan:'):
if identity is None and worker.cfg.require_upload_identity():
report_error(stream, 60, "Client certificate required for upload")
return
# Read the rest of the data.
parms = request.split(';')
request = parms[0]
for p in parms:
if p.startswith('size='):
expected_size = int(p[5:])
elif p.startswith('token='):
req_token = p[6:]
elif p.startswith('mime='):
req_mime = p[5:]
worker.log(f'Receiving Titan content: {expected_size}')
max_upload_size = worker.cfg.max_upload_size()
if expected_size > max_upload_size and max_upload_size > 0:
report_error(stream, 59, "Maximum content length exceeded")
return
while len(data) < expected_size:
incoming = safe_recv(stream, 65536)
if len(incoming) == 0:
break
data += incoming
if len(data) != expected_size:
report_error(stream, 59, "Invalid content length")
return
else:
# No Payload in Gemini.
if len(data):
report_error(stream, 59, "Bad request")
return
url = urlparse(request)
path = url.path
if path == '':
path = '/'
hostname = url.hostname
if url.port != None and url.port != worker.port:
report_error(stream, 59, "Invalid port number")
return
if not stream.get_servername():
# Server name indication is required.
report_error(stream, 59, "Missing TLS server name indication")
return
if stream.get_servername().decode() != hostname:
report_error(stream, 53, "Proxy request refused")
return
try:
status, meta, body, from_cache = worker.context.call_entrypoint(Request(
identity,
remote_address=from_addr,
scheme=url.scheme,
hostname=hostname,
path=path,
query=url.query if '?' in request else None,
content_token=req_token,
content_mime=req_mime,
content=data if len(data) else None,
worker_id=request_data.worker.id
))
output = f'{status} {meta}\r\n'.encode('utf-8')
if is_bytes(body):
safe_sendall(stream, output + body)
else:
# `body` is some sort of streamable data, cannot send in one call.
safe_sendall(stream, output)
safe_sendall(stream, body)
# Save to cache.
if not from_cache and status == 20 and is_bytes(body):
for cache in worker.context.caches:
if cache.save(hostname + path, meta, body):
break
except GeminiError as error:
report_error(stream, error.status, str(error))
return
def unpack_response(response):
if type(response) == tuple:
if len(response) == 2:
status, meta = response
response = ''
else:
status, meta, response = response
else:
status = 20
meta = 'text/gemini; charset=utf-8'
if response == None:
body = b''
elif type(response) == str:
body = response.encode('utf-8')
else:
body = response
return status, meta, body
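A quick illustrative check of the response shapes accepted by the branches above (assuming unpack_response from this diff is in scope):

print(unpack_response(b'raw'))                    # (20, 'text/gemini; charset=utf-8', b'raw')
print(unpack_response('# Hi\n'))                  # (20, 'text/gemini; charset=utf-8', b'# Hi\n')
print(unpack_response((40, 'Temporary failure'))) # (40, 'Temporary failure', b'')
print(unpack_response((20, 'text/plain', b'ok'))) # (20, 'text/plain', b'ok')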
class Context:
def __init__(self, cfg, allow_extension_workers=False, handler_queue=None,
response_queues=None):
self.cfg = cfg
self.shutdown = shutdown_event
self.is_quiet = True
self.shutdown_events = []
self.allow_extension_workers = allow_extension_workers
self.hostnames = cfg.hostnames()
self.entrypoints = {'gemini': {}, 'titan': {}}
for proto in ['gemini', 'titan']:
@@ -309,19 +451,41 @@ class WorkerContext:
self.entrypoints[proto][hostname] = []
self.caches = []
self.protocols = {}
self.is_quiet = False
self.add_protocol('gemini', handle_gemini_or_titan_request)
self.add_protocol('titan', handle_gemini_or_titan_request)
# Queue for pending handler jobs.
self.job_lock = threading.Lock()
self.job_id = 0
self.handler_queue = handler_queue
self.response_queues = response_queues
def config(self):
return self.cfg
def shutdown_event(self):
def is_background_work_allowed(self):
"""
Returns:
threading.Event: Event that is set when the server is
Determines whether extension modules are allowed to start workers.
"""
return self.allow_extension_workers
def add_shutdown_event(self, event):
"""
Registers a shutdown event. Extension modules must call this to
get notified when the server is being shut down.
Args:
event (threading.Event): Event that is set when the server is
shutting down. Background workers must wait on this and stop
when the event is set.
"""
return self.shutdown
if not self.is_background_work_allowed():
raise Exception("background work not allowed")
# This is used in a parser thread that is allowed to launch workers.
return self.shutdown_events.append(event)
def set_shutdown(self):
for event in self.shutdown_events:
event.set()
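A sketch of how an extension might use this shutdown protocol from its init() (hypothetical; only the parser-side context permits background workers):

import threading

def init(context):
    if not context.is_background_work_allowed():
        return  # handler processes must not start background workers
    stop = threading.Event()
    context.add_shutdown_event(stop)

    def poll():
        # Event.wait() doubles as the sleep; it returns True at shutdown.
        while not stop.wait(timeout=60):
            pass  # periodic background work goes here

    threading.Thread(target=poll).start()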
def set_quiet(self, is_quiet):
self.is_quiet = is_quiet
@@ -452,9 +616,8 @@ class WorkerContext:
request (Request): Request object.
Returns:
Tuple with (response, cache). The response can be binary data, text,
tuple with status and meta string, or tuple with status, meta, and body.
The cache is None if the data was not read from a cache.
Tuple with (status, meta, body, cache). The body can be bytes/bytearray or
an I/O object. The cache is None if the data was not read from a cache.
"""
entrypoint = self.find_entrypoint(request.scheme, request.hostname, request.path)
@@ -468,17 +631,39 @@ class WorkerContext:
for cache in caches:
media, content = cache.try_load(request.hostname + request.path)
if not media is None:
response = 20, media, content
if hasattr(content, '__len__'):
self.print('%d bytes from cache, %s' % (len(content), media))
else:
self.print('stream from cache,', media)
return response, cache
return 20, media, content, cache
# Process the request normally if there is nothing cached.
if not from_cache:
try:
return entrypoint(request), None
if not self.handler_queue:
# Handle in the same thread/process synchronously. This is probably
# running under a RequestHandler process.
response = entrypoint(request)
else:
# Put it in the handler queue and wait for completion. Parser threads use
# this to hand work off to the handler processes.
with self.job_lock:
# The job ID is for verifying we are getting the right response.
self.job_id += 1
job_id = self.job_id
self.handler_queue.put((job_id, request, request.worker_id))
result_id, response = self.response_queues[request.worker_id].get()
if result_id != job_id:
raise Exception('response queue out of sync: request handler returned wrong job ID')
if isinstance(response, Exception):
raise response
status, meta, body = unpack_response(response)
return status, meta, body, None
except Exception as x:
import traceback
traceback.print_exception(x)
@@ -499,138 +684,19 @@ class RequestData:
self.request = request
def handle_gemini_or_titan_request(request_data):
worker = request_data.worker
stream = request_data.stream
data = request_data.buffered_data
from_addr = request_data.from_addr
identity = request_data.identity
request = request_data.request
expected_size = 0
req_token = None
req_mime = None
class RequestParser(threading.Thread):
"""Thread that parses incoming requests from clients."""
if request.startswith('titan:'):
if identity is None and worker.cfg.require_upload_identity():
report_error(stream, 60, "Client certificate required for upload")
return
# Read the rest of the data.
parms = request.split(';')
request = parms[0]
for p in parms:
if p.startswith('size='):
expected_size = int(p[5:])
elif p.startswith('token='):
req_token = p[6:]
elif p.startswith('mime='):
req_mime = p[5:]
worker.log(f'Receiving Titan content: {expected_size}')
max_upload_size = worker.cfg.max_upload_size()
if expected_size > max_upload_size and max_upload_size > 0:
report_error(stream, 59, "Maximum content length exceeded")
return
while len(data) < expected_size:
incoming = safe_recv(stream, 65536)
if len(incoming) == 0:
break
data += incoming
if len(data) != expected_size:
report_error(stream, 59, "Invalid content length")
return
else:
# No Payload in Gemini.
if len(data):
report_error(stream, 59, "Bad request")
return
url = urlparse(request)
path = url.path
if path == '':
path = '/'
hostname = url.hostname
if url.port != None and url.port != worker.port:
report_error(stream, 59, "Invalid port number")
return
if not stream.get_servername():
# Server name indication is required.
report_error(stream, 59, "Missing TLS server name indication")
return
if stream.get_servername().decode() != hostname:
report_error(stream, 53, "Proxy request refused")
return
try:
request = Request(
identity,
remote_address=from_addr,
scheme=url.scheme,
hostname=hostname,
path=path,
query=url.query if '?' in request else None,
content_token=req_token,
content_mime=req_mime,
content=data if len(data) else None
)
response, from_cache = worker.context.call_entrypoint(request)
# Determine status code, meta line, and body content.
if type(response) == tuple:
if len(response) == 2:
status, meta = response
response = ''
else:
status, meta, response = response
else:
status = 20
meta = 'text/gemini; charset=utf-8'
if response == None:
response_data = b''
elif type(response) == str:
response_data = response.encode('utf-8')
else:
response_data = response
safe_sendall(stream, f'{status} {meta}\r\n'.encode('utf-8'))
safe_sendall(stream, response_data)
# Save to cache.
if not from_cache and status == 20 and \
(type(response_data) == bytes or type(response_data) == bytearray):
for cache in worker.context.caches:
if cache.save(hostname + path, meta, response_data):
break
# Close handles.
if hasattr(response_data, 'close'):
response_data.close()
except GeminiError as error:
report_error(stream, error.status, str(error))
return
class Worker(threading.Thread):
"""Thread that handles incoming requests from clients."""
def __init__(self, id, cfg, work_queue, shutdown_event):
#super().__init__(target=Worker._run, args=(self,)) # multiprocessing
def __init__(self, id, context, job_queue):
super().__init__()
self.id = id
self.cfg = cfg
self.port = cfg.port()
self.context = WorkerContext(self.cfg, shutdown_event)
self.context.add_protocol('gemini', handle_gemini_or_titan_request)
self.context.add_protocol('titan', handle_gemini_or_titan_request)
self.context.set_quiet(id > 0)
self.jobs = work_queue
self.context = context
self.cfg = context.cfg
self.port = self.cfg.port()
self.jobs = job_queue
def run(self):
try:
# Extensions are initialized in the worker process.
self.context.load_modules()
self.context.set_quiet(False)
while True:
stream, from_addr = self.jobs.get()
if stream is None:
@@ -660,7 +726,7 @@ class Worker(threading.Thread):
def process_request(self, stream, from_addr):
data = bytes()
MAX_LEN = 1024
MAX_LEN = 1024 # TODO: Gemini/Titan limitation only.
MAX_RECV = MAX_LEN + 2 # includes terminator "\r\n"
request = None
incoming = safe_recv(stream, MAX_RECV)
@@ -696,32 +762,63 @@ class Worker(threading.Thread):
for scheme, handler in self.context.protocols.items():
if request.startswith(scheme + ':'):
self.log(request)
response_data = handler(RequestData(self,
stream, data, from_addr,
identity, request))
if not response_data is None:
safe_sendall(stream, response_data)
# Close handles.
if hasattr(response_data, 'close'):
response_data.close()
response = handler(RequestData(self, stream, data, from_addr, identity, request))
if not response is None:
safe_sendall(stream, response)
return
report_error(stream, 59, "Unsupported protocol")
_server_instance = None
class ServerRestarter:
def __init__(self, server):
self.server = server
def _restart_workers(signum, frame):
if signum == signal.SIGHUP:
_server_instance.restart_workers()
def __call__(self, signum, frame):
if signum == signal.SIGHUP:
print('--- SIGHUP ---')
self.server.restart_workers()
class RequestHandler(mp.Process):
def __init__(self, id, cfg, job_queue, result_queues):
super().__init__(target=RequestHandler._run, args=(self,))
self.id = id
self.cfg = cfg
self.jobs = job_queue
self.results = result_queues
self.context = None
def _run(self):
self.context = Context(self.cfg)
self.context.load_modules()
# Wait for request processing jobs.
try:
while True:
job_id, request, queue_id = self.jobs.get()
if job_id is None:
break
result_queue = self.results[queue_id]
entrypoint = self.context.find_entrypoint(request.scheme, request.hostname, request.path)
if not entrypoint:
result_queue.put((job_id, Exception("Missing entrypoint: " + request.url())))
continue
try:
response = unpack_response(entrypoint(request))
result_queue.put((job_id, response))
except Exception as error:
result_queue.put((job_id, error))
except KeyboardInterrupt:
pass
class Server:
def __init__(self, cfg):
global _server_instance
assert _server_instance is None
_server_instance = self
#mp.set_start_method('spawn')
mp.set_start_method('spawn')
self.cfg = cfg
self.address = cfg.address()
self.port = cfg.port()
@@ -745,25 +842,18 @@ class Server:
self.context.set_session_id(session_id)
# Spawn the worker threads.
self.shutdown_event = threading.Event()
self.workers = []
self.work_queue = queue.Queue()
#self.parser_shutdown_event = threading.Event()
self.parser_queue = queue.Queue()
#self.handler_shutdown_event = mp.Event()
self.handler_queue = mp.Queue()
self.init_parser_context()
self.parsers = []
self.handlers = []
self.create_workers(cfg)
self.sock = None
self.sv_conn = None
def restart_workers(self):
"""
Restarts workers with an updated configuration. The server socket or
TLS configuration are not modified, even if the values have changed
in the configuration file.
"""
self.stop_workers()
self.cfg.reload()
self.create_workers(self.cfg)
self.start_workers()
def run(self):
attempts = 60
print(f'Opening port {self.port}...')
@@ -786,7 +876,7 @@ class Server:
self.start_workers()
try:
signal.signal(signal.SIGHUP, _restart_workers)
signal.signal(signal.SIGHUP, ServerRestarter(self))
except ValueError:
print('Restarting with SIGHUP not supported')
@@ -795,7 +885,7 @@ class Server:
try:
stream, from_addr = self.sv_conn.accept()
stream._socket.settimeout(10)
self.work_queue.put((stream, from_addr))
self.parser_queue.put((stream, from_addr))
del stream
del from_addr
except KeyboardInterrupt:
@@ -816,23 +906,68 @@ class Server:
print('Done')
def init_parser_context(self):
self.handler_results = []
if self.is_using_handler_processes():
for _ in range(max(self.cfg.num_threads(), 1)):
# Handler processes put results in these queues.
self.handler_results.append(mp.Queue())
self.parser_context = Context(self.cfg,
allow_extension_workers=True,
handler_queue=self.handler_queue if self.is_using_handler_processes() else None,
response_queues=self.handler_results if self.is_using_handler_processes() else None)
self.parser_context.set_quiet(False)
self.parser_context.load_modules()
def restart_workers(self):
"""
Restarts workers with an updated configuration. The server socket or
TLS configuration are not modified, even if the values have changed
in the configuration file.
"""
self.stop_workers()
self.cfg.reload()
self.init_parser_context()
self.create_workers(self.cfg)
self.start_workers()
def is_using_handler_processes(self):
return self.cfg.num_processes() > 0
def create_workers(self, cfg):
self.shutdown_event.clear()
for worker_id in range(max(cfg.num_threads(), 1)):
worker = Worker(worker_id, cfg, self.work_queue, self.shutdown_event)
self.workers.append(worker)
for proc_id in range(max(cfg.num_processes(), 0)):
proc = RequestHandler(proc_id, cfg, self.handler_queue, self.handler_results)
self.handlers.append(proc)
for parser_id in range(max(cfg.num_threads(), 1)):
# Threads share one context (note: GIL).
parser = RequestParser(parser_id, self.parser_context, self.parser_queue)
self.parsers.append(parser)
def start_workers(self):
for worker in self.workers:
worker.start()
print(len(self.workers), 'worker(s) started')
for handler in self.handlers:
handler.start()
for parser in self.parsers:
parser.start()
print(len(self.parsers), 'parser(s) and', len(self.handlers), 'handler(s) started')
def stop_workers(self):
self.shutdown_event.set()
n = len(self.workers)
for _ in range(n):
self.work_queue.put((None, None))
for worker in self.workers:
worker.join()
self.workers = []
print(n, 'worker(s) stopped')
self.parser_context.set_shutdown()
# Stop parsers first so all ongoing handler processes get to finish, and no new
# requests can come in.
for _ in range(len(self.parsers)):
self.parser_queue.put((None, None))
for _ in range(len(self.handlers)):
self.handler_queue.put((None, None, None))
for p in self.parsers:
p.join()
for h in self.handlers:
h.join()
print(len(self.parsers), 'parser(s) and', len(self.handlers), 'handler(s) stopped')
self.parsers = []
self.handlers = []
self.parser_context.shutdown_events = []

View File

@@ -66,9 +66,9 @@ def serve_file(req):
if not os.path.exists(path):
return 51, "Not found"
# Note: We return the file object so the sender doesn't have to buffer
# Note: We return a Path object so the sender doesn't have to buffer
# the entire file in memory first.
return status, meta, (open(path, 'rb') if status == 20 else None)
return status, meta, (Path(path) if status == 20 else None)
def init(context):
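Returning a Path instead of an open file matters for the multiprocessing split: a Path pickles cleanly through the handler and response queues, and the parser-side safe_sendall() above opens it (the isinstance(data, Path) branch) and streams it to the client. A self-contained check of the underlying constraint:

import io
import pickle
from pathlib import Path

pickle.dumps(Path('page.gmi'))     # fine: a Path is plain data
try:
    pickle.dumps(io.BytesIO(b''))  # stream/file objects refuse to pickle
except TypeError as err:
    print(err)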