Skip to content

Commit

Permalink
Shorten visibility extension task
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jun 26, 2024
1 parent 88225cb commit 935e7fa
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 319 deletions.
1 change: 1 addition & 0 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ impl SourceParams {
Self::Void(VoidSourceParams)
}

/// Checkpoints can be stored either in the index metadata (false) or the shard table (true).
pub fn use_shard_api(&self) -> bool {
match self {
SourceParams::File(_) => false,
Expand Down
168 changes: 61 additions & 107 deletions quickwit/quickwit-indexing/src/source/queue_sources/local_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,138 +21,92 @@ use std::collections::{BTreeMap, BTreeSet, VecDeque};

use quickwit_metastore::checkpoint::PartitionId;

use super::message::{CheckpointedMessage, InProgressMessage, PreProcessedMessage};
use super::visibility::VisibilityTaskHandle;
use super::Categorized;
use super::message::{InProgressMessage, ReadyMessage};

/// Tracks the state of the the queue messages that is known to the owning
/// indexing pipeline.
/// Tracks the state of the queue messages that are known to the owning indexing
/// pipeline.
///
/// Messages first land in the `ready` queue. In parallel, they are also
/// recorded as `in_flight`, where the visibility handle is kept alive. Messages
/// are then moved to `in_progress` to track the reader's progress. It's only
/// when the events are published that the in-flight visiblity handles are
/// dropped and the partition is marked as `completed``.
/// Messages first land in the `ready_for_read` queue. They are then moved to
/// `read_in_progress` to track the reader's progress. Once the reader reaches
/// EOF, the message is transitioned as `awaiting_commit`. Once the message is
/// known to be fully indexed and committed (e.g after receiving the
/// `suggest_truncate` call), it is moved to `completed`.
#[derive(Default)]
pub struct QueueLocalState {
/// Visibility handles for all messages from the moment they are marked
/// `ready` until they are committed (`completed`)
in_flight: BTreeMap<PartitionId, VisibilityTaskHandle>,
/// Messages that were received from the queue and are ready to be read
ready: VecDeque<CheckpointedMessage>,
ready_for_read: VecDeque<ReadyMessage>,
/// Message that is currently being read and sent to the `DocProcessor`
in_progress: Option<InProgressMessage>,
/// Partitions that were indexed and committed
read_in_progress: Option<InProgressMessage>,
/// Partitions that were read and are still being indexed, with their
/// associated ack_id
awaiting_commit: BTreeMap<PartitionId, String>,
/// Partitions that were fully indexed and committed
completed: BTreeSet<PartitionId>,
}

impl QueueLocalState {
/// Split the message list into:
/// - processable (returned)
/// - already processed (returned)
/// - tracked as in progress (dropped)
pub fn filter_completed(
&self,
messages: Vec<PreProcessedMessage>,
) -> Categorized<PreProcessedMessage, PreProcessedMessage> {
let mut processable = Vec::new();
let mut completed = Vec::new();
for message in messages {
let partition_id = message.partition_id();
if self.completed.contains(&partition_id) {
completed.push(message);
} else if self.in_flight.contains_key(&partition_id) {
// already in progress, drop the message
} else {
processable.push(message);
}
}
Categorized {
already_processed: completed,
processable,
}
pub fn is_ready_for_read(&self, partition_id: &PartitionId) -> bool {
self.ready_for_read
.iter()
.any(|msg| &msg.partition_id() == partition_id)
}

pub fn set_ready_messages(
&mut self,
ready_messages: Vec<(CheckpointedMessage, VisibilityTaskHandle)>,
) {
for (message, handle) in ready_messages {
let partition_id = message.partition_id();
self.in_flight.insert(partition_id, handle);
self.ready.push_back(message)
}
pub fn is_read_in_progress(&self, partition_id: &PartitionId) -> bool {
self.read_in_progress
.as_ref()
.map_or(false, |msg| msg.partition_id() == partition_id)
}

pub fn in_progress_mut(&mut self) -> Option<&mut InProgressMessage> {
self.in_progress.as_mut()
pub fn is_awating_commit(&self, partition_id: &PartitionId) -> bool {
self.awaiting_commit.contains_key(partition_id)
}

pub fn set_in_progress(&mut self, in_progress: Option<InProgressMessage>) {
self.in_progress = in_progress;
pub fn is_completed(&self, partition_id: &PartitionId) -> bool {
self.completed.contains(partition_id)
}

pub fn get_ready_message(&mut self) -> Option<CheckpointedMessage> {
self.ready.pop_front()
pub fn is_tracked(&self, partition_id: &PartitionId) -> bool {
self.is_ready_for_read(partition_id)
|| self.is_read_in_progress(partition_id)
|| self.is_awating_commit(partition_id)
|| self.is_completed(partition_id)
}

pub fn mark_completed(&mut self, partition_id: PartitionId) -> Option<VisibilityTaskHandle> {
let visibility_handle = self.in_flight.remove(&partition_id);
self.completed.insert(partition_id);
visibility_handle
pub fn set_ready_for_read(&mut self, ready_messages: Vec<ReadyMessage>) {
for message in ready_messages {
self.ready_for_read.push_back(message)
}
}
}

#[cfg(test)]
pub mod test_helpers {
use quickwit_metastore::checkpoint::PartitionId;

use super::*;

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum LocalStateForPartition {
Unknown,
Ready,
ReadInProgress,
WaitForCommit,
Completed,
pub fn get_ready_for_read(&mut self) -> Option<ReadyMessage> {
while let Some(msg) = self.ready_for_read.pop_front() {
// don't return messages for which we didn't manage to extend the
// visibility, they will pop up in the queue again anyway
if !msg.visibility_handle.extension_failed() {
return Some(msg);
}
}
None
}

impl QueueLocalState {
pub fn state(&self, partition_id: &PartitionId) -> LocalStateForPartition {
let is_completed = self.completed.contains(partition_id);
let is_read_in_progress = self
.in_progress
.as_ref()
.map(|msg| msg.partition_id() == partition_id)
.unwrap_or(false);
let is_wait_for_commit =
self.in_flight.contains_key(partition_id) && !is_read_in_progress;
let is_ready = self
.ready
.iter()
.any(|msg| msg.partition_id() == *partition_id);

let states = [
is_completed,
is_read_in_progress,
is_wait_for_commit,
is_ready,
];
let simulteanous_states = states.into_iter().filter(|x| *x).count();
assert!(simulteanous_states <= 1);
pub fn read_in_progress_mut(&mut self) -> Option<&mut InProgressMessage> {
self.read_in_progress.as_mut()
}

if is_completed {
LocalStateForPartition::Completed
} else if is_ready {
LocalStateForPartition::Ready
} else if is_read_in_progress {
LocalStateForPartition::ReadInProgress
} else if is_wait_for_commit {
LocalStateForPartition::WaitForCommit
} else {
LocalStateForPartition::Unknown
}
pub fn replace_currently_read(&mut self, in_progress: Option<InProgressMessage>) {
if let Some(just_finished) = self.read_in_progress.take() {
self.awaiting_commit.insert(
just_finished.partition_id().clone(),
just_finished.ack_id().to_string(),
);
}
self.read_in_progress = in_progress;
}

/// Returns the ack_id if that message was awaiting_commit
pub fn mark_completed(&mut self, partition_id: PartitionId) -> Option<String> {
let ack_id_opt = self.awaiting_commit.remove(&partition_id);
self.completed.insert(partition_id);
ack_id_opt
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ impl Queue for MemoryQueue {
Ok(vec![])
}

async fn acknowledge(&self, ack_ids: &[&str]) -> anyhow::Result<()> {
async fn acknowledge(&self, ack_ids: &Vec<String>) -> anyhow::Result<()> {
let mut inner_state = self.inner_state.lock().unwrap();
for ack_id in ack_ids {
if let Some(msg) = inner_state.in_flight.remove(*ack_id) {
if let Some(msg) = inner_state.in_flight.remove(ack_id) {
inner_state.acked.push(msg);
}
}
Expand Down
55 changes: 34 additions & 21 deletions quickwit/quickwit-indexing/src/source/queue_sources/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use quickwit_proto::types::Position;
use quickwit_storage::{OwnedBytes, StorageResolver};
use serde_json::Value;

use super::visibility::VisibilityTaskHandle;
use crate::actors::DocProcessor;
use crate::source::doc_file_reader::DocFileReader;
use crate::source::{BatchBuilder, SourceContext, BATCH_NUM_BYTES_LIMIT};
Expand Down Expand Up @@ -118,12 +119,13 @@ fn uri_from_s3_notification(message: &OwnedBytes) -> anyhow::Result<Uri> {

/// A message for which we know as much of the global processing status as
/// possible and that is now ready to be processed.
pub struct CheckpointedMessage {
pub struct ReadyMessage {
pub position: Position,
pub content: PreProcessedMessage,
pub visibility_handle: VisibilityTaskHandle,
}

impl CheckpointedMessage {
impl ReadyMessage {
pub async fn start_processing(
self,
storage_resolver: &StorageResolver,
Expand All @@ -141,13 +143,17 @@ impl CheckpointedMessage {
};
let reader =
DocFileReader::from_uri(storage_resolver, &uri, current_offset).await?;
Ok(Some(InProgressMessage::ObjectUri(ObjectUriInProgress {
reader,
current_offset,
Ok(Some(InProgressMessage {
progress_tracker: ProgressTracker::ObjectUri(ObjectUriInProgress {
reader,
current_offset,

source_type,
is_eof: false,
}),
partition_id,
source_type,
is_eof: false,
})))
visibility_handle: self.visibility_handle,
}))
}
}
}
Expand All @@ -157,8 +163,14 @@ impl CheckpointedMessage {
}
}

// A message that is actively being read
pub enum InProgressMessage {
/// A message that is actively being read
pub struct InProgressMessage {
partition_id: PartitionId,
visibility_handle: VisibilityTaskHandle,
progress_tracker: ProgressTracker,
}

pub enum ProgressTracker {
ObjectUri(ObjectUriInProgress),
}

Expand All @@ -168,31 +180,31 @@ impl InProgressMessage {
doc_processor_mailbox: &Mailbox<DocProcessor>,
source_ctx: &SourceContext,
) -> anyhow::Result<()> {
match self {
Self::ObjectUri(in_progress) => {
match &mut self.progress_tracker {
ProgressTracker::ObjectUri(in_progress) => {
in_progress
.process_uri_batch(doc_processor_mailbox, source_ctx)
.process_uri_batch(doc_processor_mailbox, source_ctx, self.partition_id.clone())
.await
}
}
}

pub fn is_eof(&self) -> bool {
match self {
Self::ObjectUri(in_progress) => in_progress.is_eof,
match &self.progress_tracker {
ProgressTracker::ObjectUri(in_progress) => in_progress.is_eof,
}
}

#[cfg(test)]
pub fn partition_id(&self) -> &PartitionId {
match self {
Self::ObjectUri(in_progress) => &in_progress.partition_id,
}
&self.partition_id
}

pub fn ack_id(&self) -> &str {
self.visibility_handle.ack_id()
}
}

pub struct ObjectUriInProgress {
partition_id: PartitionId,
reader: DocFileReader,
current_offset: usize,
source_type: SourceType,
Expand All @@ -204,6 +216,7 @@ impl ObjectUriInProgress {
&mut self,
doc_processor_mailbox: &Mailbox<DocProcessor>,
source_ctx: &SourceContext,
partition_id: PartitionId,
) -> anyhow::Result<()> {
let limit_num_bytes = self.current_offset + BATCH_NUM_BYTES_LIMIT as usize;
let mut new_offset = self.current_offset;
Expand All @@ -226,7 +239,7 @@ impl ObjectUriInProgress {
batch_builder
.checkpoint_delta
.record_partition_delta(
self.partition_id.clone(),
partition_id,
Position::offset(self.current_offset),
to_position,
)
Expand Down
21 changes: 2 additions & 19 deletions quickwit/quickwit-indexing/src/source/queue_sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use std::fmt;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use message::{PreProcessedMessage, RawMessage};
use message::RawMessage;

/// The queue abstraction is based on the AWS SQS and Google Pubsub APIs. The
/// only requirement of the underlying implementation is that messages exposed
Expand All @@ -56,7 +56,7 @@ pub trait Queue: fmt::Debug + Send + Sync + 'static {
/// The call returns `Ok(())` if:
/// - the acknowledgement of some of the messages failed due to a transient failure
/// - the message was already acknowledged
async fn acknowledge(&self, ack_ids: &[&str]) -> anyhow::Result<()>;
async fn acknowledge(&self, ack_ids: &Vec<String>) -> anyhow::Result<()>;

/// Modify the visibility deadline of the messages.
///
Expand All @@ -71,20 +71,3 @@ pub trait Queue: fmt::Debug + Send + Sync + 'static {
suggested_deadline: Duration,
) -> anyhow::Result<Instant>;
}

pub struct Categorized<U, V> {
pub processable: Vec<U>,
pub already_processed: Vec<V>,
}

/// Acknowledges a list of messages
pub async fn acknowledge(
queue: &dyn Queue,
messages: Vec<PreProcessedMessage>,
) -> anyhow::Result<()> {
let ack_ids = messages
.iter()
.map(|message| message.metadata.ack_id.as_str())
.collect::<Vec<_>>();
queue.acknowledge(&ack_ids).await
}
Loading

0 comments on commit 935e7fa

Please sign in to comment.