Skip to content

Commit

Permalink
[ENH]: fix tracing path for query service from Rust frontend (#3631)
Browse files Browse the repository at this point in the history
  • Loading branch information
codetheweb authored Jan 31, 2025
1 parent 31a1cd6 commit 87dc549
Show file tree
Hide file tree
Showing 14 changed files with 215 additions and 229 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 29 additions & 7 deletions rust/frontend/src/executor/client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,26 @@ use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tonic::transport::{Channel, Endpoint};
use tonic::{
service::interceptor::InterceptedService,
transport::{Channel, Endpoint},
};
use tower::discover::Change;

/// Shared, lock-guarded map from a node name to the gRPC client for that node.
///
/// Every stored client is a `QueryExecutorClient` whose underlying
/// `tonic::transport::Channel` is wrapped in an `InterceptedService` using the
/// `chroma_tracing::GrpcClientInterceptor` interceptor type.
pub(super) type NodeNameToClient = Arc<
    RwLock<
        HashMap<
            String,
            QueryExecutorClient<
                InterceptedService<tonic::transport::Channel, chroma_tracing::GrpcClientInterceptor>,
            >,
        >,
    >,
>;

/// A component that manages the gRPC clients for the query executors
/// # Fields
/// - `node_name_to_client` - A map from the node name to the gRPC client
Expand All @@ -22,8 +39,7 @@ use tower::discover::Change;
#[derive(Debug)]
pub(super) struct ClientManager {
// The name of the node to the grpc client
node_name_to_client:
Arc<RwLock<HashMap<String, QueryExecutorClient<tonic::transport::Channel>>>>,
node_name_to_client: NodeNameToClient,
// The name of the node to the sender to the channel to add / remove the ip
node_name_to_change_sender:
HashMap<String, tokio::sync::mpsc::Sender<Change<String, Endpoint>>>,
Expand All @@ -35,9 +51,7 @@ pub(super) struct ClientManager {

impl ClientManager {
pub(super) fn new(
node_name_to_client: Arc<
RwLock<HashMap<String, QueryExecutorClient<tonic::transport::Channel>>>,
>,
node_name_to_client: NodeNameToClient,
connections_per_node: usize,
connect_timeout_ms: u64,
request_timeout_ms: u64,
Expand Down Expand Up @@ -93,7 +107,15 @@ impl ClientManager {
None => {
let (chan, channel_change_sender) =
Channel::balance_channel::<String>(self.connections_per_node);
let client = QueryExecutorClient::new(chan);
let client: QueryExecutorClient<
InterceptedService<
tonic::transport::Channel,
chroma_tracing::GrpcClientInterceptor,
>,
> = QueryExecutorClient::with_interceptor(
chan,
chroma_tracing::grpc_client_interceptor,
);

let mut node_name_to_client_guard = self.node_name_to_client.write();
node_name_to_client_guard.insert(node.to_string(), client);
Expand Down
23 changes: 11 additions & 12 deletions rust/frontend/src/executor/distributed.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::client_manager::NodeNameToClient;
use super::{client_manager::ClientManager, config};
use async_trait::async_trait;
use backon::ExponentialBuilder;
Expand All @@ -18,11 +19,15 @@ use chroma_types::{
plan::{Count, Get, Knn},
CollectionUuid, ExecutorError,
};
use parking_lot::RwLock;
use rand::seq::SliceRandom;
use std::{cmp::min, collections::HashMap, sync::Arc};
use std::cmp::min;
use tonic::service::interceptor::InterceptedService;
use tonic::Request;

/// Query executor gRPC client whose `tonic::transport::Channel` is wrapped in
/// an `InterceptedService` using the `chroma_tracing::GrpcClientInterceptor`
/// interceptor type. This is the client type handed out by `clients` and
/// `choose_client` in this module.
type Client = QueryExecutorClient<
InterceptedService<tonic::transport::Channel, chroma_tracing::GrpcClientInterceptor>,
>;

/// A distributed executor that routes requests to the appropriate node based on the assignment policy
/// # Fields
/// - `node_name_to_client` - A map from the node name to the gRPC client
Expand All @@ -35,8 +40,7 @@ use tonic::Request;
/// outside.
#[derive(Clone, Debug)]
pub struct DistributedExecutor {
node_name_to_client:
Arc<RwLock<HashMap<String, QueryExecutorClient<tonic::transport::Channel>>>>,
node_name_to_client: NodeNameToClient,
assignment_policy: Box<dyn AssignmentPolicy>,
replication_factor: usize,
backoff: ExponentialBuilder,
Expand All @@ -48,7 +52,7 @@ impl Configurable<(config::DistributedExecutorConfig, System)> for DistributedEx
(config, system): &(config::DistributedExecutorConfig, System),
) -> Result<Self, Box<dyn ChromaError>> {
let assignment_policy = assignment::from_config(&config.assignment).await?;
let node_name_to_client = Arc::new(RwLock::new(HashMap::new()));
let node_name_to_client = NodeNameToClient::default();
let client_manager = ClientManager::new(
node_name_to_client.clone(),
config.connections_per_node,
Expand Down Expand Up @@ -128,10 +132,7 @@ impl DistributedExecutor {
/// # Errors
/// - If no client is found for the given collection id
/// - If the assignment policy fails to assign the collection id
fn clients(
&mut self,
collection_id: CollectionUuid,
) -> Result<Vec<QueryExecutorClient<tonic::transport::Channel>>, ExecutorError> {
fn clients(&mut self, collection_id: CollectionUuid) -> Result<Vec<Client>, ExecutorError> {
let node_name_to_client_guard = self.node_name_to_client.read();
let members: Vec<String> = node_name_to_client_guard.keys().cloned().collect();
let target_replication_factor = min(self.replication_factor, members.len());
Expand Down Expand Up @@ -163,9 +164,7 @@ fn no_clients_found_status() -> tonic::Status {
tonic::Status::internal("No clients found")
}

fn choose_client(
clients: &[QueryExecutorClient<tonic::transport::Channel>],
) -> Result<QueryExecutorClient<tonic::transport::Channel>, tonic::Status> {
fn choose_client(clients: &[Client]) -> Result<Client, tonic::Status> {
Ok(clients
.choose(&mut rand::thread_rng())
.ok_or(no_clients_found_status())?
Expand Down
2 changes: 1 addition & 1 deletion rust/log/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ chroma-config = { workspace = true }
chroma-error = { workspace = true }
chroma-segment = { workspace = true }
chroma-types = { workspace = true }

chroma-tracing = { workspace = true, features = ["grpc"] }
6 changes: 4 additions & 2 deletions rust/log/src/grpc_log.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use super::config::LogConfig;
use crate::tracing::client_interceptor;
use crate::types::{
CollectionInfo, GetCollectionsWithNewDataError, PullLogsError, UpdateCollectionLogOffsetError,
};
Expand Down Expand Up @@ -81,7 +80,10 @@ impl Configurable<LogConfig> for GrpcLog {
tonic::transport::Channel,
fn(Request<()>) -> Result<Request<()>, Status>,
>,
> = LogServiceClient::with_interceptor(client, client_interceptor);
> = LogServiceClient::with_interceptor(
client,
chroma_tracing::grpc_client_interceptor,
);
return Ok(GrpcLog::new(channel));
}
Err(e) => {
Expand Down
1 change: 0 additions & 1 deletion rust/log/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ pub mod in_memory_log;
#[allow(clippy::module_inception)]
mod log;
pub mod test;
pub mod tracing;
pub mod types;

use chroma_config::Configurable;
Expand Down
47 changes: 0 additions & 47 deletions rust/log/src/tracing.rs

This file was deleted.

1 change: 1 addition & 0 deletions rust/sysdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ parking_lot = { workspace = true }
chroma-config = { workspace = true }
chroma-error = { workspace = true }
chroma-types = { workspace = true }
chroma-tracing = { workspace = true, features = ["grpc"] }
1 change: 0 additions & 1 deletion rust/sysdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ pub mod config;
#[allow(clippy::module_inception)]
pub mod sysdb;
pub mod test_sysdb;
mod util;
pub use config::*;
pub use sysdb::*;
pub use test_sysdb::*;
3 changes: 1 addition & 2 deletions rust/sysdb/src/sysdb.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use super::config::SysDbConfig;
use super::test_sysdb::TestSysDb;
use crate::util::client_interceptor;
use async_trait::async_trait;
use chroma_config::Configurable;
use chroma_error::{ChromaError, ErrorCodes};
Expand Down Expand Up @@ -335,7 +334,7 @@ impl Configurable<SysDbConfig> for GrpcSysDb {
Channel,
fn(Request<()>) -> Result<Request<()>, Status>,
>,
> = SysDbClient::with_interceptor(chans, client_interceptor);
> = SysDbClient::with_interceptor(chans, chroma_tracing::grpc_client_interceptor);
Ok(GrpcSysDb { client })
}
}
Expand Down
3 changes: 3 additions & 0 deletions rust/tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true }
tracing.workspace = true
tonic.workspace = true

[features]
grpc = []
7 changes: 4 additions & 3 deletions rust/sysdb/src/util.rs → rust/tracing/src/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::str::FromStr;

use opentelemetry::trace::TraceContextExt;
use std::str::FromStr;
use tonic::{metadata::MetadataValue, Request, Status};
use tracing::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;

const TRACE_ID_HEADER_KEY: &str = "chroma-traceid";
const SPAN_ID_HEADER_KEY: &str = "chroma-spanid";

pub(crate) fn client_interceptor(request: Request<()>) -> Result<Request<()>, Status> {
/// Function-pointer type for a gRPC client interceptor: it receives a
/// `Request<()>` and returns either a `Request<()>` or a `Status`.
/// `grpc_client_interceptor` in this module has exactly this signature.
pub type GrpcClientInterceptor = fn(Request<()>) -> Result<Request<()>, Status>;

pub fn grpc_client_interceptor(request: Request<()>) -> Result<Request<()>, Status> {
// If span is disabled then nothing to append in the header.
if Span::current().is_disabled() {
return Ok(request);
Expand Down
Loading

0 comments on commit 87dc549

Please sign in to comment.