From 10a9af365801fa79735c13f91dd8d75fc166c782 Mon Sep 17 00:00:00 2001 From: Datong Sun Date: Thu, 16 Sep 2021 11:25:28 -0700 Subject: [PATCH] feat(phantom) initial phantom commit --- .gitignore | 2 + Cargo.toml | 13 ++ src/bin/client.rs | 106 ++++++++++++ src/bin/server.rs | 86 ++++++++++ src/fake_tcp/mod.rs | 360 +++++++++++++++++++++++++++++++++++++++++ src/fake_tcp/packet.rs | 70 ++++++++ src/lib.rs | 1 + 7 files changed, 638 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 src/bin/client.rs create mode 100644 src/bin/server.rs create mode 100644 src/fake_tcp/mod.rs create mode 100644 src/fake_tcp/packet.rs create mode 100644 src/lib.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..a0e5ec7 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "phantom" +version = "0.1.0" +edition = "2018" + +[dependencies] +bytes = "1" +pnet = "0.28.0" +tokio-tun = "0.3.15" +tokio = { version = "1.11.0", features = ["full"] } +lru_time_cache = "0.11.11" +rand = { version = "0.8.4", features = ["small_rng"] } +clap = "2.33.3" diff --git a/src/bin/client.rs b/src/bin/client.rs new file mode 100644 index 0000000..3d53ea1 --- /dev/null +++ b/src/bin/client.rs @@ -0,0 +1,106 @@ +use clap::{App, Arg}; +use lru_time_cache::LruCache; +use phantom::fake_tcp::packet::MAX_PACKET_LEN; +use phantom::fake_tcp::{Socket, Stack}; +use std::net::{SocketAddr, SocketAddrV4}; +use std::sync::Arc; +use std::thread; +use std::time::Duration; +use tokio::net::UdpSocket; +use tokio::sync::Mutex; +use tokio::time; +use tokio_tun::TunBuilder; + +const UDP_TTL: Duration = Duration::from_secs(300); + +#[tokio::main] +async fn main() { + let matches = App::new("Phantom Client") + .version("1.0") + .author("Dndx") + .arg( + Arg::with_name("local") + .short("l") + .long("local") + .required(true) + .value_name("IP:PORT") + .help("Sets the listening socket address") + .takes_value(true), + ) + .arg( + Arg::with_name("remote") + .short("r") + .long("remote") + .required(true) + .value_name("IP:PORT") + .help("Sets the connecting socket address") + .takes_value(true), + ) + .get_matches(); + + let local_addr: SocketAddrV4 = matches + .value_of("local") + .unwrap() + .parse() + .expect("bad local address"); + let remote_addr: SocketAddrV4 = matches + .value_of("remote") + .unwrap() + .parse() + .expect("bad remote address"); + + let tun = TunBuilder::new() + .name("") // if name is empty, then it is set by kernel. + .tap(false) // false (default): TUN, true: TAP. + .packet_info(false) // false: IFF_NO_PI, default is true. + .up() // or set it up manually using `sudo ip link set up`. + .address("192.168.200.1".parse().unwrap()) + .destination("192.168.200.2".parse().unwrap()) + .try_build() + .unwrap(); + + let udp_sock = Arc::new(UdpSocket::bind(local_addr).await.unwrap()); + let connections = Mutex::new(LruCache::>::with_expiry_duration( + UDP_TTL, + )); + + thread::sleep(Duration::from_secs(5)); + let mut stack = Stack::new(tun); + + let main_loop = tokio::spawn(async move { + let mut buf_r = [0u8; MAX_PACKET_LEN]; + let mut cleanup_timer = time::interval(Duration::from_secs(5)); + + loop { + tokio::select! { + Ok((size, SocketAddr::V4(addr))) = udp_sock.recv_from(&mut buf_r) => { + if let Some(sock) = connections.lock().await.get_mut(&addr) { + sock.send(&buf_r[..size]).await; + continue; + } + + let mut sock = Arc::new(stack.connect(remote_addr).await); + sock.send(&buf_r[..size]).await; + assert!(connections.lock().await.insert(addr, sock.clone()).is_none()); + let udp_sock = udp_sock.clone(); + + tokio::spawn(async move { + loop { + let mut buf_r = [0u8; MAX_PACKET_LEN]; + let size = sock.recv(&mut buf_r).await; + + if size > 0 { + udp_sock.send_to(&buf_r[..size], addr).await.unwrap(); + } + } + }); + }, + _ = cleanup_timer.tick() => { + connections.lock().await.iter(); + }, + } + } + }); + + tokio::join!(main_loop); +} diff --git a/src/bin/server.rs b/src/bin/server.rs new file mode 100644 index 0000000..d150fbb --- /dev/null +++ b/src/bin/server.rs @@ -0,0 +1,86 @@ +use clap::{App, Arg, SubCommand}; +use phantom::fake_tcp::packet::MAX_PACKET_LEN; +use phantom::fake_tcp::Stack; +use std::net::SocketAddrV4; +use std::{thread, time}; +use tokio::net::UdpSocket; +use tokio_tun::TunBuilder; + +#[tokio::main] +async fn main() { + let matches = App::new("Phantom Server") + .version("1.0") + .author("Dndx") + .arg( + Arg::with_name("local") + .short("l") + .long("local") + .required(true) + .value_name("PORT") + .help("Sets the listening port") + .takes_value(true), + ) + .arg( + Arg::with_name("remote") + .short("r") + .long("remote") + .required(true) + .value_name("IP:PORT") + .help("Sets the connecting socket address") + .takes_value(true), + ) + .get_matches(); + + let local_port: u16 = matches + .value_of("local") + .unwrap() + .parse() + .expect("bad local port"); + let remote_addr: SocketAddrV4 = matches + .value_of("remote") + .unwrap() + .parse() + .expect("bad remote address"); + + let tun = TunBuilder::new() + .name("") // if name is empty, then it is set by kernel. + .tap(false) // false (default): TUN, true: TAP. + .packet_info(false) // false: IFF_NO_PI, default is true. + .up() // or set it up manually using `sudo ip link set up`. + .address("192.168.201.1".parse().unwrap()) + .destination("192.168.201.2".parse().unwrap()) + .try_build() + .unwrap(); + + //thread::sleep(time::Duration::from_secs(5)); + let mut stack = Stack::new(tun); + stack.listen(local_port); + + let main_loop = tokio::spawn(async move { + let mut buf_udp = [0u8; MAX_PACKET_LEN]; + let mut buf_tcp = [0u8; MAX_PACKET_LEN]; + + loop { + let sock = stack.accept().await; + tokio::spawn(async move { + let udp_sock = UdpSocket::bind("0.0.0.0:0").await.unwrap(); + udp_sock.connect(remote_addr).await.unwrap(); + + loop { + tokio::select! { + Ok(size) = udp_sock.recv(&mut buf_udp) => { + sock.send(&buf_udp[..size]).await; + }, + size = sock.recv(&mut buf_tcp) => { + if size > 0 { + udp_sock.send(&buf_tcp[..size]).await.unwrap(); + } + } + }; + } + }); + } + }); + + tokio::join!(main_loop); +} diff --git a/src/fake_tcp/mod.rs b/src/fake_tcp/mod.rs new file mode 100644 index 0000000..81fe6ad --- /dev/null +++ b/src/fake_tcp/mod.rs @@ -0,0 +1,360 @@ +pub mod packet; + +use bytes::{Bytes, BytesMut}; +use packet::*; +use pnet::packet::{tcp, Packet}; +use rand::prelude::*; +use std::cell::RefCell; +use std::cmp::max; +use std::collections::{HashMap, HashSet}; +use std::io::{Error, Result}; +use std::net::{Ipv4Addr, SocketAddrV4}; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::{Arc, Mutex}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::sync::mpsc::{self, error::TrySendError, Receiver, Sender}; +use tokio::sync::Mutex as AsyncMutex; +use tokio::{io, time}; +use tokio_tun::Tun; + +const TIMEOUT: time::Duration = time::Duration::from_secs(5); +const MPSC_BUFFER_LEN: usize = 128; + +#[derive(Debug, Hash, Eq, PartialEq)] +pub struct AddrTuple { + local_addr: SocketAddrV4, + remote_addr: SocketAddrV4, +} + +impl AddrTuple { + fn new(local_addr: SocketAddrV4, remote_addr: SocketAddrV4) -> AddrTuple { + AddrTuple { + local_addr, + remote_addr, + } + } +} + +#[derive(Debug)] +struct Shared { + tuples: Mutex>>>, + listening: Mutex>, + outgoing: Sender, + ready: Sender, +} + +pub struct Stack { + shared: Arc, + local_ip: Ipv4Addr, + ready: Receiver, +} + +#[derive(Debug)] +pub enum State { + Idle, + SynSent, + SynReceived, + Established, +} + +#[derive(Debug)] +pub enum Mode { + Client, + Server, +} + +#[derive(Debug)] +pub struct Socket { + mode: Mode, + shared: Arc, + incoming: AsyncMutex>, + local_addr: SocketAddrV4, + remote_addr: SocketAddrV4, + seq: AtomicU32, + ack: AtomicU32, + state: State, +} + +impl Socket { + fn new( + mode: Mode, + shared: Arc, + local_addr: SocketAddrV4, + remote_addr: SocketAddrV4, + ack: Option, + state: State, + ) -> (Socket, Sender) { + let (incoming_tx, incoming_rx) = mpsc::channel(MPSC_BUFFER_LEN); + + ( + Socket { + mode, + shared, + incoming: AsyncMutex::new(incoming_rx), + local_addr, + remote_addr, + seq: AtomicU32::new(0), + ack: AtomicU32::new(ack.unwrap_or(0)), + state, + }, + incoming_tx, + ) + } + + fn build_tcp_packet(&self, flags: u16, payload: Option<&[u8]>) -> Bytes { + return build_tcp_packet( + self.local_addr, + self.remote_addr, + self.seq.load(Ordering::Relaxed), + self.ack.load(Ordering::Relaxed), + flags, + payload, + ); + } + + pub async fn send(&self, payload: &[u8]) { + match self.state { + State::Established => { + let buf = self.build_tcp_packet(tcp::TcpFlags::ACK, Some(payload)); + self.seq.fetch_add(buf.len() as u32, Ordering::Relaxed); + self.shared.outgoing.send(buf).await.unwrap(); + } + _ => unreachable!(), + } + } + + pub async fn recv(&self, buf: &mut [u8]) -> usize { + match self.state { + State::Established => { + let raw_buf = self.incoming.lock().await.recv().await.unwrap(); + let (_v4_packet, tcp_packet) = parse_ipv4_packet(&raw_buf); + let payload = tcp_packet.payload(); + + self.ack + .fetch_max(tcp_packet.get_sequence() + 1, Ordering::Relaxed); + + buf[..payload.len()].copy_from_slice(payload); + + payload.len() + } + _ => unreachable!(), + } + } + + async fn accept(mut self) { + loop { + match self.state { + State::Idle => { + let buf = self.build_tcp_packet(tcp::TcpFlags::SYN | tcp::TcpFlags::ACK, None); + // ACK set by constructor + self.shared.outgoing.send(buf).await.unwrap(); + self.state = State::SynReceived; + } + State::SynReceived => { + let res = time::timeout(TIMEOUT, self.incoming.lock().await.recv()).await; + if let Ok(buf) = res { + let buf = buf.unwrap(); + let (_v4_packet, tcp_packet) = parse_ipv4_packet(&buf); + + if (tcp_packet.get_flags() & tcp::TcpFlags::RST) != 0 { + return; + } + + if tcp_packet.get_flags() == tcp::TcpFlags::ACK + && tcp_packet.get_acknowledgement() + == self.seq.load(Ordering::Relaxed) + 1 + { + // found our ACK + self.seq.fetch_add(1, Ordering::Relaxed); + self.state = State::Established; + + println!("Connection from {:?} established", self.remote_addr); + let ready = self.shared.ready.clone(); + ready.send(self).await.unwrap(); + return; + } + } else { + println!("waiting for SYN + ACK timed out, dropping connection"); + return; + } + } + _ => unreachable!(), + } + } + } + + async fn connect(&mut self) { + loop { + match self.state { + State::Idle => { + let buf = self.build_tcp_packet(tcp::TcpFlags::SYN, None); + self.shared.outgoing.send(buf).await.unwrap(); + self.state = State::SynSent; + } + State::SynSent => { + match time::timeout(TIMEOUT, self.incoming.lock().await.recv()).await { + Ok(buf) => { + let buf = buf.unwrap(); + let (_v4_packet, tcp_packet) = parse_ipv4_packet(&buf); + + if (tcp_packet.get_flags() & tcp::TcpFlags::RST) != 0 { + return; + } + + if tcp_packet.get_flags() == tcp::TcpFlags::SYN | tcp::TcpFlags::ACK + && tcp_packet.get_acknowledgement() + == self.seq.load(Ordering::Relaxed) + 1 + { + // found our SYN + ACK + self.seq.fetch_add(1, Ordering::Relaxed); + self.ack + .store(tcp_packet.get_sequence() + 1, Ordering::Relaxed); + + // send ACK to finish handshake + let buf = self.build_tcp_packet(tcp::TcpFlags::ACK, None); + self.shared.outgoing.send(buf).await.unwrap(); + + self.state = State::Established; + + println!("Connection to {:?} established", self.remote_addr); + return; + } + } + Err(_) => { + println!("waiting for SYN + ACK timed out, going back to Idle"); + self.state = State::Idle; + } + } + } + _ => unreachable!(), + } + } + } +} + +impl Drop for Socket { + fn drop(&mut self) { + // dissociates ourself from the dispatch map + assert!(self + .shared + .tuples + .lock() + .unwrap() + .remove(&AddrTuple::new(self.local_addr, self.remote_addr)) + .is_some()); + + let buf = self.build_tcp_packet(tcp::TcpFlags::RST, None); + self.shared.outgoing.try_send(buf).unwrap(); + } +} + +impl Stack { + pub fn new(tun: Tun) -> Stack { + let (outgoing_tx, outgoing_rx) = mpsc::channel(MPSC_BUFFER_LEN); + let (ready_tx, ready_rx) = mpsc::channel(MPSC_BUFFER_LEN); + let shared = Arc::new(Shared { + tuples: Mutex::new(HashMap::new()), + outgoing: outgoing_tx, + listening: Mutex::new(HashSet::new()), + ready: ready_tx, + }); + let local_ip = tun.destination().unwrap(); + + tokio::spawn(Stack::dispatch(tun, outgoing_rx, shared.clone())); + Stack { + shared, + local_ip, + ready: ready_rx, + } + } + + pub fn listen(&mut self, port: u16) { + assert!(self.shared.listening.lock().unwrap().insert(port)); + } + + pub async fn accept(&mut self) -> Socket { + self.ready.recv().await.unwrap() + } + + pub async fn connect(&mut self, addr: SocketAddrV4) -> Socket { + let mut rng = SmallRng::from_entropy(); + let local_port: u16 = rng.gen_range(1024..65535); + let local_addr = SocketAddrV4::new(self.local_ip, local_port); + let tuple = AddrTuple::new(local_addr, addr); + let (mut sock, incoming) = Socket::new( + Mode::Client, + self.shared.clone(), + local_addr, + addr, + None, + State::Idle, + ); + + { + let mut tuples = self.shared.tuples.lock().unwrap(); + assert!(tuples.insert(tuple, Arc::new(incoming.clone())).is_none()); + } + + sock.connect().await; + sock + } + + async fn dispatch(tun: Tun, mut outgoing: Receiver, shared: Arc) { + let (mut tun_r, mut tun_w) = io::split(tun); + + loop { + let mut buf = BytesMut::with_capacity(MAX_PACKET_LEN); + + tokio::select! { + buf = outgoing.recv() => { + let buf = buf.unwrap(); + tun_w.write_all(&buf).await.unwrap(); + }, + s = tun_r.read_buf(&mut buf) => { + s.unwrap(); + let buf = buf.freeze(); + if buf[0] >> 4 != 4 { + // not an IPv4 packet + continue; + } + + let (ip_packet, tcp_packet) = parse_ipv4_packet(&buf); + let local_addr = SocketAddrV4::new(ip_packet.get_destination(), tcp_packet.get_destination()); + let remote_addr = SocketAddrV4::new(ip_packet.get_source(), tcp_packet.get_source()); + + let tuple = AddrTuple::new(local_addr, remote_addr); + + let sender; + { + let mut tuples = shared.tuples.lock().unwrap(); + sender = tuples.get(&tuple).map(|c| c.clone()); + } + + if let Some(c) = sender { + c.send(buf).await.unwrap(); + continue; + } + + if tcp_packet.get_flags() == tcp::TcpFlags::SYN && shared.listening.lock().unwrap().contains(&tcp_packet.get_destination()) { + // SYN seen on listening socket + if tcp_packet.get_sequence() == 0 { + let (sock, incoming) = Socket::new(Mode::Server, shared.clone(), local_addr, remote_addr, Some(tcp_packet.get_sequence() + 1), State::Idle); + assert!(shared.tuples.lock().unwrap().insert(tuple, Arc::new(incoming)).is_none()); + tokio::spawn(sock.accept()); + } else { + let buf = build_tcp_packet( + local_addr, + remote_addr, + 0, + tcp_packet.get_acknowledgement() + 1, + tcp::TcpFlags::RST, + None, + ); + shared.outgoing.try_send(buf).unwrap(); + } + } + } + } + } + } +} diff --git a/src/fake_tcp/packet.rs b/src/fake_tcp/packet.rs new file mode 100644 index 0000000..eef4128 --- /dev/null +++ b/src/fake_tcp/packet.rs @@ -0,0 +1,70 @@ +use bytes::{Bytes, BytesMut}; +use pnet::packet::{ip, ipv4, tcp, Packet, PacketSize}; +use std::convert::TryInto; +use std::net::SocketAddrV4; + +const IPV4_HEADER_LEN: usize = 20; +const TCP_HEADER_LEN: usize = 20; +pub const MAX_PACKET_LEN: usize = 1500; + +pub fn build_tcp_packet( + local_addr: SocketAddrV4, + remote_addr: SocketAddrV4, + seq: u32, + ack: u32, + flags: u16, + payload: Option<&[u8]>, +) -> Bytes { + let wscale = (flags & tcp::TcpFlags::SYN) != 0; + let tcp_total_len = TCP_HEADER_LEN + if wscale {4} else {0} // nop + wscale + + payload.map_or(0, |payload| payload.len()); + let total_len = IPV4_HEADER_LEN + tcp_total_len; + let mut buf = BytesMut::with_capacity(total_len); + buf.resize(total_len, 0); + + let mut v4_buf = buf.split_to(IPV4_HEADER_LEN); + let mut tcp_buf = buf.split_to(tcp_total_len); + assert_eq!(0, buf.len()); + + let mut v4 = ipv4::MutableIpv4Packet::new(&mut v4_buf).unwrap(); + v4.set_version(4); + v4.set_header_length(IPV4_HEADER_LEN as u8 / 4); + v4.set_next_level_protocol(ip::IpNextHeaderProtocols::Tcp); + v4.set_ttl(32); + v4.set_source(*local_addr.ip()); + v4.set_destination(*remote_addr.ip()); + v4.set_total_length(total_len.try_into().unwrap()); + v4.set_flags(ipv4::Ipv4Flags::DontFragment); + v4.set_checksum(ipv4::checksum(&v4.to_immutable())); + + let mut tcp = tcp::MutableTcpPacket::new(&mut tcp_buf).unwrap(); + tcp.set_window(0xffff); + tcp.set_source(local_addr.port()); + tcp.set_destination(remote_addr.port()); + tcp.set_sequence(seq); + tcp.set_acknowledgement(ack); + tcp.set_flags(flags); + tcp.set_data_offset(TCP_HEADER_LEN as u8 / 4 + if wscale { 1 } else { 0 }); + if wscale { + let wscale = tcp::TcpOption::wscale(14); + tcp.set_options(&vec![tcp::TcpOption::nop(), wscale]); + } + + if let Some(payload) = payload { + tcp.set_payload(payload); + } + + let checksum = tcp::ipv4_checksum(&tcp.to_immutable(), local_addr.ip(), remote_addr.ip()); + tcp.set_checksum(checksum); + + v4_buf.unsplit(tcp_buf); + + return v4_buf.freeze(); +} + +pub fn parse_ipv4_packet<'b>(buf: &'b Bytes) -> (ipv4::Ipv4Packet<'b>, tcp::TcpPacket<'b>) { + let v4 = ipv4::Ipv4Packet::new(&buf).unwrap(); + let tcp = tcp::TcpPacket::new(&buf[IPV4_HEADER_LEN..]).unwrap(); + + (v4, tcp) +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..186356f --- /dev/null +++ b/src/lib.rs @@ -0,0 +1 @@ +pub mod fake_tcp;