Skip to content

Commit

Permalink
Integrating the finalize merge policy into merge policies themselves.
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Oct 9, 2024
1 parent b111c03 commit 9a50f0f
Show file tree
Hide file tree
Showing 12 changed files with 306 additions and 86 deletions.
3 changes: 0 additions & 3 deletions quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ pub struct IndexingSettings {
pub merge_policy: MergePolicyConfig,
#[serde(default)]
pub resources: IndexingResources,
#[serde(default = "MergePolicyConfig::noop")]
pub finalize_merge_policy: MergePolicyConfig,
}

impl IndexingSettings {
Expand Down Expand Up @@ -163,7 +161,6 @@ impl Default for IndexingSettings {
docstore_compression_level: Self::default_docstore_compression_level(),
split_num_docs_target: Self::default_split_num_docs_target(),
merge_policy: MergePolicyConfig::default(),
finalize_merge_policy: MergePolicyConfig::noop(),
resources: IndexingResources::default(),
}
}
Expand Down
15 changes: 15 additions & 0 deletions quickwit/quickwit-config/src/merge_policy_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ use std::time::Duration;

use serde::{de, Deserialize, Deserializer, Serialize, Serializer};

/// Returns `true` when the referenced `usize` is equal to zero.
///
/// The argument is taken by reference so the function can be named in a
/// `#[serde(skip_serializing_if = "is_zero")]` attribute, as done for
/// `max_finalize_merge_operations`.
fn is_zero(value: &usize) -> bool {
    matches!(*value, 0)
}

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct ConstWriteAmplificationMergePolicyConfig {
Expand All @@ -42,6 +46,15 @@ pub struct ConstWriteAmplificationMergePolicyConfig {
#[serde(deserialize_with = "parse_human_duration")]
#[serde(serialize_with = "serialize_duration")]
pub maturation_period: Duration,
#[serde(default)]
#[serde(skip_serializing_if = "is_zero")]
pub max_finalize_merge_operations: usize,
/// Splits with a number of docs higher than
/// `max_finalize_split_num_docs` will not be considered
/// for finalize split merge operations.
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub max_finalize_split_num_docs: Option<usize>,
}

impl Default for ConstWriteAmplificationMergePolicyConfig {
Expand All @@ -51,6 +64,8 @@ impl Default for ConstWriteAmplificationMergePolicyConfig {
merge_factor: default_merge_factor(),
max_merge_factor: default_max_merge_factor(),
maturation_period: default_maturation_period(),
max_finalize_merge_operations: 0,
max_finalize_split_num_docs: None,
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ mod tests {

use super::{IndexingPipeline, *};
use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams};
use crate::merge_policy::{default_merge_policy, NopMergePolicy};
use crate::merge_policy::default_merge_policy;

#[test]
fn test_wait_duration() {
Expand Down Expand Up @@ -908,7 +908,6 @@ mod tests {
metastore: metastore.clone(),
split_store: split_store.clone(),
merge_policy: default_merge_policy(),
finalize_merge_policy: Arc::new(NopMergePolicy),
max_concurrent_split_uploads: 2,
merge_io_throughput_limiter_opt: None,
merge_scheduler_service: universe.get_or_spawn_one(),
Expand Down
13 changes: 3 additions & 10 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,14 +284,8 @@ impl IndexingService {
let message = format!("failed to spawn indexing pipeline: {error}");
IndexingError::Internal(message)
})?;
let merge_policy = crate::merge_policy::merge_policy_from_settings(
index_config.indexing_settings.merge_policy.clone(),
&index_config.indexing_settings,
);
let finalize_merge_policy = crate::merge_policy::merge_policy_from_settings(
index_config.indexing_settings.finalize_merge_policy.clone(),
&index_config.indexing_settings,
);
let merge_policy =
crate::merge_policy::merge_policy_from_settings(&index_config.indexing_settings);
let split_store = IndexingSplitStore::new(storage.clone(), self.local_split_store.clone());

let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings)
Expand All @@ -306,7 +300,6 @@ impl IndexingService {
split_store: split_store.clone(),
merge_scheduler_service: self.merge_scheduler_service.clone(),
merge_policy: merge_policy.clone(),
finalize_merge_policy,
merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(),
max_concurrent_split_uploads: self.max_concurrent_split_uploads,
event_broker: self.event_broker.clone(),
Expand Down Expand Up @@ -1203,7 +1196,7 @@ mod tests {

#[tokio::test]
async fn test_indexing_service_apply_plan() {
const PARAMS_FINGERPRINT: u64 = 3865067856550546352u64;
const PARAMS_FINGERPRINT: u64 = 3865067856550546352;

quickwit_common::setup_logging_for_tests();
let transport = ChannelTransport::default();
Expand Down
46 changes: 34 additions & 12 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ use time::OffsetDateTime;
use tokio::sync::Semaphore;
use tracing::{debug, error, info, instrument};

use super::merge_planner::RunFinalizeMergePolicy;
use super::publisher::DisconnectMergePlanner;
use super::MergeSchedulerService;
use super::{MergeSchedulerService, RunFinalizeMergePolicyAndQuit};
use crate::actors::indexing_pipeline::wait_duration_before_retry;
use crate::actors::merge_split_downloader::MergeSplitDownloader;
use crate::actors::publisher::PublisherType;
Expand Down Expand Up @@ -114,7 +113,7 @@ pub struct MergePipeline {
kill_switch: KillSwitch,
/// Immature splits passed to the merge planner the first time the pipeline is spawned.
initial_immature_splits_opt: Option<Vec<SplitMetadata>>,
// After it is set to true, we don't respawn.
// After it is set to true, we don't respawn pipeline actors if they fail.
shutdown_initiated: bool,
}

Expand Down Expand Up @@ -352,7 +351,6 @@ impl MergePipeline {
&self.params.pipeline_id,
immature_splits,
self.params.merge_policy.clone(),
self.params.finalize_merge_policy.clone(),
merge_split_downloader_mailbox,
self.params.merge_scheduler_service.clone(),
);
Expand Down Expand Up @@ -434,6 +432,7 @@ impl MergePipeline {
ctx.schedule_self_msg(*quickwit_actors::HEARTBEAT, Spawn { retry_count: 0 });
}
Health::Success => {
info!(index_uid=%self.params.pipeline_id.index_uid, "merge pipeline success, shutting down");
return Err(ActorExitStatus::Success);
}
}
Expand Down Expand Up @@ -497,17 +496,29 @@ impl Handler<FinishPendingMergesAndShutdownPipeline> for MergePipeline {
_: FinishPendingMergesAndShutdownPipeline,
_ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
info!(index_uid=%self.params.pipeline_id.index_uid, "shutdown merge pipeline initiated");
// From now on, we will not respawn the pipeline if it fails.
self.shutdown_initiated = true;
if let Some(handles) = &self.handles_opt {
// This disconnects the merge planner from the merge publisher,
// breaking the merge planner pipeline loop.
//
// As a result, the pipeline will naturally terminate
// once all of the pending / ongoing merge operations are completed.
let _ = handles
.merge_publisher
.mailbox()
.send_message(DisconnectMergePlanner);
.send_message(DisconnectMergePlanner)
.await;

// We also initiate the merge planner finalization routine.
// Depending on the merge policy, it may emit a few more merge
// operations.
let _ = handles
.merge_planner
.mailbox()
.send_message(RunFinalizeMergePolicy);
.send_message(RunFinalizeMergePolicyAndQuit)
.await;
} else {
// we won't respawn the pipeline in the future, so there is nothing
// to do here.
Expand Down Expand Up @@ -561,7 +572,6 @@ pub struct MergePipelineParams {
pub merge_scheduler_service: Mailbox<MergeSchedulerService>,
pub split_store: IndexingSplitStore,
pub merge_policy: Arc<dyn MergePolicy>,
pub finalize_merge_policy: Arc<dyn MergePolicy>,
pub max_concurrent_split_uploads: usize, //< TODO share with the indexing pipeline.
pub merge_io_throughput_limiter_opt: Option<Limiter>,
pub event_broker: EventBroker,
Expand All @@ -583,7 +593,8 @@ mod tests {
use quickwit_storage::RamStorage;

use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams};
use crate::merge_policy::{default_merge_policy, nop_merge_policy};
use crate::actors::{MergePlanner, Publisher};
use crate::merge_policy::default_merge_policy;
use crate::IndexingSplitStore;

#[tokio::test]
Expand Down Expand Up @@ -624,18 +635,29 @@ mod tests {
merge_scheduler_service: universe.get_or_spawn_one(),
split_store,
merge_policy: default_merge_policy(),
finalize_merge_policy: nop_merge_policy(),
max_concurrent_split_uploads: 2,
merge_io_throughput_limiter_opt: None,
event_broker: Default::default(),
};
let pipeline = MergePipeline::new(pipeline_params, None, universe.spawn_ctx());
let (_pipeline_mailbox, pipeline_handle) = universe.spawn_builder().spawn(pipeline);
let (pipeline_exit_status, pipeline_statistics) = pipeline_handle.quit().await;
let _merge_planner_mailbox = pipeline.merge_planner_mailbox().clone();
let (pipeline_mailbox, pipeline_handle) = universe.spawn_builder().spawn(pipeline);
pipeline_mailbox
.ask(super::FinishPendingMergesAndShutdownPipeline)
.await
.unwrap();

let (pipeline_exit_status, pipeline_statistics) = pipeline_handle.join().await;
assert_eq!(pipeline_statistics.generation, 1);
assert_eq!(pipeline_statistics.num_spawn_attempts, 1);
assert_eq!(pipeline_statistics.num_published_splits, 0);
assert!(matches!(pipeline_exit_status, ActorExitStatus::Quit));
assert!(matches!(pipeline_exit_status, ActorExitStatus::Success));

// Checking that the merge pipeline actors have been properly cleaned up.
assert!(universe.get_one::<MergePlanner>().is_none());
assert!(universe.get_one::<Publisher>().is_none());
assert!(universe.get_one::<MergePipeline>().is_none());

universe.assert_quit().await;
Ok(())
}
Expand Down
40 changes: 11 additions & 29 deletions quickwit/quickwit-indexing/src/actors/merge_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::models::NewSplits;
use crate::MergePolicy;

#[derive(Debug)]
pub(crate) struct RunFinalizeMergePolicy;
pub(crate) struct RunFinalizeMergePolicyAndQuit;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct MergePartition {
Expand Down Expand Up @@ -81,7 +81,6 @@ pub struct MergePlanner {
known_split_ids_recompute_attempt_id: usize,

merge_policy: Arc<dyn MergePolicy>,
finalize_merge_policy: Arc<dyn MergePolicy>,

merge_split_downloader_mailbox: Mailbox<MergeSplitDownloader>,
merge_scheduler_service: Mailbox<MergeSchedulerService>,
Expand Down Expand Up @@ -132,12 +131,12 @@ impl Actor for MergePlanner {
}

#[async_trait]
impl Handler<RunFinalizeMergePolicy> for MergePlanner {
impl Handler<RunFinalizeMergePolicyAndQuit> for MergePlanner {
type Reply = ();

async fn handle(
&mut self,
_plan_merge: RunFinalizeMergePolicy,
_plan_merge: RunFinalizeMergePolicyAndQuit,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
// Note we ignore messages that could be coming from a different incarnation.
Expand Down Expand Up @@ -193,7 +192,6 @@ impl MergePlanner {
pipeline_id: &MergePipelineId,
immature_splits: Vec<SplitMetadata>,
merge_policy: Arc<dyn MergePolicy>,
finalize_merge_policy: Arc<dyn MergePolicy>,
merge_split_downloader_mailbox: Mailbox<MergeSplitDownloader>,
merge_scheduler_service: Mailbox<MergeSchedulerService>,
) -> MergePlanner {
Expand All @@ -206,7 +204,6 @@ impl MergePlanner {
known_split_ids_recompute_attempt_id: 0,
partitioned_young_splits: Default::default(),
merge_policy,
finalize_merge_policy,
merge_split_downloader_mailbox,
merge_scheduler_service,
ongoing_merge_operations_inventory: Inventory::default(),
Expand Down Expand Up @@ -302,12 +299,12 @@ impl MergePlanner {
let mut merge_operations = Vec::new();
for young_splits in self.partitioned_young_splits.values_mut() {
if !young_splits.is_empty() {
let merge_policy = if is_finalize {
&self.finalize_merge_policy
let operations = if is_finalize {
self.merge_policy.finalize_operations(young_splits)
} else {
&self.merge_policy
self.merge_policy.operations(young_splits)
};
merge_operations.extend(merge_policy.operations(young_splits));
merge_operations.extend(operations);
}
ctx.record_progress();
ctx.yield_now().await;
Expand Down Expand Up @@ -380,7 +377,7 @@ mod tests {

use crate::actors::MergePlanner;
use crate::merge_policy::{
merge_policy_from_settings, MergePolicy, MergeTask, NopMergePolicy, StableLogMergePolicy,
merge_policy_from_settings, MergePolicy, MergeTask, StableLogMergePolicy,
};
use crate::models::NewSplits;

Expand Down Expand Up @@ -441,7 +438,6 @@ mod tests {
&pipeline_id,
Vec::new(),
merge_policy,
Arc::new(NopMergePolicy),
merge_split_downloader_mailbox,
universe.get_or_spawn_one(),
);
Expand Down Expand Up @@ -562,15 +558,11 @@ mod tests {
2,
),
];
let merge_policy: Arc<dyn MergePolicy> =
merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings);
let finalize_merge_policy: Arc<dyn MergePolicy> =
merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings);
let merge_policy: Arc<dyn MergePolicy> = merge_policy_from_settings(&indexing_settings);
let merge_planner = MergePlanner::new(
&pipeline_id,
immature_splits.clone(),
merge_policy,
finalize_merge_policy,
merge_split_downloader_mailbox,
universe.get_or_spawn_one(),
);
Expand Down Expand Up @@ -658,17 +650,11 @@ mod tests {
2,
),
];
let merge_policy: Arc<dyn MergePolicy> =
merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings);
let finalize_merge_policy: Arc<dyn MergePolicy> = merge_policy_from_settings(
indexing_settings.finalize_merge_policy.clone(),
&indexing_settings,
);
let merge_policy: Arc<dyn MergePolicy> = merge_policy_from_settings(&indexing_settings);
let merge_planner = MergePlanner::new(
&pipeline_id,
immature_splits.clone(),
merge_policy,
finalize_merge_policy,
merge_split_downloader_mailbox,
universe.get_or_spawn_one(),
);
Expand Down Expand Up @@ -729,15 +715,11 @@ mod tests {
2,
),
];
let merge_policy: Arc<dyn MergePolicy> =
merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings);
let finalize_merge_policy: Arc<dyn MergePolicy> =
merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings);
let merge_policy: Arc<dyn MergePolicy> = merge_policy_from_settings(&indexing_settings);
let merge_planner = MergePlanner::new(
&pipeline_id,
immature_splits.clone(),
merge_policy,
finalize_merge_policy,
merge_split_downloader_mailbox,
universe.get_or_spawn_one(),
);
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub use indexing_pipeline::{IndexingPipeline, IndexingPipelineParams};
pub use indexing_service::{IndexingService, IndexingServiceCounters, INDEXING_DIR_NAME};
pub use merge_executor::{combine_partition_ids, merge_split_attrs, MergeExecutor};
pub use merge_pipeline::MergePipeline;
pub use merge_planner::MergePlanner;
pub(crate) use merge_planner::{MergePlanner, RunFinalizeMergePolicyAndQuit};
pub use merge_scheduler_service::{schedule_merge, MergePermit, MergeSchedulerService};
pub use merge_split_downloader::MergeSplitDownloader;
pub use packager::Packager;
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-indexing/src/actors/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ impl Handler<DisconnectMergePlanner> for Publisher {
_: DisconnectMergePlanner,
_ctx: &ActorContext<Self>,
) -> Result<(), quickwit_actors::ActorExitStatus> {
info!("disconnecting merge planner mailbox.");
self.merge_planner_mailbox_opt.take();
info!("disconnecting merge planner mailbox");
self.merge_planner_mailbox_opt = None;
Ok(())
}
}
Expand Down
Loading

0 comments on commit 9a50f0f

Please sign in to comment.