Remove multi-threaded code

The server is synchronous and single-threaded for now. The server
uses a RequestProcessor to process requests, which, unlike the
previous `fn(&Request) -> Response` server functions, can hold
state and use that state when processing requests.

In short, the server can now be used for real tasks. Multithreading
will be added back in for a future release, the current single-
-threaded code will not perform well under concurrent loads.
This commit is contained in:
Ben Bridle 2022-08-24 17:42:27 +12:00
parent ab92b5a802
commit 70a63ae002
4 changed files with 44 additions and 126 deletions

View File

@ -2,7 +2,7 @@
name = "wetstring"
version = "1.0.0"
authors = ["Ben Bridle <bridle.benjamin@gmail.com>"]
edition = "2018"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View File

@ -1,30 +1,30 @@
use crate::{Request, RequestParser};
use crate::RequestParser;
use mio::{net::TcpStream, Token};
use std::net::SocketAddr;
pub struct Connection<Req: Request> {
pub struct Connection<Parser: RequestParser> {
pub stream: TcpStream,
pub state: ConnectionState<Req>,
pub state: ConnectionState<Parser>,
pub client_address: SocketAddr,
pub token: Token,
}
impl<Req: Request> Connection<Req> {
impl<Parser: RequestParser> Connection<Parser> {
pub fn new(stream: TcpStream, client_address: SocketAddr, token: Token) -> Self {
Self {
stream,
client_address,
token,
state: ConnectionState::Incoming(Req::Parser::new(client_address)),
state: ConnectionState::Incoming(Parser::new(client_address)),
}
}
}
pub enum ConnectionState<Req: Request> {
pub enum ConnectionState<Parser: RequestParser> {
/// The request has not yet been fully received.
Incoming(Req::Parser),
Incoming(Parser),
/// The request is valid and needs to be processed.
Processing(Req),
// Processing(Parser::Req),
/// A response has been generated for the request and is waiting to be sent.
Outgoing(Req::Response),
Outgoing(Parser::Res),
}

View File

@ -3,30 +3,26 @@ use std::net::SocketAddr;
/// A trait for an object that can store the state of a partially parsed request.
/// The incoming byte stream can be parsed incrementally, and the request can be converted
/// to an error response if the incoming bytes are invalid.
pub trait RequestParser<Req: Request> {
pub trait RequestParser {
type Req: Request;
type Res: Response;
fn new(client_address: SocketAddr) -> Self;
fn push_bytes(&mut self, bytes: &[u8]);
fn try_parse(&mut self) -> RequestParseResult<Req>;
fn try_parse(&mut self) -> RequestParseResult<Self::Req, Self::Res>;
}
/// A trait for objects that represent a valid network request.
pub trait Request: Sized {
type Response: Response;
type Parser: RequestParser<Self>;
fn process(&self, request_processor: RequestProcessor<Self>) -> Self::Response {
request_processor(&self)
}
}
pub trait Request {}
/// A trait for objects that represent the response to a network request.
pub trait Response {
fn to_bytes(self) -> Vec<u8>;
}
pub enum RequestParseResult<Req: Request> {
pub enum RequestParseResult<Req: Request, Res: Response> {
/// The request is waiting for the client to send more data before the
/// request can either be discarded as erroneous or parsed.
Incomplete,
@ -35,11 +31,12 @@ pub enum RequestParseResult<Req: Request> {
Complete(Req),
/// An error has been encountered in the received data, and a response
/// has been generated to be returned to the client.
Invalid(Req::Response),
Invalid(Res),
}
/// A function that converts a Request into a Response. This one function will
/// contain all of the logic for a server.
pub type RequestProcessor<Req> = fn(request: &Req) -> <Req as Request>::Response;
pub trait RequestProcessor {
type Req: Request;
type Res: Response;
// TODO: pub type InvalidRequestProcessor<Req, Res> = fn(invalid_request: &Req) -> Res;
fn process_request(&self, request: &Self::Req) -> Self::Res;
}

View File

@ -1,37 +1,16 @@
use crate::*;
use crossbeam_deque::{Injector, Steal};
use log::{debug, error, info, warn};
use mio::{event::Event, net::TcpListener, Events, Interest, Poll, Token};
use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read as _, Write as _};
use std::marker::Send;
use std::net::SocketAddr;
use std::sync::{mpsc, Arc};
use std::time::{Duration, Instant};
const MINIMUM_POLL_DURATION: Duration = Duration::from_millis(1);
// TODO: Implement sleeping when inactive. If no read or write events have
// taken place in the past duration, increase polling time. Make this
// configurable, because a Gemini server will have different requirements to
// the main Doctrine API. Default to no sleeping.
use std::time::Duration;
/// A server that processes incoming requests and generates responses
pub struct Server<Req: Request> {
/// The maximum number of concurrent inbound TCP connections that this
/// server supports. When the number of concurrent connections equals
/// this number, any new incoming connections will be dropped immediately
/// with no response.
/// TODO: Figure out a more elegant method of preventing overload below
/// this level. Measure current CPU load, perhaps? Or measure median
/// request time and throttle based on that?
/// TODO: Consider checking the size of the connections vector every so
/// often during quiet moments, and shrinking it if possible. This would
/// prevent peak surges of traffic from allocating large amounts of RAM
/// in perpetuity.
max_connections: usize,
pub struct Server<Parser: RequestParser, Proc: RequestProcessor> {
/// Hold all current inbound connections in a fixed-size Vec.
connections: Vec<Option<Connection<Req>>>,
connections: Vec<Option<Connection<Parser>>>,
max_connections: usize,
poll: Poll,
listener: TcpListener,
@ -39,37 +18,21 @@ pub struct Server<Req: Request> {
/// The ID of the next token to be allocated. All tokens up to this value
/// have already been allocated. This is also the size of the connections
/// Vec.
/// TODO: Could this be removed in favour of counting the length of the
/// connections Vec?
next_token_id: usize,
/// A vector containing all tokens that are ready to be reused. These
/// tokens have been allocated, used for a past connection, and then freed.
freed_tokens: Vec<Token>,
/// All request processing threads.
worker_threads: Vec<std::thread::JoinHandle<()>>,
/// A queue of all the requests that are waiting to be sent to a worker
/// thread to be processed into responses. Each worker thread has direct
/// access to this queue.
request_queue: Arc<Injector<Connection<Req>>>,
/// The end of the one-way channel that connects all worker threads to
/// this server.
response_receiver: mpsc::Receiver<Connection<Req>>,
request_processor: Proc,
}
impl<Req> Server<Req>
impl<Parser, Proc> Server<Parser, Proc>
where
Req: Request + 'static + Send,
Req::Response: Send,
Req::Parser: Send,
Parser: RequestParser,
Proc: RequestProcessor<Req = Parser::Req, Res = Parser::Res>,
{
/// Create a new server and create worker threads to process requests.
pub fn new(
server_address: SocketAddr,
max_connections: usize,
worker_count: usize,
request_processor: RequestProcessor<Req>,
) -> Self {
pub fn new(server_address: SocketAddr, request_processor: Proc) -> Self {
let mut listener = match TcpListener::bind(server_address) {
Ok(listener) => listener,
Err(error) => handle_server_bind_error(error, server_address),
@ -83,61 +46,21 @@ where
.register(&mut listener, Token(0), Interest::READABLE)
.unwrap();
// Create a channel to connect worker threads to the main thread, so
// that responses can be collected and returned to each client.
let (response_sender, response_receiver) = mpsc::channel();
let mut worker_threads = Vec::new();
let request_queue: Arc<Injector<Connection<Req>>> = Arc::new(Injector::new());
// Start a number of worker threads, which will be used to process
// requests into responses.
for _ in 0..worker_count {
let request_queue = request_queue.clone();
let response_sender = response_sender.clone();
worker_threads.push(std::thread::spawn(move || loop {
match request_queue.steal() {
Steal::Success(mut connection) => {
let request = match connection.state {
ConnectionState::Processing(ref request) => request,
_ => unreachable!(),
};
let response = request_processor(request);
connection.state = ConnectionState::Outgoing(response);
response_sender.send(connection).unwrap()
}
Steal::Empty | Steal::Retry => (),
}
// TODO: Instead of sleeping for a fixed duration, keep a
// record of how busy the server has been for the past while.
// If the worker threads are mostly idle, sleep for longer.
// If the worker threads are screaming along, don't sleep.
std::thread::sleep(MINIMUM_POLL_DURATION);
}))
}
match worker_count {
1 => info!("{} worker thread has been created", worker_count),
_ => info!("{} worker threads have been created", worker_count),
}
Self {
max_connections,
connections: Vec::new(),
max_connections: 100,
poll,
listener,
next_token_id: 1,
freed_tokens: Vec::new(),
worker_threads,
request_queue,
response_receiver,
request_processor,
}
}
/// Poll for, and handle, incoming connections.
/// Handle read and write events on all current connections
pub fn poll(&mut self) {
let poll_start = Instant::now();
let mut events = Events::with_capacity(1024);
const TIMEOUT: Option<Duration> = Some(Duration::from_millis(1));
self.poll.poll(&mut events, TIMEOUT).unwrap();
@ -154,16 +77,13 @@ where
warn!("Received unreadable and unwritable event")
}
}
while let Ok(connection) = self.response_receiver.try_recv() {
self.set_outgoing_connection(connection)
}
let elapsed = poll_start.elapsed();
if elapsed < MINIMUM_POLL_DURATION {
std::thread::sleep(MINIMUM_POLL_DURATION - elapsed);
}
const MINIMUM_POLL_DURATION: Duration = Duration::from_millis(1);
std::thread::sleep(MINIMUM_POLL_DURATION);
}
fn set_outgoing_connection(&mut self, connection: Connection<Req>) {
/// Change the polling mode of a connection from readable to writable
fn set_outgoing_connection(&mut self, connection: Connection<Parser>) {
let slot = self.connections.get_mut(connection.token.0).unwrap();
*slot = Some(connection);
if let Some(ref mut connection) = slot {
@ -202,8 +122,9 @@ where
RequestParseResult::Complete(request) => {
let mut connection =
std::mem::replace(&mut self.connections[slot_index], None).unwrap();
connection.state = ConnectionState::Processing(request);
self.request_queue.push(connection);
let response = self.request_processor.process_request(&request);
connection.state = ConnectionState::Outgoing(response);
self.set_outgoing_connection(connection)
}
RequestParseResult::Invalid(response) => {
connection.state = ConnectionState::Outgoing(response);