fix(fake-tcp) fix an issue where RST generated is not following

the proper RFC requirement.

Send ACK every 128MB in lieu of data packets.
This commit is contained in:
Datong Sun 2022-04-10 01:30:28 -07:00
parent 1c35635091
commit 95dfd8ab54
3 changed files with 37 additions and 17 deletions

View File

@ -61,6 +61,7 @@ use tokio_tun::Tun;
const TIMEOUT: time::Duration = time::Duration::from_secs(1); const TIMEOUT: time::Duration = time::Duration::from_secs(1);
const RETRIES: usize = 6; const RETRIES: usize = 6;
const MPSC_BUFFER_LEN: usize = 512; const MPSC_BUFFER_LEN: usize = 512;
const MAX_UNACKED_LEN: u32 = 128 * 1024 * 1024; // 128MB
#[derive(Hash, Eq, PartialEq, Clone, Debug)] #[derive(Hash, Eq, PartialEq, Clone, Debug)]
struct AddrTuple { struct AddrTuple {
@ -106,6 +107,7 @@ pub struct Socket {
remote_addr: SocketAddrV4, remote_addr: SocketAddrV4,
seq: AtomicU32, seq: AtomicU32,
ack: AtomicU32, ack: AtomicU32,
last_ack: AtomicU32,
state: State, state: State,
} }
@ -136,6 +138,7 @@ impl Socket {
remote_addr, remote_addr,
seq: AtomicU32::new(0), seq: AtomicU32::new(0),
ack: AtomicU32::new(ack.unwrap_or(0)), ack: AtomicU32::new(ack.unwrap_or(0)),
last_ack: AtomicU32::new(ack.unwrap_or(0)),
state, state,
}, },
incoming_tx, incoming_tx,
@ -143,11 +146,14 @@ impl Socket {
} }
fn build_tcp_packet(&self, flags: u16, payload: Option<&[u8]>) -> Bytes { fn build_tcp_packet(&self, flags: u16, payload: Option<&[u8]>) -> Bytes {
let ack = self.ack.load(Ordering::Relaxed);
self.last_ack.store(ack, Ordering::Relaxed);
build_tcp_packet( build_tcp_packet(
self.local_addr, self.local_addr,
self.remote_addr, self.remote_addr,
self.seq.load(Ordering::Relaxed), self.seq.load(Ordering::Relaxed),
self.ack.load(Ordering::Relaxed), ack,
flags, flags,
payload, payload,
) )
@ -165,12 +171,7 @@ impl Socket {
State::Established => { State::Established => {
let buf = self.build_tcp_packet(tcp::TcpFlags::ACK, Some(payload)); let buf = self.build_tcp_packet(tcp::TcpFlags::ACK, Some(payload));
self.seq.fetch_add(payload.len() as u32, Ordering::Relaxed); self.seq.fetch_add(payload.len() as u32, Ordering::Relaxed);
self.tun.send(&buf).await.ok().and(Some(()))
tokio::select! {
res = self.tun.send(&buf) => {
res.ok().and(Some(()))
},
}
} }
_ => unreachable!(), _ => unreachable!(),
} }
@ -197,8 +198,18 @@ impl Socket {
let payload = tcp_packet.payload(); let payload = tcp_packet.payload();
self.ack let new_ack = tcp_packet.get_sequence().wrapping_add(payload.len() as u32);
.store(tcp_packet.get_sequence().wrapping_add(1), Ordering::Relaxed); let last_ask = self.last_ack.load(Ordering::Relaxed);
self.ack.store(new_ack, Ordering::Relaxed);
if new_ack.overflowing_sub(last_ask).0 > MAX_UNACKED_LEN {
let buf = self.build_tcp_packet(tcp::TcpFlags::ACK, None);
if let Err(e) = self.tun.try_send(&buf) {
// This should not really happen as we have not sent anything for
// quite some time...
info!("Connection {} unable to send idling ACK back: {}", self, e)
}
}
buf[..payload.len()].copy_from_slice(payload); buf[..payload.len()].copy_from_slice(payload);
@ -315,7 +326,14 @@ impl Drop for Socket {
// purge cache // purge cache
self.shared.tuples_purge.send(tuple).unwrap(); self.shared.tuples_purge.send(tuple).unwrap();
let buf = self.build_tcp_packet(tcp::TcpFlags::RST, None); let buf = build_tcp_packet(
self.local_addr,
self.remote_addr,
self.seq.load(Ordering::Relaxed),
0,
tcp::TcpFlags::RST,
None,
);
if let Err(e) = self.tun.try_send(&buf) { if let Err(e) = self.tun.try_send(&buf) {
warn!("Unable to send RST to remote end: {}", e); warn!("Unable to send RST to remote end: {}", e);
} }
@ -486,8 +504,8 @@ impl Stack {
local_addr, local_addr,
remote_addr, remote_addr,
0, 0,
tcp_packet.get_sequence() + 1, tcp_packet.get_sequence() + tcp_packet.payload().len() as u32 + 1, // +1 because of SYN flag set
tcp::TcpFlags::RST, tcp::TcpFlags::RST | tcp::TcpFlags::ACK,
None, None,
); );
shared.tun[0].try_send(&buf).unwrap(); shared.tun[0].try_send(&buf).unwrap();
@ -498,8 +516,8 @@ impl Stack {
local_addr, local_addr,
remote_addr, remote_addr,
tcp_packet.get_acknowledgement(), tcp_packet.get_acknowledgement(),
0, tcp_packet.get_sequence() + tcp_packet.payload().len() as u32,
tcp::TcpFlags::RST, tcp::TcpFlags::RST | tcp::TcpFlags::ACK,
None, None,
); );
shared.tun[0].try_send(&buf).unwrap(); shared.tun[0].try_send(&buf).unwrap();
@ -508,7 +526,7 @@ impl Stack {
tuple = tuples_purge.recv() => { tuple = tuples_purge.recv() => {
let tuple = tuple.unwrap(); let tuple = tuple.unwrap();
tuples.remove(&tuple); tuples.remove(&tuple);
trace!("Removed cached tuple"); trace!("Removed cached tuple: {:?}", tuple);
} }
} }
} }

View File

@ -99,6 +99,7 @@ async fn main() {
.expect("bad peer address for Tun interface"); .expect("bad peer address for Tun interface");
let num_cpus = num_cpus::get(); let num_cpus = num_cpus::get();
info!("{} cores available", num_cpus);
let tun = TunBuilder::new() let tun = TunBuilder::new()
.name(matches.value_of("tun").unwrap()) // if name is empty, then it is set by kernel. .name(matches.value_of("tun").unwrap()) // if name is empty, then it is set by kernel.
@ -157,7 +158,7 @@ async fn main() {
for i in 0..num_cpus { for i in 0..num_cpus {
let sock = sock.clone(); let sock = sock.clone();
let quit = quit.child_token(); let quit = quit.clone();
let packet_received = packet_received.clone(); let packet_received = packet_received.clone();
tokio::spawn(async move { tokio::spawn(async move {

View File

@ -94,6 +94,7 @@ async fn main() {
.expect("bad peer address for Tun interface"); .expect("bad peer address for Tun interface");
let num_cpus = num_cpus::get(); let num_cpus = num_cpus::get();
info!("{} cores available", num_cpus);
let tun = TunBuilder::new() let tun = TunBuilder::new()
.name(matches.value_of("tun").unwrap()) // if name is empty, then it is set by kernel. .name(matches.value_of("tun").unwrap()) // if name is empty, then it is set by kernel.
@ -134,7 +135,7 @@ async fn main() {
for i in 0..num_cpus { for i in 0..num_cpus {
let sock = sock.clone(); let sock = sock.clone();
let quit = quit.child_token(); let quit = quit.clone();
let packet_received = packet_received.clone(); let packet_received = packet_received.clone();
let udp_sock = new_udp_reuseport(local_addr); let udp_sock = new_udp_reuseport(local_addr);