Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial work on OpenTelemetry #2892

Merged
merged 8 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions glide-core/redis-rs/redis/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{fmt, io};
use crate::connection::ConnectionLike;
use crate::pipeline::Pipeline;
use crate::types::{from_owned_redis_value, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs};
use telemetrylib::GlideSpan;

/// An argument to a redis command
#[derive(Clone)]
Expand All @@ -30,6 +31,8 @@ pub struct Cmd {
cursor: Option<u64>,
// If it's true command's response won't be read from socket. Useful for Pub/Sub.
no_response: bool,
/// The span associated with this command
span: Option<GlideSpan>,
eifrah-aws marked this conversation as resolved.
Show resolved Hide resolved
}

/// Represents a redis iterator.
Expand Down Expand Up @@ -321,6 +324,7 @@ impl Cmd {
args: vec![],
cursor: None,
no_response: false,
span: None,
}
}

Expand All @@ -331,6 +335,7 @@ impl Cmd {
args: Vec::with_capacity(arg_count),
cursor: None,
no_response: false,
span: None,
}
}

Expand Down Expand Up @@ -360,6 +365,16 @@ impl Cmd {
self
}

/// Associate a trackable span to the command. This allow tracking the lifetime
/// of the command.
///
/// A span is used by an OpenTelemetry backend to track the lifetime of the command
#[inline]
pub fn with_span(&mut self, name: &str) -> &mut Cmd {
self.span = Some(telemetrylib::GlideOpenTelemetry::new_span(name));
self
}

/// Works similar to `arg` but adds a cursor argument. This is always
/// an integer and also flips the command implementation to support a
/// different mode for the iterators where the iterator will ask for
Expand Down Expand Up @@ -582,6 +597,12 @@ impl Cmd {
pub fn is_no_response(&self) -> bool {
self.no_response
}

/// Return this command span
#[inline]
pub fn span(&self) -> Option<GlideSpan> {
self.span.clone()
}
}

impl fmt::Debug for Cmd {
Expand Down
11 changes: 9 additions & 2 deletions glide-core/src/socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,17 @@ async fn send_command(
mut client: Client,
routing: Option<RoutingInfo>,
) -> ClientUsageResult<Value> {
client
if let Some(span) = cmd.span() {
span.add_event("RequestSent");
eifrah-aws marked this conversation as resolved.
Show resolved Hide resolved
}
let res = client
.send_command(&cmd, routing)
.await
.map_err(|err| err.into())
.map_err(|err| err.into());
if let Some(span) = cmd.span() {
span.add_event("ResponseArrived");
}
res
}

// Parse the cluster scan command parameters from protobuf and send the command to redis-rs.
Expand Down
6 changes: 6 additions & 0 deletions glide-core/telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,9 @@ authors = ["Valkey GLIDE Maintainers"]
lazy_static = "1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
chrono = "0"
futures-util = "0"
tokio = { version = "1", features = ["macros", "time"] }

opentelemetry = "0"
opentelemetry_sdk = { version = "0", features = ["rt-tokio"] }
5 changes: 5 additions & 0 deletions glide-core/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use lazy_static::lazy_static;
use serde::Serialize;
use std::sync::RwLock as StdRwLock;
mod open_telemetry;
mod open_telemetry_exporter_file;

pub use open_telemetry::{GlideOpenTelemetry, GlideSpan};
pub use open_telemetry_exporter_file::SpanExporterFile;

#[derive(Default, Serialize)]
#[allow(dead_code)]
Expand Down
Loading
Loading