Skip to content

Commit

Permalink
Address smaller review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jun 26, 2024
1 parent 011e908 commit 88225cb
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 21 deletions.
12 changes: 10 additions & 2 deletions quickwit/quickwit-indexing/src/source/queue_sources/local_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,14 @@ use super::Categorized;
/// dropped and the partition is marked as `completed`.
#[derive(Default)]
pub struct QueueLocalState {
/// Visibility task handles, keyed by partition ID, for all messages from
/// the moment they are marked `ready` until they are committed
/// (`completed`)
in_flight: BTreeMap<PartitionId, VisibilityTaskHandle>,
/// Messages that were received from the queue and are ready to be read
ready: VecDeque<CheckpointedMessage>,
/// Message that is currently being read and sent to the `DocProcessor`;
/// `None` when no message is being read
in_progress: Option<InProgressMessage>,
/// IDs of the partitions that were indexed and committed
completed: BTreeSet<PartitionId>,
}

Expand Down Expand Up @@ -119,11 +124,14 @@ pub mod test_helpers {
let is_read_in_progress = self
.in_progress
.as_ref()
.map(|m| m.partition_id() == partition_id)
.map(|msg| msg.partition_id() == partition_id)
.unwrap_or(false);
let is_wait_for_commit =
self.in_flight.contains_key(partition_id) && !is_read_in_progress;
let is_ready = self.ready.iter().any(|m| m.partition_id() == *partition_id);
let is_ready = self
.ready
.iter()
.any(|msg| msg.partition_id() == *partition_id);

let states = [
is_completed,
Expand Down
12 changes: 7 additions & 5 deletions quickwit/quickwit-indexing/src/source/queue_sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ use message::{PreProcessedMessage, RawMessage};
/// The queue abstraction is based on the AWS SQS and Google Pubsub APIs. The
/// only requirement of the underlying implementation is that messages exposed
/// to a given consumer are hidden from other consumers for a configurable period
/// of time. Retries are
/// of time. Retries are handled by the implementation because queues might
/// behave differently (throttling, deduplication...).
#[async_trait]
pub trait Queue: fmt::Debug + Send + Sync + 'static {
/// Poll the queue to receive messages.
Expand All @@ -49,11 +50,12 @@ pub trait Queue: fmt::Debug + Send + Sync + 'static {
/// queue, it should be returned as quickly as possible.
async fn receive(&self) -> anyhow::Result<Vec<RawMessage>>;

/// Try to acknowledge the messages, effectively deleting them from the queue.
/// Try to acknowledge the messages, effectively deleting them from the
/// queue.
///
/// The call might return `Ok(())` yet fail partially:
/// - if it's a transient failure? -> TODO check
/// - if the message was already acknowledged
/// The call also returns `Ok(())` if:
/// - the acknowledgement of some of the messages failed due to a transient failure
/// - the message was already acknowledged
async fn acknowledge(&self, ack_ids: &[&str]) -> anyhow::Result<()>;

/// Modify the visibility deadline of the messages.
Expand Down
14 changes: 8 additions & 6 deletions quickwit/quickwit-indexing/src/source/queue_sources/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl QueueProcessor {

let preprocessed_messages = raw_messages
.into_iter()
.map(|m| m.pre_process(self.queue_params.message_type))
.map(|msg| msg.pre_process(self.queue_params.message_type))
.collect::<anyhow::Result<Vec<_>>>()?;

let categorized_using_local_state =
Expand All @@ -164,11 +164,13 @@ impl QueueProcessor {
.await?;

// Drop visibility tasks for messages that have been processed by another pipeline
let completed_visibility_tasks = categorized_using_shared_state
.already_processed
.iter()
.filter_map(|m| self.local_state.mark_completed(m.partition_id()))
.collect();
let mut completed_visibility_tasks = Vec::new();
for preproc_msg in &categorized_using_shared_state.already_processed {
let handle_opt = self.local_state.mark_completed(preproc_msg.partition_id());
if let Some(handle) = handle_opt {
completed_visibility_tasks.push(handle);
}
}
acknowledge_and_abort(&*self.queue, completed_visibility_tasks).await?;

// Acknowledge messages that have been processed by another pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl QueueSharedState for QueueSharedStateImpl {
messages: Vec<PreProcessedMessage>,
) -> anyhow::Result<Categorized<CheckpointedMessage, PreProcessedMessage>> {
let mut message_map =
BTreeMap::from_iter(messages.into_iter().map(|m| (m.partition_id(), m)));
BTreeMap::from_iter(messages.into_iter().map(|msg| (msg.partition_id(), msg)));
let partition_ids = message_map.keys().cloned().collect();

let Categorized {
Expand Down
10 changes: 5 additions & 5 deletions quickwit/quickwit-indexing/src/source/queue_sources/sqs_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,19 @@ impl Queue for SqsQueue {
res.messages
.unwrap_or_default()
.into_iter()
.map(|m| {
let delivery_attempts: usize = m
.map(|msg| {
let delivery_attempts: usize = msg
.attributes
.as_ref()
.and_then(|attrs| {
attrs.get(&MessageSystemAttributeName::ApproximateReceiveCount)
})
.and_then(|s| s.parse().ok())
.unwrap_or(0);
let ack_id = m
let ack_id = msg
.receipt_handle
.context("missing receipt_handle in received message")?;
let message_id = m
let message_id = msg
.message_id
.context("missing message_id in received message")?;
Ok(RawMessage {
Expand All @@ -98,7 +98,7 @@ impl Queue for SqsQueue {
initial_deadline,
delivery_attempts,
},
payload: OwnedBytes::new(m.body.unwrap_or_default().into_bytes()),
payload: OwnedBytes::new(msg.body.unwrap_or_default().into_bytes()),
})
})
.collect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub fn spawn_visibility_task(
initial_deadline: Instant,
publish_lock: PublishLock,
) -> VisibilityTaskHandle {
let task_handle = tokio::spawn(handle_visibility(
let task_handle = tokio::spawn(extend_visibility_loop(
queue,
ack_id.clone(),
initial_deadline,
Expand All @@ -58,7 +58,7 @@ pub fn spawn_visibility_task(
/// - we don't want to fail the pipeline if we fail to extend the visibility of a message that is
/// still waiting for processing
/// - the Processor must also be notified that it shouldn't process this message anymore
async fn handle_visibility(
async fn extend_visibility_loop(
queue: Arc<dyn Queue>,
ack_id: String,
initial_deadline: Instant,
Expand Down

0 comments on commit 88225cb

Please sign in to comment.