Skip to content

Commit

Permalink
Update routing tables via Chitchat (#4241)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Dec 6, 2023
1 parent 972e447 commit 3242dde
Show file tree
Hide file tree
Showing 18 changed files with 1,204 additions and 785 deletions.
3 changes: 1 addition & 2 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ base64 = "0.21"
bytes = { version = "1", features = ["serde"] }
bytesize = { version = "1.3.0", features = ["serde"] }
bytestring = "1.3.0"
chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "72a994f" }
chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "a3e3f8b" }
chrono = { version = "0.4.23", default-features = false, features = [
"clock",
"std",
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use anyhow::Context;
use chitchat::transport::Transport;
use chitchat::{
spawn_chitchat, Chitchat, ChitchatConfig, ChitchatHandle, ChitchatId, ClusterStateSnapshot,
FailureDetectorConfig, ListenerHandle, NodeState,
FailureDetectorConfig, KeyChangeEvent, ListenerHandle, NodeState,
};
use futures::Stream;
use itertools::Itertools;
Expand Down Expand Up @@ -266,7 +266,7 @@ impl Cluster {
pub async fn subscribe(
&self,
key_prefix: &str,
callback: impl Fn(&str, &str) + Send + Sync + 'static,
callback: impl Fn(KeyChangeEvent) + Send + Sync + 'static,
) -> ListenerHandle {
self.chitchat()
.await
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ mod node;
pub use chitchat::transport::ChannelTransport;
use chitchat::transport::UdpTransport;
use chitchat::FailureDetectorConfig;
pub use chitchat::ListenerHandle;
pub use chitchat::{KeyChangeEvent, ListenerHandle};
use quickwit_config::service::QuickwitService;
use quickwit_config::NodeConfig;
use quickwit_proto::indexing::CpuCapacity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,7 @@ mod tests {
ingestion_rate: RateMibPerSec(1),
}]);
let local_shards_update = LocalShardsUpdate {
leader_id: "test-ingester".into(),
source_uid: source_uid.clone(),
shard_infos,
};
Expand Down Expand Up @@ -1227,6 +1228,7 @@ mod tests {
},
]);
let local_shards_update = LocalShardsUpdate {
leader_id: "test-ingester".into(),
source_uid: source_uid.clone(),
shard_infos,
};
Expand All @@ -1248,6 +1250,7 @@ mod tests {
},
]);
let local_shards_update = LocalShardsUpdate {
leader_id: "test-ingester".into(),
source_uid: source_uid.clone(),
shard_infos,
};
Expand Down
1 change: 0 additions & 1 deletion quickwit/quickwit-indexing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ async-trait = { workspace = true }
backoff = { workspace = true, optional = true }
bytes = { workspace = true }
bytesize = { workspace = true }
chitchat = { workspace = true }
fail = { workspace = true }
flume = { workspace = true }
fnv = { workspace = true }
Expand Down
3 changes: 1 addition & 2 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -857,9 +857,8 @@ mod tests {
use std::path::Path;
use std::time::Duration;

use chitchat::transport::ChannelTransport;
use quickwit_actors::{Health, ObservationType, Supervisable, Universe, HEARTBEAT};
use quickwit_cluster::create_cluster_for_test;
use quickwit_cluster::{create_cluster_for_test, ChannelTransport};
use quickwit_common::rand::append_random_suffix;
use quickwit_config::{
IngestApiConfig, KafkaSourceParams, SourceConfig, SourceInputFormat, SourceParams,
Expand Down
12 changes: 5 additions & 7 deletions quickwit/quickwit-indexing/src/models/shard_positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ use std::fmt::Debug;

use anyhow::Context;
use async_trait::async_trait;
use chitchat::ListenerHandle;
use fnv::FnvHashMap;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, SpawnContext};
use quickwit_cluster::Cluster;
use quickwit_cluster::{Cluster, ListenerHandle};
use quickwit_common::pubsub::{Event, EventBroker};
use quickwit_proto::indexing::ShardPositionsUpdate;
use quickwit_proto::types::{IndexUid, Position, ShardId, SourceUid};
Expand Down Expand Up @@ -117,13 +116,13 @@ impl Actor for ShardPositionsService {
let mailbox = ctx.mailbox().clone();
self.cluster_listener_handle_opt = Some(
self.cluster
.subscribe(SHARD_POSITIONS_PREFIX, move |key, value| {
let shard_positions= match parse_shard_positions_from_kv(key, value) {
.subscribe(SHARD_POSITIONS_PREFIX, move |event| {
let shard_positions= match parse_shard_positions_from_kv(event.key, event.value) {
Ok(shard_positions) => {
shard_positions
}
Err(error) => {
error!(key=key, value=value, error=%error, "failed to parse shard positions from cluster kv");
error!(key=event.key, value=event.value, error=%error, "failed to parse shard positions from cluster kv");
return;
}
};
Expand Down Expand Up @@ -276,9 +275,8 @@ impl ShardPositionsService {
mod tests {
use std::time::Duration;

use chitchat::transport::ChannelTransport;
use quickwit_actors::Universe;
use quickwit_cluster::create_cluster_for_test;
use quickwit_cluster::{create_cluster_for_test, ChannelTransport};
use quickwit_common::pubsub::EventBroker;
use quickwit_proto::types::IndexUid;

Expand Down
3 changes: 1 addition & 2 deletions quickwit/quickwit-indexing/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use bytes::Bytes;
use chitchat::transport::ChannelTransport;
use quickwit_actors::{Mailbox, Universe};
use quickwit_cluster::create_cluster_for_test;
use quickwit_cluster::{create_cluster_for_test, ChannelTransport};
use quickwit_common::pubsub::EventBroker;
use quickwit_common::rand::append_random_suffix;
use quickwit_common::uri::Uri;
Expand Down
18 changes: 10 additions & 8 deletions quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use quickwit_common::shared_consts::INGESTER_PRIMARY_SHARDS_PREFIX;
use quickwit_common::sorted_iter::{KeyDiff, SortedByKeyIterator};
use quickwit_common::tower::Rate;
use quickwit_proto::ingest::ShardState;
use quickwit_proto::types::{split_queue_id, QueueId, ShardId, SourceUid};
use quickwit_proto::types::{split_queue_id, NodeId, QueueId, ShardId, SourceUid};
use serde::{Deserialize, Serialize, Serializer};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -276,8 +276,7 @@ fn parse_key(key: &str) -> Option<SourceUid> {

#[derive(Debug, Clone)]
pub struct LocalShardsUpdate {
// TODO: add leader ID in order to update routing table.
// leader_id: NodeId,
pub leader_id: NodeId,
pub source_uid: SourceUid,
pub shard_infos: ShardInfos,
}
Expand All @@ -289,16 +288,19 @@ pub async fn setup_local_shards_update_listener(
event_broker: EventBroker,
) -> ListenerHandle {
cluster
.subscribe(INGESTER_PRIMARY_SHARDS_PREFIX, move |key, value| {
let Some(source_uid) = parse_key(key) else {
warn!("failed to parse source UID `{key}`");
.subscribe(INGESTER_PRIMARY_SHARDS_PREFIX, move |event| {
let Some(source_uid) = parse_key(event.key) else {
warn!("failed to parse source UID `{}`", event.key);
return;
};
let Ok(shard_infos) = serde_json::from_str::<ShardInfos>(value) else {
warn!("failed to parse shard infos `{value}`");
let Ok(shard_infos) = serde_json::from_str::<ShardInfos>(event.value) else {
warn!("failed to parse shard infos `{}`", event.value);
return;
};
let leader_id: NodeId = event.node.node_id.clone().into();

let local_shards_update = LocalShardsUpdate {
leader_id,
source_uid,
shard_infos,
};
Expand Down
Loading

0 comments on commit 3242dde

Please sign in to comment.