feat: introduce the interface of RemoteJobScheduler #4181

Merged

Changes from all commits (72 commits)
0d5b790
refactor: add Compactor trait
zyy17 Jun 2, 2024
a91c5d6
chore: add compact() in Compactor trait and expose compaction module
zyy17 Jun 2, 2024
37b0464
refactor: add CompactionRequest and open_compaction_region
zyy17 Jun 2, 2024
014bc22
refactor: export the compaction api
zyy17 Jun 3, 2024
f9d1e66
refactor: add DefaultCompactor::new_from_request
zyy17 Jun 3, 2024
3eb0dab
refactor: no need to pass mito_config in open_compaction_region()
zyy17 Jun 3, 2024
8cd55a2
refactor: CompactionRequest -> &CompactionRequest
zyy17 Jun 3, 2024
891426d
fix: typo
zyy17 Jun 3, 2024
8ff9c6f
docs: add docs for public apis
zyy17 Jun 4, 2024
acabc07
refactor: remove 'Picker' from Compactor
zyy17 Jun 4, 2024
de8496c
chore: add logs
zyy17 Jun 4, 2024
6bee93e
chore: change pub attribute for Picker
zyy17 Jun 4, 2024
a63c9f5
refactor: remove do_merge_ssts()
zyy17 Jun 4, 2024
3a360c0
refactor: update comments
zyy17 Jun 4, 2024
35feb02
refactor: use CompactionRegion argument in Picker
zyy17 Jun 5, 2024
55033f9
chore: make compaction module public and remove unnessary clone
zyy17 Jun 6, 2024
15c102c
refactor: move build_compaction_task() in CompactionScheduler{}
zyy17 Jun 6, 2024
8ad6fff
chore: use in open_compaction_region() and add some comments for pub…
zyy17 Jun 6, 2024
26ae2a9
refactor: add 'manifest_dir()' in store-api
zyy17 Jun 6, 2024
9a05b6a
refactor: move the default implementation to DefaultCompactor
zyy17 Jun 6, 2024
130fb90
refactor: remove Options from MergeOutput
zyy17 Jun 6, 2024
e141e91
chore: minor modification
zyy17 Jun 6, 2024
e3e6f12
fix: clippy errors
zyy17 Jun 6, 2024
93f5a74
fix: unit test errors
zyy17 Jun 6, 2024
17bf852
refactor: remove 'manifest_dir()' from store-api crate(already have o…
zyy17 Jun 6, 2024
48f9398
refactor: use 'region_dir' in CompactionRequest
zyy17 Jun 6, 2024
2f6d9ef
refactor: refine naming
zyy17 Jun 6, 2024
861ea41
chore: sync main branch
zyy17 Jun 7, 2024
d26c64a
refactor: refine naming
zyy17 Jun 7, 2024
4968186
refactor: remove clone()
zyy17 Jun 7, 2024
574fd9f
chore: add comments
zyy17 Jun 7, 2024
77c085c
refactor: add PickerOutput field in CompactorRequest
zyy17 Jun 7, 2024
f8d88a2
chore: sync main branch
zyy17 Jun 16, 2024
f92c729
feat: introduce RemoteJobScheduler
zyy17 Jun 7, 2024
c0c7b6d
feat: add RemoteJobScheudler in schedule_compaction_request()
zyy17 Jun 7, 2024
84a8ff7
refactor: use Option type for senders field of CompactionFinished
zyy17 Jun 8, 2024
f421d08
refactor: modify CompactionJob
zyy17 Jun 8, 2024
6f24668
refactor: schedule remote compaction job by options
zyy17 Jun 9, 2024
cbe2115
refactor: remove unused Options
zyy17 Jun 9, 2024
66ee079
build: remove unused log
zyy17 Jun 9, 2024
3c183ff
refactor: fallback to local compaction if the remote compaction failed
zyy17 Jun 9, 2024
90a4794
fix: clippy errors
zyy17 Jun 17, 2024
c67b9ad
chore: sync main branch
zyy17 Jun 17, 2024
0f5c4d3
refactor: add plugins in mito2
zyy17 Jun 17, 2024
6ffdae2
refactor: add from_u64() for JobId
zyy17 Jun 18, 2024
06fc1e4
refactor: make schedule module public
zyy17 Jun 18, 2024
01b20b2
refactor: add error for RemoteJobScheduler
zyy17 Jun 18, 2024
b740c41
refactor: add Notifier
zyy17 Jun 18, 2024
4404973
refactor: use Arc for Notifier
zyy17 Jun 18, 2024
25bfdf9
chore: sync main branch
zyy17 Jun 19, 2024
1d6efa4
refactor: add 'remote_compaction' in compaction options
zyy17 Jun 19, 2024
07459d3
fix: clippy errors
zyy17 Jun 19, 2024
d08b56e
fix: unrecognized table option
zyy17 Jun 19, 2024
840e642
refactor: add 'start_time' in CompactionJob
zyy17 Jun 22, 2024
fc0e927
refactor: modify error type of RemoteJobScheduler
zyy17 Jun 22, 2024
2731e76
Merge branch 'main' into feat/add-experimental-remote-job-scheduler
zyy17 Jun 24, 2024
1a8a742
chore: revert changes for request
zyy17 Jun 26, 2024
bf55576
refactor: code refactor by review comment
zyy17 Jun 26, 2024
617f8f4
refactor: use string type for JobId
zyy17 Jun 26, 2024
f220543
Merge branch 'main' into feat/add-experimental-remote-job-scheduler
zyy17 Jun 26, 2024
2f92206
refactor: add 'waiters' field in DefaultNotifier
zyy17 Jun 26, 2024
fbef517
fix: build error
zyy17 Jun 26, 2024
613edc8
refactor: take coderabbit's review comment
zyy17 Jun 27, 2024
4e3ccdf
refactor: use uuid::Uuid as JobId
zyy17 Jun 27, 2024
d299592
refactor: return waiters when schedule failed and add on_failure for …
zyy17 Jun 27, 2024
ca1c8fa
refactor: move waiters from notifier to Job
zyy17 Jun 27, 2024
5246a22
refactor: use ObjectStoreManagerRef in open_compaction_region()
zyy17 Jun 28, 2024
a9b2d5e
Merge branch 'main' into feat/add-experimental-remote-job-scheduler
zyy17 Jul 1, 2024
6800383
refactor: implement for JobId and adds related unit tests
zyy17 Jul 1, 2024
6502caa
fix: run unit tests failed
zyy17 Jul 1, 2024
305ee27
Merge branch 'main' into feat/add-experimental-remote-job-scheduler
zyy17 Jul 1, 2024
0f9335d
refactor: add RemoteJobSchedulerError
zyy17 Jul 1, 2024
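The commit history above converges on a small scheduling interface. As orientation before the diff, here is a non-authoritative Rust sketch reconstructed from the call sites in this PR: the names RemoteJobScheduler, RemoteJobSchedulerRef, RemoteJob, CompactionJob, DefaultNotifier, and JobId all appear in the changed code, while the exact signatures, trait bounds, and the Notifier callback shape (RemoteJobResult in particular) are assumptions.

// Sketch only: reconstructed from call sites in this PR, not the
// authoritative definitions in src/mito2/src/schedule/remote_job_scheduler.rs.
use std::sync::Arc;
use std::time::Instant;

use uuid::Uuid;

// Opaque stand-ins; in the real crate these are mito2 types
// (compaction region, picker output, one-shot result sender).
pub struct CompactionRegion;
pub struct PickerOutput;
pub struct OutputTx;

/// Identifier of a remote job; a later commit switches this to uuid::Uuid.
pub type JobId = Uuid;

/// Payload of a remote compaction job (field names taken from the diff).
pub struct CompactionJob {
    pub compaction_region: CompactionRegion,
    pub picker_output: PickerOutput,
    pub start_time: Instant,
    pub waiters: Vec<OutputTx>,
}

/// Jobs that can be offloaded to an external scheduler.
pub enum RemoteJob {
    CompactionJob(CompactionJob),
}

/// Result payload delivered back to the notifier (assumed name and shape).
pub enum RemoteJobResult {
    Finished,
    Failed(String),
}

/// Error returned by `schedule`; sketched in more detail next to the
/// fallback logic in the compaction.rs diff below.
pub struct RemoteJobSchedulerError {
    pub reason: String,
    pub waiters: Vec<OutputTx>,
}

/// Callback that forwards a finished remote job back into the region
/// worker loop; the diff shows one implementation, DefaultNotifier.
#[async_trait::async_trait]
pub trait Notifier: Send + Sync {
    async fn notify(&self, job_id: JobId, result: RemoteJobResult);
}

#[async_trait::async_trait]
pub trait RemoteJobScheduler: Send + Sync {
    /// Schedules `job` remotely. On failure, the error hands the pending
    /// waiters back so the caller can fall back to local execution.
    async fn schedule(
        &self,
        job: RemoteJob,
        notifier: Box<dyn Notifier>,
    ) -> Result<JobId, RemoteJobSchedulerError>;
}

pub type RemoteJobSchedulerRef = Arc<dyn RemoteJobScheduler>;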
17 changes: 13 additions & 4 deletions src/datanode/src/datanode.rs
@@ -343,7 +343,8 @@ impl DatanodeBuilder {
         );

         let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
-        let engines = Self::build_store_engines(opts, object_store_manager).await?;
+        let engines =
+            Self::build_store_engines(opts, object_store_manager, self.plugins.clone()).await?;
         for engine in engines {
             region_server.register_engine(engine);
         }
@@ -357,14 +358,19 @@ impl DatanodeBuilder {
     async fn build_store_engines(
         opts: &DatanodeOptions,
         object_store_manager: ObjectStoreManagerRef,
+        plugins: Plugins,
     ) -> Result<Vec<RegionEngineRef>> {
         let mut engines = vec![];
         for engine in &opts.region_engine {
             match engine {
                 RegionEngineConfig::Mito(config) => {
-                    let mito_engine =
-                        Self::build_mito_engine(opts, object_store_manager.clone(), config.clone())
-                            .await?;
+                    let mito_engine = Self::build_mito_engine(
+                        opts,
+                        object_store_manager.clone(),
+                        config.clone(),
+                        plugins.clone(),
+                    )
+                    .await?;

                     let metric_engine = MetricEngine::new(mito_engine.clone());
                     engines.push(Arc::new(mito_engine) as _);
@@ -387,6 +393,7 @@ impl DatanodeBuilder {
         opts: &DatanodeOptions,
         object_store_manager: ObjectStoreManagerRef,
         config: MitoConfig,
+        plugins: Plugins,
     ) -> Result<MitoEngine> {
         let mito_engine = match &opts.wal {
             DatanodeWalConfig::RaftEngine(raft_engine_config) => MitoEngine::new(
@@ -395,6 +402,7 @@ impl DatanodeBuilder {
                 Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
                     .await?,
                 object_store_manager,
+                plugins,
             )
             .await
             .context(BuildMitoEngineSnafu)?,
@@ -403,6 +411,7 @@ impl DatanodeBuilder {
                 config,
                 Self::build_kafka_log_store(kafka_config).await?,
                 object_store_manager,
+                plugins,
             )
             .await
             .context(BuildMitoEngineSnafu)?,
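The datanode changes above only thread Plugins through the builder; nothing registers a scheduler yet. For illustration, a deployment that wants remote compaction could inject one before the builder runs. This is a hypothetical sketch: MyRemoteScheduler is a placeholder for a user-supplied implementation, and it assumes the insert/get pair on common_base::Plugins.

use std::sync::Arc;

use common_base::Plugins;

fn inject_remote_scheduler(plugins: &Plugins) {
    // The key type must be RemoteJobSchedulerRef, because that is exactly
    // what CompactionScheduler later looks up via
    // plugins.get::<RemoteJobSchedulerRef>().
    plugins.insert::<RemoteJobSchedulerRef>(Arc::new(MyRemoteScheduler::default()));
    // From here, `plugins` flows: DatanodeBuilder -> build_store_engines
    // -> build_mito_engine -> MitoEngine::new -> CompactionScheduler.
}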
150 changes: 101 additions & 49 deletions src/mito2/src/compaction.rs
@@ -27,7 +27,8 @@ use std::sync::Arc;
 use std::time::{Duration, Instant};

 use api::v1::region::compact_request;
-use common_telemetry::{debug, error};
+use common_base::Plugins;
+use common_telemetry::{debug, error, info};
 use common_time::range::TimestampRange;
 use common_time::timestamp::TimeUnit;
 use common_time::Timestamp;
@@ -59,6 +60,9 @@ use crate::region::options::MergeMode;
 use crate::region::version::{VersionControlRef, VersionRef};
 use crate::region::ManifestContextRef;
 use crate::request::{OptionOutputTx, OutputTx, WorkerRequest};
+use crate::schedule::remote_job_scheduler::{
+    CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
+};
 use crate::schedule::scheduler::SchedulerRef;
 use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
 use crate::sst::version::LevelMeta;
@@ -103,6 +107,8 @@ pub(crate) struct CompactionScheduler {
     cache_manager: CacheManagerRef,
     engine_config: Arc<MitoConfig>,
     listener: WorkerListener,
+    /// Plugins for the compaction scheduler.
+    plugins: Plugins,
 }

 impl CompactionScheduler {
@@ -112,6 +118,7 @@ impl CompactionScheduler {
         cache_manager: CacheManagerRef,
         engine_config: Arc<MitoConfig>,
         listener: WorkerListener,
+        plugins: Plugins,
     ) -> Self {
         Self {
             scheduler,
@@ -120,12 +127,13 @@ impl CompactionScheduler {
             cache_manager,
             engine_config,
             listener,
+            plugins,
         }
     }

     /// Schedules a compaction for the region.
     #[allow(clippy::too_many_arguments)]
-    pub(crate) fn schedule_compaction(
+    pub(crate) async fn schedule_compaction(
         &mut self,
         region_id: RegionId,
         compact_options: compact_request::Options,
@@ -153,10 +161,11 @@
         );
         self.region_status.insert(region_id, status);
         self.schedule_compaction_request(request, compact_options)
+            .await
     }

     /// Notifies the scheduler that the compaction job is finished successfully.
-    pub(crate) fn on_compaction_finished(
+    pub(crate) async fn on_compaction_finished(
         &mut self,
         region_id: RegionId,
         manifest_ctx: &ManifestContextRef,
@@ -175,10 +184,13 @@
             self.listener.clone(),
         );
         // Try to schedule next compaction task for this region.
-        if let Err(e) = self.schedule_compaction_request(
-            request,
-            compact_request::Options::Regular(Default::default()),
-        ) {
+        if let Err(e) = self
+            .schedule_compaction_request(
+                request,
+                compact_request::Options::Regular(Default::default()),
+            )
+            .await
+        {
             error!(e; "Failed to schedule next compaction for region {}", region_id);
         }
     }
@@ -219,48 +231,13 @@
     /// Schedules a compaction request.
     ///
     /// If the region has nothing to compact, it removes the region from the status map.
-    fn schedule_compaction_request(
+    async fn schedule_compaction_request(
         &mut self,
         request: CompactionRequest,
         options: compact_request::Options,
     ) -> Result<()> {
+        let picker = new_picker(options.clone(), &request.current_version.options.compaction);
         let region_id = request.region_id();
-        let Some(mut task) = self.build_compaction_task(request, options) else {
-            // Nothing to compact, remove it from the region status map.
-            self.region_status.remove(&region_id);
-            return Ok(());
-        };
-
-        // Submit the compaction task.
-        self.scheduler
-            .schedule(Box::pin(async move {
-                task.run().await;
-            }))
-            .map_err(|e| {
-                error!(e; "Failed to submit compaction request for region {}", region_id);
-                // If failed to submit the job, we need to remove the region from the scheduler.
-                self.region_status.remove(&region_id);
-                e
-            })
-    }
-
-    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
-        // Remove this region.
-        let Some(status) = self.region_status.remove(&region_id) else {
-            return;
-        };
-
-        // Notifies all pending tasks.
-        status.on_failure(err);
-    }
-
-    fn build_compaction_task(
-        &self,
-        req: CompactionRequest,
-        options: compact_request::Options,
-    ) -> Option<Box<dyn CompactionTask>> {
-        let picker = new_picker(options, &req.current_version.options.compaction);
-        let region_id = req.region_id();
         let CompactionRequest {
             engine_config,
             current_version,
Expand All @@ -271,7 +248,7 @@ impl CompactionScheduler {
cache_manager,
manifest_ctx,
listener,
} = req;
} = request;
debug!(
"Pick compaction strategy {:?} for region: {}",
picker, region_id
@@ -304,20 +281,88 @@
             for waiter in waiters {
                 waiter.send(Ok(0));
             }
-            return None;
+            self.region_status.remove(&region_id);
+            return Ok(());
         };

+        // If specified to run compaction remotely, we schedule the compaction job remotely.
+        // It will fall back to local compaction if there is no remote job scheduler.
+        let waiters = if current_version.options.compaction.remote_compaction() {
+            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
+                let remote_compaction_job = CompactionJob {
+                    compaction_region: compaction_region.clone(),
+                    picker_output: picker_output.clone(),
+                    start_time,
+                    waiters,
+                };
+
+                let result = remote_job_scheduler
+                    .schedule(
+                        RemoteJob::CompactionJob(remote_compaction_job),
+                        Box::new(DefaultNotifier {
+                            request_sender: request_sender.clone(),
+                        }),
+                    )
+                    .await;
+
+                match result {
+                    Ok(job_id) => {
+                        info!(
+                            "Scheduled remote compaction job {} for region {}",
+                            job_id, region_id
+                        );
+                        return Ok(());
+                    }
+                    Err(e) => {
+                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);
+
+                        // Return the waiters back to the caller for local compaction.
+                        e.waiters
+                    }
+                }
+            } else {
+                debug!(
+                    "Remote compaction is not enabled, fallback to local compaction for region {}",
+                    region_id
+                );
+                waiters
+            }
+        } else {
+            waiters
+        };

-        let task = CompactionTaskImpl {
+        // Create a local compaction task.
+        let mut local_compaction_task = Box::new(CompactionTaskImpl {
             request_sender,
             waiters,
             start_time,
             listener,
             picker_output,
             compaction_region,
             compactor: Arc::new(DefaultCompactor {}),
-        };
+        });
+
+        // Submit the compaction task.
+        self.scheduler
+            .schedule(Box::pin(async move {
+                local_compaction_task.run().await;
+            }))
+            .map_err(|e| {
+                error!(e; "Failed to submit compaction request for region {}", region_id);
+                // If failed to submit the job, we need to remove the region from the scheduler.
+                self.region_status.remove(&region_id);
+                e
+            })
+    }
+
+    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
+        // Remove this region.
+        let Some(status) = self.region_status.remove(&region_id) else {
+            return;
+        };

-        Some(Box::new(task))
+        // Notifies all pending tasks.
+        status.on_failure(err);
     }
 }

[Review comment (Collaborator) on the "// Create a local compaction task." line. Suggested change: "// Create a local compaction task." -> "// Creates a local compaction task." A second review conversation on this function was marked as resolved.]
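One detail worth pausing on in the hunk above: in the Err(e) arm the code reads e.waiters, meaning the scheduler error must carry the pending result senders back to the caller. A rough sketch of the shape this implies (assumed, not the actual definition):

// Assumed shape, inferred from the `e.waiters` access above; OutputTx is
// mito2's one-shot result sender.
pub struct RemoteJobSchedulerError {
    pub reason: String,         // assumed: a human-readable cause
    pub waiters: Vec<OutputTx>, // handed back so local fallback can reuse them
}
// Without returning the waiters, a failed remote schedule would drop every
// caller's result sender and pending compaction requests would never resolve.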

@@ -602,6 +647,7 @@ mod tests {
                 waiter,
                 &manifest_ctx,
             )
+            .await
             .unwrap();
         let output = output_rx.await.unwrap().unwrap();
         assert_eq!(output, 0);
@@ -620,6 +666,7 @@ mod tests {
                 waiter,
                 &manifest_ctx,
             )
+            .await
             .unwrap();
         let output = output_rx.await.unwrap().unwrap();
         assert_eq!(output, 0);
@@ -659,6 +706,7 @@ mod tests {
                 OptionOutputTx::none(),
                 &manifest_ctx,
             )
+            .await
             .unwrap();
         // Should schedule 1 compaction.
         assert_eq!(1, scheduler.region_status.len());
@@ -687,6 +735,7 @@ mod tests {
                 OptionOutputTx::none(),
                 &manifest_ctx,
             )
+            .await
             .unwrap();
         assert_eq!(1, scheduler.region_status.len());
         assert_eq!(1, job_scheduler.num_jobs());
@@ -698,7 +747,9 @@ mod tests {
             .is_some());

         // On compaction finished and schedule next compaction.
-        scheduler.on_compaction_finished(region_id, &manifest_ctx);
+        scheduler
+            .on_compaction_finished(region_id, &manifest_ctx)
+            .await;
         assert_eq!(1, scheduler.region_status.len());
         assert_eq!(2, job_scheduler.num_jobs());
         // 5 files for next compaction.
@@ -718,6 +769,7 @@ mod tests {
                 OptionOutputTx::none(),
                 &manifest_ctx,
             )
+            .await
             .unwrap();
         assert_eq!(2, job_scheduler.num_jobs());
         assert!(scheduler
10 changes: 8 additions & 2 deletions src/mito2/src/engine.rs
@@ -60,6 +60,7 @@ use std::time::Instant;
 use api::region::RegionResponse;
 use async_trait::async_trait;
+use common_base::Plugins;
 use common_error::ext::BoxedError;
 use common_recordbatch::SendableRecordBatchStream;
 use common_telemetry::tracing;
@@ -107,11 +108,14 @@ impl MitoEngine {
         mut config: MitoConfig,
         log_store: Arc<S>,
         object_store_manager: ObjectStoreManagerRef,
+        plugins: Plugins,
     ) -> Result<MitoEngine> {
         config.sanitize(data_home)?;

         Ok(MitoEngine {
-            inner: Arc::new(EngineInner::new(config, log_store, object_store_manager).await?),
+            inner: Arc::new(
+                EngineInner::new(config, log_store, object_store_manager, plugins).await?,
+            ),
         })
     }

[A review conversation on the new `plugins` parameter was marked as resolved.]
@@ -273,11 +277,13 @@ impl EngineInner {
         config: MitoConfig,
         log_store: Arc<S>,
         object_store_manager: ObjectStoreManagerRef,
+        plugins: Plugins,
     ) -> Result<EngineInner> {
         let config = Arc::new(config);
         let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
         Ok(EngineInner {
-            workers: WorkerGroup::start(config.clone(), log_store, object_store_manager).await?,
+            workers: WorkerGroup::start(config.clone(), log_store, object_store_manager, plugins)
+                .await?,
             config,
             wal_raw_entry_reader,
         })
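For embedders, the visible API change in this file is a single extra argument. A hedged usage sketch: the leading data_home parameter is inferred from config.sanitize(data_home)? above, and the concrete log-store type is only an example.

// Sketch: constructing the engine after this PR. Everything except
// `plugins` is assumed to be prepared exactly as before.
async fn open_mito_engine(
    data_home: &str,
    config: MitoConfig,
    log_store: Arc<RaftEngineLogStore>,
    object_store_manager: ObjectStoreManagerRef,
    plugins: Plugins, // may carry a RemoteJobSchedulerRef
) -> Result<MitoEngine> {
    MitoEngine::new(data_home, config, log_store, object_store_manager, plugins).await
}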