diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index d7f2ea034d75..62912b8ffb10 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -33,6 +33,8 @@ mod create_test; #[cfg(test)] mod drop_test; #[cfg(test)] +mod edit_region_test; +#[cfg(test)] mod filter_deleted_test; #[cfg(test)] mod flush_test; @@ -88,7 +90,7 @@ use crate::manifest::action::RegionEdit; use crate::metrics::HANDLE_REQUEST_ELAPSED; use crate::read::scan_region::{ScanParallism, ScanRegion, Scanner}; use crate::region::RegionUsage; -use crate::request::WorkerRequest; +use crate::request::{RegionEditRequest, WorkerRequest}; use crate::wal::entry_distributor::{ build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, }; @@ -196,11 +198,11 @@ impl MitoEngine { ); let (tx, rx) = oneshot::channel(); - let request = WorkerRequest::EditRegion { + let request = WorkerRequest::EditRegion(RegionEditRequest { region_id, edit, tx, - }; + }); self.inner .workers .submit_to_worker(region_id, request) diff --git a/src/mito2/src/engine/edit_region_test.rs b/src/mito2/src/engine/edit_region_test.rs new file mode 100644 index 000000000000..8dd682a37269 --- /dev/null +++ b/src/mito2/src/engine/edit_region_test.rs @@ -0,0 +1,120 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use object_store::ObjectStore; +use store_api::region_engine::RegionEngine; +use store_api::region_request::RegionRequest; +use store_api::storage::RegionId; +use tokio::sync::Barrier; + +use crate::config::MitoConfig; +use crate::engine::MitoEngine; +use crate::manifest::action::RegionEdit; +use crate::region::MitoRegionRef; +use crate::sst::file::{FileId, FileMeta}; +use crate::test_util::{CreateRequestBuilder, TestEnv}; + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_edit_region_concurrently() { + const EDITS_PER_TASK: usize = 10; + let tasks_count = 10; + + // A task that creates SST files and edits the region with them. + struct Task { + region: MitoRegionRef, + ssts: Vec, + } + + impl Task { + async fn create_ssts(&mut self, object_store: &ObjectStore) { + for _ in 0..EDITS_PER_TASK { + let file = FileMeta { + region_id: self.region.region_id, + file_id: FileId::random(), + level: 0, + ..Default::default() + }; + object_store + .write( + &format!("{}/{}.parquet", self.region.region_dir(), file.file_id), + b"x".as_slice(), + ) + .await + .unwrap(); + self.ssts.push(file); + } + } + + async fn edit_region(self, engine: MitoEngine) { + for sst in self.ssts { + let edit = RegionEdit { + files_to_add: vec![sst], + files_to_remove: vec![], + compaction_time_window: None, + flushed_entry_id: None, + flushed_sequence: None, + }; + engine + .edit_region(self.region.region_id, edit) + .await + .unwrap(); + } + } + } + + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + engine + .handle_request( + region_id, + RegionRequest::Create(CreateRequestBuilder::new().build()), + ) + .await + .unwrap(); + let region = engine.get_region(region_id).unwrap(); + + let mut tasks = Vec::with_capacity(tasks_count); + let object_store = env.get_object_store().unwrap(); + for _ in 0..tasks_count { + let mut task = Task { + region: region.clone(), + ssts: Vec::new(), + }; + task.create_ssts(&object_store).await; + tasks.push(task); + } + + let mut futures = Vec::with_capacity(tasks_count); + let barrier = Arc::new(Barrier::new(tasks_count)); + for task in tasks { + futures.push(tokio::spawn({ + let barrier = barrier.clone(); + let engine = engine.clone(); + async move { + barrier.wait().await; + task.edit_region(engine).await; + } + })); + } + futures::future::join_all(futures).await; + + assert_eq!( + region.version().ssts.levels()[0].files.len(), + tasks_count * EDITS_PER_TASK + ); +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 140966ac336f..2e5826df4dc6 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -842,6 +842,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Region {} is busy", region_id))] + RegionBusy { + region_id: RegionId, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -973,6 +980,7 @@ impl ErrorExt for Error { | FulltextFinish { source, .. } | ApplyFulltextIndex { source, .. } => source.status_code(), DecodeStats { .. } | StatsNotPresent { .. } => StatusCode::Internal, + RegionBusy { .. } => StatusCode::RegionBusy, } } diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 780c85b1d1e0..d88bc994e97e 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -494,11 +494,7 @@ pub(crate) enum WorkerRequest { Stop, /// Use [RegionEdit] to edit a region directly. - EditRegion { - region_id: RegionId, - edit: RegionEdit, - tx: Sender>, - }, + EditRegion(RegionEditRequest), } impl WorkerRequest { @@ -762,6 +758,15 @@ pub(crate) struct RegionChangeResult { pub(crate) result: Result<()>, } +/// Request to edit a region directly. +#[derive(Debug)] +pub(crate) struct RegionEditRequest { + pub(crate) region_id: RegionId, + pub(crate) edit: RegionEdit, + /// The sender to notify the result to the region engine. + pub(crate) tx: Sender>, +} + /// Notifies the regin the result of editing region. #[derive(Debug)] pub(crate) struct RegionEditResult { diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 82b48bcebb29..3aff7764f082 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -61,6 +61,7 @@ use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::time_provider::{StdTimeProvider, TimeProviderRef}; use crate::wal::Wal; +use crate::worker::handle_manifest::RegionEditQueues; /// Identifier for a worker. pub(crate) type WorkerId = u32; @@ -441,6 +442,7 @@ impl WorkerStarter { flush_receiver: self.flush_receiver, stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&id_string]), region_count: REGION_COUNT.with_label_values(&[&id_string]), + region_edit_queues: RegionEditQueues::default(), }; let handle = common_runtime::spawn_global(async move { worker_thread.run().await; @@ -629,6 +631,8 @@ struct RegionWorkerLoop { stalled_count: IntGauge, /// Gauge of regions in the worker. region_count: IntGauge, + /// Queues for region edit requests. + region_edit_queues: RegionEditQueues, } impl RegionWorkerLoop { @@ -727,12 +731,8 @@ impl RegionWorkerLoop { WorkerRequest::SetReadonlyGracefully { region_id, sender } => { self.set_readonly_gracefully(region_id, sender).await; } - WorkerRequest::EditRegion { - region_id, - edit, - tx, - } => { - self.handle_region_edit(region_id, edit, tx).await; + WorkerRequest::EditRegion(request) => { + self.handle_region_edit(request).await; } // We receive a stop signal, but we still want to process remaining // requests. The worker thread will then check the running flag and @@ -824,7 +824,7 @@ impl RegionWorkerLoop { BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await, BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await, BackgroundNotify::RegionChange(req) => self.handle_manifest_region_change_result(req), - BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req), + BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await, } } diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index e12f139b5b71..4ca2fc9c9fcb 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -16,38 +16,89 @@ //! //! It updates the manifest and applies the changes to the region in background. +use std::collections::{HashMap, VecDeque}; + use common_telemetry::{info, warn}; use snafu::ensure; use store_api::storage::RegionId; -use tokio::sync::oneshot::Sender; -use crate::error::{InvalidRequestSnafu, RegionNotFoundSnafu, Result}; +use crate::error::{InvalidRequestSnafu, RegionBusySnafu, RegionNotFoundSnafu, Result}; use crate::manifest::action::{ RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate, }; use crate::region::{MitoRegionRef, RegionState}; use crate::request::{ - BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditResult, TruncateResult, - WorkerRequest, + BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditRequest, RegionEditResult, + TruncateResult, WorkerRequest, }; use crate::worker::RegionWorkerLoop; +pub(crate) type RegionEditQueues = HashMap; + +/// A queue for temporary store region edit requests, if the region is in the "Editing" state. +/// When the current region edit request is completed, the next (if there exists) request in the +/// queue will be processed. +/// Everything is done in the region worker loop. +pub(crate) struct RegionEditQueue { + region_id: RegionId, + requests: VecDeque, +} + +impl RegionEditQueue { + const QUEUE_MAX_LEN: usize = 128; + + fn new(region_id: RegionId) -> Self { + Self { + region_id, + requests: VecDeque::new(), + } + } + + fn enqueue(&mut self, request: RegionEditRequest) { + if self.requests.len() > Self::QUEUE_MAX_LEN { + let _ = request.tx.send( + RegionBusySnafu { + region_id: self.region_id, + } + .fail(), + ); + return; + }; + self.requests.push_back(request); + } + + fn dequeue(&mut self) -> Option { + self.requests.pop_front() + } +} + impl RegionWorkerLoop { /// Handles region edit request. - pub(crate) async fn handle_region_edit( - &self, - region_id: RegionId, - edit: RegionEdit, - sender: Sender>, - ) { - let region = match self.regions.writable_region(region_id) { - Ok(region) => region, - Err(e) => { - let _ = sender.send(Err(e)); - return; - } + pub(crate) async fn handle_region_edit(&mut self, request: RegionEditRequest) { + let region_id = request.region_id; + let Some(region) = self.regions.get_region(region_id) else { + let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail()); + return; }; + if !region.is_writable() { + if region.state() == RegionState::Editing { + self.region_edit_queues + .entry(region_id) + .or_insert_with(|| RegionEditQueue::new(region_id)) + .enqueue(request); + } else { + let _ = request.tx.send(RegionBusySnafu { region_id }.fail()); + } + return; + } + + let RegionEditRequest { + region_id: _, + edit, + tx: sender, + } = request; + // Marks the region as editing. if let Err(e) = region.set_editing() { let _ = sender.send(Err(e)); @@ -79,7 +130,7 @@ impl RegionWorkerLoop { } /// Handles region edit result. - pub(crate) fn handle_region_edit_result(&self, edit_result: RegionEditResult) { + pub(crate) async fn handle_region_edit_result(&mut self, edit_result: RegionEditResult) { let region = match self.regions.get_region(edit_result.region_id) { Some(region) => region, None => { @@ -104,6 +155,12 @@ impl RegionWorkerLoop { region.switch_state_to_writable(RegionState::Editing); let _ = edit_result.sender.send(edit_result.result); + + if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id) { + if let Some(request) = edit_queue.dequeue() { + self.handle_region_edit(request).await; + } + } } /// Writes truncate action to the manifest and then applies it to the region in background.