Skip to content

Commit

Permalink
merge master
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Oct 22, 2024
2 parents 13f8c1b + 8c49211 commit e1d7854
Show file tree
Hide file tree
Showing 14 changed files with 37 additions and 28 deletions.
2 changes: 1 addition & 1 deletion bin/agent/src/connection/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl<RES: DeserializeOwned> QuicConnection<RES> {
log::info!("connecting to server {}:{}", url_host, url_port);
let remote = (url_host, url_port).to_socket_addrs()?.next().ok_or(anyhow!("DnsError"))?;

let mut endpoint = Endpoint::client("0.0.0.0:0".parse().expect(""))?;
let mut endpoint = Endpoint::client("0.0.0.0:0".parse().expect("Should parse local addr"))?;
endpoint.set_default_client_config(configure_client(server_certs, allow_quic_insecure)?);

// connect to server
Expand Down
2 changes: 1 addition & 1 deletion bin/agent/src/connection/quic/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub enum QuicTlsError {

pub fn configure_client(server_certs: &[CertificateDer], allow_quic_insecure: bool) -> Result<ClientConfig, QuicTlsError> {
let mut config = if allow_quic_insecure {
let provider = rustls::crypto::CryptoProvider::get_default().unwrap();
let provider = rustls::crypto::CryptoProvider::get_default().expect("Should get crypto provider");
ClientConfig::new(Arc::new(QuicClientConfig::try_from(
rustls::ClientConfig::builder()
.dangerous()
Expand Down
2 changes: 1 addition & 1 deletion bin/relayer/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,6 @@ impl<S: AsyncRead + AsyncWrite + Send + Sync + 'static> AgentSession<S> {
pub async fn create_stream(&self) -> anyhow::Result<S> {
let (tx, rx) = oneshot::channel();
self.control_tx.send(AgentSessionControl::CreateStream(tx)).await?;
Ok(rx.await??)
rx.await?
}
}
2 changes: 1 addition & 1 deletion bin/relayer/src/agent/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async fn run_connection<VALIDATE: ClusterValidator<REQ>, REQ>(validate: Arc<VALI

let req = validate.validate_connect_req(&buf[0..buf_len])?;
let domain = validate.generate_domain(&req)?;
let agent_id = AgentId::from_domain(&domain);
let agent_id = AgentId::try_from_domain(&domain)?;
let session_id = AgentSessionId::rand();

log::info!("[AgentQuic] new connection validated with domain {domain} agent_id: {agent_id}, session uuid: {session_id}");
Expand Down
4 changes: 2 additions & 2 deletions bin/relayer/src/agent/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl<VALIDATE: ClusterValidator<REQ>, REQ: DeserializeOwned + Send + Sync + 'sta
let (stream, remote) = incoming?;
tokio::spawn(run_connection(self.validate.clone(), stream, remote, self.internal_tx.clone()));
},
event = self.internal_rx.recv() => break Ok(event.expect("should work")),
event = self.internal_rx.recv() => break Ok(event.expect("should receive event from internal channel")),
}
}
}
Expand All @@ -72,7 +72,7 @@ async fn run_connection<VALIDATE: ClusterValidator<REQ>, REQ>(

let req = validate.validate_connect_req(&buf[0..buf_len])?;
let domain = validate.generate_domain(&req)?;
let agent_id = AgentId::from_domain(&domain);
let agent_id = AgentId::try_from_domain(&domain)?;
let session_id = AgentSessionId::rand();

log::info!("[AgentTcp] new connection validated with domain {domain} agent_id: {agent_id}, session uuid: {session_id}");
Expand Down
16 changes: 11 additions & 5 deletions bin/relayer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub use proxy::{http::HttpDestinationDetector, rtsp::RtspDestinationDetector, tl

const ALIAS_SERVICE: u16 = 0;
const PROXY_TO_AGENT_SERVICE: u16 = 1;
const TUNNEL_TO_CLUSTER_SERIVCE: u16 = 2;
const TUNNEL_TO_CLUSTER_SERVICE: u16 = 2;

#[derive(Clone)]
pub struct TunnelServiceCtx {
Expand Down Expand Up @@ -122,9 +122,9 @@ where

let mut sdn_alias = AliasService::new(sdn.create_service(ALIAS_SERVICE.into()));
let sdn_alias_requester = sdn_alias.requester();
tokio::spawn(async move { while let Ok(_) = sdn_alias.recv().await {} });
tokio::spawn(async move { while sdn_alias.recv().await.is_ok() {} });
let sdn_proxy_service = sdn.create_service(PROXY_TO_AGENT_SERVICE.into());
let sdn_tunnel_service = sdn.create_service(TUNNEL_TO_CLUSTER_SERIVCE.into());
let sdn_tunnel_service = sdn.create_service(TUNNEL_TO_CLUSTER_SERVICE.into());
let tunnel_service_ctx = TunnelServiceCtx {
service: sdn_tunnel_service.requester(),
alias: sdn_alias_requester.clone(),
Expand Down Expand Up @@ -152,7 +152,13 @@ where
}

fn process_proxy<T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static>(&mut self, proxy: T, dest: ProxyDestination, is_from_cluster: bool) {

Check warning on line 154 in bin/relayer/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

bin/relayer/src/lib.rs#L154

Added line #L154 was not covered by tests
let agent_id = dest.agent_id();
let agent_id = match dest.agent_id() {
Ok(agent_id) => agent_id,
Err(e) => {
log::warn!("[QuicRelayer] proxy to {dest:?} failed to get agent id: {e}");
return;
}
};
if let Some(sessions) = self.agent_tcp_sessions.get(&agent_id) {
let session = sessions.values().next().expect("should have session");
let job = proxy_local_to_agent(is_from_cluster, proxy, dest, session.clone());
Expand Down Expand Up @@ -374,7 +380,7 @@ async fn proxy_to_cluster<T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'sta
let started = Instant::now();
counter!(METRICS_PROXY_HTTP_COUNT).increment(1);
counter!(METRICS_TUNNEL_CLUSTER_COUNT).increment(1);

Check warning on line 382 in bin/relayer/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

bin/relayer/src/lib.rs#L380-L382

Added lines #L380 - L382 were not covered by tests
let agent_id = dest.agent_id();
let agent_id = dest.agent_id()?;
log::info!("[ProxyCluster] finding location of agent {agent_id}");
let found_location = alias_requeser.find(*agent_id).await.ok_or(anyhow!("ALIAS_NOT_FOUND"))?;
let dest_node = match found_location {
Expand Down
2 changes: 1 addition & 1 deletion bin/relayer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async fn main() {
};
let validator = ClusterValidatorImpl::new(args.root_domain);
let mut relayer = QuicRelayer::new(cfg, validator).await.expect("should create relayer");
while let Ok(_) = relayer.recv().await {}
while relayer.recv().await.is_ok() {}
}

struct DummyTunnelHandle;
Expand Down
6 changes: 3 additions & 3 deletions bin/relayer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,18 @@ pub fn describe_metrics() {
describe_histogram!(METRICS_AGENT_HISTOGRAM, "Incoming agent connection accept time histogram");
describe_counter!(METRICS_AGENT_COUNT, "Number of connected agents");

// this is for proxy from agent counting (incomming)
// this is for proxy from agent counting (incoming)
describe_gauge!(METRICS_PROXY_AGENT_LIVE, "Live incoming proxy from agent to cluster");
describe_counter!(METRICS_PROXY_AGENT_COUNT, "Number of incoming proxy from agent to cluster");
describe_histogram!(METRICS_PROXY_AGENT_HISTOGRAM, "Incoming proxy from agent to cluster latency histogram");
describe_counter!(METRICS_PROXY_AGENT_ERROR_COUNT, "Number of incoming proxy error from agent to cluster");

// this is for http proxy counting (incomming)
// this is for http proxy counting (incoming)
describe_gauge!(METRICS_PROXY_HTTP_LIVE, "Live incoming http proxy");
describe_counter!(METRICS_PROXY_HTTP_COUNT, "Number of incoming http proxy");
describe_counter!(METRICS_PROXY_HTTP_ERROR_COUNT, "Number of incoming http proxy error");

// this is for cluster proxy (incomming)
// this is for cluster proxy (incoming)
describe_gauge!(METRICS_PROXY_CLUSTER_LIVE, "Live incoming cluster proxy");
describe_counter!(METRICS_PROXY_CLUSTER_COUNT, "Number of incoming cluster proxy");
describe_counter!(METRICS_PROXY_CLUSTER_ERROR_COUNT, "Number of incoming cluster proxy error");
Expand Down
4 changes: 2 additions & 2 deletions bin/relayer/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ impl<Detector: ProxyDestinationDetector> ProxyTcpListener<Detector> {
tokio::spawn(async move {
match detector.determine(&mut stream).await {
Ok(destination) => {
log::info!("[ProxyTcpListener] determine destionation {}, service {:?} for remote {remote}", destination.domain, destination.service);
log::info!("[ProxyTcpListener] determine destination {}, service {:?} for remote {remote}", destination.domain, destination.service);
tx.send((destination, stream)).await.expect("tcp listener channel should work");
},
Err(err) => {
log::info!("[ProxyTcpListener] determine destionation for {remote} error {err}");
log::info!("[ProxyTcpListener] determine destination for {remote} error {err}");
},
}
});
Expand Down
2 changes: 1 addition & 1 deletion bin/relayer/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ fn configure_server(priv_key: PrivatePkcs8KeyDer<'static>, cert: CertificateDer<
let cert_chain = vec![cert];

let mut server_config = ServerConfig::with_single_cert(cert_chain, priv_key.into())?;
let transport_config = Arc::get_mut(&mut server_config.transport).unwrap();
let transport_config = Arc::get_mut(&mut server_config.transport).expect("Should get transport config mut right after create server config");
transport_config.max_concurrent_uni_streams(0_u8.into());
transport_config.max_idle_timeout(Some(Duration::from_secs(30).try_into().expect("Should config timeout")));

Expand Down
6 changes: 3 additions & 3 deletions crates/cert_utils/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ struct Args {

fn main() {
let args = Args::parse();
let cert = rcgen::generate_simple_self_signed(args.domains).unwrap();
let cert = rcgen::generate_simple_self_signed(args.domains).expect("Should generate cert");
let start = SystemTime::now();
let since_the_epoch = start.duration_since(UNIX_EPOCH).expect("Time went backwards").as_millis();
std::fs::write(format!("./certificate-{}.cert", since_the_epoch), cert.cert.der()).unwrap();
std::fs::write(format!("./certificate-{}.key", since_the_epoch), cert.key_pair.serialize_der()).unwrap();
std::fs::write(format!("./certificate-{}.cert", since_the_epoch), cert.cert.der()).expect("Should write cert");
std::fs::write(format!("./certificate-{}.key", since_the_epoch), cert.key_pair.serialize_der()).expect("Should write key");
}
6 changes: 3 additions & 3 deletions crates/protocol/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub struct ClusterTunnelRequest {

impl From<&ClusterTunnelRequest> for Vec<u8> {
fn from(resp: &ClusterTunnelRequest) -> Self {
bincode::serialize(resp).expect("Should ok")
bincode::serialize(resp).expect("Should serialize cluster tunnel request")
}
}

Expand All @@ -29,7 +29,7 @@ pub struct ClusterTunnelResponse {

impl From<&ClusterTunnelResponse> for Vec<u8> {
fn from(resp: &ClusterTunnelResponse) -> Self {
bincode::serialize(resp).expect("Should ok")
bincode::serialize(resp).expect("Should serialize cluster tunnel response")
}
}

Expand All @@ -50,7 +50,7 @@ pub struct AgentTunnelRequest {

impl From<&AgentTunnelRequest> for Vec<u8> {
fn from(resp: &AgentTunnelRequest) -> Self {
bincode::serialize(resp).expect("Should ok")
bincode::serialize(resp).expect("Should serialize agent tunnel request")
}
}

Expand Down
10 changes: 6 additions & 4 deletions crates/protocol/src/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fmt::Display;

use anyhow::anyhow;
use derive_more::derive::{Deref, From};
use serde::{Deserialize, Serialize};

Expand All @@ -13,8 +14,9 @@ impl Display for AgentId {
}

impl AgentId {
pub fn from_domain(domain: &str) -> Self {
Self(u64::from_be_bytes(domain.as_bytes()[0..8].try_into().expect("should convert to u64")))
pub fn try_from_domain(domain: &str) -> anyhow::Result<Self> {
let (first, _) = domain.as_bytes().split_at_checked(8).ok_or(anyhow!("domain should be at least 8 bytes"))?;
Ok(Self(u64::from_be_bytes(first.try_into()?)))
}
}

Expand All @@ -26,7 +28,7 @@ pub struct ProxyDestination {
}

impl ProxyDestination {
pub fn agent_id(&self) -> AgentId {
AgentId::from_domain(&self.domain)
pub fn agent_id(&self) -> anyhow::Result<AgentId> {
AgentId::try_from_domain(&self.domain)
}
}
1 change: 1 addition & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ allow = [
"Unicode-DFS-2016",
"WTFPL",
"OpenSSL",
"Zlib",
]
confidence-threshold = 0.8
exceptions = [
Expand Down

0 comments on commit e1d7854

Please sign in to comment.