diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/local_state.rs b/quickwit/quickwit-indexing/src/source/queue_sources/local_state.rs index fe810fbf960..d374e541159 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/local_state.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/local_state.rs @@ -35,9 +35,14 @@ use super::Categorized; /// dropped and the partition is marked as `completed``. #[derive(Default)] pub struct QueueLocalState { + /// Visibility handles for all messages from the moment they are marked + /// `ready` until they are committed (`completed`) in_flight: BTreeMap, + /// Messages that were received from the queue and are ready to be read ready: VecDeque, + /// Message that is currently being read and sent to the `DocProcessor` in_progress: Option, + /// Partitions that were indexed and committed completed: BTreeSet, } @@ -119,11 +124,14 @@ pub mod test_helpers { let is_read_in_progress = self .in_progress .as_ref() - .map(|m| m.partition_id() == partition_id) + .map(|msg| msg.partition_id() == partition_id) .unwrap_or(false); let is_wait_for_commit = self.in_flight.contains_key(partition_id) && !is_read_in_progress; - let is_ready = self.ready.iter().any(|m| m.partition_id() == *partition_id); + let is_ready = self + .ready + .iter() + .any(|msg| msg.partition_id() == *partition_id); let states = [ is_completed, diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/mod.rs b/quickwit/quickwit-indexing/src/source/queue_sources/mod.rs index 47070983071..cc7ee3c5339 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/mod.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/mod.rs @@ -38,7 +38,8 @@ use message::{PreProcessedMessage, RawMessage}; /// The queue abstraction is based on the AWS SQS and Google Pubsub APIs. The /// only requirement of the underlying implementation is that messages exposed /// to a given consumer are hidden to other consumers for a configurable period -/// of time. Retries are +/// of time. Retries are handled by the implementation because queues might +/// behave differently (throttling, deduplication...). #[async_trait] pub trait Queue: fmt::Debug + Send + Sync + 'static { /// Poll the queue to receive messages. @@ -49,11 +50,12 @@ pub trait Queue: fmt::Debug + Send + Sync + 'static { /// queue, it should be returned as quickly as possible. async fn receive(&self) -> anyhow::Result>; - /// Try to acknowledge the messages, effectively deleting them from the queue. + /// Try to acknowledge the messages, effectively deleting them from the + /// queue. /// - /// The call might return `Ok(())` yet fail partially: - /// - if it's a transient failure? -> TODO check - /// - if the message was already acknowledged + /// The call returns `Ok(())` if: + /// - the acknowledgement of some of the messages failed due to a transient failure + /// - the message was already acknowledged async fn acknowledge(&self, ack_ids: &[&str]) -> anyhow::Result<()>; /// Modify the visibility deadline of the messages. diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/processor.rs b/quickwit/quickwit-indexing/src/source/queue_sources/processor.rs index 5caf4350d9c..fd7f5092d1c 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/processor.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/processor.rs @@ -149,7 +149,7 @@ impl QueueProcessor { let preprocessed_messages = raw_messages .into_iter() - .map(|m| m.pre_process(self.queue_params.message_type)) + .map(|msg| msg.pre_process(self.queue_params.message_type)) .collect::>>()?; let categorized_using_local_state = @@ -164,11 +164,13 @@ impl QueueProcessor { .await?; // Drop visibility tasks for messages that have been processed by another pipeline - let completed_visibility_tasks = categorized_using_shared_state - .already_processed - .iter() - .filter_map(|m| self.local_state.mark_completed(m.partition_id())) - .collect(); + let mut completed_visibility_tasks = Vec::new(); + for preproc_msg in &categorized_using_shared_state.already_processed { + let handle_opt = self.local_state.mark_completed(preproc_msg.partition_id()); + if let Some(handle) = handle_opt { + completed_visibility_tasks.push(handle); + } + } acknowledge_and_abort(&*self.queue, completed_visibility_tasks).await?; // Acknowledge messages that have been processed by another pipeline diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs b/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs index 63e4e139ffa..76054a798ea 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs @@ -109,7 +109,7 @@ impl QueueSharedState for QueueSharedStateImpl { messages: Vec, ) -> anyhow::Result> { let mut message_map = - BTreeMap::from_iter(messages.into_iter().map(|m| (m.partition_id(), m))); + BTreeMap::from_iter(messages.into_iter().map(|msg| (msg.partition_id(), msg))); let partition_ids = message_map.keys().cloned().collect(); let Categorized { diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/sqs_queue.rs b/quickwit/quickwit-indexing/src/source/queue_sources/sqs_queue.rs index 802c4ac254b..8b1cf283517 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/sqs_queue.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/sqs_queue.rs @@ -76,8 +76,8 @@ impl Queue for SqsQueue { res.messages .unwrap_or_default() .into_iter() - .map(|m| { - let delivery_attempts: usize = m + .map(|msg| { + let delivery_attempts: usize = msg .attributes .as_ref() .and_then(|attrs| { @@ -85,10 +85,10 @@ impl Queue for SqsQueue { }) .and_then(|s| s.parse().ok()) .unwrap_or(0); - let ack_id = m + let ack_id = msg .receipt_handle .context("missing receipt_handle in received message")?; - let message_id = m + let message_id = msg .message_id .context("missing message_id in received message")?; Ok(RawMessage { @@ -98,7 +98,7 @@ impl Queue for SqsQueue { initial_deadline, delivery_attempts, }, - payload: OwnedBytes::new(m.body.unwrap_or_default().into_bytes()), + payload: OwnedBytes::new(msg.body.unwrap_or_default().into_bytes()), }) }) .collect() diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/visibility.rs b/quickwit/quickwit-indexing/src/source/queue_sources/visibility.rs index bcb1819c67f..6d94884434b 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/visibility.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/visibility.rs @@ -41,7 +41,7 @@ pub fn spawn_visibility_task( initial_deadline: Instant, publish_lock: PublishLock, ) -> VisibilityTaskHandle { - let task_handle = tokio::spawn(handle_visibility( + let task_handle = tokio::spawn(extend_visibility_loop( queue, ack_id.clone(), initial_deadline, @@ -58,7 +58,7 @@ pub fn spawn_visibility_task( /// - we don't want to fail the pipeline if we fail to extend the visibility of a message that is /// still waiting for processing /// - the Processor must also be notified that it shouldn't process this message anymore -async fn handle_visibility( +async fn extend_visibility_loop( queue: Arc, ack_id: String, initial_deadline: Instant,