Skip to content

Commit

Permalink
[ENH]: add push_logs() to log interface (#3576)
Browse files Browse the repository at this point in the history
## Description of changes

Does what it says on the tin.

## Test plan
*How are these changes tested?*

- [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs repository](https://github.com/chroma-core/docs)?*

n/a
  • Loading branch information
codetheweb authored Jan 29, 2025
1 parent 495b557 commit 6b8d00d
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 2 deletions.
23 changes: 22 additions & 1 deletion rust/log/src/grpc_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ use crate::tracing::client_interceptor;
use crate::types::{
CollectionInfo, GetCollectionsWithNewDataError, PullLogsError, UpdateCollectionLogOffsetError,
};
use crate::PushLogsError;
use async_trait::async_trait;
use chroma_config::Configurable;
use chroma_error::{ChromaError, ErrorCodes};
use chroma_types::chroma_proto::log_service_client::LogServiceClient;
use chroma_types::chroma_proto::{self};
use chroma_types::{CollectionUuid, LogRecord};
use chroma_types::{CollectionUuid, LogRecord, OperationRecord, RecordConversionError};
use std::fmt::Debug;
use std::time::Duration;
use thiserror::Error;
Expand Down Expand Up @@ -136,6 +137,26 @@ impl GrpcLog {
}
}

pub(super) async fn push_logs(
&mut self,
collection_id: CollectionUuid,
records: Vec<OperationRecord>,
) -> Result<(), PushLogsError> {
let request = chroma_proto::PushLogsRequest {
collection_id: collection_id.0.to_string(),

records:
records.into_iter().map(|r| r.try_into()).collect::<Result<
Vec<chroma_types::chroma_proto::OperationRecord>,
RecordConversionError,
>>()?,
};

self.client.push_logs(request).await?;

Ok(())
}

pub(super) async fn get_collections_with_new_data(
&mut self,
min_compaction_size: u64,
Expand Down
14 changes: 13 additions & 1 deletion rust/log/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use crate::in_memory_log::InMemoryLog;
use crate::types::{
CollectionInfo, GetCollectionsWithNewDataError, PullLogsError, UpdateCollectionLogOffsetError,
};
use chroma_types::{CollectionUuid, LogRecord};
use crate::PushLogsError;
use chroma_types::{CollectionUuid, LogRecord, OperationRecord};
use std::fmt::Debug;

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -44,6 +45,17 @@ impl Log {
}
}

pub async fn push_logs(
&mut self,
collection_id: CollectionUuid,
records: Vec<OperationRecord>,
) -> Result<(), PushLogsError> {
match self {
Log::Grpc(log) => log.push_logs(collection_id, records).await,
Log::InMemory(_) => unimplemented!(),
}
}

pub async fn get_collections_with_new_data(
&mut self,
min_compaction_size: u64,
Expand Down
3 changes: 3 additions & 0 deletions rust/log/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,15 @@ impl ChromaError for PullLogsError {
pub enum PushLogsError {
#[error("Failed to push logs")]
FailedToPushLogs(#[from] tonic::Status),
#[error("Failed to convert records to proto")]
ConversionError(#[from] RecordConversionError),
}

impl ChromaError for PushLogsError {
fn code(&self) -> ErrorCodes {
match self {
PushLogsError::FailedToPushLogs(_) => ErrorCodes::Internal,
PushLogsError::ConversionError(_) => ErrorCodes::Internal,
}
}
}
Expand Down
29 changes: 29 additions & 0 deletions rust/types/src/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,35 @@ impl_base_convert_error!(RecordConversionError, {
RecordConversionError::VectorConversionError(inner) => inner.code(),
});

impl TryFrom<OperationRecord> for chroma_proto::OperationRecord {
type Error = RecordConversionError;

fn try_from(operation_record: OperationRecord) -> Result<Self, Self::Error> {
let vector = match operation_record.embedding {
Some(embedding) => {
let len = embedding.len();
let encoding = operation_record.encoding.unwrap_or(ScalarEncoding::FLOAT32);
Some((embedding, encoding, len))
}
None => None,
};

let metadata = operation_record.metadata.map(|metadata| metadata.into());

let proto_vector = match vector {
Some(vector) => Some(vector.try_into()?),
None => None,
};

Ok(chroma_proto::OperationRecord {
id: operation_record.id,
vector: proto_vector,
metadata,
operation: operation_record.operation as i32,
})
}
}

impl TryFrom<chroma_proto::OperationRecord> for OperationRecord {
type Error = RecordConversionError;

Expand Down

0 comments on commit 6b8d00d

Please sign in to comment.