sleepytunny/src/quic.rs

516 lines
19 KiB
Rust

/*
* --------------------
* THIS FILE IS LICENSED UNDER THE FOLLOWING TERMS
*
* this code may not be used for any purpose. be gay, do crime
*
* THE FOLLOWING MESSAGE IS NOT A LICENSE
*
* <barrow@tilde.team> wrote this file.
* by reading this text, you are reading "TRANS RIGHTS".
* this file and the content within it is the gay agenda.
* if we meet some day, and you think this stuff is worth it,
* you can buy me a beer, tea, or something stronger.
* -Ezra Barrow
* --------------------
*/
/*
let sock: net::UdpSocket;
let conn: quiche::Connection;
let clients: Map<ConnId, Connection>;
loop {
'read: loop {
let rx_buffer;
let (len, from) = socket.recv_from(&mut rx_buffer)?;
let header = quiche::Header::from_slice(&mut rx_buffer[..len])?;
let conn_id = generate(header);
let client = if !clients.contains(conn_id) {
let conn = quiche::accept();
// send conn to accept queue?
conn
} else {
clients.get(conn_id)
}
let recv_info = quiche::RecvInfo {
to: socket.local_addr(),
from,
}
let len = client.recv(&mut rx_buffer[..len], recv_info)?;
}
'write: loop {
let tx_buffer;
let (len, send_info) = conn.send(&mut tx_buffer)?;
socket.send_to(&tx_buffer[..len], send_info.to)?;
}
}
let data_len = conn.stream_send(stream_id, b"data", some_boolean_idk)?;
let mut tx_buffer = [u8; 1350];
let (tx_len, send_info) = conn.send(&mut tx_buffer)?;
sock.send_to(&tx_buffer[..tx_len], &send_info.to)?;
*/
/*
let sock: net::UdpSocket;
let conn: quiche::Connection;
loop {
'read: loop {
let rx_buffer;
let (len, from) = socket.recv_from(&mut rx_buffer)?;
let recv_info = quiche::RecvInfo {
to: socket.local_addr(),
from,
}
let len = conn.recv(&mut buf[..len], recv_info)?;
}
'write: loop {
let tx_buffer;
let (len, send_info) = conn.send(&mut tx_buffer)?;
socket.send_to(&tx_buffer[..len], send_info.to)?;
}
}
let data_len = conn.stream_send(stream_id, b"data", some_boolean_idk)?;
let mut tx_buffer = [u8; 1350];
let (tx_len, send_info) = conn.send(&mut tx_buffer)?;
sock.send_to(&tx_buffer[..tx_len], &send_info.to)?;
*/
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
use anyhow::Context;
use boring::x509::X509;
use ring::rand::*;
use tokio::net::UdpSocket;
/// Builds the quiche configuration used by the server side of the tunnel.
///
/// The TLS context comes from `tls`; every transport parameter is hard-coded
/// below. Peer verification is switched on.
///
/// # Errors
/// Fails if the TLS context cannot be built, if quiche rejects it, or if the
/// application protocol list is rejected.
fn configure_server(tls: crate::config::TlsKeys) -> anyhow::Result<quiche::Config> {
    let ssl_ctx = tls.into_ssl_ctx()?;
    let mut cfg = quiche::Config::with_boring_ssl_ctx_builder(quiche::PROTOCOL_VERSION, ssl_ctx)?;

    // ALPN identifier; change this to h3 eventually.
    cfg.set_application_protos(&[b"sleepytunny"])?;

    // Idle timeout and datagram sizes (1350 matches the send buffers used by the loops).
    cfg.set_max_idle_timeout(5000);
    cfg.set_max_recv_udp_payload_size(1350);
    cfg.set_max_send_udp_payload_size(1350);

    // Stream-count limits.
    cfg.set_initial_max_streams_bidi(100);
    cfg.set_initial_max_streams_uni(100);

    // Flow-control windows: connection-wide first, then per-stream.
    cfg.set_initial_max_data(10_000_000);
    cfg.set_initial_max_stream_data_bidi_local(1_000_000);
    cfg.set_initial_max_stream_data_bidi_remote(1_000_000);
    cfg.set_initial_max_stream_data_uni(1_000_000);

    cfg.set_disable_active_migration(true);
    cfg.verify_peer(true);

    Ok(cfg)
}
/// Builds the quiche configuration used by the client side of the tunnel.
///
/// Uses the TLS context derived from `tls` and the same hard-coded transport
/// parameters as the server configuration, with peer verification enabled.
///
/// # Errors
/// Fails if the TLS context cannot be built, if quiche rejects it, or if the
/// application protocol list is rejected.
fn configure_client(tls: crate::config::TlsKeys) -> anyhow::Result<quiche::Config> {
    let tls_ctx = tls.into_ssl_ctx()?;
    let mut quic_cfg =
        quiche::Config::with_boring_ssl_ctx_builder(quiche::PROTOCOL_VERSION, tls_ctx)?;

    // ALPN identifier; change this to h3 eventually.
    quic_cfg.set_application_protos(&[b"sleepytunny"])?;

    // Idle timeout and datagram sizes.
    quic_cfg.set_max_idle_timeout(5000);
    quic_cfg.set_max_recv_udp_payload_size(1350);
    quic_cfg.set_max_send_udp_payload_size(1350);

    // Stream-count limits.
    quic_cfg.set_initial_max_streams_bidi(100);
    quic_cfg.set_initial_max_streams_uni(100);

    // Flow-control windows: connection-wide first, then per-stream.
    quic_cfg.set_initial_max_data(10_000_000);
    quic_cfg.set_initial_max_stream_data_bidi_local(1_000_000);
    quic_cfg.set_initial_max_stream_data_bidi_remote(1_000_000);
    quic_cfg.set_initial_max_stream_data_uni(1_000_000);

    quic_cfg.set_disable_active_migration(true);
    quic_cfg.verify_peer(true);

    Ok(quic_cfg)
}
/// Creates the address-validation token sent in a stateless retry.
///
/// Layout: the literal bytes `quiche`, then the raw octets of the sender's IP
/// address (4 for IPv4, 16 for IPv6), then the destination connection id taken
/// from `header`. The token is neither encrypted nor authenticated.
fn mint_token(header: &quiche::Header, from: &SocketAddr) -> Vec<u8> {
    let mut out = Vec::with_capacity(6 + 16 + header.dcid.len());
    out.extend_from_slice(b"quiche");
    // Append the address octets straight from the fixed-size arrays.
    match from.ip() {
        std::net::IpAddr::V4(v4) => out.extend_from_slice(&v4.octets()),
        std::net::IpAddr::V6(v6) => out.extend_from_slice(&v6.octets()),
    }
    out.extend_from_slice(&header.dcid);
    out
}
/// Checks a token produced by `mint_token` and extracts the connection id it carries.
///
/// Returns `None` unless the token starts with the literal `quiche` followed by
/// the octets of `src`'s IP address. On success the remaining bytes (possibly
/// empty) are returned as a connection id borrowing from `token`.
fn validate_token<'a>(src: &SocketAddr, token: &'a [u8]) -> Option<quiche::ConnectionId<'a>> {
    let after_magic = token.strip_prefix(b"quiche")?;
    // The address part must match the address the packet actually came from.
    let cid_bytes = match src.ip() {
        std::net::IpAddr::V4(v4) => after_magic.strip_prefix(&v4.octets())?,
        std::net::IpAddr::V6(v6) => after_magic.strip_prefix(&v6.octets())?,
    };
    Some(quiche::ConnectionId::from_ref(cid_bytes))
}
/// A stream payload that has not yet been written to its stream in full.
///
/// `ConnectedClient::handle_writable` resumes from `written` each time the
/// stream becomes writable again.
struct PartialSend {
/// The complete payload to send on the stream.
body: Vec<u8>,
/// Number of leading bytes of `body` already handed to the stream.
written: usize,
}
/// A QUIC connection accepted by the server together with its per-connection state.
pub struct ConnectedClient {
    /// The underlying quiche connection.
    pub conn: quiche::Connection,
    /// Payloads that are only partially written, keyed by stream id.
    partial_sends: HashMap<u64, PartialSend>,
    /// Set once the one-time "connection established" debug dump has been printed.
    debug_established: bool,
}

impl ConnectedClient {
    /// Wraps a freshly accepted connection with empty bookkeeping.
    fn new(conn: quiche::Connection) -> Self {
        Self {
            conn,
            partial_sends: HashMap::new(),
            debug_established: false,
        }
    }

    /// Prints the peer certificate chain to stderr the first time the
    /// connection is observed in the established state; later calls do nothing.
    ///
    /// # Errors
    /// Returns an error if the peer presented no certificate chain or a
    /// certificate cannot be parsed as DER.
    fn debug_established(&mut self) -> anyhow::Result<()> {
        if self.debug_established || !self.conn.is_established() {
            return Ok(());
        }
        self.debug_established = true;
        eprintln!("connection established! debug info:");
        eprintln!(" cert chain:");
        let chain = self.conn.peer_cert_chain().context("no peer cert chain")?;
        for (i, cert) in chain.into_iter().enumerate().rev() {
            eprintln!(" cert {i}:");
            let cert = X509::from_der(cert).context("failed to deserialize der")?;
            let subject_name = cert
                .subject_name()
                .entries()
                // The certificate is peer-controlled: an entry that cannot be
                // converted to UTF-8 must not panic the server, so print a
                // placeholder for it instead.
                .map(|entry| match entry.data().as_utf8() {
                    Ok(text) => text.to_string(),
                    Err(_) => String::from("<non-utf8>"),
                })
                .collect::<Vec<_>>()
                .join("///");
            eprintln!(" subject name: {subject_name}");
        }
        Ok(())
    }

    /// Continues a partially written payload on `stream_id`, if there is one.
    ///
    /// Writes as much of the remaining payload as the stream accepts. The entry
    /// is dropped once everything has been written or when the stream reports
    /// an error other than `Done`. The payload is sent with `fin = false`, so
    /// the stream is left open.
    fn handle_writable(&mut self, stream_id: u64) {
        let conn = &mut self.conn;
        // eprintln!("{} stream {} is writable", conn.trace_id(), stream_id);
        let Some(pending) = self.partial_sends.get_mut(&stream_id) else {
            return;
        };
        let remaining = &pending.body[pending.written..];
        let written = match conn.stream_send(stream_id, remaining, false) {
            Ok(v) => v,
            // `Done` means nothing could be written right now; retry later.
            Err(quiche::Error::Done) => 0,
            Err(e) => {
                self.partial_sends.remove(&stream_id);
                eprintln!("{} stream send failed {:?}", conn.trace_id(), e);
                return;
            }
        };
        pending.written += written;
        if pending.written == pending.body.len() {
            self.partial_sends.remove(&stream_id);
        }
    }
}
pub async fn server_loop(
app_config: Arc<crate::config::Configuration>,
tls: crate::config::TlsKeys,
mut handle_incoming: impl FnMut(&mut ConnectedClient) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
let local_addr = app_config.server()?.endpoint()?;
let socket = UdpSocket::bind(&local_addr).await?;
let mut config = configure_server(tls)?;
let conn_id_seed =
ring::hmac::Key::generate(ring::hmac::HMAC_SHA256, &SystemRandom::new()).unwrap();
let mut client_map: HashMap<quiche::ConnectionId<'static>, ConnectedClient> = HashMap::new();
let mut rx_buffer = [0u8; 65535];
let mut tx_buffer = [0u8; 1350];
loop {
'read: loop {
let (len, from) = match socket.try_recv_from(&mut rx_buffer) {
Ok(v) => v,
Err(e) => {
if e.kind() == std::io::ErrorKind::WouldBlock {
break 'read;
} else {
return Err(e).context("try_recv_from failed");
}
}
};
let pkt_buf = &mut rx_buffer[..len];
let header = match quiche::Header::from_slice(pkt_buf, quiche::MAX_CONN_ID_LEN) {
Ok(v) => v,
Err(e) => {
eprintln!("parsing packet header failed: {e:?}");
continue 'read;
}
};
eprintln!("got packet {header:?}");
let conn_id = ring::hmac::sign(&conn_id_seed, &header.dcid);
let conn_id = &conn_id.as_ref()[..quiche::MAX_CONN_ID_LEN];
let conn_id = conn_id.to_vec().into();
let client = if !client_map.contains_key(&header.dcid)
&& !client_map.contains_key(&conn_id)
{
if header.ty != quiche::Type::Initial {
eprintln!("packet is not initial");
continue 'read;
}
if !quiche::version_is_supported(header.version) {
eprintln!("doing version negotiation");
let len = quiche::negotiate_version(&header.scid, &header.dcid, &mut tx_buffer)
.context("making version negotiation packet failed")?;
let out = &tx_buffer[..len];
match socket.try_send_to(out, from) {
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
eprintln!("version negotiation send_to() would block");
break 'read; // this break wasnt labeled in the example
}
res => res.context("version negotiation send_to failed")?,
};
continue 'read;
}
let mut scid = [0; quiche::MAX_CONN_ID_LEN];
scid.copy_from_slice(&conn_id);
let scid = quiche::ConnectionId::from_ref(&scid);
let token = header
.token
.as_ref()
.expect("token is always present in initial packets");
if token.is_empty() {
eprintln!("client didnt send token, doing stateless retry");
let new_token = mint_token(&header, &from);
let len = quiche::retry(
&header.scid,
&header.dcid,
&scid,
&new_token,
header.version,
&mut tx_buffer,
)
.context("makin retry packet failed")?;
let out = &tx_buffer[..len];
match socket.try_send_to(out, from) {
Ok(l) => {
eprintln!("wrote {l} bytes of the {len} byte retry packet");
continue 'read;
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
eprintln!("retry send_to() would block");
break 'read; // this break wasnt labeled in the example
}
Err(e) => return Err(e).context("version negotiation send_to failed"),
};
} else {
let odcid = validate_token(&from, token);
// The token was not valid, meaning the retry failed,
// so drop the packet
if odcid.is_none() {
eprintln!("Invalid address validation token");
continue 'read;
};
if scid.len() != header.dcid.len() {
eprintln!("Invalid destination connection ID");
continue 'read;
}
// Reuse the source connection id we sent in the retry packet
// instead of changing it again
let scid = header.dcid.clone();
eprintln!("New connection: dcid={:?} scid={:?}", header.dcid, scid);
let conn = quiche::accept(&scid, odcid.as_ref(), local_addr, from, &mut config)
.context("accepting connection failed")?;
client_map.insert(scid.clone(), ConnectedClient::new(conn));
client_map.get_mut(&scid).unwrap()
}
} else {
match client_map.get_mut(&header.dcid) {
Some(v) => v,
None => client_map.get_mut(&conn_id).unwrap(),
}
};
client.debug_established()?;
let recv_info = quiche::RecvInfo {
to: local_addr,
from,
};
let read = match client.conn.recv(pkt_buf, recv_info) {
Ok(v) => v,
Err(e) => {
eprintln!("{} recv failed: {:?}", client.conn.trace_id(), e);
continue 'read;
}
};
eprintln!("{} processed {} bytes", client.conn.trace_id(), read);
}
for client in client_map.values_mut() {
if let Some(i) = client.conn.timeout_instant() {
if !i.elapsed().is_zero() {
client.conn.on_timeout();
}
}
if client.conn.is_in_early_data() || client.conn.is_established() {
for stream_id in client.conn.writable() {
client.handle_writable(stream_id);
}
handle_incoming(client)?;
}
}
for client in client_map.values_mut() {
'write: loop {
let (write, send_info) = match client.conn.send(&mut tx_buffer) {
Ok(v) => v,
Err(quiche::Error::Done) => {
// eprintln!("{} done writing", client.conn.trace_id());
break 'write;
}
Err(e) => {
eprintln!("{} send failed: {:?}", client.conn.trace_id(), e);
client.conn.close(false, 0x1, b"fail").ok();
break 'write;
}
};
match socket.send_to(&tx_buffer[..write], send_info.to).await {
Ok(written) => eprintln!("sent {written} bytes of {write} to {}", send_info.to),
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
eprintln!("send_to() would block");
break 'write;
}
Err(e) => return Err(e).context("send_to() failed"),
};
}
}
// Garbage collect closed connections
client_map.retain(|_, ref mut client| {
if client.conn.is_closed() {
eprintln!(
"{} connection garbage collected {:?}",
client.conn.trace_id(),
client.conn.stats()
);
}
!client.conn.is_closed()
});
}
}
pub async fn client_loop(
app_config: Arc<crate::config::Configuration>,
tls: crate::config::TlsKeys,
mut f: impl FnMut(&mut quiche::Connection) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
let mut qconfig = configure_client(tls)?;
let socket = UdpSocket::bind(app_config.client()?.local_bind_addr()?).await?;
let mut scid = [0u8; quiche::MAX_CONN_ID_LEN];
SystemRandom::new().fill(&mut scid[..]).unwrap();
let scid = quiche::ConnectionId::from_ref(&scid);
let local_addr = socket.local_addr().unwrap();
let mut rx_buffer = [0u8; 65535];
let mut tx_buffer = [0u8; 1350];
let mut conn = quiche::connect(
app_config.client()?.server_name()?.map(String::as_str),
&scid,
local_addr,
app_config.client()?.endpoint()?,
&mut qconfig,
)?;
let (write, send_info) = conn
.send(&mut tx_buffer[..])
.context("initial send failed")?;
let written = socket.send_to(&tx_buffer[..write], send_info.to).await?;
eprintln!("wrote {written} bytes of {write} byte initiation");
// tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let mut debug_established = false;
loop {
'read: loop {
let (len, from) = match socket.try_recv_from(&mut rx_buffer) {
Ok(v) => v,
Err(e) => {
if e.kind() == std::io::ErrorKind::WouldBlock {
break 'read;
} else {
return Err(e).context("try_recv_from failed");
}
}
};
let recv_info = quiche::RecvInfo {
to: local_addr,
from,
};
eprintln!("recieving {len} bytes from {from}");
conn.recv(&mut rx_buffer[..len], recv_info)
.context("recv failed")?;
}
if conn.is_closed() {
eprintln!("connection closed");
return Ok(());
} else if conn.is_established() {
if !debug_established {
debug_established = true;
eprintln!("connection established! debug info:");
eprintln!(" cert chain:");
let chain = conn.peer_cert_chain().context("no peer cert chain")?;
for (i, cert) in chain.into_iter().enumerate().rev() {
eprintln!(" cert {i}:");
let cert = X509::from_der(cert).context("failed to deserialize der")?;
let subject_name = cert
.subject_name()
.entries()
.map(|entry| entry.data().as_utf8().unwrap().to_string())
.collect::<Vec<_>>()
.join("///");
for gen_name in cert
.subject_alt_names()
.map(IntoIterator::into_iter)
.into_iter()
.flatten()
{
eprintln!(" gen_name: {:?}", gen_name.dnsname());
}
eprintln!(" subject name: {}", subject_name);
}
}
f(&mut conn)?;
}
'write: loop {
let (write, send_info) = match conn.send(&mut tx_buffer) {
Ok(v) => v,
Err(quiche::Error::Done) => {
// eprint!("{} done writing\r", conn.trace_id());
break 'write;
}
Err(e) => {
conn.close(false, 0x1, b"fail").ok();
return Err(e).context("send failed");
}
};
eprintln!("sending {write} bytes");
let v = socket
.send_to(&tx_buffer[..write], send_info.to)
.await
.context("send_to failed")?;
eprintln!("sent {v} bytes of {write}");
}
if let Some(i) = conn.timeout_instant() {
if !i.elapsed().is_zero() {
conn.on_timeout();
}
}
}
}