-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[CLN]: split up log impls into separate files
- Loading branch information
1 parent
5316a64
commit bc54aac
Showing
13 changed files
with
444 additions
and
412 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,201 @@ | ||
use super::config::LogConfig; | ||
use crate::tracing::client_interceptor; | ||
use crate::types::{ | ||
CollectionInfo, GetCollectionsWithNewDataError, PullLogsError, UpdateCollectionLogOffsetError, | ||
}; | ||
use async_trait::async_trait; | ||
use chroma_config::Configurable; | ||
use chroma_error::{ChromaError, ErrorCodes}; | ||
use chroma_types::chroma_proto::log_service_client::LogServiceClient; | ||
use chroma_types::chroma_proto::{self}; | ||
use chroma_types::{CollectionUuid, LogRecord}; | ||
use std::fmt::Debug; | ||
use std::time::Duration; | ||
use thiserror::Error; | ||
use tonic::service::interceptor; | ||
use tonic::transport::Endpoint; | ||
use tonic::{Request, Status}; | ||
use uuid::Uuid; | ||
|
||
/// gRPC-backed log service client.
///
/// Wraps a tonic `LogServiceClient` whose channel is decorated with an
/// interceptor (a plain `fn` pointer — see `new` / `try_from_config`, which
/// install the crate's `client_interceptor`).
#[derive(Clone, Debug)]
pub struct GrpcLog {
    // The fully spelled-out intercepted-service type trips clippy's
    // type_complexity lint; it must be spelled out here because the struct
    // field fixes the concrete interceptor type.
    #[allow(clippy::type_complexity)]
    client: LogServiceClient<
        interceptor::InterceptedService<
            tonic::transport::Channel,
            fn(Request<()>) -> Result<Request<()>, Status>,
        >,
    >,
}
|
||
impl GrpcLog {
    /// Wrap an already-connected, interceptor-decorated log service client.
    ///
    /// Prefer `Configurable::try_from_config`, which dials the endpoint and
    /// installs the tracing interceptor before delegating here.
    #[allow(clippy::type_complexity)]
    pub(crate) fn new(
        client: LogServiceClient<
            interceptor::InterceptedService<
                tonic::transport::Channel,
                fn(Request<()>) -> Result<Request<()>, Status>,
            >,
        >,
    ) -> Self {
        Self { client }
    }
}
|
||
/// Errors produced while establishing the gRPC log service connection.
#[derive(Error, Debug)]
pub(crate) enum GrpcLogError {
    /// The endpoint URI was invalid or the tonic channel could not be
    /// established (wraps the underlying transport error).
    #[error("Failed to connect to log service")]
    FailedToConnect(#[from] tonic::transport::Error),
}
|
||
impl ChromaError for GrpcLogError { | ||
fn code(&self) -> ErrorCodes { | ||
match self { | ||
GrpcLogError::FailedToConnect(_) => ErrorCodes::Internal, | ||
} | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl Configurable<LogConfig> for GrpcLog { | ||
async fn try_from_config(config: &LogConfig) -> Result<Self, Box<dyn ChromaError>> { | ||
match &config { | ||
LogConfig::Grpc(my_config) => { | ||
let host = &my_config.host; | ||
let port = &my_config.port; | ||
tracing::info!("Connecting to log service at {}:{}", host, port); | ||
let connection_string = format!("http://{}:{}", host, port); | ||
let endpoint_res = match Endpoint::from_shared(connection_string) { | ||
Ok(endpoint) => endpoint, | ||
Err(e) => return Err(Box::new(GrpcLogError::FailedToConnect(e))), | ||
}; | ||
let endpoint_res = endpoint_res | ||
.connect_timeout(Duration::from_millis(my_config.connect_timeout_ms)) | ||
.timeout(Duration::from_millis(my_config.request_timeout_ms)); | ||
let client = endpoint_res.connect().await; | ||
match client { | ||
Ok(client) => { | ||
let channel: LogServiceClient< | ||
interceptor::InterceptedService< | ||
tonic::transport::Channel, | ||
fn(Request<()>) -> Result<Request<()>, Status>, | ||
>, | ||
> = LogServiceClient::with_interceptor(client, client_interceptor); | ||
return Ok(GrpcLog::new(channel)); | ||
} | ||
Err(e) => { | ||
return Err(Box::new(GrpcLogError::FailedToConnect(e))); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
impl GrpcLog { | ||
pub(super) async fn read( | ||
&mut self, | ||
collection_id: CollectionUuid, | ||
offset: i64, | ||
batch_size: i32, | ||
end_timestamp: Option<i64>, | ||
) -> Result<Vec<LogRecord>, PullLogsError> { | ||
let end_timestamp = match end_timestamp { | ||
Some(end_timestamp) => end_timestamp, | ||
None => i64::MAX, | ||
}; | ||
let request = self.client.pull_logs(chroma_proto::PullLogsRequest { | ||
// NOTE(rescrv): Use the untyped string representation of the collection ID. | ||
collection_id: collection_id.0.to_string(), | ||
start_from_offset: offset, | ||
batch_size, | ||
end_timestamp, | ||
}); | ||
let response = request.await; | ||
match response { | ||
Ok(response) => { | ||
let logs = response.into_inner().records; | ||
let mut result = Vec::new(); | ||
for log_record_proto in logs { | ||
let log_record = log_record_proto.try_into(); | ||
match log_record { | ||
Ok(log_record) => { | ||
result.push(log_record); | ||
} | ||
Err(err) => { | ||
return Err(PullLogsError::ConversionError(err)); | ||
} | ||
} | ||
} | ||
Ok(result) | ||
} | ||
Err(e) => { | ||
tracing::error!("Failed to pull logs: {}", e); | ||
Err(PullLogsError::FailedToPullLogs(e)) | ||
} | ||
} | ||
} | ||
|
||
pub(super) async fn get_collections_with_new_data( | ||
&mut self, | ||
min_compaction_size: u64, | ||
) -> Result<Vec<CollectionInfo>, GetCollectionsWithNewDataError> { | ||
let response = self | ||
.client | ||
.get_all_collection_info_to_compact( | ||
chroma_proto::GetAllCollectionInfoToCompactRequest { | ||
min_compaction_size, | ||
}, | ||
) | ||
.await; | ||
|
||
match response { | ||
Ok(response) => { | ||
let collections = response.into_inner().all_collection_info; | ||
let mut result = Vec::new(); | ||
for collection in collections { | ||
let collection_uuid = match Uuid::parse_str(&collection.collection_id) { | ||
Ok(uuid) => uuid, | ||
Err(_) => { | ||
tracing::error!( | ||
"Failed to parse collection id: {}", | ||
collection.collection_id | ||
); | ||
continue; | ||
} | ||
}; | ||
let collection_id = CollectionUuid(collection_uuid); | ||
result.push(CollectionInfo { | ||
collection_id, | ||
first_log_offset: collection.first_log_offset, | ||
first_log_ts: collection.first_log_ts, | ||
}); | ||
} | ||
Ok(result) | ||
} | ||
Err(e) => { | ||
tracing::error!("Failed to get collections: {}", e); | ||
Err(GetCollectionsWithNewDataError::FailedGetCollectionsWithNewData(e)) | ||
} | ||
} | ||
} | ||
|
||
pub(super) async fn update_collection_log_offset( | ||
&mut self, | ||
collection_id: CollectionUuid, | ||
new_offset: i64, | ||
) -> Result<(), UpdateCollectionLogOffsetError> { | ||
let request = self.client.update_collection_log_offset( | ||
chroma_proto::UpdateCollectionLogOffsetRequest { | ||
// NOTE(rescrv): Use the untyped string representation of the collection ID. | ||
collection_id: collection_id.0.to_string(), | ||
log_offset: new_offset, | ||
}, | ||
); | ||
let response = request.await; | ||
match response { | ||
Ok(_) => Ok(()), | ||
Err(e) => Err(UpdateCollectionLogOffsetError::FailedToUpdateCollectionLogOffset(e)), | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
use crate::types::{ | ||
CollectionInfo, GetCollectionsWithNewDataError, PullLogsError, UpdateCollectionLogOffsetError, | ||
}; | ||
use chroma_types::{CollectionUuid, LogRecord}; | ||
use std::collections::HashMap; | ||
use std::fmt::Debug; | ||
|
||
// This is used for testing only, it represents a log record that is stored in memory | ||
// internal to a mock log implementation | ||
/// A single log entry held by the in-memory (test-only) log implementation.
#[derive(Clone)]
pub struct InternalLogRecord {
    // Collection this record belongs to.
    pub collection_id: CollectionUuid,
    // Zero-based offset within the collection's log; `add_log` enforces that
    // offsets are appended contiguously in order.
    pub log_offset: i64,
    // Timestamp compared against `end_timestamp` when reading.
    pub log_ts: i64,
    // The log payload itself.
    pub record: LogRecord,
}
|
||
impl Debug for InternalLogRecord {
    // NOTE(review): the struct name rendered here is "LogRecord", not
    // "InternalLogRecord" — presumably intentional shorthand for test
    // output, but worth confirming; a derived Debug would print the actual
    // type name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LogRecord")
            .field("collection_id", &self.collection_id)
            .field("log_offset", &self.log_offset)
            .field("log_ts", &self.log_ts)
            .field("record", &self.record)
            .finish()
    }
}
|
||
// This is used for testing only | ||
#[derive(Clone, Debug)] | ||
pub struct InMemoryLog { | ||
collection_to_log: HashMap<CollectionUuid, Vec<InternalLogRecord>>, | ||
offsets: HashMap<CollectionUuid, i64>, | ||
} | ||
|
||
impl InMemoryLog { | ||
pub fn new() -> InMemoryLog { | ||
InMemoryLog { | ||
collection_to_log: HashMap::new(), | ||
offsets: HashMap::new(), | ||
} | ||
} | ||
|
||
pub fn add_log(&mut self, collection_id: CollectionUuid, log: InternalLogRecord) { | ||
let logs = self.collection_to_log.entry(collection_id).or_default(); | ||
// Ensure that the log offset is correct. Since we only use the InMemoryLog for testing, | ||
// we expect callers to send us logs in the correct order. | ||
let next_offset = logs.len() as i64; | ||
if log.log_offset != next_offset { | ||
panic!( | ||
"Expected log offset to be {}, but got {}", | ||
next_offset, log.log_offset | ||
); | ||
} | ||
logs.push(log); | ||
} | ||
} | ||
|
||
impl InMemoryLog { | ||
pub(super) async fn read( | ||
&mut self, | ||
collection_id: CollectionUuid, | ||
offset: i64, | ||
batch_size: i32, | ||
end_timestamp: Option<i64>, | ||
) -> Result<Vec<LogRecord>, PullLogsError> { | ||
let end_timestamp = match end_timestamp { | ||
Some(end_timestamp) => end_timestamp, | ||
None => i64::MAX, | ||
}; | ||
|
||
let logs = match self.collection_to_log.get(&collection_id) { | ||
Some(logs) => logs, | ||
None => return Ok(Vec::new()), | ||
}; | ||
let mut result = Vec::new(); | ||
for i in offset..(offset + batch_size as i64) { | ||
if i < logs.len() as i64 && logs[i as usize].log_ts <= end_timestamp { | ||
result.push(logs[i as usize].record.clone()); | ||
} | ||
} | ||
Ok(result) | ||
} | ||
|
||
pub(super) async fn get_collections_with_new_data( | ||
&mut self, | ||
min_compaction_size: u64, | ||
) -> Result<Vec<CollectionInfo>, GetCollectionsWithNewDataError> { | ||
let mut collections = Vec::new(); | ||
for (collection_id, log_records) in self.collection_to_log.iter() { | ||
if log_records.is_empty() { | ||
continue; | ||
} | ||
let filtered_records = match self.offsets.get(collection_id) { | ||
Some(last_offset) => { | ||
// Make sure there is at least one record past the last offset | ||
let max_offset = log_records.len() as i64 - 1; | ||
if *last_offset + 1 > max_offset { | ||
continue; | ||
} | ||
&log_records[(*last_offset + 1) as usize..] | ||
} | ||
None => &log_records[..], | ||
}; | ||
|
||
if (filtered_records.len() as u64) < min_compaction_size { | ||
continue; | ||
} | ||
|
||
let mut logs = filtered_records.to_vec(); | ||
logs.sort_by(|a, b| a.log_offset.cmp(&b.log_offset)); | ||
collections.push(CollectionInfo { | ||
collection_id: *collection_id, | ||
first_log_offset: logs[0].log_offset, | ||
first_log_ts: logs[0].log_ts, | ||
}); | ||
} | ||
Ok(collections) | ||
} | ||
|
||
pub(super) async fn update_collection_log_offset( | ||
&mut self, | ||
collection_id: CollectionUuid, | ||
new_offset: i64, | ||
) -> Result<(), UpdateCollectionLogOffsetError> { | ||
self.offsets.insert(collection_id, new_offset); | ||
Ok(()) | ||
} | ||
} | ||
|
||
impl Default for InMemoryLog { | ||
fn default() -> Self { | ||
Self::new() | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,17 +1,22 @@ | ||
pub mod config; | ||
pub mod grpc_log; | ||
pub mod in_memory_log; | ||
#[allow(clippy::module_inception)] | ||
pub mod log; | ||
mod log; | ||
pub mod test; | ||
pub mod tracing; | ||
pub mod types; | ||
|
||
use chroma_config::Configurable; | ||
use chroma_error::ChromaError; | ||
use config::LogConfig; | ||
pub use log::*; | ||
pub use types::*; | ||
|
||
pub async fn from_config(config: &LogConfig) -> Result<Box<log::Log>, Box<dyn ChromaError>> { | ||
match &config { | ||
config::LogConfig::Grpc(_) => Ok(Box::new(log::Log::Grpc( | ||
log::GrpcLog::try_from_config(config).await?, | ||
grpc_log::GrpcLog::try_from_config(config).await?, | ||
))), | ||
} | ||
} |
Oops, something went wrong.