From 583cdbe3008f6dff84c3d628d96d5b8c28acbd9e Mon Sep 17 00:00:00 2001 From: Datong Sun Date: Thu, 18 Nov 2021 20:31:57 -0800 Subject: [PATCH] perf(fake-tcp) reduce the number of `clone()` calls in hot path --- fake-tcp/src/lib.rs | 60 ++++++++++++++------------------------- phantun/src/bin/client.rs | 1 + phantun/src/bin/server.rs | 1 + 3 files changed, 23 insertions(+), 39 deletions(-) diff --git a/fake-tcp/src/lib.rs b/fake-tcp/src/lib.rs index 6cff4f4..3a1025d 100644 --- a/fake-tcp/src/lib.rs +++ b/fake-tcp/src/lib.rs @@ -15,7 +15,6 @@ use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, RwLock}; use tokio::sync::broadcast; use tokio::sync::mpsc::{self, Receiver, Sender}; -use tokio::sync::watch; use tokio::sync::Mutex as AsyncMutex; use tokio::time; use tokio_tun::Tun; @@ -58,6 +57,7 @@ pub enum State { SynSent, SynReceived, Established, + Closed, } pub struct Socket { @@ -69,8 +69,6 @@ pub struct Socket { seq: AtomicU32, ack: AtomicU32, state: State, - closing_tx: watch::Sender<()>, - closing_rx: watch::Receiver<()>, } impl Socket { @@ -83,7 +81,6 @@ impl Socket { state: State, ) -> (Socket, Sender) { let (incoming_tx, incoming_rx) = mpsc::channel(MPSC_BUFFER_LEN); - let (closing_tx, closing_rx) = watch::channel(()); ( Socket { @@ -95,8 +92,6 @@ impl Socket { seq: AtomicU32::new(0), ack: AtomicU32::new(ack.unwrap_or(0)), state, - closing_tx, - closing_rx, }, incoming_tx, ) @@ -114,8 +109,6 @@ impl Socket { } pub async fn send(&self, payload: &[u8]) -> Option<()> { - let mut closing = self.closing_rx.clone(); - match self.state { State::Established => { let buf = self.build_tcp_packet(tcp::TcpFlags::ACK, Some(payload)); @@ -126,53 +119,40 @@ impl Socket { res.unwrap(); Some(()) }, - _ = closing.changed() => { - None - } } } + State::Closed => None, _ => unreachable!(), } } pub async fn recv(&self, buf: &mut [u8]) -> Option { - let mut closing = self.closing_rx.clone(); - match self.state { State::Established => { let mut incoming = self.incoming.lock().await; - tokio::select! { - Some(raw_buf) = incoming.recv() => { - let (_v4_packet, tcp_packet) = parse_ipv4_packet(&raw_buf); + incoming.recv().await.and_then(|raw_buf| { + let (_v4_packet, tcp_packet) = parse_ipv4_packet(&raw_buf); - if (tcp_packet.get_flags() & tcp::TcpFlags::RST) != 0 { - info!("Connection {} reset by peer", self); - self.close(); - return None; - } - - let payload = tcp_packet.payload(); - - self.ack - .store(tcp_packet.get_sequence().wrapping_add(1), Ordering::Relaxed); - - buf[..payload.len()].copy_from_slice(payload); - - Some(payload.len()) - }, - _ = closing.changed() => { - None + if (tcp_packet.get_flags() & tcp::TcpFlags::RST) != 0 { + info!("Connection {} reset by peer", self); + return None; } - } + + let payload = tcp_packet.payload(); + + self.ack + .store(tcp_packet.get_sequence().wrapping_add(1), Ordering::Relaxed); + + buf[..payload.len()].copy_from_slice(payload); + + Some(payload.len()) + }) } + State::Closed => None, _ => unreachable!(), } } - pub fn close(&self) { - self.closing_tx.send(()).unwrap(); - } - async fn accept(mut self) { for _ in 0..RETRIES { match self.state { @@ -272,6 +252,8 @@ impl Socket { impl Drop for Socket { fn drop(&mut self) { + self.state = State::Closed; + let tuple = AddrTuple::new(self.local_addr, self.remote_addr); // dissociates ourself from the dispatch map assert!(self.shared.tuples.write().unwrap().remove(&tuple).is_some()); @@ -282,7 +264,7 @@ impl Drop for Socket { if let Err(e) = self.tun.try_send(&buf) { warn!("Unable to send RST to remote end: {}", e); } - self.close(); + info!("Fake TCP connection to {} closed", self); } } diff --git a/phantun/src/bin/client.rs b/phantun/src/bin/client.rs index 3eed1cb..ac10883 100644 --- a/phantun/src/bin/client.rs +++ b/phantun/src/bin/client.rs @@ -108,6 +108,7 @@ async fn main() { } else { panic!("only IPv4 remote address is supported"); }; + info!("Remote address is: {}", remote_addr); let tun_local: Ipv4Addr = matches .value_of("tun_local") diff --git a/phantun/src/bin/server.rs b/phantun/src/bin/server.rs index 026079c..1d2b4d3 100644 --- a/phantun/src/bin/server.rs +++ b/phantun/src/bin/server.rs @@ -77,6 +77,7 @@ async fn main() { .expect("bad remote address or host") .next() .expect("unable to resolve remote host name"); + info!("Remote address is: {}", remote_addr); let tun_local: Ipv4Addr = matches .value_of("tun_local")