Skip to content

Commit

Permalink
[ENH]: add Rust SQLite impl of log
Browse files Browse the repository at this point in the history
  • Loading branch information
codetheweb committed Jan 28, 2025
1 parent a9ccc20 commit 03ce145
Show file tree
Hide file tree
Showing 11 changed files with 933 additions and 31 deletions.
454 changes: 443 additions & 11 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ tracing-bunyan-formatter = "0.3"
tracing-opentelemetry = "0.28.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1.11.0", features = ["v4", "fast-rng", "macro-diagnostics", "serde"] }
sqlx = { version = "0.8.3", features = ["runtime-tokio", "sqlite"] }
bytemuck = "1.21.0"

chroma-benchmark = { path = "rust/benchmark" }
chroma-blockstore = { path = "rust/blockstore" }
Expand Down
5 changes: 4 additions & 1 deletion rust/log/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ tracing = { workspace = true }
# Used by tracing
tracing-opentelemetry = { workspace = true }
uuid = { workspace = true }
sqlx = { workspace = true }
tokio = { workspace = true }
serde_json = { workspace = true }
bytemuck = { workspace = true }

chroma-config = { workspace = true }
chroma-error = { workspace = true }
chroma-segment = { workspace = true }
chroma-types = { workspace = true }

25 changes: 21 additions & 4 deletions rust/log/src/grpc_log.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::config::LogConfig;
use crate::tracing::client_interceptor;
use crate::types::{
CollectionInfo, GetCollectionsWithNewDataError, PullLogsError, UpdateCollectionLogOffsetError,
CollectionInfo, GetCollectionsWithNewDataError, UpdateCollectionLogOffsetError,
};
use crate::PushLogsError;
use async_trait::async_trait;
Expand All @@ -18,6 +18,23 @@ use tonic::transport::Endpoint;
use tonic::{Request, Status};
use uuid::Uuid;

/// Errors produced when pulling log records through the gRPC log client.
///
/// Both variants carry `#[from]`, so the wrapped error types can be converted
/// into this enum (e.g. via `?`) — presumably through `thiserror`'s derive;
/// confirm against the file's imports.
#[derive(Error, Debug)]
pub enum GrpcPullLogsError {
/// The pull-logs request failed; wraps the `tonic::Status` describing the failure.
#[error("Failed to fetch")]
FailedToPullLogs(#[from] tonic::Status),
/// A record in the response could not be converted into a log record.
#[error("Failed to convert proto embedding record into EmbeddingRecord")]
ConversionError(#[from] RecordConversionError),
}

impl ChromaError for GrpcPullLogsError {
    /// Returns the error code reported for this error.
    ///
    /// Every variant is currently classified as `ErrorCodes::Internal`.
    fn code(&self) -> ErrorCodes {
        // Variants are listed explicitly (no `_` catch-all) so that adding a
        // new variant forces a deliberate choice of error code here.
        match self {
            Self::FailedToPullLogs(_) | Self::ConversionError(_) => ErrorCodes::Internal,
        }
    }
}

#[derive(Clone, Debug)]
pub struct GrpcLog {
#[allow(clippy::type_complexity)]
Expand Down Expand Up @@ -100,7 +117,7 @@ impl GrpcLog {
offset: i64,
batch_size: i32,
end_timestamp: Option<i64>,
) -> Result<Vec<LogRecord>, PullLogsError> {
) -> Result<Vec<LogRecord>, GrpcPullLogsError> {
let end_timestamp = match end_timestamp {
Some(end_timestamp) => end_timestamp,
None => i64::MAX,
Expand All @@ -124,15 +141,15 @@ impl GrpcLog {
result.push(log_record);
}
Err(err) => {
return Err(PullLogsError::ConversionError(err));
return Err(GrpcPullLogsError::ConversionError(err));
}
}
}
Ok(result)
}
Err(e) => {
tracing::error!("Failed to pull logs: {}", e);
Err(PullLogsError::FailedToPullLogs(e))
Err(GrpcPullLogsError::FailedToPullLogs(e))
}
}
}
Expand Down
1 change: 1 addition & 0 deletions rust/log/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod grpc_log;
pub mod in_memory_log;
#[allow(clippy::module_inception)]
mod log;
pub mod sqlite_log;
pub mod test;
pub mod tracing;
pub mod types;
Expand Down
41 changes: 28 additions & 13 deletions rust/log/src/log.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::grpc_log::GrpcLog;
use crate::in_memory_log::InMemoryLog;
use crate::sqlite_log::SqliteLog;
use crate::types::{
CollectionInfo, GetCollectionsWithNewDataError, PullLogsError, UpdateCollectionLogOffsetError,
CollectionInfo, GetCollectionsWithNewDataError, UpdateCollectionLogOffsetError,
};
use crate::PushLogsError;
use chroma_error::ChromaError;
use chroma_types::{CollectionUuid, LogRecord, OperationRecord};
use std::fmt::Debug;

Expand All @@ -20,6 +21,7 @@ pub struct CollectionRecord {

#[derive(Clone, Debug)]
pub enum Log {
Sqlite(SqliteLog),
Grpc(GrpcLog),
#[allow(dead_code)]
InMemory(InMemoryLog),
Expand All @@ -32,26 +34,37 @@ impl Log {
offset: i64,
batch_size: i32,
end_timestamp: Option<i64>,
) -> Result<Vec<LogRecord>, PullLogsError> {
) -> Result<Vec<LogRecord>, Box<dyn ChromaError>> {
match self {
Log::Grpc(log) => {
log.read(collection_id, offset, batch_size, end_timestamp)
.await
}
Log::InMemory(log) => {
log.read(collection_id, offset, batch_size, end_timestamp)
.await
}
Log::Sqlite(log) => log
.read(collection_id, offset, batch_size, end_timestamp)
.await
.map_err(|e| Box::new(e) as Box<dyn ChromaError>),
Log::Grpc(log) => log
.read(collection_id, offset, batch_size, end_timestamp)
.await
.map_err(|e| Box::new(e) as Box<dyn ChromaError>),
Log::InMemory(log) => log
.read(collection_id, offset, batch_size, end_timestamp)
.await
.map_err(|e| Box::new(e) as Box<dyn ChromaError>),
}
}

pub async fn push_logs(
&mut self,
collection_id: CollectionUuid,
records: Vec<OperationRecord>,
) -> Result<(), PushLogsError> {
) -> Result<(), Box<dyn ChromaError>> {
match self {
Log::Grpc(log) => log.push_logs(collection_id, records).await,
Log::Sqlite(log) => log
.push_logs(collection_id, records)
.await
.map_err(|e| Box::new(e) as Box<dyn ChromaError>),
Log::Grpc(log) => log
.push_logs(collection_id, records)
.await
.map_err(|e| Box::new(e) as Box<dyn ChromaError>),
Log::InMemory(_) => unimplemented!(),
}
}
Expand All @@ -61,6 +74,7 @@ impl Log {
min_compaction_size: u64,
) -> Result<Vec<CollectionInfo>, GetCollectionsWithNewDataError> {
match self {
Log::Sqlite(_) => unimplemented!(),
Log::Grpc(log) => log.get_collections_with_new_data(min_compaction_size).await,
Log::InMemory(log) => log.get_collections_with_new_data(min_compaction_size).await,
}
Expand All @@ -72,6 +86,7 @@ impl Log {
new_offset: i64,
) -> Result<(), UpdateCollectionLogOffsetError> {
match self {
Log::Sqlite(_) => unimplemented!(),
Log::Grpc(log) => {
log.update_collection_log_offset(collection_id, new_offset)
.await
Expand Down
Loading

0 comments on commit 03ce145

Please sign in to comment.