perf(phantun) spawn multiple threads for UDP send/receive

This commit is contained in:
Datong Sun 2022-04-09 06:04:05 -07:00
parent dff0c4ca28
commit 35f7b35ff5
3 changed files with 167 additions and 69 deletions

View File

@ -15,6 +15,7 @@ clap = { version = "3.0", features = ["cargo"] }
socket2 = { version = "0.4", features = ["all"] } socket2 = { version = "0.4", features = ["all"] }
fake-tcp = { path = "../fake-tcp", version = "0.2" } fake-tcp = { path = "../fake-tcp", version = "0.2" }
tokio = { version = "1.14", features = ["full"] } tokio = { version = "1.14", features = ["full"] }
tokio-util = "0.7"
log = "0.4" log = "0.4"
pretty_env_logger = "0.4" pretty_env_logger = "0.4"
tokio-tun = "0.5" tokio-tun = "0.5"

View File

@ -8,9 +8,10 @@ use std::net::{Ipv4Addr, SocketAddr};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use tokio::sync::RwLock; use tokio::sync::{Notify, RwLock};
use tokio::time; use tokio::time;
use tokio_tun::TunBuilder; use tokio_tun::TunBuilder;
use tokio_util::sync::CancellationToken;
const UDP_TTL: Duration = Duration::from_secs(180); const UDP_TTL: Duration = Duration::from_secs(180);
@ -119,6 +120,8 @@ async fn main() {
.parse() .parse()
.expect("bad peer address for Tun interface"); .expect("bad peer address for Tun interface");
let num_cpus = num_cpus::get();
let tun = TunBuilder::new() let tun = TunBuilder::new()
.name(matches.value_of("tun").unwrap()) // if name is empty, then it is set by kernel. .name(matches.value_of("tun").unwrap()) // if name is empty, then it is set by kernel.
.tap(false) // false (default): TUN, true: TAP. .tap(false) // false (default): TUN, true: TAP.
@ -126,7 +129,7 @@ async fn main() {
.up() // or set it up manually using `sudo ip link set <tun-name> up`. .up() // or set it up manually using `sudo ip link set <tun-name> up`.
.address(tun_local) .address(tun_local)
.destination(tun_peer) .destination(tun_peer)
.try_build_mq(num_cpus::get()) .try_build_mq(num_cpus)
.unwrap(); .unwrap();
info!("Created TUN device {}", tun[0].name()); info!("Created TUN device {}", tun[0].name());
@ -168,52 +171,85 @@ async fn main() {
assert!(connections.write().await.insert(addr, sock.clone()).is_none()); assert!(connections.write().await.insert(addr, sock.clone()).is_none());
debug!("inserted fake TCP socket into connection table"); debug!("inserted fake TCP socket into connection table");
let connections = connections.clone();
// spawn "fastpath" UDP socket and task, this will offload main task // spawn "fastpath" UDP socket and task, this will offload main task
// from forwarding UDP packets // from forwarding UDP packets
tokio::spawn(async move {
let mut buf_udp = [0u8; MAX_PACKET_LEN];
let mut buf_tcp = [0u8; MAX_PACKET_LEN];
let udp_sock = new_udp_reuseport(local_addr);
udp_sock.connect(addr).await.unwrap();
let packet_received = Arc::new(Notify::new());
let quit = CancellationToken::new();
for i in 0..num_cpus {
let sock = sock.clone();
let quit = quit.child_token();
let packet_received = packet_received.clone();
tokio::spawn(async move {
let mut buf_udp = [0u8; MAX_PACKET_LEN];
let mut buf_tcp = [0u8; MAX_PACKET_LEN];
let udp_sock = new_udp_reuseport(local_addr);
udp_sock.connect(addr).await.unwrap();
loop {
tokio::select! {
Ok(size) = udp_sock.recv(&mut buf_udp) => {
if sock.send(&buf_udp[..size]).await.is_none() {
debug!("removed fake TCP socket from connections table");
quit.cancel();
return;
}
packet_received.notify_one();
},
res = sock.recv(&mut buf_tcp) => {
match res {
Some(size) => {
if size > 0 {
if let Err(e) = udp_sock.send(&buf_tcp[..size]).await {
error!("Unable to send UDP packet to {}: {}, closing connection", e, addr);
quit.cancel();
return;
}
}
},
None => {
debug!("removed fake TCP socket from connections table");
quit.cancel();
return;
},
}
packet_received.notify_one();
},
_ = quit.cancelled() => {
debug!("worker {} terminated", i);
return;
},
};
}
});
}
let connections = connections.clone();
tokio::spawn(async move {
loop { loop {
let read_timeout = time::sleep(UDP_TTL); let read_timeout = time::sleep(UDP_TTL);
let packet_received_fut = packet_received.notified();
tokio::select! { tokio::select! {
Ok(size) = udp_sock.recv(&mut buf_udp) => {
if sock.send(&buf_udp[..size]).await.is_none() {
connections.write().await.remove(&addr);
debug!("removed fake TCP socket from connections table");
return;
}
},
res = sock.recv(&mut buf_tcp) => {
match res {
Some(size) => {
if size > 0 {
if let Err(e) = udp_sock.send(&buf_tcp[..size]).await {
connections.write().await.remove(&addr);
error!("Unable to send UDP packet to {}: {}, closing connection", e, addr);
return;
}
}
},
None => {
connections.write().await.remove(&addr);
debug!("removed fake TCP socket from connections table");
return;
},
}
},
_ = read_timeout => { _ = read_timeout => {
info!("No traffic seen in the last {:?}, closing connection", UDP_TTL); info!("No traffic seen in the last {:?}, closing connection", UDP_TTL);
connections.write().await.remove(&addr); connections.write().await.remove(&addr);
debug!("removed fake TCP socket from connections table"); debug!("removed fake TCP socket from connections table");
quit.cancel();
return; return;
} },
}; _ = quit.cancelled() => {
connections.write().await.remove(&addr);
debug!("removed fake TCP socket from connections table");
return;
},
_ = packet_received_fut => {},
}
} }
}); });
}, },

View File

@ -1,13 +1,36 @@
use clap::{crate_version, Arg, Command}; use clap::{crate_version, Arg, Command};
use fake_tcp::packet::MAX_PACKET_LEN; use fake_tcp::packet::MAX_PACKET_LEN;
use fake_tcp::Stack; use fake_tcp::Stack;
use log::{error, info}; use log::{debug, error, info};
use std::net::Ipv4Addr; use std::net::{Ipv4Addr, SocketAddr};
use std::sync::Arc;
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use tokio::sync::Notify;
use tokio::time::{self, Duration}; use tokio::time::{self, Duration};
use tokio_tun::TunBuilder; use tokio_tun::TunBuilder;
use tokio_util::sync::CancellationToken;
const UDP_TTL: Duration = Duration::from_secs(180); const UDP_TTL: Duration = Duration::from_secs(180);
fn new_udp_reuseport(addr: SocketAddr) -> UdpSocket {
let udp_sock = socket2::Socket::new(
if addr.is_ipv4() {
socket2::Domain::IPV4
} else {
socket2::Domain::IPV6
},
socket2::Type::DGRAM,
None,
)
.unwrap();
udp_sock.set_reuse_port(true).unwrap();
// from tokio-rs/mio/blob/master/src/sys/unix/net.rs
udp_sock.set_cloexec(true).unwrap();
udp_sock.set_nonblocking(true).unwrap();
udp_sock.bind(&socket2::SockAddr::from(addr)).unwrap();
let udp_sock: std::net::UdpSocket = udp_sock.into();
udp_sock.try_into().unwrap()
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
pretty_env_logger::init(); pretty_env_logger::init();
@ -88,6 +111,8 @@ async fn main() {
.parse() .parse()
.expect("bad peer address for Tun interface"); .expect("bad peer address for Tun interface");
let num_cpus = num_cpus::get();
let tun = TunBuilder::new() let tun = TunBuilder::new()
.name(matches.value_of("tun").unwrap()) // if name is empty, then it is set by kernel. .name(matches.value_of("tun").unwrap()) // if name is empty, then it is set by kernel.
.tap(false) // false (default): TUN, true: TAP. .tap(false) // false (default): TUN, true: TAP.
@ -95,7 +120,7 @@ async fn main() {
.up() // or set it up manually using `sudo ip link set <tun-name> up`. .up() // or set it up manually using `sudo ip link set <tun-name> up`.
.address(tun_local) .address(tun_local)
.destination(tun_peer) .destination(tun_peer)
.try_build_mq(num_cpus::get()) .try_build_mq(num_cpus)
.unwrap(); .unwrap();
info!("Created TUN device {}", tun[0].name()); info!("Created TUN device {}", tun[0].name());
@ -110,46 +135,82 @@ async fn main() {
let mut buf_tcp = [0u8; MAX_PACKET_LEN]; let mut buf_tcp = [0u8; MAX_PACKET_LEN];
loop { loop {
let sock = stack.accept().await; let sock = Arc::new(stack.accept().await);
info!("New connection: {}", sock); info!("New connection: {}", sock);
tokio::spawn(async move { let packet_received = Arc::new(Notify::new());
let udp_sock = UdpSocket::bind(if remote_addr.is_ipv4() { let quit = CancellationToken::new();
"0.0.0.0:0" let udp_sock = UdpSocket::bind(if remote_addr.is_ipv4() {
} else { "0.0.0.0:0"
"[::]:0" } else {
}) "[::]:0"
.await })
.unwrap(); .await
udp_sock.connect(remote_addr).await.unwrap(); .unwrap();
let local_addr = udp_sock.local_addr().unwrap();
drop(udp_sock);
for i in 0..num_cpus {
let sock = sock.clone();
let quit = quit.child_token();
let packet_received = packet_received.clone();
let udp_sock = new_udp_reuseport(local_addr);
tokio::spawn(async move {
udp_sock.connect(remote_addr).await.unwrap();
loop {
tokio::select! {
Ok(size) = udp_sock.recv(&mut buf_udp) => {
if sock.send(&buf_udp[..size]).await.is_none() {
quit.cancel();
return;
}
packet_received.notify_one();
},
res = sock.recv(&mut buf_tcp) => {
match res {
Some(size) => {
if size > 0 {
if let Err(e) = udp_sock.send(&buf_tcp[..size]).await {
error!("Unable to send UDP packet to {}: {}, closing connection", e, remote_addr);
quit.cancel();
return;
}
}
},
None => {
quit.cancel();
return;
},
}
packet_received.notify_one();
},
_ = quit.cancelled() => {
debug!("worker {} terminated", i);
return;
},
};
}
});
}
tokio::spawn(async move {
loop { loop {
let read_timeout = time::sleep(UDP_TTL); let read_timeout = time::sleep(UDP_TTL);
let packet_received_fut = packet_received.notified();
tokio::select! { tokio::select! {
Ok(size) = udp_sock.recv(&mut buf_udp) => {
if sock.send(&buf_udp[..size]).await.is_none() {
return;
}
},
res = sock.recv(&mut buf_tcp) => {
match res {
Some(size) => {
if size > 0 {
if let Err(e) = udp_sock.send(&buf_tcp[..size]).await {
error!("Unable to send UDP packet to {}: {}, closing connection", e, remote_addr);
return;
}
}
},
None => { return; },
}
},
_ = read_timeout => { _ = read_timeout => {
info!("No traffic seen in the last {:?}, closing connection", UDP_TTL); info!("No traffic seen in the last {:?}, closing connection", UDP_TTL);
quit.cancel();
return; return;
} },
}; _ = packet_received_fut => {},
}
} }
}); });
} }