mirror of
https://github.com/dndx/phantun.git
synced 2025-01-19 06:19:30 +08:00
perf(client) use different UDP sockets for individual UDP connections
for better load sharing between threads This removes the bottleneck with a single listening UDP socket.
This commit is contained in:
parent
ae52531288
commit
86c6a3f801
@ -13,3 +13,4 @@ rand = { version = "0.8.4", features = ["small_rng"] }
|
|||||||
clap = "2.33.3"
|
clap = "2.33.3"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
pretty_env_logger = "0.4.0"
|
pretty_env_logger = "0.4.0"
|
||||||
|
socket2 = { version = "0.4.2", features = ["all"] }
|
||||||
|
@ -1,8 +1,9 @@
|
|||||||
use clap::{App, Arg};
|
use clap::{App, Arg};
|
||||||
use log::{debug, error, info};
|
use log::{debug, error, info};
|
||||||
use lru_time_cache::{LruCache, TimedEntry};
|
|
||||||
use phantom::fake_tcp::packet::MAX_PACKET_LEN;
|
use phantom::fake_tcp::packet::MAX_PACKET_LEN;
|
||||||
use phantom::fake_tcp::{Socket, Stack};
|
use phantom::fake_tcp::{Socket, Stack};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::convert::TryInto;
|
||||||
use std::net::{SocketAddr, SocketAddrV4};
|
use std::net::{SocketAddr, SocketAddrV4};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@ -13,6 +14,17 @@ use tokio_tun::TunBuilder;
|
|||||||
|
|
||||||
const UDP_TTL: Duration = Duration::from_secs(180);
|
const UDP_TTL: Duration = Duration::from_secs(180);
|
||||||
|
|
||||||
|
fn new_udp_reuseport(addr: SocketAddrV4) -> UdpSocket {
|
||||||
|
let udp_sock = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::DGRAM, None).unwrap();
|
||||||
|
udp_sock.set_reuse_port(true).unwrap();
|
||||||
|
// from tokio-rs/mio/blob/master/src/sys/unix/net.rs
|
||||||
|
udp_sock.set_cloexec(true).unwrap();
|
||||||
|
udp_sock.set_nonblocking(true).unwrap();
|
||||||
|
udp_sock.bind(&socket2::SockAddr::from(addr)).unwrap();
|
||||||
|
let udp_sock: std::net::UdpSocket = udp_sock.into();
|
||||||
|
udp_sock.try_into().unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
pretty_env_logger::init();
|
pretty_env_logger::init();
|
||||||
@ -63,21 +75,22 @@ async fn main() {
|
|||||||
|
|
||||||
info!("Created TUN device {}", tun.name());
|
info!("Created TUN device {}", tun.name());
|
||||||
|
|
||||||
let udp_sock = Arc::new(UdpSocket::bind(local_addr).await.unwrap());
|
let udp_sock = Arc::new(new_udp_reuseport(local_addr));
|
||||||
let connections = Arc::new(RwLock::new(
|
let connections = Arc::new(RwLock::new(HashMap::<SocketAddrV4, Arc<Socket>>::new()));
|
||||||
LruCache::<SocketAddrV4, Arc<Socket>>::with_expiry_duration(UDP_TTL),
|
|
||||||
));
|
|
||||||
|
|
||||||
let mut stack = Stack::new(tun);
|
let mut stack = Stack::new(tun);
|
||||||
|
|
||||||
let main_loop = tokio::spawn(async move {
|
let main_loop = tokio::spawn(async move {
|
||||||
let mut buf_r = [0u8; MAX_PACKET_LEN];
|
let mut buf_r = [0u8; MAX_PACKET_LEN];
|
||||||
let mut cleanup_timer = time::interval(Duration::from_secs(5));
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
Ok((size, SocketAddr::V4(addr))) = udp_sock.recv_from(&mut buf_r) => {
|
Ok((size, SocketAddr::V4(addr))) = udp_sock.recv_from(&mut buf_r) => {
|
||||||
if let Some(sock) = connections.read().await.peek(&addr) {
|
// seen UDP packet to listening socket, this means:
|
||||||
|
// 1. It is a new UDP connection, or
|
||||||
|
// 2. It is some extra packets not filtered by more specific
|
||||||
|
// connected UDP socket yet
|
||||||
|
if let Some(sock) = connections.read().await.get(&addr) {
|
||||||
sock.send(&buf_r[..size]).await;
|
sock.send(&buf_r[..size]).await;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -90,44 +103,60 @@ async fn main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let sock = Arc::new(sock.unwrap());
|
let sock = Arc::new(sock.unwrap());
|
||||||
|
// send first packet
|
||||||
let res = sock.send(&buf_r[..size]).await;
|
let res = sock.send(&buf_r[..size]).await;
|
||||||
if res.is_none() {
|
if res.is_none() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert!(connections.write().await.insert(addr, sock.clone()).is_none());
|
assert!(connections.write().await.insert(addr, sock.clone()).is_none());
|
||||||
debug!("inserted fake TCP socket into LruCache");
|
debug!("inserted fake TCP socket into connection table");
|
||||||
let udp_sock = udp_sock.clone();
|
|
||||||
|
|
||||||
let connections = connections.clone();
|
let connections = connections.clone();
|
||||||
|
|
||||||
|
// spawn "fastpath" UDP socket and task, this will offload main task
|
||||||
|
// from forwarding UDP packets
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
let mut buf_udp = [0u8; MAX_PACKET_LEN];
|
||||||
|
let mut buf_tcp = [0u8; MAX_PACKET_LEN];
|
||||||
|
let udp_sock = new_udp_reuseport(local_addr);
|
||||||
|
udp_sock.connect(addr).await.unwrap();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let mut buf_r = [0u8; MAX_PACKET_LEN];
|
let read_timeout = time::sleep(UDP_TTL);
|
||||||
match sock.recv(&mut buf_r).await {
|
|
||||||
Some(size) => {
|
tokio::select! {
|
||||||
udp_sock.send_to(&buf_r[..size], addr).await.unwrap();
|
Ok(size) = udp_sock.recv(&mut buf_udp) => {
|
||||||
|
if sock.send(&buf_udp[..size]).await.is_none() {
|
||||||
|
connections.write().await.remove(&addr);
|
||||||
|
debug!("removed fake TCP socket from connections table");
|
||||||
|
return;
|
||||||
|
}
|
||||||
},
|
},
|
||||||
None => {
|
res = sock.recv(&mut buf_tcp) => {
|
||||||
|
match res {
|
||||||
|
Some(size) => {
|
||||||
|
if size > 0 {
|
||||||
|
udp_sock.send(&buf_tcp[..size]).await.unwrap();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
connections.write().await.remove(&addr);
|
||||||
|
debug!("removed fake TCP socket from connections table");
|
||||||
|
return;
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ = read_timeout => {
|
||||||
|
info!("No traffic seen in the last {:?}, closing connection", UDP_TTL);
|
||||||
connections.write().await.remove(&addr);
|
connections.write().await.remove(&addr);
|
||||||
debug!("removed fake TCP socket from LruCache");
|
debug!("removed fake TCP socket from connections table");
|
||||||
return;
|
return;
|
||||||
},
|
}
|
||||||
}
|
};
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
_ = cleanup_timer.tick() => {
|
|
||||||
let mut total = 0;
|
|
||||||
|
|
||||||
for c in connections.write().await.notify_iter() {
|
|
||||||
if let TimedEntry::Expired(_addr, sock) = c {
|
|
||||||
sock.close();
|
|
||||||
total += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
debug!("Cleaned {} stale connections", total);
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -19,7 +19,7 @@ use tokio_tun::Tun;
|
|||||||
|
|
||||||
const TIMEOUT: time::Duration = time::Duration::from_secs(1);
|
const TIMEOUT: time::Duration = time::Duration::from_secs(1);
|
||||||
const RETRIES: usize = 6;
|
const RETRIES: usize = 6;
|
||||||
const MPSC_BUFFER_LEN: usize = 128;
|
const MPSC_BUFFER_LEN: usize = 512;
|
||||||
|
|
||||||
#[derive(Debug, Hash, Eq, PartialEq)]
|
#[derive(Debug, Hash, Eq, PartialEq)]
|
||||||
pub struct AddrTuple {
|
pub struct AddrTuple {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user