From 1cd2ced9577369171c6a19ffa9e1a5de5b23ace1 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Mon, 1 Apr 2024 16:47:51 -0700 Subject: [PATCH] [ENH] Add memberlist for compaction service (#1949) ## Description of changes *Summarize the changes made by this PR.* - Improvements & Bug fixes - ... - New functionality - This PR adds memberlist consumption for the compaction service and filter collections based on memberlist using Rendezvous assignment policy. ## Test plan *How are these changes tested?* - [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust - [ ] Manual local integration testing with Tilt ## Documentation Changes *Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs repository](https://github.com/chroma-core/docs)?* --- rust/worker/chroma_config.yaml | 37 ++- .../src/assignment/assignment_policy.rs | 37 +-- rust/worker/src/assignment/mod.rs | 16 +- .../src/compactor/compaction_manager.rs | 77 ++++-- rust/worker/src/compactor/scheduler.rs | 78 +++++- rust/worker/src/config.rs | 228 +++++++++++++----- rust/worker/src/execution/dispatcher.rs | 13 +- rust/worker/src/lib.rs | 53 ++-- rust/worker/src/log/log.rs | 7 +- rust/worker/src/log/mod.rs | 11 +- rust/worker/src/memberlist/config.rs | 2 + .../src/memberlist/memberlist_provider.rs | 18 +- rust/worker/src/server.rs | 12 +- rust/worker/src/storage/s3.rs | 8 +- rust/worker/src/sysdb/mod.rs | 11 +- rust/worker/src/sysdb/sysdb.rs | 8 +- 16 files changed, 443 insertions(+), 173 deletions(-) diff --git a/rust/worker/chroma_config.yaml b/rust/worker/chroma_config.yaml index 6b9b450a424..40e6e1799c3 100644 --- a/rust/worker/chroma_config.yaml +++ b/rust/worker/chroma_config.yaml @@ -1,21 +1,17 @@ -# Default configuration for Chroma worker +# Default configuration for query and compaction service # In the long term, every service should have an entry in this file # and this can become the global configuration file for Chroma # for now we nest it in the worker directory -worker: +query_service: my_ip: "10.244.0.9" my_port: 50051 - num_indexing_threads: 4 - pulsar_url: "pulsar://127.0.0.1:6650" - pulsar_tenant: "default" - pulsar_namespace: "default" - kube_namespace: "chroma" assignment_policy: RendezvousHashing: hasher: Murmur3 memberlist_provider: CustomResource: + kube_namespace: "chroma" memberlist_name: "query-service-memberlist" queue_size: 100 sysdb: @@ -33,6 +29,33 @@ worker: num_worker_threads: 4 dispatcher_queue_size: 100 worker_queue_size: 100 + +compaction_service: + my_ip: "10.244.0.9" + my_port: 50051 + assignment_policy: + RendezvousHashing: + hasher: Murmur3 + memberlist_provider: + CustomResource: + kube_namespace: "chroma" + memberlist_name: "compaction-service-memberlist" + queue_size: 100 + sysdb: + Grpc: + host: "sysdb.chroma" + port: 50051 + storage: + S3: + bucket: "chroma-storage" + log: + Grpc: + host: "logservice.chroma" + port: 50051 + dispatcher: + num_worker_threads: 4 + dispatcher_queue_size: 100 + worker_queue_size: 100 compactor: compaction_manager_queue_size: 1000 max_concurrent_jobs: 100 diff --git a/rust/worker/src/assignment/assignment_policy.rs b/rust/worker/src/assignment/assignment_policy.rs index bde70b26625..71cdbedc0a5 100644 --- a/rust/worker/src/assignment/assignment_policy.rs +++ b/rust/worker/src/assignment/assignment_policy.rs @@ -1,7 +1,4 @@ -use crate::{ - config::{Configurable, WorkerConfig}, - errors::ChromaError, -}; +use crate::{config::Configurable, errors::ChromaError}; use super::{ config::{AssignmentPolicyConfig, HasherType}, @@ -20,14 +17,10 @@ Interfaces /// This trait mirrors the go and python versions of the assignment policy /// interface. /// # Methods -/// - assign: Assign a key to a topic. +/// - assign: Assign a key to a member. /// - get_members: Get the members that can be assigned to. /// - set_members: Set the members that can be assigned to. -/// # Notes -/// An assignment policy is not responsible for creating the topics it assigns to. -/// It is the responsibility of the caller to ensure that the topics exist. -/// An assignment policy must be Send. -pub(crate) trait AssignmentPolicy: Send { +pub(crate) trait AssignmentPolicy: Send + Sync { fn assign(&self, key: &str) -> Result; fn get_members(&self) -> Vec; fn set_members(&mut self, members: Vec); @@ -45,16 +38,7 @@ pub(crate) struct RendezvousHashingAssignmentPolicy { } impl RendezvousHashingAssignmentPolicy { - // Rust beginners note - // The reason we take String and not &str is because we need to put the strings into our - // struct, and we can't do that with references so rather than clone the strings, we just - // take ownership of them and put the responsibility on the caller to clone them if they - // need to. This is the general pattern we should follow in rust - put the burden of cloning - // on the caller, and if they don't need to clone, they can pass ownership. - pub(crate) fn new( - pulsar_tenant: String, - pulsar_namespace: String, - ) -> RendezvousHashingAssignmentPolicy { + pub(crate) fn new() -> RendezvousHashingAssignmentPolicy { return RendezvousHashingAssignmentPolicy { hasher: Murmur3Hasher {}, members: vec![], @@ -67,9 +51,11 @@ impl RendezvousHashingAssignmentPolicy { } #[async_trait] -impl Configurable for RendezvousHashingAssignmentPolicy { - async fn try_from_config(worker_config: &WorkerConfig) -> Result> { - let assignment_policy_config = match &worker_config.assignment_policy { +impl Configurable for RendezvousHashingAssignmentPolicy { + async fn try_from_config( + config: &AssignmentPolicyConfig, + ) -> Result> { + let assignment_policy_config = match config { AssignmentPolicyConfig::RendezvousHashing(config) => config, }; let hasher = match assignment_policy_config.hasher { @@ -84,9 +70,8 @@ impl Configurable for RendezvousHashingAssignmentPolicy { impl AssignmentPolicy for RendezvousHashingAssignmentPolicy { fn assign(&self, key: &str) -> Result { - let topics = self.get_members(); - let topic = assign(key, topics, &self.hasher); - return topic; + let members = self.get_members(); + assign(key, members, &self.hasher) } fn get_members(&self) -> Vec { diff --git a/rust/worker/src/assignment/mod.rs b/rust/worker/src/assignment/mod.rs index 7ed1525f0bc..70be4c966cd 100644 --- a/rust/worker/src/assignment/mod.rs +++ b/rust/worker/src/assignment/mod.rs @@ -1,3 +1,17 @@ pub(crate) mod assignment_policy; pub(crate) mod config; -mod rendezvous_hash; +pub(crate) mod rendezvous_hash; + +use crate::{config::Configurable, errors::ChromaError}; + +use self::{assignment_policy::AssignmentPolicy, config::AssignmentPolicyConfig}; + +pub(crate) async fn from_config( + config: &AssignmentPolicyConfig, +) -> Result, Box> { + match &config { + crate::assignment::config::AssignmentPolicyConfig::RendezvousHashing(_) => Ok(Box::new( + assignment_policy::RendezvousHashingAssignmentPolicy::try_from_config(config).await?, + )), + } +} diff --git a/rust/worker/src/compactor/compaction_manager.rs b/rust/worker/src/compactor/compaction_manager.rs index 82369f4280e..7d5572da3d8 100644 --- a/rust/worker/src/compactor/compaction_manager.rs +++ b/rust/worker/src/compactor/compaction_manager.rs @@ -2,6 +2,7 @@ use super::scheduler::Scheduler; use super::scheduler_policy::LasCompactionTimeSchedulerPolicy; use crate::compactor::types::CompactionJob; use crate::compactor::types::ScheduleMessage; +use crate::config::CompactionServiceConfig; use crate::config::Configurable; use crate::errors::ChromaError; use crate::errors::ErrorCodes; @@ -9,6 +10,7 @@ use crate::execution::operator::TaskMessage; use crate::execution::orchestration::CompactOrchestrator; use crate::execution::orchestration::CompactionResponse; use crate::log::log::Log; +use crate::memberlist::Memberlist; use crate::system::Component; use crate::system::ComponentContext; use crate::system::Handler; @@ -33,7 +35,6 @@ pub(crate) struct CompactionManager { dispatcher: Option>>, // Config compaction_manager_queue_size: usize, - max_concurrent_jobs: usize, compaction_interval: Duration, } @@ -56,7 +57,6 @@ impl CompactionManager { scheduler: Scheduler, log: Box, compaction_manager_queue_size: usize, - max_concurrent_jobs: usize, compaction_interval: Duration, ) -> Self { CompactionManager { @@ -65,7 +65,6 @@ impl CompactionManager { log, dispatcher: None, compaction_manager_queue_size, - max_concurrent_jobs, compaction_interval, } } @@ -152,33 +151,51 @@ impl CompactionManager { } #[async_trait] -impl Configurable for CompactionManager { +impl Configurable for CompactionManager { async fn try_from_config( - config: &crate::config::WorkerConfig, + config: &crate::config::CompactionServiceConfig, ) -> Result> { - let sysdb = match crate::sysdb::from_config(&config).await { + let sysdb_config = &config.sysdb; + let sysdb = match crate::sysdb::from_config(sysdb_config).await { Ok(sysdb) => sysdb, Err(err) => { return Err(err); } }; - let log = match crate::log::from_config(&config).await { + let log_config = &config.log; + let log = match crate::log::from_config(log_config).await { Ok(log) => log, Err(err) => { return Err(err); } }; + let my_ip = config.my_ip.clone(); let policy = Box::new(LasCompactionTimeSchedulerPolicy {}); - let scheduler = Scheduler::new(log.clone(), sysdb.clone(), policy, 1000); let compaction_interval_sec = config.compactor.compaction_interval_sec; let max_concurrent_jobs = config.compactor.max_concurrent_jobs; let compaction_manager_queue_size = config.compactor.compaction_manager_queue_size; + + let assignment_policy_config = &config.assignment_policy; + let assignment_policy = match crate::assignment::from_config(assignment_policy_config).await + { + Ok(assignment_policy) => assignment_policy, + Err(err) => { + return Err(err); + } + }; + let scheduler = Scheduler::new( + my_ip, + log.clone(), + sysdb.clone(), + policy, + max_concurrent_jobs, + assignment_policy, + ); Ok(CompactionManager::new( scheduler, log, compaction_manager_queue_size, - max_concurrent_jobs, Duration::from_secs(compaction_interval_sec), )) } @@ -188,7 +205,7 @@ impl Configurable for CompactionManager { #[async_trait] impl Component for CompactionManager { fn queue_size(&self) -> usize { - 1000 // TODO: make configurable + self.compaction_manager_queue_size } async fn on_start(&mut self, ctx: &crate::system::ComponentContext) -> () { @@ -220,9 +237,17 @@ impl Handler for CompactionManager { } } +#[async_trait] +impl Handler for CompactionManager { + async fn handle(&mut self, message: Memberlist, ctx: &ComponentContext) { + self.scheduler.set_memberlist(message); + } +} + #[cfg(test)] mod tests { use super::*; + use crate::assignment::assignment_policy::RendezvousHashingAssignmentPolicy; use crate::execution::dispatcher::Dispatcher; use crate::log::log::InMemoryLog; use crate::log::log::InternalLogRecord; @@ -306,17 +331,31 @@ mod tests { sysdb.add_collection(collection_1); sysdb.add_collection(collection_2); + let my_ip = "127.0.0.1".to_string(); + let compaction_manager_queue_size = 1000; + let max_concurrent_jobs = 10; + let compaction_interval = Duration::from_secs(1); + + // Set assignment policy + let mut assignment_policy = Box::new(RendezvousHashingAssignmentPolicy::new()); + assignment_policy.set_members(vec![my_ip.clone()]); + + let mut scheduler = Scheduler::new( + my_ip.clone(), + log.clone(), + sysdb.clone(), + Box::new(LasCompactionTimeSchedulerPolicy {}), + max_concurrent_jobs, + assignment_policy, + ); + // Set memberlist + scheduler.set_memberlist(vec![my_ip.clone()]); + let mut manager = CompactionManager::new( - Scheduler::new( - log.clone(), - sysdb.clone(), - Box::new(LasCompactionTimeSchedulerPolicy {}), - 1000, - ), + scheduler, log, - 1000, - 10, - Duration::from_secs(1), + compaction_manager_queue_size, + compaction_interval, ); let system = System::new(); diff --git a/rust/worker/src/compactor/scheduler.rs b/rust/worker/src/compactor/scheduler.rs index 17de4d1303b..12dc1be433a 100644 --- a/rust/worker/src/compactor/scheduler.rs +++ b/rust/worker/src/compactor/scheduler.rs @@ -1,33 +1,42 @@ +use crate::assignment::assignment_policy::AssignmentPolicy; use crate::compactor::scheduler_policy::SchedulerPolicy; use crate::compactor::types::CompactionJob; use crate::log::log::CollectionInfo; use crate::log::log::CollectionRecord; use crate::log::log::Log; +use crate::memberlist::Memberlist; use crate::sysdb::sysdb::SysDb; use uuid::Uuid; -#[derive(Clone)] pub(crate) struct Scheduler { + my_ip: String, log: Box, sysdb: Box, policy: Box, job_queue: Vec, max_concurrent_jobs: usize, + memberlist: Option, + assignment_policy: Box, } impl Scheduler { pub(crate) fn new( + my_ip: String, log: Box, sysdb: Box, policy: Box, max_concurrent_jobs: usize, + assignment_policy: Box, ) -> Scheduler { Scheduler { + my_ip, log, sysdb, policy, job_queue: Vec::with_capacity(max_concurrent_jobs), max_concurrent_jobs, + memberlist: None, + assignment_policy, } } @@ -86,7 +95,30 @@ impl Scheduler { } } } - collection_records + self.filter_collections(collection_records) + } + + fn filter_collections(&mut self, collections: Vec) -> Vec { + let mut filtered_collections = Vec::new(); + let members = self.memberlist.as_ref().unwrap(); + self.assignment_policy.set_members(members.clone()); + for collection in collections { + let result = self.assignment_policy.assign(collection.id.as_str()); + match result { + Ok(member) => { + if member == self.my_ip { + filtered_collections.push(collection); + } + } + Err(e) => { + // TODO: Log error + println!("Error: {:?}", e); + continue; + } + } + } + + filtered_collections } pub(crate) async fn schedule_internal(&mut self, collection_records: Vec) { @@ -100,6 +132,11 @@ impl Scheduler { } pub(crate) async fn schedule(&mut self) { + if self.memberlist.is_none() { + // TODO: Log error + println!("Memberlist is not set"); + return; + } let collections = self.get_collections_with_new_data().await; if collections.is_empty() { return; @@ -111,11 +148,16 @@ impl Scheduler { pub(crate) fn get_jobs(&self) -> impl Iterator { self.job_queue.iter() } + + pub(crate) fn set_memberlist(&mut self, memberlist: Memberlist) { + self.memberlist = Some(memberlist); + } } #[cfg(test)] mod tests { use super::*; + use crate::assignment::assignment_policy::RendezvousHashingAssignmentPolicy; use crate::compactor::scheduler_policy::LasCompactionTimeSchedulerPolicy; use crate::log::log::InMemoryLog; use crate::log::log::InternalLogRecord; @@ -198,9 +240,30 @@ mod tests { }; sysdb.add_collection(collection_1); sysdb.add_collection(collection_2); + + let my_ip = "0.0.0.1".to_string(); let scheduler_policy = Box::new(LasCompactionTimeSchedulerPolicy {}); - let mut scheduler = Scheduler::new(log, sysdb, scheduler_policy, 1000); + let max_concurrent_jobs = 1000; + + // Set assignment policy + let mut assignment_policy = Box::new(RendezvousHashingAssignmentPolicy::new()); + assignment_policy.set_members(vec![my_ip.clone()]); + + let mut scheduler = Scheduler::new( + my_ip.clone(), + log, + sysdb, + scheduler_policy, + max_concurrent_jobs, + assignment_policy, + ); + // Scheduler does nothing without memberlist + scheduler.schedule().await; + let jobs = scheduler.get_jobs(); + assert_eq!(jobs.count(), 0); + // Set memberlist + scheduler.set_memberlist(vec![my_ip.clone()]); scheduler.schedule().await; let jobs = scheduler.get_jobs(); @@ -211,5 +274,14 @@ mod tests { assert_eq!(job_ids.len(), 2); assert!(job_ids.contains(&collection_id_1)); assert!(job_ids.contains(&collection_id_2)); + + // Test filter_collections + let member_1 = "0.0.0.1".to_string(); + let member_2 = "0.0.0.2".to_string(); + let members = vec![member_1.clone(), member_2.clone()]; + scheduler.set_memberlist(members.clone()); + scheduler.schedule().await; + let jobs = scheduler.get_jobs(); + assert_eq!(jobs.count(), 1); } } diff --git a/rust/worker/src/config.rs b/rust/worker/src/config.rs index 4158a39004c..e8f2a75f9df 100644 --- a/rust/worker/src/config.rs +++ b/rust/worker/src/config.rs @@ -19,7 +19,8 @@ const ENV_PREFIX: &str = "CHROMA_"; pub(crate) struct RootConfig { // The root config object wraps the worker config object so that // we can share the same config file between multiple services. - pub worker: WorkerConfig, + pub query_service: QueryServiceConfig, + pub compaction_service: CompactionServiceConfig, } impl RootConfig { @@ -85,24 +86,38 @@ impl RootConfig { /// # Description /// The primary config for the worker service. /// ## Description of parameters -/// - my_ip: The IP address of the worker service. Used for memberlist assignment. Must be provided -/// - num_indexing_threads: The number of indexing threads to use. If not provided, defaults to the number of cores on the machine. -/// - pulsar_tenant: The pulsar tenant to use. Must be provided. -/// - pulsar_namespace: The pulsar namespace to use. Must be provided. +/// - my_ip: The IP address of the worker service. Used for memberlist assignment. Must be provided. /// - assignment_policy: The assignment policy to use. Must be provided. /// # Notes /// In order to set the enviroment variables, you must prefix them with CHROMA_WORKER__. /// For example, to set my_ip, you would set CHROMA_WORKER__MY_IP. /// Each submodule that needs to be configured from the config object should implement the Configurable trait and /// have its own field in this struct for its Config struct. -pub(crate) struct WorkerConfig { +pub(crate) struct QueryServiceConfig { + pub(crate) my_ip: String, + pub(crate) my_port: u16, + pub(crate) assignment_policy: crate::assignment::config::AssignmentPolicyConfig, + pub(crate) memberlist_provider: crate::memberlist::config::MemberlistProviderConfig, + pub(crate) sysdb: crate::sysdb::config::SysDbConfig, + pub(crate) storage: crate::storage::config::StorageConfig, + pub(crate) log: crate::log::config::LogConfig, + pub(crate) dispatcher: crate::execution::config::DispatcherConfig, +} + +#[derive(Deserialize)] +/// # Description +/// The primary config for the compaction service. +/// ## Description of parameters +/// - my_ip: The IP address of the worker service. Used for memberlist assignment. Must be provided. +/// - assignment_policy: The assignment policy to use. Must be provided. +/// # Notes +/// In order to set the enviroment variables, you must prefix them with CHROMA_COMPACTOR__. +/// For example, to set my_ip, you would set CHROMA_COMPACTOR__MY_IP. +/// Each submodule that needs to be configured from the config object should implement the Configurable trait and +/// have its own field in this struct for its Config struct. +pub(crate) struct CompactionServiceConfig { pub(crate) my_ip: String, pub(crate) my_port: u16, - pub(crate) num_indexing_threads: u32, - pub(crate) pulsar_tenant: String, - pub(crate) pulsar_namespace: String, - pub(crate) pulsar_url: String, - pub(crate) kube_namespace: String, pub(crate) assignment_policy: crate::assignment::config::AssignmentPolicyConfig, pub(crate) memberlist_provider: crate::memberlist::config::MemberlistProviderConfig, pub(crate) sysdb: crate::sysdb::config::SysDbConfig, @@ -118,8 +133,8 @@ pub(crate) struct WorkerConfig { /// This trait is used to configure structs from the config object. /// Components that need to be configured from the config object should implement this trait. #[async_trait] -pub(crate) trait Configurable { - async fn try_from_config(worker_config: &WorkerConfig) -> Result> +pub(crate) trait Configurable { + async fn try_from_config(worker_config: &T) -> Result> where Self: Sized; } @@ -135,20 +150,43 @@ mod tests { let _ = jail.create_file( "chroma_config.yaml", r#" - worker: + query_service: my_ip: "192.0.0.1" my_port: 50051 - num_indexing_threads: 4 - pulsar_tenant: "public" - pulsar_namespace: "default" - pulsar_url: "pulsar://localhost:6650" - kube_namespace: "chroma" assignment_policy: RendezvousHashing: hasher: Murmur3 memberlist_provider: CustomResource: - memberlist_name: "worker-memberlist" + kube_namespace: "chroma" + memberlist_name: "query-service-memberlist" + queue_size: 100 + sysdb: + Grpc: + host: "localhost" + port: 50051 + storage: + S3: + bucket: "chroma" + log: + Grpc: + host: "localhost" + port: 50051 + dispatcher: + num_worker_threads: 4 + dispatcher_queue_size: 100 + worker_queue_size: 100 + + compaction_service: + my_ip: "192.0.0.1" + my_port: 50051 + assignment_policy: + RendezvousHashing: + hasher: Murmur3 + memberlist_provider: + CustomResource: + kube_namespace: "chroma" + memberlist_name: "compaction-service-memberlist" queue_size: 100 sysdb: Grpc: @@ -172,11 +210,11 @@ mod tests { "#, ); let config = RootConfig::load(); - assert_eq!(config.worker.my_ip, "192.0.0.1"); - assert_eq!(config.worker.num_indexing_threads, 4); - assert_eq!(config.worker.pulsar_tenant, "public"); - assert_eq!(config.worker.pulsar_namespace, "default"); - assert_eq!(config.worker.kube_namespace, "chroma"); + assert_eq!(config.query_service.my_ip, "192.0.0.1"); + assert_eq!(config.query_service.my_port, 50051); + + assert_eq!(config.compaction_service.my_ip, "192.0.0.1"); + assert_eq!(config.compaction_service.my_port, 50051); Ok(()) }); } @@ -187,20 +225,43 @@ mod tests { let _ = jail.create_file( "random_path.yaml", r#" - worker: + query_service: my_ip: "192.0.0.1" my_port: 50051 - num_indexing_threads: 4 - pulsar_tenant: "public" - pulsar_namespace: "default" - pulsar_url: "pulsar://localhost:6650" - kube_namespace: "chroma" assignment_policy: RendezvousHashing: hasher: Murmur3 memberlist_provider: CustomResource: - memberlist_name: "worker-memberlist" + kube_namespace: "chroma" + memberlist_name: "query-service-memberlist" + queue_size: 100 + sysdb: + Grpc: + host: "localhost" + port: 50051 + storage: + S3: + bucket: "chroma" + log: + Grpc: + host: "localhost" + port: 50051 + dispatcher: + num_worker_threads: 4 + dispatcher_queue_size: 100 + worker_queue_size: 100 + + compaction_service: + my_ip: "192.0.0.1" + my_port: 50051 + assignment_policy: + RendezvousHashing: + hasher: Murmur3 + memberlist_provider: + CustomResource: + kube_namespace: "chroma" + memberlist_name: "compaction-service-memberlist" queue_size: 100 sysdb: Grpc: @@ -224,11 +285,11 @@ mod tests { "#, ); let config = RootConfig::load_from_path("random_path.yaml"); - assert_eq!(config.worker.my_ip, "192.0.0.1"); - assert_eq!(config.worker.num_indexing_threads, 4); - assert_eq!(config.worker.pulsar_tenant, "public"); - assert_eq!(config.worker.pulsar_namespace, "default"); - assert_eq!(config.worker.kube_namespace, "chroma"); + assert_eq!(config.query_service.my_ip, "192.0.0.1"); + assert_eq!(config.query_service.my_port, 50051); + + assert_eq!(config.compaction_service.my_ip, "192.0.0.1"); + assert_eq!(config.compaction_service.my_port, 50051); Ok(()) }); } @@ -240,8 +301,10 @@ mod tests { let _ = jail.create_file( "chroma_config.yaml", r#" - worker: - num_indexing_threads: 4 + query_service: + assignment_policy: + RendezvousHashing: + hasher: Murmur3 "#, ); let _ = RootConfig::load(); @@ -255,19 +318,43 @@ mod tests { let _ = jail.create_file( "chroma_config.yaml", r#" - worker: + query_service: + my_ip: "192.0.0.1" + my_port: 50051 + assignment_policy: + RendezvousHashing: + hasher: Murmur3 + memberlist_provider: + CustomResource: + kube_namespace: "chroma" + memberlist_name: "query-service-memberlist" + queue_size: 100 + sysdb: + Grpc: + host: "localhost" + port: 50051 + storage: + S3: + bucket: "chroma" + log: + Grpc: + host: "localhost" + port: 50051 + dispatcher: + num_worker_threads: 4 + dispatcher_queue_size: 100 + worker_queue_size: 100 + + compaction_service: my_ip: "192.0.0.1" my_port: 50051 - pulsar_tenant: "public" - pulsar_namespace: "default" - kube_namespace: "chroma" - pulsar_url: "pulsar://localhost:6650" assignment_policy: RendezvousHashing: hasher: Murmur3 memberlist_provider: CustomResource: - memberlist_name: "worker-memberlist" + kube_namespace: "chroma" + memberlist_name: "query-service-memberlist" queue_size: 100 sysdb: Grpc: @@ -291,8 +378,7 @@ mod tests { "#, ); let config = RootConfig::load(); - assert_eq!(config.worker.my_ip, "192.0.0.1"); - assert_eq!(config.worker.num_indexing_threads, num_cpus::get() as u32); + assert_eq!(config.query_service.my_ip, "192.0.0.1"); Ok(()) }); } @@ -300,22 +386,46 @@ mod tests { #[test] fn test_config_with_env_override() { Jail::expect_with(|jail| { - let _ = jail.set_env("CHROMA_WORKER__MY_IP", "192.0.0.1"); - let _ = jail.set_env("CHROMA_WORKER__MY_PORT", 50051); - let _ = jail.set_env("CHROMA_WORKER__PULSAR_TENANT", "A"); - let _ = jail.set_env("CHROMA_WORKER__PULSAR_NAMESPACE", "B"); - let _ = jail.set_env("CHROMA_WORKER__KUBE_NAMESPACE", "C"); - let _ = jail.set_env("CHROMA_WORKER__PULSAR_URL", "pulsar://localhost:6650"); + let _ = jail.set_env("CHROMA_QUERY_SERVICE__MY_IP", "192.0.0.1"); + let _ = jail.set_env("CHROMA_QUERY_SERVICE__MY_PORT", 50051); + let _ = jail.set_env("CHROMA_COMPACTION_SERVICE__MY_IP", "192.0.0.1"); + let _ = jail.set_env("CHROMA_COMPACTION_SERVICE__MY_PORT", 50051); let _ = jail.create_file( "chroma_config.yaml", r#" - worker: + query_service: + assignment_policy: + RendezvousHashing: + hasher: Murmur3 + memberlist_provider: + CustomResource: + kube_namespace: "chroma" + memberlist_name: "query-service-memberlist" + queue_size: 100 + sysdb: + Grpc: + host: "localhost" + port: 50051 + storage: + S3: + bucket: "chroma" + log: + Grpc: + host: "localhost" + port: 50051 + dispatcher: + num_worker_threads: 4 + dispatcher_queue_size: 100 + worker_queue_size: 100 + + compaction_service: assignment_policy: RendezvousHashing: hasher: Murmur3 memberlist_provider: CustomResource: - memberlist_name: "worker-memberlist" + kube_namespace: "chroma" + memberlist_name: "compaction-service-memberlist" queue_size: 100 sysdb: Grpc: @@ -339,12 +449,8 @@ mod tests { "#, ); let config = RootConfig::load(); - assert_eq!(config.worker.my_ip, "192.0.0.1"); - assert_eq!(config.worker.my_port, 50051); - assert_eq!(config.worker.num_indexing_threads, num_cpus::get() as u32); - assert_eq!(config.worker.pulsar_tenant, "A"); - assert_eq!(config.worker.pulsar_namespace, "B"); - assert_eq!(config.worker.kube_namespace, "C"); + assert_eq!(config.query_service.my_ip, "192.0.0.1"); + assert_eq!(config.query_service.my_port, 50051); Ok(()) }); } diff --git a/rust/worker/src/execution/dispatcher.rs b/rust/worker/src/execution/dispatcher.rs index b1668b1c60a..461fa655ea9 100644 --- a/rust/worker/src/execution/dispatcher.rs +++ b/rust/worker/src/execution/dispatcher.rs @@ -1,6 +1,7 @@ use super::{operator::TaskMessage, worker_thread::WorkerThread}; +use crate::execution::config::DispatcherConfig; use crate::{ - config::{Configurable, WorkerConfig}, + config::{Configurable, QueryServiceConfig}, errors::ChromaError, system::{Component, ComponentContext, Handler, Receiver, System}, }; @@ -129,12 +130,12 @@ impl Dispatcher { } #[async_trait] -impl Configurable for Dispatcher { - async fn try_from_config(worker_config: &WorkerConfig) -> Result> { +impl Configurable for Dispatcher { + async fn try_from_config(config: &DispatcherConfig) -> Result> { Ok(Dispatcher::new( - worker_config.dispatcher.num_worker_threads, - worker_config.dispatcher.dispatcher_queue_size, - worker_config.dispatcher.worker_queue_size, + config.num_worker_threads, + config.dispatcher_queue_size, + config.worker_queue_size, )) } } diff --git a/rust/worker/src/lib.rs b/rust/worker/src/lib.rs index a9736a3a4a1..0940a095eb2 100644 --- a/rust/worker/src/lib.rs +++ b/rust/worker/src/lib.rs @@ -16,6 +16,7 @@ mod system; mod types; use config::Configurable; +use memberlist::MemberlistProvider; mod chroma_proto { tonic::include_proto!("chroma"); @@ -23,17 +24,18 @@ mod chroma_proto { pub async fn query_service_entrypoint() { let config = config::RootConfig::load(); + let config = config.query_service; let system: system::System = system::System::new(); - let dispatcher = match execution::dispatcher::Dispatcher::try_from_config(&config.worker).await - { - Ok(dispatcher) => dispatcher, - Err(err) => { - println!("Failed to create dispatcher component: {:?}", err); - return; - } - }; + let dispatcher = + match execution::dispatcher::Dispatcher::try_from_config(&config.dispatcher).await { + Ok(dispatcher) => dispatcher, + Err(err) => { + println!("Failed to create dispatcher component: {:?}", err); + return; + } + }; let mut dispatcher_handle = system.start_component(dispatcher); - let mut worker_server = match server::WorkerServer::try_from_config(&config.worker).await { + let mut worker_server = match server::WorkerServer::try_from_config(&config).await { Ok(worker_server) => worker_server, Err(err) => { println!("Failed to create worker server component: {:?}", err); @@ -52,18 +54,32 @@ pub async fn query_service_entrypoint() { pub async fn compaction_service_entrypoint() { let config = config::RootConfig::load(); + let config = config.compaction_service; let system: system::System = system::System::new(); - let dispatcher = match execution::dispatcher::Dispatcher::try_from_config(&config.worker).await + + let mut memberlist = match memberlist::CustomResourceMemberlistProvider::try_from_config( + &config.memberlist_provider, + ) + .await { - Ok(dispatcher) => dispatcher, + Ok(memberlist) => memberlist, Err(err) => { - println!("Failed to create dispatcher component: {:?}", err); + println!("Failed to create memberlist component: {:?}", err); return; } }; + + let dispatcher = + match execution::dispatcher::Dispatcher::try_from_config(&config.dispatcher).await { + Ok(dispatcher) => dispatcher, + Err(err) => { + println!("Failed to create dispatcher component: {:?}", err); + return; + } + }; let mut dispatcher_handle = system.start_component(dispatcher); let mut compaction_manager = - match crate::compactor::CompactionManager::try_from_config(&config.worker).await { + match crate::compactor::CompactionManager::try_from_config(&config).await { Ok(compaction_manager) => compaction_manager, Err(err) => { println!("Failed to create compaction manager component: {:?}", err); @@ -72,6 +88,15 @@ pub async fn compaction_service_entrypoint() { }; compaction_manager.set_dispatcher(dispatcher_handle.receiver()); compaction_manager.set_system(system.clone()); + let mut compaction_manager_handle = system.start_component(compaction_manager); - tokio::join!(compaction_manager_handle.join(), dispatcher_handle.join()); + memberlist.subscribe(compaction_manager_handle.receiver()); + + let mut memberlist_handle = system.start_component(memberlist); + + tokio::join!( + memberlist_handle.join(), + compaction_manager_handle.join(), + dispatcher_handle.join() + ); } diff --git a/rust/worker/src/log/log.rs b/rust/worker/src/log/log.rs index 5e721f20aa4..0b69a81f532 100644 --- a/rust/worker/src/log/log.rs +++ b/rust/worker/src/log/log.rs @@ -1,7 +1,6 @@ use crate::chroma_proto; use crate::chroma_proto::log_service_client::LogServiceClient; use crate::config::Configurable; -use crate::config::WorkerConfig; use crate::errors::ChromaError; use crate::errors::ErrorCodes; use crate::log::config::LogConfig; @@ -94,9 +93,9 @@ impl ChromaError for GrpcLogError { } #[async_trait] -impl Configurable for GrpcLog { - async fn try_from_config(worker_config: &WorkerConfig) -> Result> { - match &worker_config.log { +impl Configurable for GrpcLog { + async fn try_from_config(config: &LogConfig) -> Result> { + match &config { LogConfig::Grpc(my_config) => { let host = &my_config.host; let port = &my_config.port; diff --git a/rust/worker/src/log/mod.rs b/rust/worker/src/log/mod.rs index cd769734c48..f6b76068c83 100644 --- a/rust/worker/src/log/mod.rs +++ b/rust/worker/src/log/mod.rs @@ -1,15 +1,14 @@ pub(crate) mod config; pub(crate) mod log; -use crate::{ - config::{Configurable, WorkerConfig}, - errors::ChromaError, -}; +use crate::{config::Configurable, errors::ChromaError}; + +use self::config::LogConfig; pub(crate) async fn from_config( - config: &WorkerConfig, + config: &LogConfig, ) -> Result, Box> { - match &config.log { + match &config { crate::log::config::LogConfig::Grpc(_) => { Ok(Box::new(log::GrpcLog::try_from_config(config).await?)) } diff --git a/rust/worker/src/memberlist/config.rs b/rust/worker/src/memberlist/config.rs index d6aaf2c8682..f94088941fa 100644 --- a/rust/worker/src/memberlist/config.rs +++ b/rust/worker/src/memberlist/config.rs @@ -18,10 +18,12 @@ pub(crate) enum MemberlistProviderConfig { /// The configuration for the custom resource memberlist provider. /// # Fields +/// - kube_namespace: The namespace to use for the custom resource. /// - memberlist_name: The name of the custom resource to use for the memberlist. /// - queue_size: The size of the queue to use for the channel. #[derive(Deserialize)] pub(crate) struct CustomResourceMemberlistProviderConfig { + pub(crate) kube_namespace: String, pub(crate) memberlist_name: String, pub(crate) queue_size: usize, } diff --git a/rust/worker/src/memberlist/memberlist_provider.rs b/rust/worker/src/memberlist/memberlist_provider.rs index e0c3f791d0a..faa09b66413 100644 --- a/rust/worker/src/memberlist/memberlist_provider.rs +++ b/rust/worker/src/memberlist/memberlist_provider.rs @@ -3,7 +3,7 @@ use std::{fmt::Debug, sync::RwLock}; use super::config::MemberlistProviderConfig; use crate::system::Receiver; use crate::{ - config::{Configurable, WorkerConfig}, + config::Configurable, errors::{ChromaError, ErrorCodes}, system::{Component, ComponentContext, Handler, StreamHandler}, }; @@ -23,7 +23,9 @@ use thiserror::Error; pub(crate) type Memberlist = Vec; #[async_trait] -pub(crate) trait MemberlistProvider: Component + Configurable { +pub(crate) trait MemberlistProvider: + Component + Configurable +{ fn subscribe(&mut self, receiver: Box + Send>) -> (); } @@ -84,9 +86,11 @@ impl ChromaError for CustomResourceMemberlistProviderConfigurationError { } #[async_trait] -impl Configurable for CustomResourceMemberlistProvider { - async fn try_from_config(worker_config: &WorkerConfig) -> Result> { - let my_config = match &worker_config.memberlist_provider { +impl Configurable for CustomResourceMemberlistProvider { + async fn try_from_config( + config: &MemberlistProviderConfig, + ) -> Result> { + let my_config = match &config { MemberlistProviderConfig::CustomResource(config) => config, }; let kube_client = match Client::try_default().await { @@ -99,12 +103,12 @@ impl Configurable for CustomResourceMemberlistProvider { }; let memberlist_cr_client = Api::::namespaced( kube_client.clone(), - &worker_config.kube_namespace, + &my_config.kube_namespace, ); let c: CustomResourceMemberlistProvider = CustomResourceMemberlistProvider { memberlist_name: my_config.memberlist_name.clone(), - kube_ns: worker_config.kube_namespace.clone(), + kube_ns: my_config.kube_namespace.clone(), kube_client, memberlist_cr_client, queue_size: my_config.queue_size, diff --git a/rust/worker/src/server.rs b/rust/worker/src/server.rs index d9da86b61d4..78f768370b6 100644 --- a/rust/worker/src/server.rs +++ b/rust/worker/src/server.rs @@ -2,7 +2,7 @@ use crate::chroma_proto; use crate::chroma_proto::{ GetVectorsRequest, GetVectorsResponse, QueryVectorsRequest, QueryVectorsResponse, }; -use crate::config::{Configurable, WorkerConfig}; +use crate::config::{Configurable, QueryServiceConfig}; use crate::errors::ChromaError; use crate::execution::operator::TaskMessage; use crate::execution::orchestration::HnswQueryOrchestrator; @@ -26,15 +26,17 @@ pub struct WorkerServer { } #[async_trait] -impl Configurable for WorkerServer { - async fn try_from_config(config: &WorkerConfig) -> Result> { - let sysdb = match crate::sysdb::from_config(&config).await { +impl Configurable for WorkerServer { + async fn try_from_config(config: &QueryServiceConfig) -> Result> { + let sysdb_config = &config.sysdb; + let sysdb = match crate::sysdb::from_config(sysdb_config).await { Ok(sysdb) => sysdb, Err(err) => { return Err(err); } }; - let log = match crate::log::from_config(&config).await { + let log_config = &config.log; + let log = match crate::log::from_config(log_config).await { Ok(log) => log, Err(err) => { return Err(err); diff --git a/rust/worker/src/storage/s3.rs b/rust/worker/src/storage/s3.rs index f78767e4896..7750c22735d 100644 --- a/rust/worker/src/storage/s3.rs +++ b/rust/worker/src/storage/s3.rs @@ -9,7 +9,7 @@ // streaming from s3. use super::{config::StorageConfig, Storage}; -use crate::config::{Configurable, WorkerConfig}; +use crate::config::{Configurable, QueryServiceConfig}; use crate::errors::ChromaError; use async_trait::async_trait; use aws_sdk_s3; @@ -73,9 +73,9 @@ impl S3Storage { } #[async_trait] -impl Configurable for S3Storage { - async fn try_from_config(config: &WorkerConfig) -> Result> { - match &config.storage { +impl Configurable for S3Storage { + async fn try_from_config(config: &StorageConfig) -> Result> { + match &config { StorageConfig::S3(s3_config) => { let config = aws_config::load_from_env().await; let client = aws_sdk_s3::Client::new(&config); diff --git a/rust/worker/src/sysdb/mod.rs b/rust/worker/src/sysdb/mod.rs index 1a26d2133f9..21e22265568 100644 --- a/rust/worker/src/sysdb/mod.rs +++ b/rust/worker/src/sysdb/mod.rs @@ -2,15 +2,14 @@ pub(crate) mod config; pub(crate) mod sysdb; pub(crate) mod test_sysdb; -use crate::{ - config::{Configurable, WorkerConfig}, - errors::ChromaError, -}; +use self::config::SysDbConfig; +use crate::config::Configurable; +use crate::errors::ChromaError; pub(crate) async fn from_config( - config: &WorkerConfig, + config: &SysDbConfig, ) -> Result, Box> { - match &config.sysdb { + match &config { crate::sysdb::config::SysDbConfig::Grpc(_) => { Ok(Box::new(sysdb::GrpcSysDb::try_from_config(config).await?)) } diff --git a/rust/worker/src/sysdb/sysdb.rs b/rust/worker/src/sysdb/sysdb.rs index fa4d6c387df..61a802a006e 100644 --- a/rust/worker/src/sysdb/sysdb.rs +++ b/rust/worker/src/sysdb/sysdb.rs @@ -1,6 +1,6 @@ use super::config::SysDbConfig; use crate::chroma_proto; -use crate::config::{Configurable, WorkerConfig}; +use crate::config::Configurable; use crate::types::{CollectionConversionError, SegmentConversionError}; use crate::{ chroma_proto::sys_db_client, @@ -78,9 +78,9 @@ impl ChromaError for GrpcSysDbError { } #[async_trait] -impl Configurable for GrpcSysDb { - async fn try_from_config(worker_config: &WorkerConfig) -> Result> { - match &worker_config.sysdb { +impl Configurable for GrpcSysDb { + async fn try_from_config(config: &SysDbConfig) -> Result> { + match &config { SysDbConfig::Grpc(my_config) => { let host = &my_config.host; let port = &my_config.port;