[LLT-5960] Add spans to Derp
stalowyjez committed Jan 28, 2025
1 parent 80a1f1e commit dcbbef5
Showing 3 changed files with 73 additions and 59 deletions.
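The diff below applies the same two tracing patterns throughout derp.rs: public async entry points gain an #[instrument(...)] attribute, and futures that are awaited or spawned behind task_exec!, tokio::spawn, or select! are wrapped with .instrument(tracing::Span::current()) so the work stays attached to the caller's span. The old "({}) ..." / Self::NAME prefixes are dropped from the log macros, presumably because the span now carries that context. A minimal, self-contained sketch of the same pattern is shown here, assuming the tracing, tracing-subscriber, and tokio crates; the function names, span name, and server address are illustrative and not taken from this commit:

use tracing::{info, instrument, Instrument};

// Async entry point: #[instrument] opens a span per call and records the
// arguments that are not skipped.
#[instrument(name = "demo::connect_and_send", skip(payload))]
async fn connect_and_send(server: &str, payload: Vec<u8>) {
    info!(server, "connecting");

    // A future handed to tokio::spawn loses the current span unless it is
    // instrumented explicitly before spawning.
    let task = async move {
        info!(len = payload.len(), "sending payload"); // emitted inside the span
    }
    .instrument(tracing::Span::current());

    let _ = tokio::spawn(task).await;
}

#[tokio::main]
async fn main() {
    // Spans are only visible once a subscriber is installed.
    tracing_subscriber::fmt().init();
    connect_and_send("derp.example.com", vec![0u8; 16]).await;
}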
120 changes: 62 additions & 58 deletions crates/telio-relay/src/derp.rs
@@ -38,6 +38,7 @@ use telio_utils::{
};
use tokio::sync::mpsc::OwnedPermit;
use tokio::{task::JoinHandle, time::sleep};
use tracing::{instrument, Instrument};

use crypto_box::aead::{AeadCore, Error, Nonce, Payload};
use telio_crypto::chachabox::ChaChaBox;
@@ -192,15 +193,15 @@ impl State {
// Stop attempts to connect
if let Some(c) = self.connecting.take() {
c.abort();
let _ = c.await;
let _ = c.instrument(tracing::Span::current()).await;
}
// Stop current connection
if let Some(c) = self.conn.take() {
c.stop();
}
// kill server
if let Some(mut server) = self.server.take() {
telio_log_debug!("({}) Disconnected from DERP server!", Self::NAME);
telio_log_debug!("Disconnected from DERP server!");
server.conn_state = RelayState::Disconnected;
let _ = self.event.send(Box::new(server));
}
@@ -220,18 +221,13 @@ impl State {
let mut server = match config.servers.get_next() {
Some(server) => {
telio_log_debug!(
"({}) Trying to connect to DERP server: {}",
Self::NAME,
"Trying to connect to DERP server: {}",
&server.get_address()
);
server
}
None => {
telio_log_info!(
"({}) Server list exhausted sleeping for: {}",
Self::NAME,
sleep_time
);
telio_log_info!("Server list exhausted sleeping for: {}", sleep_time);
config.servers.reset_server_index();
sleep(Duration::from_secs_f64(sleep_time)).await;
sleep_time = (sleep_time * 2f64).min(60f64);
@@ -260,7 +256,7 @@ impl State {
.await
{
Ok(conn) => {
telio_log_info!("({}) Connected to {}", Self::NAME, server.get_address());
telio_log_info!("Connected to {}", server.get_address());
server.conn_state = RelayState::Connected;
if let Some(aggregator) = aggregator.as_ref() {
aggregator
@@ -275,8 +271,7 @@
Err(err) => {
if config.servers.current_server_num == config.servers.servers.len() {
telio_log_warn!(
"({}) Failed to connect to any of {} servers: {}",
Self::NAME,
"Failed to connect to any of {} servers: {}",
config.servers.servers.len(),
err
);
@@ -286,13 +281,18 @@
}
}
}
};
}
.instrument(tracing::Span::current());
tokio::spawn(connection)
}
}

impl DerpRelay {
/// Relay's constructor
#[instrument(
name = "DerpRelay::start_with",
skip(channel, socket_pool, event, aggregator)
)]
pub fn start_with(
channel: Chan<(PublicKey, PacketRelayed)>,
socket_pool: Arc<SocketPool>,
@@ -321,6 +321,7 @@ impl DerpRelay {
}

/// Change DERP config
#[instrument(name = "DerpRelay::configure", skip(self))]
pub async fn configure(&self, config: Option<Config>) {
let _ = task_exec!(&self.task, async move |s| {
if s.config == config {
@@ -339,68 +340,80 @@ impl DerpRelay {
// Current server not found in new config or server no config
if !config.servers.contains(server) {
telio_log_info!("Currently active server is no longer available in config - reconnecting.");
s.disconnect().await;
s.disconnect().instrument(tracing::Span::current()).await;
}
}
None => {
// Disconnect and start from the top of the list
telio_log_info!("No active relay server. Reconnecting.");
s.disconnect().await;
s.disconnect().instrument(tracing::Span::current()).await;
}
}
}

Ok(())
})
.instrument(tracing::Span::current())
.await;
}

/// Get current Derp configuration
#[instrument(name = "DerpRelay::get_config", skip(self))]
pub async fn get_config(&self) -> Option<Config> {
task_exec!(&self.task, async move |s| Ok(s.config.as_ref().cloned()))
.instrument(tracing::Span::current())
.await
.ok()
.flatten()
}

/// Get current connection state (connected/disconnected)
#[instrument(name = "DerpRelay::get_conn_state", skip(self))]
pub async fn get_conn_state(&self) -> bool {
task_exec!(&self.task, async move |s| Ok(s.conn.is_some()))
.instrument(tracing::Span::current())
.await
.ok()
.unwrap_or(false)
}

/// Get server we are currently connected to.
/// Returns None if we are not connected
#[instrument(name = "DerpRelay::get_connected_server", skip(self))]
pub async fn get_connected_server(&self) -> Option<Server> {
task_exec!(&self.task, async move |s| Ok(s.server.clone()))
.instrument(tracing::Span::current())
.await
.ok()
.unwrap_or(None)
}

/// Get newest information about remote peer states
#[instrument(name = "DerpRelay::get_remote_peer_states", skip(self))]
pub async fn get_remote_peer_states(&self) -> PeersStatesMap {
task_exec!(&self.task, async move |s| Ok(s.remote_peers_states.clone()))
.instrument(tracing::Span::current())
.await
.ok()
.unwrap_or_default()
}

/// Try reconnect
#[instrument(name = "DerpRelay::reconnect", skip(self))]
pub async fn reconnect(&self) {
let _ = task_exec!(&self.task, async move |s| {
telio_log_info!("Explicit relay reconnect requested");
s.disconnect().await;
s.disconnect().instrument(tracing::Span::current()).await;
Ok(())
})
.instrument(tracing::Span::current())
.await;
}

/// Stop relay
#[instrument(name = "DerpRelay::stop", skip(self))]
pub async fn stop(self) {
let _ = self.task.stop().await;
let _ = self.task.stop().instrument(tracing::Span::current()).await;
}

// Routines related with encryption exclusively of control
@@ -551,8 +564,7 @@ impl State {
) {
// TODO add custom task's log format macro
telio_log_trace!(
"({}) Tx --> DERP, pubkey: {:?}, packet type: {:?}",
Self::NAME,
"Tx --> DERP, pubkey: {:?}, packet type: {:?}",
pk,
msg.packet_type()
);
@@ -563,28 +575,24 @@ impl State {
let _ = permit.send((pk, cipher_text));
}
Err(error) => {
telio_log_debug!("({}) Encryption failed: {}", Self::NAME, error);
telio_log_debug!("Encryption failed: {}", error);
}
},
Err(e) => {
telio_log_debug!("({}) Failed to encode packet: {}", Self::NAME, e);
telio_log_debug!("Failed to encode packet: {}", e);
}
}
}

/// handle traffic for |LocalNode -> Derp|
async fn handle_outcoming_payload_direct(permit: OwnedPermit<Vec<u8>>, msg: PacketControl) {
telio_log_trace!(
"({}) Tx --> DERP, packet type: {:?}",
Self::NAME,
msg.packet_type()
);
telio_log_trace!("Tx --> DERP, packet type: {:?}", msg.packet_type());
match msg.encode() {
Ok(buf) => {
let _ = permit.send(buf.to_vec());
}
Err(e) => {
telio_log_debug!("({}) Failed to encode packet: {}", Self::NAME, e);
telio_log_debug!("Failed to encode packet: {}", e);
}
}
}
Expand All @@ -601,32 +609,23 @@ impl State {
Ok(plain_text) => match PacketRelayed::decode(&plain_text) {
Ok(msg) => {
telio_log_trace!(
"({}) DERP --> Rx, pubkey: {:?}, len: {}, packet type: {:?}",
Self::NAME,
"DERP --> Rx, pubkey: {:?}, len: {}, packet type: {:?}",
pk,
buf.len(),
msg.packet_type()
);
permit.send((pk, msg));
}
Err(e) => {
telio_log_debug!(
"({}) DERP --> Rx, failed to parse packet: ({})",
Self::NAME,
e
);
telio_log_debug!("DERP --> Rx, failed to parse packet: ({})", e);
}
},
Err(error) => {
telio_log_debug!("Decryption failed: {}", error);
}
}
} else {
telio_log_debug!(
"({}) DERP --> Rx, received a packet with unknown pubkey: {}",
Self::NAME,
pk
);
telio_log_debug!("DERP --> Rx, received a packet with unknown pubkey: {}", pk);
}
}

@@ -650,17 +649,15 @@ impl State {
}
_ => {
telio_log_debug!(
"({}) DERP --> Rx, unexpected packet type: {:?}",
Self::NAME,
"DERP --> Rx, unexpected packet type: {:?}",
msg.packet_type()
);
None
}
},
Err(e) => {
telio_log_debug!(
"({}) DERP --> Rx, failed to decode derp poll response packet: ({}), bytes: {:?}",
Self::NAME,
"DERP --> Rx, failed to decode derp poll response packet: ({}), bytes: {:?}",
e,
buf
);
@@ -676,6 +673,7 @@ impl Runtime for State {

type Err = ();

#[instrument(name = "DerpRelay::wait_with_update", skip(self, update))]
async fn wait_with_update<F>(&mut self, update: F) -> Result<(), Self::Err>
where
F: Future<Output = BoxAction<Self, Result<(), Self::Err>>> + Send,
@@ -685,8 +683,10 @@ impl Runtime for State {
Some(c) => c,
None => {
telio_log_info!("Disconnecting from DERP server due to empty config");
self.disconnect().await;
return (update.await)(self).await;
self.disconnect().instrument(tracing::Span::current()).await;
return (update.instrument(tracing::Span::current()).await)(self)
.instrument(tracing::Span::current())
.await;
}
};

@@ -696,7 +696,9 @@ impl Runtime for State {
Some(chan) => (chan.rx.recv(), &chan.tx),
None => {
// Similar case to config, but the log is already printed
return (update.await)(self).await;
return (update.instrument(tracing::Span::current()).await)(self)
.instrument(tracing::Span::current())
.await;
}
};

@@ -719,7 +721,7 @@ impl Runtime for State {
Ok(Err(err)) => err.into(),
_ => RelayConnectionChangeReason::ClientError,
};
self.disconnect().await;
self.disconnect().instrument(tracing::Span::current()).await;
},
// Received payload from upper relay, forward it to DERP stream
res = wait_for_tx(&c.comms_relayed.tx, upper_read) => match res {
@@ -730,13 +732,13 @@ impl Runtime for State {
telio_log_debug!("Disconnecting from DERP server due to closed rx channel");
self.channel = None;
self.last_disconnection_reason = RelayConnectionChangeReason::ClientError;
self.disconnect().await;
self.disconnect().instrument(tracing::Span::current()).await;
}
// If forwarding fails, disconnect
None => {
telio_log_info!("Disconnecting from DERP server");
self.last_disconnection_reason = RelayConnectionChangeReason::ClientError;
self.disconnect().await;
self.disconnect().instrument(tracing::Span::current()).await;
}
},
// On tick send derp poll request to derp stream
@@ -746,19 +748,19 @@ impl Runtime for State {
telio_log_debug!("Sending DerpPollRequest with session {}", self.derp_poll_session);
Self::handle_outcoming_payload_direct(permit, PacketControl::DerpPollRequest(DerpPollRequestMsg::new(
self.derp_poll_session, &config.meshnet_peers
))).await;
))).instrument(tracing::Span::current()).await;
}
}
// Received payload from DERP stream, forward it to upper relay
Some((permit, Some((pk, buf)))) = wait_for_tx(chan_tx, derp_relayed_read) => {
Self::handle_incoming_payload_relayed(permit, pk, buf, config).await;
Self::handle_incoming_payload_relayed(permit, pk, buf, config).instrument(tracing::Span::current()).await;
},
Some((_, Some(buf))) = wait_for_tx(chan_tx, derp_direct_read) => {
self.remote_peers_states = Self::handle_incoming_payload_direct(self.derp_poll_session, buf).await.unwrap_or_default();
self.remote_peers_states = Self::handle_incoming_payload_direct(self.derp_poll_session, buf).instrument(tracing::Span::current()).await.unwrap_or_default();
telio_log_debug!("Remote peers statuses: {:?}", self.remote_peers_states);
}

update = update => return update(self).await,
update = update => return update(self).instrument(tracing::Span::current()).await,

else => (),
}
@@ -767,7 +769,9 @@ impl Runtime for State {
}
None => {
if self.channel.is_none() {
return (update.await)(self).await;
return (update.instrument(tracing::Span::current()).await)(self)
.instrument(tracing::Span::current())
.await;
}

let connecting = if let Some(connecting) = &mut self.connecting {
Expand All @@ -786,16 +790,16 @@ impl Runtime for State {
self.server = Some(server.clone());
self.conn = Some(conn);
if let Err(err) = self.event.send(Box::new(server.clone())) {
telio_log_warn!("({}) sending new server info failed {}", Self::NAME, err)
telio_log_warn!("sending new server info failed {}", err)
}
}
Err(err) => {
self.last_disconnection_reason = RelayConnectionChangeReason::ClientError;
telio_log_warn!("({}) connecting task failed {}", Self::NAME, err)
telio_log_warn!("connecting task failed {}", err)
}
}
}
update = update => update(self).await?,
update = update => update(self).instrument(tracing::Span::current()).await?,
}
Ok(())
}
@@ -805,7 +809,7 @@ impl Runtime for State {
async fn stop(mut self) {
// Abort the connection tasks
telio_log_info!("Stopping relay");
self.disconnect().await;
self.disconnect().instrument(tracing::Span::current()).await;
}
}
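
For context, a hedged sketch of how a caller's own span nests around the instrumented entry points added above; the span name and the crate paths for DerpRelay and Config are assumptions, not something this commit defines:

use tracing::{info_span, Instrument};
use telio_relay::{Config, DerpRelay}; // assumed re-export paths

// Caller-side sketch: wrap the call in a span of your own so the
// "DerpRelay::configure" span created by #[instrument] nests under it.
async fn reconfigure(relay: &DerpRelay, config: Option<Config>) {
    relay
        .configure(config)
        .instrument(info_span!("meshnet::reconfigure"))
        .await;
}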
