Skip to content

Commit

Permalink
feat: migrate local WAL regions (#4715)
Browse files Browse the repository at this point in the history
* feat: allow to flush region before migrating

* fix: fix unit tests

* feat: allow to set `flush_timeout`

* feat: skip to replay memtable

* fix: fix unit tests

* test: add more tests

* refactor: simplify timeout logic

* test: add unit tests

* test: add unit tests

* chore: update comments

* fix: fix unit tests

* fix: fmt and clippy

* feat: change default timeout to 30s

* fix: throw `ExceededDeadline` error

* test: add tests for `downgrade_region_with_retry`

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: update proto to `3633474`

* refactor: refactor `upgrade_region_with_retry`

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored Sep 20, 2024
1 parent 0c9b8eb commit 163cea8
Show file tree
Hide file tree
Showing 23 changed files with 847 additions and 271 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ etcd-client = { version = "0.13" }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "796ce9b003c6689e853825f649e03543c81ede99" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "36334744c7020734dcb4a6b8d24d52ae7ed53fe1" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
21 changes: 8 additions & 13 deletions src/common/function/src/table/migrate_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ use session::context::QueryContextRef;
use crate::handlers::ProcedureServiceHandlerRef;
use crate::helper::cast_u64;

const DEFAULT_REPLAY_TIMEOUT_SECS: u64 = 10;
const DEFAULT_TIMEOUT_SECS: u64 = 30;

/// A function to migrate a region from source peer to target peer.
/// Returns the submitted procedure id if success. Only available in cluster mode.
///
/// - `migrate_region(region_id, from_peer, to_peer)`, with default replay WAL timeout(10 seconds).
/// - `migrate_region(region_id, from_peer, to_peer, timeout(secs))`
/// - `migrate_region(region_id, from_peer, to_peer)`, with the default timeout (30 seconds).
/// - `migrate_region(region_id, from_peer, to_peer, timeout(secs))`.
///
/// The parameters:
/// - `region_id`: the region id
Expand All @@ -48,18 +48,13 @@ pub(crate) async fn migrate_region(
_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
let (region_id, from_peer, to_peer, replay_timeout) = match params.len() {
let (region_id, from_peer, to_peer, timeout) = match params.len() {
3 => {
let region_id = cast_u64(&params[0])?;
let from_peer = cast_u64(&params[1])?;
let to_peer = cast_u64(&params[2])?;

(
region_id,
from_peer,
to_peer,
Some(DEFAULT_REPLAY_TIMEOUT_SECS),
)
(region_id, from_peer, to_peer, Some(DEFAULT_TIMEOUT_SECS))
}

4 => {
Expand All @@ -82,14 +77,14 @@ pub(crate) async fn migrate_region(
}
};

match (region_id, from_peer, to_peer, replay_timeout) {
(Some(region_id), Some(from_peer), Some(to_peer), Some(replay_timeout)) => {
match (region_id, from_peer, to_peer, timeout) {
(Some(region_id), Some(from_peer), Some(to_peer), Some(timeout)) => {
let pid = procedure_service_handler
.migrate_region(MigrateRegionRequest {
region_id,
from_peer,
to_peer,
replay_timeout: Duration::from_secs(replay_timeout),
timeout: Duration::from_secs(timeout),
})
.await?;

Expand Down
13 changes: 11 additions & 2 deletions src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,20 @@ impl OpenRegion {
pub struct DowngradeRegion {
/// The [RegionId] of the region to downgrade.
pub region_id: RegionId,
/// The maximum time to wait for the region to be flushed.
///
/// `None` means the region is not flushed before it is downgraded.
/// The field may be absent from serialized input, in which case it defaults to `None`.
#[serde(default)]
pub flush_timeout: Option<Duration>,
}

impl Display for DowngradeRegion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "DowngradeRegion(region_id={})", self.region_id)
write!(
f,
"DowngradeRegion(region_id={}, flush_timeout={:?})",
self.region_id, self.flush_timeout,
)
}
}

Expand All @@ -152,7 +161,7 @@ pub struct UpgradeRegion {
/// `None` stands for no wait,
/// it's helpful to verify whether the leader region is ready.
#[serde(with = "humantime_serde")]
pub wait_for_replay_timeout: Option<Duration>,
pub replay_timeout: Option<Duration>,
/// The hint for replaying memtable.
#[serde(default)]
pub location_id: Option<u64>,
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/rpc/procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct MigrateRegionRequest {
pub region_id: u64,
pub from_peer: u64,
pub to_peer: u64,
pub replay_timeout: Duration,
pub timeout: Duration,
}

/// Cast the protobuf [`ProcedureId`] to common [`ProcedureId`].
Expand Down
24 changes: 22 additions & 2 deletions src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::region_server::RegionServer;
pub struct RegionHeartbeatResponseHandler {
region_server: RegionServer,
catchup_tasks: TaskTracker<()>,
downgrade_tasks: TaskTracker<()>,
}

/// Handler of the instruction.
Expand All @@ -47,12 +48,22 @@ pub type InstructionHandler =
pub struct HandlerContext {
region_server: RegionServer,
catchup_tasks: TaskTracker<()>,
downgrade_tasks: TaskTracker<()>,
}

impl HandlerContext {
/// Builds a [RegionId] from the `table_id` and `region_number` of the given [RegionIdent].
fn region_ident_to_region_id(region_ident: &RegionIdent) -> RegionId {
RegionId::new(region_ident.table_id, region_ident.region_number)
}

/// Creates a [HandlerContext] around `region_server` with newly created
/// catch-up and downgrade task trackers. Only compiled for tests.
#[cfg(test)]
pub fn new_for_test(region_server: RegionServer) -> Self {
Self {
region_server,
catchup_tasks: TaskTracker::new(),
downgrade_tasks: TaskTracker::new(),
}
}
}

impl RegionHeartbeatResponseHandler {
Expand All @@ -61,6 +72,7 @@ impl RegionHeartbeatResponseHandler {
Self {
region_server,
catchup_tasks: TaskTracker::new(),
downgrade_tasks: TaskTracker::new(),
}
}

Expand Down Expand Up @@ -107,11 +119,13 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
let mailbox = ctx.mailbox.clone();
let region_server = self.region_server.clone();
let catchup_tasks = self.catchup_tasks.clone();
let downgrade_tasks = self.downgrade_tasks.clone();
let handler = Self::build_handler(instruction)?;
let _handle = common_runtime::spawn_global(async move {
let reply = handler(HandlerContext {
region_server,
catchup_tasks,
downgrade_tasks,
})
.await;

Expand All @@ -129,6 +143,7 @@ mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use common_meta::heartbeat::mailbox::{
HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
Expand Down Expand Up @@ -197,6 +212,7 @@ mod tests {
// Downgrade region
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
region_id: RegionId::new(2048, 1),
flush_timeout: Some(Duration::from_secs(1)),
});
assert!(heartbeat_handler
.is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction))));
Expand All @@ -205,7 +221,7 @@ mod tests {
let instruction = Instruction::UpgradeRegion(UpgradeRegion {
region_id,
last_entry_id: None,
wait_for_replay_timeout: None,
replay_timeout: None,
location_id: None,
});
assert!(
Expand Down Expand Up @@ -392,7 +408,10 @@ mod tests {
// Should be ok, if we try to downgrade it twice.
for _ in 0..2 {
let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
let instruction = Instruction::DowngradeRegion(DowngradeRegion { region_id });
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
region_id,
flush_timeout: Some(Duration::from_secs(1)),
});

let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
Expand All @@ -413,6 +432,7 @@ mod tests {
let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
region_id: RegionId::new(2048, 1),
flush_timeout: Some(Duration::from_secs(1)),
});
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
Expand Down
Loading

0 comments on commit 163cea8

Please sign in to comment.