Skip to content

Commit

Permalink
feat: open region in background (#4052)
Browse files Browse the repository at this point in the history
* feat: open region in background

* feat: trace opening regions

* feat: wait for the opening region

* feat: let engine to handle the future open request

* fix: fix `test_region_registering`
  • Loading branch information
WenyXu authored May 28, 2024
1 parent d386067 commit 4aa756c
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 37 deletions.
21 changes: 8 additions & 13 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,9 +409,7 @@ impl RegionServerInner {
let engine = match region_change {
RegionChange::Register(attribute) => match current_region_status {
Some(status) => match status.clone() {
RegionEngineWithStatus::Registering(_) => {
return Ok(CurrentEngine::EarlyReturn(0))
}
RegionEngineWithStatus::Registering(engine) => engine,
RegionEngineWithStatus::Deregistering(_) => {
return error::RegionBusySnafu { region_id }.fail()
}
Expand Down Expand Up @@ -781,34 +779,32 @@ mod tests {
let mut mock_region_server = mock_region_server();
let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
let engine_name = engine.name();

mock_region_server.register_engine(engine.clone());

let region_id = RegionId::new(1, 1);
let builder = CreateRequestBuilder::new();
let create_req = builder.build();

// Tries to create/open a registering region.
mock_region_server.inner.region_map.insert(
region_id,
RegionEngineWithStatus::Registering(engine.clone()),
);

let response = mock_region_server
.handle_request(region_id, RegionRequest::Create(create_req))
.await
.unwrap();
assert_eq!(response.affected_rows, 0);

let status = mock_region_server
.inner
.region_map
.get(&region_id)
.unwrap()
.clone();
assert!(matches!(status, RegionEngineWithStatus::Ready(_)));

assert!(matches!(status, RegionEngineWithStatus::Registering(_)));

mock_region_server.inner.region_map.insert(
region_id,
RegionEngineWithStatus::Registering(engine.clone()),
);
let response = mock_region_server
.handle_request(
region_id,
Expand All @@ -822,14 +818,13 @@ mod tests {
.await
.unwrap();
assert_eq!(response.affected_rows, 0);

let status = mock_region_server
.inner
.region_map
.get(&region_id)
.unwrap()
.clone();
assert!(matches!(status, RegionEngineWithStatus::Registering(_)));
assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
}

#[tokio::test]
Expand Down Expand Up @@ -1020,7 +1015,7 @@ mod tests {
region_change: RegionChange::Register(RegionAttribute::Mito),
assert: Box::new(|result| {
let current_engine = result.unwrap();
assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
assert_matches!(current_engine, CurrentEngine::Engine(_));
}),
},
CurrentEngineTest {
Expand Down
5 changes: 5 additions & 0 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ impl MitoEngine {
self.inner.workers.is_region_exists(region_id)
}

/// Returns true if the specific region exists.
pub fn is_region_opening(&self, region_id: RegionId) -> bool {
self.inner.workers.is_region_opening(region_id)
}

/// Returns the region disk/memory usage information.
pub async fn get_region_usage(&self, region_id: RegionId) -> Result<RegionUsage> {
let region = self
Expand Down
86 changes: 86 additions & 0 deletions src/mito2/src/engine/open_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ use store_api::region_request::{
RegionCloseRequest, RegionOpenRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::oneshot;

use crate::config::MitoConfig;
use crate::error;
use crate::test_util::{
build_rows, flush_region, put_rows, reopen_region, rows_schema, CreateRequestBuilder, TestEnv,
};
Expand Down Expand Up @@ -319,3 +321,87 @@ async fn test_open_region_skip_wal_replay() {
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}

#[tokio::test]
async fn test_open_region_wait_for_opening_region_ok() {
let mut env = TestEnv::with_prefix("wait-for-opening-region-ok");
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let worker = engine.inner.workers.worker(region_id);
let (tx, rx) = oneshot::channel();
let opening_regions = worker.opening_regions().clone();
opening_regions.insert_sender(region_id, tx.into());
assert!(engine.is_region_opening(region_id));

let handle_open = tokio::spawn(async move {
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir: "empty".to_string(),
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
});

// Wait for conditions
while opening_regions.sender_len(region_id) != 2 {
tokio::time::sleep(Duration::from_millis(100)).await;
}

let senders = opening_regions.remove_sender(region_id);
for sender in senders {
sender.send(Ok(0));
}

assert_eq!(handle_open.await.unwrap().unwrap().affected_rows, 0);
assert_eq!(rx.await.unwrap().unwrap(), 0);
}

#[tokio::test]
async fn test_open_region_wait_for_opening_region_err() {
let mut env = TestEnv::with_prefix("wait-for-opening-region-err");
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let worker = engine.inner.workers.worker(region_id);
let (tx, rx) = oneshot::channel();
let opening_regions = worker.opening_regions().clone();
opening_regions.insert_sender(region_id, tx.into());
assert!(engine.is_region_opening(region_id));

let handle_open = tokio::spawn(async move {
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir: "empty".to_string(),
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
});

// Wait for conditions
while opening_regions.sender_len(region_id) != 2 {
tokio::time::sleep(Duration::from_millis(100)).await;
}

let senders = opening_regions.remove_sender(region_id);
for sender in senders {
sender.send(Err(error::RegionNotFoundSnafu { region_id }.build()));
}

assert_eq!(
handle_open.await.unwrap().unwrap_err().status_code(),
StatusCode::RegionNotFound
);
assert_eq!(
rx.await.unwrap().unwrap_err().status_code(),
StatusCode::RegionNotFound
);
}
8 changes: 8 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to open region"))]
OpenRegion {
#[snafu(implicit)]
location: Location,
source: Arc<Error>,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -783,6 +790,7 @@ impl ErrorExt for Error {
| Recv { .. }
| EncodeWal { .. }
| DecodeWal { .. } => StatusCode::Internal,
OpenRegion { source, .. } => source.status_code(),
WriteBuffer { source, .. } => source.status_code(),
WriteGroup { source, .. } => source.status_code(),
FieldTypeMismatch { source, .. } => source.status_code(),
Expand Down
57 changes: 56 additions & 1 deletion src/mito2/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub(crate) mod opener;
pub mod options;
pub(crate) mod version;

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, RwLock};
Expand All @@ -35,7 +36,7 @@ use crate::manifest::action::{RegionMetaAction, RegionMetaActionList};
use crate::manifest::manager::RegionManifestManager;
use crate::memtable::MemtableBuilderRef;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::OnFailure;
use crate::request::{OnFailure, OptionOutputTx};
use crate::sst::file_purger::FilePurgerRef;
use crate::time_provider::TimeProviderRef;

Expand Down Expand Up @@ -471,6 +472,60 @@ impl RegionMap {

pub(crate) type RegionMapRef = Arc<RegionMap>;

/// Opening regions
#[derive(Debug, Default)]
pub(crate) struct OpeningRegions {
regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
}

impl OpeningRegions {
/// Registers `sender` for an opening region; Otherwise, it returns `None`.
pub(crate) fn wait_for_opening_region(
&self,
region_id: RegionId,
sender: OptionOutputTx,
) -> Option<OptionOutputTx> {
let mut regions = self.regions.write().unwrap();
match regions.entry(region_id) {
Entry::Occupied(mut senders) => {
senders.get_mut().push(sender);
None
}
Entry::Vacant(_) => Some(sender),
}
}

/// Returns true if the region exists.
pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
let regions = self.regions.read().unwrap();
regions.contains_key(&region_id)
}

/// Inserts a new region into the map.
pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
let mut regions = self.regions.write().unwrap();
regions.insert(region, vec![sender]);
}

/// Remove region by id.
pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
let mut regions = self.regions.write().unwrap();
regions.remove(&region_id).unwrap_or_default()
}

#[cfg(test)]
pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
let regions = self.regions.read().unwrap();
if let Some(senders) = regions.get(&region_id) {
senders.len()
} else {
0
}
}
}

pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;

#[cfg(test)]
mod tests {
use crossbeam_utils::atomic::AtomicCell;
Expand Down
10 changes: 9 additions & 1 deletion src/mito2/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub type WalEntryStream<'a> = BoxStream<'a, Result<(EntryId, WalEntry)>>;
/// Write ahead log.
///
/// All regions in the engine shares the same WAL instance.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct Wal<S> {
/// The underlying log store.
store: Arc<S>,
Expand All @@ -62,6 +62,14 @@ impl<S> Wal<S> {
}
}

impl<S> Clone for Wal<S> {
fn clone(&self) -> Self {
Self {
store: self.store.clone(),
}
}
}

impl<S: LogStore> Wal<S> {
/// Returns a writer to write to the WAL.
pub fn writer(&self) -> WalWriter<S> {
Expand Down
Loading

0 comments on commit 4aa756c

Please sign in to comment.