Skip to content

Commit

Permalink
fix: alias not release bug (#77)
Browse files Browse the repository at this point in the history
* fix: alias not release because AgentSession is Clone, move AliasGuard outside AgentSession

* fix license deny
  • Loading branch information
giangndm authored Nov 7, 2024
1 parent 0a04c61 commit 79d8a96
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 34 deletions.
11 changes: 2 additions & 9 deletions bin/relayer/src/agent.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use std::sync::Arc;

use derive_more::derive::{Deref, Display, From};
use p2p::alias_service::AliasGuard;
use protocol::proxy::AgentId;
use tokio::{
io::{AsyncRead, AsyncWrite},
Expand Down Expand Up @@ -30,7 +27,6 @@ pub struct AgentSession<S> {
session_id: AgentSessionId,
domain: String,
control_tx: Sender<AgentSessionControl<S>>,
alias_guard: Option<Arc<AliasGuard>>,
}

impl<S> AgentSession<S> {
Expand All @@ -40,13 +36,11 @@ impl<S> AgentSession<S> {
session_id,
domain,
control_tx,
alias_guard: None,
}
}

/// We keep this alias guard here fore automatic unregister when session destroyed
pub(super) fn set_alias_guard(&mut self, alias: AliasGuard) {
self.alias_guard = Some(alias.into());
pub fn agent_id(&self) -> AgentId {
self.agent_id
}
}

Expand All @@ -57,7 +51,6 @@ impl<S> Clone for AgentSession<S> {
session_id: self.session_id,
domain: self.domain.clone(),
control_tx: self.control_tx.clone(),
alias_guard: self.alias_guard.clone(),
}
}
}
Expand Down
49 changes: 24 additions & 25 deletions bin/relayer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use agent::{
};
use anyhow::anyhow;
use p2p::{
alias_service::{AliasService, AliasServiceRequester},
alias_service::{AliasGuard, AliasService, AliasServiceRequester},
HandshakeProtocol, P2pNetwork, P2pNetworkConfig, P2pService, P2pServiceEvent, P2pServiceRequester, PeerAddress, PeerId,
};
use protocol::{
Expand Down Expand Up @@ -96,8 +96,8 @@ pub struct QuicRelayer<SECURE, VALIDATE, REQ, TSH> {
tunnel_service_ctx: TunnelServiceCtx,
tunnel_service_handle: TSH,

agent_quic_sessions: HashMap<AgentId, HashMap<AgentSessionId, AgentSession<TunnelQuicStream>>>,
agent_tcp_sessions: HashMap<AgentId, HashMap<AgentSessionId, AgentSession<TunnelTcpStream>>>,
agent_quic_sessions: HashMap<AgentId, HashMap<AgentSessionId, (AgentSession<TunnelQuicStream>, AliasGuard)>>,
agent_tcp_sessions: HashMap<AgentId, HashMap<AgentSessionId, (AgentSession<TunnelTcpStream>, AliasGuard)>>,
}

impl<SECURE, VALIDATE, REQ, TSH> QuicRelayer<SECURE, VALIDATE, REQ, TSH>
Expand Down Expand Up @@ -161,22 +161,22 @@ where
};
if let Some(sessions) = self.agent_tcp_sessions.get(&agent_id) {
let session = sessions.values().next().expect("should have session");
let job = proxy_local_to_agent(is_from_cluster, proxy, dest, session.clone());
let job = proxy_local_to_agent(is_from_cluster, proxy, dest, session.0.clone());
tokio::spawn(async move {
if let Err(e) = job.await {
counter!(METRICS_PROXY_HTTP_ERROR_COUNT).increment(1);
counter!(METRICS_TUNNEL_AGENT_ERROR_COUNT).increment(1);
log::error!("[QuicRelayer] proxy to agent error {:?}", e);
log::error!("[QuicRelayer {agent_id}] proxy to agent error {:?}", e);
};
});
} else if let Some(sessions) = self.agent_quic_sessions.get(&agent_id) {
let session = sessions.values().next().expect("should have session");
let job = proxy_local_to_agent(is_from_cluster, proxy, dest, session.clone());
let job = proxy_local_to_agent(is_from_cluster, proxy, dest, session.0.clone());
tokio::spawn(async move {
if let Err(e) = job.await {
counter!(METRICS_PROXY_HTTP_ERROR_COUNT).increment(1);
counter!(METRICS_TUNNEL_AGENT_ERROR_COUNT).increment(1);
log::error!("[QuicRelayer] proxy to agent error {:?}", e);
log::error!("[QuicRelayer {agent_id}] proxy to agent error {:?}", e);
};
});
} else if !is_from_cluster {
Expand All @@ -187,11 +187,11 @@ where
if let Err(e) = job.await {
counter!(METRICS_PROXY_HTTP_ERROR_COUNT).increment(1);
counter!(METRICS_TUNNEL_CLUSTER_ERROR_COUNT).increment(1);
log::error!("[QuicRelayer] proxy to cluster error {:?}", e);
log::error!("[QuicRelayer {agent_id}] proxy to cluster error {:?}", e);
};
});
} else {
log::warn!("[QuicRelayer] proxy to {dest:?} not match any kind");
log::warn!("[QuicRelayer {agent_id}] proxy to {dest:?} not match any kind");
counter!(METRICS_PROXY_CLUSTER_ERROR_COUNT).increment(1);
counter!(METRICS_TUNNEL_AGENT_ERROR_COUNT).increment(1);
}
Expand Down Expand Up @@ -227,14 +227,13 @@ where
Ok(QuicRelayerEvent::Continue)
},
event = self.agent_quic.recv() => match event? {
AgentListenerEvent::Connected(agent_id, mut agent_session) => {
AgentListenerEvent::Connected(agent_id, agent_session) => {
counter!(METRICS_AGENT_COUNT).increment(1);
let session_id = agent_session.session_id();
let domain = agent_session.domain().to_owned();
log::info!("[QuicRelayer] agent {agent_id} {} connected", agent_session.session_id());
let alias = self.sdn_alias_requester.register(*agent_id);
agent_session.set_alias_guard(alias);
self.agent_quic_sessions.entry(agent_id).or_default().insert(agent_session.session_id(), agent_session);
self.agent_quic_sessions.entry(agent_id).or_default().insert(agent_session.session_id(), (agent_session, alias));
gauge!(METRICS_AGENT_LIVE).increment(1.0);
Ok(QuicRelayerEvent::AgentConnected(agent_id, session_id, domain))
},
Expand All @@ -256,14 +255,13 @@ where
},
},
event = self.agent_tcp.recv() => match event? {
AgentListenerEvent::Connected(agent_id, mut agent_session) => {
AgentListenerEvent::Connected(agent_id, agent_session) => {
counter!(METRICS_AGENT_COUNT).increment(1);
log::info!("[QuicRelayer] agent {agent_id} {} connected", agent_session.session_id());
let session_id = agent_session.session_id();
let domain = agent_session.domain().to_owned();
let alias = self.sdn_alias_requester.register(*agent_id);
agent_session.set_alias_guard(alias);
self.agent_tcp_sessions.entry(agent_id).or_default().insert(agent_session.session_id(), agent_session);
self.agent_tcp_sessions.entry(agent_id).or_default().insert(agent_session.session_id(), (agent_session, alias));
gauge!(METRICS_AGENT_LIVE).increment(1.0);
Ok(QuicRelayerEvent::AgentConnected(agent_id, session_id, domain))
},
Expand Down Expand Up @@ -322,18 +320,19 @@ async fn proxy_local_to_agent<T: AsyncRead + AsyncWrite + Send + Sync + Unpin +
dest: ProxyDestination,
agent: AgentSession<S>,
) -> anyhow::Result<()> {
let agent_id = agent.agent_id();
let started = Instant::now();
if is_from_cluster {
counter!(METRICS_PROXY_CLUSTER_COUNT).increment(1);
} else {
counter!(METRICS_PROXY_HTTP_COUNT).increment(1);
}
counter!(METRICS_TUNNEL_AGENT_COUNT).increment(1);
log::info!("[ProxyLocal] creating stream to agent");
log::info!("[ProxyLocal {agent_id}] creating stream to agent");
let mut stream = agent.create_stream().await?;

histogram!(METRICS_TUNNEL_AGENT_HISTOGRAM).record(started.elapsed().as_millis() as f32 / 1000.0);
log::info!("[ProxyLocal] created stream to agent => writing connect request");
log::info!("[ProxyLocal {agent_id}] created stream to agent => writing connect request");
write_object::<_, _, 500>(
&mut stream,
&AgentTunnelRequest {
Expand All @@ -344,7 +343,7 @@ async fn proxy_local_to_agent<T: AsyncRead + AsyncWrite + Send + Sync + Unpin +
)
.await?;

log::info!("[ProxyLocal] proxy data with agent ...");
log::info!("[ProxyLocal {agent_id}] proxy data with agent ...");

gauge!(METRICS_TUNNEL_AGENT_LIVE).increment(1.0);
if is_from_cluster {
Expand All @@ -354,10 +353,10 @@ async fn proxy_local_to_agent<T: AsyncRead + AsyncWrite + Send + Sync + Unpin +
}
match copy_bidirectional(&mut proxy, &mut stream).await {
Ok(res) => {
log::info!("[ProxyLocal] proxy data with agent done with res {res:?}");
log::info!("[ProxyLocal {agent_id}] proxy data with agent done with res {res:?}");
}
Err(e) => {
log::error!("[ProxyLocal] proxy data with agent error {e}");
log::error!("[ProxyLocal {agent_id}] proxy data with agent error {e}");
}
};

Expand All @@ -381,30 +380,30 @@ async fn proxy_to_cluster<T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'sta
counter!(METRICS_PROXY_HTTP_COUNT).increment(1);
counter!(METRICS_TUNNEL_CLUSTER_COUNT).increment(1);
let agent_id = dest.agent_id()?;
log::info!("[ProxyCluster] finding location of agent {agent_id}");
log::info!("[ProxyCluster {agent_id}] finding location of agent {agent_id}");
let found_location = alias_requeser.find(*agent_id).await.ok_or(anyhow!("ALIAS_NOT_FOUND"))?;
let dest_node = match found_location {
p2p::alias_service::AliasFoundLocation::Local => return Err(anyhow!("wrong alias context, cluster shouldn't in local")),
p2p::alias_service::AliasFoundLocation::Hint(dest) => dest,
p2p::alias_service::AliasFoundLocation::Scan(dest) => dest,
};
log::info!("[ProxyCluster] found location of agent {agent_id}: {found_location:?} => opening cluster connection to {dest_node}");
log::info!("[ProxyCluster {agent_id}] found location of agent {agent_id}: {found_location:?} => opening cluster connection to {dest_node}");

let meta = bincode::serialize(&dest).expect("should convert ProxyDestination to bytes");

let mut stream = sdn_requester.open_stream(dest_node, meta).await?;
histogram!(METRICS_TUNNEL_CLUSTER_HISTOGRAM).record(started.elapsed().as_millis() as f32 / 1000.0);

log::info!("[ProxyLocal] proxy over {dest_node} ...");
log::info!("[ProxyCluster {agent_id}] proxy over {dest_node} ...");
gauge!(METRICS_TUNNEL_CLUSTER_LIVE).increment(1.0);
gauge!(METRICS_PROXY_HTTP_LIVE).increment(1.0);

match copy_bidirectional(&mut proxy, &mut stream).await {
Ok(res) => {
log::info!("[ProxyLocal] proxy over {dest_node} done with res {res:?}");
log::info!("[ProxyCluster {agent_id}] proxy over {dest_node} done with res {res:?}");
}
Err(e) => {
log::error!("[ProxyLocal] proxy over {dest_node} error {e}");
log::error!("[ProxyCluster {agent_id}] proxy over {dest_node} error {e}");
}
}

Expand Down
1 change: 1 addition & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ allow = [
"WTFPL",
"OpenSSL",
"Zlib",
"Unicode-3.0"
]
confidence-threshold = 0.8
exceptions = [
Expand Down

0 comments on commit 79d8a96

Please sign in to comment.