overhauled everything

This commit is contained in:
Ellie D 2019-06-01 01:06:27 -05:00
parent e53dadf3f5
commit 4e6ab0d40a
2 changed files with 176 additions and 58 deletions

View File

@ -7,24 +7,23 @@ Sunbeam is a simple and fast multi-endpoint TCP relay for
for um
we swear we had a perfectly reasonable use in mind, but we honestly can't remember it now.
Basically it just lets clients connect from anywhere, takes data from them, and redistributes
it to all the other connected clients. It's written with an asynchronous structure and
nonblocking I/O, meaning it should be pretty fast, ideally fast enough to handle audio streams
and stuff.
Run at the command line, Sunbeam takes any number of local ports (numbers between 1 and 65535)
and remote addresses (strings like [ip address]:[port]). For each address argument passed,
Sunbeam attempts to connect to that address, and for each port argument, it listens for incoming
connections on that port locally. Whenever it receives data over any connection, that data is
promptly retransmitted to all other connections, save for two exceptions:
- Data is not passed between clients that connect to Sunbeam on the same port.
- Data is not passed between servers that Sunbeam connects to.
For example, Sunbeam can be used as a relay for an audio stream by giving it the address of the
stream server and a local port. Clients connecting to the local port will receive the stream data,
but if one client sends data back to Sunbeam, it will not interfere with other clients' streams
(it will, however, be relayed back to the source).
This behavior can be overridden with "loopback mode" in which Sunbeam always relays data to all
clients and servers except the one which originated the data. Sunbeam automatically goes into
loopback mode if no ports are specified, or if only one port and no addresses are specified. You
can also pass the `-l` flag to require Sunbeam to run in loopback mode.
We originally envisioned this as a similar (and similarly-cursed) project to Epistlebox, which
would play the role of an online chat platform to complement the other's terrible imitation of
email. You can indeed use it for that purpose, e.g. by running
`nc [server address] 55555`
on each of the clients, typing messages, and pressing enter.
There is no way to tell which client a message originated from, so if you're planning on using
this for a group chat (a very bad idea, to be sure) then you'll have to constantly remind each
other who you are with each message.
If you use it for audio with multiple sources, then the streams will interleave, and you'll end
up with garbage. We don't really recommend doing that.
This program is called "sunbeam" because of a particular plot device that we thought was clever
This program is called "Sunbeam" because of a particular plot device that we thought was clever
in the sci-fi novel *The Three Body Problem* by Cixin Liu. A character discovers that, due to a
particular plasma-driven mechanism inside the sun, it's possible to use the sun as a gigantic
radio amplifier at certain frequencies - all you have to do is launch a radio beam into the

View File

@ -1,78 +1,197 @@
use std::collections::VecDeque;
use std::env::args;
use std::io;
use std::io::prelude::*;
use std::net::{TcpListener,TcpStream,SocketAddr};
use std::process::exit;
use std::collections::VecDeque;
use std::thread::sleep;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
static PORT:u16 = 55555;
static IDLE_THRESH:usize = 1024;
static IDLE_THRESH:usize = 65536;
static BUFFERSIZE_INITIAL:usize = 511;
static LOOPBACK_FLAG:&str = "-l";
fn main() {
let argv = args().collect::<Vec<String>>();
if argv.len() < 2 {
eprintln!("-!- usage: sunbeam [local port]... [remote resource address]:[remote port]...");
exit(1);
}
let mut loopbackmode:bool = false;
let mut addresses:Vec<SocketAddr> = Vec::new();
let mut ports:Vec<u16> = Vec::new();
for arg in argv[1..].iter() {
if arg == LOOPBACK_FLAG {
loopbackmode = true;
} else {
match arg.parse() {
Err(_why) => {
match arg.parse() {
Err(_why) => {
eprintln!("-!- could not parse `{}` as a socket address or port number",arg);
},
Ok(address) => addresses.push(address),
};
},
Ok(n) => ports.push(n),
};
}
}
loopbackmode |= (addresses.len() == 0 && ports.len() == 1) || (addresses.len() > 0 && ports.len() == 0);
if loopbackmode {
println!("-i- running in loopback mode (relaying data between all connections)");
}
'recovery:loop {
let listener = match TcpListener::bind(&format!("[::]:{}",PORT)) {
let mut connectors:VecDeque<(u16,TcpListener,VecDeque<(TcpStream,SocketAddr)>)> = VecDeque::new();
for port in ports.iter() {
match TcpListener::bind(&format!("[::]:{}",port)) {
Err(why) => {
eprintln!("failed to bind TCP listener: {}",why);
eprintln!("-!- failed to bind TCP listener: {}",why);
exit(1);
},
Ok(l) => l,
Ok(listener) => {
listener.set_nonblocking(true).expect("cannot set listener to nonblocking");
connectors.push_back((*port,listener,VecDeque::new()));
},
};
listener.set_nonblocking(true).expect("cannot set listener to nonblocking");
let mut connections:VecDeque<(TcpStream,SocketAddr)> = VecDeque::new();
let mut bottle:[u8;512] = [0x00;512];
let mut idlecycles:usize = 0;
}
'processor:loop {
let (maintx,mainrx) = mpsc::channel();
let mut idlecycles:usize = 0;
let mut bottle:Vec<u8> = Vec::with_capacity(BUFFERSIZE_INITIAL);
let mut sources:VecDeque<(TcpStream,SocketAddr)> = VecDeque::new();
let mut brokensources:VecDeque<SocketAddr> = VecDeque::new();
for address in addresses.iter() {
match TcpStream::connect(&address) {
Err(why) => {
eprintln!("-!- failed to connect to remote resource ({}): {}. trying to reconnect...",address,why);
brokensources.push_back(address.clone());
},
Ok(stream) => {
stream.set_nonblocking(true).expect("cannot set stream to nonblocking");
stream.set_nodelay(true).expect("cannot set stream to nodelay");
sources.push_back((stream,address.clone()));
},
};
}
loop {
if idlecycles < IDLE_THRESH {
idlecycles += 1;
}
bottle.clear();
if let Some((mut source,address)) = sources.pop_front() {
match source.read_to_end(&mut bottle) {
Err(why) => match why.kind() {
io::ErrorKind::WouldBlock => {
if loopbackmode {
for (othersource,_otheraddress) in sources.iter_mut() {
let _ = othersource.write_all(&bottle);
}
}
sources.push_back((source,address));
},
_ => {
eprintln!("-!- failed to receive data from remote resource ({}): {}. trying to reconnect...",address,why);
brokensources.push_back(address);
},
},
Ok(_) => {
eprintln!("-!- remote resource closed the connection. trying to reconnect...");
brokensources.push_back(address);
},
};
if bottle.len() > 0 {
idlecycles = 0;
}
for (_port,_listener,otherconnects) in connectors.iter_mut() {
for (otherstream,_otheraddress) in otherconnects.iter_mut() {
let _ = otherstream.write_all(&bottle);
}
}
}
bottle.clear();
if let Some((port,listener,mut connections)) = connectors.pop_front() {
match listener.accept() {
Err(why) => match why.kind() {
io::ErrorKind::WouldBlock => (),
_ => {
eprintln!("failed to accept new connection: {}",why);
break 'processor;
eprintln!("-!- failed to accept new connection: {}",why);
},
},
Ok((stream,address)) => {
println!("+ connection opened by {}",address);
println!("--> connection opened by {} on port {}",address,port);
stream.set_nonblocking(true).expect("cannot set stream to nonblocking");
stream.set_nodelay(true).expect("cannot set stream to nodelay");
connections.push_back((stream,address));
},
};
match connections.pop_front() {
None => sleep(Duration::from_millis(1)),
Some((mut stream,address)) => match stream.read(&mut bottle) {
if let Some((mut stream,address)) = connections.pop_front() {
match stream.read_to_end(&mut bottle) {
Err(why) => match why.kind() {
io::ErrorKind::WouldBlock => connections.push_back((stream,address)),
_ => println!("- connection to {} failed: read error: {}",address,why),
},
Ok(n) => match n {
0 => {
println!("- connection to {} closed: read reached end of stream",address);
},
n => {
for (otherstream,_otheraddress) in connections.iter_mut() {
let _ = otherstream.write_all(&bottle[..n]);
io::ErrorKind::WouldBlock => {
if loopbackmode {
for (otherstream,_otheraddress) in connections.iter_mut() {
let _ = otherstream.write_all(&bottle);
}
}
connections.push_back((stream,address));
idlecycles = 0;
},
_ => {
println!("<-- closing connection to {} on port {}: {}",address,port,why);
},
},
},
};
if idlecycles > IDLE_THRESH {
sleep(Duration::from_millis(1));
Ok(_) => {
println!("<-- closing connection to {} on port {}: end of stream",address,port);
},
};
if bottle.len() > 0 {
idlecycles = 0;
}
for (_port,_listener,otherconnects) in connectors.iter_mut() {
for (otherstream,_otheraddress) in otherconnects.iter_mut() {
let _ = otherstream.write_all(&bottle);
}
}
for (otherstream,_otheraddress) in sources.iter_mut() {
let _ = otherstream.write_all(&bottle);
}
}
connectors.push_back((port,listener,connections));
}
} // 'recovery
if let Some(threadaddress) = brokensources.pop_front() {
let threadtx = mpsc::Sender::clone(&maintx);
thread::spawn(move || {
loop {
match TcpStream::connect(&threadaddress) {
Err(why) => {
eprintln!("-!- failed to reconnect to remote resource ({}): {}. trying to reconnect...",threadaddress,why);
thread::sleep(Duration::from_millis(1000));
},
Ok(stream) => {
stream.set_nonblocking(true).expect("cannot set stream to nonblocking");
stream.set_nodelay(true).expect("cannot set stream to nodelay");
println!("-i- connection to remote resource at {} reestablished",threadaddress);
let _ = threadtx.send((stream,threadaddress));
break;
},
};
}
});
}
if let Ok(reconnection) = mainrx.try_recv() {
sources.push_back(reconnection);
}
if idlecycles >= IDLE_THRESH {
thread::sleep(Duration::from_millis(1));
}
}
}