Skip to content

Commit

Permalink
feat: use the write runtime to handle the heartbeats (GreptimeTeam#4177)
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun authored Jun 20, 2024
1 parent 4306cba commit 5bcd7a1
Showing 1 changed file with 26 additions and 18 deletions.
44 changes: 26 additions & 18 deletions src/meta-srv/src/service/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ use common_telemetry::{debug, error, info, warn};
use futures::StreamExt;
use once_cell::sync::OnceCell;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Streaming};

use crate::error;
use crate::error::Result;
use crate::handler::Pusher;
use crate::handler::{HeartbeatHandlerGroup, Pusher};
use crate::metasrv::{Context, Metasrv};
use crate::service::{GrpcResult, GrpcStream};

Expand All @@ -45,31 +46,24 @@ impl heartbeat_server::Heartbeat for Metasrv {
let (tx, rx) = mpsc::channel(128);
let handler_group = self.handler_group().clone();
let ctx = self.new_ctx();
let _handle = common_runtime::spawn_bg(async move {
let _handle = common_runtime::spawn_write(async move {
let mut pusher_key = None;
while let Some(msg) = in_stream.next().await {
let mut is_not_leader = false;
match msg {
Ok(req) => {
let header = match req.header.as_ref() {
Some(header) => header,
None => {
let err = error::MissingRequestHeaderSnafu {}.build();
error!("Exit on malformed request: MissingRequestHeader");
let _ = tx.send(Err(err.into())).await;
break;
}
};

debug!("Receiving heartbeat request: {:?}", req);

let Some(header) = req.header.as_ref() else {
error!("Exit on malformed request: MissingRequestHeader");
let _ = tx
.send(Err(error::MissingRequestHeaderSnafu {}.build().into()))
.await;
break;
};

if pusher_key.is_none() {
let node_id = get_node_id(header);
let role = header.role() as i32;
let key = format!("{}-{}", role, node_id);
let pusher = Pusher::new(tx.clone(), header);
handler_group.register(&key, pusher).await;
pusher_key = Some(key);
pusher_key = register_pusher(&handler_group, header, tx.clone()).await;
}

let res = handler_group
Expand All @@ -80,6 +74,7 @@ impl heartbeat_server::Heartbeat for Metasrv {
is_not_leader = res.as_ref().map_or(false, |r| r.is_not_leader());

debug!("Sending heartbeat response: {:?}", res);

if tx.send(res).await.is_err() {
info!("ReceiverStream was dropped; shutting down");
break;
Expand Down Expand Up @@ -168,6 +163,19 @@ fn get_node_id(header: &RequestHeader) -> u64 {
}
}

async fn register_pusher(
handler_group: &HeartbeatHandlerGroup,
header: &RequestHeader,
sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
) -> Option<String> {
let role = header.role() as i32;
let node_id = get_node_id(header);
let key = format!("{}-{}", role, node_id);
let pusher = Pusher::new(sender, header);
handler_group.register(&key, pusher).await;
Some(key)
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down

0 comments on commit 5bcd7a1

Please sign in to comment.