Skip to content

Commit

Permalink
test: add unit test for should_reject_write
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Sep 23, 2024
1 parent c7b70be commit 8972277
Showing 1 changed file with 64 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use common_telemetry::{error, info, warn};
use common_wal::options::WalOptions;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::time::{sleep, Instant};

use super::update_metadata::UpdateMetadata;
Expand Down Expand Up @@ -107,6 +108,23 @@ impl DowngradeLeaderRegion {
})
}

async fn should_reject_write(ctx: &mut Context, region_id: RegionId) -> Result<bool> {
let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;
let reject_write = if let Some(wal_option) = datanode_table_value
.region_info
.region_wal_options
.get(&region_id.region_number())
{
let options: WalOptions = serde_json::from_str(wal_option)
.with_context(|_| error::DeserializeFromJsonSnafu { input: wal_option })?;
matches!(options, WalOptions::RaftEngine)
} else {
true
};

Ok(reject_write)
}

/// Tries to downgrade a leader region.
///
/// Retry:
Expand All @@ -127,20 +145,7 @@ impl DowngradeLeaderRegion {
.context(error::ExceededDeadlineSnafu {
operation: "Downgrade region",
})?;

let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;
let reject_write = if let Some(wal_option) = datanode_table_value
.region_info
.region_wal_options
.get(&region_id.region_number())
{
let options: WalOptions = serde_json::from_str(wal_option)
.with_context(|_| error::DeserializeFromJsonSnafu { input: wal_option })?;
matches!(options, WalOptions::RaftEngine)
} else {
true
};

let reject_write = Self::should_reject_write(ctx, region_id).await?;
let downgrade_instruction =
self.build_downgrade_region_instruction(ctx, operation_timeout, reject_write);

Expand Down Expand Up @@ -263,6 +268,7 @@ mod tests {
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_wal::options::KafkaWalOptions;
use store_api::storage::RegionId;
use tokio::time::Instant;

Expand Down Expand Up @@ -298,7 +304,7 @@ mod tests {
assert!(!err.is_retryable());
}

async fn prepare_table_metadata(ctx: &Context) {
async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
let table_info =
new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into();
let region_routes = vec![RegionRoute {
Expand All @@ -311,12 +317,47 @@ mod tests {
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
wal_options,
)
.await
.unwrap();
}

#[tokio::test]
async fn test_should_reject_writes() {
let persistent_context = new_persistent_context();
let region_id = persistent_context.region_id;
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let wal_options =
HashMap::from([(1, serde_json::to_string(&WalOptions::RaftEngine).unwrap())]);
prepare_table_metadata(&ctx, wal_options).await;

let reject_write = DowngradeLeaderRegion::should_reject_write(&mut ctx, region_id)
.await
.unwrap();
assert!(reject_write);

// Remote WAL
let persistent_context = new_persistent_context();
let region_id = persistent_context.region_id;
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let wal_options = HashMap::from([(
1,
serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
topic: "my_topic".to_string(),
}))
.unwrap(),
)]);
prepare_table_metadata(&ctx, wal_options).await;

let reject_write = DowngradeLeaderRegion::should_reject_write(&mut ctx, region_id)
.await
.unwrap();
assert!(!reject_write);
}

#[tokio::test]
async fn test_pusher_dropped() {
let state = DowngradeLeaderRegion::default();
Expand All @@ -325,7 +366,7 @@ mod tests {

let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
prepare_table_metadata(&ctx).await;
prepare_table_metadata(&ctx, HashMap::default()).await;
let mailbox_ctx = env.mailbox_context();

let (tx, rx) = tokio::sync::mpsc::channel(1);
Expand All @@ -348,7 +389,7 @@ mod tests {
let persistent_context = new_persistent_context();
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
prepare_table_metadata(&ctx).await;
prepare_table_metadata(&ctx, HashMap::default()).await;
ctx.volatile_ctx.operations_elapsed = ctx.persistent_ctx.timeout + Duration::from_secs(1);

let err = state.downgrade_region(&mut ctx).await.unwrap_err();
Expand All @@ -372,7 +413,7 @@ mod tests {

let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
prepare_table_metadata(&ctx).await;
prepare_table_metadata(&ctx, HashMap::default()).await;
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();

Expand All @@ -399,7 +440,7 @@ mod tests {

let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
prepare_table_metadata(&ctx).await;
prepare_table_metadata(&ctx, HashMap::default()).await;
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();

Expand Down Expand Up @@ -427,7 +468,7 @@ mod tests {

let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
prepare_table_metadata(&ctx).await;
prepare_table_metadata(&ctx, HashMap::default()).await;
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();

Expand Down Expand Up @@ -461,7 +502,7 @@ mod tests {

let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
prepare_table_metadata(&ctx).await;
prepare_table_metadata(&ctx, HashMap::default()).await;
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();

Expand Down Expand Up @@ -554,7 +595,7 @@ mod tests {

let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
prepare_table_metadata(&ctx).await;
prepare_table_metadata(&ctx, HashMap::default()).await;
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();

Expand Down

0 comments on commit 8972277

Please sign in to comment.