/* Sunbeam Network Relay v0.3 Elizabeth Evelene Amelia Diode June 2019 */

use std::collections::{HashMap, HashSet, VecDeque};
use std::env::args;
use std::fs::File;
use std::io;
use std::io::prelude::*;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream, UdpSocket};
use std::process::exit;
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

// cycles of doing nothing before throttling back to slightly slower execution speed
static IDLE_THRESH: u64 = 256;
// after crossing IDLE_THRESH, increase cycle delay by this many microseconds per cycle
static IDLE_SCALING: u64 = 1;
// maximum number of idle cycles to count up before stopping to count
// IDLE_MAX*IDLE_SCALING-IDLE_THRESH = maximum cycle delay in microseconds
static IDLE_MAX: u64 = 65535;
// time in ms to wait between reconnect attempts
static RECONNECT_WAIT: u64 = 1000;
// file to check for as the client blacklist
static BLOCKFILE_NAME: &str = ".nosunbeam";
// file to check for as the client whitelist, for Whitelist-mode connectors
static ALLOWFILE_NAME: &str = ".yesunbeam";
// letters that may prefix a port/address argument to select the transport protocol
static PORT_PREFIXES: [char; 2] = ['u', 't'];
// letters that may suffix a port argument to select the privacy mode
static PORT_SUFFIXES: [char; 2] = ['p', 'w'];
// time in ms between heartbeat bytes sent to a remote UDP resource
static HEARTBEAT_WAIT: u64 = 6000;
// time in ms of silence after which a UDP client is dropped
static HEARTBEAT_DROP: u64 = 60000;
// size in bytes of the receive buffers used by the relay loop
const BUFFER_SIZE: usize = 65535;

/// Who is allowed to connect to a locally hosted port.
#[derive(Clone, Copy, Debug, PartialEq)]
enum PortPrivacy {
    Public,
    Whitelist,
    Private,
}

/// Which transport protocol(s) an endpoint uses.
#[derive(Clone, Copy, Debug, PartialEq)]
enum PortProtocol {
    Udp,
    Tcp,
    Both,
}

/// One relay endpoint: either a local port we host, or a remote resource we connect to.
enum Connector {
    Host {
        tcp: Option<TcpHost>,
        udp: Option<UdpHost>,
    },
    Guest {
        address: SocketAddr,
        tcp: Option<TcpGuest>,
        udp: Option<UdpGuest>,
    },
}

struct TcpHost {
    port: u16,
    privacy: PortPrivacy,
    listener: TcpListener,
    clients: VecDeque<TcpClient>,
}

struct UdpHost {
    port: u16,
    privacy: PortPrivacy,
    socket: UdpSocket,
    clients: HashMap<SocketAddr, UdpClient>,
}

struct TcpGuest {
    stream: TcpStream,
}

struct UdpGuest {
    socket: UdpSocket,
    lastheartbeat: Instant,
}

struct TcpClient {
    address: SocketAddr,
    stream: TcpStream,
}

struct UdpClient {
    address: SocketAddr,
    lastheartbeat: Instant,
}

/// Converts an IPv4-mapped IPv6 address (`::ffff:a.b.c.d`) to the plain IPv4 address.
///
/// Sockets bound to `[::]` may report IPv4 peers in the mapped form; without this
/// conversion such a peer never compares equal to an IPv4 entry of a rule file.
/// Every other address is returned unchanged.
fn canonical_ip(ip: IpAddr) -> IpAddr {
    match ip {
        IpAddr::V6(v6) => match v6.octets() {
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => {
                IpAddr::V4(Ipv4Addr::new(a, b, c, d))
            }
            _ => ip,
        },
        IpAddr::V4(_) => ip,
    }
}

/// Reads a rule file containing one IP address per line.
///
/// Surrounding whitespace is ignored and blank lines are skipped. A missing file
/// yields an empty rule set. Any other I/O error, or a line that is not an IP
/// address, prints a diagnostic and terminates the process with status 1.
/// Addresses are stored in canonical form (see `canonical_ip`).
fn load_rules(rulefilepath: &str) -> HashSet<IpAddr> {
    let mut rules: HashSet<IpAddr> = HashSet::new();
    let mut file = match File::open(rulefilepath) {
        Ok(file) => file,
        Err(ref why) if why.kind() == io::ErrorKind::NotFound => return rules,
        Err(why) => {
            eprintln!("-!- failed to open {} to read IP address ruleing rules: {}", rulefilepath, why);
            exit(1);
        }
    };
    let mut rulestring = String::new();
    if let Err(why) = file.read_to_string(&mut rulestring) {
        eprintln!("-!- failed to read {} for IP address ruleing rules: {}", rulefilepath, why);
        exit(1);
    }
    for line in rulestring.lines().map(str::trim).filter(|line| !line.is_empty()) {
        match line.parse::<IpAddr>() {
            Ok(address) => {
                rules.insert(canonical_ip(address));
            }
            Err(_why) => {
                eprintln!("-!- could not parse '{}' in {} as an IP address", line, rulefilepath);
                exit(1);
            }
        }
    }
    rules
}

/// Determines the transport protocol from the first character of an argument:
/// `t` selects TCP, `u` selects UDP, anything else selects both.
fn parse_protocol(arg: &str) -> PortProtocol {
    match arg.chars().next() {
        Some('t') => PortProtocol::Tcp,
        Some('u') => PortProtocol::Udp,
        _ => PortProtocol::Both,
    }
}

/// Determines the privacy mode from the last character of a port argument:
/// `w` selects whitelist, `p` selects private, anything else selects public.
fn parse_privacy(arg: &str) -> PortPrivacy {
    match arg.chars().last() {
        Some('w') => PortPrivacy::Whitelist,
        Some('p') => PortPrivacy::Private,
        _ => PortPrivacy::Public,
    }
}

/// Forwards the payloads received this cycle to every endpoint of `connector`.
///
/// Empty payloads are not sent. For host connectors, the client whose address
/// equals `skip` is left out. Send errors are deliberately ignored: delivery is
/// best-effort and a failing TCP client is detected by the next read on it.
fn relay(connector: &mut Connector, tcp_data: &[u8], udp_data: &[u8], skip: Option<SocketAddr>) {
    match connector {
        Connector::Host { tcp, udp } => {
            if let Some(host) = tcp.as_mut() {
                for client in host.clients.iter_mut() {
                    if Some(client.address) == skip {
                        continue;
                    }
                    if !tcp_data.is_empty() {
                        let _ = client.stream.write_all(tcp_data);
                    }
                    if !udp_data.is_empty() {
                        let _ = client.stream.write_all(udp_data);
                    }
                }
            }
            if let Some(host) = udp.as_ref() {
                for client in host.clients.values() {
                    if Some(client.address) == skip {
                        continue;
                    }
                    if !tcp_data.is_empty() {
                        let _ = host.socket.send_to(tcp_data, client.address);
                    }
                    if !udp_data.is_empty() {
                        let _ = host.socket.send_to(udp_data, client.address);
                    }
                }
            }
        }
        Connector::Guest { address, tcp, udp } => {
            if let Some(guest) = tcp.as_mut() {
                if !tcp_data.is_empty() {
                    let _ = guest.stream.write_all(tcp_data);
                }
                if !udp_data.is_empty() {
                    let _ = guest.stream.write_all(udp_data);
                }
            }
            if let Some(guest) = udp.as_ref() {
                if !tcp_data.is_empty() {
                    let _ = guest.socket.send_to(tcp_data, *address);
                }
                if !udp_data.is_empty() {
                    let _ = guest.socket.send_to(udp_data, *address);
                }
            }
        }
    }
}

/// Entry point: parses the command line, opens all host and guest endpoints, then
/// runs the relay loop forever.
///
/// Each loop cycle services exactly one connector (round robin): it polls that
/// connector for incoming data and forwards whatever arrived to all other
/// connectors (and, in loopback mode, to the other clients of the same host).
fn main() {
    let argv = args().collect::<Vec<String>>();
    if argv.len() < 2 {
        eprintln!("-!- usage: sunbeam [flags] [protocol][local port][privacy]... [protocol][remote resource address]:[remote port]...");
        eprintln!();
        eprintln!(" flags:");
        eprintln!(" -l --loopback - also relay data to clients connecting on the same port");
        eprintln!(" -m --mirror - also relay data back to its source");
        eprintln!();
        eprintln!(" prefix ports or addresses with a letter to indicate transport protocol:");
        eprintln!(" t - tcp");
        eprintln!(" u - udp");
        eprintln!(" none - both tcp and udp");
        eprintln!();
        eprintln!(" suffix ports with a letter to indicate privacy:");
        eprintln!(" p - local-only");
        eprintln!(" w - addresses in .yesunbeam whitelist only");
        eprintln!(" none - all except addresses in .nosunbeam blacklist");
        exit(1);
    }

    let mut loopbackmode: bool = false;
    let mut mirrormode: bool = false;
    let mut addresses: Vec<(PortProtocol, SocketAddr)> = Vec::new();
    let mut ports: Vec<(PortProtocol, PortPrivacy, u16)> = Vec::new();
    let mut flags: Vec<char> = Vec::new();
    let mut switches: Vec<&str> = Vec::new();

    for arg in argv[1..].iter() {
        if arg.starts_with("--") {
            switches.push(arg.trim_matches('-'));
            continue;
        } else if arg.starts_with("-") {
            flags.extend(arg.trim_matches('-').chars());
            continue;
        }
        // Strip the protocol prefix and privacy suffix; what remains is either a
        // bare port number (host) or a socket address (guest).
        let strippedarg = arg
            .trim_end_matches(&PORT_SUFFIXES[..])
            .trim_start_matches(&PORT_PREFIXES[..]);
        if let Ok(port) = strippedarg.parse::<u16>() {
            ports.push((parse_protocol(arg), parse_privacy(arg), port));
        } else if let Ok(address) = strippedarg.parse::<SocketAddr>() {
            addresses.push((parse_protocol(arg), address));
        } else {
            eprintln!("-!- could not parse `{}` as a socket address or port number", arg);
        }
    }

    for flag in flags.iter() {
        match *flag {
            'l' => loopbackmode = true,
            'm' => mirrormode = true,
            c => {
                eprintln!("-!- unrecognized flag character `{}`", c);
                exit(1);
            }
        };
    }
    for switch in switches.iter() {
        match *switch {
            "loopback" => loopbackmode = true,
            "mirror" => mirrormode = true,
            s => {
                eprintln!("-!- unrecognized switch `{}`", s);
                exit(1);
            }
        };
    }

    // With a single endpoint kind there is nobody else to relay to, so relaying
    // between the clients of that endpoint is implied.
    loopbackmode |= (addresses.is_empty() && ports.len() == 1)
        || (!addresses.is_empty() && ports.is_empty());
    if loopbackmode {
        println!("-i- running in loopback mode (relaying data between all clients)");
    }
    if mirrormode {
        println!("-i- running in mirror mode (also relaying data back to source client)");
    }

    let blocklist = load_rules(BLOCKFILE_NAME);
    let allowlist = load_rules(ALLOWFILE_NAME);
    let heartbeat_holdoff = Duration::from_millis(HEARTBEAT_WAIT);
    let heartbeat_timeout = Duration::from_millis(HEARTBEAT_DROP);

    let mut connectors: VecDeque<Connector> = VecDeque::new();

    // Open every locally hosted port.
    while let Some((protocol, privacy, port)) = ports.pop() {
        let bindaddress: &str = match privacy {
            PortPrivacy::Private => "127.0.0.1",
            _ => "[::]",
        };
        let privstring: &str = match privacy {
            PortPrivacy::Private => "local loopback addresses only",
            PortPrivacy::Whitelist => "addresses listed in .yesunbeam only",
            PortPrivacy::Public => "all addresses not listed in .nosunbeam",
        };
        let bindtarget = format!("{}:{}", bindaddress, port);
        let mut tcp: Option<TcpHost> = None;
        let mut udp: Option<UdpHost> = None;
        if protocol == PortProtocol::Tcp || protocol == PortProtocol::Both {
            match TcpListener::bind(bindtarget.as_str()) {
                Err(why) => {
                    eprintln!("-!- failed to bind TCP listener: {}", why);
                    exit(1);
                }
                Ok(listener) => {
                    listener.set_nonblocking(true).expect("cannot set listener to nonblocking");
                    println!("-i- tcp port {}: accepting requests from {}", port, privstring);
                    tcp = Some(TcpHost {
                        port,
                        privacy,
                        listener,
                        clients: VecDeque::new(),
                    });
                }
            };
        }
        if protocol == PortProtocol::Udp || protocol == PortProtocol::Both {
            match UdpSocket::bind(bindtarget.as_str()) {
                Err(why) => {
                    eprintln!("-!- failed to bind UDP socket: {}", why);
                    exit(1);
                }
                Ok(socket) => {
                    socket.set_nonblocking(true).expect("cannot set socket to nonblocking");
                    println!("-i- udp port {}: accepting requests from {}", port, privstring);
                    udp = Some(UdpHost {
                        port,
                        privacy,
                        socket,
                        clients: HashMap::new(),
                    });
                }
            };
        }
        connectors.push_back(Connector::Host { tcp, udp });
    }

    let guestudpsocket: UdpSocket = match UdpSocket::bind("[::]:0") {
        Err(why) => {
            eprintln!("-!- failed to bind UDP socket to any port: {}", why);
            exit(1);
        }
        Ok(socket) => socket,
    };
    guestudpsocket.set_nonblocking(true).expect("cannot set source socket to nonblocking");

    let mut broken_tcpsources: VecDeque<SocketAddr> = VecDeque::new();
    let mut returning_tcpsources: HashMap<SocketAddr, TcpStream> = HashMap::new();

    // Connect to every remote resource.
    while let Some((protocol, address)) = addresses.pop() {
        let mut tcp: Option<TcpGuest> = None;
        let mut udp: Option<UdpGuest> = None;
        if protocol == PortProtocol::Tcp || protocol == PortProtocol::Both {
            match TcpStream::connect(address) {
                Err(why) => {
                    eprintln!("-!- failed to connect to remote resource ({}): {}. trying to reconnect...", address, why);
                    broken_tcpsources.push_back(address);
                }
                Ok(stream) => {
                    stream.set_nonblocking(true).expect("cannot set source stream to nonblocking");
                    stream.set_nodelay(true).expect("cannot set source stream to nodelay");
                    tcp = Some(TcpGuest { stream });
                }
            };
        }
        if protocol == PortProtocol::Udp || protocol == PortProtocol::Both {
            // Bind in the remote's address family so that the source address of its
            // replies compares equal to `address` (a dual-stack socket would report
            // IPv4 peers in IPv4-mapped IPv6 form).
            let localaddress = if address.is_ipv4() { "0.0.0.0:0" } else { "[::]:0" };
            let socket = match UdpSocket::bind(localaddress) {
                Err(why) => {
                    eprintln!("-!- failed to bind UDP socket to dynamic port: {}", why);
                    exit(1);
                }
                Ok(s) => s,
            };
            // The relay loop polls this socket every cycle; a blocking socket would
            // stall the whole relay until the remote sends something.
            socket.set_nonblocking(true).expect("cannot set source socket to nonblocking");
            let _ = socket.send_to(&[0x0a], address);
            udp = Some(UdpGuest {
                socket,
                lastheartbeat: Instant::now(),
            });
        }
        connectors.push_back(Connector::Guest { address, tcp, udp });
    }

    let (maintx, mainrx) = mpsc::channel();
    let mut idlecycles: u64 = 0;
    let mut tcp_buffer: [u8; BUFFER_SIZE] = [0x00; BUFFER_SIZE];
    let mut tcp_nrecv: usize = 0;
    let mut udp_buffer: [u8; BUFFER_SIZE] = [0x00; BUFFER_SIZE];
    let mut udp_nrecv: usize = 0;
    let mut sourceaddress: SocketAddr = "[::]:0".parse().expect("failed to parse socket address");

    loop {
        if idlecycles < IDLE_MAX {
            idlecycles += 1;
        }

        // Drain the shared guest socket. Nothing consumes these datagrams, so they
        // are discarded rather than queued (the queue used to grow without bound).
        // udp_nrecv is still 0 here, so scribbling over udp_buffer is harmless.
        if let Err(why) = guestudpsocket.recv_from(&mut udp_buffer) {
            match why.kind() {
                io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted => (),
                _ => {
                    eprintln!("-!- failed to receive packets from UDP socket: {}", why);
                    exit(1);
                }
            }
        }

        if let Some(mut source) = connectors.pop_front() {
            match &mut source {
                Connector::Guest { address, tcp, udp } => {
                    // Pick up a stream re-established by a reconnect thread.
                    if let Some(stream) = returning_tcpsources.remove(&*address) {
                        *tcp = Some(TcpGuest { stream });
                    }
                    let mut broken: bool = false;
                    if let Some(guest) = tcp.as_mut() {
                        match guest.stream.read(&mut tcp_buffer) {
                            Ok(0) => {
                                eprintln!("-!- remote host closed the connection. trying to reconnect...");
                                broken = true;
                            }
                            Ok(n) => {
                                if mirrormode {
                                    let _ = guest.stream.write_all(&tcp_buffer[..n]);
                                }
                                tcp_nrecv = n;
                                sourceaddress = *address;
                            }
                            Err(why) => match why.kind() {
                                io::ErrorKind::WouldBlock => (),
                                _ => {
                                    eprintln!("-!- failed to receive data from remote resource ({}): {}. trying to reconnect...", address, why);
                                    broken = true;
                                }
                            },
                        };
                    }
                    if broken {
                        broken_tcpsources.push_back(*address);
                        *tcp = None;
                    }
                    if let Some(guest) = udp.as_mut() {
                        if guest.lastheartbeat.elapsed() > heartbeat_holdoff {
                            let _ = guest.socket.send_to(&[0x0a], *address);
                            // Restart the holdoff; otherwise a heartbeat would be
                            // sent on every cycle once the first holdoff elapsed.
                            guest.lastheartbeat = Instant::now();
                        }
                        match guest.socket.recv_from(&mut udp_buffer) {
                            Ok((n, srcaddr)) => {
                                // Only accept datagrams from the configured remote.
                                if srcaddr == *address {
                                    if mirrormode && n > 0 {
                                        let _ = guest.socket.send_to(&udp_buffer[..n], srcaddr);
                                    }
                                    udp_nrecv = n;
                                    sourceaddress = srcaddr;
                                }
                            }
                            Err(why) => match why.kind() {
                                io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted => (),
                                _ => eprintln!("-!- failed to receive UDP packets: {}", why),
                            },
                        };
                    }
                } // guest connector
                Connector::Host { tcp, udp } => {
                    if let Some(host) = tcp.as_mut() {
                        match host.listener.accept() {
                            Err(why) => match why.kind() {
                                io::ErrorKind::WouldBlock => (),
                                _ => eprintln!("-!- failed to accept new connection on port {}: {}", host.port, why),
                            },
                            Ok((stream, address)) => {
                                let ip = canonical_ip(address.ip());
                                // NOTE(review): unlike the UDP path below, loopback
                                // peers are not exempt from the whitelist here.
                                if blocklist.contains(&ip) {
                                    println!("-x- refuse {}/tcp: {} (listed in {})", host.port, address, BLOCKFILE_NAME);
                                } else if host.privacy == PortPrivacy::Whitelist && !allowlist.contains(&ip) {
                                    println!("-x- refuse {}/tcp: {} (not listed in {})", host.port, address, ALLOWFILE_NAME);
                                } else {
                                    // A client that cannot be configured is dropped
                                    // instead of taking the whole relay down.
                                    match stream.set_nonblocking(true).and_then(|_| stream.set_nodelay(true)) {
                                        Ok(()) => {
                                            println!("--> connect {}/tcp: {}", host.port, address);
                                            host.clients.push_back(TcpClient { address, stream });
                                        }
                                        Err(why) => eprintln!("-!- failed to configure connection from {} on port {}: {}", address, host.port, why),
                                    }
                                }
                            }
                        };
                        // Poll one client per cycle; clients that are still alive go
                        // back to the end of the queue.
                        if let Some(mut client) = host.clients.pop_front() {
                            match client.stream.read(&mut tcp_buffer) {
                                Ok(0) => println!("<-- disconnect {}: {} (closed by remote endpoint)", host.port, client.address),
                                Ok(n) => {
                                    if mirrormode {
                                        let _ = client.stream.write_all(&tcp_buffer[..n]);
                                    }
                                    tcp_nrecv = n;
                                    // Remember the sender so loopback mode does not
                                    // echo the data straight back to it.
                                    sourceaddress = client.address;
                                    host.clients.push_back(client);
                                }
                                Err(why) => match why.kind() {
                                    io::ErrorKind::WouldBlock => host.clients.push_back(client),
                                    _ => println!("<-- disconnect {}: {} ({})", host.port, client.address, why),
                                },
                            };
                        }
                    }
                    if let Some(host) = udp.as_mut() {
                        match host.socket.recv_from(&mut udp_buffer) {
                            Err(why) => match why.kind() {
                                io::ErrorKind::WouldBlock => (),
                                _ => eprintln!("-!- failed to receive UDP packets on port {}: {}", host.port, why),
                            },
                            Ok((n, srcaddr)) => {
                                let ip = canonical_ip(srcaddr.ip());
                                if blocklist.contains(&ip) {
                                    println!("-x- refuse {}/udp: {} (listed in {})", host.port, srcaddr, BLOCKFILE_NAME);
                                } else if host.privacy == PortPrivacy::Whitelist
                                    && !ip.is_loopback()
                                    && !allowlist.contains(&ip)
                                {
                                    println!("-x- refuse {}/udp: {} (not listed in {})", host.port, srcaddr, ALLOWFILE_NAME);
                                } else {
                                    if mirrormode && n > 0 {
                                        let _ = host.socket.send_to(&udp_buffer[..n], srcaddr);
                                    }
                                    udp_nrecv = n;
                                    sourceaddress = srcaddr;
                                    // Every datagram (re)registers its sender and
                                    // refreshes the sender's heartbeat timestamp.
                                    let newclient = UdpClient {
                                        address: srcaddr,
                                        lastheartbeat: Instant::now(),
                                    };
                                    if host.clients.insert(srcaddr, newclient).is_none() {
                                        println!("--> join {}/udp: {}", host.port, srcaddr);
                                    }
                                }
                            }
                        };
                        // Forget clients that have been silent for too long.
                        let port = host.port;
                        host.clients.retain(|_, client| {
                            let alive = client.lastheartbeat.elapsed() <= heartbeat_timeout;
                            if !alive {
                                println!("<-- drop {}/udp: {}", port, client.address);
                            }
                            alive
                        });
                    }
                } // host connector
            };

            if tcp_nrecv > 0 || udp_nrecv > 0 {
                let tcp_data = &tcp_buffer[..tcp_nrecv];
                let udp_data = &udp_buffer[..udp_nrecv];
                // Loopback: share the data with the other clients of the same host.
                if loopbackmode {
                    if let Connector::Host { .. } = source {
                        relay(&mut source, tcp_data, udp_data, Some(sourceaddress));
                    }
                }
                for connector in connectors.iter_mut() {
                    relay(connector, tcp_data, udp_data, None);
                }
                idlecycles = 0;
            }
            tcp_nrecv = 0;
            udp_nrecv = 0;
            connectors.push_back(source);
        }

        // Reconnect broken TCP sources in the background; the new stream is handed
        // back to the relay loop through the channel.
        if let Some(threadaddress) = broken_tcpsources.pop_front() {
            let threadtx = mpsc::Sender::clone(&maintx);
            thread::spawn(move || loop {
                match TcpStream::connect(threadaddress) {
                    Err(_why) => thread::sleep(Duration::from_millis(RECONNECT_WAIT)),
                    Ok(stream) => {
                        stream.set_nonblocking(true).expect("cannot set stream to nonblocking");
                        stream.set_nodelay(true).expect("cannot set stream to nodelay");
                        println!("-i- connection to remote resource at {} reestablished", threadaddress);
                        let _ = threadtx.send((stream, threadaddress));
                        break;
                    }
                };
            });
        }
        if let Ok((stream, address)) = mainrx.try_recv() {
            returning_tcpsources.insert(address, stream);
        }

        // Back off progressively while nothing is happening.
        if idlecycles > IDLE_THRESH {
            thread::sleep(Duration::from_micros(idlecycles * IDLE_SCALING - IDLE_THRESH));
        }
    }
}