Skip to content

Commit

Permalink
test: add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Nov 12, 2024
1 parent 3f31b05 commit 68b5bc0
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 2 deletions.
69 changes: 68 additions & 1 deletion src/mito2/src/engine/alter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use store_api::region_request::{
use store_api::storage::{RegionId, ScanRequest};

use crate::config::MitoConfig;
use crate::engine::listener::AlterFlushListener;
use crate::engine::listener::{AlterFlushListener, NotifyRegionChangeResultListener};
use crate::engine::MitoEngine;
use crate::test_util::{
build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder,
Expand Down Expand Up @@ -572,3 +572,70 @@ async fn test_alter_column_fulltext_options() {
check_fulltext_options(&engine, &expect_fulltext_options);
check_region_version(&engine, region_id, 1, 3, 1, 3);
}

#[tokio::test]
async fn test_write_stall_on_altering() {
common_telemetry::init_default_ut_logging();

let mut env = TestEnv::new();
let listener = Arc::new(NotifyRegionChangeResultListener::default());
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
.await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();

env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;

let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

let engine_cloned = engine.clone();
let alter_job = tokio::spawn(async move {
let request = add_tag1();
engine_cloned
.handle_request(region_id, RegionRequest::Alter(request))
.await
.unwrap();
});

let column_schemas_cloned = column_schemas.clone();
let engine_cloned = engine.clone();
let put_job = tokio::spawn(async move {
let rows = Rows {
schema: column_schemas_cloned,
rows: build_rows(0, 3),
};
put_rows(&engine_cloned, region_id, rows).await;
});

listener.wake_notify();
alter_job.await.unwrap();
put_job.await.unwrap();

let expected = "\
+-------+-------+---------+---------------------+
| tag_1 | tag_0 | field_0 | ts |
+-------+-------+---------+---------------------+
| | 0 | 0.0 | 1970-01-01T00:00:00 |
| | 1 | 1.0 | 1970-01-01T00:00:01 |
| | 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+-------+---------+---------------------+";
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());
}
26 changes: 26 additions & 0 deletions src/mito2/src/engine/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ pub trait EventListener: Send + Sync {

/// Notifies the listener that the compaction is scheduled.
fn on_compaction_scheduled(&self, _region_id: RegionId) {}

/// Notifies the listener that region starts to send a region change result to worker.
async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {}
}

pub type EventListenerRef = Arc<dyn EventListener>;
Expand Down Expand Up @@ -274,3 +277,26 @@ impl EventListener for AlterFlushListener {
self.request_begin_notify.notify_one();
}
}

#[derive(Default)]
pub struct NotifyRegionChangeResultListener {
notify: Notify,
}

impl NotifyRegionChangeResultListener {
/// Continue to sending region change result.
pub fn wake_notify(&self) {
self.notify.notify_one();
}
}

#[async_trait]
impl EventListener for NotifyRegionChangeResultListener {
async fn on_notify_region_change_result_begin(&self, region_id: RegionId) {
info!(
"Wait on notify to start notify region change result for region {}",
region_id
);
self.notify.notified().await;
}
}
9 changes: 9 additions & 0 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1029,6 +1029,15 @@ impl WorkerListener {
listener.on_compaction_scheduled(_region_id);
}
}

pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {
listener
.on_notify_region_change_result_begin(_region_id)
.await;
}
}
}

#[cfg(test)]
Expand Down
5 changes: 4 additions & 1 deletion src/mito2/src/worker/handle_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
sender.send(Err(e));
return;
}

let listener = self.listener.clone();
let request_sender = self.sender.clone();
// Now the region is in altering state.
common_runtime::spawn_global(async move {
Expand All @@ -255,6 +255,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
new_meta,
}),
};
listener
.on_notify_region_change_result_begin(region.region_id)
.await;

if let Err(res) = request_sender.send(notify).await {
warn!(
Expand Down
7 changes: 7 additions & 0 deletions src/mito2/src/worker/handle_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::collections::{hash_map, HashMap};
use std::sync::Arc;

use api::v1::OpType;
use common_telemetry::debug;
use snafu::ensure;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadata;
Expand Down Expand Up @@ -139,13 +140,15 @@ impl<S: LogStore> RegionWorkerLoop<S> {

/// Rejects a specific region's stalled requests.
pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
debug!("Rejects stalled requests for region {}", region_id);
let requests = self.stalled_requests.remove(region_id);
self.stalled_count.sub(requests.len() as i64);
reject_write_requests(requests);
}

/// Handles a specific region's stalled requests.
pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
debug!("Handles stalled requests for region {}", region_id);
let requests = self.stalled_requests.remove(region_id);
self.stalled_count.sub(requests.len() as i64);
self.handle_write_requests(requests, true).await;
Expand Down Expand Up @@ -192,6 +195,10 @@ impl<S> RegionWorkerLoop<S> {
e.insert(region_ctx);
}
RegionRoleState::Leader(RegionLeaderState::Altering) => {
debug!(
"Region {} is altering, add request to pending writes",
region.region_id
);
self.stalled_count.add(1);
self.stalled_requests.push(sender_req);
continue;
Expand Down

0 comments on commit 68b5bc0

Please sign in to comment.