sleepytunny/src/lib.rs

276 lines
10 KiB
Rust

/*
* --------------------
* THIS FILE IS LICENSED UNDER THE FOLLOWING TERMS
*
* this code may not be used for any purpose. be gay, do crime
*
* THE FOLLOWING MESSAGE IS NOT A LICENSE
*
* <barrow@tilde.team> wrote this file.
* by reading this text, you are reading "TRANS RIGHTS".
* this file and the content within it is the gay agenda.
* if we meet some day, and you think this stuff is worth it,
* you can buy me a beer, tea, or something stronger.
* -Ezra Barrow
* --------------------
*/
#![deny(clippy::pedantic)]
#![warn(clippy::needless_pass_by_value, clippy::large_futures)]
#![allow(
clippy::redundant_else,
clippy::wildcard_imports,
clippy::too_many_lines,
clippy::missing_errors_doc
)]
use anyhow::Context;
use std::sync::Arc;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::mpsc::error::{SendError, TryRecvError},
};
use tokio_tun::Tun;
pub mod encoder;
use encoder::Encoder;
mod quic;
pub mod config;
use config::{Configuration, TlsKeys};
const CLIENT_DATA: u64 = 0x12;
const SERVER_DATA: u64 = 0x13;
pub async fn client_main(c: Arc<Configuration>, tls: TlsKeys) -> anyhow::Result<()> {
let tun = Tun::builder()
.name("sleepy")
.tap(false)
.packet_info(true)
.address(c.interface.address)
.netmask(c.interface.netmask)
.up()
.try_build()?;
let mut tun = Encoder::new(tun);
let (mut to_tun, mut from_quic) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
let (to_quic, mut from_tun) = tokio::sync::mpsc::channel::<Vec<u8>>(8);
let client_loop = tokio::spawn(quic::client_loop(c.clone(), tls, {
let mut currently_sending: Option<(Vec<u8>, usize)> = None;
let mut rx_buffer = [0u8; 65535];
move |conn| {
let from_tun = &mut from_tun;
let to_tun = &mut to_tun;
let currently_sending = &mut currently_sending;
if !conn.is_established() {
return Ok(());
}
for stream_id in conn.readable() {
eprintln!("reading from {stream_id}");
while let Ok((read, fin)) = conn.stream_recv(stream_id, &mut rx_buffer) {
eprintln!("{} recieved {} bytes", conn.trace_id(), read);
let stream_buf = &rx_buffer[..read];
eprintln!(
"{} stream {:x} has {} bytes (fin? {})",
conn.trace_id(),
stream_id,
stream_buf.len(),
fin
);
#[allow(clippy::single_match)]
match stream_id {
SERVER_DATA => {
let vec = Vec::from(stream_buf);
if let Err(SendError(_)) = to_tun.send(vec) {
conn.close(true, 0x0, b"exiting").ok();
return Ok(());
}
}
_ => {}
}
}
}
match conn.stream_writable(CLIENT_DATA, 1350) {
Ok(false) => {
return Ok(());
}
Err(quiche::Error::InvalidStreamState(_)) => {}
Err(e) => return Err(e.into()),
_ => {}
}
loop {
if let Some((buf, mut pos)) = currently_sending.take() {
eprintln!(" retrying packet send");
match conn.stream_send(CLIENT_DATA, &buf[pos..], false) {
Ok(written) => pos += written,
Err(quiche::Error::Done) => break,
Err(e) => {
return Err(e).context("stream send failed");
}
}
eprintln!(" sent {pos} bytes out of {}", buf.len());
if pos < buf.len() {
*currently_sending = Some((buf, pos));
}
} else {
let buf = match from_tun.try_recv() {
Ok(v) => v,
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
conn.close(true, 0x0, b"exiting")?;
return Ok(());
}
};
eprintln!("sending packet on stream {CLIENT_DATA:x}");
let written = match conn.stream_send(CLIENT_DATA, &buf, false) {
Ok(v) => v,
Err(quiche::Error::Done) => 0,
Err(e) => {
return Err(e).context("stream send failed");
}
};
eprintln!(" sent {written} bytes out of {}", buf.len());
if written < buf.len() {
*currently_sending = Some((buf, written));
break;
}
}
}
Ok(())
}
}));
let mut buf = [0u8; 65535];
tokio::select! {
res = client_loop => {
res?
}
res = async {
loop {
tokio::select! {
res = tun.read(&mut buf) => {
eprintln!("sending packet");
let len = res?;
let vec = Vec::from(&buf[..len]);
to_quic.send(vec).await?;
},
Some(buf) = from_quic.recv() => {
eprintln!("recieved packet");
tun.write_all(&buf).await?;
},
}
}
} => res
}
}
pub async fn server_main(c: Arc<Configuration>, tun: Tun, tls: TlsKeys) -> anyhow::Result<()> {
let mut tun = Encoder::new(tun);
let (mut to_tun, mut from_quic) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
let (to_quic, mut from_tun) = tokio::sync::mpsc::channel::<Vec<u8>>(8);
let server_loop = tokio::spawn(quic::server_loop(c.clone(), tls, {
let mut currently_sending: Option<(Vec<u8>, usize)> = None;
let mut rx_buffer = [0u8; 65535];
move |client| {
let from_tun = &mut from_tun;
let to_tun = &mut to_tun;
let currently_sending = &mut currently_sending;
if !client.conn.is_established() {
return Ok(());
}
for stream_id in client.conn.readable() {
while let Ok((read, fin)) = client.conn.stream_recv(stream_id, &mut rx_buffer) {
eprintln!("{} recieved {} bytes", client.conn.trace_id(), read);
let stream_buf = &rx_buffer[..read];
eprintln!(
"{} stream {:x} has {} bytes (fin? {})",
client.conn.trace_id(),
stream_id,
stream_buf.len(),
fin
);
#[allow(clippy::single_match)]
match stream_id {
CLIENT_DATA => {
let vec = Vec::from(stream_buf);
if let Err(SendError(_)) = to_tun.send(vec) {
client.conn.close(true, 0x0, b"exiting").ok();
return Ok(());
}
}
_ => {}
}
}
}
match client.conn.stream_writable(SERVER_DATA, 1350) {
Ok(false) => {
return Ok(());
}
Err(quiche::Error::InvalidStreamState(_)) => {}
Err(e) => return Err(e.into()),
_ => {}
}
loop {
if let Some((buf, mut pos)) = currently_sending.take() {
eprintln!(" retrying packet send");
match client.conn.stream_send(SERVER_DATA, &buf[pos..], false) {
Ok(written) => pos += written,
Err(quiche::Error::Done) => break,
Err(e) => {
return Err(e).context("stream send failed");
}
}
eprintln!(" sent {pos} bytes out of {}", buf.len());
if pos < buf.len() {
*currently_sending = Some((buf, pos));
}
} else {
let buf = match from_tun.try_recv() {
Ok(v) => v,
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
client.conn.close(true, 0x0, b"exiting")?;
return Ok(());
}
};
eprintln!("sending packet on stream {SERVER_DATA:x}");
let written = match client.conn.stream_send(SERVER_DATA, &buf, false) {
Ok(v) => v,
Err(quiche::Error::Done) => 0,
Err(e) => {
return Err(e).context("stream send failed");
}
};
eprintln!(" sent {written} bytes out of {}", buf.len());
if written < buf.len() {
*currently_sending = Some((buf, written));
break;
}
}
}
Ok(())
}
}));
let mut buf = [0u8; 65535];
tokio::select! {
res = server_loop => {
res?
}
res = async {
loop {
tokio::select! {
res = tun.read(&mut buf) => {
eprintln!("sending packet");
let len = res?;
let vec = Vec::from(&buf[..len]);
to_quic.send(vec).await?;
},
Some(buf) = from_quic.recv() => {
eprintln!("recieved packet");
tun.write_all(&buf).await?;
},
}
}
} => res
}
}