fixed quic timeouts
This commit is contained in:
parent
3aab283aa3
commit
7437a34a55
57
src/lib.rs
57
src/lib.rs
|
@ -38,7 +38,8 @@ mod quic;
|
|||
pub mod config;
|
||||
use config::{Configuration, KeyPair};
|
||||
|
||||
// const STREAM_ID: u64 = 0b000;
|
||||
const CLIENT_DATA: u64 = 0x12;
|
||||
const SERVER_DATA: u64 = 0x13;
|
||||
|
||||
pub async fn client_main(c: Arc<Configuration>, tls: KeyPair) -> anyhow::Result<()> {
|
||||
let tun = Tun::builder()
|
||||
|
@ -68,23 +69,26 @@ pub async fn client_main(c: Arc<Configuration>, tls: KeyPair) -> anyhow::Result<
|
|||
eprintln!("{} recieved {} bytes", conn.trace_id(), read);
|
||||
let stream_buf = &rx_buffer[..read];
|
||||
eprintln!(
|
||||
"{} stream {} has {} bytes (fin? {})",
|
||||
"{} stream {:x} has {} bytes (fin? {})",
|
||||
conn.trace_id(),
|
||||
stream_id,
|
||||
stream_buf.len(),
|
||||
fin
|
||||
);
|
||||
if stream_id != 3 {
|
||||
continue;
|
||||
}
|
||||
let vec = Vec::from(stream_buf);
|
||||
if let Err(SendError(_)) = to_tun.send(vec) {
|
||||
conn.close(true, 0x0, b"exiting")?;
|
||||
return Ok(());
|
||||
#[allow(clippy::single_match)]
|
||||
match stream_id {
|
||||
SERVER_DATA => {
|
||||
let vec = Vec::from(stream_buf);
|
||||
if let Err(SendError(_)) = to_tun.send(vec) {
|
||||
conn.close(true, 0x0, b"exiting").ok();
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
match conn.stream_writable(2, 1350) {
|
||||
match conn.stream_writable(CLIENT_DATA, 1350) {
|
||||
Ok(false) => {
|
||||
return Ok(());
|
||||
}
|
||||
|
@ -95,7 +99,7 @@ pub async fn client_main(c: Arc<Configuration>, tls: KeyPair) -> anyhow::Result<
|
|||
loop {
|
||||
if let Some((buf, mut pos)) = currently_sending.take() {
|
||||
eprintln!(" retrying packet send");
|
||||
match conn.stream_send(2, &buf[pos..], false) {
|
||||
match conn.stream_send(CLIENT_DATA, &buf[pos..], false) {
|
||||
Ok(written) => pos += written,
|
||||
Err(quiche::Error::Done) => break,
|
||||
Err(e) => {
|
||||
|
@ -115,8 +119,8 @@ pub async fn client_main(c: Arc<Configuration>, tls: KeyPair) -> anyhow::Result<
|
|||
return Ok(());
|
||||
}
|
||||
};
|
||||
eprintln!("sending packet on stream 2");
|
||||
let written = match conn.stream_send(2, &buf, false) {
|
||||
eprintln!("sending packet on stream {CLIENT_DATA:x}");
|
||||
let written = match conn.stream_send(CLIENT_DATA, &buf, false) {
|
||||
Ok(v) => v,
|
||||
Err(quiche::Error::Done) => 0,
|
||||
Err(e) => {
|
||||
|
@ -177,23 +181,26 @@ pub async fn server_main(c: Arc<Configuration>, tun: Tun, tls: KeyPair) -> anyho
|
|||
eprintln!("{} recieved {} bytes", client.conn.trace_id(), read);
|
||||
let stream_buf = &rx_buffer[..read];
|
||||
eprintln!(
|
||||
"{} stream {} has {} bytes (fin? {})",
|
||||
"{} stream {:x} has {} bytes (fin? {})",
|
||||
client.conn.trace_id(),
|
||||
stream_id,
|
||||
stream_buf.len(),
|
||||
fin
|
||||
);
|
||||
if stream_id != 2 {
|
||||
continue;
|
||||
}
|
||||
let vec = Vec::from(stream_buf);
|
||||
if let Err(SendError(_)) = to_tun.send(vec) {
|
||||
client.conn.close(true, 0x0, b"exiting")?;
|
||||
return Ok(());
|
||||
#[allow(clippy::single_match)]
|
||||
match stream_id {
|
||||
CLIENT_DATA => {
|
||||
let vec = Vec::from(stream_buf);
|
||||
if let Err(SendError(_)) = to_tun.send(vec) {
|
||||
client.conn.close(true, 0x0, b"exiting").ok();
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
match client.conn.stream_writable(3, 1350) {
|
||||
match client.conn.stream_writable(SERVER_DATA, 1350) {
|
||||
Ok(false) => {
|
||||
return Ok(());
|
||||
}
|
||||
|
@ -204,7 +211,7 @@ pub async fn server_main(c: Arc<Configuration>, tun: Tun, tls: KeyPair) -> anyho
|
|||
loop {
|
||||
if let Some((buf, mut pos)) = currently_sending.take() {
|
||||
eprintln!(" retrying packet send");
|
||||
match client.conn.stream_send(3, &buf[pos..], false) {
|
||||
match client.conn.stream_send(SERVER_DATA, &buf[pos..], false) {
|
||||
Ok(written) => pos += written,
|
||||
Err(quiche::Error::Done) => break,
|
||||
Err(e) => {
|
||||
|
@ -224,8 +231,8 @@ pub async fn server_main(c: Arc<Configuration>, tun: Tun, tls: KeyPair) -> anyho
|
|||
return Ok(());
|
||||
}
|
||||
};
|
||||
eprintln!("sending packet on stream 3");
|
||||
let written = match client.conn.stream_send(3, &buf, false) {
|
||||
eprintln!("sending packet on stream {SERVER_DATA:x}");
|
||||
let written = match client.conn.stream_send(SERVER_DATA, &buf, false) {
|
||||
Ok(v) => v,
|
||||
Err(quiche::Error::Done) => 0,
|
||||
Err(e) => {
|
||||
|
|
16
src/quic.rs
16
src/quic.rs
|
@ -100,7 +100,7 @@ use std::{collections::HashMap, net::SocketAddr, sync::Arc};
|
|||
|
||||
use anyhow::Context;
|
||||
use ring::rand::*;
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::{net::UdpSocket, time::Instant};
|
||||
|
||||
fn configure_server(tls: crate::config::KeyPair) -> anyhow::Result<quiche::Config> {
|
||||
let mut config = quiche::Config::with_boring_ssl_ctx(quiche::PROTOCOL_VERSION, tls.ssl_ctx()?)?;
|
||||
|
@ -170,6 +170,8 @@ struct PartialSend {
|
|||
}
|
||||
pub struct ConnectedClient {
|
||||
pub conn: quiche::Connection,
|
||||
pub ka_last_tx: Instant,
|
||||
pub ka_last_rx: Instant,
|
||||
partial_sends: HashMap<u64, PartialSend>,
|
||||
}
|
||||
impl ConnectedClient {
|
||||
|
@ -177,6 +179,8 @@ impl ConnectedClient {
|
|||
Self {
|
||||
conn,
|
||||
partial_sends: HashMap::new(),
|
||||
ka_last_tx: Instant::now(),
|
||||
ka_last_rx: Instant::now(),
|
||||
}
|
||||
}
|
||||
fn handle_writable(&mut self, stream_id: u64) {
|
||||
|
@ -331,6 +335,11 @@ pub async fn server_loop(
|
|||
eprintln!("{} processed {} bytes", client.conn.trace_id(), read);
|
||||
}
|
||||
for client in client_map.values_mut() {
|
||||
if let Some(i) = client.conn.timeout_instant() {
|
||||
if !i.elapsed().is_zero() {
|
||||
client.conn.on_timeout();
|
||||
}
|
||||
}
|
||||
if client.conn.is_in_early_data() || client.conn.is_established() {
|
||||
for stream_id in client.conn.writable() {
|
||||
client.handle_writable(stream_id);
|
||||
|
@ -424,6 +433,11 @@ pub async fn client_loop(
|
|||
conn.recv(&mut rx_buffer[..len], recv_info)
|
||||
.context("recv failed")?;
|
||||
}
|
||||
if let Some(i) = conn.timeout_instant() {
|
||||
if !i.elapsed().is_zero() {
|
||||
conn.on_timeout();
|
||||
}
|
||||
}
|
||||
if conn.is_closed() {
|
||||
eprintln!("connection closed");
|
||||
return Ok(());
|
||||
|
|
Loading…
Reference in New Issue