Skip to content

Commit

Permalink
feat: defer HeartbeatHandlerGroup construction and enhance `Leaders…
Browse files Browse the repository at this point in the history
…hipChangeNotifier` (GreptimeTeam#4826)

* feat: enhance `HeartbeatHandlerGroup`

* chore: apply suggestions from CR

* chore: minor refactoring

* chore: code styling

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored Oct 15, 2024
1 parent b61a388 commit b5233e5
Show file tree
Hide file tree
Showing 12 changed files with 68 additions and 27 deletions.
4 changes: 4 additions & 0 deletions src/cmd/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ impl Instance {
_guard: guard,
}
}

pub fn mut_inner(&mut self) -> &mut MetasrvInstance {
&mut self.instance
}
}

#[async_trait]
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ impl StartCommand {
}
}

struct StandaloneInformationExtension {
pub struct StandaloneInformationExtension {
region_server: RegionServer,
procedure_manager: ProcedureManagerRef,
start_time_ms: u64,
Expand Down
9 changes: 8 additions & 1 deletion src/common/meta/src/leadership_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::Arc;

use async_trait::async_trait;
use common_telemetry::error;
use common_telemetry::{error, info};

use crate::error::Result;

Expand Down Expand Up @@ -45,6 +45,13 @@ pub struct LeadershipChangeNotifier {
listeners: Vec<Arc<dyn LeadershipChangeListener>>,
}

impl LeadershipChangeNotifierCustomizer for LeadershipChangeNotifier {
fn customize(&self, notifier: &mut LeadershipChangeNotifier) {
info!("Customizing leadership change notifier");
notifier.listeners.extend(self.listeners.clone());
}
}

impl LeadershipChangeNotifier {
/// Adds a listener to the notifier.
pub fn add_listener(&mut self, listener: Arc<dyn LeadershipChangeListener>) {
Expand Down
4 changes: 4 additions & 0 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ impl MetasrvInstance {
pub fn plugins(&self) -> Plugins {
self.plugins.clone()
}

pub fn mut_inner(&mut self) -> &mut Metasrv {
&mut self.metasrv
}
}

pub async fn bootstrap_metasrv_with_router(
Expand Down
10 changes: 6 additions & 4 deletions src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,23 +209,24 @@ impl Pushers {
}
}

#[derive(Clone)]
struct NameCachedHandler {
name: &'static str,
handler: Box<dyn HeartbeatHandler>,
handler: Arc<dyn HeartbeatHandler>,
}

impl NameCachedHandler {
fn new(handler: impl HeartbeatHandler + 'static) -> Self {
let name = handler.name();
let handler = Box::new(handler);
let handler = Arc::new(handler);
Self { name, handler }
}
}

pub type HeartbeatHandlerGroupRef = Arc<HeartbeatHandlerGroup>;

/// The group of heartbeat handlers.
#[derive(Default)]
#[derive(Default, Clone)]
pub struct HeartbeatHandlerGroup {
handlers: Vec<NameCachedHandler>,
pushers: Pushers,
Expand Down Expand Up @@ -442,6 +443,7 @@ impl Mailbox for HeartbeatMailbox {
}

/// The builder to build the group of heartbeat handlers.
#[derive(Clone)]
pub struct HeartbeatHandlerGroupBuilder {
/// The handler to handle region failure.
region_failure_handler: Option<RegionFailureHandler>,
Expand Down Expand Up @@ -540,7 +542,7 @@ impl HeartbeatHandlerGroupBuilder {
}

Ok(HeartbeatHandlerGroup {
handlers: self.handlers.into_iter().collect(),
handlers: self.handlers,
pushers: self.pushers,
})
}
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/handler/failure_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::region::supervisor::{DatanodeHeartbeat, HeartbeatAcceptor, RegionSupervisor};

#[derive(Clone)]
pub struct RegionFailureHandler {
heartbeat_acceptor: HeartbeatAcceptor,
}
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/handler/region_lease_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::metasrv::Context;
use crate::region::lease_keeper::{RegionLeaseKeeperRef, RenewRegionLeasesResponse};
use crate::region::RegionLeaseKeeper;

#[derive(Clone)]
pub struct RegionLeaseHandler {
region_lease_seconds: u64,
region_lease_keeper: RegionLeaseKeeperRef,
Expand Down
27 changes: 20 additions & 7 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ use common_wal::config::MetasrvWalConfig;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::http::HttpOptions;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use tokio::sync::broadcast::error::RecvError;

use crate::cluster::MetaPeerClientRef;
use crate::election::{Election, LeaderChangeMessage};
use crate::error::{
InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu, StartTelemetryTaskSnafu,
StopProcedureManagerSnafu,
self, InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu,
StartTelemetryTaskSnafu, StopProcedureManagerSnafu,
};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::HeartbeatHandlerGroupRef;
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
use crate::lease::lookup_datanode_peer;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::ProcedureManagerListenerAdapter;
Expand Down Expand Up @@ -353,7 +353,8 @@ pub struct Metasrv {
selector: SelectorRef,
// The flow selector is used to select a target flownode.
flow_selector: SelectorRef,
handler_group: HeartbeatHandlerGroupRef,
handler_group: Option<HeartbeatHandlerGroupRef>,
handler_group_builder: Option<HeartbeatHandlerGroupBuilder>,
election: Option<ElectionRef>,
procedure_manager: ProcedureManagerRef,
mailbox: MailboxRef,
Expand All @@ -370,7 +371,15 @@ pub struct Metasrv {
}

impl Metasrv {
pub async fn try_start(&self) -> Result<()> {
pub async fn try_start(&mut self) -> Result<()> {
let builder = self
.handler_group_builder
.take()
.context(error::UnexpectedSnafu {
violated: "expected heartbeat handler group builder",
})?;
self.handler_group = Some(Arc::new(builder.build()?));

if self
.started
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
Expand Down Expand Up @@ -558,10 +567,14 @@ impl Metasrv {
&self.flow_selector
}

pub fn handler_group(&self) -> &HeartbeatHandlerGroupRef {
pub fn handler_group(&self) -> &Option<HeartbeatHandlerGroupRef> {
&self.handler_group
}

pub fn handler_group_builder(&mut self) -> &mut Option<HeartbeatHandlerGroupBuilder> {
&mut self.handler_group_builder
}

pub fn election(&self) -> Option<&ElectionRef> {
self.election.as_ref()
}
Expand Down
25 changes: 13 additions & 12 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ use crate::flow_meta_alloc::FlowPeerAllocator;
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::handler::failure_handler::RegionFailureHandler;
use crate::handler::region_lease_handler::RegionLeaseHandler;
use crate::handler::{
HeartbeatHandlerGroup, HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers,
};
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
use crate::lease::MetaPeerLookupService;
use crate::metasrv::{
ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, SelectorContext, SelectorRef, TABLE_ID_SEQ,
Expand All @@ -74,7 +72,7 @@ pub struct MetasrvBuilder {
kv_backend: Option<KvBackendRef>,
in_memory: Option<ResettableKvBackendRef>,
selector: Option<SelectorRef>,
handler_group: Option<HeartbeatHandlerGroup>,
handler_group_builder: Option<HeartbeatHandlerGroupBuilder>,
election: Option<ElectionRef>,
meta_peer_client: Option<MetaPeerClientRef>,
node_manager: Option<NodeManagerRef>,
Expand All @@ -88,7 +86,7 @@ impl MetasrvBuilder {
kv_backend: None,
in_memory: None,
selector: None,
handler_group: None,
handler_group_builder: None,
meta_peer_client: None,
election: None,
options: None,
Expand Down Expand Up @@ -118,8 +116,11 @@ impl MetasrvBuilder {
self
}

pub fn heartbeat_handler(mut self, handler_group: HeartbeatHandlerGroup) -> Self {
self.handler_group = Some(handler_group);
pub fn heartbeat_handler(
mut self,
handler_group_builder: HeartbeatHandlerGroupBuilder,
) -> Self {
self.handler_group_builder = Some(handler_group_builder);
self
}

Expand Down Expand Up @@ -161,7 +162,7 @@ impl MetasrvBuilder {
kv_backend,
in_memory,
selector,
handler_group,
handler_group_builder,
node_manager,
plugins,
table_metadata_allocator,
Expand Down Expand Up @@ -338,8 +339,8 @@ impl MetasrvBuilder {
.context(error::InitDdlManagerSnafu)?,
);

let handler_group = match handler_group {
Some(handler_group) => handler_group,
let handler_group_builder = match handler_group_builder {
Some(handler_group_builder) => handler_group_builder,
None => {
let region_lease_handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
Expand All @@ -352,7 +353,6 @@ impl MetasrvBuilder {
.with_region_failure_handler(region_failover_handler)
.with_region_lease_handler(Some(region_lease_handler))
.add_default_handlers()
.build()?
}
};

Expand All @@ -371,7 +371,8 @@ impl MetasrvBuilder {
selector,
// TODO(jeremy): We do not allow configuring the flow selector.
flow_selector: Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)),
handler_group: Arc::new(handler_group),
handler_group: None,
handler_group_builder: Some(handler_group_builder),
election,
procedure_manager,
mailbox,
Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/src/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub async fn mock(
None => builder,
};

let metasrv = builder.build().await.unwrap();
let mut metasrv = builder.build().await.unwrap();
metasrv.try_start().await.unwrap();

let (client, server) = tokio::io::duplex(1024);
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/region/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ impl RegionFailureDetectorController for RegionFailureDetectorControl {
}

/// [`HeartbeatAcceptor`] forwards heartbeats to [`RegionSupervisor`].
#[derive(Clone)]
pub(crate) struct HeartbeatAcceptor {
sender: Sender<Event>,
}
Expand Down
9 changes: 8 additions & 1 deletion src/meta-srv/src/service/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use api::v1::meta::{
use common_telemetry::{debug, error, info, warn};
use futures::StreamExt;
use once_cell::sync::OnceCell;
use snafu::OptionExt;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio_stream::wrappers::ReceiverStream;
Expand All @@ -45,7 +46,13 @@ impl heartbeat_server::Heartbeat for Metasrv {
) -> GrpcResult<Self::HeartbeatStream> {
let mut in_stream = req.into_inner();
let (tx, rx) = mpsc::channel(128);
let handler_group = self.handler_group().clone();
let handler_group = self
.handler_group()
.clone()
.context(error::UnexpectedSnafu {
violated: "expected heartbeat handlers",
})?;

let ctx = self.new_ctx();
let _handle = common_runtime::spawn_global(async move {
let mut pusher_key = None;
Expand Down

0 comments on commit b5233e5

Please sign in to comment.