Skip to content

Commit

Permalink
Exposing published positions through chitchat.
Browse files Browse the repository at this point in the history
The info is transmitted via an EventSubscriber at the same time as regular truncate:

upon fetching the starting offset when assigning a shard to an IngestSource
upon suggest truncate.
This PR does not include any code to remove sources yet.

Refactoring:
Moving SourceUid to quickwit-proto
Making it more difficult to misuse pubsub
- Added #[must_use] for methods returning the handle
- Internalizing the `Clone` mecanic. A "normal" Clone could yield
unexpected result.
  • Loading branch information
fulmicoton committed Nov 15, 2023
1 parent d288ce4 commit 63f55fd
Show file tree
Hide file tree
Showing 18 changed files with 759 additions and 90 deletions.
10 changes: 10 additions & 0 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,16 @@ impl Cluster {
.set(key, value);
}

pub async fn get_self_key_value(&self, key: &str) -> Option<String> {
self.chitchat()
.await
.lock()
.await
.self_node_state()
.get_versioned(key)
.map(|versioned_value| versioned_value.value.clone())
}

/// Waits until the predicate holds true for the set of ready members.
pub async fn wait_for_ready_members<F>(
&self,
Expand Down
112 changes: 100 additions & 12 deletions quickwit/quickwit-common/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,54 @@

use std::collections::HashMap;
use std::fmt;
use std::fmt::Formatter;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;

use async_trait::async_trait;
use tokio::sync::Mutex as TokioMutex;

use crate::type_map::TypeMap;

pub trait Event: fmt::Debug + Clone + Send + Sync + 'static {}

#[async_trait]
pub trait EventSubscriber<E>: fmt::Debug + dyn_clone::DynClone + Send + Sync + 'static {
pub trait EventSubscriber<E>: Send + Sync + 'static {
async fn handle_event(&mut self, event: E);
}

dyn_clone::clone_trait_object!(<E> EventSubscriber<E>);
struct ClosureSubscriber<E, F> {
callback: Arc<F>,
_phantom: PhantomData<E>,
}

impl<E, F> Clone for ClosureSubscriber<E, F> {
fn clone(&self) -> Self {
ClosureSubscriber {
callback: self.callback.clone(),
_phantom: self._phantom,
}
}
}

impl<E, F> fmt::Debug for ClosureSubscriber<E, F> {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
f.debug_struct("ClosureSubscriber")
.field("callback", &std::any::type_name::<F>())
.finish()
}
}

#[async_trait]
impl<E: Sync + Send + 'static, F: Fn(E) + Sync + Send + 'static> EventSubscriber<E>
for ClosureSubscriber<E, F>
{
async fn handle_event(&mut self, event: E) {
(self.callback)(event);
}
}

type EventSubscriptions<E> = HashMap<usize, EventSubscription<E>>;

Expand All @@ -59,6 +91,7 @@ struct InnerEventBroker {

impl EventBroker {
/// Subscribes to an event type.
#[must_use]
pub fn subscribe<E>(&self, subscriber: impl EventSubscriber<E>) -> EventSubscriptionHandle
where E: Event {
let mut subscriptions = self
Expand All @@ -76,8 +109,7 @@ impl EventBroker {
.fetch_add(1, Ordering::Relaxed);

let subscription = EventSubscription {
subscription_id,
subscriber: Box::new(subscriber),
subscriber: Arc::new(TokioMutex::new(Box::new(subscriber))),
};
let typed_subscriptions = subscriptions
.get_mut::<EventSubscriptions<E>>()
Expand All @@ -99,6 +131,21 @@ impl EventBroker {
}
}

/// Subscribe to an event with a function callback.
#[must_use]
pub fn subscribe_fn<E>(
&self,
callback_fn: impl Fn(E) + Sync + Send + 'static,
) -> EventSubscriptionHandle
where
E: Event,
{
self.subscribe(ClosureSubscriber {
callback: Arc::new(callback_fn),
_phantom: Default::default(),
})
}

/// Publishes an event.
pub fn publish<E>(&self, event: E)
where E: Event {
Expand All @@ -111,20 +158,18 @@ impl EventBroker {
if let Some(typed_subscriptions) = subscriptions.get::<EventSubscriptions<E>>() {
for subscription in typed_subscriptions.values() {
let event = event.clone();
let mut subscriber = subscription.subscriber.clone();
let subscriber_clone = subscription.subscriber.clone();
tokio::spawn(tokio::time::timeout(Duration::from_secs(600), async move {
subscriber.handle_event(event).await;
let mut subscriber_lock = subscriber_clone.lock().await;
subscriber_lock.handle_event(event).await;
}));
}
}
}
}

#[derive(Debug)]
struct EventSubscription<E> {
#[allow(dead_code)]
subscription_id: usize, // Used for the `Debug` implementation.
subscriber: Box<dyn EventSubscriber<E>>,
subscriber: Arc<TokioMutex<Box<dyn EventSubscriber<E>>>>,
}

pub struct EventSubscriptionHandle {
Expand All @@ -135,6 +180,12 @@ pub struct EventSubscriptionHandle {

impl EventSubscriptionHandle {
pub fn cancel(self) {}

/// By default, dropping an event cancels the subscription.
/// `forever` consumes the handle and avoid drop
pub fn forever(mut self) {
self.broker = Weak::new();
}
}

impl Drop for EventSubscriptionHandle {
Expand Down Expand Up @@ -184,15 +235,52 @@ mod tests {
let event = MyEvent { value: 42 };
event_broker.publish(event);

tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
tokio::time::sleep(Duration::from_millis(1)).await;
assert_eq!(counter.load(Ordering::Relaxed), 42);

subscription_handle.cancel();

let event = MyEvent { value: 1337 };
event_broker.publish(event);

tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
tokio::time::sleep(Duration::from_millis(1)).await;
assert_eq!(counter.load(Ordering::Relaxed), 42);
}

#[tokio::test]
async fn test_event_broker_handle_drop() {
let event_broker = EventBroker::default();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
drop(event_broker.subscribe_fn::<MyEvent>(move |event| {
tx.send(event.value).unwrap();
}));
event_broker.publish(MyEvent { value: 42 });
assert!(rx.recv().await.is_none());
}

#[tokio::test]
async fn test_event_broker_handle_cancel() {
let event_broker = EventBroker::default();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
event_broker
.subscribe_fn::<MyEvent>(move |event| {
tx.send(event.value).unwrap();
})
.cancel();
event_broker.publish(MyEvent { value: 42 });
assert!(rx.recv().await.is_none());
}

#[tokio::test]
async fn test_event_broker_handle_forever() {
let event_broker = EventBroker::default();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
event_broker
.subscribe_fn::<MyEvent>(move |event| {
tx.send(event.value).unwrap();
})
.forever();
event_broker.publish(MyEvent { value: 42 });
assert_eq!(rx.recv().await, Some(42));
}
}
3 changes: 3 additions & 0 deletions quickwit/quickwit-common/src/shared_consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,6 @@ pub const DELETION_GRACE_PERIOD: Duration = Duration::from_secs(60 * 32); // 32
/// In order to amortized search with scroll, we fetch more documents than are
/// being requested.
pub const SCROLL_BATCH_LEN: usize = 1_000;

/// Prefix used in chitchat to publish the shard positions.
pub const SHARD_POSITIONS_PREFIX: &str = "shard_positions:";
1 change: 0 additions & 1 deletion quickwit/quickwit-common/src/tower/event_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ mod tests {

impl Event for MyEvent {}

#[derive(Debug, Clone)]
struct MySubscriber {
counter: Arc<AtomicUsize>,
}
Expand Down
4 changes: 1 addition & 3 deletions quickwit/quickwit-control-plane/src/control_plane_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ use quickwit_proto::metastore::{
self, EntityKind, ListIndexesMetadataRequest, ListShardsSubrequest, MetastoreError,
MetastoreService, MetastoreServiceClient, SourceType,
};
use quickwit_proto::types::{IndexId, IndexUid, NodeId, NodeIdRef, ShardId, SourceId};
use quickwit_proto::types::{IndexId, IndexUid, NodeId, NodeIdRef, ShardId, SourceId, SourceUid};
use serde::Serialize;
use tracing::{error, info, warn};

use crate::SourceUid;

type NextShardId = ShardId;
#[derive(Debug, Eq, PartialEq)]
struct ShardTableEntry {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,11 +470,9 @@ mod tests {
use proptest::{prop_compose, proptest};
use quickwit_config::{IndexConfig, KafkaSourceParams, SourceConfig, SourceParams};
use quickwit_metastore::IndexMetadata;
use quickwit_proto::types::IndexUid;
use quickwit_proto::types::{IndexUid, SourceUid};

use super::*;
use crate::SourceUid;

#[test]
fn test_indexing_plans_diff() {
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::num::NonZeroU32;

use fnv::FnvHashMap;
use quickwit_proto::indexing::{CpuCapacity, IndexingTask};
use quickwit_proto::types::{IndexUid, ShardId};
use quickwit_proto::types::{IndexUid, ShardId, SourceUid};
use scheduling_logic_model::{IndexerOrd, SourceOrd};
use tracing::error;
use tracing::log::warn;
Expand All @@ -34,7 +34,6 @@ use crate::indexing_plan::PhysicalIndexingPlan;
use crate::indexing_scheduler::scheduling::scheduling_logic_model::{
SchedulingProblem, SchedulingSolution,
};
use crate::SourceUid;

/// If we have several pipelines below this threshold we
/// reduce the number of pipelines.
Expand Down Expand Up @@ -530,13 +529,12 @@ mod tests {

use fnv::FnvHashMap;
use quickwit_proto::indexing::{mcpu, IndexingTask};
use quickwit_proto::types::{IndexUid, ShardId};
use quickwit_proto::types::{IndexUid, ShardId, SourceUid};

use super::{
build_physical_indexing_plan, group_shards_into_pipelines, indexing_task,
spread_shards_optimally, SourceToSchedule, SourceToScheduleType,
};
use crate::SourceUid;

#[test]
fn test_spread_shard_optimally() {
Expand Down
9 changes: 0 additions & 9 deletions quickwit/quickwit-control-plane/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,6 @@ pub(crate) mod metrics;

use quickwit_common::tower::Pool;
use quickwit_proto::indexing::{CpuCapacity, IndexingServiceClient, IndexingTask};
use quickwit_proto::types::{IndexUid, SourceId};

/// It can however appear only once in a given index.
/// In itself, `SourceId` is not unique, but the pair `(IndexUid, SourceId)` is.
#[derive(PartialEq, Eq, Debug, PartialOrd, Ord, Hash, Clone)]
pub struct SourceUid {
pub index_uid: IndexUid,
pub source_id: SourceId,
}

/// Indexer-node specific information stored in the pool of available indexer nodes
#[derive(Debug, Clone)]
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ impl IndexingPipeline {
ingester_pool: self.params.ingester_pool.clone(),
queues_dir_path: self.params.queues_dir_path.clone(),
storage_resolver: self.params.source_storage_resolver.clone(),
event_broker: self.params.event_broker.clone(),
}),
source_checkpoint,
))
Expand Down
19 changes: 13 additions & 6 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use quickwit_actors::{
};
use quickwit_cluster::Cluster;
use quickwit_common::fs::get_cache_directory_path;
use quickwit_common::pubsub::EventBroker;
use quickwit_common::pubsub::{EventBroker, EventSubscriptionHandle};
use quickwit_common::temp_dir;
use quickwit_config::{
build_doc_mapper, IndexConfig, IndexerConfig, SourceConfig, INGEST_API_SOURCE_ID,
Expand All @@ -56,7 +56,10 @@ use tracing::{debug, error, info, warn};

use super::merge_pipeline::{MergePipeline, MergePipelineParams};
use super::MergePlanner;
use crate::models::{DetachIndexingPipeline, DetachMergePipeline, ObservePipeline, SpawnPipeline};
use crate::models::{
DetachIndexingPipeline, DetachMergePipeline, ObservePipeline, PublishedShardPositions,
SpawnPipeline,
};
use crate::source::{AssignShards, Assignment};
use crate::split_store::{LocalSplitStore, SplitStoreQuota};
use crate::{IndexingPipeline, IndexingPipelineParams, IndexingSplitStore, IndexingStatistics};
Expand Down Expand Up @@ -117,6 +120,7 @@ pub struct IndexingService {
merge_pipeline_handles: HashMap<MergePipelineId, MergePipelineHandle>,
cooperative_indexing_permits: Option<Arc<Semaphore>>,
event_broker: EventBroker,
_event_subscription_handle: EventSubscriptionHandle,
}

impl Debug for IndexingService {
Expand Down Expand Up @@ -144,6 +148,8 @@ impl IndexingService {
storage_resolver: StorageResolver,
event_broker: EventBroker,
) -> anyhow::Result<IndexingService> {
let published_shard_positions = PublishedShardPositions::new(cluster.clone());
let event_subscription_handle = event_broker.subscribe(published_shard_positions);
let split_store_space_quota = SplitStoreQuota::new(
indexer_config.split_store_max_num_splits,
indexer_config.split_store_max_num_bytes,
Expand Down Expand Up @@ -175,6 +181,7 @@ impl IndexingService {
merge_pipeline_handles: HashMap::new(),
cooperative_indexing_permits,
event_broker,
_event_subscription_handle: event_subscription_handle,
})
}

Expand Down Expand Up @@ -418,10 +425,10 @@ impl IndexingService {
let pipeline_metrics: HashMap<&IndexingPipelineId, PipelineMetrics> = self
.indexing_pipelines
.iter()
.flat_map(|(pipeline, (_, pipeline_handle))| {
let indexer_metrics = pipeline_handle.last_observation();
let pipeline_metrics = indexer_metrics.pipeline_metrics_opt?;
Some((pipeline, pipeline_metrics))
.filter_map(|(pipeline_id, (_, pipeline_handle))| {
let indexing_statistics = pipeline_handle.last_observation();
let pipeline_metrics = indexing_statistics.pipeline_metrics_opt?;
Some((pipeline_id, pipeline_metrics))
})
.collect();
self.cluster
Expand Down
1 change: 0 additions & 1 deletion quickwit/quickwit-indexing/src/actors/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,6 @@ mod tests {
Ok(())
}

#[derive(Clone)]
struct ReportSplitListener {
report_splits_tx: flume::Sender<ReportSplitsRequest>,
}
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-indexing/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mod processed_doc;
mod publish_lock;
mod publisher_message;
mod raw_doc_batch;
mod shard_positions;
mod split_attrs;

pub use indexed_split::{
Expand All @@ -49,6 +50,7 @@ pub use publish_lock::{NewPublishLock, PublishLock};
pub use publisher_message::SplitsUpdate;
use quickwit_proto::types::PublishToken;
pub use raw_doc_batch::RawDocBatch;
pub use shard_positions::{PublishedShardPositions, PublishedShardPositionsUpdate};
pub use split_attrs::{create_split_metadata, SplitAttrs};

#[derive(Debug)]
Expand Down
Loading

0 comments on commit 63f55fd

Please sign in to comment.