Skip to content

Commit

Permalink
[CLN]: split up log impls into separate files
Browse files Browse the repository at this point in the history
  • Loading branch information
codetheweb committed Jan 28, 2025
1 parent d397589 commit e0312e9
Show file tree
Hide file tree
Showing 12 changed files with 441 additions and 406 deletions.
201 changes: 201 additions & 0 deletions rust/log/src/grpc_log.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
use super::config::LogConfig;
use crate::tracing::client_interceptor;
use crate::types::{
CollectionInfo, GetCollectionsWithNewDataError, PullLogsError, UpdateCollectionLogOffsetError,
};
use async_trait::async_trait;
use chroma_config::Configurable;
use chroma_error::{ChromaError, ErrorCodes};
use chroma_types::chroma_proto::log_service_client::LogServiceClient;
use chroma_types::chroma_proto::{self};
use chroma_types::{CollectionUuid, LogRecord};
use std::fmt::Debug;
use std::time::Duration;
use thiserror::Error;
use tonic::service::interceptor;
use tonic::transport::Endpoint;
use tonic::{Request, Status};
use uuid::Uuid;

/// The concrete client type: a `LogServiceClient` over a tonic channel with the
/// tracing interceptor installed. Named once here so the long generic type does
/// not need to be repeated (and `clippy::type_complexity` allowed) at every use.
type InterceptedLogServiceClient = LogServiceClient<
    interceptor::InterceptedService<
        tonic::transport::Channel,
        fn(Request<()>) -> Result<Request<()>, Status>,
    >,
>;

/// A log-service implementation backed by a remote gRPC log service.
#[derive(Clone, Debug)]
pub struct GrpcLog {
    client: InterceptedLogServiceClient,
}

impl GrpcLog {
    /// Wraps an already-connected, interceptor-equipped client.
    pub(crate) fn new(client: InterceptedLogServiceClient) -> Self {
        Self { client }
    }
}

/// Errors that can occur while setting up the gRPC log service client.
#[derive(Error, Debug)]
pub(crate) enum GrpcLogError {
    /// The tonic transport failed to build an endpoint or establish a connection.
    #[error("Failed to connect to log service")]
    FailedToConnect(#[from] tonic::transport::Error),
}

impl ChromaError for GrpcLogError {
    /// All connection failures are surfaced as internal errors.
    fn code(&self) -> ErrorCodes {
        match self {
            Self::FailedToConnect(_) => ErrorCodes::Internal,
        }
    }
}

#[async_trait]
impl Configurable<LogConfig> for GrpcLog {
    /// Builds a connected `GrpcLog` from the gRPC variant of [`LogConfig`].
    ///
    /// Returns `GrpcLogError::FailedToConnect` (boxed) if the endpoint URI is
    /// invalid or the connection cannot be established within the configured
    /// timeouts.
    async fn try_from_config(config: &LogConfig) -> Result<Self, Box<dyn ChromaError>> {
        match &config {
            LogConfig::Grpc(my_config) => {
                let host = &my_config.host;
                let port = &my_config.port;
                tracing::info!("Connecting to log service at {}:{}", host, port);
                let connection_string = format!("http://{}:{}", host, port);
                let endpoint = Endpoint::from_shared(connection_string)
                    .map_err(|e| {
                        Box::new(GrpcLogError::FailedToConnect(e)) as Box<dyn ChromaError>
                    })?
                    .connect_timeout(Duration::from_millis(my_config.connect_timeout_ms))
                    .timeout(Duration::from_millis(my_config.request_timeout_ms));
                let channel = endpoint.connect().await.map_err(|e| {
                    Box::new(GrpcLogError::FailedToConnect(e)) as Box<dyn ChromaError>
                })?;
                // Cast the interceptor fn item to a fn pointer so the client's
                // type matches the field type stored in `GrpcLog`.
                let client = LogServiceClient::with_interceptor(
                    channel,
                    client_interceptor as fn(Request<()>) -> Result<Request<()>, Status>,
                );
                Ok(GrpcLog::new(client))
            }
        }
    }
}

impl GrpcLog {
pub(super) async fn read(
&mut self,
collection_id: CollectionUuid,
offset: i64,
batch_size: i32,
end_timestamp: Option<i64>,
) -> Result<Vec<LogRecord>, PullLogsError> {
let end_timestamp = match end_timestamp {
Some(end_timestamp) => end_timestamp,
None => i64::MAX,
};
let request = self.client.pull_logs(chroma_proto::PullLogsRequest {
// NOTE(rescrv): Use the untyped string representation of the collection ID.
collection_id: collection_id.0.to_string(),
start_from_offset: offset,
batch_size,
end_timestamp,
});
let response = request.await;
match response {
Ok(response) => {
let logs = response.into_inner().records;
let mut result = Vec::new();
for log_record_proto in logs {
let log_record = log_record_proto.try_into();
match log_record {
Ok(log_record) => {
result.push(log_record);
}
Err(err) => {
return Err(PullLogsError::ConversionError(err));
}
}
}
Ok(result)
}
Err(e) => {
tracing::error!("Failed to pull logs: {}", e);
Err(PullLogsError::FailedToPullLogs(e))
}
}
}

pub(super) async fn get_collections_with_new_data(
&mut self,
min_compaction_size: u64,
) -> Result<Vec<CollectionInfo>, GetCollectionsWithNewDataError> {
let response = self
.client
.get_all_collection_info_to_compact(
chroma_proto::GetAllCollectionInfoToCompactRequest {
min_compaction_size,
},
)
.await;

match response {
Ok(response) => {
let collections = response.into_inner().all_collection_info;
let mut result = Vec::new();
for collection in collections {
let collection_uuid = match Uuid::parse_str(&collection.collection_id) {
Ok(uuid) => uuid,
Err(_) => {
tracing::error!(
"Failed to parse collection id: {}",
collection.collection_id
);
continue;
}
};
let collection_id = CollectionUuid(collection_uuid);
result.push(CollectionInfo {
collection_id,
first_log_offset: collection.first_log_offset,
first_log_ts: collection.first_log_ts,
});
}
Ok(result)
}
Err(e) => {
tracing::error!("Failed to get collections: {}", e);
Err(GetCollectionsWithNewDataError::FailedGetCollectionsWithNewData(e))
}
}
}

pub(super) async fn update_collection_log_offset(
&mut self,
collection_id: CollectionUuid,
new_offset: i64,
) -> Result<(), UpdateCollectionLogOffsetError> {
let request = self.client.update_collection_log_offset(
chroma_proto::UpdateCollectionLogOffsetRequest {
// NOTE(rescrv): Use the untyped string representation of the collection ID.
collection_id: collection_id.0.to_string(),
log_offset: new_offset,
},
);
let response = request.await;
match response {
Ok(_) => Ok(()),
Err(e) => Err(UpdateCollectionLogOffsetError::FailedToUpdateCollectionLogOffset(e)),
}
}
}
135 changes: 135 additions & 0 deletions rust/log/src/in_memory_log.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use crate::types::{
CollectionInfo, GetCollectionsWithNewDataError, PullLogsError, UpdateCollectionLogOffsetError,
};
use chroma_types::{CollectionUuid, LogRecord};
use std::collections::HashMap;
use std::fmt::Debug;

// This is used for testing only, it represents a log record that is stored in memory
// internal to a mock log implementation
// This is used for testing only, it represents a log record that is stored in memory
// internal to a mock log implementation
#[derive(Clone)]
pub struct InternalLogRecord {
    // Collection this record belongs to.
    pub collection_id: CollectionUuid,
    // Position within the collection's log; `add_log` enforces that offsets
    // are contiguous and start at 0.
    pub log_offset: i64,
    // Record timestamp; compared against `end_timestamp` in `read`.
    pub log_ts: i64,
    // The actual log payload.
    pub record: LogRecord,
}

// NOTE(review): this prints the struct name as "LogRecord", not
// "InternalLogRecord" — presumably intentional shorthand for test output, but
// confirm; a derived Debug would print the actual type name instead.
impl Debug for InternalLogRecord {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LogRecord")
            .field("collection_id", &self.collection_id)
            .field("log_offset", &self.log_offset)
            .field("log_ts", &self.log_ts)
            .field("record", &self.record)
            .finish()
    }
}

// This is used for testing only
#[derive(Clone, Debug)]
pub struct InMemoryLog {
collection_to_log: HashMap<CollectionUuid, Vec<InternalLogRecord>>,
offsets: HashMap<CollectionUuid, i64>,
}

impl InMemoryLog {
    /// Creates an empty in-memory log with no recorded offsets.
    pub fn new() -> InMemoryLog {
        InMemoryLog {
            collection_to_log: HashMap::new(),
            offsets: HashMap::new(),
        }
    }

    /// Appends `log` to the given collection's log.
    ///
    /// Offsets must be contiguous starting at 0. Since this type exists only
    /// for testing, an out-of-order offset is a caller bug and panics.
    pub fn add_log(&mut self, collection_id: CollectionUuid, log: InternalLogRecord) {
        let records = self.collection_to_log.entry(collection_id).or_default();
        let expected_offset = records.len() as i64;
        if expected_offset != log.log_offset {
            panic!(
                "Expected log offset to be {}, but got {}",
                expected_offset, log.log_offset
            );
        }
        records.push(log);
    }
}

impl InMemoryLog {
    /// Reads up to `batch_size` records for `collection_id` starting at
    /// `offset`.
    ///
    /// Records whose `log_ts` exceeds `end_timestamp` are skipped individually
    /// (the window is not truncated at the first late record). `None` means no
    /// upper bound. An unknown collection yields an empty result. Negative
    /// `offset`/`batch_size` are clamped to 0 instead of panicking.
    pub(super) async fn read(
        &mut self,
        collection_id: CollectionUuid,
        offset: i64,
        batch_size: i32,
        end_timestamp: Option<i64>,
    ) -> Result<Vec<LogRecord>, PullLogsError> {
        let end_timestamp = end_timestamp.unwrap_or(i64::MAX);

        let logs = match self.collection_to_log.get(&collection_id) {
            Some(logs) => logs,
            None => return Ok(Vec::new()),
        };
        let result = logs
            .iter()
            .skip(offset.max(0) as usize)
            .take(batch_size.max(0) as usize)
            .filter(|log| log.log_ts <= end_timestamp)
            .map(|log| log.record.clone())
            .collect();
        Ok(result)
    }

    /// Returns every collection that has at least `min_compaction_size`
    /// records past its recorded offset, with the offset/timestamp of the
    /// first such record.
    pub(super) async fn get_collections_with_new_data(
        &mut self,
        min_compaction_size: u64,
    ) -> Result<Vec<CollectionInfo>, GetCollectionsWithNewDataError> {
        let mut collections = Vec::new();
        for (collection_id, log_records) in self.collection_to_log.iter() {
            if log_records.is_empty() {
                continue;
            }
            // Only consider records past the recorded offset (if any).
            let filtered_records = match self.offsets.get(collection_id) {
                Some(last_offset) => {
                    // Make sure there is at least one record past the last offset
                    let max_offset = log_records.len() as i64 - 1;
                    if *last_offset + 1 > max_offset {
                        continue;
                    }
                    &log_records[(*last_offset + 1) as usize..]
                }
                None => &log_records[..],
            };

            if (filtered_records.len() as u64) < min_compaction_size {
                continue;
            }

            // Find the record with the smallest offset directly rather than
            // cloning and sorting the whole slice just to read its first entry.
            let first = filtered_records
                .iter()
                .min_by_key(|record| record.log_offset)
                .expect("filtered_records is non-empty here");
            collections.push(CollectionInfo {
                collection_id: *collection_id,
                first_log_offset: first.log_offset,
                first_log_ts: first.log_ts,
            });
        }
        Ok(collections)
    }

    /// Records `new_offset` for `collection_id`; always succeeds.
    pub(super) async fn update_collection_log_offset(
        &mut self,
        collection_id: CollectionUuid,
        new_offset: i64,
    ) -> Result<(), UpdateCollectionLogOffsetError> {
        self.offsets.insert(collection_id, new_offset);
        Ok(())
    }
}

impl Default for InMemoryLog {
    // Equivalent to `InMemoryLog::new`: an empty log with no recorded offsets.
    fn default() -> Self {
        Self::new()
    }
}
9 changes: 7 additions & 2 deletions rust/log/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
pub mod config;
pub mod grpc_log;
pub mod in_memory_log;
#[allow(clippy::module_inception)]
pub mod log;
mod log;
pub mod test;
pub mod tracing;
pub mod types;

use chroma_config::Configurable;
use chroma_error::ChromaError;
use config::LogConfig;
pub use log::*;
pub use types::*;

/// Constructs a boxed [`log::Log`] from the given configuration.
///
/// Currently only the gRPC-backed implementation is configurable; the error is
/// whatever `GrpcLog::try_from_config` returns.
pub async fn from_config(config: &LogConfig) -> Result<Box<log::Log>, Box<dyn ChromaError>> {
    match &config {
        config::LogConfig::Grpc(_) => Ok(Box::new(log::Log::Grpc(
            grpc_log::GrpcLog::try_from_config(config).await?,
        ))),
    }
}
Loading

0 comments on commit e0312e9

Please sign in to comment.