Skip to content

Commit

Permalink
[ENH]: add Rust SQLite impl of log
Browse files Browse the repository at this point in the history
  • Loading branch information
codetheweb committed Jan 29, 2025
1 parent 206fb61 commit 0b5f332
Show file tree
Hide file tree
Showing 13 changed files with 1,150 additions and 125 deletions.
454 changes: 443 additions & 11 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ tracing-bunyan-formatter = "0.3"
tracing-opentelemetry = "0.28.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1.11.0", features = ["v4", "fast-rng", "macro-diagnostics", "serde"] }
sqlx = { version = "0.8.3", features = ["runtime-tokio", "sqlite"] }
bytemuck = "1.21.0"

chroma-benchmark = { path = "rust/benchmark" }
chroma-blockstore = { path = "rust/blockstore" }
Expand Down
6 changes: 5 additions & 1 deletion rust/log/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@ tracing = { workspace = true }
# Used by tracing
tracing-opentelemetry = { workspace = true }
uuid = { workspace = true }
sqlx = { workspace = true }
tokio = { workspace = true }
serde_json = { workspace = true }
bytemuck = { workspace = true }
futures = { workspace = true }

chroma-config = { workspace = true }
chroma-error = { workspace = true }
chroma-segment = { workspace = true }
chroma-types = { workspace = true }

87 changes: 75 additions & 12 deletions rust/log/src/grpc_log.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use super::config::LogConfig;
use crate::tracing::client_interceptor;
use crate::types::{
CollectionInfo, GetCollectionsWithNewDataError, PullLogsError, UpdateCollectionLogOffsetError,
};
use crate::PushLogsError;
use crate::types::CollectionInfo;
use async_trait::async_trait;
use chroma_config::Configurable;
use chroma_error::{ChromaError, ErrorCodes};
Expand All @@ -18,6 +15,72 @@ use tonic::transport::Endpoint;
use tonic::{Request, Status};
use uuid::Uuid;

#[derive(Error, Debug)]
pub enum GrpcPullLogsError {
#[error("Failed to fetch")]
FailedToPullLogs(#[from] tonic::Status),
#[error("Failed to convert proto embedding record into EmbeddingRecord")]
ConversionError(#[from] RecordConversionError),
}

impl ChromaError for GrpcPullLogsError {
fn code(&self) -> ErrorCodes {
match self {
GrpcPullLogsError::FailedToPullLogs(_) => ErrorCodes::Internal,
GrpcPullLogsError::ConversionError(_) => ErrorCodes::Internal,
}
}
}

#[derive(Error, Debug)]
pub enum GrpcPushLogsError {
#[error("Failed to push logs")]
FailedToPushLogs(#[from] tonic::Status),
#[error("Failed to convert records to proto")]
ConversionError(#[from] RecordConversionError),
}

impl ChromaError for GrpcPushLogsError {
fn code(&self) -> ErrorCodes {
match self {
GrpcPushLogsError::FailedToPushLogs(_) => ErrorCodes::Internal,
GrpcPushLogsError::ConversionError(_) => ErrorCodes::Internal,
}
}
}

#[derive(Error, Debug)]
pub enum GrpcGetCollectionsWithNewDataError {
#[error("Failed to fetch")]
FailedGetCollectionsWithNewData(#[from] tonic::Status),
}

impl ChromaError for GrpcGetCollectionsWithNewDataError {
fn code(&self) -> ErrorCodes {
match self {
GrpcGetCollectionsWithNewDataError::FailedGetCollectionsWithNewData(_) => {
ErrorCodes::Internal
}
}
}
}

#[derive(Error, Debug)]
pub enum GrpcUpdateCollectionLogOffsetError {
#[error("Failed to update collection log offset")]
FailedToUpdateCollectionLogOffset(#[from] tonic::Status),
}

impl ChromaError for GrpcUpdateCollectionLogOffsetError {
fn code(&self) -> ErrorCodes {
match self {
GrpcUpdateCollectionLogOffsetError::FailedToUpdateCollectionLogOffset(_) => {
ErrorCodes::Internal
}
}
}
}

#[derive(Clone, Debug)]
pub struct GrpcLog {
#[allow(clippy::type_complexity)]
Expand Down Expand Up @@ -100,7 +163,7 @@ impl GrpcLog {
offset: i64,
batch_size: i32,
end_timestamp: Option<i64>,
) -> Result<Vec<LogRecord>, PullLogsError> {
) -> Result<Vec<LogRecord>, GrpcPullLogsError> {
let end_timestamp = match end_timestamp {
Some(end_timestamp) => end_timestamp,
None => i64::MAX,
Expand All @@ -124,15 +187,15 @@ impl GrpcLog {
result.push(log_record);
}
Err(err) => {
return Err(PullLogsError::ConversionError(err));
return Err(GrpcPullLogsError::ConversionError(err));
}
}
}
Ok(result)
}
Err(e) => {
tracing::error!("Failed to pull logs: {}", e);
Err(PullLogsError::FailedToPullLogs(e))
Err(GrpcPullLogsError::FailedToPullLogs(e))
}
}
}
Expand All @@ -141,7 +204,7 @@ impl GrpcLog {
&mut self,
collection_id: CollectionUuid,
records: Vec<OperationRecord>,
) -> Result<(), PushLogsError> {
) -> Result<(), GrpcPushLogsError> {
let request = chroma_proto::PushLogsRequest {
collection_id: collection_id.0.to_string(),

Expand All @@ -160,7 +223,7 @@ impl GrpcLog {
pub(super) async fn get_collections_with_new_data(
&mut self,
min_compaction_size: u64,
) -> Result<Vec<CollectionInfo>, GetCollectionsWithNewDataError> {
) -> Result<Vec<CollectionInfo>, GrpcGetCollectionsWithNewDataError> {
let response = self
.client
.get_all_collection_info_to_compact(
Expand Down Expand Up @@ -196,7 +259,7 @@ impl GrpcLog {
}
Err(e) => {
tracing::error!("Failed to get collections: {}", e);
Err(GetCollectionsWithNewDataError::FailedGetCollectionsWithNewData(e))
Err(GrpcGetCollectionsWithNewDataError::FailedGetCollectionsWithNewData(e))
}
}
}
Expand All @@ -205,7 +268,7 @@ impl GrpcLog {
&mut self,
collection_id: CollectionUuid,
new_offset: i64,
) -> Result<(), UpdateCollectionLogOffsetError> {
) -> Result<(), GrpcUpdateCollectionLogOffsetError> {
let request = self.client.update_collection_log_offset(
chroma_proto::UpdateCollectionLogOffsetRequest {
// NOTE(rescrv): Use the untyped string representation of the collection ID.
Expand All @@ -216,7 +279,7 @@ impl GrpcLog {
let response = request.await;
match response {
Ok(_) => Ok(()),
Err(e) => Err(UpdateCollectionLogOffsetError::FailedToUpdateCollectionLogOffset(e)),
Err(e) => Err(GrpcUpdateCollectionLogOffsetError::FailedToUpdateCollectionLogOffset(e)),
}
}
}
17 changes: 7 additions & 10 deletions rust/log/src/in_memory_log.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use crate::types::{
CollectionInfo, GetCollectionsWithNewDataError, PullLogsError, UpdateCollectionLogOffsetError,
};
use crate::types::CollectionInfo;
use chroma_types::{CollectionUuid, LogRecord};
use std::collections::HashMap;
use std::fmt::Debug;
Expand Down Expand Up @@ -63,29 +61,29 @@ impl InMemoryLog {
offset: i64,
batch_size: i32,
end_timestamp: Option<i64>,
) -> Result<Vec<LogRecord>, PullLogsError> {
) -> Vec<LogRecord> {
let end_timestamp = match end_timestamp {
Some(end_timestamp) => end_timestamp,
None => i64::MAX,
};

let logs = match self.collection_to_log.get(&collection_id) {
Some(logs) => logs,
None => return Ok(Vec::new()),
None => return Vec::new(),
};
let mut result = Vec::new();
for i in offset..(offset + batch_size as i64) {
if i < logs.len() as i64 && logs[i as usize].log_ts <= end_timestamp {
result.push(logs[i as usize].record.clone());
}
}
Ok(result)
result
}

pub(super) async fn get_collections_with_new_data(
&mut self,
min_compaction_size: u64,
) -> Result<Vec<CollectionInfo>, GetCollectionsWithNewDataError> {
) -> Vec<CollectionInfo> {
let mut collections = Vec::new();
for (collection_id, log_records) in self.collection_to_log.iter() {
if log_records.is_empty() {
Expand Down Expand Up @@ -115,16 +113,15 @@ impl InMemoryLog {
first_log_ts: logs[0].log_ts,
});
}
Ok(collections)
collections
}

pub(super) async fn update_collection_log_offset(
&mut self,
collection_id: CollectionUuid,
new_offset: i64,
) -> Result<(), UpdateCollectionLogOffsetError> {
) {
self.offsets.insert(collection_id, new_offset);
Ok(())
}
}

Expand Down
1 change: 1 addition & 0 deletions rust/log/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod grpc_log;
pub mod in_memory_log;
#[allow(clippy::module_inception)]
mod log;
pub mod sqlite_log;
pub mod test;
pub mod tracing;
pub mod types;
Expand Down
70 changes: 46 additions & 24 deletions rust/log/src/log.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::grpc_log::GrpcLog;
use crate::in_memory_log::InMemoryLog;
use crate::types::{
CollectionInfo, GetCollectionsWithNewDataError, PullLogsError, UpdateCollectionLogOffsetError,
};
use crate::PushLogsError;
use crate::sqlite_log::SqliteLog;
use crate::types::CollectionInfo;
use chroma_error::ChromaError;
use chroma_types::{CollectionUuid, LogRecord, OperationRecord};
use std::fmt::Debug;

Expand All @@ -20,6 +19,7 @@ pub struct CollectionRecord {

#[derive(Clone, Debug)]
pub enum Log {
Sqlite(SqliteLog),
Grpc(GrpcLog),
#[allow(dead_code)]
InMemory(InMemoryLog),
Expand All @@ -32,53 +32,75 @@ impl Log {
offset: i64,
batch_size: i32,
end_timestamp: Option<i64>,
) -> Result<Vec<LogRecord>, PullLogsError> {
) -> Result<Vec<LogRecord>, Box<dyn ChromaError>> {
match self {
Log::Grpc(log) => {
log.read(collection_id, offset, batch_size, end_timestamp)
.await
}
Log::InMemory(log) => {
log.read(collection_id, offset, batch_size, end_timestamp)
.await
}
Log::Sqlite(log) => log
.read(collection_id, offset, batch_size, end_timestamp)
.await
.map_err(|e| Box::new(e) as Box<dyn ChromaError>),
Log::Grpc(log) => log
.read(collection_id, offset, batch_size, end_timestamp)
.await
.map_err(|e| Box::new(e) as Box<dyn ChromaError>),
Log::InMemory(log) => Ok(log
.read(collection_id, offset, batch_size, end_timestamp)
.await),
}
}

pub async fn push_logs(
&mut self,
collection_id: CollectionUuid,
records: Vec<OperationRecord>,
) -> Result<(), PushLogsError> {
) -> Result<(), Box<dyn ChromaError>> {
match self {
Log::Grpc(log) => log.push_logs(collection_id, records).await,
Log::Sqlite(log) => log
.push_logs(collection_id, records)
.await
.map_err(|e| Box::new(e) as Box<dyn ChromaError>),
Log::Grpc(log) => log
.push_logs(collection_id, records)
.await
.map_err(|e| Box::new(e) as Box<dyn ChromaError>),
Log::InMemory(_) => unimplemented!(),
}
}

pub async fn get_collections_with_new_data(
&mut self,
min_compaction_size: u64,
) -> Result<Vec<CollectionInfo>, GetCollectionsWithNewDataError> {
) -> Result<Vec<CollectionInfo>, Box<dyn ChromaError>> {
match self {
Log::Grpc(log) => log.get_collections_with_new_data(min_compaction_size).await,
Log::InMemory(log) => log.get_collections_with_new_data(min_compaction_size).await,
Log::Sqlite(log) => log
.get_collections_with_new_data(min_compaction_size)
.await
.map_err(|e| Box::new(e) as Box<dyn ChromaError>),
Log::Grpc(log) => log
.get_collections_with_new_data(min_compaction_size)
.await
.map_err(|e| Box::new(e) as Box<dyn ChromaError>),
Log::InMemory(log) => Ok(log.get_collections_with_new_data(min_compaction_size).await),
}
}

pub async fn update_collection_log_offset(
&mut self,
collection_id: CollectionUuid,
new_offset: i64,
) -> Result<(), UpdateCollectionLogOffsetError> {
) -> Result<(), Box<dyn ChromaError>> {
match self {
Log::Grpc(log) => {
log.update_collection_log_offset(collection_id, new_offset)
.await
}
Log::Sqlite(log) => log
.update_collection_log_offset(collection_id, new_offset)
.await
.map_err(|e| Box::new(e) as Box<dyn ChromaError>),
Log::Grpc(log) => log
.update_collection_log_offset(collection_id, new_offset)
.await
.map_err(|e| Box::new(e) as Box<dyn ChromaError>),
Log::InMemory(log) => {
log.update_collection_log_offset(collection_id, new_offset)
.await
.await;
Ok(())
}
}
}
Expand Down
Loading

0 comments on commit 0b5f332

Please sign in to comment.