Skip to content

Commit

Permalink
feat: add logs for upgrading candidate region and updating metadata (#…
Browse files Browse the repository at this point in the history
…3077)

* feat: add logs for upgrading candidate region

* feat: add logs for update metadata

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored Jan 4, 2024
1 parent 4d250ed commit 284a496
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 2 deletions.
3 changes: 2 additions & 1 deletion src/datanode/src/heartbeat/handler/upgrade_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use common_error::ext::ErrorExt;
use common_meta::instruction::{InstructionReply, UpgradeRegion, UpgradeRegionReply};
use common_telemetry::warn;
use common_telemetry::{info, warn};
use futures_util::future::BoxFuture;
use store_api::region_request::{RegionCatchupRequest, RegionRequest};

Expand Down Expand Up @@ -56,6 +56,7 @@ impl HandlerContext {
.try_register(
region_id,
Box::pin(async move {
info!("Executing region: {region_id} catchup to: last entry id {last_entry_id:?}");
region_server_moved
.handle_request(
region_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use common_meta::key::datanode_table::RegionInfo;
use common_meta::rpc::router::RegionRoute;
use common_meta::rpc::router::{region_distribution, RegionRoute};
use common_telemetry::{info, warn};
use snafu::{ensure, OptionExt, ResultExt};

Expand Down Expand Up @@ -144,6 +144,12 @@ impl UpdateMetadata {
} = datanode_table_value.region_info.clone();
let table_route_value = ctx.get_table_route_value().await?;

let region_distribution = region_distribution(&region_routes);
info!(
"Trying to update region routes to {:?} for table: {}",
region_distribution,
region_id.table_id()
);
if let Err(err) = table_metadata_manager
.update_table_route(
region_id.table_id(),
Expand Down
10 changes: 10 additions & 0 deletions src/mito2/src/worker/handle_catchup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
use std::sync::Arc;

use common_telemetry::info;
use snafu::ensure;
use store_api::logstore::LogStore;
use store_api::region_request::{AffectedRows, RegionCatchupRequest};
use store_api::storage::RegionId;
use tokio::time::Instant;

use crate::error::{self, Result};
use crate::region::opener::{replay_memtable, RegionOpener};
Expand All @@ -44,6 +46,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {

// Utilizes the short circuit evaluation.
let region = if !is_mutable_empty || region.manifest_manager.has_update().await? {
info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}");
let reopened_region = Arc::new(
RegionOpener::new(
region_id,
Expand All @@ -67,6 +70,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
};

let flushed_entry_id = region.version_control.current().last_entry_id;
info!("Trying to replay memtable for region: {region_id}, flushed entry id: {flushed_entry_id}");
let timer = Instant::now();
let last_entry_id = replay_memtable(
&self.wal,
&region.wal_options,
Expand All @@ -75,6 +80,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
&region.version_control,
)
.await?;
info!(
"Elapsed: {:?}, region: {region_id} catchup finished. last entry id: {last_entry_id}, expected: {:?}.",
timer.elapsed(),
request.entry_id
);
if let Some(expected_last_entry_id) = request.entry_id {
ensure!(
expected_last_entry_id == last_entry_id,
Expand Down

0 comments on commit 284a496

Please sign in to comment.