Skip to content

Commit

Permalink
fix remove worker after instantly agent is reconnected
Browse files Browse the repository at this point in the history
  • Loading branch information
marverlous811 committed Sep 27, 2024
1 parent 707e542 commit cef67b6
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 17 deletions.
1 change: 1 addition & 0 deletions crates/relayer/src/agent_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub trait AgentConnection<S: AgentSubConnection<R, W>, R: AsyncRead + Unpin, W:
Send + Sync
{
fn domain(&self) -> String;
fn conn_id(&self) -> u64;
async fn create_sub_connection(&mut self) -> Result<S, Box<dyn Error>>;
async fn recv(&mut self) -> Result<S, Box<dyn Error>>;
}
Expand Down
13 changes: 11 additions & 2 deletions crates/relayer/src/agent_listener/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use quinn::{Endpoint, RecvStream, SendStream, ServerConfig};
use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
use serde::de::DeserializeOwned;

use crate::METRICS_AGENT_HISTOGRAM;
use crate::{utils::now_ms, METRICS_AGENT_HISTOGRAM};

use super::{AgentConnection, AgentListener, AgentSubConnection};

Expand Down Expand Up @@ -96,7 +96,11 @@ impl<REQ: DeserializeOwned + Debug> AgentQuicListener<REQ> {
log::info!("register request domain {}", domain);
let res_buf = cluster_validator.sign_response_res(&request, None);
send.write_all(&res_buf).await?;
Ok(AgentQuicConnection { domain, conn })
Ok(AgentQuicConnection {
domain,
conn,
conn_id: now_ms(),
})

Check warning on line 103 in crates/relayer/src/agent_listener/quic.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/agent_listener/quic.rs#L99-L103

Added lines #L99 - L103 were not covered by tests
}
Err(e) => {
log::error!("invalid register request {:?}, error {}", request, e);
Expand Down Expand Up @@ -125,6 +129,7 @@ impl<REQ: DeserializeOwned + Send + Sync + Debug>

pub struct AgentQuicConnection {
domain: String,
conn_id: u64,
conn: quinn::Connection,
}

Expand All @@ -134,6 +139,10 @@ impl AgentConnection<AgentQuicSubConnection, RecvStream, SendStream> for AgentQu
self.domain.clone()
}

fn conn_id(&self) -> u64 {
self.conn_id
}

Check warning on line 144 in crates/relayer/src/agent_listener/quic.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/agent_listener/quic.rs#L142-L144

Added lines #L142 - L144 were not covered by tests

async fn create_sub_connection(&mut self) -> Result<AgentQuicSubConnection, Box<dyn Error>> {
let (send, recv) = self.conn.open_bi().await?;
Ok(AgentQuicSubConnection { send, recv })
Expand Down
8 changes: 7 additions & 1 deletion crates/relayer/src/agent_listener/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use metrics::histogram;
use protocol::key::ClusterValidator;
use serde::de::DeserializeOwned;

use crate::METRICS_AGENT_HISTOGRAM;
use crate::{utils::now_ms, METRICS_AGENT_HISTOGRAM};

use super::{AgentConnection, AgentListener, AgentSubConnection};

Expand Down Expand Up @@ -53,6 +53,7 @@ impl<VALIDATE: ClusterValidator<REQ>, REQ: DeserializeOwned + Debug>
stream.write_all(&res).await?;
Ok(AgentTcpConnection {
domain,
conn_id: now_ms(),

Check warning on line 56 in crates/relayer/src/agent_listener/tcp.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/agent_listener/tcp.rs#L56

Added line #L56 was not covered by tests
connection: yamux::Connection::new(
stream,
Default::default(),
Expand Down Expand Up @@ -111,6 +112,7 @@ impl<

pub struct AgentTcpConnection {
domain: String,
conn_id: u64,
connection: yamux::Connection<TcpStream>,
}

Expand All @@ -122,6 +124,10 @@ impl AgentConnection<AgentTcpSubConnection, ReadHalf<yamux::Stream>, WriteHalf<y
self.domain.clone()
}

fn conn_id(&self) -> u64 {
self.conn_id
}

Check warning on line 129 in crates/relayer/src/agent_listener/tcp.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/agent_listener/tcp.rs#L127-L129

Added lines #L127 - L129 were not covered by tests

async fn create_sub_connection(&mut self) -> Result<AgentTcpSubConnection, Box<dyn Error>> {
let client = OpenStreamsClient {
connection: &mut self.connection,
Expand Down
30 changes: 21 additions & 9 deletions crates/relayer/src/agent_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,44 @@ use async_std::channel::Sender;

use crate::ProxyTunnel;

pub struct AgentEntry {
pub session: u64,
pub tx: Sender<Box<dyn ProxyTunnel>>,
}

#[derive(Clone, Default)]
pub struct AgentStore {
#[allow(clippy::type_complexity)]
agents: Arc<RwLock<HashMap<u64, Sender<Box<dyn ProxyTunnel>>>>>,
agents: Arc<RwLock<HashMap<u64, AgentEntry>>>,
}

impl AgentStore {
pub fn add(&self, id: u64, tx: Sender<Box<dyn ProxyTunnel>>) {
pub fn add(&self, id: u64, session: u64, tx: Sender<Box<dyn ProxyTunnel>>) {

Check warning on line 22 in crates/relayer/src/agent_store.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/agent_store.rs#L22

Added line #L22 was not covered by tests
self.agents
.write()
.expect("Should write agents")
.insert(id, tx);
.insert(id, AgentEntry { tx, session });

Check warning on line 26 in crates/relayer/src/agent_store.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/agent_store.rs#L26

Added line #L26 was not covered by tests
}

pub fn get(&self, id: u64) -> Option<Sender<Box<dyn ProxyTunnel>>> {
self.agents
.read()
.expect("Should write agents")
.get(&id)
.cloned()
.map(|entry| entry.tx.clone())

Check warning on line 34 in crates/relayer/src/agent_store.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/agent_store.rs#L34

Added line #L34 was not covered by tests
}

pub fn remove(&self, id: u64) {
self.agents
.write()
.expect("Should write agents")
.remove(&id);
pub fn remove(&self, id: u64, session: u64) -> bool {
let mut storage = self.agents.write().expect("Should write agents");

let current = storage.get(&id);
if let Some(entry) = current {
if entry.session == session {
storage.remove(&id);
return true;
}
}

Check warning on line 46 in crates/relayer/src/agent_store.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/agent_store.rs#L37-L46

Added lines #L37 - L46 were not covered by tests

false

Check warning on line 48 in crates/relayer/src/agent_store.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/agent_store.rs#L48

Added line #L48 was not covered by tests
}
}
18 changes: 13 additions & 5 deletions crates/relayer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,12 @@ pub async fn run_agent_connection<AG, S, R, W>(
counter!(METRICS_AGENT_COUNT).increment(1);
log::info!("agent_connection.domain(): {}", agent_connection.domain());
let domain = agent_connection.domain().to_string();
let conn_id = agent_connection.conn_id();

Check warning on line 82 in crates/relayer/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/lib.rs#L82

Added line #L82 was not covered by tests
let (mut agent_worker, proxy_tunnel_tx) =
agent_worker::AgentWorker::<AG, S, R, W>::new(agent_connection, agent_rpc_handler);
let home_id = home_id_from_domain(&domain);
agents.add(home_id, proxy_tunnel_tx);
log::info!("added home_id: {}, conn_id: {}", home_id, conn_id);
agents.add(home_id, conn_id, proxy_tunnel_tx);

Check warning on line 87 in crates/relayer/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/lib.rs#L86-L87

Added lines #L86 - L87 were not covered by tests
node_alias_sdk.register_alias(home_id).await;
let agents = agents.clone();
gauge!(METRICS_AGENT_LIVE).increment(1.0);
Expand All @@ -96,10 +98,16 @@ pub async fn run_agent_connection<AG, S, R, W>(
}
}
}
agents.remove(home_id);
node_alias_sdk
.unregister_alias(home_id_from_domain(&domain))
.await;
let removed = agents.remove(home_id, conn_id);
log::info!(
"remove home_id: {}, conn_id: {}, removed: {}",

Check warning on line 103 in crates/relayer/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/lib.rs#L101-L103

Added lines #L101 - L103 were not covered by tests
home_id,
conn_id,
removed
);
if removed {
node_alias_sdk.unregister_alias(home_id).await;
}

Check warning on line 110 in crates/relayer/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/lib.rs#L108-L110

Added lines #L108 - L110 were not covered by tests
log::info!("agent_worker exit for domain: {}", domain);
gauge!(METRICS_AGENT_LIVE).decrement(1.0);
}
9 changes: 9 additions & 0 deletions crates/relayer/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
time::{SystemTime, UNIX_EPOCH},
};

/// get home id from domain by get first subdomain
Expand All @@ -10,3 +11,11 @@ pub fn home_id_from_domain(domain: &str) -> u64 {
parts.next().unwrap_or(domain).hash(&mut hasher);
hasher.finish()
}

pub fn now_ms() -> u64 {
let start = SystemTime::now();
start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64
}

Check warning on line 21 in crates/relayer/src/utils.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/utils.rs#L15-L21

Added lines #L15 - L21 were not covered by tests

0 comments on commit cef67b6

Please sign in to comment.