diff --git a/quickwit/quickwit-cluster/src/cluster.rs b/quickwit/quickwit-cluster/src/cluster.rs
index 0c9f95533cf..2c10fc48b0b 100644
--- a/quickwit/quickwit-cluster/src/cluster.rs
+++ b/quickwit/quickwit-cluster/src/cluster.rs
@@ -238,6 +238,16 @@ impl Cluster {
             .set(key, value);
     }
 
+    pub async fn get_self_key_value(&self, key: &str) -> Option<String> {
+        self.chitchat()
+            .await
+            .lock()
+            .await
+            .self_node_state()
+            .get_versioned(key)
+            .map(|versioned_value| versioned_value.value.clone())
+    }
+
     /// Waits until the predicate holds true for the set of ready members.
     pub async fn wait_for_ready_members(
         &self,
diff --git a/quickwit/quickwit-common/src/pubsub.rs b/quickwit/quickwit-common/src/pubsub.rs
index 04f82926828..81067e3cd97 100644
--- a/quickwit/quickwit-common/src/pubsub.rs
+++ b/quickwit/quickwit-common/src/pubsub.rs
@@ -19,22 +19,54 @@
 use std::collections::HashMap;
 use std::fmt;
+use std::fmt::Formatter;
+use std::marker::PhantomData;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex, Weak};
 use std::time::Duration;
 
 use async_trait::async_trait;
+use tokio::sync::Mutex as TokioMutex;
 
 use crate::type_map::TypeMap;
 
 pub trait Event: fmt::Debug + Clone + Send + Sync + 'static {}
 
 #[async_trait]
-pub trait EventSubscriber<E: Event>: fmt::Debug + dyn_clone::DynClone + Send + Sync + 'static {
+pub trait EventSubscriber<E: Event>: Send + Sync + 'static {
     async fn handle_event(&mut self, event: E);
 }
 
-dyn_clone::clone_trait_object!(<E: Event> EventSubscriber<E>);
+struct ClosureSubscriber<E, F> {
+    callback: Arc<F>,
+    _phantom: PhantomData<E>,
+}
+
+impl<E, F> Clone for ClosureSubscriber<E, F> {
+    fn clone(&self) -> Self {
+        ClosureSubscriber {
+            callback: self.callback.clone(),
+            _phantom: self._phantom,
+        }
+    }
+}
+
+impl<E, F> fmt::Debug for ClosureSubscriber<E, F> {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        f.debug_struct("ClosureSubscriber")
+            .field("callback", &std::any::type_name::<F>())
+            .finish()
+    }
+}
+
+#[async_trait]
+impl<E: Event, F: Fn(E) + Send + Sync + 'static> EventSubscriber<E>
+    for ClosureSubscriber<E, F>
+{
+    async fn handle_event(&mut self, event: E) {
+        (self.callback)(event);
+    }
+}
 
 type EventSubscriptions<E> = HashMap<usize, EventSubscription<E>>;
 
@@ -59,6 +91,7 @@ struct InnerEventBroker {
 
 impl EventBroker {
     /// Subscribes to an event type.
+    #[must_use]
     pub fn subscribe<E>(&self, subscriber: impl EventSubscriber<E>) -> EventSubscriptionHandle
     where E: Event {
         let mut subscriptions = self
@@ -76,8 +109,7 @@ impl EventBroker {
             .fetch_add(1, Ordering::Relaxed);
 
         let subscription = EventSubscription {
-            subscription_id,
-            subscriber: Box::new(subscriber),
+            subscriber: Arc::new(TokioMutex::new(Box::new(subscriber))),
         };
         let typed_subscriptions = subscriptions
             .get_mut::<EventSubscriptions<E>>()
@@ -99,6 +131,21 @@ impl EventBroker {
         }
     }
 
+    /// Subscribes to an event with a function callback.
+    #[must_use]
+    pub fn subscribe_fn<E>(
+        &self,
+        callback_fn: impl Fn(E) + Sync + Send + 'static,
+    ) -> EventSubscriptionHandle
+    where
+        E: Event,
+    {
+        self.subscribe(ClosureSubscriber {
+            callback: Arc::new(callback_fn),
+            _phantom: Default::default(),
+        })
+    }
+
     /// Publishes an event.
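+    ///
+    /// A minimal usage sketch (`MyEvent` is the test event type defined in the
+    /// tests module below):
+    ///
+    /// ```ignore
+    /// let event_broker = EventBroker::default();
+    /// let subscription_handle = event_broker
+    ///     .subscribe_fn::<MyEvent>(|event| println!("received {event:?}"));
+    /// event_broker.publish(MyEvent { value: 42 });
+    /// // Subscribers are notified on a spawned task; dropping the handle
+    /// // cancels the subscription unless `forever()` was called on it.
+    /// ```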
     pub fn publish<E>(&self, event: E)
     where E: Event {
@@ -111,20 +158,18 @@ impl EventBroker {
         if let Some(typed_subscriptions) = subscriptions.get::<EventSubscriptions<E>>() {
             for subscription in typed_subscriptions.values() {
                 let event = event.clone();
-                let mut subscriber = subscription.subscriber.clone();
+                let subscriber_clone = subscription.subscriber.clone();
                 tokio::spawn(tokio::time::timeout(Duration::from_secs(600), async move {
-                    subscriber.handle_event(event).await;
+                    let mut subscriber_lock = subscriber_clone.lock().await;
+                    subscriber_lock.handle_event(event).await;
                 }));
             }
         }
     }
 }
 
-#[derive(Debug)]
 struct EventSubscription<E> {
-    #[allow(dead_code)]
-    subscription_id: usize, // Used for the `Debug` implementation.
-    subscriber: Box<dyn EventSubscriber<E>>,
+    subscriber: Arc<TokioMutex<Box<dyn EventSubscriber<E>>>>,
 }
 
 pub struct EventSubscriptionHandle {
@@ -135,6 +180,12 @@ pub struct EventSubscriptionHandle {
 
 impl EventSubscriptionHandle {
     pub fn cancel(self) {}
+
+    /// By default, dropping the handle cancels the subscription.
+    /// `forever` consumes the handle without cancelling the subscription on drop.
+    pub fn forever(mut self) {
+        self.broker = Weak::new();
+    }
 }
 
 impl Drop for EventSubscriptionHandle {
@@ -184,7 +235,7 @@ mod tests {
         let event = MyEvent { value: 42 };
         event_broker.publish(event);
 
-        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
+        tokio::time::sleep(Duration::from_millis(1)).await;
         assert_eq!(counter.load(Ordering::Relaxed), 42);
 
         subscription_handle.cancel();
@@ -192,7 +243,44 @@ mod tests {
         let event = MyEvent { value: 1337 };
         event_broker.publish(event);
 
-        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
+        tokio::time::sleep(Duration::from_millis(1)).await;
         assert_eq!(counter.load(Ordering::Relaxed), 42);
     }
+
+    #[tokio::test]
+    async fn test_event_broker_handle_drop() {
+        let event_broker = EventBroker::default();
+        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
+        drop(event_broker.subscribe_fn::<MyEvent>(move |event| {
+            tx.send(event.value).unwrap();
+        }));
+        event_broker.publish(MyEvent { value: 42 });
+        assert!(rx.recv().await.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_event_broker_handle_cancel() {
+        let event_broker = EventBroker::default();
+        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
+        event_broker
+            .subscribe_fn::<MyEvent>(move |event| {
+                tx.send(event.value).unwrap();
+            })
+            .cancel();
+        event_broker.publish(MyEvent { value: 42 });
+        assert!(rx.recv().await.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_event_broker_handle_forever() {
+        let event_broker = EventBroker::default();
+        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
+        event_broker
+            .subscribe_fn::<MyEvent>(move |event| {
+                tx.send(event.value).unwrap();
+            })
+            .forever();
+        event_broker.publish(MyEvent { value: 42 });
+        assert_eq!(rx.recv().await, Some(42));
+    }
 }
diff --git a/quickwit/quickwit-common/src/shared_consts.rs b/quickwit/quickwit-common/src/shared_consts.rs
index a34f713c523..5ab1ecc016f 100644
--- a/quickwit/quickwit-common/src/shared_consts.rs
+++ b/quickwit/quickwit-common/src/shared_consts.rs
@@ -35,3 +35,6 @@ pub const DELETION_GRACE_PERIOD: Duration = Duration::from_secs(60 * 32); // 32
 /// In order to amortize search with scroll, we fetch more documents than are
 /// being requested.
 pub const SCROLL_BATCH_LEN: usize = 1_000;
+
+/// Prefix used in chitchat to publish the shard positions.
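+/// The full key has the form `{SHARD_POSITIONS_PREFIX}{index_uid}:{source_id}`;
+/// the associated value is a JSON object mapping each shard ID to its latest
+/// published position, e.g. `{"1":"00000000000000001000"}`.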
+pub const SHARD_POSITIONS_PREFIX: &str = "shard_positions:";
diff --git a/quickwit/quickwit-common/src/tower/event_listener.rs b/quickwit/quickwit-common/src/tower/event_listener.rs
index 1865d8c55ad..42c2dde224d 100644
--- a/quickwit/quickwit-common/src/tower/event_listener.rs
+++ b/quickwit/quickwit-common/src/tower/event_listener.rs
@@ -129,7 +129,6 @@ mod tests {
 
     impl Event for MyEvent {}
 
-    #[derive(Debug, Clone)]
     struct MySubscriber {
         counter: Arc<AtomicUsize>,
     }
diff --git a/quickwit/quickwit-control-plane/src/control_plane_model.rs b/quickwit/quickwit-control-plane/src/control_plane_model.rs
index 5cf4a581ec7..d96fdcb76ae 100644
--- a/quickwit/quickwit-control-plane/src/control_plane_model.rs
+++ b/quickwit/quickwit-control-plane/src/control_plane_model.rs
@@ -33,12 +33,10 @@ use quickwit_proto::metastore::{
     self, EntityKind, ListIndexesMetadataRequest, ListShardsSubrequest, MetastoreError,
     MetastoreService, MetastoreServiceClient, SourceType,
 };
-use quickwit_proto::types::{IndexId, IndexUid, NodeId, NodeIdRef, ShardId, SourceId};
+use quickwit_proto::types::{IndexId, IndexUid, NodeId, NodeIdRef, ShardId, SourceId, SourceUid};
 use serde::Serialize;
 use tracing::{error, info, warn};
 
-use crate::SourceUid;
-
 type NextShardId = ShardId;
 #[derive(Debug, Eq, PartialEq)]
 struct ShardTableEntry {
diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
index 4176535111e..8f4020d81cf 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
@@ -470,11 +470,9 @@ mod tests {
     use proptest::{prop_compose, proptest};
     use quickwit_config::{IndexConfig, KafkaSourceParams, SourceConfig, SourceParams};
     use quickwit_metastore::IndexMetadata;
-    use quickwit_proto::types::IndexUid;
+    use quickwit_proto::types::{IndexUid, SourceUid};
 
     use super::*;
-    use crate::SourceUid;
-
     #[test]
     fn test_indexing_plans_diff() {
         {
diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
index 22cd4a6ebd6..82c6a557249 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
@@ -25,7 +25,7 @@ use std::num::NonZeroU32;
 
 use fnv::FnvHashMap;
 use quickwit_proto::indexing::{CpuCapacity, IndexingTask};
-use quickwit_proto::types::{IndexUid, ShardId};
+use quickwit_proto::types::{IndexUid, ShardId, SourceUid};
 use scheduling_logic_model::{IndexerOrd, SourceOrd};
 use tracing::error;
 use tracing::log::warn;
@@ -34,7 +34,6 @@ use crate::indexing_plan::PhysicalIndexingPlan;
 use crate::indexing_scheduler::scheduling::scheduling_logic_model::{
     SchedulingProblem, SchedulingSolution,
 };
-use crate::SourceUid;
 
 /// If we have several pipelines below this threshold we
 /// reduce the number of pipelines.
@@ -530,13 +529,12 @@ mod tests {
 
     use fnv::FnvHashMap;
     use quickwit_proto::indexing::{mcpu, IndexingTask};
-    use quickwit_proto::types::{IndexUid, ShardId};
+    use quickwit_proto::types::{IndexUid, ShardId, SourceUid};
 
     use super::{
         build_physical_indexing_plan, group_shards_into_pipelines, indexing_task,
         spread_shards_optimally, SourceToSchedule, SourceToScheduleType,
     };
-    use crate::SourceUid;
 
     #[test]
     fn test_spread_shard_optimally() {
diff --git a/quickwit/quickwit-control-plane/src/lib.rs b/quickwit/quickwit-control-plane/src/lib.rs
index d800544f77b..cdb6487c352 100644
--- a/quickwit/quickwit-control-plane/src/lib.rs
+++ b/quickwit/quickwit-control-plane/src/lib.rs
@@ -26,15 +26,6 @@ pub(crate) mod metrics;
 
 use quickwit_common::tower::Pool;
 use quickwit_proto::indexing::{CpuCapacity, IndexingServiceClient, IndexingTask};
-use quickwit_proto::types::{IndexUid, SourceId};
-
-/// It can however appear only once in a given index.
-/// In itself, `SourceId` is not unique, but the pair `(IndexUid, SourceId)` is.
-#[derive(PartialEq, Eq, Debug, PartialOrd, Ord, Hash, Clone)]
-pub struct SourceUid {
-    pub index_uid: IndexUid,
-    pub source_id: SourceId,
-}
 
 /// Indexer-node specific information stored in the pool of available indexer nodes
 #[derive(Debug, Clone)]
diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
index ad7bb4eedca..2ee55edc844 100644
--- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
+++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
@@ -439,6 +439,7 @@ impl IndexingPipeline {
                 ingester_pool: self.params.ingester_pool.clone(),
                 queues_dir_path: self.params.queues_dir_path.clone(),
                 storage_resolver: self.params.source_storage_resolver.clone(),
+                event_broker: self.params.event_broker.clone(),
             }),
             source_checkpoint,
         ))
diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs
index a0da3d8868f..1204339845e 100644
--- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs
+++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs
@@ -32,7 +32,7 @@ use quickwit_actors::{
 };
 use quickwit_cluster::Cluster;
 use quickwit_common::fs::get_cache_directory_path;
-use quickwit_common::pubsub::EventBroker;
+use quickwit_common::pubsub::{EventBroker, EventSubscriptionHandle};
 use quickwit_common::temp_dir;
 use quickwit_config::{
     build_doc_mapper, IndexConfig, IndexerConfig, SourceConfig, INGEST_API_SOURCE_ID,
@@ -56,7 +56,10 @@ use tracing::{debug, error, info, warn};
 
 use super::merge_pipeline::{MergePipeline, MergePipelineParams};
 use super::MergePlanner;
-use crate::models::{DetachIndexingPipeline, DetachMergePipeline, ObservePipeline, SpawnPipeline};
+use crate::models::{
+    DetachIndexingPipeline, DetachMergePipeline, ObservePipeline, PublishedShardPositions,
+    SpawnPipeline,
+};
 use crate::source::{AssignShards, Assignment};
 use crate::split_store::{LocalSplitStore, SplitStoreQuota};
 use crate::{IndexingPipeline, IndexingPipelineParams, IndexingSplitStore, IndexingStatistics};
@@ -117,6 +120,7 @@ pub struct IndexingService {
     merge_pipeline_handles: HashMap<MergePipelineId, MergePipelineHandle>,
     cooperative_indexing_permits: Option<Arc<Semaphore>>,
     event_broker: EventBroker,
+    _event_subscription_handle: EventSubscriptionHandle,
 }
 
 impl Debug for IndexingService {
@@ -144,6 +148,8 @@ impl IndexingService {
         storage_resolver: StorageResolver,
         event_broker: EventBroker,
     ) -> anyhow::Result<Self> {
+        let published_shard_positions =
+            PublishedShardPositions::new(cluster.clone());
+        let event_subscription_handle = event_broker.subscribe(published_shard_positions);
         let split_store_space_quota = SplitStoreQuota::new(
             indexer_config.split_store_max_num_splits,
             indexer_config.split_store_max_num_bytes,
@@ -175,6 +181,7 @@ impl IndexingService {
             merge_pipeline_handles: HashMap::new(),
             cooperative_indexing_permits,
             event_broker,
+            _event_subscription_handle: event_subscription_handle,
         })
     }
 
@@ -418,10 +425,10 @@ impl IndexingService {
         let pipeline_metrics: HashMap<&IndexingPipelineId, PipelineMetrics> = self
             .indexing_pipelines
             .iter()
-            .flat_map(|(pipeline, (_, pipeline_handle))| {
-                let indexer_metrics = pipeline_handle.last_observation();
-                let pipeline_metrics = indexer_metrics.pipeline_metrics_opt?;
-                Some((pipeline, pipeline_metrics))
+            .filter_map(|(pipeline_id, (_, pipeline_handle))| {
+                let indexing_statistics = pipeline_handle.last_observation();
+                let pipeline_metrics = indexing_statistics.pipeline_metrics_opt?;
+                Some((pipeline_id, pipeline_metrics))
             })
             .collect();
         self.cluster
diff --git a/quickwit/quickwit-indexing/src/actors/uploader.rs b/quickwit/quickwit-indexing/src/actors/uploader.rs
index cc4588757dd..bb875be91f4 100644
--- a/quickwit/quickwit-indexing/src/actors/uploader.rs
+++ b/quickwit/quickwit-indexing/src/actors/uploader.rs
@@ -912,7 +912,6 @@ mod tests {
         Ok(())
     }
 
-    #[derive(Clone)]
     struct ReportSplitListener {
         report_splits_tx: flume::Sender<ReportSplitsRequest>,
     }
diff --git a/quickwit/quickwit-indexing/src/models/mod.rs b/quickwit/quickwit-indexing/src/models/mod.rs
index 230542ae60e..2fd95bd6ba6 100644
--- a/quickwit/quickwit-indexing/src/models/mod.rs
+++ b/quickwit/quickwit-indexing/src/models/mod.rs
@@ -30,6 +30,7 @@ mod processed_doc;
 mod publish_lock;
 mod publisher_message;
 mod raw_doc_batch;
+mod shard_positions;
 mod split_attrs;
 
 pub use indexed_split::{
@@ -49,6 +50,7 @@ pub use publish_lock::{NewPublishLock, PublishLock};
 pub use publisher_message::SplitsUpdate;
 use quickwit_proto::types::PublishToken;
 pub use raw_doc_batch::RawDocBatch;
+pub use shard_positions::{PublishedShardPositions, PublishedShardPositionsUpdate};
 pub use split_attrs::{create_split_metadata, SplitAttrs};
 
 #[derive(Debug)]
diff --git a/quickwit/quickwit-indexing/src/models/shard_positions.rs b/quickwit/quickwit-indexing/src/models/shard_positions.rs
new file mode 100644
index 00000000000..f216bd2ed30
--- /dev/null
+++ b/quickwit/quickwit-indexing/src/models/shard_positions.rs
@@ -0,0 +1,192 @@
+// Copyright (C) 2023 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
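+
+//! Tracks the latest known published position of each shard assigned to this
+//! indexer, and advertises these positions to the rest of the cluster through
+//! the chitchat key-value state (under `SHARD_POSITIONS_PREFIX`).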
+
+use std::collections::btree_map::Entry;
+use std::collections::BTreeMap;
+use std::fmt::Debug;
+
+use async_trait::async_trait;
+use fnv::FnvHashMap;
+use quickwit_cluster::Cluster;
+use quickwit_common::pubsub::{Event, EventSubscriber};
+use quickwit_common::shared_consts::SHARD_POSITIONS_PREFIX;
+use quickwit_proto::types::{Position, ShardId, SourceUid};
+use tracing::warn;
+
+#[derive(Debug, Clone)]
+pub struct PublishedShardPositionsUpdate {
+    pub source_uid: SourceUid,
+    pub published_positions_per_shard: Vec<(ShardId, Position)>,
+}
+
+impl Event for PublishedShardPositionsUpdate {}
+
+/// `PublishedShardPositions` is a model local to the indexer service instance. It
+/// keeps track of the latest known published position for the shards of all managed sources.
+///
+/// It receives updates through the event broker, and only keeps the maximum published position
+/// for each shard.
+pub struct PublishedShardPositions {
+    shard_positions_per_source: FnvHashMap<SourceUid, BTreeMap<ShardId, Position>>,
+    cluster_client: Cluster,
+}
+
+impl PublishedShardPositions {
+    pub fn new(cluster: Cluster) -> PublishedShardPositions {
+        PublishedShardPositions {
+            shard_positions_per_source: Default::default(),
+            cluster_client: cluster,
+        }
+    }
+}
+
+#[async_trait]
+impl EventSubscriber<PublishedShardPositionsUpdate> for PublishedShardPositions {
+    async fn handle_event(&mut self, update: PublishedShardPositionsUpdate) {
+        let source_uid = update.source_uid.clone();
+        let was_updated = self.apply_update(update);
+        if was_updated {
+            self.publish_updated_positions_for_source(source_uid)
+                .await;
+        }
+    }
+}
+
+impl PublishedShardPositions {
+    async fn publish_updated_positions_for_source(&self, source_uid: SourceUid) {
+        let Some(shard_positions) = self.shard_positions_per_source.get(&source_uid) else {
+            return;
+        };
+        let SourceUid {
+            index_uid,
+            source_id,
+        } = &source_uid;
+        let key = format!("{SHARD_POSITIONS_PREFIX}{index_uid}:{source_id}");
+        let shard_positions_json = serde_json::to_string(&shard_positions).unwrap();
+        self.cluster_client
+            .set_self_key_value(key, shard_positions_json)
+            .await;
+    }
+
+    /// Updates the internal model holding the last position per shard, and
+    /// returns true if at least one of the published positions was updated.
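+    ///
+    /// Positions only move forward: an update carrying a position lower than the
+    /// one already recorded for a shard leaves that shard untouched.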
+    fn apply_update(&mut self, update: PublishedShardPositionsUpdate) -> bool {
+        if update.published_positions_per_shard.is_empty() {
+            warn!("Received an empty publish shard positions update.");
+            return false;
+        }
+        let mut was_modified = false;
+        let PublishedShardPositionsUpdate {
+            source_uid,
+            published_positions_per_shard,
+        } = update;
+        let current_shard_positions = self
+            .shard_positions_per_source
+            .entry(source_uid)
+            .or_default();
+        for (shard, new_position) in published_positions_per_shard {
+            match current_shard_positions.entry(shard) {
+                Entry::Occupied(mut occupied) => {
+                    if *occupied.get() < new_position {
+                        occupied.insert(new_position);
+                        was_modified = true;
+                    }
+                }
+                Entry::Vacant(vacant) => {
+                    was_modified = true;
+                    vacant.insert(new_position);
+                }
+            }
+        }
+        was_modified
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use chitchat::transport::ChannelTransport;
+    use quickwit_cluster::create_cluster_for_test;
+    use quickwit_common::pubsub::EventBroker;
+    use quickwit_common::shared_consts::SHARD_POSITIONS_PREFIX;
+    use quickwit_proto::types::IndexUid;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_shard_positions() {
+        let transport = ChannelTransport::default();
+        let cluster: Cluster = create_cluster_for_test(Vec::new(), &[], &transport, true)
+            .await
+            .unwrap();
+        let shard_positions = PublishedShardPositions::new(cluster.clone());
+        let event_broker = EventBroker::default();
+        event_broker.subscribe(shard_positions).forever();
+        let index_uid = IndexUid::new_with_random_ulid("index-test");
+        let source_id = "test-source".to_string();
+        let key = format!("{SHARD_POSITIONS_PREFIX}{index_uid}:{source_id}");
+        let source_uid = SourceUid {
+            index_uid,
+            source_id,
+        };
+        event_broker.publish(PublishedShardPositionsUpdate {
+            source_uid: source_uid.clone(),
+            published_positions_per_shard: vec![],
+        });
+        {
+            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+            assert!(cluster.get_self_key_value(&key).await.is_none());
+        }
+        {
+            event_broker.publish(PublishedShardPositionsUpdate {
+                source_uid: source_uid.clone(),
+                published_positions_per_shard: vec![(1, Position::Beginning)],
+            });
+            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+            let value = cluster.get_self_key_value(&key).await.unwrap();
+            assert_eq!(&value, r#"{"1":""}"#);
+        }
+        {
+            event_broker.publish(PublishedShardPositionsUpdate {
+                source_uid: source_uid.clone(),
+                published_positions_per_shard: vec![(1, 1_000u64.into()), (2, 2000u64.into())],
+            });
+            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+            let value = cluster.get_self_key_value(&key).await.unwrap();
+            assert_eq!(
+                &value,
+                r#"{"1":"00000000000000001000","2":"00000000000000002000"}"#
+            );
+        }
+        {
+            event_broker.publish(PublishedShardPositionsUpdate {
+                source_uid: source_uid.clone(),
+                published_positions_per_shard: vec![(1, 999u64.into()), (3, 3000u64.into())],
+            });
+            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+            let value = cluster.get_self_key_value(&key).await.unwrap();
+            // The position that went backward is not updated, and the shard missing
+            // from the update keeps its previously recorded position.
+            assert_eq!(
+                &value,
+                r#"{"1":"00000000000000001000","2":"00000000000000002000","3":"00000000000000003000"}"#
+            );
+        }
+    }
+}
diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs
index 57ab054bf5b..d70b62a6c37 100644
--- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs
+++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs
@@ -17,15 +17,16 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
-use std::collections::HashMap;
 use std::fmt;
 use std::sync::Arc;
 use std::time::Duration;
 
 use anyhow::{bail, Context};
 use async_trait::async_trait;
+use fnv::FnvHashMap;
 use itertools::Itertools;
 use quickwit_actors::{ActorExitStatus, Mailbox};
+use quickwit_common::pubsub::EventBroker;
 use quickwit_common::retry::RetryParams;
 use quickwit_ingest::{
     decoded_mrecords, FetchStreamError, IngesterPool, MRecord, MultiFetchStream,
@@ -38,7 +39,9 @@ use quickwit_proto::metastore::{
     AcquireShardsRequest, AcquireShardsSubrequest, AcquireShardsSubresponse, MetastoreService,
     MetastoreServiceClient,
 };
-use quickwit_proto::types::{IndexUid, NodeId, Position, PublishToken, ShardId, SourceId};
+use quickwit_proto::types::{
+    IndexUid, NodeId, Position, PublishToken, ShardId, SourceId, SourceUid,
+};
 use serde_json::json;
 use tokio::time;
 use tracing::{debug, error, info, warn};
@@ -49,7 +52,7 @@ use super::{
     BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT,
 };
 use crate::actors::DocProcessor;
-use crate::models::{NewPublishLock, NewPublishToken, PublishLock};
+use crate::models::{NewPublishLock, NewPublishToken, PublishLock, PublishedShardPositionsUpdate};
 
 pub struct IngestSourceFactory;
 
@@ -73,8 +76,7 @@ impl TypedSourceFactory for IngestSourceFactory {
 #[derive(Debug, Clone)]
 struct ClientId {
     node_id: NodeId,
-    index_uid: IndexUid,
-    source_id: SourceId,
+    source_uid: SourceUid,
     pipeline_ord: usize,
 }
 
@@ -83,17 +85,16 @@ impl fmt::Display for ClientId {
         write!(
             formatter,
             "indexer/{}/{}/{}/{}",
-            self.node_id, self.index_uid, self.source_id, self.pipeline_ord
+            self.node_id, self.source_uid.index_uid, self.source_uid.source_id, self.pipeline_ord
         )
     }
 }
 
 impl ClientId {
-    fn new(node_id: NodeId, index_uid: IndexUid, source_id: SourceId, pipeline_ord: usize) -> Self {
+    fn new(node_id: NodeId, source_uid: SourceUid, pipeline_ord: usize) -> Self {
         Self {
             node_id,
-            index_uid,
-            source_id,
+            source_uid,
             pipeline_ord,
         }
     }
@@ -113,6 +114,10 @@ impl ClientId {
 enum IndexingStatus {
     #[default]
     Active,
+    // The source has emitted all of the shard's documents, up to EOF.
+    // Disclaimer: a `Complete` status does not mean that all of these documents have been
+    // indexed. Some documents might still be travelling through the pipeline, and may not
+    // have been published yet.
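+    // A shard whose publish position is already EOF when it gets assigned starts
+    // directly in this state (see `assign_shards`).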
     Complete,
     Error,
 }
 
@@ -131,10 +136,11 @@ pub struct IngestSource {
     client_id: ClientId,
     metastore: MetastoreServiceClient,
     ingester_pool: IngesterPool,
-    assigned_shards: HashMap<ShardId, AssignedShard>,
+    assigned_shards: FnvHashMap<ShardId, AssignedShard>,
     fetch_stream: MultiFetchStream,
     publish_lock: PublishLock,
     publish_token: PublishToken,
+    event_broker: EventBroker,
 }
 
 impl fmt::Debug for IngestSource {
@@ -147,23 +153,25 @@ impl IngestSource {
     pub async fn try_new(
         runtime_args: Arc<SourceRuntimeArgs>,
         _checkpoint: SourceCheckpoint,
-    ) -> anyhow::Result<Self> {
+    ) -> anyhow::Result<IngestSource> {
         let self_node_id: NodeId = runtime_args.node_id().into();
         let client_id = ClientId::new(
             self_node_id.clone(),
-            runtime_args.index_uid().clone(),
-            runtime_args.source_id().to_string(),
+            SourceUid {
+                index_uid: runtime_args.index_uid().clone(),
+                source_id: runtime_args.source_id().to_string(),
+            },
             runtime_args.pipeline_ord(),
         );
         let metastore = runtime_args.metastore.clone();
         let ingester_pool = runtime_args.ingester_pool.clone();
-        let assigned_shards = HashMap::new();
+        let assigned_shards = FnvHashMap::default();
         let fetch_stream =
             MultiFetchStream::new(self_node_id, client_id.to_string(), ingester_pool.clone());
         let publish_lock = PublishLock::default();
         let publish_token = client_id.new_publish_token();
-        Ok(Self {
+        Ok(IngestSource {
             client_id,
             metastore,
             ingester_pool,
@@ -171,6 +179,7 @@ impl IngestSource {
             fetch_stream,
             publish_lock,
             publish_token,
+            event_broker: runtime_args.event_broker.clone(),
         })
     }
 
@@ -237,15 +246,22 @@ impl IngestSource {
         false
     }
 
-    async fn truncate(&self, truncation_point: &[(ShardId, Position)]) {
-        let mut per_ingester_subrequests: HashMap<&NodeId, Vec<TruncateShardsSubrequest>> =
-            HashMap::new();
+    async fn truncate(&mut self, truncation_point: Vec<(ShardId, Position)>) {
+        self.event_broker.publish(PublishedShardPositionsUpdate {
+            source_uid: self.client_id.source_uid.clone(),
+            published_positions_per_shard: truncation_point.clone(),
+        });
+
+        let mut per_ingester_truncate_subrequests: FnvHashMap<
+            &NodeId,
+            Vec<TruncateShardsSubrequest>,
+        > = FnvHashMap::default();
 
         for (shard_id, to_position_exclusive) in truncation_point {
             if matches!(to_position_exclusive, Position::Beginning) {
                 continue;
             }
-            let Some(shard) = self.assigned_shards.get(shard_id) else {
+            let Some(shard) = self.assigned_shards.get(&shard_id) else {
                 warn!(
                     "failed to truncate shard: shard `{}` is no longer assigned",
                     shard_id
                 );
                 continue;
             };
             let truncate_shards_subrequest = TruncateShardsSubrequest {
-                index_uid: self.client_id.index_uid.clone().into(),
-                source_id: self.client_id.source_id.clone(),
-                shard_id: *shard_id,
+                index_uid: self.client_id.source_uid.index_uid.clone().into(),
+                source_id: self.client_id.source_uid.source_id.clone(),
+                shard_id,
                 to_position_inclusive: Some(to_position_exclusive.clone()),
             };
             if let Some(follower_id) = &shard.follower_id_opt {
-                per_ingester_subrequests
+                per_ingester_truncate_subrequests
                     .entry(follower_id)
                     .or_default()
                     .push(truncate_shards_subrequest.clone());
             }
-            per_ingester_subrequests
+            per_ingester_truncate_subrequests
                 .entry(&shard.leader_id)
                 .or_default()
                 .push(truncate_shards_subrequest);
         }
-        for (ingester_id, truncate_subrequests) in per_ingester_subrequests {
+        for (ingester_id, truncate_subrequests) in per_ingester_truncate_subrequests {
             let Some(mut ingester) = self.ingester_pool.get(ingester_id) else {
                 warn!(
                     "failed to truncate shard: ingester `{}` is unavailable",
@@ -399,8 +415,8 @@ impl Source for IngestSource {
             .await?;
 
         let acquire_shards_subrequest = AcquireShardsSubrequest {
-            index_uid: self.client_id.index_uid.to_string(),
-            source_id: self.client_id.source_id.clone(),
+            index_uid: self.client_id.source_uid.index_uid.to_string(),
+            source_id: self.client_id.source_uid.source_id.clone(),
             shard_ids: new_assigned_shard_ids,
             publish_token: self.publish_token.clone(),
         };
@@ -432,7 +448,6 @@ impl Source for IngestSource {
                 .publish_position_inclusive
                 .unwrap_or_default();
             let from_position_exclusive = current_position_inclusive.clone();
-
             let status = if from_position_exclusive == Position::Eof {
                 IndexingStatus::Complete
             } else if let Err(error) = ctx
@@ -457,12 +472,12 @@ impl Source for IngestSource {
                 leader_id,
                 follower_id_opt,
                 partition_id,
-                current_position_inclusive,
+                current_position_inclusive: current_position_inclusive.clone(),
                 status,
             };
             self.assigned_shards.insert(shard_id, assigned_shard);
         }
-        self.truncate(&truncation_point).await;
+        self.truncate(truncation_point).await;
 
         Ok(())
     }
@@ -471,13 +486,14 @@ impl Source for IngestSource {
         checkpoint: SourceCheckpoint,
         _ctx: &SourceContext,
     ) -> anyhow::Result<()> {
-        let mut truncation_point = Vec::with_capacity(checkpoint.num_partitions());
-
+        let mut truncation_point: Vec<(ShardId, Position)> =
+            Vec::with_capacity(checkpoint.num_partitions());
         for (partition_id, position) in checkpoint.iter() {
             let shard_id = partition_id.as_u64().expect("shard ID should be a u64");
             truncation_point.push((shard_id, position));
         }
-        self.truncate(&truncation_point).await;
+
+        self.truncate(truncation_point).await;
         Ok(())
     }
 
@@ -507,6 +523,7 @@ mod tests {
     use quickwit_proto::ingest::{IngestV2Error, MRecordBatch, Shard, ShardState};
     use quickwit_proto::metastore::{AcquireShardsResponse, AcquireShardsSubresponse};
    use quickwit_storage::StorageResolver;
+    use tokio::sync::mpsc::error::TryRecvError;
    use tokio::sync::watch;
 
     use super::*;
@@ -594,13 +611,16 @@ mod tests {
         let ingester_0: IngesterServiceClient = ingester_mock_0.into();
         ingester_pool.insert("test-ingester-0".into(), ingester_0.clone());
 
-        let runtime_args = Arc::new(SourceRuntimeArgs {
+        let event_broker = EventBroker::default();
+
+        let runtime_args: Arc<SourceRuntimeArgs> = Arc::new(SourceRuntimeArgs {
             pipeline_id,
             source_config,
             metastore: MetastoreServiceClient::from(mock_metastore),
             ingester_pool: ingester_pool.clone(),
             queues_dir_path: PathBuf::from("./queues"),
             storage_resolver: StorageResolver::for_test(),
+            event_broker,
         });
         let checkpoint = SourceCheckpoint::default();
         let mut source = IngestSource::try_new(runtime_args, checkpoint)
             .await
             .unwrap();
@@ -660,6 +680,286 @@ mod tests {
         time::sleep(Duration::from_millis(1)).await;
     }
 
+    #[tokio::test]
+    async fn test_ingest_source_shard_all_eof() {
+        // In this test, we check that if all of the assigned shards are already marked as EOF
+        // in the metastore, we still observe the following:
+        // - a suggest truncate is emitted;
+        // - no fetch stream request is issued;
+        // - the shard EOF position is advertised through the event broker.
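+        // (The last point is asserted at the end of this test through the
+        // `PublishedShardPositionsUpdate` subscription.)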
+        let pipeline_id = IndexingPipelineId {
+            node_id: "test-node".to_string(),
+            index_uid: "test-index:0".into(),
+            source_id: "test-source".to_string(),
+            pipeline_ord: 0,
+        };
+        let source_config = SourceConfig::for_test("test-source", SourceParams::Ingest);
+        let publish_token =
+            "indexer/test-node/test-index:0/test-source/0/00000000000000000000000000";
+
+        let mut mock_metastore = MetastoreServiceClient::mock();
+        mock_metastore
+            .expect_acquire_shards()
+            .once()
+            .returning(|request| {
+                assert_eq!(request.subrequests.len(), 1);
+
+                let subrequest = &request.subrequests[0];
+                assert_eq!(subrequest.index_uid, "test-index:0");
+                assert_eq!(subrequest.source_id, "test-source");
+                assert_eq!(subrequest.shard_ids, vec![1]);
+
+                let response = AcquireShardsResponse {
+                    subresponses: vec![AcquireShardsSubresponse {
+                        index_uid: "test-index:0".to_string(),
+                        source_id: "test-source".to_string(),
+                        acquired_shards: vec![Shard {
+                            leader_id: "test-ingester-0".to_string(),
+                            follower_id: None,
+                            index_uid: "test-index:0".to_string(),
+                            source_id: "test-source".to_string(),
+                            shard_id: 1,
+                            shard_state: ShardState::Open as i32,
+                            publish_position_inclusive: Some(Position::Eof),
+                            publish_token: Some(publish_token.to_string()),
+                        }],
+                    }],
+                };
+                Ok(response)
+            });
+        let ingester_pool = IngesterPool::default();
+
+        let mut ingester_mock_0 = IngesterServiceClient::mock();
+        ingester_mock_0
+            .expect_truncate_shards()
+            .once()
+            .returning(|request| {
+                assert_eq!(request.ingester_id, "test-ingester-0");
+                assert_eq!(request.subrequests.len(), 1);
+
+                let subrequest = &request.subrequests[0];
+                assert_eq!(subrequest.index_uid, "test-index:0");
+                assert_eq!(subrequest.source_id, "test-source");
+                assert_eq!(subrequest.shard_id, 1);
+                assert_eq!(subrequest.to_position_inclusive, Some(Position::Eof));
+
+                let response = TruncateShardsResponse {};
+                Ok(response)
+            });
+
+        let ingester_0: IngesterServiceClient = ingester_mock_0.into();
+        ingester_pool.insert("test-ingester-0".into(), ingester_0.clone());
+
+        let event_broker = EventBroker::default();
+        let (shard_positions_update_tx, mut shard_positions_update_rx) =
+            tokio::sync::mpsc::unbounded_channel::<PublishedShardPositionsUpdate>();
+        event_broker
+            .subscribe_fn::<PublishedShardPositionsUpdate>(move |update| {
+                shard_positions_update_tx.send(update).unwrap();
+            })
+            .forever();
+
+        let runtime_args = Arc::new(SourceRuntimeArgs {
+            pipeline_id,
+            source_config,
+            metastore: MetastoreServiceClient::from(mock_metastore),
+            ingester_pool: ingester_pool.clone(),
+            queues_dir_path: PathBuf::from("./queues"),
+            storage_resolver: StorageResolver::for_test(),
+            event_broker,
+        });
+        let checkpoint = SourceCheckpoint::default();
+        let mut source = IngestSource::try_new(runtime_args, checkpoint)
+            .await
+            .unwrap();
+
+        let universe = Universe::with_accelerated_time();
+        let (source_mailbox, _source_inbox) = universe.create_test_mailbox::<SourceActor>();
+        let (doc_processor_mailbox, _doc_processor_inbox) =
+            universe.create_test_mailbox::<DocProcessor>();
+        let (observable_state_tx, _observable_state_rx) = watch::channel(serde_json::Value::Null);
+        let ctx: SourceContext =
+            ActorContext::for_test(&universe, source_mailbox, observable_state_tx);
+
+        // The single assigned shard is already at EOF.
+        let assignment = Assignment { shard_ids: vec![1] };
+
+        source
+            .assign_shards(assignment, &doc_processor_mailbox, &ctx)
+            .await
+            .unwrap();
+
+        let PublishedShardPositionsUpdate {
+            source_uid,
+            published_positions_per_shard,
+        } = shard_positions_update_rx.recv().await.unwrap();
+        assert_eq!(source_uid.to_string(), "test-index:0:test-source");
+        assert_eq!(&published_positions_per_shard, &[(1, Position::Eof)]);
+    }
+
+    #[tokio::test]
+    async fn test_ingest_source_shard_originally_eof() {
+        // In this test, we check that shards that are already marked as EOF in the metastore
+        // - result in the emission of a suggest truncate;
+        // - do not get needlessly streamed;
+        // - have their EOF position advertised through the event broker.
+        let pipeline_id = IndexingPipelineId {
+            node_id: "test-node".to_string(),
+            index_uid: "test-index:0".into(),
+            source_id: "test-source".to_string(),
+            pipeline_ord: 0,
+        };
+        let source_config = SourceConfig::for_test("test-source", SourceParams::Ingest);
+        let publish_token =
+            "indexer/test-node/test-index:0/test-source/0/00000000000000000000000000";
+
+        let mut mock_metastore = MetastoreServiceClient::mock();
+        mock_metastore
+            .expect_acquire_shards()
+            .once()
+            .returning(|request| {
+                assert_eq!(request.subrequests.len(), 1);
+
+                let subrequest = &request.subrequests[0];
+                assert_eq!(subrequest.index_uid, "test-index:0");
+                assert_eq!(subrequest.source_id, "test-source");
+                assert_eq!(subrequest.shard_ids, vec![1, 2]);
+
+                let response = AcquireShardsResponse {
+                    subresponses: vec![AcquireShardsSubresponse {
+                        index_uid: "test-index:0".to_string(),
+                        source_id: "test-source".to_string(),
+                        acquired_shards: vec![
+                            Shard {
+                                leader_id: "test-ingester-0".to_string(),
+                                follower_id: None,
+                                index_uid: "test-index:0".to_string(),
+                                source_id: "test-source".to_string(),
+                                shard_id: 1,
+                                shard_state: ShardState::Open as i32,
+                                publish_position_inclusive: Some(11u64.into()),
+                                publish_token: Some(publish_token.to_string()),
+                            },
+                            Shard {
+                                leader_id: "test-ingester-0".to_string(),
+                                follower_id: None,
+                                index_uid: "test-index:0".to_string(),
+                                source_id: "test-source".to_string(),
+                                shard_id: 2,
+                                shard_state: ShardState::Closed as i32,
+                                publish_position_inclusive: Some(Position::Eof),
+                                publish_token: Some(publish_token.to_string()),
+                            },
+                        ],
+                    }],
+                };
+                Ok(response)
+            });
+        let ingester_pool = IngesterPool::default();
+
+        let mut ingester_mock_0 = IngesterServiceClient::mock();
+        ingester_mock_0
+            .expect_open_fetch_stream()
+            .once()
+            .returning(|request| {
+                assert_eq!(
+                    request.client_id,
+                    "indexer/test-node/test-index:0/test-source/0"
+                );
+                assert_eq!(request.index_uid, "test-index:0");
+                assert_eq!(request.source_id, "test-source");
+                assert_eq!(request.shard_id, 1);
+                assert_eq!(request.from_position_exclusive, Some(11u64.into()));
+
+                let (_service_stream_tx, service_stream) = ServiceStream::new_bounded(1);
+                Ok(service_stream)
+            });
+        ingester_mock_0
+            .expect_truncate_shards()
+            .once()
+            .returning(|mut request| {
+                assert_eq!(request.ingester_id, "test-ingester-0");
+                assert_eq!(request.subrequests.len(), 2);
+                request
+                    .subrequests
+                    .sort_by_key(|subrequest| subrequest.shard_id);
+
+                let subrequest = &request.subrequests[0];
+                assert_eq!(subrequest.index_uid, "test-index:0");
+                assert_eq!(subrequest.source_id, "test-source");
+                assert_eq!(subrequest.shard_id, 1);
+                assert_eq!(subrequest.to_position_inclusive, Some(11u64.into()));
+
+                let subrequest = &request.subrequests[1];
+                assert_eq!(subrequest.index_uid, "test-index:0");
+                assert_eq!(subrequest.source_id, "test-source");
+                assert_eq!(subrequest.shard_id, 2);
+                assert_eq!(subrequest.to_position_inclusive, Some(Position::Eof));
+
+                let response = TruncateShardsResponse {};
+                Ok(response)
+            });
+
+        let ingester_0: IngesterServiceClient = ingester_mock_0.into();
+        ingester_pool.insert("test-ingester-0".into(), ingester_0.clone());
+
+        let event_broker = EventBroker::default();
+        let (shard_positions_update_tx, mut shard_positions_update_rx) =
+            tokio::sync::mpsc::unbounded_channel::<PublishedShardPositionsUpdate>();
+        event_broker
+            .subscribe_fn::<PublishedShardPositionsUpdate>(move |update| {
+                shard_positions_update_tx.send(update).unwrap();
+            })
+            .forever();
+
+        let runtime_args = Arc::new(SourceRuntimeArgs {
+            pipeline_id,
+            source_config,
+            metastore: MetastoreServiceClient::from(mock_metastore),
+            ingester_pool: ingester_pool.clone(),
+            queues_dir_path: PathBuf::from("./queues"),
+            storage_resolver: StorageResolver::for_test(),
+            event_broker,
+        });
+        let checkpoint = SourceCheckpoint::default();
+        let mut source = IngestSource::try_new(runtime_args, checkpoint)
+            .await
+            .unwrap();
+
+        let universe = Universe::with_accelerated_time();
+        let (source_mailbox, _source_inbox) = universe.create_test_mailbox::<SourceActor>();
+        let (doc_processor_mailbox, _doc_processor_inbox) =
+            universe.create_test_mailbox::<DocProcessor>();
+        let (observable_state_tx, _observable_state_rx) = watch::channel(serde_json::Value::Null);
+        let ctx: SourceContext =
+            ActorContext::for_test(&universe, source_mailbox, observable_state_tx);
+
+        // Shards 1 and 2 are both acquired, but shard 2 is already at EOF and
+        // will not be streamed.
+        let assignment = Assignment {
+            shard_ids: vec![1, 2],
+        };
+
+        assert_eq!(
+            shard_positions_update_rx.try_recv().unwrap_err(),
+            TryRecvError::Empty
+        );
+
+        source
+            .assign_shards(assignment, &doc_processor_mailbox, &ctx)
+            .await
+            .unwrap();
+
+        let PublishedShardPositionsUpdate {
+            source_uid: _,
+            published_positions_per_shard,
+        } = shard_positions_update_rx.recv().await.unwrap();
+
+        assert_eq!(
+            &published_positions_per_shard,
+            &[(1, 11u64.into()), (2, Position::Eof)]
+        );
+    }
+
     #[tokio::test]
     async fn test_ingest_source_emit_batches() {
         let pipeline_id = IndexingPipelineId {
@@ -671,6 +971,7 @@ mod tests {
         let source_config = SourceConfig::for_test("test-source", SourceParams::Ingest);
         let mock_metastore = MetastoreServiceClient::mock();
         let ingester_pool = IngesterPool::default();
+        let event_broker = EventBroker::default();
 
         let runtime_args = Arc::new(SourceRuntimeArgs {
             pipeline_id,
@@ -679,6 +980,7 @@ mod tests {
             ingester_pool: ingester_pool.clone(),
             queues_dir_path: PathBuf::from("./queues"),
             storage_resolver: StorageResolver::for_test(),
+            event_broker,
         });
         let checkpoint = SourceCheckpoint::default();
         let mut source = IngestSource::try_new(runtime_args, checkpoint)
@@ -831,7 +1133,7 @@ mod tests {
 
             let subrequest_2 = &request.subrequests[2];
             assert_eq!(subrequest_2.shard_id, 3);
-            assert_eq!(subrequest_2.to_position_inclusive, Some(33u64.into()));
+            assert_eq!(subrequest_2.to_position_inclusive, Some(Position::Eof));
 
             Ok(TruncateShardsResponse {})
         });
@@ -852,7 +1154,7 @@ mod tests {
 
             let subrequest_1 = &request.subrequests[1];
             assert_eq!(subrequest_1.shard_id, 3);
-            assert_eq!(subrequest_1.to_position_inclusive, Some(33u64.into()));
+            assert_eq!(subrequest_1.to_position_inclusive, Some(Position::Eof));
 
             Ok(TruncateShardsResponse {})
         });
@@ -876,6 +1178,15 @@ mod tests {
 
         let ingester_3: IngesterServiceClient = ingester_mock_3.into();
         ingester_pool.insert("test-ingester-3".into(), ingester_3.clone());
 
+        let event_broker = EventBroker::default();
+        let (shard_positions_update_tx, mut shard_positions_update_rx) =
+            tokio::sync::mpsc::unbounded_channel::<PublishedShardPositionsUpdate>();
+        event_broker
+            .subscribe_fn::<PublishedShardPositionsUpdate>(move |update| {
+                shard_positions_update_tx.send(update).unwrap();
+            })
+            .forever();
+
         let runtime_args = Arc::new(SourceRuntimeArgs {
             pipeline_id,
             source_config,
@@ -883,6 +1194,7 @@ mod tests {
             ingester_pool: ingester_pool.clone(),
             queues_dir_path: PathBuf::from("./queues"),
             storage_resolver: StorageResolver::for_test(),
+            event_broker,
         });
         let checkpoint = SourceCheckpoint::default();
         let mut source = IngestSource::try_new(runtime_args, checkpoint)
@@ -946,17 +1258,32 @@ mod tests {
                 status: IndexingStatus::Active,
             },
         );
+
         let checkpoint = SourceCheckpoint::from_iter(vec![
             (1u64.into(), 11u64.into()),
             (2u64.into(), 22u64.into()),
-            (3u64.into(), 33u64.into()),
+            (3u64.into(), Position::Eof),
             (4u64.into(), 44u64.into()),
             (5u64.into(), Position::Beginning),
             (6u64.into(), 66u64.into()),
         ]);
         source.suggest_truncate(checkpoint, &ctx).await.unwrap();
 
-        // Wait for the truncate future to complete.
-        time::sleep(Duration::from_millis(1)).await;
+        let PublishedShardPositionsUpdate {
+            published_positions_per_shard,
+            ..
+        } = shard_positions_update_rx.recv().await.unwrap();
+
+        assert_eq!(
+            &published_positions_per_shard,
+            &[
+                (1u64, 11u64.into()),
+                (2u64, 22u64.into()),
+                (3u64, Position::Eof),
+                (4u64, 44u64.into()),
+                (5u64, Position::Beginning),
+                (6u64, 66u64.into())
+            ]
+        );
     }
 }
diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs
index d3325fc2195..c5bd193d152 100644
--- a/quickwit/quickwit-indexing/src/source/mod.rs
+++ b/quickwit/quickwit-indexing/src/source/mod.rs
@@ -89,6 +89,7 @@ use once_cell::sync::OnceCell;
 #[cfg(feature = "pulsar")]
 pub use pulsar_source::{PulsarSource, PulsarSourceFactory};
 use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox};
+use quickwit_common::pubsub::EventBroker;
 use quickwit_common::runtimes::RuntimeType;
 use quickwit_config::{SourceConfig, SourceParams};
 use quickwit_ingest::IngesterPool;
@@ -134,6 +135,7 @@ pub struct SourceRuntimeArgs {
     // Ingest API queues directory path.
     pub queues_dir_path: PathBuf,
     pub storage_resolver: StorageResolver,
+    pub event_broker: EventBroker,
 }
 
 impl SourceRuntimeArgs {
@@ -171,13 +173,14 @@ impl SourceRuntimeArgs {
             source_id: source_config.source_id.clone(),
             pipeline_ord: 0,
         };
-        Arc::new(Self {
+        Arc::new(SourceRuntimeArgs {
             pipeline_id,
             metastore,
             ingester_pool: IngesterPool::default(),
             queues_dir_path,
             source_config,
             storage_resolver: StorageResolver::for_test(),
+            event_broker: EventBroker::default(),
         })
     }
 }
diff --git a/quickwit/quickwit-metastore/src/checkpoint.rs b/quickwit/quickwit-metastore/src/checkpoint.rs
index f9862102b06..e8928a7efcc 100644
--- a/quickwit/quickwit-metastore/src/checkpoint.rs
+++ b/quickwit/quickwit-metastore/src/checkpoint.rs
@@ -170,7 +170,6 @@ impl IndexCheckpoint {
 pub struct SourceCheckpoint {
     per_partition: BTreeMap<PartitionId, Position>,
 }
-
 impl SourceCheckpoint {
     /// Adds a partition to the checkpoint.
     pub fn add_partition(&mut self, partition_id: PartitionId, position: Position) {
diff --git a/quickwit/quickwit-proto/src/types/mod.rs b/quickwit/quickwit-proto/src/types/mod.rs
index 1019b8ca7b6..c950031ee37 100644
--- a/quickwit/quickwit-proto/src/types/mod.rs
+++ b/quickwit/quickwit-proto/src/types/mod.rs
@@ -20,11 +20,12 @@
 use std::borrow::Borrow;
 use std::convert::Infallible;
 use std::fmt;
-use std::fmt::Display;
+use std::fmt::{Display, Formatter};
 use std::ops::Deref;
 use std::str::FromStr;
 
 use serde::{Deserialize, Deserializer, Serialize};
+use thiserror::Error;
 pub use ulid::Ulid;
 
 mod position;
@@ -118,6 +119,21 @@ impl IndexUid {
     }
 
+    pub fn parse(index_uid_str: String) -> Result<IndexUid, InvalidIndexUid> {
+        let count_colon = index_uid_str
+            .as_bytes()
+            .iter()
+            .copied()
+            .filter(|c| *c == b':')
+            .count();
+        if count_colon != 1 {
+            return Err(InvalidIndexUid {
+                invalid_index_uid_str: index_uid_str,
+            });
+        }
+        Ok(IndexUid(index_uid_str))
+    }
+
     pub fn is_empty(&self) -> bool {
         self.0.is_empty()
     }
@@ -129,22 +145,30 @@ impl From<IndexUid> for String {
     }
 }
 
+#[derive(Error, Debug)]
+#[error("invalid index uid `{invalid_index_uid_str}`")]
+pub struct InvalidIndexUid {
+    pub invalid_index_uid_str: String,
+}
+
 impl From<&str> for IndexUid {
     fn from(index_uid: &str) -> Self {
-        Self(index_uid.to_string())
+        IndexUid::from(index_uid.to_string())
     }
 }
 
+// TODO remove me and only keep `TryFrom` implementation.
 impl From<String> for IndexUid {
     fn from(index_uid: String) -> IndexUid {
-        let count_colon = index_uid
-            .as_bytes()
-            .iter()
-            .copied()
-            .filter(|c| *c == b':')
-            .count();
-        assert_eq!(count_colon, 1, "invalid index UID: `{}`", index_uid);
-        IndexUid(index_uid)
+        match IndexUid::parse(index_uid) {
+            Ok(index_uid) => index_uid,
+            Err(invalid_index_uid) => {
+                panic!(
+                    "invalid index uid {}",
+                    invalid_index_uid.invalid_index_uid_str
+                );
+            }
+        }
     }
 }
 
@@ -160,6 +184,20 @@ impl PartialEq<&str> for IndexUid {
     }
 }
 
+/// In itself, a `SourceId` is not unique: only the pair `(IndexUid, SourceId)` is.
+/// A source ID can, however, appear only once in a given index.
+#[derive(PartialEq, Eq, Debug, PartialOrd, Ord, Hash, Clone)]
+pub struct SourceUid {
+    pub index_uid: IndexUid,
+    pub source_id: SourceId,
+}
+
+impl Display for SourceUid {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(f, "{}:{}", self.index_uid, self.source_id)
+    }
+}
+
 #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
 pub struct NodeId(String);
 
diff --git a/quickwit/quickwit-proto/src/types/position.rs b/quickwit/quickwit-proto/src/types/position.rs
index c87e7440f49..1ecf16625d7 100644
--- a/quickwit/quickwit-proto/src/types/position.rs
+++ b/quickwit/quickwit-proto/src/types/position.rs
@@ -18,6 +18,7 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
 use std::fmt;
+use std::fmt::Display;
 
 use bytes::Bytes;
 use bytestring::ByteString;
@@ -118,6 +119,12 @@ impl Position {
     }
 }
 
+impl Display for Position {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{}", self.as_str())
+    }
+}
+
 impl From<i64> for Position {
     fn from(offset: i64) -> Self {
         assert!(offset >= 0);
@@ -276,6 +283,15 @@ mod tests {
         assert_eq!(Position::from(0u64), 0usize);
     }
 
+    #[test]
+    #[allow(clippy::cmp_owned)]
+    fn test_position_ord() {
+        assert!(Position::Beginning < Position::from(0u64));
+        assert!(Position::from(0u64) < Position::from(1u64));
+        assert!(Position::from(1u64) < Position::Eof);
+        assert!(Position::Beginning < Position::Eof);
+    }
+
     #[test]
     fn test_position_json_serde_roundtrip() {
         let serialized = serde_json::to_string(&Position::Beginning).unwrap();