From cef67b6cb1703fa4c907ce09c3fa1d4bf0880614 Mon Sep 17 00:00:00 2001 From: shadowWalker Date: Sat, 28 Sep 2024 01:39:51 +0700 Subject: [PATCH 1/5] fix remove worker after instantly agent is reconnected --- crates/relayer/src/agent_listener.rs | 1 + crates/relayer/src/agent_listener/quic.rs | 13 ++++++++-- crates/relayer/src/agent_listener/tcp.rs | 8 +++++- crates/relayer/src/agent_store.rs | 30 ++++++++++++++++------- crates/relayer/src/lib.rs | 18 ++++++++++---- crates/relayer/src/utils.rs | 9 +++++++ 6 files changed, 62 insertions(+), 17 deletions(-) diff --git a/crates/relayer/src/agent_listener.rs b/crates/relayer/src/agent_listener.rs index 05220cc..2c88981 100644 --- a/crates/relayer/src/agent_listener.rs +++ b/crates/relayer/src/agent_listener.rs @@ -16,6 +16,7 @@ pub trait AgentConnection, R: AsyncRead + Unpin, W: Send + Sync { fn domain(&self) -> String; + fn conn_id(&self) -> u64; async fn create_sub_connection(&mut self) -> Result>; async fn recv(&mut self) -> Result>; } diff --git a/crates/relayer/src/agent_listener/quic.rs b/crates/relayer/src/agent_listener/quic.rs index 0f879dd..3df623f 100644 --- a/crates/relayer/src/agent_listener/quic.rs +++ b/crates/relayer/src/agent_listener/quic.rs @@ -14,7 +14,7 @@ use quinn::{Endpoint, RecvStream, SendStream, ServerConfig}; use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; use serde::de::DeserializeOwned; -use crate::METRICS_AGENT_HISTOGRAM; +use crate::{utils::now_ms, METRICS_AGENT_HISTOGRAM}; use super::{AgentConnection, AgentListener, AgentSubConnection}; @@ -96,7 +96,11 @@ impl AgentQuicListener { log::info!("register request domain {}", domain); let res_buf = cluster_validator.sign_response_res(&request, None); send.write_all(&res_buf).await?; - Ok(AgentQuicConnection { domain, conn }) + Ok(AgentQuicConnection { + domain, + conn, + conn_id: now_ms(), + }) } Err(e) => { log::error!("invalid register request {:?}, error {}", request, e); @@ -125,6 +129,7 @@ impl pub struct AgentQuicConnection { domain: String, + conn_id: u64, conn: quinn::Connection, } @@ -134,6 +139,10 @@ impl AgentConnection for AgentQu self.domain.clone() } + fn conn_id(&self) -> u64 { + self.conn_id + } + async fn create_sub_connection(&mut self) -> Result> { let (send, recv) = self.conn.open_bi().await?; Ok(AgentQuicSubConnection { send, recv }) diff --git a/crates/relayer/src/agent_listener/tcp.rs b/crates/relayer/src/agent_listener/tcp.rs index 1da0136..e170ed0 100644 --- a/crates/relayer/src/agent_listener/tcp.rs +++ b/crates/relayer/src/agent_listener/tcp.rs @@ -16,7 +16,7 @@ use metrics::histogram; use protocol::key::ClusterValidator; use serde::de::DeserializeOwned; -use crate::METRICS_AGENT_HISTOGRAM; +use crate::{utils::now_ms, METRICS_AGENT_HISTOGRAM}; use super::{AgentConnection, AgentListener, AgentSubConnection}; @@ -53,6 +53,7 @@ impl, REQ: DeserializeOwned + Debug> stream.write_all(&res).await?; Ok(AgentTcpConnection { domain, + conn_id: now_ms(), connection: yamux::Connection::new( stream, Default::default(), @@ -111,6 +112,7 @@ impl< pub struct AgentTcpConnection { domain: String, + conn_id: u64, connection: yamux::Connection, } @@ -122,6 +124,10 @@ impl AgentConnection, WriteHalf u64 { + self.conn_id + } + async fn create_sub_connection(&mut self) -> Result> { let client = OpenStreamsClient { connection: &mut self.connection, diff --git a/crates/relayer/src/agent_store.rs b/crates/relayer/src/agent_store.rs index 83fc012..4d124cf 100644 --- a/crates/relayer/src/agent_store.rs +++ b/crates/relayer/src/agent_store.rs @@ -7,18 +7,23 @@ use async_std::channel::Sender; use crate::ProxyTunnel; +pub struct AgentEntry { + pub session: u64, + pub tx: Sender>, +} + #[derive(Clone, Default)] pub struct AgentStore { #[allow(clippy::type_complexity)] - agents: Arc>>>>, + agents: Arc>>, } impl AgentStore { - pub fn add(&self, id: u64, tx: Sender>) { + pub fn add(&self, id: u64, session: u64, tx: Sender>) { self.agents .write() .expect("Should write agents") - .insert(id, tx); + .insert(id, AgentEntry { tx, session }); } pub fn get(&self, id: u64) -> Option>> { @@ -26,13 +31,20 @@ impl AgentStore { .read() .expect("Should write agents") .get(&id) - .cloned() + .map(|entry| entry.tx.clone()) } - pub fn remove(&self, id: u64) { - self.agents - .write() - .expect("Should write agents") - .remove(&id); + pub fn remove(&self, id: u64, session: u64) -> bool { + let mut storage = self.agents.write().expect("Should write agents"); + + let current = storage.get(&id); + if let Some(entry) = current { + if entry.session == session { + storage.remove(&id); + return true; + } + } + + false } } diff --git a/crates/relayer/src/lib.rs b/crates/relayer/src/lib.rs index 2266d4d..a1c879c 100644 --- a/crates/relayer/src/lib.rs +++ b/crates/relayer/src/lib.rs @@ -79,10 +79,12 @@ pub async fn run_agent_connection( counter!(METRICS_AGENT_COUNT).increment(1); log::info!("agent_connection.domain(): {}", agent_connection.domain()); let domain = agent_connection.domain().to_string(); + let conn_id = agent_connection.conn_id(); let (mut agent_worker, proxy_tunnel_tx) = agent_worker::AgentWorker::::new(agent_connection, agent_rpc_handler); let home_id = home_id_from_domain(&domain); - agents.add(home_id, proxy_tunnel_tx); + log::info!("added home_id: {}, conn_id: {}", home_id, conn_id); + agents.add(home_id, conn_id, proxy_tunnel_tx); node_alias_sdk.register_alias(home_id).await; let agents = agents.clone(); gauge!(METRICS_AGENT_LIVE).increment(1.0); @@ -96,10 +98,16 @@ pub async fn run_agent_connection( } } } - agents.remove(home_id); - node_alias_sdk - .unregister_alias(home_id_from_domain(&domain)) - .await; + let removed = agents.remove(home_id, conn_id); + log::info!( + "remove home_id: {}, conn_id: {}, removed: {}", + home_id, + conn_id, + removed + ); + if removed { + node_alias_sdk.unregister_alias(home_id).await; + } log::info!("agent_worker exit for domain: {}", domain); gauge!(METRICS_AGENT_LIVE).decrement(1.0); } diff --git a/crates/relayer/src/utils.rs b/crates/relayer/src/utils.rs index 6169986..0feee78 100644 --- a/crates/relayer/src/utils.rs +++ b/crates/relayer/src/utils.rs @@ -1,6 +1,7 @@ use std::{ collections::hash_map::DefaultHasher, hash::{Hash, Hasher}, + time::{SystemTime, UNIX_EPOCH}, }; /// get home id from domain by get first subdomain @@ -10,3 +11,11 @@ pub fn home_id_from_domain(domain: &str) -> u64 { parts.next().unwrap_or(domain).hash(&mut hasher); hasher.finish() } + +pub fn now_ms() -> u64 { + let start = SystemTime::now(); + start + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_millis() as u64 +} From 15cbc19be7f253b3241e379fa2fb72ce45295592 Mon Sep 17 00:00:00 2001 From: shadowWalker Date: Sat, 28 Sep 2024 23:13:36 +0700 Subject: [PATCH 2/5] fix: standardlize session to conn_id and generate random conn_id --- crates/relayer/Cargo.toml | 1 + crates/relayer/src/agent_listener/quic.rs | 4 ++-- crates/relayer/src/agent_listener/tcp.rs | 4 ++-- crates/relayer/src/agent_store.rs | 10 +++++----- crates/relayer/src/utils.rs | 9 --------- 5 files changed, 10 insertions(+), 18 deletions(-) diff --git a/crates/relayer/Cargo.toml b/crates/relayer/Cargo.toml index af5c730..4ebd06b 100644 --- a/crates/relayer/Cargo.toml +++ b/crates/relayer/Cargo.toml @@ -26,6 +26,7 @@ rustls = { workspace = true, features = ["ring", "std"] } atm0s-sdn = { workspace = true } rtsp-types = { workspace = true } local-ip-address = { workspace = true } +rand = { workspace = true } [features] default = ["binary"] diff --git a/crates/relayer/src/agent_listener/quic.rs b/crates/relayer/src/agent_listener/quic.rs index 3df623f..e8ee3de 100644 --- a/crates/relayer/src/agent_listener/quic.rs +++ b/crates/relayer/src/agent_listener/quic.rs @@ -14,7 +14,7 @@ use quinn::{Endpoint, RecvStream, SendStream, ServerConfig}; use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; use serde::de::DeserializeOwned; -use crate::{utils::now_ms, METRICS_AGENT_HISTOGRAM}; +use crate::METRICS_AGENT_HISTOGRAM; use super::{AgentConnection, AgentListener, AgentSubConnection}; @@ -99,7 +99,7 @@ impl AgentQuicListener { Ok(AgentQuicConnection { domain, conn, - conn_id: now_ms(), + conn_id: rand::random(), }) } Err(e) => { diff --git a/crates/relayer/src/agent_listener/tcp.rs b/crates/relayer/src/agent_listener/tcp.rs index e170ed0..be4df22 100644 --- a/crates/relayer/src/agent_listener/tcp.rs +++ b/crates/relayer/src/agent_listener/tcp.rs @@ -16,7 +16,7 @@ use metrics::histogram; use protocol::key::ClusterValidator; use serde::de::DeserializeOwned; -use crate::{utils::now_ms, METRICS_AGENT_HISTOGRAM}; +use crate::METRICS_AGENT_HISTOGRAM; use super::{AgentConnection, AgentListener, AgentSubConnection}; @@ -53,7 +53,7 @@ impl, REQ: DeserializeOwned + Debug> stream.write_all(&res).await?; Ok(AgentTcpConnection { domain, - conn_id: now_ms(), + conn_id: rand::random(), connection: yamux::Connection::new( stream, Default::default(), diff --git a/crates/relayer/src/agent_store.rs b/crates/relayer/src/agent_store.rs index 4d124cf..944ec9d 100644 --- a/crates/relayer/src/agent_store.rs +++ b/crates/relayer/src/agent_store.rs @@ -8,7 +8,7 @@ use async_std::channel::Sender; use crate::ProxyTunnel; pub struct AgentEntry { - pub session: u64, + pub conn_id: u64, pub tx: Sender>, } @@ -19,11 +19,11 @@ pub struct AgentStore { } impl AgentStore { - pub fn add(&self, id: u64, session: u64, tx: Sender>) { + pub fn add(&self, id: u64, conn_id: u64, tx: Sender>) { self.agents .write() .expect("Should write agents") - .insert(id, AgentEntry { tx, session }); + .insert(id, AgentEntry { tx, conn_id }); } pub fn get(&self, id: u64) -> Option>> { @@ -34,12 +34,12 @@ impl AgentStore { .map(|entry| entry.tx.clone()) } - pub fn remove(&self, id: u64, session: u64) -> bool { + pub fn remove(&self, id: u64, conn_id: u64) -> bool { let mut storage = self.agents.write().expect("Should write agents"); let current = storage.get(&id); if let Some(entry) = current { - if entry.session == session { + if entry.conn_id == conn_id { storage.remove(&id); return true; } diff --git a/crates/relayer/src/utils.rs b/crates/relayer/src/utils.rs index 0feee78..6169986 100644 --- a/crates/relayer/src/utils.rs +++ b/crates/relayer/src/utils.rs @@ -1,7 +1,6 @@ use std::{ collections::hash_map::DefaultHasher, hash::{Hash, Hasher}, - time::{SystemTime, UNIX_EPOCH}, }; /// get home id from domain by get first subdomain @@ -11,11 +10,3 @@ pub fn home_id_from_domain(domain: &str) -> u64 { parts.next().unwrap_or(domain).hash(&mut hasher); hasher.finish() } - -pub fn now_ms() -> u64 { - let start = SystemTime::now(); - start - .duration_since(UNIX_EPOCH) - .expect("Time went backwards") - .as_millis() as u64 -} From 95625f78afc0bc50c8e20f6dd0f7bb482e0594e8 Mon Sep 17 00:00:00 2001 From: shadowWalker Date: Sun, 29 Sep 2024 00:03:39 +0700 Subject: [PATCH 3/5] chrone: add log warn when add new agent to store --- crates/relayer/src/agent_store.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/relayer/src/agent_store.rs b/crates/relayer/src/agent_store.rs index 944ec9d..c43ddcb 100644 --- a/crates/relayer/src/agent_store.rs +++ b/crates/relayer/src/agent_store.rs @@ -20,6 +20,7 @@ pub struct AgentStore { impl AgentStore { pub fn add(&self, id: u64, conn_id: u64, tx: Sender>) { + log::warn!("add new connection for agent {id}, old connection will deactive"); self.agents .write() .expect("Should write agents") From cf6d2bb2516798a9fa7c2fc8dcc1d5dd14ebb3ce Mon Sep 17 00:00:00 2001 From: shadowWalker Date: Sun, 29 Sep 2024 00:08:52 +0700 Subject: [PATCH 4/5] chrone: add log warn and close channel old agent when have new connection --- crates/relayer/src/agent_store.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/crates/relayer/src/agent_store.rs b/crates/relayer/src/agent_store.rs index c43ddcb..a72cfeb 100644 --- a/crates/relayer/src/agent_store.rs +++ b/crates/relayer/src/agent_store.rs @@ -20,11 +20,18 @@ pub struct AgentStore { impl AgentStore { pub fn add(&self, id: u64, conn_id: u64, tx: Sender>) { - log::warn!("add new connection for agent {id}, old connection will deactive"); - self.agents + if let Some(agent) = self + .agents .write() .expect("Should write agents") - .insert(id, AgentEntry { tx, conn_id }); + .insert(id, AgentEntry { tx, conn_id }) + { + log::warn!( + "add new connection for agent {id}, old connection {} will deactive", + agent.conn_id + ); + agent.tx.close(); + } } pub fn get(&self, id: u64) -> Option>> { From 62965aee5035bd813ab4c73ae828a239ffd11adb Mon Sep 17 00:00:00 2001 From: shadowWalker Date: Sun, 29 Sep 2024 00:11:40 +0700 Subject: [PATCH 5/5] fix typos --- crates/relayer/src/agent_store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/relayer/src/agent_store.rs b/crates/relayer/src/agent_store.rs index a72cfeb..319b3b7 100644 --- a/crates/relayer/src/agent_store.rs +++ b/crates/relayer/src/agent_store.rs @@ -27,7 +27,7 @@ impl AgentStore { .insert(id, AgentEntry { tx, conn_id }) { log::warn!( - "add new connection for agent {id}, old connection {} will deactive", + "add new connection for agent {id}, old connection {} will deactivate", agent.conn_id ); agent.tx.close();