Skip to content

Commit

Permalink
Revert "Fix race condition that was surfaced due to the debouncing of…
Browse files Browse the repository at this point in the history
… the"

This reverts commit 351fcae.
  • Loading branch information
fulmicoton committed Mar 6, 2024
1 parent 351fcae commit c41795e
Show file tree
Hide file tree
Showing 10 changed files with 38 additions and 108 deletions.
54 changes: 18 additions & 36 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ use std::time::Duration;
use anyhow::Context;
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
use futures::{Future, StreamExt};
use futures::StreamExt;
use quickwit_actors::{
Actor, ActorContext, ActorExitStatus, ActorHandle, DeferableReplyHandler, Handler, Mailbox, Supervisor, Universe, WeakMailbox
Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Mailbox, Supervisor, Universe,
WeakMailbox,
};
use quickwit_cluster::{
ClusterChange, ClusterChangeStream, ClusterChangeStreamFactory, ClusterNode,
Expand Down Expand Up @@ -96,7 +97,6 @@ pub struct ControlPlane {
rebuild_plan_debouncer: Debouncer,
}


impl fmt::Debug for ControlPlane {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
f.debug_struct("ControlPlane").finish()
Expand Down Expand Up @@ -165,7 +165,7 @@ impl Actor for ControlPlane {
.await
.context("failed to initialize the model")?;

let _ = self.rebuild_plan_debounced(ctx);
self.rebuild_plan_debounced(ctx);

self.ingest_controller.sync_with_all_ingesters(&self.model);

Expand Down Expand Up @@ -313,15 +313,9 @@ impl ControlPlane {
///
/// This method includes some debouncing logic. Every call will be followed by a cooldown
/// period.
///
/// This method returns a future that can be awaited to ensure that the relevant rebuild plan
/// operation has been executed.
fn rebuild_plan_debounced(&mut self, ctx: &ActorContext<Self>) -> impl Future<Output = ()> {
let current_generation_processed_waiter =
self.indexing_scheduler.next_rebuild_tracker.current_generation_processed_waiter();
fn rebuild_plan_debounced(&mut self, ctx: &ActorContext<Self>) {
self.rebuild_plan_debouncer
.self_send_with_cooldown::<RebuildPlan>(ctx);
current_generation_processed_waiter
}
}

Expand Down Expand Up @@ -376,7 +370,7 @@ impl Handler<ShardPositionsUpdate> for ControlPlane {
ctx.progress(),
)
.await?;
let _ = self.rebuild_plan_debounced(ctx);
self.rebuild_plan_debounced(ctx);
Ok(())
}
}
Expand Down Expand Up @@ -452,22 +446,17 @@ fn convert_metastore_error<T>(
// This handler is a metastore call proxied through the control plane: we must first forward the
// request to the metastore, and then act on the event.
#[async_trait]
impl DeferableReplyHandler<CreateIndexRequest> for ControlPlane {
impl Handler<CreateIndexRequest> for ControlPlane {
type Reply = ControlPlaneResult<CreateIndexResponse>;


async fn handle_message(
async fn handle(
&mut self,
request: CreateIndexRequest,
reply: impl FnOnce(Self::Reply) + Send + Sync + 'static,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
) -> Result<Self::Reply, ActorExitStatus> {
let response = match self.metastore.create_index(request).await {
Ok(response) => response,
Err(metastore_error) => {
reply(convert_metastore_error(metastore_error)?);
return Ok(());
},
Err(metastore_error) => return convert_metastore_error(metastore_error),
};
let index_metadata: IndexMetadata =
match serde_utils::from_json_str(&response.index_metadata_json) {
Expand All @@ -483,16 +472,9 @@ impl DeferableReplyHandler<CreateIndexRequest> for ControlPlane {
self.model.add_index(index_metadata);

if should_rebuild_plan {
let wait_for_debounced = self.rebuild_plan_debounced(ctx);
tokio::task::spawn(async move {
wait_for_debounced.await;
reply(Ok(response));
});
} else {
reply(Ok(response));
self.rebuild_plan_debounced(ctx);
}

Ok(())
Ok(Ok(response))
}
}

Expand Down Expand Up @@ -526,7 +508,7 @@ impl Handler<DeleteIndexRequest> for ControlPlane {

// TODO: Refine the event. Notify index will have the effect to reload the entire state from
// the metastore. We should update the state of the control plane.
let _ = self.rebuild_plan_debounced(ctx);
self.rebuild_plan_debounced(ctx);

let response = EmptyResponse {};
Ok(Ok(response))
Expand Down Expand Up @@ -562,7 +544,7 @@ impl Handler<AddSourceRequest> for ControlPlane {

// TODO: Refine the event. Notify index will have the effect to reload the entire state from
// the metastore. We should update the state of the control plane.
let _ = self.rebuild_plan_debounced(ctx);
self.rebuild_plan_debounced(ctx);

let response = EmptyResponse {};
Ok(Ok(response))
Expand Down Expand Up @@ -590,7 +572,7 @@ impl Handler<ToggleSourceRequest> for ControlPlane {
let mutation_occured = self.model.toggle_source(&index_uid, &source_id, enable)?;

if mutation_occured {
let _ = self.rebuild_plan_debounced(ctx);
self.rebuild_plan_debounced(ctx);
}
Ok(Ok(EmptyResponse {}))
}
Expand Down Expand Up @@ -638,7 +620,7 @@ impl Handler<DeleteSourceRequest> for ControlPlane {

self.model.delete_source(&source_uid);

let _ = self.rebuild_plan_debounced(ctx);
self.rebuild_plan_debounced(ctx);
let response = EmptyResponse {};

Ok(Ok(response))
Expand Down Expand Up @@ -674,7 +656,7 @@ impl Handler<GetOrCreateOpenShardsRequest> for ControlPlane {
return Ok(Err(control_plane_error));
}
};
let _ = self.rebuild_plan_debounced(ctx);
self.rebuild_plan_debounced(ctx);
Ok(Ok(response))
}
}
Expand Down Expand Up @@ -708,7 +690,7 @@ impl Handler<LocalShardsUpdate> for ControlPlane {
self.ingest_controller
.handle_local_shards_update(local_shards_update, &mut self.model, ctx.progress())
.await;
let _ = self.rebuild_plan_debounced(ctx);
self.rebuild_plan_debounced(ctx);
Ok(Ok(()))
}
}
Expand Down
70 changes: 2 additions & 68 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ mod scheduling;
use std::cmp::Ordering;
use std::fmt;
use std::num::NonZeroU32;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use fnv::{FnvHashMap, FnvHashSet};
Expand Down Expand Up @@ -57,64 +56,6 @@ pub struct IndexingSchedulerState {
pub last_applied_plan_timestamp: Option<Instant>,
}


/// Tracks which rebuild "generation" has been reported as processed.
///
/// Generations are handed out by `start_process_generation`, and the watch channel carries the
/// highest generation reported as processed so far (it starts at 0 in the `Default` impl).
pub struct ChangeTracker {
/// Sending half of the watch channel, shared with every `NotifyChangeOnDrop` guard.
generation_processed_tx: Arc<Mutex<tokio::sync::watch::Sender<usize>>>,
/// Receiving half of the watch channel; it is cloned into each waiter future.
generation_processed_rx: tokio::sync::watch::Receiver<usize>,
/// Generation that the next call to `start_process_generation` will hand out.
generation: usize,
}

impl Default for ChangeTracker {
    /// Builds a tracker whose channel starts at generation 0 (nothing processed yet) and whose
    /// first handed-out generation is 1.
    fn default() -> Self {
        let (tx, rx) = tokio::sync::watch::channel(0);
        let shared_tx = Arc::new(Mutex::new(tx));
        Self {
            generation_processed_tx: shared_tx,
            generation_processed_rx: rx,
            generation: 1,
        }
    }
}

impl ChangeTracker {
    /// Returns a future that resolves once the generation that the *next* call to
    /// [`ChangeTracker::start_process_generation`] will hand out has been reported as processed.
    ///
    /// The future owns its own clone of the receiver and a copy of the target generation, so it
    /// only needs shared access to the tracker. It also resolves if waiting on the channel
    /// fails, so that callers never hang.
    pub fn current_generation_processed_waiter(&self) -> impl std::future::Future<Output = ()> {
        let mut generation_processed_rx = self.generation_processed_rx.clone();
        let current_generation = self.generation;
        async move {
            loop {
                // The borrow guard is a temporary of the condition: it is released before we
                // await below.
                if *generation_processed_rx.borrow() >= current_generation {
                    return;
                }
                // An error here means no further value will ever be published (the sending
                // side is gone): stop waiting instead of hanging forever.
                if generation_processed_rx.changed().await.is_err() {
                    return;
                }
            }
        }
    }

    /// Hands out the current generation and advances the counter.
    ///
    /// The returned guard reports its generation as processed when the last clone of the `Arc`
    /// is dropped (see the `Drop` impl of [`NotifyChangeOnDrop`]).
    pub fn start_process_generation(&mut self) -> Arc<NotifyChangeOnDrop> {
        let generation = self.generation;
        self.generation += 1;
        Arc::new(NotifyChangeOnDrop {
            generation,
            generation_processed_tx: Arc::clone(&self.generation_processed_tx),
        })
    }
}

/// Guard that reports `generation` on the shared watch channel when it is dropped.
pub struct NotifyChangeOnDrop {
/// Generation reported as processed when this guard is dropped.
generation: usize,
/// Shared sending half of the watch channel used to publish processed generations.
generation_processed_tx: Arc<Mutex<tokio::sync::watch::Sender<usize>>>,
}

impl Drop for NotifyChangeOnDrop {
    /// Publishes this guard's generation as processed, unless a newer generation has already
    /// been published.
    fn drop(&mut self) {
        // A poisoned lock still guards a usable sender. Recovering it avoids panicking inside
        // `drop`, which would abort the process if it happened while already unwinding.
        let generation_processed_tx = self
            .generation_processed_tx
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        // Never move the published generation backwards.
        if self.generation < *generation_processed_tx.borrow() {
            return;
        }
        // A send error is deliberately ignored: it only means nobody is listening anymore.
        let _ = generation_processed_tx.send(self.generation);
    }
}
/// The [`IndexingScheduler`] is responsible for listing indexing tasks and assigning them to
/// indexers.
/// We call this duty `scheduling`. Contrary to what the name suggests, most indexing tasks are
Expand Down Expand Up @@ -159,7 +100,6 @@ pub struct IndexingScheduler {
self_node_id: NodeId,
indexer_pool: IndexerPool,
state: IndexingSchedulerState,
pub(crate) next_rebuild_tracker: ChangeTracker,
}

impl fmt::Debug for IndexingScheduler {
Expand Down Expand Up @@ -247,7 +187,6 @@ impl IndexingScheduler {
self_node_id,
indexer_pool,
state: IndexingSchedulerState::default(),
next_rebuild_tracker: ChangeTracker::default(),
}
}

Expand All @@ -263,8 +202,6 @@ impl IndexingScheduler {
pub(crate) fn rebuild_plan(&mut self, model: &ControlPlaneModel) {
crate::metrics::CONTROL_PLANE_METRICS.schedule_total.inc();

let notify_on_drop = self.next_rebuild_tracker.start_process_generation();

let sources = get_sources_to_schedule(model);

let indexers: Vec<IndexerNodeInfo> = self.get_indexers_from_indexer_pool();
Expand Down Expand Up @@ -302,7 +239,7 @@ impl IndexingScheduler {
return;
}
}
self.apply_physical_indexing_plan(&indexers, new_physical_plan, Some(notify_on_drop));
self.apply_physical_indexing_plan(&indexers, new_physical_plan);
self.state.num_schedule_indexing_plan += 1;
}

Expand Down Expand Up @@ -346,7 +283,7 @@ impl IndexingScheduler {
} else if !indexing_plans_diff.has_same_tasks() {
// Some nodes may have not received their tasks, apply it again.
info!(plans_diff=?indexing_plans_diff, "running tasks and last applied tasks differ: reapply last plan");
self.apply_physical_indexing_plan(&indexers, last_applied_plan.clone(), None);
self.apply_physical_indexing_plan(&indexers, last_applied_plan.clone());
}
}

Expand All @@ -358,14 +295,12 @@ impl IndexingScheduler {
&mut self,
indexers: &[IndexerNodeInfo],
new_physical_plan: PhysicalIndexingPlan,
notify_on_drop: Option<Arc<NotifyChangeOnDrop>>
) {
debug!(new_physical_plan=?new_physical_plan, "apply physical indexing plan");
for (node_id, indexing_tasks) in new_physical_plan.indexing_tasks_per_indexer() {
// We don't want to block on a slow indexer so we apply this change asynchronously
// TODO not blocking is cool, but we need to make sure there is not accumulation
// possible here.
let notify_on_drop = notify_on_drop.clone();
tokio::spawn({
let indexer = indexers
.iter()
Expand All @@ -387,7 +322,6 @@ impl IndexingScheduler {
"failed to apply indexing plan to indexer"
);
}
drop(notify_on_drop);
}
});
}
Expand Down
3 changes: 1 addition & 2 deletions quickwit/quickwit-ingest/src/ingest_api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use quickwit_actors::{
};
use quickwit_common::runtimes::RuntimeType;
use quickwit_common::tower::Cost;
use tracing::{error, info};
use tracing::info;
use ulid::Ulid;

use crate::metrics::INGEST_METRICS;
Expand Down Expand Up @@ -153,7 +153,6 @@ impl IngestApiService {
.find(|index_id| !self.queues.queue_exists(index_id));

if let Some(index_id) = first_non_existing_queue_opt {
error!(index_id=%index_id, "failed to find index");
return Err(IngestServiceError::IndexNotFound {
index_id: index_id.to_string(),
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ json:
- rfc3339
fast_precision: seconds
fast: true
sleep_after: 3
---
# Create empty index
method: POST
Expand All @@ -44,10 +45,12 @@ json:
- rfc3339
fast_precision: seconds
fast: true
sleep_after: 3
---
# Ingest documents
method: POST
endpoint: aggregations/ingest
num_retries: 10
params:
commit: force
ndjson:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ json:
---
method: POST
endpoint: defaultsearchfields/ingest
num_retries: 10
params:
commit: force
ndjson:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ json:
- name: created_at
type: datetime
fast: true
sleep_after: 3
---
# Create index
method: POST
Expand All @@ -35,7 +36,7 @@ json:
indexed: true
- name: public
type: bool
fast: false
fast: false
indexed: true
- name: created_at
type: datetime
Expand All @@ -47,11 +48,14 @@ json:
fast:
normalizer: lowercase
record: position
sleep_after: 3
---
# Ingest documents
method: POST
endpoint: _bulk
num_retries: 10
params:
refresh: "true"
headers: {"Content-Type": "application/json", "content-encoding": "gzip"}
body_from_file: gharchive-bulk.json.gz
sleep_after: 3
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ json:
- name: login
type: text
fast: true
sleep_after: 6
---
# Ingest documents in index 1
method: POST
Expand Down
Loading

0 comments on commit c41795e

Please sign in to comment.