Skip to content

Commit

Permalink
Initial work on OpenTelemetry (#2892)
Browse files Browse the repository at this point in the history
  • Loading branch information
eifrah-aws authored Jan 7, 2025
1 parent b6565eb commit 578e6ec
Show file tree
Hide file tree
Showing 6 changed files with 592 additions and 2 deletions.
21 changes: 21 additions & 0 deletions glide-core/redis-rs/redis/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{fmt, io};
use crate::connection::ConnectionLike;
use crate::pipeline::Pipeline;
use crate::types::{from_owned_redis_value, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs};
use telemetrylib::GlideSpan;

/// An argument to a redis command
#[derive(Clone)]
Expand All @@ -30,6 +31,8 @@ pub struct Cmd {
cursor: Option<u64>,
// If it's true command's response won't be read from socket. Useful for Pub/Sub.
no_response: bool,
/// The span associated with this command
span: Option<GlideSpan>,
}

/// Represents a redis iterator.
Expand Down Expand Up @@ -321,6 +324,7 @@ impl Cmd {
args: vec![],
cursor: None,
no_response: false,
span: None,
}
}

Expand All @@ -331,6 +335,7 @@ impl Cmd {
args: Vec::with_capacity(arg_count),
cursor: None,
no_response: false,
span: None,
}
}

Expand Down Expand Up @@ -360,6 +365,16 @@ impl Cmd {
self
}

/// Associate a trackable span to the command. This allow tracking the lifetime
/// of the command.
///
/// A span is used by an OpenTelemetry backend to track the lifetime of the command
#[inline]
pub fn with_span(&mut self, name: &str) -> &mut Cmd {
self.span = Some(telemetrylib::GlideOpenTelemetry::new_span(name));
self
}

/// Works similar to `arg` but adds a cursor argument. This is always
/// an integer and also flips the command implementation to support a
/// different mode for the iterators where the iterator will ask for
Expand Down Expand Up @@ -582,6 +597,12 @@ impl Cmd {
pub fn is_no_response(&self) -> bool {
self.no_response
}

/// Return this command span
#[inline]
pub fn span(&self) -> Option<GlideSpan> {
self.span.clone()
}
}

impl fmt::Debug for Cmd {
Expand Down
9 changes: 7 additions & 2 deletions glide-core/src/socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,15 @@ async fn send_command(
mut client: Client,
routing: Option<RoutingInfo>,
) -> ClientUsageResult<Value> {
client
let child_span = cmd.span().map(|span| span.add_span("send_command"));
let res = client
.send_command(&cmd, routing)
.await
.map_err(|err| err.into())
.map_err(|err| err.into());
if let Some(child_span) = child_span {
child_span.end();
}
res
}

// Parse the cluster scan command parameters from protobuf and send the command to redis-rs.
Expand Down
6 changes: 6 additions & 0 deletions glide-core/telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,9 @@ authors = ["Valkey GLIDE Maintainers"]
lazy_static = "1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
chrono = "0"
futures-util = "0"
tokio = { version = "1", features = ["macros", "time"] }

opentelemetry = "0"
opentelemetry_sdk = { version = "0", features = ["rt-tokio"] }
5 changes: 5 additions & 0 deletions glide-core/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use lazy_static::lazy_static;
use serde::Serialize;
use std::sync::RwLock as StdRwLock;
mod open_telemetry;
mod open_telemetry_exporter_file;

pub use open_telemetry::{GlideOpenTelemetry, GlideSpan};
pub use open_telemetry_exporter_file::SpanExporterFile;

#[derive(Default, Serialize)]
#[allow(dead_code)]
Expand Down
Loading

0 comments on commit 578e6ec

Please sign in to comment.