sunbeam/sunbeam.rs

268 lines
8.6 KiB
Rust
Raw Normal View History

use std::collections::{VecDeque,HashSet};
2019-06-01 06:06:27 +00:00
use std::env::args;
use std::fs::File;
2019-05-31 05:42:37 +00:00
use std::io;
use std::io::prelude::*;
use std::net::{TcpListener,TcpStream,SocketAddr,IpAddr};
2019-05-31 05:42:37 +00:00
use std::process::exit;
2019-06-01 06:06:27 +00:00
use std::sync::mpsc;
use std::thread;
2019-05-31 05:42:37 +00:00
use std::time::Duration;
2019-06-01 22:08:22 +00:00
static IDLE_THRESH:u64 = 256; // cycles of doing nothing before throttling back to slightly slower execution speed
static IDLE_SCALING:u64 = 1; // after crossing IDLE_THRESH, increase cycle delay by this many microseconds per cycle
static IDLE_MAX:u64 = 65535; // maximum number of idle cycles to count up before stopping to count
// IDLE_MAX*IDLE_SCALING-IDLE_THRESH = maximum cycle delay in microseconds
2019-06-01 06:06:27 +00:00
2019-06-01 22:08:22 +00:00
static RECONNECT_WAIT:u64 = 1000; // time in ms to wait between reconnect attempts
static BUFFERSIZE_INITIAL:usize = 511; // how big the buffer capacity should be to start: bigger = maybe faster but more memory usage
static LOOPBACK_FLAG:&str = "-l"; // flag passed to sunbeam to activate loopback mode
static BLOCKFILE_NAME:&str = ".nosunbeam"; // file to check for as the client blacklist
static ALLOWFILE_NAME:&str = ".yesunbeam"; // file to check for as the client whitelist, for Whitelist-mode connectors
2019-06-01 18:10:20 +00:00
#[derive(PartialEq)]
enum PortPrivacy {
Public,
Whitelist,
Private,
}
2019-05-31 05:42:37 +00:00
2019-06-02 02:56:03 +00:00
fn load_rules(rulefilepath:&str) -> HashSet<IpAddr> {
let mut rules:HashSet<IpAddr> = HashSet::new();
match File::open(rulefilepath) {
Err(why) => match why.kind() {
io::ErrorKind::NotFound => (),
_ => {
eprintln!("-!- failed to open {} to read IP address ruleing rules: {}",rulefilepath,why);
exit(1);
},
},
Ok(mut file) => {
let mut rulestring = String::new();
match file.read_to_string(&mut rulestring) {
Err(why) => {
eprintln!("-!- failed to read {} for IP address ruleing rules: {}",rulefilepath,why);
exit(1);
},
Ok(_) => (),
};
for line in rulestring.lines() {
match line.parse::<IpAddr>() {
Err(_why) => {
eprintln!("-!- could not parse '{}' in {} as an IP address",line,rulefilepath);
exit(1);
},
Ok(address) => rules.insert(address),
};
}
},
};
return rules;
}
2019-05-31 05:42:37 +00:00
fn main() {
2019-06-01 06:06:27 +00:00
let argv = args().collect::<Vec<String>>();
if argv.len() < 2 {
eprintln!("-!- usage: sunbeam [local port]... [remote resource address]:[remote port]...");
exit(1);
}
let mut loopbackmode:bool = false;
let mut addresses:Vec<SocketAddr> = Vec::new();
2019-06-01 18:10:20 +00:00
let mut ports:Vec<(PortPrivacy,u16)> = Vec::new();
2019-06-01 06:06:27 +00:00
for arg in argv[1..].iter() {
if arg == LOOPBACK_FLAG {
loopbackmode = true;
} else {
2019-06-01 18:10:20 +00:00
match arg.trim_end_matches(&['p','w'][..]).parse::<u16>() {
2019-06-01 06:06:27 +00:00
Err(_why) => {
2019-06-01 18:10:20 +00:00
match arg.parse::<SocketAddr>() {
2019-06-01 06:06:27 +00:00
Err(_why) => {
eprintln!("-!- could not parse `{}` as a socket address or port number",arg);
},
Ok(address) => addresses.push(address),
};
},
2019-06-02 03:09:02 +00:00
Ok(n) => match arg.chars().last() {
Some('w') => {
println!("-i- accepting only connections whitelisted in {} on port {}",ALLOWFILE_NAME,n);
ports.push((PortPrivacy::Whitelist,n));
},
Some('p') => {
println!("-i- accepting only local connections on port {}",n);
ports.push((PortPrivacy::Private,n));
},
_ => ports.push((PortPrivacy::Public,n)),
2019-06-01 18:10:20 +00:00
},
2019-06-01 06:06:27 +00:00
};
}
}
loopbackmode |= (addresses.len() == 0 && ports.len() == 1) || (addresses.len() > 0 && ports.len() == 0);
if loopbackmode {
println!("-i- running in loopback mode (relaying data between all connections)");
}
2019-06-02 02:56:03 +00:00
let blocklist = load_rules(BLOCKFILE_NAME);
let allowlist = load_rules(ALLOWFILE_NAME);
2019-06-01 18:10:20 +00:00
let mut connectors:VecDeque<(PortPrivacy,u16,TcpListener,VecDeque<(TcpStream,SocketAddr)>)> = VecDeque::new();
2019-06-02 02:56:03 +00:00
while let Some((privacy,port)) = ports.pop() {
2019-06-01 18:10:20 +00:00
let bindaddress:&str = match privacy {
PortPrivacy::Private => "127.0.0.1",
_ => "[::]",
};
match TcpListener::bind(&format!("{}:{}",bindaddress,port)) {
2019-05-31 05:42:37 +00:00
Err(why) => {
2019-06-01 06:06:27 +00:00
eprintln!("-!- failed to bind TCP listener: {}",why);
2019-05-31 05:42:37 +00:00
exit(1);
},
2019-06-01 06:06:27 +00:00
Ok(listener) => {
listener.set_nonblocking(true).expect("cannot set listener to nonblocking");
2019-06-02 02:56:03 +00:00
connectors.push_back((privacy,port,listener,VecDeque::new()));
2019-06-01 06:06:27 +00:00
},
2019-05-31 05:42:37 +00:00
};
2019-06-01 06:06:27 +00:00
}
let (maintx,mainrx) = mpsc::channel();
2019-05-31 05:42:37 +00:00
2019-06-01 22:08:22 +00:00
let mut idlecycles:u64 = 0;
2019-06-01 06:06:27 +00:00
let mut bottle:Vec<u8> = Vec::with_capacity(BUFFERSIZE_INITIAL);
let mut sources:VecDeque<(TcpStream,SocketAddr)> = VecDeque::new();
let mut brokensources:VecDeque<SocketAddr> = VecDeque::new();
for address in addresses.iter() {
match TcpStream::connect(&address) {
Err(why) => {
eprintln!("-!- failed to connect to remote resource ({}): {}. trying to reconnect...",address,why);
brokensources.push_back(address.clone());
},
Ok(stream) => {
stream.set_nonblocking(true).expect("cannot set stream to nonblocking");
stream.set_nodelay(true).expect("cannot set stream to nodelay");
sources.push_back((stream,address.clone()));
},
};
}
loop {
2019-06-02 02:56:03 +00:00
2019-06-01 22:08:22 +00:00
if idlecycles < IDLE_MAX {
2019-05-31 05:42:37 +00:00
idlecycles += 1;
2019-06-01 06:06:27 +00:00
}
bottle.clear();
if let Some((mut source,address)) = sources.pop_front() {
match source.read_to_end(&mut bottle) {
Err(why) => match why.kind() {
io::ErrorKind::WouldBlock => {
if loopbackmode {
for (othersource,_otheraddress) in sources.iter_mut() {
let _ = othersource.write_all(&bottle);
}
}
sources.push_back((source,address));
},
_ => {
eprintln!("-!- failed to receive data from remote resource ({}): {}. trying to reconnect...",address,why);
brokensources.push_back(address);
},
},
Ok(_) => {
eprintln!("-!- remote resource closed the connection. trying to reconnect...");
brokensources.push_back(address);
},
};
if bottle.len() > 0 {
idlecycles = 0;
}
2019-06-01 18:10:20 +00:00
for (_privacy,_port,_listener,otherconnects) in connectors.iter_mut() {
2019-06-01 06:06:27 +00:00
for (otherstream,_otheraddress) in otherconnects.iter_mut() {
let _ = otherstream.write_all(&bottle);
}
}
}
bottle.clear();
2019-06-01 18:10:20 +00:00
if let Some((privacy,port,listener,mut connections)) = connectors.pop_front() {
2019-05-31 05:42:37 +00:00
match listener.accept() {
Err(why) => match why.kind() {
io::ErrorKind::WouldBlock => (),
_ => {
2019-06-01 06:06:27 +00:00
eprintln!("-!- failed to accept new connection: {}",why);
2019-05-31 05:42:37 +00:00
},
},
Ok((stream,address)) => {
if blocklist.contains(&address.ip()) {
println!("-x- refuse {}: {} (listed in {})",port,address,BLOCKFILE_NAME);
2019-06-01 18:10:20 +00:00
} else if privacy == PortPrivacy::Whitelist && !address.ip().is_loopback() && !allowlist.contains(&address.ip()) {
println!("-x- refuse {}: {} (not listed in {})",port,address,ALLOWFILE_NAME);
} else {
println!("--> connect {}: {}",port,address);
stream.set_nonblocking(true).expect("cannot set stream to nonblocking");
stream.set_nodelay(true).expect("cannot set stream to nodelay");
connections.push_back((stream,address));
}
2019-05-31 05:42:37 +00:00
},
};
2019-06-01 06:06:27 +00:00
if let Some((mut stream,address)) = connections.pop_front() {
match stream.read_to_end(&mut bottle) {
2019-05-31 05:42:37 +00:00
Err(why) => match why.kind() {
2019-06-01 06:06:27 +00:00
io::ErrorKind::WouldBlock => {
if loopbackmode {
for (otherstream,_otheraddress) in connections.iter_mut() {
let _ = otherstream.write_all(&bottle);
}
2019-05-31 05:42:37 +00:00
}
connections.push_back((stream,address));
2019-06-01 06:06:27 +00:00
},
_ => {
println!("<-- disconnect {}: {} ({})",port,address,why);
2019-05-31 05:42:37 +00:00
},
},
2019-06-01 06:06:27 +00:00
Ok(_) => {
println!("<-- disconnect {}: {} (closed by remote endpoint)",port,address);
2019-06-01 06:06:27 +00:00
},
};
if bottle.len() > 0 {
idlecycles = 0;
}
2019-06-01 18:10:20 +00:00
for (_privacy,_port,_listener,otherconnects) in connectors.iter_mut() {
2019-06-01 06:06:27 +00:00
for (otherstream,_otheraddress) in otherconnects.iter_mut() {
let _ = otherstream.write_all(&bottle);
}
}
for (otherstream,_otheraddress) in sources.iter_mut() {
let _ = otherstream.write_all(&bottle);
}
2019-05-31 05:42:37 +00:00
}
2019-06-01 18:10:20 +00:00
connectors.push_back((privacy,port,listener,connections));
2019-05-31 05:42:37 +00:00
}
2019-06-01 06:06:27 +00:00
if let Some(threadaddress) = brokensources.pop_front() {
let threadtx = mpsc::Sender::clone(&maintx);
thread::spawn(move || {
loop {
match TcpStream::connect(&threadaddress) {
2019-06-01 22:08:22 +00:00
Err(_why) => thread::sleep(Duration::from_millis(RECONNECT_WAIT)),
2019-06-01 06:06:27 +00:00
Ok(stream) => {
stream.set_nonblocking(true).expect("cannot set stream to nonblocking");
stream.set_nodelay(true).expect("cannot set stream to nodelay");
println!("-i- connection to remote resource at {} reestablished",threadaddress);
let _ = threadtx.send((stream,threadaddress));
break;
},
};
}
});
}
if let Ok(reconnection) = mainrx.try_recv() {
sources.push_back(reconnection);
}
2019-05-31 05:42:37 +00:00
2019-06-01 22:08:22 +00:00
if idlecycles > IDLE_THRESH {
thread::sleep(Duration::from_micros(idlecycles*IDLE_SCALING-IDLE_THRESH));
2019-06-01 06:06:27 +00:00
}
}
2019-05-31 05:42:37 +00:00
}