Skip to content

Commit

Permalink
feat: support to reject write after flushing
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Sep 23, 2024
1 parent bbb1412 commit 650b2bb
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 23 deletions.
6 changes: 4 additions & 2 deletions src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,16 @@ pub struct DowngradeRegion {
/// `None` stands for don't flush before downgrading the region.
#[serde(default)]
pub flush_timeout: Option<Duration>,
/// Rejects all write requests after flushing.
pub reject_write: bool,
}

impl Display for DowngradeRegion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"DowngradeRegion(region_id={}, flush_timeout={:?})",
self.region_id, self.flush_timeout,
"DowngradeRegion(region_id={}, flush_timeout={:?}, rejct_write={})",
self.region_id, self.flush_timeout, self.reject_write
)
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ mod tests {
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
region_id: RegionId::new(2048, 1),
flush_timeout: Some(Duration::from_secs(1)),
reject_write: false,
});
assert!(heartbeat_handler
.is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction))));
Expand Down Expand Up @@ -414,6 +415,7 @@ mod tests {
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
region_id,
flush_timeout: Some(Duration::from_secs(1)),
reject_write: false,
});

let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
Expand All @@ -436,6 +438,7 @@ mod tests {
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
region_id: RegionId::new(2048, 1),
flush_timeout: Some(Duration::from_secs(1)),
reject_write: false,
});
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
Expand Down
52 changes: 47 additions & 5 deletions src/datanode/src/heartbeat/handler/downgrade_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,34 @@ use crate::heartbeat::handler::HandlerContext;
use crate::heartbeat::task_tracker::WaitResult;

impl HandlerContext {
async fn set_readonly_gracefully(&self, region_id: RegionId) -> InstructionReply {
match self.region_server.set_readonly_gracefully(region_id).await {
/// Asks the region server to move `region_id` into the downgrading state.
///
/// Returns `None` when the region server reports success, in which case no reply
/// needs to be sent yet. Otherwise returns the [`InstructionReply::DowngradeRegion`]
/// to send back:
/// - the region was not found: `exists` is `false` and there is no error;
/// - the request failed: `exists` is `true` and `error` carries the debug-formatted
///   error.
///
/// In both failure replies `last_entry_id` is `None`.
async fn downgrade_region_gracefully(&self, region_id: RegionId) -> Option<InstructionReply> {
    let outcome = self
        .region_server
        .downgrade_region_gracefully(region_id)
        .await;

    // Only the `exists` / `error` fields differ between the two failure replies.
    let (exists, error) = match outcome {
        Ok(SetRegionRoleStateResponse::Success { .. }) => return None,
        Ok(SetRegionRoleStateResponse::NotFound) => (false, None),
        Err(err) => (true, Some(format!("{err:?}"))),
    };

    Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
        last_entry_id: None,
        exists,
        error,
    }))
}

async fn become_follower_gracefully(&self, region_id: RegionId) -> InstructionReply {
match self
.region_server
.become_follower_gracefully(region_id)
.await
{
Ok(SetRegionRoleStateResponse::Success { last_entry_id }) => {
InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id,
Expand Down Expand Up @@ -53,6 +79,7 @@ impl HandlerContext {
DowngradeRegion {
region_id,
flush_timeout,
reject_write,
}: DowngradeRegion,
) -> BoxFuture<'static, InstructionReply> {
Box::pin(async move {
Expand All @@ -66,11 +93,17 @@ impl HandlerContext {

// Ignores flush request
if !writable {
return self.set_readonly_gracefully(region_id).await;
return self.become_follower_gracefully(region_id).await;
}

let region_server_moved = self.region_server.clone();
if let Some(flush_timeout) = flush_timeout {
if reject_write {
if let Some(reply) = self.downgrade_region_gracefully(region_id).await {
return reply;
}
}

let register_result = self
.downgrade_tasks
.try_register(
Expand Down Expand Up @@ -108,7 +141,7 @@ impl HandlerContext {
)),
})
}
WaitResult::Finish(Ok(_)) => self.set_readonly_gracefully(region_id).await,
WaitResult::Finish(Ok(_)) => self.become_follower_gracefully(region_id).await,
WaitResult::Finish(Err(err)) => {
InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
Expand All @@ -118,7 +151,7 @@ impl HandlerContext {
}
}
} else {
self.set_readonly_gracefully(region_id).await
self.become_follower_gracefully(region_id).await
}
})
}
Expand Down Expand Up @@ -155,6 +188,7 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout,
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
Expand Down Expand Up @@ -196,6 +230,7 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout,
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
Expand Down Expand Up @@ -229,6 +264,7 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout: Some(flush_timeout),
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
Expand Down Expand Up @@ -266,6 +302,7 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout,
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
Expand All @@ -280,6 +317,7 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout: Some(Duration::from_millis(500)),
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
Expand Down Expand Up @@ -325,6 +363,7 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout,
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
Expand All @@ -339,6 +378,7 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout: Some(Duration::from_millis(500)),
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
Expand Down Expand Up @@ -369,6 +409,7 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout: None,
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
Expand Down Expand Up @@ -400,6 +441,7 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout: None,
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
Expand Down
26 changes: 25 additions & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,11 @@ impl RegionServer {
.with_context(|_| HandleRegionRequestSnafu { region_id })
}

pub async fn set_readonly_gracefully(
/// Converts region to follower gracefully.
///
/// After the call returns,
/// the engine ensures no more write or flush operations will succeed in the region.
pub async fn become_follower_gracefully(
&self,
region_id: RegionId,
) -> Result<SetRegionRoleStateResponse> {
Expand All @@ -287,6 +291,26 @@ impl RegionServer {
}
}

/// Switches the region to the downgrading-leader role state gracefully.
///
/// After the call returns, the engine ensures no more write operations will
/// succeed in the region.
///
/// Returns [`SetRegionRoleStateResponse::NotFound`] if `region_id` is not present in
/// the region map. An engine failure is returned as an error with the
/// `HandleRegionRequest` context attached.
pub async fn downgrade_region_gracefully(
    &self,
    region_id: RegionId,
) -> Result<SetRegionRoleStateResponse> {
    // Unknown regions are reported to the caller rather than treated as an error.
    let engine = match self.inner.region_map.get(&region_id) {
        Some(engine) => engine,
        None => return Ok(SetRegionRoleStateResponse::NotFound),
    };

    engine
        .set_region_role_state_gracefully(
            region_id,
            SettableRegionRoleState::DowngradingLeader,
        )
        .await
        .with_context(|_| HandleRegionRequestSnafu { region_id })
}

pub fn runtime(&self) -> Runtime {
self.inner.runtime.clone()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_meta::instruction::{
};
use common_procedure::Status;
use common_telemetry::{error, info, warn};
use common_wal::options::WalOptions;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio::time::{sleep, Instant};
Expand Down Expand Up @@ -95,12 +96,14 @@ impl DowngradeLeaderRegion {
&self,
ctx: &Context,
flush_timeout: Duration,
reject_write: bool,
) -> Instruction {
let pc = &ctx.persistent_ctx;
let region_id = pc.region_id;
Instruction::DowngradeRegion(DowngradeRegion {
region_id,
flush_timeout: Some(flush_timeout),
reject_write,
})
}

Expand All @@ -118,16 +121,30 @@ impl DowngradeLeaderRegion {
/// - [ExceededDeadline](error::Error::ExceededDeadline)
/// - Invalid JSON.
async fn downgrade_region(&self, ctx: &mut Context) -> Result<()> {
let pc = &ctx.persistent_ctx;
let region_id = pc.region_id;
let leader = &pc.from_peer;
let region_id = ctx.persistent_ctx.region_id;
let operation_timeout =
ctx.next_operation_timeout()
.context(error::ExceededDeadlineSnafu {
operation: "Downgrade region",
})?;
let downgrade_instruction = self.build_downgrade_region_instruction(ctx, operation_timeout);

let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;
let reject_write = if let Some(wal_option) = datanode_table_value
.region_info
.region_wal_options
.get(&region_id.region_number())
{
let options: WalOptions = serde_json::from_str(wal_option)
.with_context(|_| error::DeserializeFromJsonSnafu { input: wal_option })?;
matches!(options, WalOptions::RaftEngine)
} else {
true
};

let downgrade_instruction =
self.build_downgrade_region_instruction(ctx, operation_timeout, reject_write);

let leader = &ctx.persistent_ctx.from_peer;
let msg = MailboxMessage::json_message(
&format!("Downgrade leader region: {}", region_id),
&format!("Meta@{}", ctx.server_addr()),
Expand Down
12 changes: 1 addition & 11 deletions src/mito2/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,24 +275,14 @@ impl MitoRegion {
)
}

/// Converts leader to follower.
/// You should call this method in the worker loop.
pub(crate) fn become_follower(&self) -> Result<()> {
self.compare_exchange_state(RegionLeaderState::Writable, RegionRoleState::Follower)
}

/// Sets the region to readonly gracefully. This acquires the manifest write lock.
pub(crate) async fn set_state_gracefully(&self, state: SettableRegionRoleState) {
let _manager = self.manifest_ctx.manifest_manager.write().await;
// We acquire the write lock of the manifest manager to ensure that no one is updating the manifest.
// Then we change the state.
match state {
SettableRegionRoleState::Follower => {
if self.manifest_ctx.state.load() != RegionRoleState::Follower {
if let Err(err) = self.become_follower() {
error!(err; "Failed to convert leader to follower, expect state is {:?}", RegionLeaderState::Writable);
}
}
self.manifest_ctx.state.store(RegionRoleState::Follower);
}
SettableRegionRoleState::DowngradingLeader => {
if self.manifest_ctx.state.load()
Expand Down

0 comments on commit 650b2bb

Please sign in to comment.