From 581d80d08c8152c8a6f870d6d57014008c72e2c1 Mon Sep 17 00:00:00 2001 From: Datong Sun Date: Sun, 10 Apr 2022 04:57:43 -0700 Subject: [PATCH] perf(fake-tcp) use flume to avoid locking in receiver, improved single connection performance by 300% --- fake-tcp/Cargo.toml | 1 + fake-tcp/src/lib.rs | 33 ++++++++++++++++----------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/fake-tcp/Cargo.toml b/fake-tcp/Cargo.toml index 39cc4d3..73d71e1 100644 --- a/fake-tcp/Cargo.toml +++ b/fake-tcp/Cargo.toml @@ -22,3 +22,4 @@ rand = { version = "0.8", features = ["small_rng"] } log = "0.4" internet-checksum = "0.2" tokio-tun = "0.5" +flume = "0.10" diff --git a/fake-tcp/src/lib.rs b/fake-tcp/src/lib.rs index 67bf3ea..27e4805 100644 --- a/fake-tcp/src/lib.rs +++ b/fake-tcp/src/lib.rs @@ -53,14 +53,14 @@ use std::net::{Ipv4Addr, SocketAddrV4}; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, RwLock}; use tokio::sync::broadcast; -use tokio::sync::mpsc::{self, Receiver, Sender}; -use tokio::sync::Mutex as AsyncMutex; +use tokio::sync::mpsc; use tokio::time; use tokio_tun::Tun; const TIMEOUT: time::Duration = time::Duration::from_secs(1); const RETRIES: usize = 6; -const MPSC_BUFFER_LEN: usize = 512; +const MPMC_BUFFER_LEN: usize = 512; +const MPSC_BUFFER_LEN: usize = 128; const MAX_UNACKED_LEN: u32 = 128 * 1024 * 1024; // 128MB #[derive(Hash, Eq, PartialEq, Clone, Debug)] @@ -79,17 +79,17 @@ impl AddrTuple { } struct Shared { - tuples: RwLock>>, + tuples: RwLock>>, listening: RwLock>, tun: Vec>, - ready: Sender, + ready: mpsc::Sender, tuples_purge: broadcast::Sender, } pub struct Stack { shared: Arc, local_ip: Ipv4Addr, - ready: Receiver, + ready: mpsc::Receiver, } pub enum State { @@ -102,7 +102,7 @@ pub enum State { pub struct Socket { shared: Arc, tun: Arc, - incoming: AsyncMutex>, + incoming: flume::Receiver, local_addr: SocketAddrV4, remote_addr: SocketAddrV4, seq: AtomicU32, @@ -126,14 +126,14 @@ impl Socket { remote_addr: SocketAddrV4, ack: Option, state: State, - ) -> (Socket, Sender) { - let (incoming_tx, incoming_rx) = mpsc::channel(MPSC_BUFFER_LEN); + ) -> (Socket, flume::Sender) { + let (incoming_tx, incoming_rx) = flume::bounded(MPMC_BUFFER_LEN); ( Socket { shared, tun, - incoming: AsyncMutex::new(incoming_rx), + incoming: incoming_rx, local_addr, remote_addr, seq: AtomicU32::new(0), @@ -187,8 +187,7 @@ impl Socket { pub async fn recv(&self, buf: &mut [u8]) -> Option { match self.state { State::Established => { - let mut incoming = self.incoming.lock().await; - incoming.recv().await.and_then(|raw_buf| { + self.incoming.recv_async().await.ok().and_then(|raw_buf| { let (_v4_packet, tcp_packet) = parse_ipv4_packet(&raw_buf); if (tcp_packet.get_flags() & tcp::TcpFlags::RST) != 0 { @@ -231,7 +230,7 @@ impl Socket { info!("Sent SYN + ACK to client"); } State::SynReceived => { - let res = time::timeout(TIMEOUT, self.incoming.lock().await.recv()).await; + let res = time::timeout(TIMEOUT, self.incoming.recv_async()).await; if let Ok(buf) = res { let buf = buf.unwrap(); let (_v4_packet, tcp_packet) = parse_ipv4_packet(&buf); @@ -275,7 +274,7 @@ impl Socket { info!("Sent SYN to server"); } State::SynSent => { - match time::timeout(TIMEOUT, self.incoming.lock().await.recv()).await { + match time::timeout(TIMEOUT, self.incoming.recv_async()).await { Ok(buf) => { let buf = buf.unwrap(); let (_v4_packet, tcp_packet) = parse_ipv4_packet(&buf); @@ -426,7 +425,7 @@ impl Stack { shared: Arc, mut tuples_purge: broadcast::Receiver, ) { - let mut tuples: HashMap> = HashMap::new(); + let mut tuples: HashMap> = HashMap::new(); loop { let mut buf = BytesMut::with_capacity(MAX_PACKET_LEN); @@ -450,7 +449,7 @@ impl Stack { let tuple = AddrTuple::new(local_addr, remote_addr); if let Some(c) = tuples.get(&tuple) { - if c.send(buf).await.is_err() { + if c.send_async(buf).await.is_err() { trace!("Cache hit, but receiver already closed, dropping packet"); } @@ -469,7 +468,7 @@ impl Stack { if let Some(c) = sender { trace!("Storing connection information into local tuples"); tuples.insert(tuple, c.clone()); - c.send(buf).await.unwrap(); + c.send_async(buf).await.unwrap(); continue; } }