From dfb24c339349e3d6149d1f82681ff2e4f66bf6d7 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Thu, 27 Jun 2024 22:46:19 +0800 Subject: [PATCH] refactor: move waiters from notifier to Job --- src/mito2/src/compaction.rs | 2 +- .../src/schedule/remote_job_scheduler.rs | 23 ++++++++++--------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index c147345a4ce9..8bfd159298b3 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -291,6 +291,7 @@ impl CompactionScheduler { compaction_region: compaction_region.clone(), picker_output: picker_output.clone(), start_time, + waiters, }; let result = remote_job_scheduler @@ -298,7 +299,6 @@ impl CompactionScheduler { RemoteJob::CompactionJob(remote_compaction_job), Box::new(DefaultNotifier { request_sender: request_sender.clone(), - waiters, }), ) .await; diff --git a/src/mito2/src/schedule/remote_job_scheduler.rs b/src/mito2/src/schedule/remote_job_scheduler.rs index 66856cba6879..8b90451c2520 100644 --- a/src/mito2/src/schedule/remote_job_scheduler.rs +++ b/src/mito2/src/schedule/remote_job_scheduler.rs @@ -22,8 +22,9 @@ use store_api::storage::RegionId; use tokio::sync::mpsc::Sender; use uuid::Uuid; -use crate::compaction::compactor::CompactionRegion; -use crate::compaction::picker::PickerOutput; +use crate::compaction::{ + compactor::CompactionRegion, picker::PickerOutput, +}; use crate::error::{CompactRegionSnafu, Error, ParseJobIdSnafu, Result}; use crate::manifest::action::RegionEdit; use crate::metrics::COMPACTION_FAILURE_COUNT; @@ -61,7 +62,7 @@ pub trait RemoteJobScheduler: Send + Sync + 'static { #[async_trait::async_trait] pub trait Notifier: Send + Sync + 'static { /// Notify the mito engine that a remote job is completed. - async fn notify(&mut self, result: RemoteJobResult); + async fn notify(&self, result: RemoteJobResult, waiters: Vec); } /// Unique id for a remote job. @@ -82,19 +83,19 @@ impl From for String { } /// RemoteJob is a job that can be executed remotely. For example, a remote compaction job. -#[derive(Clone)] #[allow(dead_code)] pub enum RemoteJob { CompactionJob(CompactionJob), } /// CompactionJob is a remote job that compacts a set of files in a compaction service. -#[derive(Clone)] #[allow(dead_code)] pub struct CompactionJob { pub compaction_region: CompactionRegion, pub picker_output: PickerOutput, pub start_time: Instant, + /// Send the result of the compaction job to these waiters. + pub waiters: Vec, } /// RemoteJobResult is the result of a remote job. @@ -114,14 +115,14 @@ pub struct CompactionJobResult { /// DefaultNotifier is a default implementation of Notifier that sends WorkerRequest to the mito engine. pub(crate) struct DefaultNotifier { + /// The sender to send WorkerRequest to the mito engine. This is used to notify the mito engine when a remote job is completed. pub(crate) request_sender: Sender, - pub(crate) waiters: Vec, } impl DefaultNotifier { - fn on_failure(&mut self, err: Arc, region_id: RegionId) { + fn on_failure(&self, err: Arc, region_id: RegionId, mut waiters: Vec) { COMPACTION_FAILURE_COUNT.inc(); - for waiter in self.waiters.drain(..) { + for waiter in waiters.drain(..) { waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id })); } } @@ -129,14 +130,14 @@ impl DefaultNotifier { #[async_trait::async_trait] impl Notifier for DefaultNotifier { - async fn notify(&mut self, result: RemoteJobResult) { + async fn notify(&self, result: RemoteJobResult, waiters: Vec) { match result { RemoteJobResult::CompactionJobResult(result) => { let notify = { match result.region_edit { Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished { region_id: result.region_id, - senders: std::mem::take(&mut self.waiters), + senders: waiters, start_time: result.start_time, edit, }), @@ -146,7 +147,7 @@ impl Notifier for DefaultNotifier { result.region_id, err ); let err = Arc::new(err); - self.on_failure(err.clone(), result.region_id); + self.on_failure(err.clone(), result.region_id, waiters); BackgroundNotify::CompactionFailed(CompactionFailed { region_id: result.region_id, err,