Skip to content

Commit

Permalink
Pop the chunk only when received the complete packet
Browse files Browse the repository at this point in the history
  • Loading branch information
marverlous811 committed Sep 20, 2024
1 parent 1c4d049 commit ad59ef1
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 35 deletions.
87 changes: 87 additions & 0 deletions crates/agent/src/handshake_chunk.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use std::error::Error;

use protocol::cluster::AgentTunnelRequest;

#[derive(Debug, Default)]
pub struct HandshakedChunk {
data: Vec<u8>,
}

impl HandshakedChunk {
pub fn put_buf(&mut self, buf: &[u8]) {
self.data.extend_from_slice(buf);
}

pub fn pop_chunk(&self) -> Result<Option<(AgentTunnelRequest, Vec<u8>)>, Box<dyn Error>> {
let handshake_len = u16::from_be_bytes([self.data[0], self.data[1]]) as usize;
if handshake_len + 2 > self.data.len() {
return Ok(None);
}

match AgentTunnelRequest::try_from(&self.data[2..handshake_len + 2]) {
Ok(handshake) => {
let data = self.data[handshake_len + 2..].to_vec();
Ok(Some((handshake, data)))
}
Err(e) => Err(Box::new(e)),

Check warning on line 26 in crates/agent/src/handshake_chunk.rs

View check run for this annotation

Codecov / codecov/patch

crates/agent/src/handshake_chunk.rs#L26

Added line #L26 was not covered by tests
}
}
}

#[cfg(test)]
mod test {
use super::*;

fn build_sample() -> Vec<u8> {
let request = AgentTunnelRequest {
service: None,
tls: true,
domain: "ac5eb61578bebe98636148414efc482f.local.ha.8xff.io".to_string(),
};

let handshake_buf = Vec::from(&request);
let handshake_len = handshake_buf.len() as u16;
let mut buf = handshake_len.to_be_bytes().to_vec();
buf.append(&mut handshake_buf.clone());
buf.append([1, 2, 3].to_vec().as_mut());

// println!("buf: {} {handshake_len}", buf.len());

buf
}

#[test]
pub fn push_full_chunk() {
let mut chunk = HandshakedChunk::default();
let buf = build_sample();
chunk.put_buf(&buf);

let res = chunk.pop_chunk();
assert!(res.is_ok());
let (handshake, data) = res.unwrap().unwrap();
assert_eq!(
handshake.domain,
"ac5eb61578bebe98636148414efc482f.local.ha.8xff.io"
);
assert_eq!(data, [1, 2, 3]);
}

#[test]
pub fn push_part_chunk() {
let mut chunk = HandshakedChunk::default();
let buf = build_sample();

chunk.put_buf(&buf[0..2]);
let res = chunk.pop_chunk().unwrap();
assert!(res.is_none());
chunk.put_buf(&buf[2..]);
let res = chunk.pop_chunk();
assert!(res.is_ok());
let (handshake, data) = res.unwrap().unwrap();
assert_eq!(
handshake.domain,
"ac5eb61578bebe98636148414efc482f.local.ha.8xff.io"
);
assert_eq!(data, [1, 2, 3]);
}
}
90 changes: 55 additions & 35 deletions crates/agent/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use protocol::cluster::AgentTunnelRequest;
use crate::local_tunnel::LocalTunnel;

mod connection;
mod handshake_chunk;
mod local_tunnel;

pub use connection::{
Expand All @@ -27,51 +28,73 @@ where
log::info!("sub_connection pipe to local_tunnel start");
let (mut reader1, mut writer1) = sub_connection.split();
let mut first_pkt = [0u8; 4096];
let (local_tunnel, first_pkt_start, first_pkt_end) = match reader1.read(&mut first_pkt).await {
Ok(first_pkt_len) => {
log::info!("first pkt size: {}", first_pkt_len);
if first_pkt_len < 2 {
log::error!("first pkt size is < 4 => close");
return;
}
let handshake_len = u16::from_be_bytes([first_pkt[0], first_pkt[1]]) as usize;
if handshake_len + 2 > first_pkt_len {
log::error!("first pkt size is < handshake {handshake_len} + 2 => close");

let mut chunk = handshake_chunk::HandshakedChunk::default();
let mut handshake: Option<AgentTunnelRequest> = None;
let mut data = Vec::new();

let timeout_duration = std::time::Duration::from_secs(10);

Check warning on line 36 in crates/agent/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/agent/src/lib.rs#L31-L36

Added lines #L31 - L36 were not covered by tests

loop {
select! {
_ = async_std::task::sleep(timeout_duration).fuse() => {
log::error!("handshake timeout");

Check warning on line 41 in crates/agent/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/agent/src/lib.rs#L39-L41

Added lines #L39 - L41 were not covered by tests
return;
}
match AgentTunnelRequest::try_from(&first_pkt[2..(handshake_len + 2)]) {
Ok(handshake) => {
if let Some(dest) =
registry.dest_for(handshake.tls, handshake.service, &handshake.domain)
{
log::info!("create tunnel to dest {}", dest);
(
LocalTcpTunnel::new(dest).await,
handshake_len + 2,
first_pkt_len,
)
} else {
log::warn!(
"dest for service {:?} tls {} domain {} not found",
handshake.service,
handshake.tls,
handshake.domain
);
res = reader1.read(&mut first_pkt).fuse() => match res {
Ok(n) => {
if n < 2 {
log::error!("first pkt size is < 4 => close");

Check warning on line 47 in crates/agent/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/agent/src/lib.rs#L44-L47

Added lines #L44 - L47 were not covered by tests
return;
}
chunk.put_buf(&first_pkt[..n]);
let res = chunk.pop_chunk();
match res {
Ok(Some((req, mut payload))) => {
handshake = Some(req);
data.append(&mut payload);
break;

Check warning on line 56 in crates/agent/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/agent/src/lib.rs#L50-L56

Added lines #L50 - L56 were not covered by tests
}
Ok(None) => {
continue;

Check warning on line 59 in crates/agent/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/agent/src/lib.rs#L59

Added line #L59 was not covered by tests
}
Err(e) => {
log::error!("handshake parse error: {}", e);
return;

Check warning on line 63 in crates/agent/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/agent/src/lib.rs#L61-L63

Added lines #L61 - L63 were not covered by tests
}
}
}
Err(e) => {
log::error!("handshake parse error: {}", e);
log::error!("read first pkt error: {}", e);

Check warning on line 68 in crates/agent/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/agent/src/lib.rs#L68

Added line #L68 was not covered by tests
return;
}
}
}
Err(e) => {
log::error!("read first pkt error: {}", e);
}

let handshake = match handshake {
Some(handshake) => handshake,

Check warning on line 76 in crates/agent/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/agent/src/lib.rs#L75-L76

Added lines #L75 - L76 were not covered by tests
None => {
log::error!("handshake is None");

Check warning on line 78 in crates/agent/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/agent/src/lib.rs#L78

Added line #L78 was not covered by tests
return;
}
};

let local_tunnel = if let Some(dest) =
registry.dest_for(handshake.tls, handshake.service, &handshake.domain)

Check warning on line 84 in crates/agent/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/agent/src/lib.rs#L83-L84

Added lines #L83 - L84 were not covered by tests
{
log::info!("create tunnel to dest {}", dest);
LocalTcpTunnel::new(dest).await

Check warning on line 87 in crates/agent/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/agent/src/lib.rs#L86-L87

Added lines #L86 - L87 were not covered by tests
} else {
log::warn!(
"dest for service {:?} tls {} domain {} not found",

Check warning on line 90 in crates/agent/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/agent/src/lib.rs#L89-L90

Added lines #L89 - L90 were not covered by tests
handshake.service,
handshake.tls,
handshake.domain
);
return;

Check warning on line 95 in crates/agent/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/agent/src/lib.rs#L95

Added line #L95 was not covered by tests
};

let local_tunnel = match local_tunnel {
Ok(local_tunnel) => local_tunnel,
Err(e) => {
Expand All @@ -82,10 +105,7 @@ where

let (mut reader2, mut writer2) = local_tunnel.split();

if let Err(e) = writer2
.write_all(&first_pkt[first_pkt_start..first_pkt_end])
.await
{
if let Err(e) = writer2.write_all(&data).await {

Check warning on line 108 in crates/agent/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/agent/src/lib.rs#L108

Added line #L108 was not covered by tests
log::error!("write first pkt to local_tunnel error: {}", e);
return;
}
Expand Down

0 comments on commit ad59ef1

Please sign in to comment.