diff --git a/fake-tcp/src/lib.rs b/fake-tcp/src/lib.rs index 0465ff5..67bf3ea 100644 --- a/fake-tcp/src/lib.rs +++ b/fake-tcp/src/lib.rs @@ -61,6 +61,7 @@ use tokio_tun::Tun; const TIMEOUT: time::Duration = time::Duration::from_secs(1); const RETRIES: usize = 6; const MPSC_BUFFER_LEN: usize = 512; +const MAX_UNACKED_LEN: u32 = 128 * 1024 * 1024; // 128MB #[derive(Hash, Eq, PartialEq, Clone, Debug)] struct AddrTuple { @@ -106,6 +107,7 @@ pub struct Socket { remote_addr: SocketAddrV4, seq: AtomicU32, ack: AtomicU32, + last_ack: AtomicU32, state: State, } @@ -136,6 +138,7 @@ impl Socket { remote_addr, seq: AtomicU32::new(0), ack: AtomicU32::new(ack.unwrap_or(0)), + last_ack: AtomicU32::new(ack.unwrap_or(0)), state, }, incoming_tx, @@ -143,11 +146,14 @@ impl Socket { } fn build_tcp_packet(&self, flags: u16, payload: Option<&[u8]>) -> Bytes { + let ack = self.ack.load(Ordering::Relaxed); + self.last_ack.store(ack, Ordering::Relaxed); + build_tcp_packet( self.local_addr, self.remote_addr, self.seq.load(Ordering::Relaxed), - self.ack.load(Ordering::Relaxed), + ack, flags, payload, ) @@ -165,12 +171,7 @@ impl Socket { State::Established => { let buf = self.build_tcp_packet(tcp::TcpFlags::ACK, Some(payload)); self.seq.fetch_add(payload.len() as u32, Ordering::Relaxed); - - tokio::select! { - res = self.tun.send(&buf) => { - res.ok().and(Some(())) - }, - } + self.tun.send(&buf).await.ok().and(Some(())) } _ => unreachable!(), } @@ -197,8 +198,18 @@ impl Socket { let payload = tcp_packet.payload(); - self.ack - .store(tcp_packet.get_sequence().wrapping_add(1), Ordering::Relaxed); + let new_ack = tcp_packet.get_sequence().wrapping_add(payload.len() as u32); + let last_ask = self.last_ack.load(Ordering::Relaxed); + self.ack.store(new_ack, Ordering::Relaxed); + + if new_ack.overflowing_sub(last_ask).0 > MAX_UNACKED_LEN { + let buf = self.build_tcp_packet(tcp::TcpFlags::ACK, None); + if let Err(e) = self.tun.try_send(&buf) { + // This should not really happen as we have not sent anything for + // quite some time... + info!("Connection {} unable to send idling ACK back: {}", self, e) + } + } buf[..payload.len()].copy_from_slice(payload); @@ -315,7 +326,14 @@ impl Drop for Socket { // purge cache self.shared.tuples_purge.send(tuple).unwrap(); - let buf = self.build_tcp_packet(tcp::TcpFlags::RST, None); + let buf = build_tcp_packet( + self.local_addr, + self.remote_addr, + self.seq.load(Ordering::Relaxed), + 0, + tcp::TcpFlags::RST, + None, + ); if let Err(e) = self.tun.try_send(&buf) { warn!("Unable to send RST to remote end: {}", e); } @@ -486,8 +504,8 @@ impl Stack { local_addr, remote_addr, 0, - tcp_packet.get_sequence() + 1, - tcp::TcpFlags::RST, + tcp_packet.get_sequence() + tcp_packet.payload().len() as u32 + 1, // +1 because of SYN flag set + tcp::TcpFlags::RST | tcp::TcpFlags::ACK, None, ); shared.tun[0].try_send(&buf).unwrap(); @@ -498,8 +516,8 @@ impl Stack { local_addr, remote_addr, tcp_packet.get_acknowledgement(), - 0, - tcp::TcpFlags::RST, + tcp_packet.get_sequence() + tcp_packet.payload().len() as u32, + tcp::TcpFlags::RST | tcp::TcpFlags::ACK, None, ); shared.tun[0].try_send(&buf).unwrap(); @@ -508,7 +526,7 @@ impl Stack { tuple = tuples_purge.recv() => { let tuple = tuple.unwrap(); tuples.remove(&tuple); - trace!("Removed cached tuple"); + trace!("Removed cached tuple: {:?}", tuple); } } } diff --git a/phantun/src/bin/client.rs b/phantun/src/bin/client.rs index 65d81f9..ef4bf22 100644 --- a/phantun/src/bin/client.rs +++ b/phantun/src/bin/client.rs @@ -99,6 +99,7 @@ async fn main() { .expect("bad peer address for Tun interface"); let num_cpus = num_cpus::get(); + info!("{} cores available", num_cpus); let tun = TunBuilder::new() .name(matches.value_of("tun").unwrap()) // if name is empty, then it is set by kernel. @@ -157,7 +158,7 @@ async fn main() { for i in 0..num_cpus { let sock = sock.clone(); - let quit = quit.child_token(); + let quit = quit.clone(); let packet_received = packet_received.clone(); tokio::spawn(async move { diff --git a/phantun/src/bin/server.rs b/phantun/src/bin/server.rs index e7f8a9c..4a32a24 100644 --- a/phantun/src/bin/server.rs +++ b/phantun/src/bin/server.rs @@ -94,6 +94,7 @@ async fn main() { .expect("bad peer address for Tun interface"); let num_cpus = num_cpus::get(); + info!("{} cores available", num_cpus); let tun = TunBuilder::new() .name(matches.value_of("tun").unwrap()) // if name is empty, then it is set by kernel. @@ -134,7 +135,7 @@ async fn main() { for i in 0..num_cpus { let sock = sock.clone(); - let quit = quit.child_token(); + let quit = quit.clone(); let packet_received = packet_received.clone(); let udp_sock = new_udp_reuseport(local_addr);