diff --git a/.unreleased/LLT-5960-add_spans_to_derp b/.unreleased/LLT-5960-add_spans_to_derp new file mode 100644 index 000000000..e69de29bb diff --git a/crates/telio-relay/src/derp.rs b/crates/telio-relay/src/derp.rs index 860b47537..8ac2c676b 100644 --- a/crates/telio-relay/src/derp.rs +++ b/crates/telio-relay/src/derp.rs @@ -38,6 +38,7 @@ use telio_utils::{ }; use tokio::sync::mpsc::OwnedPermit; use tokio::{task::JoinHandle, time::sleep}; +use tracing::{instrument, Instrument}; use crypto_box::aead::{AeadCore, Error, Nonce, Payload}; use telio_crypto::chachabox::ChaChaBox; @@ -192,7 +193,7 @@ impl State { // Stop attempts to connect if let Some(c) = self.connecting.take() { c.abort(); - let _ = c.await; + let _ = c.instrument(tracing::Span::current()).await; } // Stop current connection if let Some(c) = self.conn.take() { @@ -200,7 +201,7 @@ impl State { } // kill server if let Some(mut server) = self.server.take() { - telio_log_debug!("({}) Disconnected from DERP server!", Self::NAME); + telio_log_debug!("Disconnected from DERP server!"); server.conn_state = RelayState::Disconnected; let _ = self.event.send(Box::new(server)); } @@ -220,18 +221,13 @@ impl State { let mut server = match config.servers.get_next() { Some(server) => { telio_log_debug!( - "({}) Trying to connect to DERP server: {}", - Self::NAME, + "Trying to connect to DERP server: {}", &server.get_address() ); server } None => { - telio_log_info!( - "({}) Server list exhausted sleeping for: {}", - Self::NAME, - sleep_time - ); + telio_log_info!("Server list exhausted sleeping for: {}", sleep_time); config.servers.reset_server_index(); sleep(Duration::from_secs_f64(sleep_time)).await; sleep_time = (sleep_time * 2f64).min(60f64); @@ -260,7 +256,7 @@ impl State { .await { Ok(conn) => { - telio_log_info!("({}) Connected to {}", Self::NAME, server.get_address()); + telio_log_info!("Connected to {}", server.get_address()); server.conn_state = RelayState::Connected; if let Some(aggregator) = aggregator.as_ref() { aggregator @@ -275,8 +271,7 @@ impl State { Err(err) => { if config.servers.current_server_num == config.servers.servers.len() { telio_log_warn!( - "({}) Failed to connect to any of {} servers: {}", - Self::NAME, + "Failed to connect to any of {} servers: {}", config.servers.servers.len(), err ); @@ -286,13 +281,18 @@ impl State { } } } - }; + } + .instrument(tracing::Span::current()); tokio::spawn(connection) } } impl DerpRelay { /// Relay's constructor + #[instrument( + name = "DerpRelay::start_with", + skip(channel, socket_pool, event, aggregator) + )] pub fn start_with( channel: Chan<(PublicKey, PacketRelayed)>, socket_pool: Arc, @@ -321,6 +321,7 @@ impl DerpRelay { } /// Change DERP config + #[instrument(name = "DerpRelay::configure", skip(self))] pub async fn configure(&self, config: Option) { let _ = task_exec!(&self.task, async move |s| { if s.config == config { @@ -339,33 +340,38 @@ impl DerpRelay { // Current server not found in new config or server no config if !config.servers.contains(server) { telio_log_info!("Currently active server is no longer available in config - reconnecting."); - s.disconnect().await; + s.disconnect().instrument(tracing::Span::current()).await; } } None => { // Disconnect and start from the top of the list telio_log_info!("No active relay server. Reconnecting."); - s.disconnect().await; + s.disconnect().instrument(tracing::Span::current()).await; } } } Ok(()) }) + .instrument(tracing::Span::current()) .await; } /// Get current Derp configuration + #[instrument(name = "DerpRelay::get_config", skip(self))] pub async fn get_config(&self) -> Option { task_exec!(&self.task, async move |s| Ok(s.config.as_ref().cloned())) + .instrument(tracing::Span::current()) .await .ok() .flatten() } /// Get current connection state (connected/disconnected) + #[instrument(name = "DerpRelay::get_conn_state", skip(self))] pub async fn get_conn_state(&self) -> bool { task_exec!(&self.task, async move |s| Ok(s.conn.is_some())) + .instrument(tracing::Span::current()) .await .ok() .unwrap_or(false) @@ -373,34 +379,41 @@ impl DerpRelay { /// Get server we are currently connected to. /// Returns None if we are not connected + #[instrument(name = "DerpRelay::get_connected_server", skip(self))] pub async fn get_connected_server(&self) -> Option { task_exec!(&self.task, async move |s| Ok(s.server.clone())) + .instrument(tracing::Span::current()) .await .ok() .unwrap_or(None) } /// Get newest information about remote peer states + #[instrument(name = "DerpRelay::get_remote_peer_states", skip(self))] pub async fn get_remote_peer_states(&self) -> PeersStatesMap { task_exec!(&self.task, async move |s| Ok(s.remote_peers_states.clone())) + .instrument(tracing::Span::current()) .await .ok() .unwrap_or_default() } /// Try reconnect + #[instrument(name = "DerpRelay::reconnect", skip(self))] pub async fn reconnect(&self) { let _ = task_exec!(&self.task, async move |s| { telio_log_info!("Explicit relay reconnect requested"); - s.disconnect().await; + s.disconnect().instrument(tracing::Span::current()).await; Ok(()) }) + .instrument(tracing::Span::current()) .await; } /// Stop relay + #[instrument(name = "DerpRelay::stop", skip(self))] pub async fn stop(self) { - let _ = self.task.stop().await; + let _ = self.task.stop().instrument(tracing::Span::current()).await; } // Routines related with encryption exclusively of control @@ -551,8 +564,7 @@ impl State { ) { // TODO add custom task's log format macro telio_log_trace!( - "({}) Tx --> DERP, pubkey: {:?}, packet type: {:?}", - Self::NAME, + "Tx --> DERP, pubkey: {:?}, packet type: {:?}", pk, msg.packet_type() ); @@ -563,28 +575,24 @@ impl State { let _ = permit.send((pk, cipher_text)); } Err(error) => { - telio_log_debug!("({}) Encryption failed: {}", Self::NAME, error); + telio_log_debug!("Encryption failed: {}", error); } }, Err(e) => { - telio_log_debug!("({}) Failed to encode packet: {}", Self::NAME, e); + telio_log_debug!("Failed to encode packet: {}", e); } } } /// handle traffic for |LocalNode -> Derp| async fn handle_outcoming_payload_direct(permit: OwnedPermit>, msg: PacketControl) { - telio_log_trace!( - "({}) Tx --> DERP, packet type: {:?}", - Self::NAME, - msg.packet_type() - ); + telio_log_trace!("Tx --> DERP, packet type: {:?}", msg.packet_type()); match msg.encode() { Ok(buf) => { let _ = permit.send(buf.to_vec()); } Err(e) => { - telio_log_debug!("({}) Failed to encode packet: {}", Self::NAME, e); + telio_log_debug!("Failed to encode packet: {}", e); } } } @@ -601,8 +609,7 @@ impl State { Ok(plain_text) => match PacketRelayed::decode(&plain_text) { Ok(msg) => { telio_log_trace!( - "({}) DERP --> Rx, pubkey: {:?}, len: {}, packet type: {:?}", - Self::NAME, + "DERP --> Rx, pubkey: {:?}, len: {}, packet type: {:?}", pk, buf.len(), msg.packet_type() @@ -610,11 +617,7 @@ impl State { permit.send((pk, msg)); } Err(e) => { - telio_log_debug!( - "({}) DERP --> Rx, failed to parse packet: ({})", - Self::NAME, - e - ); + telio_log_debug!("DERP --> Rx, failed to parse packet: ({})", e); } }, Err(error) => { @@ -622,11 +625,7 @@ impl State { } } } else { - telio_log_debug!( - "({}) DERP --> Rx, received a packet with unknown pubkey: {}", - Self::NAME, - pk - ); + telio_log_debug!("DERP --> Rx, received a packet with unknown pubkey: {}", pk); } } @@ -650,8 +649,7 @@ impl State { } _ => { telio_log_debug!( - "({}) DERP --> Rx, unexpected packet type: {:?}", - Self::NAME, + "DERP --> Rx, unexpected packet type: {:?}", msg.packet_type() ); None @@ -659,8 +657,7 @@ impl State { }, Err(e) => { telio_log_debug!( - "({}) DERP --> Rx, failed to decode derp poll response packet: ({}), bytes: {:?}", - Self::NAME, + "DERP --> Rx, failed to decode derp poll response packet: ({}), bytes: {:?}", e, buf ); @@ -676,6 +673,7 @@ impl Runtime for State { type Err = (); + #[instrument(name = "DerpRelay::wait_with_update", skip(self, update))] async fn wait_with_update(&mut self, update: F) -> Result<(), Self::Err> where F: Future>> + Send, @@ -685,8 +683,10 @@ impl Runtime for State { Some(c) => c, None => { telio_log_info!("Disconnecting from DERP server due to empty config"); - self.disconnect().await; - return (update.await)(self).await; + self.disconnect().instrument(tracing::Span::current()).await; + return (update.instrument(tracing::Span::current()).await)(self) + .instrument(tracing::Span::current()) + .await; } }; @@ -696,7 +696,9 @@ impl Runtime for State { Some(chan) => (chan.rx.recv(), &chan.tx), None => { // Similar case to config, but the log is already printed - return (update.await)(self).await; + return (update.instrument(tracing::Span::current()).await)(self) + .instrument(tracing::Span::current()) + .await; } }; @@ -719,7 +721,7 @@ impl Runtime for State { Ok(Err(err)) => err.into(), _ => RelayConnectionChangeReason::ClientError, }; - self.disconnect().await; + self.disconnect().instrument(tracing::Span::current()).await; }, // Received payload from upper relay, forward it to DERP stream res = wait_for_tx(&c.comms_relayed.tx, upper_read) => match res { @@ -730,13 +732,13 @@ impl Runtime for State { telio_log_debug!("Disconnecting from DERP server due to closed rx channel"); self.channel = None; self.last_disconnection_reason = RelayConnectionChangeReason::ClientError; - self.disconnect().await; + self.disconnect().instrument(tracing::Span::current()).await; } // If forwarding fails, disconnect None => { telio_log_info!("Disconnecting from DERP server"); self.last_disconnection_reason = RelayConnectionChangeReason::ClientError; - self.disconnect().await; + self.disconnect().instrument(tracing::Span::current()).await; } }, // On tick send derp poll request to derp stream @@ -746,19 +748,19 @@ impl Runtime for State { telio_log_debug!("Sending DerpPollRequest with session {}", self.derp_poll_session); Self::handle_outcoming_payload_direct(permit, PacketControl::DerpPollRequest(DerpPollRequestMsg::new( self.derp_poll_session, &config.meshnet_peers - ))).await; + ))).instrument(tracing::Span::current()).await; } } // Received payload from DERP stream, forward it to upper relay Some((permit, Some((pk, buf)))) = wait_for_tx(chan_tx, derp_relayed_read) => { - Self::handle_incoming_payload_relayed(permit, pk, buf, config).await; + Self::handle_incoming_payload_relayed(permit, pk, buf, config).instrument(tracing::Span::current()).await; }, Some((_, Some(buf))) = wait_for_tx(chan_tx, derp_direct_read) => { - self.remote_peers_states = Self::handle_incoming_payload_direct(self.derp_poll_session, buf).await.unwrap_or_default(); + self.remote_peers_states = Self::handle_incoming_payload_direct(self.derp_poll_session, buf).instrument(tracing::Span::current()).await.unwrap_or_default(); telio_log_debug!("Remote peers statuses: {:?}", self.remote_peers_states); } - update = update => return update(self).await, + update = update => return update(self).instrument(tracing::Span::current()).await, else => (), } @@ -767,7 +769,9 @@ impl Runtime for State { } None => { if self.channel.is_none() { - return (update.await)(self).await; + return (update.instrument(tracing::Span::current()).await)(self) + .instrument(tracing::Span::current()) + .await; } let connecting = if let Some(connecting) = &mut self.connecting { @@ -786,16 +790,16 @@ impl Runtime for State { self.server = Some(server.clone()); self.conn = Some(conn); if let Err(err) = self.event.send(Box::new(server.clone())) { - telio_log_warn!("({}) sending new server info failed {}", Self::NAME, err) + telio_log_warn!("sending new server info failed {}", err) } } Err(err) => { self.last_disconnection_reason = RelayConnectionChangeReason::ClientError; - telio_log_warn!("({}) connecting task failed {}", Self::NAME, err) + telio_log_warn!("connecting task failed {}", err) } } } - update = update => update(self).await?, + update = update => update(self).instrument(tracing::Span::current()).await?, } Ok(()) } @@ -805,7 +809,7 @@ impl Runtime for State { async fn stop(mut self) { // Abort the connection tasks telio_log_info!("Stopping relay"); - self.disconnect().await; + self.disconnect().instrument(tracing::Span::current()).await; } } diff --git a/src/ffi/logging.rs b/src/ffi/logging.rs index 53fe1499a..720a38a9d 100644 --- a/src/ffi/logging.rs +++ b/src/ffi/logging.rs @@ -95,6 +95,7 @@ pub fn build_subscriber( .with(LevelFilter::from_level(log_level.into())) .with( fmt::layer() + .with_span_events(fmt::format::FmtSpan::CLOSE) .event_format(TelioEventFmt) .with_ansi(false) .with_writer(FfiCallback::new(log_sender)), @@ -116,11 +117,20 @@ where ) -> std::fmt::Result { let tid = std::thread::current().id(); let meta = event.metadata(); + let span_name = if meta.is_span() { + meta.name() + } else { + match ctx.current_span().into_inner() { + Some((_, metadata)) => metadata.name(), + None => "", + } + }; write!( writer, - "{tid:?} {:?}:{} ", + "{tid:?} {:?}:{} ({}) ", meta.module_path().unwrap_or(""), meta.line().unwrap_or(0), + span_name, )?; ctx.format_fields(writer.by_ref(), event)?;