diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs index c6ef6cb3f6db..a1c1788827c9 100644 --- a/src/datanode/src/alive_keeper.rs +++ b/src/datanode/src/alive_keeper.rs @@ -129,8 +129,10 @@ impl RegionAliveKeeper { let request = RegionRequest::Close(RegionCloseRequest {}); if let Err(e) = self.region_server.handle_request(region_id, request).await { if e.status_code() != StatusCode::RegionNotFound { - let _ = self.region_server.set_writable(region_id, false); - error!(e; "Failed to close staled region {}, set region to readonly.",region_id); + let _ = self + .region_server + .set_region_role(region_id, RegionRole::Follower); + error!(e; "Failed to close staled region {}, convert region to readonly.",region_id); } } } @@ -378,7 +380,7 @@ impl CountdownTask { } }, Some(CountdownCommand::Reset((role, deadline))) => { - let _ = self.region_server.set_writable(self.region_id, role.writable()); + let _ = self.region_server.set_region_role(self.region_id, role); trace!( "Reset deadline of region {region_id} to approximately {} seconds later.", (deadline - Instant::now()).as_secs_f32(), @@ -399,8 +401,8 @@ impl CountdownTask { } } () = &mut countdown => { - warn!("The region {region_id} lease is expired, set region to readonly."); - let _ = self.region_server.set_writable(self.region_id, false); + warn!("The region {region_id} lease is expired, convert region to readonly."); + let _ = self.region_server.set_region_role(self.region_id, RegionRole::Follower); // resets the countdown. let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30); countdown.as_mut().reset(far_future); @@ -436,7 +438,9 @@ mod test { .handle_request(region_id, RegionRequest::Create(builder.build())) .await .unwrap(); - region_server.set_writable(region_id, true).unwrap(); + region_server + .set_region_role(region_id, RegionRole::Leader) + .unwrap(); // Register a region before starting. 
alive_keeper.register_region(region_id).await; diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 149aa44ebe34..1d14d0e1bab3 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -47,7 +47,7 @@ use servers::server::ServerHandlers; use servers::Mode; use snafu::{ensure, OptionExt, ResultExt}; use store_api::path_utils::{region_dir, WAL_DIR}; -use store_api::region_engine::RegionEngineRef; +use store_api::region_engine::{RegionEngineRef, RegionRole}; use store_api::region_request::RegionOpenRequest; use store_api::storage::RegionId; use tokio::fs; @@ -543,9 +543,9 @@ async fn open_all_regions( for region_id in open_regions { if open_with_writable { - if let Err(e) = region_server.set_writable(region_id, true) { + if let Err(e) = region_server.set_region_role(region_id, RegionRole::Leader) { error!( - e; "failed to set writable for region {region_id}" + e; "failed to convert region {region_id} to leader" ); } } diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index d23615eb13d8..3099ec281f5b 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -153,6 +153,7 @@ mod tests { use mito2::engine::MITO_ENGINE_NAME; use mito2::test_util::{CreateRequestBuilder, TestEnv}; use store_api::path_utils::region_dir; + use store_api::region_engine::RegionRole; use store_api::region_request::{RegionCloseRequest, RegionRequest}; use store_api::storage::RegionId; use tokio::sync::mpsc::{self, Receiver}; @@ -295,7 +296,9 @@ mod tests { } assert_matches!( - region_server.set_writable(region_id, true).unwrap_err(), + region_server + .set_region_role(region_id, RegionRole::Leader) + .unwrap_err(), error::Error::RegionNotFound { .. 
} ); } diff --git a/src/datanode/src/heartbeat/handler/downgrade_region.rs b/src/datanode/src/heartbeat/handler/downgrade_region.rs index 4ca6e39a22b7..7f39d8c655fb 100644 --- a/src/datanode/src/heartbeat/handler/downgrade_region.rs +++ b/src/datanode/src/heartbeat/handler/downgrade_region.rs @@ -56,7 +56,7 @@ impl HandlerContext { }: DowngradeRegion, ) -> BoxFuture<'static, InstructionReply> { Box::pin(async move { - let Some(writable) = self.region_server.is_writable(region_id) else { + let Some(writable) = self.region_server.is_region_leader(region_id) else { return InstructionReply::DowngradeRegion(DowngradeRegionReply { last_entry_id: None, exists: false, diff --git a/src/datanode/src/heartbeat/handler/upgrade_region.rs b/src/datanode/src/heartbeat/handler/upgrade_region.rs index 0d1ef0476c95..9acb3da9c348 100644 --- a/src/datanode/src/heartbeat/handler/upgrade_region.rs +++ b/src/datanode/src/heartbeat/handler/upgrade_region.rs @@ -31,7 +31,7 @@ impl HandlerContext { }: UpgradeRegion, ) -> BoxFuture<'static, InstructionReply> { Box::pin(async move { - let Some(writable) = self.region_server.is_writable(region_id) else { + let Some(writable) = self.region_server.is_region_leader(region_id) else { return InstructionReply::UpgradeRegion(UpgradeRegionReply { ready: false, exists: false, diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 47e4f553bc10..1152667bd1a7 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -254,8 +254,7 @@ impl RegionServer { .collect() } - pub fn is_writable(&self, region_id: RegionId) -> Option<bool> { - // TODO(weny): Finds a better way. 
+ pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> { self.inner.region_map.get(&region_id).and_then(|engine| { engine.role(region_id).map(|role| match role { RegionRole::Follower => false, @@ -264,14 +263,14 @@ impl RegionServer { }) } - pub fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<()> { + pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> { let engine = self .inner .region_map .get(&region_id) .with_context(|| RegionNotFoundSnafu { region_id })?; engine - .set_writable(region_id, writable) + .set_region_role(region_id, role) .with_context(|_| HandleRegionRequestSnafu { region_id }) } @@ -790,7 +789,7 @@ impl RegionServerInner { info!("Region {region_id} is deregistered from engine {engine_type}"); self.region_map .remove(&region_id) - .map(|(id, engine)| engine.set_writable(id, false)); + .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower)); self.event_listener.on_region_deregistered(region_id); } RegionChange::Catchup => { diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 95ccfce29fb2..229c4805ecaf 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -220,7 +220,7 @@ impl RegionEngine for MockRegionEngine { Ok(()) } - fn set_writable(&self, _region_id: RegionId, _writable: bool) -> Result<(), BoxedError> { + fn set_region_role(&self, _region_id: RegionId, _role: RegionRole) -> Result<(), BoxedError> { Ok(()) } diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index 038a2ccd801a..bb31ad1e694a 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -113,9 +113,9 @@ impl RegionEngine for FileRegionEngine { None } - fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError> { + fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> { self.inner - .set_writable(region_id, writable) + .set_region_role(region_id, role) 
.map_err(BoxedError::new) } @@ -190,7 +190,7 @@ impl EngineInner { Ok(()) } - fn set_writable(&self, _region_id: RegionId, _writable: bool) -> EngineResult<()> { + fn set_region_role(&self, _region_id: RegionId, _region_role: RegionRole) -> EngineResult<()> { // TODO(zhongzc): Improve the semantics and implementation of this API. Ok(()) } diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 407e81a12c66..7f02300a0846 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -201,14 +201,14 @@ impl RegionEngine for MetricEngine { Ok(()) } - fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError> { + fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> { // ignore the region not found error for x in [ utils::to_metadata_region_id(region_id), utils::to_data_region_id(region_id), region_id, ] { - if let Err(e) = self.inner.mito.set_writable(x, writable) + if let Err(e) = self.inner.mito.set_region_role(x, role) && e.status_code() != StatusCode::RegionNotFound { return Err(e); diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 7cac84af616b..60a2a97848a0 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -440,14 +440,14 @@ impl EngineInner { Ok(scan_region) } - /// Set writable mode for a region. - fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<()> { + /// Converts the [`RegionRole`]. 
+ fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> { let region = self .workers .get_region(region_id) .context(RegionNotFoundSnafu { region_id })?; - region.set_writable(writable); + region.set_region_role(role); Ok(()) } @@ -560,9 +560,9 @@ impl RegionEngine for MitoEngine { size.try_into().ok() } - fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError> { + fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> { self.inner - .set_writable(region_id, writable) + .set_region_role(region_id, role) .map_err(BoxedError::new) } diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index b48dc2ccfb08..2e75bf19faa0 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -24,7 +24,7 @@ use common_recordbatch::RecordBatches; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use store_api::metadata::ColumnMetadata; -use store_api::region_engine::RegionEngine; +use store_api::region_engine::{RegionEngine, RegionRole}; use store_api::region_request::{ AddColumn, AddColumnLocation, AlterKind, RegionAlterRequest, RegionOpenRequest, RegionRequest, }; @@ -213,8 +213,10 @@ async fn test_put_after_alter() { ) .await .unwrap(); - // Set writable. - engine.set_writable(region_id, true).unwrap(); + // Convert region to leader. + engine + .set_region_role(region_id, RegionRole::Leader) + .unwrap(); // Put with old schema. 
let rows = Rows { diff --git a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs index 36c41299bb3b..9ebbbbd6a023 100644 --- a/src/mito2/src/engine/catchup_test.rs +++ b/src/mito2/src/engine/catchup_test.rs @@ -22,7 +22,7 @@ use common_recordbatch::RecordBatches; use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY}; use rstest::rstest; use rstest_reuse::{self, apply}; -use store_api::region_engine::{RegionEngine, SetRegionRoleStateResponse}; +use store_api::region_engine::{RegionEngine, RegionRole, SetRegionRoleStateResponse}; use store_api::region_request::{RegionCatchupRequest, RegionOpenRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; @@ -247,7 +247,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option info!( - "Set region {} to writable, previous state: {:?}", - self.region_id, state - ), - Err(state) => { - if state != RegionRoleState::Leader(RegionLeaderState::Writable) { - warn!( - "Failed to set region {} to writable, current state: {:?}", - self.region_id, state - ) + /// Sets the [`RegionRole`]. + pub(crate) fn set_region_role(&self, next_role: RegionRole) { + match next_role { + RegionRole::Follower => { + self.manifest_ctx.state.store(RegionRoleState::Follower); + } + RegionRole::Leader => { + // Only sets the region to writable if it is read only. + // This prevents others updating the manifest. 
+ match self.manifest_ctx.state.compare_exchange( + RegionRoleState::Follower, + RegionRoleState::Leader(RegionLeaderState::Writable), + ) { + Ok(state) => info!( + "Convert region {} to leader, previous role state: {:?}", + self.region_id, state + ), + Err(state) => { + if state != RegionRoleState::Leader(RegionLeaderState::Writable) { + warn!( + "Failed to convert region {} to leader, current role state: {:?}", + self.region_id, state + ) + } } } } - } else { - self.manifest_ctx.state.store(RegionRoleState::Follower); } } @@ -301,8 +304,6 @@ impl MitoRegion { } } } - - self.set_writable(false); } /// Switches the region state to `RegionState::Writable` if the current state is `expect`. diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 822dce95d71f..15465feadba2 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -28,6 +28,7 @@ use snafu::{ensure, OptionExt}; use store_api::logstore::provider::Provider; use store_api::logstore::LogStore; use store_api::metadata::{ColumnMetadata, RegionMetadata}; +use store_api::region_engine::RegionRole; use store_api::storage::{ColumnId, RegionId}; use crate::access_layer::AccessLayer; @@ -171,8 +172,8 @@ impl RegionOpener { &expect.column_metadatas, &expect.primary_key, )?; - // To keep consistence with Create behavior, set the opened Region writable. - region.set_writable(true); + // To keep consistence with Create behavior, set the opened Region to RegionRole::Leader. 
+ region.set_region_role(RegionRole::Leader); return Ok(region); } Ok(None) => { diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index cd449e53fae6..0bd85747c0f1 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -52,7 +52,7 @@ use rskafka::client::{Client, ClientBuilder}; use rskafka::record::Record; use rstest_reuse::template; use store_api::metadata::{ColumnMetadata, RegionMetadataRef}; -use store_api::region_engine::RegionEngine; +use store_api::region_engine::{RegionEngine, RegionRole}; use store_api::region_request::{ RegionCloseRequest, RegionCreateRequest, RegionDeleteRequest, RegionFlushRequest, RegionOpenRequest, RegionPutRequest, RegionRequest, @@ -1114,6 +1114,8 @@ pub async fn reopen_region( .unwrap(); if writable { - engine.set_writable(region_id, true).unwrap(); + engine + .set_region_role(region_id, RegionRole::Leader) + .unwrap(); } } diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index 505c994d3607..4c7efbde4db8 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -20,6 +20,7 @@ use common_telemetry::info; use common_telemetry::tracing::warn; use snafu::ensure; use store_api::logstore::LogStore; +use store_api::region_engine::RegionRole; use store_api::region_request::{AffectedRows, RegionCatchupRequest}; use store_api::storage::RegionId; use tokio::time::Instant; @@ -112,7 +113,7 @@ impl RegionWorkerLoop { } if request.set_writable { - region.set_writable(true); + region.set_region_role(RegionRole::Leader); } Ok(0) diff --git a/src/query/src/optimizer/test_util.rs b/src/query/src/optimizer/test_util.rs index 5e5de855e542..d8b100d1711a 100644 --- a/src/query/src/optimizer/test_util.rs +++ b/src/query/src/optimizer/test_util.rs @@ -89,7 +89,7 @@ impl RegionEngine for MetaRegionEngine { Ok(()) } - fn set_writable(&self, _region_id: RegionId, _writable: bool) -> Result<(), BoxedError> { + fn set_region_role(&self, 
_region_id: RegionId, _role: RegionRole) -> Result<(), BoxedError> { unimplemented!() } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 42f8aae3f4c1..eebf0d53fba6 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -307,12 +307,12 @@ pub trait RegionEngine: Send + Sync { /// Stops the engine async fn stop(&self) -> Result<(), BoxedError>; - /// Sets writable mode for a region. + /// Sets [RegionRole] for a region. /// /// The engine checks whether the region is writable before writing to the region. Setting /// the region as readonly doesn't guarantee that write operations in progress will not /// take effect. - fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError>; + fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError>; /// Sets region role state gracefully. ///