Skip to content

Commit

Permalink
feat(handshake): emit service requests/payloads metric
Browse files Browse the repository at this point in the history
  • Loading branch information
snormore committed May 30, 2024
1 parent 5bee75f commit adb3554
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
14 changes: 11 additions & 3 deletions core/handshake/src/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,17 @@ impl<P: ExecutorProviderInterface> Context<P> {
},
);

Proxy::new(connection_id, socket, rx, self.clone(), self.timeout).spawn(Some(
State::OnlyPrimaryConnection((sender, receiver).into()),
));
Proxy::new(
connection_id,
service,
socket,
rx,
self.clone(),
self.timeout,
)
.spawn(Some(State::OnlyPrimaryConnection(
(sender, receiver).into(),
)));
},
// Join request to an existing connection
HandshakeRequestFrame::JoinRequest { access_token } => {
Expand Down
11 changes: 11 additions & 0 deletions core/handshake/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use async_channel::Receiver;
use bytes::BytesMut;
use lightning_interfaces::schema::handshake::{ResponseFrame, TerminationReason};
use lightning_interfaces::{spawn, ExecutorProviderInterface};
use lightning_metrics::increment_counter;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::UnixStream;

Expand All @@ -17,6 +18,8 @@ pub struct Proxy<P: ExecutorProviderInterface> {
context: Context<P>,
/// The id for this connection.
connection_id: u64,
/// The id for the service this connection is connected to.
service_id: u32,
/// The unix socket connection to the service made specifically for this ongoing connection.
socket: UnixStream,
/// The buffer using which we read bytes from the unix socket.
Expand Down Expand Up @@ -63,6 +66,7 @@ impl<P: ExecutorProviderInterface> Proxy<P> {
#[inline(always)]
pub fn new(
connection_id: u64,
service_id: u32,
socket: UnixStream,
connection_rx: Receiver<(IsPrimary, TransportPair)>,
context: Context<P>,
Expand All @@ -71,6 +75,7 @@ impl<P: ExecutorProviderInterface> Proxy<P> {
Self {
context,
connection_id,
service_id,
socket,
buffer: Default::default(),
connection_rx,
Expand Down Expand Up @@ -445,6 +450,12 @@ impl<P: ExecutorProviderInterface> Proxy<P> {
) -> HandleRequestResult {
match request {
RequestFrame::ServicePayload { bytes } => {
let service_id = self.service_id.to_string();
increment_counter!(
"handshake_service_payloads",
Some("Counter for the number of service payloads received"),
"service_id" => service_id.as_str()
);
if self.socket.write_u32(bytes.len() as u32).await.is_err() {
return HandleRequestResult::TerminateConnection;
}
Expand Down

0 comments on commit adb3554

Please sign in to comment.