From acedb79e77d8a871a5cb6f0855f9f1bd52f17ce3 Mon Sep 17 00:00:00 2001 From: tabversion Date: Tue, 14 Jan 2025 22:04:26 +0800 Subject: [PATCH 1/4] fix --- src/common/src/telemetry/report.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/common/src/telemetry/report.rs b/src/common/src/telemetry/report.rs index f4737038863b0..3c3a2addfd86d 100644 --- a/src/common/src/telemetry/report.rs +++ b/src/common/src/telemetry/report.rs @@ -101,10 +101,13 @@ where }); let (tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel::(); + + let mut enable_event_report = true; TELEMETRY_EVENT_REPORT_TX.set(tx).unwrap_or_else(|_| { tracing::warn!( "Telemetry failed to set event reporting tx, event reporting will be disabled" ); + enable_event_report = false; }); let mut event_stash = Vec::new(); @@ -112,6 +115,10 @@ where tokio::select! { _ = interval.tick() => {}, event = event_rx.recv() => { + if !enable_event_report { + // if have error creating the channel, will get None message from the channel + continue; + } debug_assert!(event.is_some()); event_stash.push(event.unwrap()); if event_stash.len() >= TELEMETRY_EVENT_REPORT_STASH_SIZE { @@ -120,6 +127,9 @@ where continue; } _ = event_interval.tick() => { + if !enable_event_report { + continue; + } do_telemetry_event_report(&mut event_stash).await; continue; }, From b8a1fc097ccbbfd6f315773c19314250604a361f Mon Sep 17 00:00:00 2001 From: tabversion Date: Tue, 14 Jan 2025 22:11:49 +0800 Subject: [PATCH 2/4] fix --- src/common/telemetry_event/src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/common/telemetry_event/src/lib.rs b/src/common/telemetry_event/src/lib.rs index 18874f4818627..e12a3601840c7 100644 --- a/src/common/telemetry_event/src/lib.rs +++ b/src/common/telemetry_event/src/lib.rs @@ -46,6 +46,10 @@ pub fn get_telemetry_risingwave_cloud_uuid() -> Option { } pub async fn do_telemetry_event_report(event_stash: &mut Vec) { + if event_stash.is_empty() { + return; + } + const TELEMETRY_EVENT_REPORT_TYPE: &str = "events"; // the batch report url let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned(); let batch_message = PbBatchEventMessage { From 162b7884ab18271716e8b7e6b586cd17da3bdb92 Mon Sep 17 00:00:00 2001 From: tabversion Date: Wed, 15 Jan 2025 14:03:01 +0800 Subject: [PATCH 3/4] comment --- src/common/src/telemetry/report.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/common/src/telemetry/report.rs b/src/common/src/telemetry/report.rs index 3c3a2addfd86d..46bd2d698d9c0 100644 --- a/src/common/src/telemetry/report.rs +++ b/src/common/src/telemetry/report.rs @@ -107,6 +107,11 @@ where tracing::warn!( "Telemetry failed to set event reporting tx, event reporting will be disabled" ); + // possible failure: + // When running in standalone mode, the static TELEMETRY_EVENT_REPORT_TX is shared + // and can be set by meta/compute nodes. + // In such case, the one first set the static will do the event reporting and others' + // event report is disabled. enable_event_report = false; }); let mut event_stash = Vec::new(); From 43fbac4c5f775987ea9cc8b936fa603113634a71 Mon Sep 17 00:00:00 2001 From: tabversion Date: Wed, 15 Jan 2025 21:05:48 +0800 Subject: [PATCH 4/4] monir --- src/common/src/telemetry/report.rs | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/common/src/telemetry/report.rs b/src/common/src/telemetry/report.rs index 46bd2d698d9c0..f056ddfcb6dee 100644 --- a/src/common/src/telemetry/report.rs +++ b/src/common/src/telemetry/report.rs @@ -119,11 +119,7 @@ where loop { tokio::select! { _ = interval.tick() => {}, - event = event_rx.recv() => { - if !enable_event_report { - // if have error creating the channel, will get None message from the channel - continue; - } + event = event_rx.recv(), if enable_event_report => { debug_assert!(event.is_some()); event_stash.push(event.unwrap()); if event_stash.len() >= TELEMETRY_EVENT_REPORT_STASH_SIZE { @@ -131,10 +127,7 @@ where } continue; } - _ = event_interval.tick() => { - if !enable_event_report { - continue; - } + _ = event_interval.tick(), if enable_event_report => { do_telemetry_event_report(&mut event_stash).await; continue; },