# [ENH] Add memberlist for compaction service (chroma-core#1949)
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
	 - ...
 - New functionality
	 - This PR adds memberlist consumption to the compaction service and
	   filters collections based on the memberlist, using the rendezvous
	   hashing assignment policy (see the sketch below).
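
For context, rendezvous hashing deterministically maps each key (here, a collection id) to exactly one member of the memberlist, so every node can agree on job ownership without coordination. A minimal sketch of the idea, using std's `DefaultHasher` as a stand-in for the Murmur3 hasher the worker actually configures:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Score a (member, key) pair; the member with the highest score owns the key.
fn score(member: &str, key: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    member.hash(&mut hasher);
    key.hash(&mut hasher);
    hasher.finish()
}

// A pure function of (key, members): every node evaluates it over the same
// memberlist and reaches the same answer without talking to its peers.
fn assign(key: &str, members: &[String]) -> Option<String> {
    members.iter().max_by_key(|m| score(m, key)).cloned()
}

fn main() {
    let members = vec!["10.244.0.9".to_string(), "10.244.0.10".to_string()];
    // Only the winning member compacts this collection.
    println!("{:?}", assign("collection-uuid-1", &members));
}
```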

## Test plan
*How are these changes tested?*

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust
- [ ] Manual local integration testing with Tilt

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need
to make documentation changes in the [docs
repository](https://github.com/chroma-core/docs)?*
Ishiihara authored Apr 1, 2024
1 parent 978547c commit 1cd2ced
Showing 16 changed files with 443 additions and 173 deletions.
### rust/worker/chroma_config.yaml (30 additions & 7 deletions)

```diff
@@ -1,21 +1,17 @@
-# Default configuration for Chroma worker
+# Default configuration for query and compaction service
 # In the long term, every service should have an entry in this file
 # and this can become the global configuration file for Chroma
 # for now we nest it in the worker directory
 
-worker:
+query_service:
     my_ip: "10.244.0.9"
     my_port: 50051
-    num_indexing_threads: 4
-    pulsar_url: "pulsar://127.0.0.1:6650"
-    pulsar_tenant: "default"
-    pulsar_namespace: "default"
-    kube_namespace: "chroma"
     assignment_policy:
         RendezvousHashing:
             hasher: Murmur3
     memberlist_provider:
         CustomResource:
+            kube_namespace: "chroma"
             memberlist_name: "query-service-memberlist"
             queue_size: 100
     sysdb:
@@ -33,6 +29,33 @@ worker:
         num_worker_threads: 4
         dispatcher_queue_size: 100
         worker_queue_size: 100
+
+compaction_service:
+    my_ip: "10.244.0.9"
+    my_port: 50051
+    assignment_policy:
+        RendezvousHashing:
+            hasher: Murmur3
+    memberlist_provider:
+        CustomResource:
+            kube_namespace: "chroma"
+            memberlist_name: "compaction-service-memberlist"
+            queue_size: 100
+    sysdb:
+        Grpc:
+            host: "sysdb.chroma"
+            port: 50051
+    storage:
+        S3:
+            bucket: "chroma-storage"
+    log:
+        Grpc:
+            host: "logservice.chroma"
+            port: 50051
+    dispatcher:
+        num_worker_threads: 4
+        dispatcher_queue_size: 100
+        worker_queue_size: 100
+    compactor:
+        compaction_manager_queue_size: 1000
+        max_concurrent_jobs: 100
```
### rust/worker/src/assignment/assignment_policy.rs (11 additions & 26 deletions)

```diff
@@ -1,7 +1,4 @@
-use crate::{
-    config::{Configurable, WorkerConfig},
-    errors::ChromaError,
-};
+use crate::{config::Configurable, errors::ChromaError};
 
 use super::{
     config::{AssignmentPolicyConfig, HasherType},
@@ -20,14 +17,10 @@ Interfaces
 /// This trait mirrors the go and python versions of the assignment policy
 /// interface.
 /// # Methods
-/// - assign: Assign a key to a topic.
+/// - assign: Assign a key to a member.
 /// - get_members: Get the members that can be assigned to.
 /// - set_members: Set the members that can be assigned to.
-/// # Notes
-/// An assignment policy is not responsible for creating the topics it assigns to.
-/// It is the responsibility of the caller to ensure that the topics exist.
-/// An assignment policy must be Send.
-pub(crate) trait AssignmentPolicy: Send {
+pub(crate) trait AssignmentPolicy: Send + Sync {
     fn assign(&self, key: &str) -> Result<String, AssignmentError>;
     fn get_members(&self) -> Vec<String>;
     fn set_members(&mut self, members: Vec<String>);
@@ -45,16 +38,7 @@ pub(crate) struct RendezvousHashingAssignmentPolicy {
 }
 
 impl RendezvousHashingAssignmentPolicy {
-    // Rust beginners note
-    // The reason we take String and not &str is because we need to put the strings into our
-    // struct, and we can't do that with references so rather than clone the strings, we just
-    // take ownership of them and put the responsibility on the caller to clone them if they
-    // need to. This is the general pattern we should follow in rust - put the burden of cloning
-    // on the caller, and if they don't need to clone, they can pass ownership.
-    pub(crate) fn new(
-        pulsar_tenant: String,
-        pulsar_namespace: String,
-    ) -> RendezvousHashingAssignmentPolicy {
+    pub(crate) fn new() -> RendezvousHashingAssignmentPolicy {
         return RendezvousHashingAssignmentPolicy {
             hasher: Murmur3Hasher {},
             members: vec![],
@@ -67,9 +51,11 @@ impl RendezvousHashingAssignmentPolicy {
 }
 
 #[async_trait]
-impl Configurable for RendezvousHashingAssignmentPolicy {
-    async fn try_from_config(worker_config: &WorkerConfig) -> Result<Self, Box<dyn ChromaError>> {
-        let assignment_policy_config = match &worker_config.assignment_policy {
+impl Configurable<AssignmentPolicyConfig> for RendezvousHashingAssignmentPolicy {
+    async fn try_from_config(
+        config: &AssignmentPolicyConfig,
+    ) -> Result<Self, Box<dyn ChromaError>> {
+        let assignment_policy_config = match config {
             AssignmentPolicyConfig::RendezvousHashing(config) => config,
         };
         let hasher = match assignment_policy_config.hasher {
@@ -84,9 +70,8 @@ impl Configurable for RendezvousHashingAssignmentPolicy {
 
 impl AssignmentPolicy for RendezvousHashingAssignmentPolicy {
     fn assign(&self, key: &str) -> Result<String, AssignmentError> {
-        let topics = self.get_members();
-        let topic = assign(key, topics, &self.hasher);
-        return topic;
+        let members = self.get_members();
+        assign(key, members, &self.hasher)
    }
 
     fn get_members(&self) -> Vec<String> {
```
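
Since `new()` no longer takes Pulsar arguments, wiring a policy up by hand is two calls. A hypothetical usage sketch (the member addresses and import paths are assumed, and the types are `pub(crate)`, so this only compiles inside the worker crate):

```rust
use crate::assignment::assignment_policy::{
    AssignmentPolicy, RendezvousHashingAssignmentPolicy,
};
use crate::assignment::rendezvous_hash::AssignmentError;

fn pick_owner() -> Result<String, AssignmentError> {
    let mut policy = RendezvousHashingAssignmentPolicy::new();
    // Members now arrive via the memberlist provider rather than being
    // derived from a Pulsar tenant/namespace, hence the zero-argument new().
    policy.set_members(vec!["10.244.0.9".to_string(), "10.244.0.10".to_string()]);
    // A collection id maps deterministically to exactly one member.
    policy.assign("collection-uuid-1")
}
```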
### rust/worker/src/assignment/mod.rs (15 additions & 1 deletion)

```diff
@@ -1,3 +1,17 @@
 pub(crate) mod assignment_policy;
 pub(crate) mod config;
-mod rendezvous_hash;
+pub(crate) mod rendezvous_hash;
+
+use crate::{config::Configurable, errors::ChromaError};
+
+use self::{assignment_policy::AssignmentPolicy, config::AssignmentPolicyConfig};
+
+pub(crate) async fn from_config(
+    config: &AssignmentPolicyConfig,
+) -> Result<Box<dyn AssignmentPolicy>, Box<dyn ChromaError>> {
+    match &config {
+        crate::assignment::config::AssignmentPolicyConfig::RendezvousHashing(_) => Ok(Box::new(
+            assignment_policy::RendezvousHashingAssignmentPolicy::try_from_config(config).await?,
+        )),
+    }
+}
```
### rust/worker/src/compactor/compaction_manager.rs (58 additions & 19 deletions)

```diff
@@ -2,13 +2,15 @@ use super::scheduler::Scheduler;
 use super::scheduler_policy::LasCompactionTimeSchedulerPolicy;
 use crate::compactor::types::CompactionJob;
 use crate::compactor::types::ScheduleMessage;
+use crate::config::CompactionServiceConfig;
 use crate::config::Configurable;
 use crate::errors::ChromaError;
 use crate::errors::ErrorCodes;
 use crate::execution::operator::TaskMessage;
 use crate::execution::orchestration::CompactOrchestrator;
 use crate::execution::orchestration::CompactionResponse;
 use crate::log::log::Log;
+use crate::memberlist::Memberlist;
 use crate::system::Component;
 use crate::system::ComponentContext;
 use crate::system::Handler;
@@ -33,7 +35,6 @@ pub(crate) struct CompactionManager {
     dispatcher: Option<Box<dyn Receiver<TaskMessage>>>,
     // Config
     compaction_manager_queue_size: usize,
-    max_concurrent_jobs: usize,
     compaction_interval: Duration,
 }
 
@@ -56,7 +57,6 @@ impl CompactionManager {
         scheduler: Scheduler,
         log: Box<dyn Log>,
         compaction_manager_queue_size: usize,
-        max_concurrent_jobs: usize,
         compaction_interval: Duration,
     ) -> Self {
         CompactionManager {
@@ -65,7 +65,6 @@ impl CompactionManager {
             log,
             dispatcher: None,
             compaction_manager_queue_size,
-            max_concurrent_jobs,
             compaction_interval,
         }
     }
@@ -152,33 +151,51 @@ impl CompactionManager {
 }
 
 #[async_trait]
-impl Configurable for CompactionManager {
+impl Configurable<CompactionServiceConfig> for CompactionManager {
     async fn try_from_config(
-        config: &crate::config::WorkerConfig,
+        config: &crate::config::CompactionServiceConfig,
     ) -> Result<Self, Box<dyn ChromaError>> {
-        let sysdb = match crate::sysdb::from_config(&config).await {
+        let sysdb_config = &config.sysdb;
+        let sysdb = match crate::sysdb::from_config(sysdb_config).await {
             Ok(sysdb) => sysdb,
             Err(err) => {
                 return Err(err);
             }
         };
-        let log = match crate::log::from_config(&config).await {
+        let log_config = &config.log;
+        let log = match crate::log::from_config(log_config).await {
             Ok(log) => log,
             Err(err) => {
                 return Err(err);
             }
         };
 
+        let my_ip = config.my_ip.clone();
         let policy = Box::new(LasCompactionTimeSchedulerPolicy {});
-        let scheduler = Scheduler::new(log.clone(), sysdb.clone(), policy, 1000);
         let compaction_interval_sec = config.compactor.compaction_interval_sec;
         let max_concurrent_jobs = config.compactor.max_concurrent_jobs;
         let compaction_manager_queue_size = config.compactor.compaction_manager_queue_size;
+
+        let assignment_policy_config = &config.assignment_policy;
+        let assignment_policy = match crate::assignment::from_config(assignment_policy_config).await
+        {
+            Ok(assignment_policy) => assignment_policy,
+            Err(err) => {
+                return Err(err);
+            }
+        };
+        let scheduler = Scheduler::new(
+            my_ip,
+            log.clone(),
+            sysdb.clone(),
+            policy,
+            max_concurrent_jobs,
+            assignment_policy,
+        );
         Ok(CompactionManager::new(
             scheduler,
             log,
             compaction_manager_queue_size,
-            max_concurrent_jobs,
             Duration::from_secs(compaction_interval_sec),
         ))
     }
@@ -188,7 +205,7 @@ impl Configurable for CompactionManager {
 #[async_trait]
 impl Component for CompactionManager {
     fn queue_size(&self) -> usize {
-        1000 // TODO: make configurable
+        self.compaction_manager_queue_size
     }
 
     async fn on_start(&mut self, ctx: &crate::system::ComponentContext<Self>) -> () {
@@ -220,9 +237,17 @@ impl Handler<ScheduleMessage> for CompactionManager {
     }
 }
 
+#[async_trait]
+impl Handler<Memberlist> for CompactionManager {
+    async fn handle(&mut self, message: Memberlist, ctx: &ComponentContext<CompactionManager>) {
+        self.scheduler.set_memberlist(message);
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::assignment::assignment_policy::RendezvousHashingAssignmentPolicy;
     use crate::execution::dispatcher::Dispatcher;
     use crate::log::log::InMemoryLog;
     use crate::log::log::InternalLogRecord;
@@ -306,17 +331,31 @@ mod tests {
         sysdb.add_collection(collection_1);
         sysdb.add_collection(collection_2);
 
+        let my_ip = "127.0.0.1".to_string();
+        let compaction_manager_queue_size = 1000;
+        let max_concurrent_jobs = 10;
+        let compaction_interval = Duration::from_secs(1);
+
+        // Set assignment policy
+        let mut assignment_policy = Box::new(RendezvousHashingAssignmentPolicy::new());
+        assignment_policy.set_members(vec![my_ip.clone()]);
+
+        let mut scheduler = Scheduler::new(
+            my_ip.clone(),
+            log.clone(),
+            sysdb.clone(),
+            Box::new(LasCompactionTimeSchedulerPolicy {}),
+            max_concurrent_jobs,
+            assignment_policy,
+        );
+        // Set memberlist
+        scheduler.set_memberlist(vec![my_ip.clone()]);
+
         let mut manager = CompactionManager::new(
-            Scheduler::new(
-                log.clone(),
-                sysdb.clone(),
-                Box::new(LasCompactionTimeSchedulerPolicy {}),
-                1000,
-            ),
+            scheduler,
             log,
-            1000,
-            10,
-            Duration::from_secs(1),
+            compaction_manager_queue_size,
+            compaction_interval,
         );
 
         let system = System::new();
```
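
The collection-filtering half of the change lives in the scheduler, whose hunks are not shown above. Conceptually, with the `my_ip`, memberlist, and `assignment_policy` values threaded through `Scheduler::new` in this diff, it reduces to a sketch like the following (the method name and the `Collection` shape are assumed for illustration, not taken from the PR):

```rust
impl Scheduler {
    // Hedged sketch: keep only the collections this node owns under the
    // current memberlist. The real scheduler method may differ.
    fn filter_collections(&mut self, collections: Vec<Collection>) -> Vec<Collection> {
        // Refresh the policy with the latest memberlist delivered via
        // Handler<Memberlist> on the CompactionManager.
        self.assignment_policy.set_members(self.memberlist.clone());
        collections
            .into_iter()
            .filter(|collection| {
                self.assignment_policy
                    .assign(&collection.id.to_string())
                    .map(|owner| owner == self.my_ip)
                    .unwrap_or(false)
            })
            .collect()
    }
}
```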