perf(fake-tcp) reduce the number of clone() calls in hot path

This commit is contained in:
Datong Sun 2021-11-18 20:31:57 -08:00
parent 91988520e5
commit 583cdbe300
3 changed files with 23 additions and 39 deletions

View File

@ -15,7 +15,6 @@ use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::watch;
use tokio::sync::Mutex as AsyncMutex; use tokio::sync::Mutex as AsyncMutex;
use tokio::time; use tokio::time;
use tokio_tun::Tun; use tokio_tun::Tun;
@ -58,6 +57,7 @@ pub enum State {
SynSent, SynSent,
SynReceived, SynReceived,
Established, Established,
Closed,
} }
pub struct Socket { pub struct Socket {
@ -69,8 +69,6 @@ pub struct Socket {
seq: AtomicU32, seq: AtomicU32,
ack: AtomicU32, ack: AtomicU32,
state: State, state: State,
closing_tx: watch::Sender<()>,
closing_rx: watch::Receiver<()>,
} }
impl Socket { impl Socket {
@ -83,7 +81,6 @@ impl Socket {
state: State, state: State,
) -> (Socket, Sender<Bytes>) { ) -> (Socket, Sender<Bytes>) {
let (incoming_tx, incoming_rx) = mpsc::channel(MPSC_BUFFER_LEN); let (incoming_tx, incoming_rx) = mpsc::channel(MPSC_BUFFER_LEN);
let (closing_tx, closing_rx) = watch::channel(());
( (
Socket { Socket {
@ -95,8 +92,6 @@ impl Socket {
seq: AtomicU32::new(0), seq: AtomicU32::new(0),
ack: AtomicU32::new(ack.unwrap_or(0)), ack: AtomicU32::new(ack.unwrap_or(0)),
state, state,
closing_tx,
closing_rx,
}, },
incoming_tx, incoming_tx,
) )
@ -114,8 +109,6 @@ impl Socket {
} }
pub async fn send(&self, payload: &[u8]) -> Option<()> { pub async fn send(&self, payload: &[u8]) -> Option<()> {
let mut closing = self.closing_rx.clone();
match self.state { match self.state {
State::Established => { State::Established => {
let buf = self.build_tcp_packet(tcp::TcpFlags::ACK, Some(payload)); let buf = self.build_tcp_packet(tcp::TcpFlags::ACK, Some(payload));
@ -126,53 +119,40 @@ impl Socket {
res.unwrap(); res.unwrap();
Some(()) Some(())
}, },
_ = closing.changed() => {
None
}
} }
} }
State::Closed => None,
_ => unreachable!(), _ => unreachable!(),
} }
} }
pub async fn recv(&self, buf: &mut [u8]) -> Option<usize> { pub async fn recv(&self, buf: &mut [u8]) -> Option<usize> {
let mut closing = self.closing_rx.clone();
match self.state { match self.state {
State::Established => { State::Established => {
let mut incoming = self.incoming.lock().await; let mut incoming = self.incoming.lock().await;
tokio::select! { incoming.recv().await.and_then(|raw_buf| {
Some(raw_buf) = incoming.recv() => { let (_v4_packet, tcp_packet) = parse_ipv4_packet(&raw_buf);
let (_v4_packet, tcp_packet) = parse_ipv4_packet(&raw_buf);
if (tcp_packet.get_flags() & tcp::TcpFlags::RST) != 0 { if (tcp_packet.get_flags() & tcp::TcpFlags::RST) != 0 {
info!("Connection {} reset by peer", self); info!("Connection {} reset by peer", self);
self.close(); return None;
return None;
}
let payload = tcp_packet.payload();
self.ack
.store(tcp_packet.get_sequence().wrapping_add(1), Ordering::Relaxed);
buf[..payload.len()].copy_from_slice(payload);
Some(payload.len())
},
_ = closing.changed() => {
None
} }
}
let payload = tcp_packet.payload();
self.ack
.store(tcp_packet.get_sequence().wrapping_add(1), Ordering::Relaxed);
buf[..payload.len()].copy_from_slice(payload);
Some(payload.len())
})
} }
State::Closed => None,
_ => unreachable!(), _ => unreachable!(),
} }
} }
pub fn close(&self) {
self.closing_tx.send(()).unwrap();
}
async fn accept(mut self) { async fn accept(mut self) {
for _ in 0..RETRIES { for _ in 0..RETRIES {
match self.state { match self.state {
@ -272,6 +252,8 @@ impl Socket {
impl Drop for Socket { impl Drop for Socket {
fn drop(&mut self) { fn drop(&mut self) {
self.state = State::Closed;
let tuple = AddrTuple::new(self.local_addr, self.remote_addr); let tuple = AddrTuple::new(self.local_addr, self.remote_addr);
// dissociates ourself from the dispatch map // dissociates ourself from the dispatch map
assert!(self.shared.tuples.write().unwrap().remove(&tuple).is_some()); assert!(self.shared.tuples.write().unwrap().remove(&tuple).is_some());
@ -282,7 +264,7 @@ impl Drop for Socket {
if let Err(e) = self.tun.try_send(&buf) { if let Err(e) = self.tun.try_send(&buf) {
warn!("Unable to send RST to remote end: {}", e); warn!("Unable to send RST to remote end: {}", e);
} }
self.close();
info!("Fake TCP connection to {} closed", self); info!("Fake TCP connection to {} closed", self);
} }
} }

View File

@ -108,6 +108,7 @@ async fn main() {
} else { } else {
panic!("only IPv4 remote address is supported"); panic!("only IPv4 remote address is supported");
}; };
info!("Remote address is: {}", remote_addr);
let tun_local: Ipv4Addr = matches let tun_local: Ipv4Addr = matches
.value_of("tun_local") .value_of("tun_local")

View File

@ -77,6 +77,7 @@ async fn main() {
.expect("bad remote address or host") .expect("bad remote address or host")
.next() .next()
.expect("unable to resolve remote host name"); .expect("unable to resolve remote host name");
info!("Remote address is: {}", remote_addr);
let tun_local: Ipv4Addr = matches let tun_local: Ipv4Addr = matches
.value_of("tun_local") .value_of("tun_local")