Skip to content

Commit

Permalink
feat: refine region state checks and handle stalled requests
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Nov 12, 2024
1 parent 84aa5b7 commit 3f31b05
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 20 deletions.
20 changes: 20 additions & 0 deletions src/mito2/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,26 @@ impl RegionMap {
Ok(region)
}

/// Looks up a region by its id.
///
/// Returns `Some(region)` when the region is present. When it is absent, a
/// `RegionNotFound` error carrying `region_id` is handed to `cb` and `None`
/// is returned.
pub(crate) fn get_region_or<F: OnFailure>(
    &self,
    region_id: RegionId,
    cb: &mut F,
) -> Option<MitoRegionRef> {
    let lookup = self
        .get_region(region_id)
        .context(RegionNotFoundSnafu { region_id });

    // Report the failure through the callback, then collapse the result to an `Option`.
    lookup.map_err(|err| cb.on_failure(err)).ok()
}

/// Gets writable region by region id.
///
/// Calls the callback if the region does not exist or is readonly.
Expand Down
39 changes: 36 additions & 3 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod handle_open;
mod handle_truncate;
mod handle_write;

use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -579,7 +580,7 @@ type RequestBuffer = Vec<WorkerRequest>;
#[derive(Default)]
pub(crate) struct StalledRequests {
/// Stalled requests.
pub(crate) requests: Vec<SenderWriteRequest>,
pub(crate) requests: HashMap<RegionId, Vec<SenderWriteRequest>>,
/// Estimated size of all stalled requests.
pub(crate) estimated_size: usize,
}
Expand All @@ -591,9 +592,39 @@ impl StalledRequests {
.iter()
.map(|req| req.request.estimated_size())
.sum();
self.requests.append(requests);
for req in requests.drain(..) {
self.requests
.entry(req.request.region_id)
.or_default()
.push(req);
}

self.estimated_size += size;
}

/// Buffers a single stalled request under the region it targets.
///
/// The request's estimated size is added to `estimated_size`.
pub(crate) fn push(&mut self, req: SenderWriteRequest) {
    // Read everything needed from the request before it is moved into the map.
    let added = req.request.estimated_size();
    let region_id = req.request.region_id;

    let bucket = self.requests.entry(region_id).or_default();
    bucket.push(req);

    self.estimated_size += added;
}

/// Takes all stalled requests of the given region out of the buffer.
///
/// Returns the removed requests and subtracts their estimated sizes from
/// `estimated_size`. Returns an empty vector if the region has no stalled
/// requests.
pub(crate) fn remove(&mut self, region_id: &RegionId) -> Vec<SenderWriteRequest> {
    let Some(removed) = self.requests.remove(region_id) else {
        return Vec::new();
    };

    let freed = removed
        .iter()
        .fold(0usize, |acc, req| acc + req.request.estimated_size());
    self.estimated_size -= freed;

    removed
}
}

/// Background worker loop to handle requests.
Expand Down Expand Up @@ -854,7 +885,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
BackgroundNotify::RegionChange(req) => self.handle_manifest_region_change_result(req),
BackgroundNotify::RegionChange(req) => {
self.handle_manifest_region_change_result(req).await
}
BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/mito2/src/worker/handle_alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;

use common_telemetry::{debug, info};
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::region_request::{AlterKind, ChangeOption, RegionAlterRequest};
use store_api::storage::RegionId;
Expand All @@ -32,7 +33,7 @@ use crate::region::MitoRegionRef;
use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
use crate::worker::RegionWorkerLoop;

impl<S> RegionWorkerLoop<S> {
impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_alter_request(
&mut self,
region_id: RegionId,
Expand Down
12 changes: 10 additions & 2 deletions src/mito2/src/worker/handle_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use std::collections::{HashMap, VecDeque};

use common_telemetry::{info, warn};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;

use crate::cache::file_cache::{FileType, IndexKey};
Expand Down Expand Up @@ -74,7 +75,7 @@ impl RegionEditQueue {
}
}

impl<S> RegionWorkerLoop<S> {
impl<S: LogStore> RegionWorkerLoop<S> {
/// Handles region edit request.
pub(crate) async fn handle_region_edit(&mut self, request: RegionEditRequest) {
let region_id = request.region_id;
Expand Down Expand Up @@ -265,10 +266,14 @@ impl<S> RegionWorkerLoop<S> {
}

/// Handles region change result.
pub(crate) fn handle_manifest_region_change_result(&self, change_result: RegionChangeResult) {
pub(crate) async fn handle_manifest_region_change_result(
&mut self,
change_result: RegionChangeResult,
) {
let region = match self.regions.get_region(change_result.region_id) {
Some(region) => region,
None => {
self.reject_region_stalled_requests(&change_result.region_id);
change_result.sender.send(
RegionNotFoundSnafu {
region_id: change_result.region_id,
Expand All @@ -294,6 +299,9 @@ impl<S> RegionWorkerLoop<S> {

// Sets the region as writable.
region.switch_state_to_writable(RegionLeaderState::Altering);
// Handles the stalled requests.
self.handle_region_stalled_requests(&change_result.region_id)
.await;

change_result.sender.send(change_result.result.map(|_| 0));
}
Expand Down
72 changes: 58 additions & 14 deletions src/mito2/src/worker/handle_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadata;
use store_api::storage::RegionId;

use crate::error::{InvalidRequestSnafu, RejectWriteSnafu, Result};
use crate::error::{InvalidRequestSnafu, RegionLeaderStateSnafu, RejectWriteSnafu, Result};
use crate::metrics::{WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED};
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;
Expand All @@ -47,9 +48,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// The memory pressure is still too high, reject write requests.
reject_write_requests(write_requests);
// Also reject all stalled requests.
let stalled = std::mem::take(&mut self.stalled_requests);
self.stalled_count.sub(stalled.requests.len() as i64);
reject_write_requests(stalled.requests);
self.reject_stalled_requests();
return;
}

Expand Down Expand Up @@ -124,7 +123,32 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let stalled = std::mem::take(&mut self.stalled_requests);
self.stalled_count.sub(stalled.requests.len() as i64);
// We already stalled these requests, don't stall them again.
self.handle_write_requests(stalled.requests, false).await;
for (_, requests) in stalled.requests {
self.handle_write_requests(requests, false).await;
}
}

/// Rejects all stalled requests of every region.
///
/// Takes the whole stalled buffer (leaving an empty default in its place),
/// decrements the stalled gauge by the number of buffered requests and
/// rejects each of them.
pub(crate) fn reject_stalled_requests(&mut self) {
    let stalled = std::mem::take(&mut self.stalled_requests);

    // `stalled.requests` is keyed by region id, so its `len()` is the number of
    // regions, not the number of requests. The gauge is incremented once per
    // stalled request, so subtract the total request count to keep it balanced.
    let num_requests: usize = stalled.requests.values().map(Vec::len).sum();
    self.stalled_count.sub(num_requests as i64);

    for requests in stalled.requests.into_values() {
        reject_write_requests(requests);
    }
}

/// Rejects every stalled request that targets `region_id`.
///
/// The requests are removed from the stalled buffer, the stalled gauge is
/// decremented by their count, and each request is rejected.
pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
    let rejected = self.stalled_requests.remove(region_id);
    let num_rejected = rejected.len() as i64;

    self.stalled_count.sub(num_rejected);
    reject_write_requests(rejected);
}

/// Re-submits every stalled request that targets `region_id`.
///
/// The requests are removed from the stalled buffer, the stalled gauge is
/// decremented by their count, and they are passed back to
/// `handle_write_requests`.
pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
    let pending = self.stalled_requests.remove(region_id);
    let num_pending = pending.len() as i64;
    self.stalled_count.sub(num_pending);

    // NOTE(review): the second argument is `true` here; presumably it allows
    // these requests to be stalled again — confirm against `handle_write_requests`.
    self.handle_write_requests(pending, true).await;
}
}

Expand Down Expand Up @@ -152,19 +176,39 @@ impl<S> RegionWorkerLoop<S> {
if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
let Some(region) = self
.regions
.writable_region_or(region_id, &mut sender_req.sender)
.get_region_or(region_id, &mut sender_req.sender)
else {
// No such region or the region is read only.
// No such region.
continue;
};
match region.state() {
RegionRoleState::Leader(RegionLeaderState::Writable) => {
let region_ctx = RegionWriteCtx::new(
region.region_id,
&region.version_control,
region.provider.clone(),
);

let region_ctx = RegionWriteCtx::new(
region.region_id,
&region.version_control,
region.provider.clone(),
);

e.insert(region_ctx);
e.insert(region_ctx);
}
RegionRoleState::Leader(RegionLeaderState::Altering) => {
self.stalled_count.add(1);
self.stalled_requests.push(sender_req);
continue;
}
state => {
// The region is not writable.
sender_req.sender.send(
RegionLeaderStateSnafu {
region_id,
state,
expect: RegionLeaderState::Writable,
}
.fail(),
);
continue;
}
}
}

// Safety: Now we ensure the region exists.
Expand Down

0 comments on commit 3f31b05

Please sign in to comment.