Skip to content

Commit

Permalink
add intereptor for mito2 ddl handle
Browse files Browse the repository at this point in the history
  • Loading branch information
fengys1996 committed Aug 26, 2024
1 parent 3973d6b commit ecf76de
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 4 deletions.
2 changes: 2 additions & 0 deletions src/mito2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ mod time_provider;
pub mod wal;
mod worker;

pub use worker::{DdlInterceptor, DdlInterceptorRef};

#[cfg_attr(doc, aquamarine::aquamarine)]
/// # Mito developer document
///
Expand Down
10 changes: 8 additions & 2 deletions src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ impl WorkerRequest {

/// DDL request to a region.
#[derive(Debug)]
pub(crate) enum DdlRequest {
pub enum DdlRequest {
Create(RegionCreateRequest),
Drop(RegionDropRequest),
Open((RegionOpenRequest, Option<WalEntryReceiver>)),
Expand All @@ -613,7 +613,7 @@ pub(crate) enum DdlRequest {

/// Sender and Ddl request.
#[derive(Debug)]
pub(crate) struct SenderDdlRequest {
pub struct SenderDdlRequest {
/// Region id of the request.
pub(crate) region_id: RegionId,
/// Result sender.
Expand All @@ -622,6 +622,12 @@ pub(crate) struct SenderDdlRequest {
pub(crate) request: DdlRequest,
}

impl SenderDdlRequest {
pub fn ddl_request(&self) -> &DdlRequest {
&self.request
}
}

/// Notification from a background job.
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/wal/entry_distributor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl WalEntryDistributor {

/// Receives the Wal entries from [WalEntryDistributor].
#[derive(Debug)]
pub(crate) struct WalEntryReceiver {
pub struct WalEntryReceiver {
/// Receives the [Entry] from the [WalEntryDistributor].
entry_receiver: Option<Receiver<Entry>>,
/// Sends the `start_id` to the [WalEntryDistributor].
Expand Down Expand Up @@ -176,7 +176,7 @@ pub const DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE: usize = 2048;
/// |
/// // may deadlock |
/// distributor.distribute().await; |
/// |
/// |
/// |
/// receivers[0].read().await |
/// ```
Expand Down
27 changes: 27 additions & 0 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ impl<S: LogStore> WorkerStarter<S> {
stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&id_string]),
region_count: REGION_COUNT.with_label_values(&[&id_string]),
region_edit_queues: RegionEditQueues::default(),
plugins: self.plugins.clone(),
};
let handle = common_runtime::spawn_global(async move {
worker_thread.run().await;
Expand Down Expand Up @@ -633,6 +634,7 @@ struct RegionWorkerLoop<S> {
region_count: IntGauge,
/// Queues for region edit requests.
region_edit_queues: RegionEditQueues,
plugins: Plugins,
}

impl<S: LogStore> RegionWorkerLoop<S> {
Expand Down Expand Up @@ -757,6 +759,21 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}

for ddl in ddl_requests {
if let Some(interceptor) = self.plugins.get::<DdlInterceptorRef>() {
match interceptor.pre_handle_ddl(&ddl).await {
Ok(is_skip) if !is_skip => {
ddl.sender.send(Ok(0));
info!("Skip DDL request due to interceptor, detail: {:?}", ddl);
continue;
}
Err(e) => {
ddl.sender.send(Err(e));
continue;
}
_ => {}
}
}

let res = match ddl.request {
DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
DdlRequest::Drop(_) => self.handle_drop_request(ddl.region_id).await,
Expand Down Expand Up @@ -952,6 +969,16 @@ impl WorkerListener {
}
}

#[async_trait::async_trait]
pub trait DdlInterceptor {
/// Returns Ok(true) if the DDL request should be handled.
/// Returns Ok(false) if the DDL request should be skipped.
/// Returns error if some errors occurred in pre_handle_ddl method.
async fn pre_handle_ddl(&self, req: &SenderDdlRequest) -> Result<bool>;
}

pub type DdlInterceptorRef = Arc<dyn DdlInterceptor + Send + Sync>;

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit ecf76de

Please sign in to comment.