// sunbeam/sunbeam.rs (621 lines, 18 KiB, Rust)

/*
Sunbeam Network Relay v0.3
Elizabeth Evelene Amelia Diode
June 2019
*/
use std::collections::{VecDeque,HashSet,HashMap};
use std::env::args;
use std::fs::File;
use std::io;
use std::io::prelude::*;
use std::net::{TcpListener,TcpStream,UdpSocket,SocketAddr,IpAddr};
use std::process::exit;
use std::sync::mpsc;
use std::thread;
use std::time::{Duration,Instant};
// --- main loop idle throttling ---
static IDLE_THRESH:u64 = 256; // cycles of doing nothing before throttling back to slightly slower execution speed
static IDLE_SCALING:u64 = 1; // after crossing IDLE_THRESH, increase cycle delay by this many microseconds per cycle
static IDLE_MAX:u64 = 65535; // maximum number of idle cycles to count up before stopping to count
// IDLE_MAX*IDLE_SCALING-IDLE_THRESH = maximum cycle delay in microseconds
// --- reconnecting and access rule files ---
static RECONNECT_WAIT:u64 = 1000; // time in ms to wait between reconnect attempts
static BLOCKFILE_NAME:&str = ".nosunbeam"; // file to check for as the client blacklist
static ALLOWFILE_NAME:&str = ".yesunbeam"; // file to check for as the client whitelist, for Whitelist-mode connectors
// --- command line argument decoration ---
// leading letters stripped from an argument before parsing it; 'u' selects UDP and 't' selects TCP
static PORT_PREFIXES:[char;2] = ['u','t'];
// trailing letters stripped from an argument before parsing it; 'p' selects Private and 'w' selects Whitelist privacy
static PORT_SUFFIXES:[char;2] = ['p','w'];
// --- UDP heartbeats ---
// time in ms a UDP guest's heartbeat timer must exceed before a heartbeat byte is sent to its remote address
static HEARTBEAT_WAIT:u64 = 6000;
// time in ms without any packet from a UDP client after which a host drops that client
static HEARTBEAT_DROP:u64 = 60000;
/// Access policy applied to a locally hosted port.
#[derive(Clone,PartialEq)]
enum PortPrivacy {
    /// Accept every peer whose address is not listed in the blacklist file.
    Public,
    /// Accept only peers whose address is listed in the whitelist file.
    Whitelist,
    /// Bind to the local loopback address only.
    Private,
}
// Transport protocol(s) requested for a port or remote address on the command line.
#[derive(PartialEq)]
enum PortProtocol {
// argument was prefixed with 'u'
Udp,
// argument was prefixed with 't'
Tcp,
// no prefix given: both a TCP and a UDP endpoint are set up
Both,
}
// One endpoint taking part in the relay. The main loop takes connectors from a queue one at a time,
// reads from the one taken, and forwards what it received to every other connector.
enum Connector {
// A locally bound port that accepts clients; each transport is present only if it was requested.
Host {
tcp:Option<TcpHost>,
udp:Option<UdpHost>,
},
// An outgoing link to the remote resource at `address`.
// `tcp` is None when TCP was not requested or while the connection is broken and being re-established.
Guest {
address:SocketAddr,
tcp:Option<TcpGuest>,
udp:Option<UdpGuest>,
},
}
// TCP side of a hosted port.
struct TcpHost {
// local port number, used in log messages
port:u16,
// access policy checked when a connection is accepted
privacy:PortPrivacy,
// non-blocking listener accepting new clients
listener:TcpListener,
// connected clients; the main loop pops one from the front per cycle and pushes it to the back if it is still alive
clients:VecDeque<TcpClient>,
}
// UDP side of a hosted port.
struct UdpHost {
// local port number, used in log messages
port:u16,
// access policy checked for every received packet
privacy:PortPrivacy,
// non-blocking socket used both to receive from and to send to clients
socket:UdpSocket,
// known clients keyed by their source address; entries are (re)inserted on every accepted packet
clients:HashMap<SocketAddr,UdpClient>,
}
// TCP side of an outgoing connection to a remote resource.
struct TcpGuest {
// connected stream, set to non-blocking and no-delay when it is created
stream:TcpStream,
}
// UDP side of an outgoing link to a remote resource.
struct UdpGuest {
// locally bound socket used to exchange packets with the remote address
socket:UdpSocket,
// timestamp compared against the heartbeat hold-off to decide when to send a heartbeat byte
lastheartbeat:Instant,
}
// A client connected to a hosted TCP port.
struct TcpClient {
// peer address as reported by accept(); used in log messages and to recognise the data source
address:SocketAddr,
// non-blocking stream to the client
stream:TcpStream,
}
// A peer that has sent at least one accepted packet to a hosted UDP port.
struct UdpClient {
// source address of the peer; relayed packets are sent here
address:SocketAddr,
// time the client entry was last (re)created; clients older than the heartbeat timeout are dropped
lastheartbeat:Instant,
}
/// Reads a rule file containing one IP address per line and returns the addresses as a set.
///
/// * `rulefilepath` - path of the rule file to read.
///
/// A missing file is not an error and yields an empty set. Surrounding whitespace on a
/// line is ignored and blank lines are skipped, so a stray empty line in a hand-edited
/// file does not stop the relay from starting.
///
/// The process is terminated with exit code 1 if the file exists but cannot be opened or
/// read, or if any non-blank line is not a valid IPv4 or IPv6 address.
fn load_rules(rulefilepath:&str) -> HashSet<IpAddr> {
    let mut rules:HashSet<IpAddr> = HashSet::new();
    let mut file = match File::open(rulefilepath) {
        Ok(file) => file,
        Err(why) => {
            // rule files are optional: no file simply means no rules
            if why.kind() == io::ErrorKind::NotFound {
                return rules;
            }
            eprintln!("-!- failed to open {} to read IP address rules: {}",rulefilepath,why);
            exit(1);
        },
    };
    let mut rulestring = String::new();
    if let Err(why) = file.read_to_string(&mut rulestring) {
        eprintln!("-!- failed to read {} for IP address rules: {}",rulefilepath,why);
        exit(1);
    }
    for line in rulestring.lines().map(|line| line.trim()).filter(|line| !line.is_empty()) {
        match line.parse::<IpAddr>() {
            Ok(address) => {
                rules.insert(address);
            },
            Err(_why) => {
                eprintln!("-!- could not parse '{}' in {} as an IP address",line,rulefilepath);
                exit(1);
            },
        };
    }
    rules
}
/// Entry point of the relay.
///
/// 1. Parses the command line into flags/switches, local ports to host and remote
///    addresses to connect to; prints usage and exits if no argument is given.
/// 2. Loads the blacklist and whitelist rule files.
/// 3. Binds a TCP listener and/or UDP socket for every requested port (`Connector::Host`)
///    and opens a TCP stream and/or UDP socket for every remote address (`Connector::Guest`).
/// 4. Loops forever: each cycle takes one connector from the front of the queue, reads from
///    it without blocking, forwards whatever was received to every other connector (and to
///    the connector's own other clients in loopback mode), then re-queues it at the back.
///    Broken guest TCP connections are re-established by background threads.
///
/// Never returns normally; fatal setup errors terminate the process with exit code 1.
fn main() {
    // Forwards this cycle's payloads to one TCP peer. Empty payloads are skipped and
    // write errors are ignored on purpose: relaying is best-effort.
    fn relay_to_stream(stream:&mut TcpStream,tcp_data:&[u8],udp_data:&[u8]) {
        if !tcp_data.is_empty() {
            let _ = stream.write_all(tcp_data);
        }
        if !udp_data.is_empty() {
            let _ = stream.write_all(udp_data);
        }
    }
    // Forwards this cycle's payloads to one UDP peer, with the same best-effort policy.
    fn relay_to_socket(socket:&UdpSocket,target:&SocketAddr,tcp_data:&[u8],udp_data:&[u8]) {
        if !tcp_data.is_empty() {
            let _ = socket.send_to(tcp_data,target);
        }
        if !udp_data.is_empty() {
            let _ = socket.send_to(udp_data,target);
        }
    }
    // Maps an IPv4-mapped IPv6 address (::ffff:a.b.c.d) back to plain IPv4. Dual-stack
    // sockets bound to `[::]` report IPv4 peers in the mapped form, which would otherwise
    // never match the IPv4 entries of the rule files.
    fn canonical_ip(ip:IpAddr) -> IpAddr {
        match ip {
            IpAddr::V6(v6) => match v6.octets() {
                [0,0,0,0,0,0,0,0,0,0,0xff,0xff,a,b,c,d] => IpAddr::V4(std::net::Ipv4Addr::new(a,b,c,d)),
                _ => IpAddr::V6(v6),
            },
            v4 => v4,
        }
    }

    // ---- command line -------------------------------------------------------------
    let argv = args().collect::<Vec<String>>();
    if argv.len() < 2 {
        eprintln!("-!- usage: sunbeam [flags] [protocol][local port][privacy]... [protocol][remote resource address]:[remote port]...");
        eprintln!();
        eprintln!(" flags:");
        eprintln!(" -l --loopback - also relay data to clients connecting on the same port");
        eprintln!(" -m --mirror - also relay data back to its source");
        eprintln!();
        eprintln!(" prefix ports or addresses with a letter to indicate transport protocol:");
        eprintln!(" t - tcp");
        eprintln!(" u - udp");
        eprintln!(" none - both tcp and udp");
        eprintln!();
        eprintln!(" suffix ports with a letter to indicate privacy:");
        eprintln!(" p - local-only");
        eprintln!(" w - addresses in .yesunbeam whitelist only");
        eprintln!(" none - all except addresses in .nosunbeam blacklist");
        exit(1);
    }
    let mut loopbackmode:bool = false;
    let mut mirrormode:bool = false;
    let mut addresses:Vec<(PortProtocol,SocketAddr)> = Vec::new();
    let mut ports:Vec<(PortProtocol,PortPrivacy,u16)> = Vec::new();
    let mut flags:Vec<char> = Vec::new();
    let mut switches:Vec<&str> = Vec::new();
    for arg in argv[1..].iter() {
        if arg.starts_with("--") {
            switches.push(arg.trim_matches('-'));
            continue;
        } else if arg.starts_with("-") {
            flags.extend(arg.trim_matches('-').chars());
            continue;
        }
        // Everything else is `[t|u]<port>[p|w]` or `[t|u]<ip>:<port>`.
        let strippedarg = arg.trim_end_matches(&PORT_SUFFIXES[..]).trim_start_matches(&PORT_PREFIXES[..]);
        let protocol = match arg.chars().next() {
            Some('t') => PortProtocol::Tcp,
            Some('u') => PortProtocol::Udp,
            _ => PortProtocol::Both,
        };
        match strippedarg.parse::<u16>() {
            Ok(n) => {
                let privacy = match arg.chars().last() {
                    Some('w') => PortPrivacy::Whitelist,
                    Some('p') => PortPrivacy::Private,
                    _ => PortPrivacy::Public,
                };
                ports.push((protocol,privacy,n));
            },
            Err(_why) => match strippedarg.parse::<SocketAddr>() {
                Ok(address) => addresses.push((protocol,address)),
                // an unparseable argument is reported and skipped; start-up continues
                Err(_why) => eprintln!("-!- could not parse `{}` as a socket address or port number",arg),
            },
        };
    }
    for flag in flags.iter() {
        match *flag {
            'l' => loopbackmode = true,
            'm' => mirrormode = true,
            c => {
                eprintln!("-!- unrecognized flag character `{}`",c);
                exit(1);
            },
        };
    }
    for switch in switches.iter() {
        match *switch {
            "loopback" => loopbackmode = true,
            "mirror" => mirrormode = true,
            s => {
                eprintln!("-!- unrecognized switch `{}`",s);
                exit(1);
            },
        };
    }
    // With a single hosted port, or with only remote addresses, loopback mode is implied.
    loopbackmode |= (addresses.is_empty() && ports.len() == 1) || (!addresses.is_empty() && ports.is_empty());
    if loopbackmode {
        println!("-i- running in loopback mode (relaying data between all clients)");
    }
    if mirrormode {
        println!("-i- running in mirror mode (also relaying data back to source client)");
    }
    let blocklist = load_rules(BLOCKFILE_NAME);
    let allowlist = load_rules(ALLOWFILE_NAME);
    let heartbeat_holdoff = Duration::from_millis(HEARTBEAT_WAIT);
    let heartbeat_timeout = Duration::from_millis(HEARTBEAT_DROP);

    // ---- hosted ports -------------------------------------------------------------
    let mut connectors:VecDeque<Connector> = VecDeque::new();
    while let Some((protocol,privacy,port)) = ports.pop() {
        let bindaddress:&str = match privacy {
            PortPrivacy::Private => "127.0.0.1",
            _ => "[::]",
        };
        let privstring:&str = match privacy {
            PortPrivacy::Private => "local loopback addresses only",
            PortPrivacy::Whitelist => "addresses listed in .yesunbeam only",
            PortPrivacy::Public => "all addresses not listed in .nosunbeam",
        };
        let mut tcp:Option<TcpHost> = None;
        let mut udp:Option<UdpHost> = None;
        if protocol == PortProtocol::Tcp || protocol == PortProtocol::Both {
            match TcpListener::bind(&format!("{}:{}",bindaddress,port)) {
                Err(why) => {
                    eprintln!("-!- failed to bind TCP listener: {}",why);
                    exit(1);
                },
                Ok(listener) => {
                    listener.set_nonblocking(true).expect("cannot set listener to nonblocking");
                    println!("-i- tcp port {}: accepting requests from {}",port,privstring);
                    tcp = Some(TcpHost {
                        port,
                        privacy:privacy.clone(),
                        listener,
                        clients:VecDeque::new(),
                    });
                },
            };
        }
        if protocol == PortProtocol::Udp || protocol == PortProtocol::Both {
            match UdpSocket::bind(&format!("{}:{}",bindaddress,port)) {
                Err(why) => {
                    eprintln!("-!- failed to bind UDP socket: {}",why);
                    exit(1);
                },
                Ok(socket) => {
                    socket.set_nonblocking(true).expect("cannot set socket to nonblocking");
                    println!("-i- udp port {}: accepting requests from {}",port,privstring);
                    udp = Some(UdpHost {
                        port,
                        privacy:privacy.clone(),
                        socket,
                        clients:HashMap::new(),
                    });
                },
            };
        }
        connectors.push_back(Connector::Host {tcp,udp});
    }

    // ---- remote resources ---------------------------------------------------------
    // NOTE(review): the per-address queues in `udppackets` are filled from `guestudpsocket`
    // below but never drained anywhere in this function — confirm whether this shared
    // socket is still needed now that every UDP guest owns its own socket.
    let mut udppackets:HashMap<SocketAddr,VecDeque<([u8;65535],usize)>> = HashMap::new();
    let guestudpsocket:UdpSocket = match UdpSocket::bind("[::]:0") {
        Err(why) => {
            eprintln!("-!- failed to bind UDP socket to any port: {}",why);
            exit(1);
        },
        Ok(socket) => socket,
    };
    guestudpsocket.set_nonblocking(true).expect("cannot set source socket to nonblocking");
    let mut broken_tcpsources:VecDeque<SocketAddr> = VecDeque::new();
    let mut returning_tcpsources:HashMap<SocketAddr,TcpStream> = HashMap::new();
    while let Some((protocol,address)) = addresses.pop() {
        udppackets.insert(address,VecDeque::new());
        let mut tcp:Option<TcpGuest> = None;
        let mut udp:Option<UdpGuest> = None;
        if protocol == PortProtocol::Tcp || protocol == PortProtocol::Both {
            match TcpStream::connect(&address) {
                Err(why) => {
                    eprintln!("-!- failed to connect to remote resource ({}): {}. trying to reconnect...",address,why);
                    broken_tcpsources.push_back(address);
                },
                Ok(stream) => {
                    stream.set_nonblocking(true).expect("cannot set source stream to nonblocking");
                    stream.set_nodelay(true).expect("cannot set source stream to nodelay");
                    tcp = Some(TcpGuest {stream});
                },
            };
        }
        if protocol == PortProtocol::Udp || protocol == PortProtocol::Both {
            // Bind in the remote's address family: a `[::]` socket reports IPv4 peers as
            // IPv4-mapped IPv6 addresses, which never compare equal to the configured
            // IPv4 `address` when replies are filtered in the main loop.
            let wildcard:&str = if address.is_ipv4() { "0.0.0.0:0" } else { "[::]:0" };
            let socket = match UdpSocket::bind(wildcard) {
                Err(why) => {
                    eprintln!("-!- failed to bind UDP socket to dynamic port: {}",why);
                    exit(1);
                },
                Ok(s) => s,
            };
            // The main loop polls this socket every cycle; a blocking socket would stall
            // the whole relay until the remote happens to send something.
            socket.set_nonblocking(true).expect("cannot set guest socket to nonblocking");
            let _ = socket.send_to(&[0x0a],&address);
            udp = Some(UdpGuest {
                socket,
                lastheartbeat:Instant::now(),
            });
        }
        connectors.push_back(Connector::Guest {address,tcp,udp});
    }

    // ---- relay loop ---------------------------------------------------------------
    let (maintx,mainrx) = mpsc::channel();
    let mut idlecycles:u64 = 0;
    let mut tcp_buffer:[u8;65535] = [0x00;65535];
    let mut tcp_nrecv:usize = 0;
    let mut udp_buffer:[u8;65535] = [0x00;65535];
    let mut udp_nrecv:usize = 0;
    let mut sourceaddress:SocketAddr = "[::]:0".parse().expect("failed to parse socket address");
    loop {
        if idlecycles < IDLE_MAX {
            idlecycles += 1;
        }
        match guestudpsocket.recv_from(&mut udp_buffer) {
            Err(why) => match why.kind() {
                io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted => (),
                _ => {
                    eprintln!("-!- failed to receive packets from UDP socket: {}",why);
                    exit(1);
                },
            },
            Ok((nrecv,srcaddr)) => {
                if let Some(packets) = udppackets.get_mut(&srcaddr) {
                    packets.push_back((udp_buffer,nrecv));
                }
            },
        };
        if let Some(mut source) = connectors.pop_front() {
            // Read from the connector whose turn it is.
            match &mut source {
                Connector::Guest {address,tcp,udp} => {
                    // Pick up a stream delivered by a reconnect thread, if any.
                    if let Some(stream) = returning_tcpsources.remove(&*address) {
                        *tcp = Some(TcpGuest {stream});
                    }
                    let mut broken:bool = false;
                    if let Some(guest) = tcp.as_mut() {
                        match guest.stream.read(&mut tcp_buffer) {
                            Err(why) => match why.kind() {
                                io::ErrorKind::WouldBlock => (),
                                _ => {
                                    eprintln!("-!- failed to receive data from remote resource ({}): {}. trying to reconnect...",address,why);
                                    broken_tcpsources.push_back(*address);
                                    broken = true;
                                },
                            },
                            Ok(0) => {
                                eprintln!("-!- remote host closed the connection. trying to reconnect...");
                                broken_tcpsources.push_back(*address);
                                broken = true;
                            },
                            Ok(n) => {
                                if mirrormode {
                                    let _ = guest.stream.write_all(&tcp_buffer[..n]);
                                }
                                tcp_nrecv = n;
                                sourceaddress = *address;
                            },
                        };
                    }
                    if broken {
                        *tcp = None;
                    }
                    if let Some(guest) = udp.as_mut() {
                        if guest.lastheartbeat.elapsed() > heartbeat_holdoff {
                            let _ = guest.socket.send_to(&[0x0a],*address);
                            // restart the hold-off, otherwise a heartbeat goes out every cycle
                            guest.lastheartbeat = Instant::now();
                        }
                        match guest.socket.recv_from(&mut udp_buffer) {
                            Err(why) => match why.kind() {
                                // nothing pending on the non-blocking socket
                                io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted => (),
                                _ => eprintln!("-!- failed to receive UDP packets: {}",why),
                            },
                            Ok((n,srcaddr)) => {
                                // only packets from the configured remote are relayed
                                if srcaddr == *address {
                                    if mirrormode && n > 0 {
                                        let _ = guest.socket.send_to(&udp_buffer[..n],&srcaddr);
                                    }
                                    udp_nrecv = n;
                                    sourceaddress = srcaddr;
                                }
                            },
                        };
                    }
                }, // guest connector
                Connector::Host {tcp,udp} => {
                    if let Some(host) = tcp.as_mut() {
                        match host.listener.accept() {
                            Err(why) => match why.kind() {
                                io::ErrorKind::WouldBlock => (),
                                _ => eprintln!("-!- failed to accept new connection on port {}: {}",host.port,why),
                            },
                            Ok((stream,address)) => {
                                let peer_ip = canonical_ip(address.ip());
                                if blocklist.contains(&peer_ip) {
                                    println!("-x- refuse {}/tcp: {} (listed in {})",host.port,address,BLOCKFILE_NAME);
                                } else if host.privacy == PortPrivacy::Whitelist && !allowlist.contains(&peer_ip) {
                                    println!("-x- refuse {}/tcp: {} (not listed in {})",host.port,address,ALLOWFILE_NAME);
                                } else {
                                    println!("--> connect {}/tcp: {}",host.port,address);
                                    stream.set_nonblocking(true).expect("cannot set stream to nonblocking");
                                    stream.set_nodelay(true).expect("cannot set stream to nodelay");
                                    host.clients.push_back(TcpClient {address,stream});
                                }
                            },
                        };
                        // Serve one client per cycle, round-robin; a client that is not
                        // pushed back is dropped, which closes its stream.
                        if let Some(mut client) = host.clients.pop_front() {
                            match client.stream.read(&mut tcp_buffer) {
                                Err(why) => match why.kind() {
                                    io::ErrorKind::WouldBlock => host.clients.push_back(client),
                                    _ => println!("<-- disconnect {}: {} ({})",host.port,client.address,why),
                                },
                                Ok(0) => println!("<-- disconnect {}: {} (closed by remote endpoint)",host.port,client.address),
                                Ok(n) => {
                                    if mirrormode {
                                        let _ = client.stream.write_all(&tcp_buffer[..n]);
                                    }
                                    tcp_nrecv = n;
                                    // remember the sender so loopback mode does not echo the data back to it
                                    sourceaddress = client.address;
                                    host.clients.push_back(client);
                                },
                            };
                        }
                    }
                    if let Some(host) = udp.as_mut() {
                        match host.socket.recv_from(&mut udp_buffer) {
                            Err(why) => match why.kind() {
                                io::ErrorKind::WouldBlock => (),
                                _ => eprintln!("-!- failed to receive UDP packets on port {}: {}",host.port,why),
                            },
                            Ok((n,srcaddr)) => {
                                let peer_ip = canonical_ip(srcaddr.ip());
                                if blocklist.contains(&peer_ip) {
                                    println!("-x- refuse {}/udp: {} (listed in {})",host.port,srcaddr,BLOCKFILE_NAME);
                                } else if host.privacy == PortPrivacy::Whitelist && !peer_ip.is_loopback() && !allowlist.contains(&peer_ip) {
                                    println!("-x- refuse {}/udp: {} (not listed in {})",host.port,srcaddr,ALLOWFILE_NAME);
                                } else {
                                    if mirrormode && n > 0 {
                                        let _ = host.socket.send_to(&udp_buffer[..n],&srcaddr);
                                    }
                                    udp_nrecv = n;
                                    sourceaddress = srcaddr;
                                    // (re)inserting refreshes the client's heartbeat timestamp
                                    let newclient = UdpClient {
                                        address:srcaddr,
                                        lastheartbeat:Instant::now(),
                                    };
                                    if host.clients.insert(srcaddr,newclient).is_none() {
                                        println!("--> join {}/udp: {}",host.port,srcaddr);
                                    }
                                }
                            },
                        };
                        // Forget clients that have been silent for longer than the timeout.
                        let port = host.port;
                        host.clients.retain(|_,client| {
                            let alive = client.lastheartbeat.elapsed() <= heartbeat_timeout;
                            if !alive {
                                println!("<-- drop {}/udp: {}",port,client.address);
                            }
                            alive
                        });
                    }
                    if loopbackmode {
                        // Also relay to the other clients of this very port.
                        let tcp_data = &tcp_buffer[..tcp_nrecv];
                        let udp_data = &udp_buffer[..udp_nrecv];
                        if let Some(host) = tcp.as_mut() {
                            for client in host.clients.iter_mut() {
                                if client.address == sourceaddress {
                                    continue;
                                }
                                relay_to_stream(&mut client.stream,tcp_data,udp_data);
                            }
                        }
                        if let Some(host) = udp.as_ref() {
                            for client in host.clients.values() {
                                if client.address == sourceaddress {
                                    continue;
                                }
                                relay_to_socket(&host.socket,&client.address,tcp_data,udp_data);
                            }
                        }
                    }
                }, // host connector
            };
            // Forward what was received to every other connector.
            let tcp_data = &tcp_buffer[..tcp_nrecv];
            let udp_data = &udp_buffer[..udp_nrecv];
            for connector in connectors.iter_mut() {
                match connector {
                    Connector::Host {tcp,udp} => {
                        if let Some(host) = tcp.as_mut() {
                            for client in host.clients.iter_mut() {
                                relay_to_stream(&mut client.stream,tcp_data,udp_data);
                            }
                        }
                        if let Some(host) = udp.as_ref() {
                            for client in host.clients.values() {
                                relay_to_socket(&host.socket,&client.address,tcp_data,udp_data);
                            }
                        }
                    },
                    Connector::Guest {address,tcp,udp} => {
                        if let Some(guest) = tcp.as_mut() {
                            relay_to_stream(&mut guest.stream,tcp_data,udp_data);
                        }
                        if let Some(guest) = udp.as_ref() {
                            relay_to_socket(&guest.socket,&*address,tcp_data,udp_data);
                        }
                    },
                };
            }
            if tcp_nrecv > 0 || udp_nrecv > 0 {
                idlecycles = 0;
            }
            tcp_nrecv = 0;
            udp_nrecv = 0;
            connectors.push_back(source);
        }
        // Hand one broken guest address per cycle to a background thread that keeps
        // trying to reconnect and sends the new stream back over the channel.
        if let Some(threadaddress) = broken_tcpsources.pop_front() {
            let threadtx = mpsc::Sender::clone(&maintx);
            thread::spawn(move || {
                loop {
                    match TcpStream::connect(&threadaddress) {
                        Err(_why) => thread::sleep(Duration::from_millis(RECONNECT_WAIT)),
                        Ok(stream) => {
                            stream.set_nonblocking(true).expect("cannot set stream to nonblocking");
                            stream.set_nodelay(true).expect("cannot set stream to nodelay");
                            println!("-i- connection to remote resource at {} reestablished",threadaddress);
                            let _ = threadtx.send((stream,threadaddress));
                            break;
                        },
                    };
                }
            });
        }
        if let Ok((stream,address)) = mainrx.try_recv() {
            returning_tcpsources.insert(address,stream);
        }
        // Back off progressively once nothing has been relayed for a while.
        if idlecycles > IDLE_THRESH {
            thread::sleep(Duration::from_micros(idlecycles*IDLE_SCALING-IDLE_THRESH));
        }
    }
}