Removes unsafes and unwraps, sets default tcp connections to 1

This commit is contained in:
Saber Haj Rabiee
2022-12-12 00:58:10 -08:00
parent ea9b6575fc
commit 13dfdaac98
5 changed files with 95 additions and 46 deletions

View File

@@ -192,7 +192,7 @@ impl Socket {
match self.state {
State::Established => {
self.incoming.recv_async().await.ok().and_then(|raw_buf| {
let (_v4_packet, tcp_packet) = parse_ip_packet(&raw_buf).unwrap();
let (_v4_packet, tcp_packet) = parse_ip_packet(&raw_buf)?;
if (tcp_packet.get_flags() & tcp::TcpFlags::RST) != 0 {
info!("Connection {} reset by peer", self);
@@ -229,15 +229,27 @@ impl Socket {
State::Idle => {
let buf = self.build_tcp_packet(tcp::TcpFlags::SYN | tcp::TcpFlags::ACK, None);
// ACK set by constructor
self.tun.send(&buf).await.unwrap();
if let Err(err) = self.tun.send(&buf).await {
error!("Sent SYN + ACK error: {err}");
return;
}
self.state = State::SynReceived;
info!("Sent SYN + ACK to client");
}
State::SynReceived => {
let res = time::timeout(TIMEOUT, self.incoming.recv_async()).await;
if let Ok(buf) = res {
let buf = buf.unwrap();
let (_v4_packet, tcp_packet) = parse_ip_packet(&buf).unwrap();
let buf = match buf {
Ok(buf) => buf,
Err(err) => {
error!("incoming channel recv_async error: {err}");
return;
}
};
let (_v4_packet, tcp_packet) = match parse_ip_packet(&buf) {
Some(packet) => packet,
None => return,
};
if (tcp_packet.get_flags() & tcp::TcpFlags::RST) != 0 {
return;
@@ -273,15 +285,24 @@ impl Socket {
match self.state {
State::Idle => {
let buf = self.build_tcp_packet(tcp::TcpFlags::SYN, None);
self.tun.send(&buf).await.unwrap();
if let Err(err) = self.tun.send(&buf).await {
error!("Send SYN error: {err}");
return None;
}
self.state = State::SynSent;
info!("Sent SYN to server");
}
State::SynSent => {
match time::timeout(TIMEOUT, self.incoming.recv_async()).await {
Ok(buf) => {
let buf = buf.unwrap();
let (_v4_packet, tcp_packet) = parse_ip_packet(&buf).unwrap();
let buf = match buf {
Ok(buf) => buf,
Err(err) => {
error!("incoming channel error: {err}");
return None;
}
};
let (_v4_packet, tcp_packet) = parse_ip_packet(&buf)?;
if (tcp_packet.get_flags() & tcp::TcpFlags::RST) != 0 {
return None;
@@ -298,7 +319,10 @@ impl Socket {
// send ACK to finish handshake
let buf = self.build_tcp_packet(tcp::TcpFlags::ACK, None);
self.tun.send(&buf).await.unwrap();
if let Err(err) = self.tun.send(&buf).await {
error!("Send ACK error: {err}");
return None;
}
self.state = State::Established;
@@ -327,7 +351,9 @@ impl Drop for Socket {
// dissociates ourself from the dispatch map
assert!(self.shared.tuples.remove(&tuple).is_some());
// purge cache
self.shared.tuples_purge.send(tuple).unwrap();
if let Err(err) = self.shared.tuples_purge.send(tuple) {
error!("Send error in tuples_purge: {err}");
}
let buf = build_tcp_packet(
self.local_addr,
@@ -425,7 +451,7 @@ impl Stack {
Entry::Vacant(v) => {
let tun_index = self.shared.tun_index.fetch_add(1, Ordering::Relaxed)
% self.shared.tun.len();
let tun = unsafe { self.shared.tun.get_unchecked(tun_index).clone() };
let tun = self.shared.tun[tun_index].clone();
let (sock, incoming) = Socket::new(
self.shared.clone(),
tun,
@@ -434,7 +460,7 @@ impl Stack {
None,
State::Idle,
);
v.insert(incoming.clone());
v.insert(incoming);
sock
}
};
@@ -456,7 +482,13 @@ impl Stack {
tokio::select! {
size = tun.recv(&mut buf) => {
let size = size.unwrap();
let size = match size {
Ok(size) => size,
Err(err) => {
error!("Couldn't read tun buf: {err}");
continue;
}
};
buf.truncate(size);
let buf = buf.freeze();
@@ -483,8 +515,10 @@ impl Stack {
if let Some(c) = sender {
trace!("Storing connection information into local tuples");
tuples.insert(tuple, c.clone());
c.send_async(buf).await.unwrap();
continue;
if let Err(err) = c.send_async(buf).await {
error!("Couldn't send to shared tuples channel: {err}");
}
continue
}
}
@@ -519,10 +553,10 @@ impl Stack {
None,
);
let tun_index = shared.tun_index.fetch_add(1, Ordering::Relaxed) % shared.tun.len();
let tun = unsafe {
shared.tun.get_unchecked(tun_index)
};
tun.try_send(&buf).unwrap();
let tun = shared.tun[tun_index].clone();
if let Err(err) = tun.try_send(&buf) {
error!("tun send error: {err}");
}
}
} else if (tcp_packet.get_flags() & tcp::TcpFlags::RST) == 0 {
info!("Unknown TCP packet from {}, sending RST", remote_addr);
@@ -535,10 +569,10 @@ impl Stack {
None,
);
let tun_index = shared.tun_index.fetch_add(1, Ordering::Relaxed) % shared.tun.len();
let tun = unsafe {
shared.tun.get_unchecked(tun_index)
};
tun.try_send(&buf).unwrap();
let tun = shared.tun[tun_index].clone();
if let Err(err) = tun.try_send(&buf) {
error!("tun send error: {err}");
}
}
}
None => {
@@ -547,7 +581,13 @@ impl Stack {
}
},
tuple = tuples_purge.recv() => {
let tuple = tuple.unwrap();
let tuple = match tuple {
Ok(tuple) => tuple,
Err(err) => {
error!("tuples_purge recv error: {err}");
continue;
}
};
tuples.remove(&tuple);
trace!("Removed cached tuple: {:?}", tuple);
}