From 35f7b35ff5352c90dca527e3a80bae58deca91d6 Mon Sep 17 00:00:00 2001 From: Datong Sun Date: Sat, 9 Apr 2022 06:04:05 -0700 Subject: [PATCH] perf(phantun) spawn multiple threads for UDP send/receive --- phantun/Cargo.toml | 1 + phantun/src/bin/client.rs | 108 +++++++++++++++++++++----------- phantun/src/bin/server.rs | 127 ++++++++++++++++++++++++++++---------- 3 files changed, 167 insertions(+), 69 deletions(-) diff --git a/phantun/Cargo.toml b/phantun/Cargo.toml index 1f5e4c2..b5b69d9 100644 --- a/phantun/Cargo.toml +++ b/phantun/Cargo.toml @@ -15,6 +15,7 @@ clap = { version = "3.0", features = ["cargo"] } socket2 = { version = "0.4", features = ["all"] } fake-tcp = { path = "../fake-tcp", version = "0.2" } tokio = { version = "1.14", features = ["full"] } +tokio-util = "0.7" log = "0.4" pretty_env_logger = "0.4" tokio-tun = "0.5" diff --git a/phantun/src/bin/client.rs b/phantun/src/bin/client.rs index c31ead6..8fb5278 100644 --- a/phantun/src/bin/client.rs +++ b/phantun/src/bin/client.rs @@ -8,9 +8,10 @@ use std::net::{Ipv4Addr, SocketAddr}; use std::sync::Arc; use std::time::Duration; use tokio::net::UdpSocket; -use tokio::sync::RwLock; +use tokio::sync::{Notify, RwLock}; use tokio::time; use tokio_tun::TunBuilder; +use tokio_util::sync::CancellationToken; const UDP_TTL: Duration = Duration::from_secs(180); @@ -119,6 +120,8 @@ async fn main() { .parse() .expect("bad peer address for Tun interface"); + let num_cpus = num_cpus::get(); + let tun = TunBuilder::new() .name(matches.value_of("tun").unwrap()) // if name is empty, then it is set by kernel. .tap(false) // false (default): TUN, true: TAP. @@ -126,7 +129,7 @@ async fn main() { .up() // or set it up manually using `sudo ip link set up`. .address(tun_local) .destination(tun_peer) - .try_build_mq(num_cpus::get()) + .try_build_mq(num_cpus) .unwrap(); info!("Created TUN device {}", tun[0].name()); @@ -168,52 +171,85 @@ async fn main() { assert!(connections.write().await.insert(addr, sock.clone()).is_none()); debug!("inserted fake TCP socket into connection table"); - let connections = connections.clone(); - // spawn "fastpath" UDP socket and task, this will offload main task // from forwarding UDP packets - tokio::spawn(async move { - let mut buf_udp = [0u8; MAX_PACKET_LEN]; - let mut buf_tcp = [0u8; MAX_PACKET_LEN]; - let udp_sock = new_udp_reuseport(local_addr); - udp_sock.connect(addr).await.unwrap(); + let packet_received = Arc::new(Notify::new()); + let quit = CancellationToken::new(); + + for i in 0..num_cpus { + let sock = sock.clone(); + let quit = quit.child_token(); + let packet_received = packet_received.clone(); + + tokio::spawn(async move { + let mut buf_udp = [0u8; MAX_PACKET_LEN]; + let mut buf_tcp = [0u8; MAX_PACKET_LEN]; + let udp_sock = new_udp_reuseport(local_addr); + udp_sock.connect(addr).await.unwrap(); + + loop { + tokio::select! { + Ok(size) = udp_sock.recv(&mut buf_udp) => { + if sock.send(&buf_udp[..size]).await.is_none() { + debug!("removed fake TCP socket from connections table"); + quit.cancel(); + return; + } + + packet_received.notify_one(); + }, + res = sock.recv(&mut buf_tcp) => { + match res { + Some(size) => { + if size > 0 { + if let Err(e) = udp_sock.send(&buf_tcp[..size]).await { + error!("Unable to send UDP packet to {}: {}, closing connection", e, addr); + quit.cancel(); + return; + } + } + }, + None => { + debug!("removed fake TCP socket from connections table"); + quit.cancel(); + return; + }, + } + + packet_received.notify_one(); + }, + _ = quit.cancelled() => { + debug!("worker {} terminated", i); + return; + }, + }; + } + }); + } + + let connections = connections.clone(); + tokio::spawn(async move { loop { let read_timeout = time::sleep(UDP_TTL); + let packet_received_fut = packet_received.notified(); tokio::select! { - Ok(size) = udp_sock.recv(&mut buf_udp) => { - if sock.send(&buf_udp[..size]).await.is_none() { - connections.write().await.remove(&addr); - debug!("removed fake TCP socket from connections table"); - return; - } - }, - res = sock.recv(&mut buf_tcp) => { - match res { - Some(size) => { - if size > 0 { - if let Err(e) = udp_sock.send(&buf_tcp[..size]).await { - connections.write().await.remove(&addr); - error!("Unable to send UDP packet to {}: {}, closing connection", e, addr); - return; - } - } - }, - None => { - connections.write().await.remove(&addr); - debug!("removed fake TCP socket from connections table"); - return; - }, - } - }, _ = read_timeout => { info!("No traffic seen in the last {:?}, closing connection", UDP_TTL); connections.write().await.remove(&addr); debug!("removed fake TCP socket from connections table"); + + quit.cancel(); return; - } - }; + }, + _ = quit.cancelled() => { + connections.write().await.remove(&addr); + debug!("removed fake TCP socket from connections table"); + return; + }, + _ = packet_received_fut => {}, + } } }); }, diff --git a/phantun/src/bin/server.rs b/phantun/src/bin/server.rs index 6ab68a9..1ab2864 100644 --- a/phantun/src/bin/server.rs +++ b/phantun/src/bin/server.rs @@ -1,13 +1,36 @@ use clap::{crate_version, Arg, Command}; use fake_tcp::packet::MAX_PACKET_LEN; use fake_tcp::Stack; -use log::{error, info}; -use std::net::Ipv4Addr; +use log::{debug, error, info}; +use std::net::{Ipv4Addr, SocketAddr}; +use std::sync::Arc; use tokio::net::UdpSocket; +use tokio::sync::Notify; use tokio::time::{self, Duration}; use tokio_tun::TunBuilder; +use tokio_util::sync::CancellationToken; const UDP_TTL: Duration = Duration::from_secs(180); +fn new_udp_reuseport(addr: SocketAddr) -> UdpSocket { + let udp_sock = socket2::Socket::new( + if addr.is_ipv4() { + socket2::Domain::IPV4 + } else { + socket2::Domain::IPV6 + }, + socket2::Type::DGRAM, + None, + ) + .unwrap(); + udp_sock.set_reuse_port(true).unwrap(); + // from tokio-rs/mio/blob/master/src/sys/unix/net.rs + udp_sock.set_cloexec(true).unwrap(); + udp_sock.set_nonblocking(true).unwrap(); + udp_sock.bind(&socket2::SockAddr::from(addr)).unwrap(); + let udp_sock: std::net::UdpSocket = udp_sock.into(); + udp_sock.try_into().unwrap() +} + #[tokio::main] async fn main() { pretty_env_logger::init(); @@ -88,6 +111,8 @@ async fn main() { .parse() .expect("bad peer address for Tun interface"); + let num_cpus = num_cpus::get(); + let tun = TunBuilder::new() .name(matches.value_of("tun").unwrap()) // if name is empty, then it is set by kernel. .tap(false) // false (default): TUN, true: TAP. @@ -95,7 +120,7 @@ async fn main() { .up() // or set it up manually using `sudo ip link set up`. .address(tun_local) .destination(tun_peer) - .try_build_mq(num_cpus::get()) + .try_build_mq(num_cpus) .unwrap(); info!("Created TUN device {}", tun[0].name()); @@ -110,46 +135,82 @@ async fn main() { let mut buf_tcp = [0u8; MAX_PACKET_LEN]; loop { - let sock = stack.accept().await; + let sock = Arc::new(stack.accept().await); info!("New connection: {}", sock); - tokio::spawn(async move { - let udp_sock = UdpSocket::bind(if remote_addr.is_ipv4() { - "0.0.0.0:0" - } else { - "[::]:0" - }) - .await - .unwrap(); - udp_sock.connect(remote_addr).await.unwrap(); + let packet_received = Arc::new(Notify::new()); + let quit = CancellationToken::new(); + let udp_sock = UdpSocket::bind(if remote_addr.is_ipv4() { + "0.0.0.0:0" + } else { + "[::]:0" + }) + .await + .unwrap(); + let local_addr = udp_sock.local_addr().unwrap(); + drop(udp_sock); + for i in 0..num_cpus { + let sock = sock.clone(); + let quit = quit.child_token(); + let packet_received = packet_received.clone(); + let udp_sock = new_udp_reuseport(local_addr); + + tokio::spawn(async move { + udp_sock.connect(remote_addr).await.unwrap(); + + loop { + tokio::select! { + Ok(size) = udp_sock.recv(&mut buf_udp) => { + if sock.send(&buf_udp[..size]).await.is_none() { + quit.cancel(); + return; + } + + packet_received.notify_one(); + }, + res = sock.recv(&mut buf_tcp) => { + match res { + Some(size) => { + if size > 0 { + if let Err(e) = udp_sock.send(&buf_tcp[..size]).await { + error!("Unable to send UDP packet to {}: {}, closing connection", e, remote_addr); + quit.cancel(); + return; + } + } + }, + None => { + quit.cancel(); + return; + }, + } + + packet_received.notify_one(); + }, + _ = quit.cancelled() => { + debug!("worker {} terminated", i); + return; + }, + }; + } + }); + } + + tokio::spawn(async move { loop { let read_timeout = time::sleep(UDP_TTL); + let packet_received_fut = packet_received.notified(); tokio::select! { - Ok(size) = udp_sock.recv(&mut buf_udp) => { - if sock.send(&buf_udp[..size]).await.is_none() { - return; - } - }, - res = sock.recv(&mut buf_tcp) => { - match res { - Some(size) => { - if size > 0 { - if let Err(e) = udp_sock.send(&buf_tcp[..size]).await { - error!("Unable to send UDP packet to {}: {}, closing connection", e, remote_addr); - return; - } - } - }, - None => { return; }, - } - }, _ = read_timeout => { info!("No traffic seen in the last {:?}, closing connection", UDP_TTL); + + quit.cancel(); return; - } - }; + }, + _ = packet_received_fut => {}, + } } }); }