Skip to content

Commit

Permalink
refactor: move waiters from notifier to Job
Browse files Browse the repository at this point in the history
  • Loading branch information
zyy17 committed Jun 27, 2024
1 parent d299592 commit dfb24c3
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
2 changes: 1 addition & 1 deletion src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,14 @@ impl CompactionScheduler {
compaction_region: compaction_region.clone(),
picker_output: picker_output.clone(),
start_time,
waiters,
};

let result = remote_job_scheduler
.schedule(
RemoteJob::CompactionJob(remote_compaction_job),
Box::new(DefaultNotifier {
request_sender: request_sender.clone(),
waiters,
}),
)
.await;
Expand Down
23 changes: 12 additions & 11 deletions src/mito2/src/schedule/remote_job_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ use store_api::storage::RegionId;
use tokio::sync::mpsc::Sender;
use uuid::Uuid;

use crate::compaction::compactor::CompactionRegion;
use crate::compaction::picker::PickerOutput;
use crate::compaction::{
compactor::CompactionRegion, picker::PickerOutput,
};
use crate::error::{CompactRegionSnafu, Error, ParseJobIdSnafu, Result};
use crate::manifest::action::RegionEdit;
use crate::metrics::COMPACTION_FAILURE_COUNT;
Expand Down Expand Up @@ -61,7 +62,7 @@ pub trait RemoteJobScheduler: Send + Sync + 'static {
#[async_trait::async_trait]
pub trait Notifier: Send + Sync + 'static {
/// Notify the mito engine that a remote job is completed.
async fn notify(&mut self, result: RemoteJobResult);
async fn notify(&self, result: RemoteJobResult, waiters: Vec<OutputTx>);
}

/// Unique id for a remote job.
Expand All @@ -82,19 +83,19 @@ impl From<JobId> for String {
}

/// RemoteJob is a job that can be executed remotely. For example, a remote compaction job.
#[derive(Clone)]
#[allow(dead_code)]
pub enum RemoteJob {
CompactionJob(CompactionJob),
}

/// CompactionJob is a remote job that compacts a set of files in a compaction service.
#[derive(Clone)]
#[allow(dead_code)]
pub struct CompactionJob {
pub compaction_region: CompactionRegion,
pub picker_output: PickerOutput,
pub start_time: Instant,
/// Send the result of the compaction job to these waiters.
pub waiters: Vec<OutputTx>,
}

/// RemoteJobResult is the result of a remote job.
Expand All @@ -114,29 +115,29 @@ pub struct CompactionJobResult {

/// DefaultNotifier is a default implementation of Notifier that sends WorkerRequest to the mito engine.
pub(crate) struct DefaultNotifier {
/// The sender to send WorkerRequest to the mito engine. This is used to notify the mito engine when a remote job is completed.
pub(crate) request_sender: Sender<WorkerRequest>,
pub(crate) waiters: Vec<OutputTx>,
}

impl DefaultNotifier {
fn on_failure(&mut self, err: Arc<Error>, region_id: RegionId) {
fn on_failure(&self, err: Arc<Error>, region_id: RegionId, mut waiters: Vec<OutputTx>) {
COMPACTION_FAILURE_COUNT.inc();
for waiter in self.waiters.drain(..) {
for waiter in waiters.drain(..) {
waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id }));
}
}
}

#[async_trait::async_trait]
impl Notifier for DefaultNotifier {
async fn notify(&mut self, result: RemoteJobResult) {
async fn notify(&self, result: RemoteJobResult, waiters: Vec<OutputTx>) {
match result {
RemoteJobResult::CompactionJobResult(result) => {
let notify = {
match result.region_edit {
Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
region_id: result.region_id,
senders: std::mem::take(&mut self.waiters),
senders: waiters,
start_time: result.start_time,
edit,
}),
Expand All @@ -146,7 +147,7 @@ impl Notifier for DefaultNotifier {
result.region_id, err
);
let err = Arc::new(err);
self.on_failure(err.clone(), result.region_id);
self.on_failure(err.clone(), result.region_id, waiters);
BackgroundNotify::CompactionFailed(CompactionFailed {
region_id: result.region_id,
err,
Expand Down

0 comments on commit dfb24c3

Please sign in to comment.