Skip to content

Commit

Permalink
Fix race condition that was surfaced due to the debouncing of the
Browse files Browse the repository at this point in the history
control plane build plan.

In Ingest v1, queues are created on apply plans.
This means that right after creating an index, documents cannot be
inserted.

This was true before, but it became truely apparent after we
added debouncing of control plane apply plans.
  • Loading branch information
fulmicoton committed Mar 6, 2024
1 parent 55b10d2 commit 351fcae
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 38 deletions.
54 changes: 36 additions & 18 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ use std::time::Duration;
use anyhow::Context;
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures::{Future, StreamExt};
use quickwit_actors::{
Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Mailbox, Supervisor, Universe,
WeakMailbox,
Actor, ActorContext, ActorExitStatus, ActorHandle, DeferableReplyHandler, Handler, Mailbox, Supervisor, Universe, WeakMailbox
};
use quickwit_cluster::{
ClusterChange, ClusterChangeStream, ClusterChangeStreamFactory, ClusterNode,
Expand Down Expand Up @@ -97,6 +96,7 @@ pub struct ControlPlane {
rebuild_plan_debouncer: Debouncer,
}


impl fmt::Debug for ControlPlane {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
f.debug_struct("ControlPlane").finish()
Expand Down Expand Up @@ -165,7 +165,7 @@ impl Actor for ControlPlane {
.await
.context("failed to initialize the model")?;

self.rebuild_plan_debounced(ctx);
let _ = self.rebuild_plan_debounced(ctx);

self.ingest_controller.sync_with_all_ingesters(&self.model);

Expand Down Expand Up @@ -313,9 +313,15 @@ impl ControlPlane {
///
/// This method includes some debouncing logic. Every call will be followed by a cooldown
/// period.
fn rebuild_plan_debounced(&mut self, ctx: &ActorContext<Self>) {
///
/// This method returns a future that can awaited to ensure that the relevant rebuild plan
/// operation has been executed.
fn rebuild_plan_debounced(&mut self, ctx: &ActorContext<Self>) -> impl Future<Output = ()> {
let current_generation_processed_waiter =
self.indexing_scheduler.next_rebuild_tracker.current_generation_processed_waiter();
self.rebuild_plan_debouncer
.self_send_with_cooldown::<RebuildPlan>(ctx);
current_generation_processed_waiter
}
}

Expand Down Expand Up @@ -370,7 +376,7 @@ impl Handler<ShardPositionsUpdate> for ControlPlane {
ctx.progress(),
)
.await?;
self.rebuild_plan_debounced(ctx);
let _ = self.rebuild_plan_debounced(ctx);
Ok(())
}
}
Expand Down Expand Up @@ -446,17 +452,22 @@ fn convert_metastore_error<T>(
// This handler is a metastore call proxied through the control plane: we must first forward the
// request to the metastore, and then act on the event.
#[async_trait]
impl Handler<CreateIndexRequest> for ControlPlane {
impl DeferableReplyHandler<CreateIndexRequest> for ControlPlane {
type Reply = ControlPlaneResult<CreateIndexResponse>;

async fn handle(

async fn handle_message(
&mut self,
request: CreateIndexRequest,
reply: impl FnOnce(Self::Reply) + Send + Sync + 'static,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
) -> Result<(), ActorExitStatus> {
let response = match self.metastore.create_index(request).await {
Ok(response) => response,
Err(metastore_error) => return convert_metastore_error(metastore_error),
Err(metastore_error) => {
reply(convert_metastore_error(metastore_error)?);
return Ok(());
},
};
let index_metadata: IndexMetadata =
match serde_utils::from_json_str(&response.index_metadata_json) {
Expand All @@ -472,9 +483,16 @@ impl Handler<CreateIndexRequest> for ControlPlane {
self.model.add_index(index_metadata);

if should_rebuild_plan {
self.rebuild_plan_debounced(ctx);
let wait_for_debounced = self.rebuild_plan_debounced(ctx);
tokio::task::spawn(async move {
wait_for_debounced.await;
reply(Ok(response));
});
} else {
reply(Ok(response));
}
Ok(Ok(response))

Ok(())
}
}

Expand Down Expand Up @@ -508,7 +526,7 @@ impl Handler<DeleteIndexRequest> for ControlPlane {

// TODO: Refine the event. Notify index will have the effect to reload the entire state from
// the metastore. We should update the state of the control plane.
self.rebuild_plan_debounced(ctx);
let _ = self.rebuild_plan_debounced(ctx);

let response = EmptyResponse {};
Ok(Ok(response))
Expand Down Expand Up @@ -544,7 +562,7 @@ impl Handler<AddSourceRequest> for ControlPlane {

// TODO: Refine the event. Notify index will have the effect to reload the entire state from
// the metastore. We should update the state of the control plane.
self.rebuild_plan_debounced(ctx);
let _ = self.rebuild_plan_debounced(ctx);

let response = EmptyResponse {};
Ok(Ok(response))
Expand Down Expand Up @@ -572,7 +590,7 @@ impl Handler<ToggleSourceRequest> for ControlPlane {
let mutation_occured = self.model.toggle_source(&index_uid, &source_id, enable)?;

if mutation_occured {
self.rebuild_plan_debounced(ctx);
let _ = self.rebuild_plan_debounced(ctx);
}
Ok(Ok(EmptyResponse {}))
}
Expand Down Expand Up @@ -620,7 +638,7 @@ impl Handler<DeleteSourceRequest> for ControlPlane {

self.model.delete_source(&source_uid);

self.rebuild_plan_debounced(ctx);
let _ = self.rebuild_plan_debounced(ctx);
let response = EmptyResponse {};

Ok(Ok(response))
Expand Down Expand Up @@ -656,7 +674,7 @@ impl Handler<GetOrCreateOpenShardsRequest> for ControlPlane {
return Ok(Err(control_plane_error));
}
};
self.rebuild_plan_debounced(ctx);
let _ = self.rebuild_plan_debounced(ctx);
Ok(Ok(response))
}
}
Expand Down Expand Up @@ -690,7 +708,7 @@ impl Handler<LocalShardsUpdate> for ControlPlane {
self.ingest_controller
.handle_local_shards_update(local_shards_update, &mut self.model, ctx.progress())
.await;
self.rebuild_plan_debounced(ctx);
let _ = self.rebuild_plan_debounced(ctx);
Ok(Ok(()))
}
}
Expand Down
70 changes: 68 additions & 2 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod scheduling;
use std::cmp::Ordering;
use std::fmt;
use std::num::NonZeroU32;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use fnv::{FnvHashMap, FnvHashSet};
Expand Down Expand Up @@ -56,6 +57,64 @@ pub struct IndexingSchedulerState {
pub last_applied_plan_timestamp: Option<Instant>,
}


pub struct ChangeTracker {
generation_processed_tx: Arc<Mutex<tokio::sync::watch::Sender<usize>>>,
generation_processed_rx: tokio::sync::watch::Receiver<usize>,
generation: usize,
}

impl Default for ChangeTracker {
fn default() -> Self {
let (generation_processed_tx, generation_processed_rx) = tokio::sync::watch::channel(0);
ChangeTracker {
generation_processed_tx: Arc::new(Mutex::new(generation_processed_tx)),
generation_processed_rx,
generation: 1,
}
}
}

impl ChangeTracker {
pub fn current_generation_processed_waiter(&mut self) -> impl std::future::Future<Output = ()> {
let mut generation_processed_rx = self.generation_processed_rx.clone();
let current_generation = self.generation;
async move {
loop {
if *generation_processed_rx.borrow() >= current_generation {
return;
}
if generation_processed_rx.changed().await.is_err() {
return;
}
}
}
}

pub fn start_process_generation(&mut self) -> Arc<NotifyChangeOnDrop> {
let generation = self.generation;
self.generation += 1;
Arc::new(NotifyChangeOnDrop {
generation,
generation_processed_tx: self.generation_processed_tx.clone(),
})
}
}

pub struct NotifyChangeOnDrop {
generation: usize,
generation_processed_tx: Arc<Mutex<tokio::sync::watch::Sender<usize>>>,
}

impl Drop for NotifyChangeOnDrop {
fn drop(&mut self) {
let generation_processed_tx = self.generation_processed_tx.lock().unwrap();
if self.generation < *generation_processed_tx.borrow() {
return;
}
let _ = generation_processed_tx.send(self.generation);
}
}
/// The [`IndexingScheduler`] is responsible for listing indexing tasks and assiging them to
/// indexers.
/// We call this duty `scheduling`. Contrary to what the name suggests, most indexing tasks are
Expand Down Expand Up @@ -100,6 +159,7 @@ pub struct IndexingScheduler {
self_node_id: NodeId,
indexer_pool: IndexerPool,
state: IndexingSchedulerState,
pub(crate) next_rebuild_tracker: ChangeTracker,
}

impl fmt::Debug for IndexingScheduler {
Expand Down Expand Up @@ -187,6 +247,7 @@ impl IndexingScheduler {
self_node_id,
indexer_pool,
state: IndexingSchedulerState::default(),
next_rebuild_tracker: ChangeTracker::default(),
}
}

Expand All @@ -202,6 +263,8 @@ impl IndexingScheduler {
pub(crate) fn rebuild_plan(&mut self, model: &ControlPlaneModel) {
crate::metrics::CONTROL_PLANE_METRICS.schedule_total.inc();

let notify_on_drop = self.next_rebuild_tracker.start_process_generation();

let sources = get_sources_to_schedule(model);

let indexers: Vec<IndexerNodeInfo> = self.get_indexers_from_indexer_pool();
Expand Down Expand Up @@ -239,7 +302,7 @@ impl IndexingScheduler {
return;
}
}
self.apply_physical_indexing_plan(&indexers, new_physical_plan);
self.apply_physical_indexing_plan(&indexers, new_physical_plan, Some(notify_on_drop));
self.state.num_schedule_indexing_plan += 1;
}

Expand Down Expand Up @@ -283,7 +346,7 @@ impl IndexingScheduler {
} else if !indexing_plans_diff.has_same_tasks() {
// Some nodes may have not received their tasks, apply it again.
info!(plans_diff=?indexing_plans_diff, "running tasks and last applied tasks differ: reapply last plan");
self.apply_physical_indexing_plan(&indexers, last_applied_plan.clone());
self.apply_physical_indexing_plan(&indexers, last_applied_plan.clone(), None);
}
}

Expand All @@ -295,12 +358,14 @@ impl IndexingScheduler {
&mut self,
indexers: &[IndexerNodeInfo],
new_physical_plan: PhysicalIndexingPlan,
notify_on_drop: Option<Arc<NotifyChangeOnDrop>>
) {
debug!(new_physical_plan=?new_physical_plan, "apply physical indexing plan");
for (node_id, indexing_tasks) in new_physical_plan.indexing_tasks_per_indexer() {
// We don't want to block on a slow indexer so we apply this change asynchronously
// TODO not blocking is cool, but we need to make sure there is not accumulation
// possible here.
let notify_on_drop = notify_on_drop.clone();
tokio::spawn({
let indexer = indexers
.iter()
Expand All @@ -322,6 +387,7 @@ impl IndexingScheduler {
"failed to apply indexing plan to indexer"
);
}
drop(notify_on_drop);
}
});
}
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use quickwit_actors::{
};
use quickwit_common::runtimes::RuntimeType;
use quickwit_common::tower::Cost;
use tracing::info;
use tracing::{error, info};
use ulid::Ulid;

use crate::metrics::INGEST_METRICS;
Expand Down Expand Up @@ -153,6 +153,7 @@ impl IngestApiService {
.find(|index_id| !self.queues.queue_exists(index_id));

if let Some(index_id) = first_non_existing_queue_opt {
error!(index_id=%index_id, "failed to find index");
return Err(IngestServiceError::IndexNotFound {
index_id: index_id.to_string(),
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ json:
- rfc3339
fast_precision: seconds
fast: true
sleep_after: 3
---
# Create empty index
method: POST
Expand All @@ -45,12 +44,10 @@ json:
- rfc3339
fast_precision: seconds
fast: true
sleep_after: 3
---
# Ingest documents
method: POST
endpoint: aggregations/ingest
num_retries: 10
params:
commit: force
ndjson:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ json:
---
method: POST
endpoint: defaultsearchfields/ingest
num_retries: 10
params:
commit: force
ndjson:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ json:
- name: created_at
type: datetime
fast: true
sleep_after: 3
---
# Create index
method: POST
Expand All @@ -36,7 +35,7 @@ json:
indexed: true
- name: public
type: bool
fast: false
fast: false
indexed: true
- name: created_at
type: datetime
Expand All @@ -48,14 +47,11 @@ json:
fast:
normalizer: lowercase
record: position
sleep_after: 3
---
# Ingest documents
method: POST
endpoint: _bulk
num_retries: 10
params:
refresh: "true"
headers: {"Content-Type": "application/json", "content-encoding": "gzip"}
body_from_file: gharchive-bulk.json.gz
sleep_after: 3
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ json:
- name: login
type: text
fast: true
sleep_after: 6
---
# Ingest documents in index 1
method: POST
Expand Down
Loading

0 comments on commit 351fcae

Please sign in to comment.