Skip to content

Commit

Permalink
refactor: use set_region_role instead of set_writable
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Sep 23, 2024
1 parent 29b4ec8 commit bbb1412
Show file tree
Hide file tree
Showing 21 changed files with 96 additions and 73 deletions.
16 changes: 10 additions & 6 deletions src/datanode/src/alive_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,10 @@ impl RegionAliveKeeper {
let request = RegionRequest::Close(RegionCloseRequest {});
if let Err(e) = self.region_server.handle_request(region_id, request).await {
if e.status_code() != StatusCode::RegionNotFound {
let _ = self.region_server.set_writable(region_id, false);
error!(e; "Failed to close staled region {}, set region to readonly.",region_id);
let _ = self
.region_server
.set_region_role(region_id, RegionRole::Follower);
error!(e; "Failed to close staled region {}, convert region to readonly.",region_id);
}
}
}
Expand Down Expand Up @@ -378,7 +380,7 @@ impl CountdownTask {
}
},
Some(CountdownCommand::Reset((role, deadline))) => {
let _ = self.region_server.set_writable(self.region_id, role.writable());
let _ = self.region_server.set_region_role(self.region_id, role);
trace!(
"Reset deadline of region {region_id} to approximately {} seconds later.",
(deadline - Instant::now()).as_secs_f32(),
Expand All @@ -399,8 +401,8 @@ impl CountdownTask {
}
}
() = &mut countdown => {
warn!("The region {region_id} lease is expired, set region to readonly.");
let _ = self.region_server.set_writable(self.region_id, false);
warn!("The region {region_id} lease is expired, convert region to readonly.");
let _ = self.region_server.set_region_role(self.region_id, RegionRole::Follower);
// resets the countdown.
let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
countdown.as_mut().reset(far_future);
Expand Down Expand Up @@ -436,7 +438,9 @@ mod test {
.handle_request(region_id, RegionRequest::Create(builder.build()))
.await
.unwrap();
region_server.set_writable(region_id, true).unwrap();
region_server
.set_region_role(region_id, RegionRole::Leader)
.unwrap();

// Register a region before starting.
alive_keeper.register_region(region_id).await;
Expand Down
6 changes: 3 additions & 3 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use servers::server::ServerHandlers;
use servers::Mode;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::path_utils::{region_dir, WAL_DIR};
use store_api::region_engine::RegionEngineRef;
use store_api::region_engine::{RegionEngineRef, RegionRole};
use store_api::region_request::RegionOpenRequest;
use store_api::storage::RegionId;
use tokio::fs;
Expand Down Expand Up @@ -543,9 +543,9 @@ async fn open_all_regions(

for region_id in open_regions {
if open_with_writable {
if let Err(e) = region_server.set_writable(region_id, true) {
if let Err(e) = region_server.set_region_role(region_id, RegionRole::Leader) {
error!(
e; "failed to set writable for region {region_id}"
e; "failed to convert region {region_id} to leader"
);
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ mod tests {
use mito2::engine::MITO_ENGINE_NAME;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
use store_api::path_utils::region_dir;
use store_api::region_engine::RegionRole;
use store_api::region_request::{RegionCloseRequest, RegionRequest};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{self, Receiver};
Expand Down Expand Up @@ -295,7 +296,9 @@ mod tests {
}

assert_matches!(
region_server.set_writable(region_id, true).unwrap_err(),
region_server
.set_region_role(region_id, RegionRole::Leader)
.unwrap_err(),
error::Error::RegionNotFound { .. }
);
}
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/heartbeat/handler/downgrade_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl HandlerContext {
}: DowngradeRegion,
) -> BoxFuture<'static, InstructionReply> {
Box::pin(async move {
let Some(writable) = self.region_server.is_writable(region_id) else {
let Some(writable) = self.region_server.is_region_leader(region_id) else {
return InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: false,
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/heartbeat/handler/upgrade_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl HandlerContext {
}: UpgradeRegion,
) -> BoxFuture<'static, InstructionReply> {
Box::pin(async move {
let Some(writable) = self.region_server.is_writable(region_id) else {
let Some(writable) = self.region_server.is_region_leader(region_id) else {
return InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: false,
Expand Down
9 changes: 4 additions & 5 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,7 @@ impl RegionServer {
.collect()
}

pub fn is_writable(&self, region_id: RegionId) -> Option<bool> {
// TODO(weny): Finds a better way.
pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
self.inner.region_map.get(&region_id).and_then(|engine| {
engine.role(region_id).map(|role| match role {
RegionRole::Follower => false,
Expand All @@ -264,14 +263,14 @@ impl RegionServer {
})
}

pub fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<()> {
pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
let engine = self
.inner
.region_map
.get(&region_id)
.with_context(|| RegionNotFoundSnafu { region_id })?;
engine
.set_writable(region_id, writable)
.set_region_role(region_id, role)
.with_context(|_| HandleRegionRequestSnafu { region_id })
}

Expand Down Expand Up @@ -790,7 +789,7 @@ impl RegionServerInner {
info!("Region {region_id} is deregistered from engine {engine_type}");
self.region_map
.remove(&region_id)
.map(|(id, engine)| engine.set_writable(id, false));
.map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
self.event_listener.on_region_deregistered(region_id);
}
RegionChange::Catchup => {
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ impl RegionEngine for MockRegionEngine {
Ok(())
}

fn set_writable(&self, _region_id: RegionId, _writable: bool) -> Result<(), BoxedError> {
fn set_region_role(&self, _region_id: RegionId, _role: RegionRole) -> Result<(), BoxedError> {
Ok(())
}

Expand Down
6 changes: 3 additions & 3 deletions src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ impl RegionEngine for FileRegionEngine {
None
}

fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError> {
fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
self.inner
.set_writable(region_id, writable)
.set_region_role(region_id, role)
.map_err(BoxedError::new)
}

Expand Down Expand Up @@ -190,7 +190,7 @@ impl EngineInner {
Ok(())
}

fn set_writable(&self, _region_id: RegionId, _writable: bool) -> EngineResult<()> {
fn set_region_role(&self, _region_id: RegionId, _region_role: RegionRole) -> EngineResult<()> {
// TODO(zhongzc): Improve the semantics and implementation of this API.
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,14 @@ impl RegionEngine for MetricEngine {
Ok(())
}

fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError> {
fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
// ignore the region not found error
for x in [
utils::to_metadata_region_id(region_id),
utils::to_data_region_id(region_id),
region_id,
] {
if let Err(e) = self.inner.mito.set_writable(x, writable)
if let Err(e) = self.inner.mito.set_region_role(x, role)
&& e.status_code() != StatusCode::RegionNotFound
{
return Err(e);
Expand Down
10 changes: 5 additions & 5 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,14 +440,14 @@ impl EngineInner {
Ok(scan_region)
}

/// Set writable mode for a region.
fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<()> {
/// Converts the [`RegionRole`].
fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
let region = self
.workers
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;

region.set_writable(writable);
region.set_region_role(role);
Ok(())
}

Expand Down Expand Up @@ -560,9 +560,9 @@ impl RegionEngine for MitoEngine {
size.try_into().ok()
}

fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError> {
fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
self.inner
.set_writable(region_id, writable)
.set_region_role(region_id, role)
.map_err(BoxedError::new)
}

Expand Down
8 changes: 5 additions & 3 deletions src/mito2/src/engine/alter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use common_recordbatch::RecordBatches;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::{
AddColumn, AddColumnLocation, AlterKind, RegionAlterRequest, RegionOpenRequest, RegionRequest,
};
Expand Down Expand Up @@ -213,8 +213,10 @@ async fn test_put_after_alter() {
)
.await
.unwrap();
// Set writable.
engine.set_writable(region_id, true).unwrap();
// Convert region to leader.
engine
.set_region_role(region_id, RegionRole::Leader)
.unwrap();

// Put with old schema.
let rows = Rows {
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/engine/catchup_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use common_recordbatch::RecordBatches;
use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY};
use rstest::rstest;
use rstest_reuse::{self, apply};
use store_api::region_engine::{RegionEngine, SetRegionRoleStateResponse};
use store_api::region_engine::{RegionEngine, RegionRole, SetRegionRoleStateResponse};
use store_api::region_request::{RegionCatchupRequest, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};

Expand Down Expand Up @@ -247,7 +247,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option<LogStoreFacto
assert_matches!(err, error::Error::UnexpectedReplay { .. });

// It should ignore requests to writable regions.
region.set_writable(true);
region.set_region_role(RegionRole::Leader);
let resp = follower_engine
.handle_request(
region_id,
Expand Down
8 changes: 5 additions & 3 deletions src/mito2/src/engine/compaction_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use api::v1::{ColumnSchema, Rows};
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use datatypes::prelude::ScalarVector;
use datatypes::vectors::TimestampMillisecondVector;
use store_api::region_engine::RegionEngine;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::{
RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest, RegionRequest,
};
Expand Down Expand Up @@ -302,8 +302,10 @@ async fn test_readonly_during_compaction() {
// Waits until the engine receives compaction finished request.
listener.wait_handle_finished().await;

// Sets the region to read only mode.
engine.set_writable(region_id, false).unwrap();
// Converts region to follower.
engine
.set_region_role(region_id, RegionRole::Follower)
.unwrap();
// Wakes up the listener.
listener.wake();

Expand Down
10 changes: 7 additions & 3 deletions src/mito2/src/engine/open_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ async fn test_engine_open_empty() {
.await
.unwrap_err();
assert_eq!(StatusCode::RegionNotFound, err.status_code());
let err = engine.set_writable(region_id, true).unwrap_err();
let err = engine
.set_region_role(region_id, RegionRole::Leader)
.unwrap_err();
assert_eq!(StatusCode::RegionNotFound, err.status_code());
let role = engine.role(region_id);
assert_eq!(role, None);
Expand Down Expand Up @@ -134,8 +136,10 @@ async fn test_engine_open_readonly() {
assert_eq!(StatusCode::RegionNotReady, err.status_code());

assert_eq!(Some(RegionRole::Follower), engine.role(region_id));
// Set writable and write.
engine.set_writable(region_id, true).unwrap();
// Converts region to leader.
engine
.set_region_role(region_id, RegionRole::Leader)
.unwrap();
assert_eq!(Some(RegionRole::Leader), engine.role(region_id));

put_rows(&engine, region_id, rows).await;
Expand Down
8 changes: 6 additions & 2 deletions src/mito2/src/engine/set_readonly_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use store_api::region_engine::{RegionEngine, SetRegionRoleStateResponse, SettableRegionRoleState};
use store_api::region_engine::{
RegionEngine, RegionRole, SetRegionRoleStateResponse, SettableRegionRoleState,
};
use store_api::region_request::{RegionPutRequest, RegionRequest};
use store_api::storage::RegionId;

Expand Down Expand Up @@ -74,7 +76,9 @@ async fn test_set_readonly_gracefully() {

assert_eq!(error.status_code(), StatusCode::RegionNotReady);

engine.set_writable(region_id, true).unwrap();
engine
.set_region_role(region_id, RegionRole::Leader)
.unwrap();

put_rows(&engine, region_id, rows).await;

Expand Down
49 changes: 25 additions & 24 deletions src/mito2/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use smallvec::{smallvec, SmallVec};
use snafu::{ensure, OptionExt};
use store_api::logstore::provider::Provider;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::SettableRegionRoleState;
use store_api::region_engine::{RegionRole, SettableRegionRoleState};
use store_api::storage::RegionId;

use crate::access_layer::AccessLayerRef;
Expand Down Expand Up @@ -200,30 +200,33 @@ impl MitoRegion {
self.manifest_ctx.state.load()
}

/// Sets the writable state.
pub(crate) fn set_writable(&self, writable: bool) {
if writable {
// Only sets the region to writable if it is read only.
// This prevents others updating the manifest.
match self.manifest_ctx.state.compare_exchange(
RegionRoleState::Follower,
RegionRoleState::Leader(RegionLeaderState::Writable),
) {
Ok(state) => info!(
"Set region {} to writable, previous state: {:?}",
self.region_id, state
),
Err(state) => {
if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
warn!(
"Failed to set region {} to writable, current state: {:?}",
self.region_id, state
)
/// Sets the [`RegionRole`].
pub(crate) fn set_region_role(&self, next_role: RegionRole) {
match next_role {
RegionRole::Follower => {
self.manifest_ctx.state.store(RegionRoleState::Follower);
}
RegionRole::Leader => {
// Only sets the region to writable if it is read only.
// This prevents others updating the manifest.
match self.manifest_ctx.state.compare_exchange(
RegionRoleState::Follower,
RegionRoleState::Leader(RegionLeaderState::Writable),
) {
Ok(state) => info!(
"Convert region {} to leader, previous role state: {:?}",
self.region_id, state
),
Err(state) => {
if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
warn!(
"Failed to convert region {} to leader, current role state: {:?}",
self.region_id, state
)
}
}
}
}
} else {
self.manifest_ctx.state.store(RegionRoleState::Follower);
}
}

Expand Down Expand Up @@ -301,8 +304,6 @@ impl MitoRegion {
}
}
}

self.set_writable(false);
}

/// Switches the region state to `RegionState::Writable` if the current state is `expect`.
Expand Down
Loading

0 comments on commit bbb1412

Please sign in to comment.