diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index cd0cb3710223..8b1b543780e5 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -44,6 +44,8 @@ mod time_provider; pub mod wal; mod worker; +pub use worker::{DdlInterceptor, DdlInterceptorRef}; + #[cfg_attr(doc, aquamarine::aquamarine)] /// # Mito developer document /// diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index d88bc994e97e..3c53841e3a69 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -599,7 +599,7 @@ impl WorkerRequest { /// DDL request to a region. #[derive(Debug)] -pub(crate) enum DdlRequest { +pub enum DdlRequest { Create(RegionCreateRequest), Drop(RegionDropRequest), Open((RegionOpenRequest, Option)), @@ -613,7 +613,7 @@ pub(crate) enum DdlRequest { /// Sender and Ddl request. #[derive(Debug)] -pub(crate) struct SenderDdlRequest { +pub struct SenderDdlRequest { /// Region id of the request. pub(crate) region_id: RegionId, /// Result sender. @@ -622,6 +622,12 @@ pub(crate) struct SenderDdlRequest { pub(crate) request: DdlRequest, } +impl SenderDdlRequest { + pub fn ddl_request(&self) -> &DdlRequest { + &self.request + } +} + /// Notification from a background job. #[derive(Debug)] pub(crate) enum BackgroundNotify { diff --git a/src/mito2/src/wal/entry_distributor.rs b/src/mito2/src/wal/entry_distributor.rs index e869e5ee1a5c..022b29a4b951 100644 --- a/src/mito2/src/wal/entry_distributor.rs +++ b/src/mito2/src/wal/entry_distributor.rs @@ -96,7 +96,7 @@ impl WalEntryDistributor { /// Receives the Wal entries from [WalEntryDistributor]. #[derive(Debug)] -pub(crate) struct WalEntryReceiver { +pub struct WalEntryReceiver { /// Receives the [Entry] from the [WalEntryDistributor]. entry_receiver: Option>, /// Sends the `start_id` to the [WalEntryDistributor]. @@ -176,7 +176,7 @@ pub const DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE: usize = 2048; /// | /// // may deadlock | /// distributor.distribute().await; | -/// | +/// | /// | /// receivers[0].read().await | /// ``` diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 3aff7764f082..9fa8202de412 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -443,6 +443,7 @@ impl WorkerStarter { stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&id_string]), region_count: REGION_COUNT.with_label_values(&[&id_string]), region_edit_queues: RegionEditQueues::default(), + plugins: self.plugins.clone(), }; let handle = common_runtime::spawn_global(async move { worker_thread.run().await; @@ -633,6 +634,7 @@ struct RegionWorkerLoop { region_count: IntGauge, /// Queues for region edit requests. region_edit_queues: RegionEditQueues, + plugins: Plugins, } impl RegionWorkerLoop { @@ -757,6 +759,21 @@ impl RegionWorkerLoop { } for ddl in ddl_requests { + if let Some(interceptor) = self.plugins.get::() { + match interceptor.pre_handle_ddl(&ddl).await { + Ok(is_skip) if !is_skip => { + ddl.sender.send(Ok(0)); + info!("Skip DDL request due to interceptor, detail: {:?}", ddl); + continue; + } + Err(e) => { + ddl.sender.send(Err(e)); + continue; + } + _ => {} + } + } + let res = match ddl.request { DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await, DdlRequest::Drop(_) => self.handle_drop_request(ddl.region_id).await, @@ -952,6 +969,16 @@ impl WorkerListener { } } +#[async_trait::async_trait] +pub trait DdlInterceptor { + /// Returns Ok(true) if the DDL request should be handled. + /// Returns Ok(false) if the DDL request should be skipped. + /// Returns error if some errors occurred in pre_handle_ddl method. + async fn pre_handle_ddl(&self, req: &SenderDdlRequest) -> Result; +} + +pub type DdlInterceptorRef = Arc; + #[cfg(test)] mod tests { use super::*;