diff --git a/core/handshake/src/handshake.rs b/core/handshake/src/handshake.rs index 031a84599..745304b53 100644 --- a/core/handshake/src/handshake.rs +++ b/core/handshake/src/handshake.rs @@ -224,9 +224,17 @@ impl Context

{ }, ); - Proxy::new(connection_id, socket, rx, self.clone(), self.timeout).spawn(Some( - State::OnlyPrimaryConnection((sender, receiver).into()), - )); + Proxy::new( + connection_id, + service, + socket, + rx, + self.clone(), + self.timeout, + ) + .spawn(Some(State::OnlyPrimaryConnection( + (sender, receiver).into(), + ))); }, // Join request to an existing connection HandshakeRequestFrame::JoinRequest { access_token } => { diff --git a/core/handshake/src/proxy.rs b/core/handshake/src/proxy.rs index 71da6940d..5f4121a4e 100644 --- a/core/handshake/src/proxy.rs +++ b/core/handshake/src/proxy.rs @@ -6,6 +6,7 @@ use async_channel::Receiver; use bytes::BytesMut; use lightning_interfaces::schema::handshake::{ResponseFrame, TerminationReason}; use lightning_interfaces::{spawn, ExecutorProviderInterface}; +use lightning_metrics::increment_counter; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::UnixStream; @@ -17,6 +18,8 @@ pub struct Proxy { context: Context

, /// The id for this connection. connection_id: u64, + /// The id for the service this connection is connected to. + service_id: u32, /// The unix socket connection to the service made specifically for this ongoing connection. socket: UnixStream, /// The buffer using which we read bytes from the unix socket. @@ -63,6 +66,7 @@ impl Proxy

{ #[inline(always)] pub fn new( connection_id: u64, + service_id: u32, socket: UnixStream, connection_rx: Receiver<(IsPrimary, TransportPair)>, context: Context

, @@ -71,6 +75,7 @@ impl Proxy

{ Self { context, connection_id, + service_id, socket, buffer: Default::default(), connection_rx, @@ -445,6 +450,12 @@ impl Proxy

{ ) -> HandleRequestResult { match request { RequestFrame::ServicePayload { bytes } => { + let service_id = self.service_id.to_string(); + increment_counter!( + "handshake_service_payloads", + Some("Counter for the number of service payloads received"), + "service_id" => service_id.as_str() + ); if self.socket.write_u32(bytes.len() as u32).await.is_err() { return HandleRequestResult::TerminateConnection; }