From 3f46685100dbd5bdd2f6492b3a90a673a3e09ea7 Mon Sep 17 00:00:00 2001 From: Ben Bridle Date: Sat, 7 Aug 2021 13:59:11 +1200 Subject: [PATCH] Initial commit --- .gitignore | 1 + Cargo.lock | 237 ++++++++++++++++++++++++++++++++++++++ Cargo.toml | 13 +++ src/connection.rs | 27 +++++ src/lib.rs | 8 ++ src/request_response.rs | 19 ++++ src/tcp_server.rs | 247 ++++++++++++++++++++++++++++++++++++++++ 7 files changed, 552 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 src/connection.rs create mode 100644 src/lib.rs create mode 100644 src/request_response.rs create mode 100644 src/tcp_server.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..3fd05ea --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,237 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "aho-corasick" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f" +dependencies = [ + "memchr", +] + +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + +[[package]] +name = "autocfg" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "crossbeam-deque" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "lazy_static", + "memoffset", + "scopeguard", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" +dependencies = [ + "cfg-if", + "lazy_static", +] + +[[package]] +name = "env_logger" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.97" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12b8adadd720df158f4d70dfe7ccc6adb0472d7c55ca83445f6a5ab3e36f8fb6" + +[[package]] +name = "log" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "memchr" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc" + +[[package]] +name = "memoffset" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59accc507f1338036a0477ef61afdae33cde60840f4dfe481319ce3ad116ddf9" +dependencies = [ + "autocfg", +] + +[[package]] +name = "mio" +version = "0.7.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c2bdb6314ec10835cd3293dd268473a835c02b7b352e788be788b3c6ca6bb16" +dependencies = [ + "libc", + "log", + "miow", + "ntapi", + "winapi", +] + +[[package]] +name = "miow" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" +dependencies = [ + "winapi", +] + +[[package]] +name = "ntapi" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" +dependencies = [ + "winapi", +] + +[[package]] +name = "regex" +version = "1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.6.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" + +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + +[[package]] +name = "tcp_server" +version = "0.1.0" +dependencies = [ + "crossbeam-deque", + "env_logger", + "log", + "mio", +] + +[[package]] +name = "termcolor" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..ad62f5a --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "tcp_server" +version = "0.1.0" +authors = ["Ben Bridle "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +mio = {version = "0.7.13", features = ["os-poll", "net"]} +crossbeam-deque = "0.8.1" +log = "0.4.14" +env_logger = "0.8.4" diff --git a/src/connection.rs b/src/connection.rs new file mode 100644 index 0000000..e589007 --- /dev/null +++ b/src/connection.rs @@ -0,0 +1,27 @@ +use crate::{Request, Response}; +use mio::{net::TcpStream, Token}; +use std::net::SocketAddr; + +pub struct Connection { + pub stream: TcpStream, + pub state: RequestState, + pub address: SocketAddr, + pub token: Token, +} + +impl Connection { + pub fn new(stream: TcpStream, address: SocketAddr, token: Token) -> Self { + Self { + stream, + address, + token, + state: RequestState::Incoming(String::new()), + } + } +} + +pub enum RequestState { + Incoming(String), + Processing(Req), + Outgoing(Res), +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..8aadef6 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,8 @@ +mod tcp_server; +pub use tcp_server::TcpServer; + +mod connection; +pub(crate) use connection::{Connection, RequestState}; + +mod request_response; +pub use request_response::{ProcessRequest, Request, RequestParseResult, Response}; diff --git a/src/request_response.rs b/src/request_response.rs new file mode 100644 index 0000000..b6eaaf5 --- /dev/null +++ b/src/request_response.rs @@ -0,0 +1,19 @@ +pub trait Request { + type Response; + fn from_string(request_string: &str) -> RequestParseResult + where + Self: Sized, + Self::Response: Response; +} + +pub trait Response { + fn to_string(self) -> String; +} + +pub type ProcessRequest = fn(request: &Req) -> Res; + +pub enum RequestParseResult { + Incomplete, + Complete(Req), + Invalid(Res), +} diff --git a/src/tcp_server.rs b/src/tcp_server.rs new file mode 100644 index 0000000..7cc0128 --- /dev/null +++ b/src/tcp_server.rs @@ -0,0 +1,247 @@ +use crate::*; +use crossbeam_deque::{Injector, Steal}; +use log::{error, info}; +use mio::{event::Event, net::TcpListener, Events, Interest, Poll, Token}; +use std::io::{Read as _, Write as _}; +use std::net::SocketAddr; +use std::sync::{mpsc, Arc}; +use std::thread; +use std::time::Duration; + +const MINIMUM_POLL_DURATION: Duration = Duration::from_millis(1); + +// TODO: Implement sleeping when inactive. If no read or write events have +// taken place in the past duration, increase polling time. Make this +// configurable, because a Gemini server will have different requirements to +// the main Doctrine API. Default to no sleeping. +pub struct TcpServer { + max_connections: usize, + connections: Vec>>, + poll: Poll, + listener: TcpListener, + + next_token_value: usize, + freed_tokens: Vec, + + worker_threads: Vec>, + request_queue: Arc>>, + response_receiver: mpsc::Receiver>, +} + +impl< + Req: 'static + Request + std::marker::Send + request_response::Request, + Res: 'static + Response + std::marker::Send, + > TcpServer +{ + pub fn new( + address: SocketAddr, + max_connections: usize, + worker_count: usize, + process_request: ProcessRequest, + ) -> Self { + let mut listener = TcpListener::bind(address).unwrap(); + info!("Server is listening at address {}", address); + let poll = Poll::new().unwrap(); + poll.registry() + .register(&mut listener, Token(0), Interest::READABLE) + .unwrap(); + + let (response_sender, response_receiver) = mpsc::channel(); + + let mut new_server = Self { + max_connections, + connections: Vec::new(), + poll, + listener, + + next_token_value: 1, + freed_tokens: Vec::new(), + + worker_threads: Vec::new(), + request_queue: Arc::new(Injector::new()), + response_receiver, + }; + + // Start the worker threads + for _ in 0..worker_count { + let request_queue = new_server.request_queue.clone(); + let response_sender = response_sender.clone(); + new_server.worker_threads.push(thread::spawn(move || loop { + match request_queue.steal() { + Steal::Success(mut connection) => { + let request = match connection.state { + RequestState::Processing(ref request) => request, + _ => unreachable!(), + }; + let response = process_request(request); + connection.state = RequestState::Outgoing(response); + response_sender.send(connection).unwrap() + } + Steal::Empty => (), + Steal::Retry => (), + } + std::thread::sleep(MINIMUM_POLL_DURATION); + })) + } + match worker_count { + 1 => info!("{} worker thread has been created", worker_count), + _ => info!("{} worker threads have been created", worker_count), + } + return new_server; + } + + pub fn poll(&mut self) { + let poll_start = std::time::Instant::now(); + let mut events = Events::with_capacity(1024); + const TIMEOUT: Option = Some(Duration::from_millis(1)); + self.poll.poll(&mut events, TIMEOUT).unwrap(); + for event in &events { + if event.is_readable() { + if event.token() == Token(0) { + self.accept_new_connections(); + } else { + self.process_read_event(event); + } + } else if event.is_writable() { + self.process_write_event(event); + } else { + info!("Received unreadable and unwritable event") + } + } + loop { + match self.response_receiver.try_recv() { + Ok(connection) => self.set_outgoing_connection(connection), + Err(_) => break, + } + } + let elapsed = poll_start.elapsed(); + if elapsed < MINIMUM_POLL_DURATION { + std::thread::sleep(MINIMUM_POLL_DURATION - elapsed); + } + } + + fn set_outgoing_connection(&mut self, connection: Connection) { + let slot = self.connections.get_mut(connection.token.0).unwrap(); + *slot = Some(connection); + if let Some(ref mut connection) = slot { + self.poll + .registry() + .reregister(&mut connection.stream, connection.token, Interest::WRITABLE) + .unwrap(); + } else { + unreachable!() + }; + } + + fn process_read_event(&mut self, event: &Event) { + let token = event.token(); + let connection = self.connections[token.0].as_mut().unwrap(); + if let RequestState::Incoming(ref mut request_data) = connection.state { + loop { + let mut buffer = [0 as u8; 1024]; + match connection.stream.read(&mut buffer) { + Ok(0) => { + self.remove_connection(event.token()).unwrap(); + return; + } + Ok(_) => match std::str::from_utf8(&buffer) { + Ok(s) => request_data.push_str(s.trim_matches(char::from(0))), + Err(e) => panic!( + "Incoming data from {:?} is not valid UTF-8: {}", + connection.address, e + ), + }, + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break, + Err(e) => panic!("Unexpected error: {}", e), + }; + } + + match Req::from_string(&request_data) { + RequestParseResult::Complete(request) => { + let mut connection = + std::mem::replace(&mut self.connections[token.0], None).unwrap(); + connection.state = RequestState::Processing(request); + self.request_queue.push(connection); + } + RequestParseResult::Invalid(response) => { + connection.state = RequestState::Outgoing(response); + let connection = + std::mem::replace(&mut self.connections[token.0], None).unwrap(); + self.set_outgoing_connection(connection); + } + RequestParseResult::Incomplete => (), + }; + } else { + info!("Received read event for non-incoming connection") + } + } + + fn process_write_event(&mut self, event: &Event) { + let token = event.token(); + let mut connection = std::mem::replace(&mut self.connections[token.0], None).unwrap(); + if let RequestState::Outgoing(response) = connection.state { + let response_string = response.to_string(); + let bytes = response_string.as_bytes(); + connection.stream.write_all(bytes).unwrap(); + } else { + info!("Received write event for non-outgoing connection") + } + self.remove_connection(connection.token).unwrap(); + info!( + "Closed connection from {} (token {})", + connection.address, connection.token.0 + ); + } + + fn accept_new_connections(&mut self) { + loop { + match self.listener.accept() { + Ok((stream, address)) => { + // Get an unused token + let token = if let Some(token) = self.freed_tokens.pop() { + token + } else if self.next_token_value < self.max_connections { + let token_value = self.next_token_value; + self.next_token_value += 1; + Token(token_value) + } else { + error!("Capacity reached, dropping connection from {}", address); + continue; + }; + + // Initialise the connection vec up to this point + if self.connections.len() <= token.0 { + let difference = token.0 - self.connections.len() + 1; + (0..difference).for_each(|_| self.connections.push(None)); + } + + // Create the connection object and register it as Readable + let slot = self.connections.get_mut(token.0).unwrap(); + *slot = Some(Connection::new(stream, address, token)); + if let Some(ref mut connection) = slot { + self.poll + .registry() + .register(&mut connection.stream, token, Interest::READABLE) + .unwrap(); + } else { + unreachable!() + }; + + info!( + "Accepted incoming connection from {} (token {})", + address, token.0 + ); + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break, + Err(e) => panic!("Unexpected error while accepting a connection: {}", e), + } + } + } + + fn remove_connection(&mut self, token: Token) -> Result<(), ()> { + let slot = self.connections.get_mut(token.0).ok_or(())?; + *slot = None; + self.freed_tokens.push(token); + Ok(()) + } +}