Commit

Only track shards that were requested
rdettai committed Aug 30, 2024
1 parent 42c0771 commit 0a7f224
Showing 2 changed files with 165 additions and 56 deletions.
1 change: 1 addition & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/router.rs
@@ -425,6 +425,7 @@ impl IngestRouter {
subrequests,
commit_type: commit_type as i32,
};
workbench.record_persist_request(&persist_request);
let persist_future = async move {
let persist_result = tokio::time::timeout(
PERSIST_REQUEST_TIMEOUT,
220 changes: 164 additions & 56 deletions quickwit/quickwit-ingest/src/ingest_v2/workbench.rs
@@ -26,70 +26,90 @@ use quickwit_proto::control_plane::{
GetOrCreateOpenShardsFailure, GetOrCreateOpenShardsFailureReason,
};
use quickwit_proto::indexing::ShardPositionsUpdate;
use quickwit_proto::ingest::ingester::{PersistFailure, PersistFailureReason, PersistSuccess};
use quickwit_proto::ingest::ingester::{
PersistFailure, PersistFailureReason, PersistRequest, PersistSuccess,
};
use quickwit_proto::ingest::router::{
IngestFailure, IngestFailureReason, IngestResponseV2, IngestSubrequest, IngestSuccess,
};
use quickwit_proto::ingest::{IngestV2Error, RateLimitingCause};
use quickwit_proto::types::{NodeId, Position, ShardId, SubrequestId};
use tokio::sync::Notify;
use tracing::warn;
use tracing::{error, warn};

use super::router::PersistRequestSummary;

enum PublishState {
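/// The shard was part of a persist request, but no persisted position has been reported yet.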
Tracked,
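/// A persist success reported this position; waiting for the shard to be published up to it.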
AwaitingPublish(Position),
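/// The shard has been published up to this position (possibly before its persist response was recorded).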
Published(Position),
}

#[derive(Default)]
struct PublishState {
awaiting_publish: HashMap<ShardId, Position>,
already_published: HashMap<ShardId, Position>,
struct ShardPublishStates {
states: HashMap<ShardId, PublishState>,
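/// Number of shards currently in the `AwaitingPublish` state.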
awaiting_count: usize,
}

/// A helper for awaiting shard publish events when running in `wait_for` and
/// `force` commit mode.
///
/// Registers a set of shard positions and listens to [`ShardPositionsUpdate`]
/// events to assert when all the persisted events have been published. To make
/// sure that no events are missed:
/// - the tracker should be created before the persist requests are sent
/// - `track_persisted_position` should be called for all successful persist subrequests
/// events to assert when all the persisted events have been published. To
/// ensure that no events are missed:
/// - create the tracker before any persist request is sent
/// - call `register_requested_shard` before each persist request to ensure that
/// the associated publish events are recorded
/// - call `track_persisted_shard_position` after each successful persist subrequest (see the example below)
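///
/// # Example
///
/// A minimal usage sketch (not taken from the actual call sites; `event_broker`,
/// `shard_id` and `persisted_position` are assumed to be provided by the
/// surrounding ingest code):
///
/// ```ignore
/// let tracker = PublishTracker::new(event_broker.clone());
/// // register the shards targeted by the persist request before sending it
/// tracker.register_requested_shard([&shard_id]);
/// // ... send the persist request and await the persist success ...
/// tracker.track_persisted_shard_position(shard_id.clone(), persisted_position);
/// // resolves once every tracked shard is published up to its persisted position
/// tracker.wait_publish_complete().await;
/// ```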
struct PublishTracker {
state: Arc<Mutex<PublishState>>,
state: Arc<Mutex<ShardPublishStates>>,
// sync::notify instead of sync::oneshot because we don't want to store the permit
publish_complete: Arc<Notify>,
_publish_listen_handle: EventSubscriptionHandle,
}

impl PublishTracker {
fn new(event_tracker: EventBroker) -> Self {
let state = Arc::new(Mutex::new(PublishState::default()));
let state = Arc::new(Mutex::new(ShardPublishStates::default()));
let state_clone = state.clone();
let publish_complete = Arc::new(Notify::new());
let publish_complete_notifier = publish_complete.clone();
let _publish_listen_handle =
event_tracker.subscribe(move |update: ShardPositionsUpdate| {
let mut state_handle = state_clone.lock().unwrap();
for (updated_shard_id, updated_position) in &update.updated_shard_positions {
if let Some(shard_position) =
state_handle.awaiting_publish.get(updated_shard_id)
{
if updated_position >= shard_position {
state_handle.awaiting_publish.remove(updated_shard_id);
if state_handle.awaiting_publish.is_empty() {
// The notification is only relevant once
// `self.wait_publish_complete()` is called.
// Before that, `state.awaiting_publish` might
// still be re-populated.
publish_complete_notifier.notify_waiters();
if let Some(publish_state) = state_handle.states.get_mut(updated_shard_id) {
match publish_state {
PublishState::AwaitingPublish(shard_position)
if updated_position >= shard_position =>
{
*publish_state = PublishState::Published(updated_position.clone());
state_handle.awaiting_count -= 1;
if state_handle.awaiting_count == 0 {
// The notification is only relevant once
// `self.wait_publish_complete()` is called.
// Before that, `awaiting_count` might
// still increase again.
publish_complete_notifier.notify_waiters();
}
}
PublishState::Published(current_position)
if updated_position > current_position =>
{
*current_position = updated_position.clone();
}
PublishState::Tracked => {
*publish_state = PublishState::Published(updated_position.clone());
}
PublishState::Published(_) => {
// looks like a duplicate or out-of-order event
}
PublishState::AwaitingPublish(_) => {
// the shard made some progress but we are waiting for more
}
}
} else {
// Save this position update in case the publish update
// event arrived before the shard persist response. We
// might build a state that tracks irrelevant shards for
// the duration of the query but that should be fine.
state_handle
.already_published
.insert(updated_shard_id.clone(), updated_position.clone());
}
// else: this shard is not being tracked here
}
});
Self {
@@ -99,27 +119,48 @@ impl PublishTracker {
}
}

fn track_persisted_position(&self, shard_id: ShardId, new_position: Position) {
fn register_requested_shard<'a>(&'a self, shard_ids: impl IntoIterator<Item = &'a ShardId>) {
let mut state_handle = self.state.lock().unwrap();
match state_handle.already_published.get(&shard_id) {
Some(already_published_position) if new_position <= *already_published_position => {
// already published, no need to track this shard's position updates
}
_ => {
state_handle
.awaiting_publish
.insert(shard_id.clone(), new_position.clone());
for shard_id in shard_ids {
state_handle
.states
.entry(shard_id.clone())
.or_insert(PublishState::Tracked);
}
}

fn track_persisted_shard_position(&self, shard_id: ShardId, new_position: Position) {
let mut state_handle = self.state.lock().unwrap();
if let Some(publish_state) = state_handle.states.get_mut(&shard_id) {
match publish_state {
PublishState::Published(current_position) if new_position <= *current_position => {
// new position already published, no need to track it
}
PublishState::AwaitingPublish(old_position) => {
error!(
%old_position,
%new_position,
%shard_id,
"shard persisted positions should not be tracked multiple times"
);
}
PublishState::Tracked | PublishState::Published(_) => {
*publish_state = PublishState::AwaitingPublish(new_position.clone());
state_handle.awaiting_count += 1;
}
}
} else {
error!(%shard_id, "requested shards should be registered before their position is tracked")
}
}

async fn wait_publish_complete(self) {
// correctness: new shards cannot be added to `state.awaiting_publish`
// at this point because `self` is consumed. By subscribing to
// `publish_complete` before checking `awaiting_publish`, we make sure we
// don't miss the moment when it becomes empty.
// correctness: `awaiting_count` cannot be increased after this point
// because `self` is consumed. By subscribing to `publish_complete`
// before checking `awaiting_count`, we make sure we don't miss the
// moment when it becomes 0.
let notified = self.publish_complete.notified();
if self.state.lock().unwrap().awaiting_publish.is_empty() {
if self.state.lock().unwrap().awaiting_count == 0 {
return;
}
notified.await;
@@ -222,6 +263,16 @@ impl IngestWorkbench {
.all(|subworbench| !subworbench.is_pending())
}

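/// Registers the shards targeted by a persist request with the publish
/// tracker (when publish tracking is enabled), so that publish events for
/// those shards are recorded even if they arrive before the persist response.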
pub fn record_persist_request(&self, persist_request: &PersistRequest) {
if let Some(publish_tracker) = &self.publish_tracker {
let shards = persist_request
.subrequests
.iter()
.map(|subrequest| subrequest.shard_id());
publish_tracker.register_requested_shard(shards);
}
}

pub fn record_get_or_create_open_shards_failure(
&mut self,
open_shards_failure: GetOrCreateOpenShardsFailure,
@@ -255,8 +306,10 @@ impl IngestWorkbench {
};
if let Some(publish_tracker) = &mut self.publish_tracker {
if let Some(position) = &persist_success.replication_position_inclusive {
publish_tracker
.track_persisted_position(persist_success.shard_id().clone(), position.clone());
publish_tracker.track_persisted_shard_position(
persist_success.shard_id().clone(),
position.clone(),
);
}
}
self.num_successes += 1;
@@ -485,7 +538,7 @@ impl IngestSubworkbench {
mod tests {
use std::time::Duration;

use quickwit_proto::ingest::ingester::PersistFailureReason;
use quickwit_proto::ingest::ingester::{PersistFailureReason, PersistSubrequest};
use quickwit_proto::types::{IndexUid, ShardId, SourceUid};

use super::*;
@@ -498,11 +551,15 @@ mod tests {
let shard_id_1 = ShardId::from("test-shard-1");
let shard_id_2 = ShardId::from("test-shard-2");
let shard_id_3 = ShardId::from("test-shard-3");
let shard_id_4 = ShardId::from("test-shard-3");
let shard_id_4 = ShardId::from("test-shard-4");
let shard_id_5 = ShardId::from("test-shard-5"); // not tracked

tracker.track_persisted_position(shard_id_1.clone(), Position::offset(42usize));
tracker.track_persisted_position(shard_id_2.clone(), Position::offset(42usize));
tracker.track_persisted_position(shard_id_3.clone(), Position::offset(42usize));
tracker
.register_requested_shard([&shard_id_1, &shard_id_2, &shard_id_3, &shard_id_4].clone());

tracker.track_persisted_shard_position(shard_id_1.clone(), Position::offset(42usize));
tracker.track_persisted_shard_position(shard_id_2.clone(), Position::offset(42usize));
tracker.track_persisted_shard_position(shard_id_3.clone(), Position::offset(42usize));

event_broker.publish(ShardPositionsUpdate {
source_uid: SourceUid {
@@ -512,6 +569,7 @@ mod tests {
updated_shard_positions: vec![
(shard_id_1.clone(), Position::offset(42usize)),
(shard_id_2.clone(), Position::offset(666usize)),
(shard_id_5.clone(), Position::offset(888usize)),
]
.into_iter()
.collect(),
@@ -531,7 +589,7 @@ mod tests {
});

// persist response received after the publish event
tracker.track_persisted_position(shard_id_4.clone(), Position::offset(42usize));
tracker.track_persisted_shard_position(shard_id_4.clone(), Position::offset(42usize));

tokio::time::timeout(Duration::from_millis(200), tracker.wait_publish_complete())
.await
Expand All @@ -542,13 +600,15 @@ mod tests {
async fn test_publish_tracker_waits() {
let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0);
let shard_id_1 = ShardId::from("test-shard-1");
let shard_id_2 = ShardId::from("test-shard-2");
let position = Position::offset(42usize);

{
let event_broker = EventBroker::default();
let tracker = PublishTracker::new(event_broker.clone());
tracker.track_persisted_position(shard_id_1.clone(), position.clone());
tracker.track_persisted_position(ShardId::from("test-shard-2"), position.clone());
tracker.register_requested_shard([&shard_id_1, &shard_id_2].clone());
tracker.track_persisted_shard_position(shard_id_1.clone(), position.clone());
tracker.track_persisted_shard_position(shard_id_2.clone(), position.clone());

event_broker.publish(ShardPositionsUpdate {
source_uid: SourceUid {
@@ -567,7 +627,8 @@ mod tests {
{
let event_broker = EventBroker::default();
let tracker = PublishTracker::new(event_broker.clone());
tracker.track_persisted_position(shard_id_1.clone(), position.clone());
tracker.register_requested_shard([&shard_id_1, &shard_id_2].clone());
tracker.track_persisted_shard_position(shard_id_1.clone(), position.clone());
event_broker.publish(ShardPositionsUpdate {
source_uid: SourceUid {
index_uid: index_uid.clone(),
Expand All @@ -579,7 +640,7 @@ mod tests {
});
// sleep to make sure the event is processed
tokio::time::sleep(Duration::from_millis(50)).await;
tracker.track_persisted_position(ShardId::from("test-shard-2"), position.clone());
tracker.track_persisted_shard_position(shard_id_2.clone(), position.clone());

tokio::time::timeout(Duration::from_millis(200), tracker.wait_publish_complete())
.await
@@ -740,6 +801,23 @@ mod tests {
assert_eq!(pending_subrequests(&workbench.subworkbenches).count(), 2);
assert!(!workbench.is_complete());

let persist_request = PersistRequest {
subrequests: vec![
PersistSubrequest {
subrequest_id: 0,
shard_id: Some(shard_id_1.clone()),
..Default::default()
},
PersistSubrequest {
subrequest_id: 1,
shard_id: Some(shard_id_2.clone()),
..Default::default()
},
],
..Default::default()
};
workbench.record_persist_request(&persist_request);

let persist_success = PersistSuccess {
subrequest_id: 0,
shard_id: Some(shard_id_1.clone()),
Expand All @@ -761,6 +839,19 @@ mod tests {
replication_position_inclusive: Some(Position::offset(66usize)),
..Default::default()
};

// retry to persist shard 2

let persist_request = PersistRequest {
subrequests: vec![PersistSubrequest {
subrequest_id: 1,
shard_id: Some(shard_id_2.clone()),
..Default::default()
}],
..Default::default()
};
workbench.record_persist_request(&persist_request);

workbench.record_persist_success(persist_success);

assert!(workbench.is_complete());
@@ -803,6 +894,23 @@ mod tests {
let mut workbench =
IngestWorkbench::new_with_publish_tracking(ingest_subrequests, 1, event_broker.clone());

let persist_request = PersistRequest {
subrequests: vec![
PersistSubrequest {
subrequest_id: 0,
shard_id: Some(shard_id_1.clone()),
..Default::default()
},
PersistSubrequest {
subrequest_id: 1,
shard_id: Some(shard_id_2.clone()),
..Default::default()
},
],
..Default::default()
};
workbench.record_persist_request(&persist_request);

let persist_success = PersistSuccess {
subrequest_id: 0,
shard_id: Some(shard_id_1.clone()),
