overhauled absolutely everything and now it also supports UDP
This commit is contained in:
parent
a618abf463
commit
5a68305fee
37
README.md
37
README.md
|
@ -1,27 +1,25 @@
|
|||
# Sunbeam TCP Relay v0.2
|
||||
# Sunbeam TCP Relay v0.3
|
||||
#### Elizabeth Evelene Amelia Diode, June 2019
|
||||
#### AGPL v3
|
||||
|
||||
---
|
||||
|
||||
### Oh no, what is it this time?
|
||||
Sunbeam is a simple and fast multi-endpoint TCP relay for
|
||||
Sunbeam is a simple and fast multi-endpoint TCP/UDP relay for
|
||||
for um
|
||||
we swear we had a perfectly reasonable use in mind, but we honestly can't remember it now.
|
||||
|
||||
---
|
||||
|
||||
### What it does
|
||||
Sunbeam is a little tiny server that passes data between TCP connections.
|
||||
Sunbeam is a little tiny server that relays data between clients.
|
||||
|
||||
Run at the command line, Sunbeam takes any number of local ports (numbers between 1 and 65535)
|
||||
and remote addresses (strings like [ip address]:[port]). For each address argument passed,
|
||||
Sunbeam attempts to connect to that address, and for each port argument, it listens for incoming
|
||||
connections on that port locally. Whenever it receives data over any connection, that data is
|
||||
promptly retransmitted to all other connections, save for two exceptions:
|
||||
|
||||
- Data is not passed between clients that connect to Sunbeam on the same port.
|
||||
- Data is not passed between servers that Sunbeam connects to.
|
||||
promptly retransmitted to all other connections, with the exception that data is not passed
|
||||
between clients that connect to Sunbeam on the same port.
|
||||
|
||||
For example, Sunbeam can be used as a relay for an audio stream by giving it the address of the
|
||||
stream server and a local port. Clients connecting to the local port will receive the stream
|
||||
|
@ -34,15 +32,27 @@ can also pass the `-l` flag to require Sunbeam to run in loopback mode.
|
|||
|
||||
General usage looks like the following:
|
||||
`sunbeam [-l] [local port] [remote address]:[port]`
|
||||
|
||||
|
||||
Sunbeam is capable of using both TCP and UDP for sending and receiving data, and can pass data
|
||||
between TCP and UDP streams. By default, Sunbeam attempts to use both protocols for every port
|
||||
and address argument passed to it (e.g. if port 4444 is specified at the command line, it will
|
||||
bind to both UDP port 4444 and TCP port 4444, and if an address is given, it will connect via TCP
|
||||
and also begin sending UDP keepalives (packets containing only `\n`) to that address). You can
|
||||
specify that Sunbeam should only use one protocol or the other by prefixing the argument with a
|
||||
letter - `t` for TCP or `u` for UDP.
|
||||
|
||||
Unless Sunbeam is running in loopback mode, data sent to a given port number over UDP will not be
|
||||
relayed to clients connecting to the same port number via TCP, nor vice versa - ports with the
|
||||
same number are treated as the same port.
|
||||
|
||||
---
|
||||
|
||||
### How to block IP addresses
|
||||
It's 2019, and a server that can't filter incoming connections is a server that belongs to The
|
||||
Enemy. That's why Sunbeam, minimal as it is, includes this functionality nonetheless.
|
||||
It's 2019, and a server that can't filter incoming connections is a server that belongs to **The
|
||||
Enemy**. That's why Sunbeam, minimal as it is, includes this functionality nonetheless.
|
||||
You can create the file `.nosunbeam` in the directory where Sunbeam runs. Sunbeam will read it,
|
||||
parse each line as an IP address (v4 or v6 are both fine), and immediately drop any incoming
|
||||
connections from those addresses.
|
||||
TCP connections or UDP packets from those addresses.
|
||||
|
||||
---
|
||||
|
||||
|
@ -60,7 +70,12 @@ difference being that `p` causes the listener socket to bind to `127.0.0.1` inst
|
|||
address, while `w` keeps the binding public but checks all incoming connections against a hash
|
||||
table of allowed addresses and drops them if they are not found. `p` is likely a little bit
|
||||
faster, but only `w` will print a notification to the console whenever a connection is rejected.
|
||||
|
||||
If a port is suffixed with both `w` and `p`, whichever is listed last will take precedence.
|
||||
|
||||
Addresses specified on the command line for Sunbeam to contact as a client are not subject to the
|
||||
blacklist or whitelist, since they are static during runtime and it is assumed that the user will
|
||||
not attempt to connect to a hostile host.
|
||||
|
||||
---
|
||||
|
||||
|
|
586
sunbeam.rs
586
sunbeam.rs
|
@ -1,13 +1,19 @@
|
|||
use std::collections::{VecDeque,HashSet};
|
||||
/*
|
||||
Sunbeam Network Relay v0.3
|
||||
Elizabeth Evelene Amelia Diode
|
||||
June 2019
|
||||
*/
|
||||
|
||||
use std::collections::{VecDeque,HashSet,HashMap};
|
||||
use std::env::args;
|
||||
use std::fs::File;
|
||||
use std::io;
|
||||
use std::io::prelude::*;
|
||||
use std::net::{TcpListener,TcpStream,SocketAddr,IpAddr};
|
||||
use std::net::{TcpListener,TcpStream,UdpSocket,SocketAddr,IpAddr};
|
||||
use std::process::exit;
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration,Instant};
|
||||
|
||||
static IDLE_THRESH:u64 = 256; // cycles of doing nothing before throttling back to slightly slower execution speed
|
||||
static IDLE_SCALING:u64 = 1; // after crossing IDLE_THRESH, increase cycle delay by this many microseconds per cycle
|
||||
|
@ -16,19 +22,75 @@ static IDLE_MAX:u64 = 65535; // maximum number of idle cycles to count up befo
|
|||
|
||||
static RECONNECT_WAIT:u64 = 1000; // time in ms to wait between reconnect attempts
|
||||
|
||||
static BUFFERSIZE_INITIAL:usize = 511; // how big the buffer capacity should be to start: bigger = maybe faster but more memory usage
|
||||
|
||||
static LOOPBACK_FLAG:&str = "-l"; // flag passed to sunbeam to activate loopback mode
|
||||
static BLOCKFILE_NAME:&str = ".nosunbeam"; // file to check for as the client blacklist
|
||||
static ALLOWFILE_NAME:&str = ".yesunbeam"; // file to check for as the client whitelist, for Whitelist-mode connectors
|
||||
|
||||
static PORT_PREFIXES:[char;2] = ['u','t'];
|
||||
static PORT_SUFFIXES:[char;2] = ['p','w'];
|
||||
static HEARTBEAT_WAIT:u64 = 6000;
|
||||
static HEARTBEAT_DROP:u64 = 60000;
|
||||
|
||||
#[derive(PartialEq)]
|
||||
#[derive(Clone)]
|
||||
enum PortPrivacy {
|
||||
Public,
|
||||
Whitelist,
|
||||
Private,
|
||||
}
|
||||
|
||||
#[derive(PartialEq)]
|
||||
enum PortProtocol {
|
||||
Udp,
|
||||
Tcp,
|
||||
Both,
|
||||
}
|
||||
|
||||
enum Connector {
|
||||
Host {
|
||||
tcp:Option<TcpHost>,
|
||||
udp:Option<UdpHost>,
|
||||
},
|
||||
Guest {
|
||||
address:SocketAddr,
|
||||
tcp:Option<TcpGuest>,
|
||||
udp:Option<UdpGuest>,
|
||||
},
|
||||
}
|
||||
|
||||
struct TcpHost {
|
||||
port:u16,
|
||||
privacy:PortPrivacy,
|
||||
listener:TcpListener,
|
||||
clients:VecDeque<TcpClient>,
|
||||
}
|
||||
|
||||
struct UdpHost {
|
||||
port:u16,
|
||||
privacy:PortPrivacy,
|
||||
socket:UdpSocket,
|
||||
clients:HashMap<SocketAddr,UdpClient>,
|
||||
}
|
||||
|
||||
struct TcpGuest {
|
||||
stream:TcpStream,
|
||||
}
|
||||
|
||||
struct UdpGuest {
|
||||
socket:UdpSocket,
|
||||
lastheartbeat:Instant,
|
||||
}
|
||||
|
||||
struct TcpClient {
|
||||
address:SocketAddr,
|
||||
stream:TcpStream,
|
||||
}
|
||||
|
||||
struct UdpClient {
|
||||
address:SocketAddr,
|
||||
lastheartbeat:Instant,
|
||||
}
|
||||
|
||||
fn load_rules(rulefilepath:&str) -> HashSet<IpAddr> {
|
||||
let mut rules:HashSet<IpAddr> = HashSet::new();
|
||||
match File::open(rulefilepath) {
|
||||
|
@ -65,38 +127,58 @@ fn load_rules(rulefilepath:&str) -> HashSet<IpAddr> {
|
|||
fn main() {
|
||||
let argv = args().collect::<Vec<String>>();
|
||||
if argv.len() < 2 {
|
||||
eprintln!("-!- usage: sunbeam [local port]... [remote resource address]:[remote port]...");
|
||||
eprintln!("-!- usage: sunbeam [protocol][local port][privacy]... [protocol][remote resource address]:[remote port]...");
|
||||
eprintln!();
|
||||
eprintln!(" prefix ports or addresses with a letter to indicate transport protocol:");
|
||||
eprintln!(" t - tcp");
|
||||
eprintln!(" u - udp");
|
||||
eprintln!(" none - both tcp and udp");
|
||||
eprintln!();
|
||||
eprintln!(" suffix ports with a letter to indicate privacy:");
|
||||
eprintln!(" p - local-only");
|
||||
eprintln!(" w - addresses in .yesunbeam whitelist only");
|
||||
eprintln!(" none - all except addresses in .nosunbeam blacklist");
|
||||
exit(1);
|
||||
}
|
||||
let mut loopbackmode:bool = false;
|
||||
let mut addresses:Vec<SocketAddr> = Vec::new();
|
||||
let mut ports:Vec<(PortPrivacy,u16)> = Vec::new();
|
||||
let mut addresses:Vec<(PortProtocol,SocketAddr)> = Vec::new();
|
||||
let mut ports:Vec<(PortProtocol,PortPrivacy,u16)> = Vec::new();
|
||||
|
||||
for arg in argv[1..].iter() {
|
||||
if arg == LOOPBACK_FLAG {
|
||||
loopbackmode = true;
|
||||
} else {
|
||||
match arg.trim_end_matches(&['p','w'][..]).parse::<u16>() {
|
||||
Err(_why) => {
|
||||
match arg.parse::<SocketAddr>() {
|
||||
Err(_why) => {
|
||||
eprintln!("-!- could not parse `{}` as a socket address or port number",arg);
|
||||
},
|
||||
Ok(address) => addresses.push(address),
|
||||
};
|
||||
},
|
||||
Ok(n) => match arg.chars().last() {
|
||||
Some('w') => {
|
||||
println!("-i- accepting only connections whitelisted in {} on port {}",ALLOWFILE_NAME,n);
|
||||
ports.push((PortPrivacy::Whitelist,n));
|
||||
},
|
||||
Some('p') => {
|
||||
println!("-i- accepting only local connections on port {}",n);
|
||||
ports.push((PortPrivacy::Private,n));
|
||||
},
|
||||
_ => ports.push((PortPrivacy::Public,n)),
|
||||
},
|
||||
};
|
||||
}
|
||||
let strippedarg = arg.trim_end_matches(&PORT_SUFFIXES[..]).trim_start_matches(&PORT_PREFIXES[..]);
|
||||
match strippedarg.parse::<u16>() {
|
||||
Err(_why) => {
|
||||
match strippedarg.parse::<SocketAddr>() {
|
||||
Err(_why) => {
|
||||
eprintln!("-!- could not parse `{}` as a socket address or port number",arg);
|
||||
},
|
||||
Ok(address) => {
|
||||
let protocol = match arg.chars().next() {
|
||||
Some('t') => PortProtocol::Tcp,
|
||||
Some('u') => PortProtocol::Udp,
|
||||
_ => PortProtocol::Both,
|
||||
};
|
||||
addresses.push((protocol,address));
|
||||
},
|
||||
};
|
||||
},
|
||||
Ok(n) => {
|
||||
let privacy = match arg.chars().last() {
|
||||
Some('w') => PortPrivacy::Whitelist,
|
||||
Some('p') => PortPrivacy::Private,
|
||||
_ => PortPrivacy::Public,
|
||||
};
|
||||
let protocol = match arg.chars().next() {
|
||||
Some('t') => PortProtocol::Tcp,
|
||||
Some('u') => PortProtocol::Udp,
|
||||
_ => PortProtocol::Both,
|
||||
};
|
||||
ports.push((protocol,privacy,n));
|
||||
}
|
||||
};
|
||||
}
|
||||
loopbackmode |= (addresses.len() == 0 && ports.len() == 1) || (addresses.len() > 0 && ports.len() == 0);
|
||||
if loopbackmode {
|
||||
|
@ -106,140 +188,364 @@ fn main() {
|
|||
let blocklist = load_rules(BLOCKFILE_NAME);
|
||||
let allowlist = load_rules(ALLOWFILE_NAME);
|
||||
|
||||
let mut connectors:VecDeque<(PortPrivacy,u16,TcpListener,VecDeque<(TcpStream,SocketAddr)>)> = VecDeque::new();
|
||||
while let Some((privacy,port)) = ports.pop() {
|
||||
let heartbeat_holdoff = Duration::from_millis(HEARTBEAT_WAIT);
|
||||
let heartbeat_timeout = Duration::from_millis(HEARTBEAT_DROP);
|
||||
|
||||
let mut connectors:VecDeque<Connector> = VecDeque::new();
|
||||
|
||||
while let Some((protocol,privacy,port)) = ports.pop() {
|
||||
let bindaddress:&str = match privacy {
|
||||
PortPrivacy::Private => "127.0.0.1",
|
||||
_ => "[::]",
|
||||
};
|
||||
match TcpListener::bind(&format!("{}:{}",bindaddress,port)) {
|
||||
Err(why) => {
|
||||
eprintln!("-!- failed to bind TCP listener: {}",why);
|
||||
exit(1);
|
||||
},
|
||||
Ok(listener) => {
|
||||
listener.set_nonblocking(true).expect("cannot set listener to nonblocking");
|
||||
connectors.push_back((privacy,port,listener,VecDeque::new()));
|
||||
},
|
||||
let privstring:&str = match privacy {
|
||||
PortPrivacy::Private => "local loopback addresses only",
|
||||
PortPrivacy::Whitelist => "addresses listed in .yesunbeam only",
|
||||
PortPrivacy::Public => "all addresses not listed in .nosunbeam",
|
||||
};
|
||||
let mut tcp:Option<TcpHost> = None;
|
||||
let mut udp:Option<UdpHost> = None;
|
||||
if protocol == PortProtocol::Tcp || protocol == PortProtocol::Both {
|
||||
match TcpListener::bind(&format!("{}:{}",bindaddress,port)) {
|
||||
Err(why) => {
|
||||
eprintln!("-!- failed to bind TCP listener: {}",why);
|
||||
exit(1);
|
||||
},
|
||||
Ok(listener) => {
|
||||
listener.set_nonblocking(true).expect("cannot set listener to nonblocking");
|
||||
println!("-i- tcp port {}: accepting requests from {}",port,privstring);
|
||||
tcp = Some(TcpHost {
|
||||
port:port,
|
||||
privacy:privacy.clone(),
|
||||
listener:listener,
|
||||
clients:VecDeque::new(),
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
if protocol == PortProtocol::Udp || protocol == PortProtocol::Both {
|
||||
match UdpSocket::bind(&format!("{}:{}",bindaddress,port)) {
|
||||
Err(why) => {
|
||||
eprintln!("-!- failed to bind UDP socket: {}",why);
|
||||
exit(1);
|
||||
},
|
||||
Ok(socket) => {
|
||||
socket.set_nonblocking(true).expect("cannot set socket to nonblocking");
|
||||
println!("-i- udp port {}: accepting requests from {}",port,privstring);
|
||||
udp = Some(UdpHost {
|
||||
port:port,
|
||||
privacy:privacy.clone(),
|
||||
socket:socket,
|
||||
clients:HashMap::new(),
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
connectors.push_back(Connector::Host {
|
||||
tcp:tcp,
|
||||
udp:udp,
|
||||
});
|
||||
}
|
||||
|
||||
let mut udppackets:HashMap<SocketAddr,VecDeque<([u8;65535],usize)>> = HashMap::new();
|
||||
let guestudpsocket:UdpSocket = match UdpSocket::bind("[::]:0") {
|
||||
Err(why) => {
|
||||
eprintln!("-!- failed to bind UDP socket to any port: {}",why);
|
||||
exit(1);
|
||||
},
|
||||
Ok(socket) => socket,
|
||||
};
|
||||
guestudpsocket.set_nonblocking(true).expect("cannot set source socket to nonblocking");
|
||||
|
||||
let mut broken_tcpsources:VecDeque<SocketAddr> = VecDeque::new();
|
||||
let mut returning_tcpsources:HashMap<SocketAddr,TcpStream> = HashMap::new();
|
||||
|
||||
while let Some((protocol,address)) = addresses.pop() {
|
||||
udppackets.insert(address.clone(),VecDeque::new());
|
||||
let mut tcp:Option<TcpGuest> = None;
|
||||
let mut udp:Option<UdpGuest> = None;
|
||||
if protocol == PortProtocol::Tcp || protocol == PortProtocol::Both {
|
||||
match TcpStream::connect(&address) {
|
||||
Err(why) => {
|
||||
eprintln!("-!- failed to connect to remote resource ({}): {}. trying to reconnect...",address,why);
|
||||
broken_tcpsources.push_back(address);
|
||||
},
|
||||
Ok(stream) => {
|
||||
stream.set_nonblocking(true).expect("cannot set source stream to nonblocking");
|
||||
stream.set_nodelay(true).expect("cannot set source stream to nodelay");
|
||||
tcp = Some(TcpGuest {
|
||||
stream:stream
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
if protocol == PortProtocol::Udp || protocol == PortProtocol::Both {
|
||||
let socket = match UdpSocket::bind("[::]:0") {
|
||||
Err(why) => {
|
||||
eprintln!("-!- failed to bind UDP socket to dynamic port: {}",why);
|
||||
exit(1);
|
||||
},
|
||||
Ok(s) => s,
|
||||
};
|
||||
let _ = socket.send_to(&[0x0a],&address);
|
||||
udp = Some(UdpGuest {
|
||||
socket:socket,
|
||||
lastheartbeat:Instant::now(),
|
||||
});
|
||||
}
|
||||
connectors.push_back(Connector::Guest {
|
||||
address:address.clone(),
|
||||
tcp:tcp,
|
||||
udp:udp,
|
||||
});
|
||||
}
|
||||
|
||||
let (maintx,mainrx) = mpsc::channel();
|
||||
|
||||
let mut idlecycles:u64 = 0;
|
||||
let mut bottle:Vec<u8> = Vec::with_capacity(BUFFERSIZE_INITIAL);
|
||||
|
||||
let mut sources:VecDeque<(TcpStream,SocketAddr)> = VecDeque::new();
|
||||
let mut brokensources:VecDeque<SocketAddr> = VecDeque::new();
|
||||
for address in addresses.iter() {
|
||||
match TcpStream::connect(&address) {
|
||||
Err(why) => {
|
||||
eprintln!("-!- failed to connect to remote resource ({}): {}. trying to reconnect...",address,why);
|
||||
brokensources.push_back(address.clone());
|
||||
},
|
||||
Ok(stream) => {
|
||||
stream.set_nonblocking(true).expect("cannot set stream to nonblocking");
|
||||
stream.set_nodelay(true).expect("cannot set stream to nodelay");
|
||||
sources.push_back((stream,address.clone()));
|
||||
},
|
||||
};
|
||||
}
|
||||
let mut idlecycles:u64 = 0;
|
||||
|
||||
let mut tcp_buffer:[u8;65535] = [0x00;65535];
|
||||
let mut tcp_nrecv:usize = 0;
|
||||
|
||||
let mut udp_buffer:[u8;65535] = [0x00;65535];
|
||||
let mut udp_nrecv:usize = 0;
|
||||
|
||||
let mut sourceaddress:SocketAddr = "[::]:0".parse().expect("failed to parse socket address");
|
||||
|
||||
loop {
|
||||
|
||||
if idlecycles < IDLE_MAX {
|
||||
idlecycles += 1;
|
||||
}
|
||||
|
||||
bottle.clear();
|
||||
if let Some((mut source,address)) = sources.pop_front() {
|
||||
match source.read_to_end(&mut bottle) {
|
||||
Err(why) => match why.kind() {
|
||||
io::ErrorKind::WouldBlock => {
|
||||
if loopbackmode {
|
||||
for (othersource,_otheraddress) in sources.iter_mut() {
|
||||
let _ = othersource.write_all(&bottle);
|
||||
|
||||
match guestudpsocket.recv_from(&mut udp_buffer) {
|
||||
Err(why) => match why.kind() {
|
||||
io::ErrorKind::WouldBlock => (),
|
||||
io::ErrorKind::Interrupted => (),
|
||||
_ => {
|
||||
eprintln!("-!- failed to receive packets from UDP socket: {}",why);
|
||||
exit(1);
|
||||
},
|
||||
},
|
||||
Ok((nrecv,srcaddr)) => {
|
||||
if let Some(packets) = udppackets.get_mut(&srcaddr) {
|
||||
packets.push_back((udp_buffer.clone(),nrecv));
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
if let Some(mut source) = connectors.pop_front() {
|
||||
match &mut source {
|
||||
Connector::Guest {address,tcp,udp} => {
|
||||
if let Some(stream) = returning_tcpsources.remove(&address) {
|
||||
*tcp = Some(TcpGuest {
|
||||
stream:stream,
|
||||
});
|
||||
}
|
||||
let mut broken:bool = false;
|
||||
if let Some(ref mut guest) = tcp {
|
||||
match guest.stream.read(&mut tcp_buffer) {
|
||||
Err(why) => match why.kind() {
|
||||
io::ErrorKind::WouldBlock => (),
|
||||
_ => {
|
||||
eprintln!("-!- failed to receive data from remote resource ({}): {}. trying to reconnect...",address,why);
|
||||
broken_tcpsources.push_back(address.clone());
|
||||
broken = true;
|
||||
},
|
||||
},
|
||||
Ok(n) => if n == 0 {
|
||||
eprintln!("-!- remote host closed the connection. trying to reconnect...");
|
||||
broken_tcpsources.push_back(address.clone());
|
||||
broken = true;
|
||||
} else {
|
||||
tcp_nrecv = n;
|
||||
sourceaddress = address.clone();
|
||||
},
|
||||
};
|
||||
}
|
||||
if broken {
|
||||
*tcp = None;
|
||||
}
|
||||
if let Some(ref mut guest) = udp {
|
||||
if guest.lastheartbeat.elapsed() > heartbeat_holdoff {
|
||||
let _ = guest.socket.send_to(&[0x0a],*address);
|
||||
}
|
||||
match guest.socket.recv_from(&mut udp_buffer) {
|
||||
Err(why) => {
|
||||
eprintln!("-!- failed to receive UDP packets: {}",why);
|
||||
},
|
||||
Ok((nrecv,srcaddr)) => {
|
||||
if &srcaddr == address {
|
||||
udp_nrecv = nrecv;
|
||||
sourceaddress = srcaddr.clone();
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
}, // guest connector
|
||||
Connector::Host {tcp,udp} => {
|
||||
if let Some(ref mut host) = tcp {
|
||||
match host.listener.accept() {
|
||||
Err(why) => match why.kind() {
|
||||
io::ErrorKind::WouldBlock => (),
|
||||
_ => {
|
||||
eprintln!("-!- failed to accept new connection on port {}: {}",host.port,why);
|
||||
},
|
||||
},
|
||||
Ok((stream,address)) => {
|
||||
if blocklist.contains(&address.ip()) {
|
||||
println!("-x- refuse {}/tcp: {} (listed in {})",host.port,address,BLOCKFILE_NAME);
|
||||
} else if host.privacy == PortPrivacy::Whitelist && !allowlist.contains(&address.ip()) {
|
||||
println!("-x- refuse {}/tcp: {} (not listed in {})",host.port,address,ALLOWFILE_NAME);
|
||||
} else {
|
||||
println!("--> connect {}/tcp: {}",host.port,address);
|
||||
stream.set_nonblocking(true).expect("cannot set stream to nonblocking");
|
||||
stream.set_nodelay(true).expect("cannot set stream to nodelay");
|
||||
host.clients.push_back(TcpClient {
|
||||
address:address,
|
||||
stream:stream,
|
||||
});
|
||||
}
|
||||
},
|
||||
};
|
||||
if let Some(mut client) = host.clients.pop_front() {
|
||||
match client.stream.read(&mut tcp_buffer) {
|
||||
Err(why) => match why.kind() {
|
||||
io::ErrorKind::WouldBlock => host.clients.push_back(client),
|
||||
_ => {
|
||||
println!("<-- disconnect {}: {} ({})",host.port,client.address,why);
|
||||
},
|
||||
},
|
||||
Ok(n) => if n == 0 {
|
||||
println!("<-- disconnect {}: {} (closed by remote endpoint)",host.port,client.address);
|
||||
} else {
|
||||
tcp_nrecv = n;
|
||||
host.clients.push_back(client);
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
if let Some(ref mut host) = udp {
|
||||
match host.socket.recv_from(&mut udp_buffer) {
|
||||
Err(why) => match why.kind() {
|
||||
io::ErrorKind::WouldBlock => (),
|
||||
_ => {
|
||||
eprintln!("-!- failed to receive UDP packets on port {}: {}",host.port,why);
|
||||
},
|
||||
},
|
||||
Ok((n,srcaddr)) => {
|
||||
if blocklist.contains(&srcaddr.ip()) {
|
||||
println!("-x- refuse {}/udp: {} (listed in {})",host.port,srcaddr,BLOCKFILE_NAME);
|
||||
} else if host.privacy == PortPrivacy::Whitelist && !srcaddr.ip().is_loopback() && !allowlist.contains(&srcaddr.ip()) {
|
||||
println!("-x- refuse {}/udp: {} (not listed in {})",host.port,srcaddr,ALLOWFILE_NAME);
|
||||
} else {
|
||||
udp_nrecv = n;
|
||||
sourceaddress = srcaddr;
|
||||
let newclient = UdpClient {
|
||||
address:srcaddr.clone(),
|
||||
lastheartbeat:Instant::now(),
|
||||
};
|
||||
match host.clients.insert(srcaddr.clone(),newclient) {
|
||||
Some(_) => (),
|
||||
None => println!("--> join {}/udp: {}",host.port,srcaddr),
|
||||
};
|
||||
}
|
||||
},
|
||||
};
|
||||
let mut deadclients:Vec<SocketAddr> = Vec::new();
|
||||
for client in host.clients.values() {
|
||||
if client.lastheartbeat.elapsed() > heartbeat_timeout {
|
||||
deadclients.push(client.address.clone());
|
||||
println!("<-- drop {}/udp: {}",host.port,client.address);
|
||||
}
|
||||
}
|
||||
sources.push_back((source,address));
|
||||
},
|
||||
_ => {
|
||||
eprintln!("-!- failed to receive data from remote resource ({}): {}. trying to reconnect...",address,why);
|
||||
brokensources.push_back(address);
|
||||
},
|
||||
},
|
||||
Ok(_) => {
|
||||
eprintln!("-!- remote resource closed the connection. trying to reconnect...");
|
||||
brokensources.push_back(address);
|
||||
},
|
||||
};
|
||||
if bottle.len() > 0 {
|
||||
idlecycles = 0;
|
||||
}
|
||||
for (_privacy,_port,_listener,otherconnects) in connectors.iter_mut() {
|
||||
for (otherstream,_otheraddress) in otherconnects.iter_mut() {
|
||||
let _ = otherstream.write_all(&bottle);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bottle.clear();
|
||||
if let Some((privacy,port,listener,mut connections)) = connectors.pop_front() {
|
||||
match listener.accept() {
|
||||
Err(why) => match why.kind() {
|
||||
io::ErrorKind::WouldBlock => (),
|
||||
_ => {
|
||||
eprintln!("-!- failed to accept new connection: {}",why);
|
||||
},
|
||||
},
|
||||
Ok((stream,address)) => {
|
||||
if blocklist.contains(&address.ip()) {
|
||||
println!("-x- refuse {}: {} (listed in {})",port,address,BLOCKFILE_NAME);
|
||||
} else if privacy == PortPrivacy::Whitelist && !address.ip().is_loopback() && !allowlist.contains(&address.ip()) {
|
||||
println!("-x- refuse {}: {} (not listed in {})",port,address,ALLOWFILE_NAME);
|
||||
} else {
|
||||
println!("--> connect {}: {}",port,address);
|
||||
stream.set_nonblocking(true).expect("cannot set stream to nonblocking");
|
||||
stream.set_nodelay(true).expect("cannot set stream to nodelay");
|
||||
connections.push_back((stream,address));
|
||||
for addr in deadclients.iter() {
|
||||
host.clients.remove(&addr);
|
||||
}
|
||||
}
|
||||
},
|
||||
};
|
||||
if let Some((mut stream,address)) = connections.pop_front() {
|
||||
match stream.read_to_end(&mut bottle) {
|
||||
Err(why) => match why.kind() {
|
||||
io::ErrorKind::WouldBlock => {
|
||||
if loopbackmode {
|
||||
for (otherstream,_otheraddress) in connections.iter_mut() {
|
||||
let _ = otherstream.write_all(&bottle);
|
||||
if loopbackmode {
|
||||
if let Some(ref mut host) = tcp {
|
||||
for client in host.clients.iter_mut() {
|
||||
if client.address == sourceaddress {
|
||||
continue;
|
||||
}
|
||||
if tcp_nrecv > 0 {
|
||||
let _ = client.stream.write_all(&tcp_buffer[..tcp_nrecv]);
|
||||
}
|
||||
if udp_nrecv > 0 {
|
||||
let _ = client.stream.write_all(&udp_buffer[..udp_nrecv]);
|
||||
}
|
||||
}
|
||||
connections.push_back((stream,address));
|
||||
},
|
||||
_ => {
|
||||
println!("<-- disconnect {}: {} ({})",port,address,why);
|
||||
},
|
||||
}
|
||||
if let Some(ref mut host) = udp {
|
||||
for client in host.clients.values() {
|
||||
if client.address == sourceaddress {
|
||||
continue;
|
||||
}
|
||||
if tcp_nrecv > 0 {
|
||||
let _ = host.socket.send_to(&tcp_buffer[..tcp_nrecv],&client.address);
|
||||
}
|
||||
if udp_nrecv > 0 {
|
||||
let _ = host.socket.send_to(&udp_buffer[..udp_nrecv],&client.address);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}, // host connector
|
||||
};
|
||||
for connector in connectors.iter_mut() {
|
||||
match connector {
|
||||
Connector::Host {tcp,udp} => {
|
||||
if let Some(ref mut host) = tcp {
|
||||
for client in host.clients.iter_mut() {
|
||||
if tcp_nrecv > 0 {
|
||||
let _ = client.stream.write_all(&tcp_buffer[..tcp_nrecv]);
|
||||
}
|
||||
if udp_nrecv > 0 {
|
||||
let _ = client.stream.write_all(&udp_buffer[..udp_nrecv]);
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(ref mut host) = udp {
|
||||
for client in host.clients.values() {
|
||||
if tcp_nrecv > 0 {
|
||||
let _ = host.socket.send_to(&tcp_buffer[..tcp_nrecv],&client.address);
|
||||
}
|
||||
if udp_nrecv > 0 {
|
||||
let _ = host.socket.send_to(&udp_buffer[..udp_nrecv],&client.address);
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
Ok(_) => {
|
||||
println!("<-- disconnect {}: {} (closed by remote endpoint)",port,address);
|
||||
Connector::Guest {address,tcp,udp} => {
|
||||
if let Some(ref mut guest) = tcp {
|
||||
if tcp_nrecv > 0 {
|
||||
let _ = guest.stream.write_all(&tcp_buffer[..tcp_nrecv]);
|
||||
}
|
||||
if udp_nrecv > 0 {
|
||||
let _ = guest.stream.write_all(&udp_buffer[..udp_nrecv]);
|
||||
}
|
||||
}
|
||||
if let Some(ref mut guest) = udp {
|
||||
if tcp_nrecv > 0 {
|
||||
let _ = guest.socket.send_to(&tcp_buffer[..tcp_nrecv],*address);
|
||||
}
|
||||
if udp_nrecv > 0 {
|
||||
let _ = guest.socket.send_to(&udp_buffer[..udp_nrecv],*address);
|
||||
}
|
||||
}
|
||||
},
|
||||
};
|
||||
if bottle.len() > 0 {
|
||||
idlecycles = 0;
|
||||
}
|
||||
for (_privacy,_port,_listener,otherconnects) in connectors.iter_mut() {
|
||||
for (otherstream,_otheraddress) in otherconnects.iter_mut() {
|
||||
let _ = otherstream.write_all(&bottle);
|
||||
}
|
||||
}
|
||||
for (otherstream,_otheraddress) in sources.iter_mut() {
|
||||
let _ = otherstream.write_all(&bottle);
|
||||
}
|
||||
}
|
||||
connectors.push_back((privacy,port,listener,connections));
|
||||
if tcp_nrecv > 0 || udp_nrecv > 0 {
|
||||
idlecycles = 0;
|
||||
}
|
||||
tcp_nrecv = 0;
|
||||
udp_nrecv = 0;
|
||||
connectors.push_back(source);
|
||||
}
|
||||
|
||||
if let Some(threadaddress) = brokensources.pop_front() {
|
||||
if let Some(threadaddress) = broken_tcpsources.pop_front() {
|
||||
let threadtx = mpsc::Sender::clone(&maintx);
|
||||
thread::spawn(move || {
|
||||
loop {
|
||||
|
@ -256,8 +562,8 @@ fn main() {
|
|||
}
|
||||
});
|
||||
}
|
||||
if let Ok(reconnection) = mainrx.try_recv() {
|
||||
sources.push_back(reconnection);
|
||||
if let Ok((stream,address)) = mainrx.try_recv() {
|
||||
returning_tcpsources.insert(address,stream);
|
||||
}
|
||||
|
||||
if idlecycles > IDLE_THRESH {
|
||||
|
|
Loading…
Reference in New Issue