Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix worker being removed right after its agent instantly reconnects #62

Merged
merged 5 commits into from
Sep 28, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/relayer/src/agent_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub trait AgentConnection<S: AgentSubConnection<R, W>, R: AsyncRead + Unpin, W:
Send + Sync
{
fn domain(&self) -> String;
fn conn_id(&self) -> u64;
async fn create_sub_connection(&mut self) -> Result<S, Box<dyn Error>>;
async fn recv(&mut self) -> Result<S, Box<dyn Error>>;
}
Expand Down
13 changes: 11 additions & 2 deletions crates/relayer/src/agent_listener/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
use serde::de::DeserializeOwned;

use crate::METRICS_AGENT_HISTOGRAM;
use crate::{utils::now_ms, METRICS_AGENT_HISTOGRAM};

use super::{AgentConnection, AgentListener, AgentSubConnection};

Expand Down Expand Up @@ -96,7 +96,11 @@
log::info!("register request domain {}", domain);
let res_buf = cluster_validator.sign_response_res(&request, None);
send.write_all(&res_buf).await?;
Ok(AgentQuicConnection { domain, conn })
Ok(AgentQuicConnection {
domain,
conn,
conn_id: now_ms(),
})

Check warning on line 103 in crates/relayer/src/agent_listener/quic.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/agent_listener/quic.rs#L99-L103

Added lines #L99 - L103 were not covered by tests
}
Err(e) => {
log::error!("invalid register request {:?}, error {}", request, e);
Expand Down Expand Up @@ -125,6 +129,7 @@

/// An agent connection carried over QUIC (wraps a `quinn::Connection`).
///
/// NOTE(review): `conn_id` is filled from `now_ms()` where this struct is
/// constructed, so two connections created within the same millisecond would
/// get the same id — confirm this is acceptable for callers that use it to
/// tell connections apart.
pub struct AgentQuicConnection {
// Domain the agent registered with; handed out (cloned) by `domain()`.
domain: String,
// Identifier for this particular connection; returned by `conn_id()`.
conn_id: u64,
// Underlying QUIC connection; sub-connections are opened on it via `open_bi()`.
conn: quinn::Connection,
}

Expand All @@ -134,6 +139,10 @@
self.domain.clone()
}

/// Returns the `conn_id` value stored on this QUIC connection.
fn conn_id(&self) -> u64 {
self.conn_id
}

Check warning on line 144 in crates/relayer/src/agent_listener/quic.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/agent_listener/quic.rs#L142-L144

Added lines #L142 - L144 were not covered by tests

async fn create_sub_connection(&mut self) -> Result<AgentQuicSubConnection, Box<dyn Error>> {
let (send, recv) = self.conn.open_bi().await?;
Ok(AgentQuicSubConnection { send, recv })
Expand Down
8 changes: 7 additions & 1 deletion crates/relayer/src/agent_listener/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use protocol::key::ClusterValidator;
use serde::de::DeserializeOwned;

use crate::METRICS_AGENT_HISTOGRAM;
use crate::{utils::now_ms, METRICS_AGENT_HISTOGRAM};

use super::{AgentConnection, AgentListener, AgentSubConnection};

Expand Down Expand Up @@ -53,6 +53,7 @@
stream.write_all(&res).await?;
Ok(AgentTcpConnection {
domain,
conn_id: now_ms(),

Check warning on line 56 in crates/relayer/src/agent_listener/tcp.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/agent_listener/tcp.rs#L56

Added line #L56 was not covered by tests
connection: yamux::Connection::new(
stream,
Default::default(),
Expand Down Expand Up @@ -111,6 +112,7 @@

/// An agent connection carried over TCP, multiplexed with yamux.
///
/// NOTE(review): `conn_id` is filled from `now_ms()` where this struct is
/// constructed, so two connections created within the same millisecond would
/// get the same id — confirm this is acceptable for callers that use it to
/// tell connections apart.
pub struct AgentTcpConnection {
// Domain the agent registered with; handed out (cloned) by `domain()`.
domain: String,
// Identifier for this particular connection; returned by `conn_id()`.
conn_id: u64,
// yamux connection layered over the accepted `TcpStream`.
connection: yamux::Connection<TcpStream>,
}

Expand All @@ -122,6 +124,10 @@
self.domain.clone()
}

/// Returns the `conn_id` value stored on this TCP connection.
fn conn_id(&self) -> u64 {
self.conn_id
}

Check warning on line 129 in crates/relayer/src/agent_listener/tcp.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/agent_listener/tcp.rs#L127-L129

Added lines #L127 - L129 were not covered by tests

async fn create_sub_connection(&mut self) -> Result<AgentTcpSubConnection, Box<dyn Error>> {
let client = OpenStreamsClient {
connection: &mut self.connection,
Expand Down
30 changes: 21 additions & 9 deletions crates/relayer/src/agent_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,44 @@

use crate::ProxyTunnel;

/// Value stored in `AgentStore` for each registered agent id.
pub struct AgentEntry {
/// Session marker recorded when the entry was added. `AgentStore::remove`
/// only deletes the entry when the session passed to it equals this value.
pub session: u64,
/// Sender side of the channel used to pass `ProxyTunnel`s for this agent;
/// `AgentStore::get` returns a clone of it.
pub tx: Sender<Box<dyn ProxyTunnel>>,
}

#[derive(Clone, Default)]
pub struct AgentStore {
#[allow(clippy::type_complexity)]
agents: Arc<RwLock<HashMap<u64, Sender<Box<dyn ProxyTunnel>>>>>,
agents: Arc<RwLock<HashMap<u64, AgentEntry>>>,
}

impl AgentStore {
pub fn add(&self, id: u64, tx: Sender<Box<dyn ProxyTunnel>>) {
pub fn add(&self, id: u64, session: u64, tx: Sender<Box<dyn ProxyTunnel>>) {

Check warning on line 22 in crates/relayer/src/agent_store.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/agent_store.rs#L22

Added line #L22 was not covered by tests
self.agents
.write()
.expect("Should write agents")
.insert(id, tx);
.insert(id, AgentEntry { tx, session });

Check warning on line 26 in crates/relayer/src/agent_store.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/agent_store.rs#L26

Added line #L26 was not covered by tests
}

pub fn get(&self, id: u64) -> Option<Sender<Box<dyn ProxyTunnel>>> {
self.agents
.read()
.expect("Should write agents")
.get(&id)
.cloned()
.map(|entry| entry.tx.clone())

Check warning on line 34 in crates/relayer/src/agent_store.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/agent_store.rs#L34

Added line #L34 was not covered by tests
}

pub fn remove(&self, id: u64) {
self.agents
.write()
.expect("Should write agents")
.remove(&id);
pub fn remove(&self, id: u64, session: u64) -> bool {
let mut storage = self.agents.write().expect("Should write agents");

let current = storage.get(&id);
if let Some(entry) = current {
if entry.session == session {
storage.remove(&id);
return true;
}
}

Check warning on line 46 in crates/relayer/src/agent_store.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/agent_store.rs#L37-L46

Added lines #L37 - L46 were not covered by tests

false

Check warning on line 48 in crates/relayer/src/agent_store.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/agent_store.rs#L48

Added line #L48 was not covered by tests
}
}
18 changes: 13 additions & 5 deletions crates/relayer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,12 @@
counter!(METRICS_AGENT_COUNT).increment(1);
log::info!("agent_connection.domain(): {}", agent_connection.domain());
let domain = agent_connection.domain().to_string();
let conn_id = agent_connection.conn_id();

Check warning on line 82 in crates/relayer/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/lib.rs#L82

Added line #L82 was not covered by tests
let (mut agent_worker, proxy_tunnel_tx) =
agent_worker::AgentWorker::<AG, S, R, W>::new(agent_connection, agent_rpc_handler);
let home_id = home_id_from_domain(&domain);
agents.add(home_id, proxy_tunnel_tx);
log::info!("added home_id: {}, conn_id: {}", home_id, conn_id);
agents.add(home_id, conn_id, proxy_tunnel_tx);

Check warning on line 87 in crates/relayer/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/lib.rs#L86-L87

Added lines #L86 - L87 were not covered by tests
node_alias_sdk.register_alias(home_id).await;
let agents = agents.clone();
gauge!(METRICS_AGENT_LIVE).increment(1.0);
Expand All @@ -96,10 +98,16 @@
}
}
}
agents.remove(home_id);
node_alias_sdk
.unregister_alias(home_id_from_domain(&domain))
.await;
let removed = agents.remove(home_id, conn_id);
log::info!(
"remove home_id: {}, conn_id: {}, removed: {}",

Check warning on line 103 in crates/relayer/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/lib.rs#L101-L103

Added lines #L101 - L103 were not covered by tests
home_id,
conn_id,
removed
);
if removed {
node_alias_sdk.unregister_alias(home_id).await;
}

Check warning on line 110 in crates/relayer/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/lib.rs#L108-L110

Added lines #L108 - L110 were not covered by tests
log::info!("agent_worker exit for domain: {}", domain);
gauge!(METRICS_AGENT_LIVE).decrement(1.0);
}
9 changes: 9 additions & 0 deletions crates/relayer/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
time::{SystemTime, UNIX_EPOCH},
};

/// get home id from domain by get first subdomain
Expand All @@ -10,3 +11,11 @@
parts.next().unwrap_or(domain).hash(&mut hasher);
hasher.finish()
}

/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics with "Time went backwards" if the system clock reports a moment
/// earlier than `UNIX_EPOCH`.
pub fn now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    // `as_millis` yields a u128; narrow it to u64 exactly as before.
    since_epoch.as_millis() as u64
}

Check warning on line 21 in crates/relayer/src/utils.rs

View check run for this annotation

Codecov / codecov/patch

crates/relayer/src/utils.rs#L15-L21

Added lines #L15 - L21 were not covered by tests
Loading