perf(fake-tcp) use flume to avoid locking in receiver, improved single

connection performance by 300%
This commit is contained in:
Datong Sun 2022-04-10 04:57:43 -07:00
parent 55da4d6a62
commit 581d80d08c
2 changed files with 17 additions and 17 deletions

View File

@ -22,3 +22,4 @@ rand = { version = "0.8", features = ["small_rng"] }
log = "0.4" log = "0.4"
internet-checksum = "0.2" internet-checksum = "0.2"
tokio-tun = "0.5" tokio-tun = "0.5"
flume = "0.10"

View File

@ -53,14 +53,14 @@ use std::net::{Ipv4Addr, SocketAddrV4};
use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::mpsc;
use tokio::sync::Mutex as AsyncMutex;
use tokio::time; use tokio::time;
use tokio_tun::Tun; use tokio_tun::Tun;
const TIMEOUT: time::Duration = time::Duration::from_secs(1); const TIMEOUT: time::Duration = time::Duration::from_secs(1);
const RETRIES: usize = 6; const RETRIES: usize = 6;
const MPSC_BUFFER_LEN: usize = 512; const MPMC_BUFFER_LEN: usize = 512;
const MPSC_BUFFER_LEN: usize = 128;
const MAX_UNACKED_LEN: u32 = 128 * 1024 * 1024; // 128MB const MAX_UNACKED_LEN: u32 = 128 * 1024 * 1024; // 128MB
#[derive(Hash, Eq, PartialEq, Clone, Debug)] #[derive(Hash, Eq, PartialEq, Clone, Debug)]
@ -79,17 +79,17 @@ impl AddrTuple {
} }
struct Shared { struct Shared {
tuples: RwLock<HashMap<AddrTuple, Sender<Bytes>>>, tuples: RwLock<HashMap<AddrTuple, flume::Sender<Bytes>>>,
listening: RwLock<HashSet<u16>>, listening: RwLock<HashSet<u16>>,
tun: Vec<Arc<Tun>>, tun: Vec<Arc<Tun>>,
ready: Sender<Socket>, ready: mpsc::Sender<Socket>,
tuples_purge: broadcast::Sender<AddrTuple>, tuples_purge: broadcast::Sender<AddrTuple>,
} }
pub struct Stack { pub struct Stack {
shared: Arc<Shared>, shared: Arc<Shared>,
local_ip: Ipv4Addr, local_ip: Ipv4Addr,
ready: Receiver<Socket>, ready: mpsc::Receiver<Socket>,
} }
pub enum State { pub enum State {
@ -102,7 +102,7 @@ pub enum State {
pub struct Socket { pub struct Socket {
shared: Arc<Shared>, shared: Arc<Shared>,
tun: Arc<Tun>, tun: Arc<Tun>,
incoming: AsyncMutex<Receiver<Bytes>>, incoming: flume::Receiver<Bytes>,
local_addr: SocketAddrV4, local_addr: SocketAddrV4,
remote_addr: SocketAddrV4, remote_addr: SocketAddrV4,
seq: AtomicU32, seq: AtomicU32,
@ -126,14 +126,14 @@ impl Socket {
remote_addr: SocketAddrV4, remote_addr: SocketAddrV4,
ack: Option<u32>, ack: Option<u32>,
state: State, state: State,
) -> (Socket, Sender<Bytes>) { ) -> (Socket, flume::Sender<Bytes>) {
let (incoming_tx, incoming_rx) = mpsc::channel(MPSC_BUFFER_LEN); let (incoming_tx, incoming_rx) = flume::bounded(MPMC_BUFFER_LEN);
( (
Socket { Socket {
shared, shared,
tun, tun,
incoming: AsyncMutex::new(incoming_rx), incoming: incoming_rx,
local_addr, local_addr,
remote_addr, remote_addr,
seq: AtomicU32::new(0), seq: AtomicU32::new(0),
@ -187,8 +187,7 @@ impl Socket {
pub async fn recv(&self, buf: &mut [u8]) -> Option<usize> { pub async fn recv(&self, buf: &mut [u8]) -> Option<usize> {
match self.state { match self.state {
State::Established => { State::Established => {
let mut incoming = self.incoming.lock().await; self.incoming.recv_async().await.ok().and_then(|raw_buf| {
incoming.recv().await.and_then(|raw_buf| {
let (_v4_packet, tcp_packet) = parse_ipv4_packet(&raw_buf); let (_v4_packet, tcp_packet) = parse_ipv4_packet(&raw_buf);
if (tcp_packet.get_flags() & tcp::TcpFlags::RST) != 0 { if (tcp_packet.get_flags() & tcp::TcpFlags::RST) != 0 {
@ -231,7 +230,7 @@ impl Socket {
info!("Sent SYN + ACK to client"); info!("Sent SYN + ACK to client");
} }
State::SynReceived => { State::SynReceived => {
let res = time::timeout(TIMEOUT, self.incoming.lock().await.recv()).await; let res = time::timeout(TIMEOUT, self.incoming.recv_async()).await;
if let Ok(buf) = res { if let Ok(buf) = res {
let buf = buf.unwrap(); let buf = buf.unwrap();
let (_v4_packet, tcp_packet) = parse_ipv4_packet(&buf); let (_v4_packet, tcp_packet) = parse_ipv4_packet(&buf);
@ -275,7 +274,7 @@ impl Socket {
info!("Sent SYN to server"); info!("Sent SYN to server");
} }
State::SynSent => { State::SynSent => {
match time::timeout(TIMEOUT, self.incoming.lock().await.recv()).await { match time::timeout(TIMEOUT, self.incoming.recv_async()).await {
Ok(buf) => { Ok(buf) => {
let buf = buf.unwrap(); let buf = buf.unwrap();
let (_v4_packet, tcp_packet) = parse_ipv4_packet(&buf); let (_v4_packet, tcp_packet) = parse_ipv4_packet(&buf);
@ -426,7 +425,7 @@ impl Stack {
shared: Arc<Shared>, shared: Arc<Shared>,
mut tuples_purge: broadcast::Receiver<AddrTuple>, mut tuples_purge: broadcast::Receiver<AddrTuple>,
) { ) {
let mut tuples: HashMap<AddrTuple, Sender<Bytes>> = HashMap::new(); let mut tuples: HashMap<AddrTuple, flume::Sender<Bytes>> = HashMap::new();
loop { loop {
let mut buf = BytesMut::with_capacity(MAX_PACKET_LEN); let mut buf = BytesMut::with_capacity(MAX_PACKET_LEN);
@ -450,7 +449,7 @@ impl Stack {
let tuple = AddrTuple::new(local_addr, remote_addr); let tuple = AddrTuple::new(local_addr, remote_addr);
if let Some(c) = tuples.get(&tuple) { if let Some(c) = tuples.get(&tuple) {
if c.send(buf).await.is_err() { if c.send_async(buf).await.is_err() {
trace!("Cache hit, but receiver already closed, dropping packet"); trace!("Cache hit, but receiver already closed, dropping packet");
} }
@ -469,7 +468,7 @@ impl Stack {
if let Some(c) = sender { if let Some(c) = sender {
trace!("Storing connection information into local tuples"); trace!("Storing connection information into local tuples");
tuples.insert(tuple, c.clone()); tuples.insert(tuple, c.clone());
c.send(buf).await.unwrap(); c.send_async(buf).await.unwrap();
continue; continue;
} }
} }