Multi-stream TCP and UDP, encryption and performance

This commit is contained in:
Saber Haj Rabiee
2022-11-10 15:23:09 -08:00
parent b674268863
commit 65e200b1f2
8 changed files with 566 additions and 245 deletions

View File

@@ -18,8 +18,9 @@ benchmark = []
bytes = "1"
pnet = "0.31"
tokio = { version = "1.14", features = ["full"] }
rand = { version = "0.8", features = ["small_rng"] }
log = "0.4"
internet-checksum = "0.2"
tokio-tun = "0.7"
flume = "0.10"
fxhash = "0.2.1"
dashmap = "5.4.0"

View File

@@ -43,22 +43,23 @@
pub mod packet;
use bytes::{Bytes, BytesMut};
use dashmap::{mapref::entry::Entry, DashMap, DashSet};
use fxhash::FxBuildHasher;
use log::{error, info, trace, warn};
use packet::*;
use pnet::packet::{tcp, Packet};
use rand::prelude::*;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::fmt;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::time;
use tokio_tun::Tun;
const TIMEOUT: time::Duration = time::Duration::from_secs(1);
const RETRIES: usize = 6;
const TIMEOUT: time::Duration = time::Duration::from_secs(3);
const RETRIES: usize = 2;
const MPMC_BUFFER_LEN: usize = 512;
const MPSC_BUFFER_LEN: usize = 128;
const MAX_UNACKED_LEN: u32 = 128 * 1024 * 1024; // 128MB
@@ -79,9 +80,10 @@ impl AddrTuple {
}
struct Shared {
tuples: RwLock<HashMap<AddrTuple, flume::Sender<Bytes>>>,
listening: RwLock<HashSet<u16>>,
tuples: DashMap<AddrTuple, flume::Sender<Bytes>, FxBuildHasher>,
listening: DashSet<u16, FxBuildHasher>,
tun: Vec<Arc<Tun>>,
tun_index: AtomicUsize,
ready: mpsc::Sender<Socket>,
tuples_purge: broadcast::Sender<AddrTuple>,
}
@@ -322,7 +324,7 @@ impl Drop for Socket {
fn drop(&mut self) {
let tuple = AddrTuple::new(self.local_addr, self.remote_addr);
// dissociates ourself from the dispatch map
assert!(self.shared.tuples.write().unwrap().remove(&tuple).is_some());
assert!(self.shared.tuples.remove(&tuple).is_some());
// purge cache
self.shared.tuples_purge.send(tuple).unwrap();
@@ -364,9 +366,10 @@ impl Stack {
let (ready_tx, ready_rx) = mpsc::channel(MPSC_BUFFER_LEN);
let (tuples_purge_tx, _tuples_purge_rx) = broadcast::channel(16);
let shared = Arc::new(Shared {
tuples: RwLock::new(HashMap::new()),
tuples: DashMap::default(),
tun: tun.clone(),
listening: RwLock::new(HashSet::new()),
tun_index: AtomicUsize::new(0),
listening: DashSet::default(),
ready: ready_tx,
tuples_purge: tuples_purge_tx.clone(),
});
@@ -389,7 +392,7 @@ impl Stack {
/// Listens for incoming connections on the given `port`.
pub fn listen(&mut self, port: u16) {
assert!(self.shared.listening.write().unwrap().insert(port));
assert!(self.shared.listening.insert(port));
}
/// Accepts an incoming connection.
@@ -399,33 +402,38 @@ impl Stack {
/// Connects to the remote end. `None` returned means
/// the connection attempt failed.
pub async fn connect(&mut self, addr: SocketAddr) -> Option<Socket> {
let mut rng = SmallRng::from_entropy();
let local_port: u16 = rng.gen_range(1024..65535);
let local_addr = SocketAddr::new(
if addr.is_ipv4() {
IpAddr::V4(self.local_ip)
} else {
IpAddr::V6(self.local_ip6.expect("IPv6 local address undefined"))
},
local_port,
);
let tuple = AddrTuple::new(local_addr, addr);
let (mut sock, incoming) = Socket::new(
self.shared.clone(),
self.shared.tun.choose(&mut rng).unwrap().clone(),
local_addr,
addr,
None,
State::Idle,
);
{
let mut tuples = self.shared.tuples.write().unwrap();
assert!(tuples.insert(tuple, incoming.clone()).is_none());
pub async fn connect(&self, addr: SocketAddr) -> Option<Socket> {
for local_port in 1024..u16::MAX {
let local_addr = SocketAddr::new(
if addr.is_ipv4() {
IpAddr::V4(self.local_ip)
} else {
IpAddr::V6(self.local_ip6.expect("IPv6 local address undefined"))
},
local_port,
);
let tuple = AddrTuple::new(local_addr, addr);
let mut sock = match self.shared.tuples.entry(tuple) {
Entry::Occupied(_) => continue,
Entry::Vacant(v) => {
let tun_index = self.shared.tun_index.fetch_add(1, Ordering::Relaxed)
% self.shared.tun.len();
let tun = unsafe { self.shared.tun.get_unchecked(tun_index).clone() };
let (sock, incoming) = Socket::new(
self.shared.clone(),
tun,
local_addr,
addr,
None,
State::Idle,
);
v.insert(incoming.clone());
sock
}
};
return sock.connect().await.map(|_| sock);
}
sock.connect().await.map(|_| sock)
None
}
async fn reader_task(
@@ -433,7 +441,8 @@ impl Stack {
shared: Arc<Shared>,
mut tuples_purge: broadcast::Receiver<AddrTuple>,
) {
let mut tuples: HashMap<AddrTuple, flume::Sender<Bytes>> = HashMap::new();
let mut tuples: HashMap<AddrTuple, flume::Sender<Bytes>, FxBuildHasher> =
HashMap::default();
loop {
let mut buf = BytesMut::zeroed(MAX_PACKET_LEN);
@@ -462,10 +471,7 @@ impl Stack {
// path below
} else {
trace!("Cache miss, checking the shared tuples table for connection");
let sender = {
let tuples = shared.tuples.read().unwrap();
tuples.get(&tuple).cloned()
};
let sender = shared.tuples.get(&tuple);
if let Some(c) = sender {
trace!("Storing connection information into local tuples");
@@ -478,8 +484,6 @@ impl Stack {
if tcp_packet.get_flags() == tcp::TcpFlags::SYN
&& shared
.listening
.read()
.unwrap()
.contains(&tcp_packet.get_destination())
{
// SYN seen on listening socket
@@ -494,8 +498,6 @@ impl Stack {
);
assert!(shared
.tuples
.write()
.unwrap()
.insert(tuple, incoming)
.is_none());
tokio::spawn(sock.accept());
@@ -509,7 +511,11 @@ impl Stack {
tcp::TcpFlags::RST | tcp::TcpFlags::ACK,
None,
);
shared.tun[0].try_send(&buf).unwrap();
let tun_index = shared.tun_index.fetch_add(1, Ordering::Relaxed) % shared.tun.len();
let tun = unsafe {
shared.tun.get_unchecked(tun_index)
};
tun.try_send(&buf).unwrap();
}
} else if (tcp_packet.get_flags() & tcp::TcpFlags::RST) == 0 {
info!("Unknown TCP packet from {}, sending RST", remote_addr);
@@ -521,7 +527,11 @@ impl Stack {
tcp::TcpFlags::RST | tcp::TcpFlags::ACK,
None,
);
shared.tun[0].try_send(&buf).unwrap();
let tun_index = shared.tun_index.fetch_add(1, Ordering::Relaxed) % shared.tun.len();
let tun = unsafe {
shared.tun.get_unchecked(tun_index)
};
tun.try_send(&buf).unwrap();
}
}
None => {