Skip to content

Commit

Permalink
feat: batching telemetry event request avoid too many requests (#20000)
Browse files Browse the repository at this point in the history
Co-authored-by: tabversion <[email protected]>
  • Loading branch information
tabVersion and tabversion authored Jan 8, 2025
1 parent 52532c7 commit 1bc6bea
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 16 deletions.
7 changes: 5 additions & 2 deletions proto/telemetry.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ option go_package = "risingwavelabs.com/risingwave/proto/telemetry";
enum MetaBackend {
META_BACKEND_UNSPECIFIED = 0;
META_BACKEND_MEMORY = 1;
reserved 2;
reserved "META_BACKEND_ETCD";
META_BACKEND_ETCD = 2;
META_BACKEND_RDB = 3;
}

Expand Down Expand Up @@ -167,3 +166,7 @@ message EventMessage {
// marks the event as a test message
bool is_test = 11;
}

// A batch of telemetry events carried together in a single message.
message BatchEventMessage {
repeated EventMessage events = 1;
}
37 changes: 33 additions & 4 deletions src/common/src/telemetry/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,18 @@

use std::sync::Arc;

use risingwave_telemetry_event::get_telemetry_risingwave_cloud_uuid;
use risingwave_pb::telemetry::PbEventMessage;
pub use risingwave_telemetry_event::{
current_timestamp, post_telemetry_report_pb, TELEMETRY_REPORT_URL, TELEMETRY_TRACKING_ID,
current_timestamp, do_telemetry_event_report, post_telemetry_report_pb,
TELEMETRY_EVENT_REPORT_INTERVAL, TELEMETRY_REPORT_URL, TELEMETRY_TRACKING_ID,
};
use risingwave_telemetry_event::{
get_telemetry_risingwave_cloud_uuid, TELEMETRY_EVENT_REPORT_STASH_SIZE,
TELEMETRY_EVENT_REPORT_TX,
};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::{interval, Duration};
use tokio::time::{interval as tokio_interval_fn, Duration};
use uuid::Uuid;

use super::{Result, TELEMETRY_REPORT_INTERVAL};
Expand Down Expand Up @@ -60,9 +65,13 @@ where

let begin_time = std::time::Instant::now();
let session_id = Uuid::new_v4().to_string();
let mut interval = interval(Duration::from_secs(TELEMETRY_REPORT_INTERVAL));
let mut interval = tokio_interval_fn(Duration::from_secs(TELEMETRY_REPORT_INTERVAL));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let mut event_interval =
tokio_interval_fn(Duration::from_secs(TELEMETRY_EVENT_REPORT_INTERVAL));
event_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

// fetch telemetry tracking_id from the meta node only at the beginning
// There is only one case tracking_id updated at the runtime ---- metastore data has been
// cleaned. There is no way that metastore has been cleaned but nodes are still running
Expand Down Expand Up @@ -91,9 +100,29 @@ where
)
});

let (tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel::<PbEventMessage>();
TELEMETRY_EVENT_REPORT_TX.set(tx).unwrap_or_else(|_| {
tracing::warn!(
"Telemetry failed to set event reporting tx, event reporting will be disabled"
);
});
let mut event_stash = Vec::new();

loop {
tokio::select! {
_ = interval.tick() => {},
event = event_rx.recv() => {
debug_assert!(event.is_some());
event_stash.push(event.unwrap());
if event_stash.len() >= TELEMETRY_EVENT_REPORT_STASH_SIZE {
do_telemetry_event_report(&mut event_stash).await;
}
continue;
}
_ = event_interval.tick() => {
do_telemetry_event_report(&mut event_stash).await;
continue;
},
_ = &mut shutdown_rx => {
tracing::info!("Telemetry exit");
return;
Expand Down
35 changes: 25 additions & 10 deletions src/common/telemetry_event/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ use std::sync::OnceLock;

use prost::Message;
use risingwave_pb::telemetry::{
EventMessage as PbEventMessage, PbTelemetryDatabaseObject,
EventMessage as PbEventMessage, PbBatchEventMessage, PbTelemetryDatabaseObject,
TelemetryEventStage as PbTelemetryEventStage,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedSender;
pub use util::*;

pub type TelemetryResult<T> = core::result::Result<T, TelemetryError>;
Expand All @@ -32,6 +34,7 @@ pub type TelemetryResult<T> = core::result::Result<T, TelemetryError>;
pub type TelemetryError = String;

pub static TELEMETRY_TRACKING_ID: OnceLock<String> = OnceLock::new();
pub static TELEMETRY_EVENT_REPORT_TX: OnceLock<UnboundedSender<PbEventMessage>> = OnceLock::new();

pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v2/report";

Expand All @@ -42,6 +45,21 @@ pub fn get_telemetry_risingwave_cloud_uuid() -> Option<String> {
env::var(TELEMETRY_RISINGWAVE_CLOUD_UUID).ok()
}

/// Sends every event currently held in `event_stash` to the batch telemetry endpoint as a
/// single `PbBatchEventMessage`, leaving the stash empty.
///
/// Does nothing when the stash is empty, so a periodic flush with no pending events does not
/// issue a request carrying an empty batch.
///
/// Reporting is best-effort: the stash is drained before the request is made, and a failed
/// post is only logged at debug level; the drained events are not put back.
pub async fn do_telemetry_event_report(event_stash: &mut Vec<PbEventMessage>) {
    // Path segment appended to the report URL for batch event reports.
    const TELEMETRY_EVENT_REPORT_TYPE: &str = "events";

    // Nothing to report: skip the network round-trip entirely.
    if event_stash.is_empty() {
        return;
    }

    let url = format!("{}/{}", TELEMETRY_REPORT_URL, TELEMETRY_EVENT_REPORT_TYPE);
    let batch_message = PbBatchEventMessage {
        // `take` moves the events out and leaves an empty Vec behind for the caller to reuse.
        events: std::mem::take(event_stash),
    };

    if let Err(e) = post_telemetry_report_pb(&url, batch_message.encode_to_vec()).await {
        tracing::debug!("{}", e);
    }
}

/// Period, in seconds, of the timer that flushes stashed telemetry events.
pub const TELEMETRY_EVENT_REPORT_INTERVAL: u64 = 10; // 10 seconds
/// Number of stashed events at which a report is triggered without waiting for the timer.
pub const TELEMETRY_EVENT_REPORT_STASH_SIZE: usize = 100; // 100 events to trigger a report action

pub fn report_event_common(
event_stage: PbTelemetryEventStage,
event_name: &str,
Expand Down Expand Up @@ -95,15 +113,12 @@ pub fn request_to_telemetry_event(
node,
is_test,
};
let report_bytes = event.encode_to_vec();

tokio::spawn(async move {
const TELEMETRY_EVENT_REPORT_TYPE: &str = "event";
let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned();
post_telemetry_report_pb(&url, report_bytes)
.await
.unwrap_or_else(|e| tracing::info!("{}", e))
});

if let Some(tx) = TELEMETRY_EVENT_REPORT_TX.get() {
let _ = tx.send(event).inspect_err(|e| {
tracing::warn!("Failed to send telemetry event queue: {}", e.as_report())
});
}
}

#[cfg(test)]
Expand Down

0 comments on commit 1bc6bea

Please sign in to comment.