diff --git a/src/common/src/telemetry/mod.rs b/src/common/src/telemetry/mod.rs index 1fd17389ee46f..d96a61f0f8524 100644 --- a/src/common/src/telemetry/mod.rs +++ b/src/common/src/telemetry/mod.rs @@ -38,20 +38,27 @@ pub const TELEMETRY_CLUSTER_TYPE_HOSTED: &str = "hosted"; // hosted on RisingWav pub const TELEMETRY_CLUSTER_TYPE_KUBERNETES: &str = "kubernetes"; pub const TELEMETRY_CLUSTER_TYPE_SINGLE_NODE: &str = "single-node"; pub const TELEMETRY_CLUSTER_TYPE_DOCKER_COMPOSE: &str = "docker-compose"; +const TELEMETRY_CLUSTER_TYPE_TEST: &str = "test"; pub use risingwave_telemetry_event::get_telemetry_risingwave_cloud_uuid; -pub fn telemetry_cluster_type_from_env_var() -> PbTelemetryClusterType { +pub fn telemetry_cluster_type_from_env_var() -> TelemetryResult { let cluster_type = match env::var(TELEMETRY_CLUSTER_TYPE) { Ok(cluster_type) => cluster_type, - Err(_) => return PbTelemetryClusterType::Unspecified, + Err(_) => return Ok(PbTelemetryClusterType::Unspecified), }; match cluster_type.as_str() { - TELEMETRY_CLUSTER_TYPE_HOSTED => PbTelemetryClusterType::CloudHosted, - TELEMETRY_CLUSTER_TYPE_DOCKER_COMPOSE => PbTelemetryClusterType::DockerCompose, - TELEMETRY_CLUSTER_TYPE_KUBERNETES => PbTelemetryClusterType::Kubernetes, - TELEMETRY_CLUSTER_TYPE_SINGLE_NODE => PbTelemetryClusterType::SingleNode, - _ => PbTelemetryClusterType::Unspecified, + TELEMETRY_CLUSTER_TYPE_HOSTED => Ok(PbTelemetryClusterType::CloudHosted), + TELEMETRY_CLUSTER_TYPE_DOCKER_COMPOSE => Ok(PbTelemetryClusterType::DockerCompose), + TELEMETRY_CLUSTER_TYPE_KUBERNETES => Ok(PbTelemetryClusterType::Kubernetes), + TELEMETRY_CLUSTER_TYPE_SINGLE_NODE => Ok(PbTelemetryClusterType::SingleNode), + + // block the report if the cluster is in test env + // but it only blocks the report from meta node, not other nodes + TELEMETRY_CLUSTER_TYPE_TEST => Err(TelemetryError::from( + "test cluster type should not send telemetry report", + )), + _ => Err(TelemetryError::from("invalid cluster type")), } } @@ -156,7 +163,7 @@ pub fn report_scarf_enabled() -> bool { telemetry_env_enabled() && !matches!( telemetry_cluster_type_from_env_var(), - PbTelemetryClusterType::CloudHosted + Ok(PbTelemetryClusterType::CloudHosted) ) } diff --git a/src/common/src/telemetry/report.rs b/src/common/src/telemetry/report.rs index faff786443e15..0231f93b3d647 100644 --- a/src/common/src/telemetry/report.rs +++ b/src/common/src/telemetry/report.rs @@ -67,22 +67,23 @@ where // There is only one case tracking_id updated at the runtime ---- metastore data has been // cleaned. There is no way that metastore has been cleaned but nodes are still running let tracking_id = { - if let Some(cloud_uuid) = get_telemetry_risingwave_cloud_uuid() { - cloud_uuid - } else { - match info_fetcher.fetch_telemetry_info().await { - Ok(Some(id)) => id, - Ok(None) => { - tracing::info!("Telemetry is disabled"); - return; - } - Err(err) => { - tracing::error!("Telemetry failed to get tracking_id, err {}", err); - return; - } + match ( + info_fetcher.fetch_telemetry_info().await, + get_telemetry_risingwave_cloud_uuid(), + ) { + (Ok(None), _) => { + tracing::info!("Telemetry is disabled"); + return; + } + (Err(err), _) => { + tracing::error!("Telemetry failed to get tracking_id, err {}", err); + return; } + (Ok(Some(_)), Some(cloud_uuid)) => cloud_uuid, + (Ok(Some(id)), None) => id, } }; + TELEMETRY_TRACKING_ID .set(tracking_id.clone()) .unwrap_or_else(|_| { diff --git a/src/meta/service/src/telemetry_service.rs b/src/meta/service/src/telemetry_service.rs index be76bc05bcf04..018e2522494dd 100644 --- a/src/meta/service/src/telemetry_service.rs +++ b/src/meta/service/src/telemetry_service.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_common::telemetry::telemetry_cluster_type_from_env_var; use risingwave_meta::controller::SqlMetaStore; use risingwave_meta_model::prelude::Cluster; use risingwave_pb::meta::telemetry_info_service_server::TelemetryInfoService; @@ -44,6 +45,9 @@ impl TelemetryInfoService for TelemetryInfoServiceImpl { &self, _request: Request, ) -> Result, Status> { + if telemetry_cluster_type_from_env_var().is_err() { + return Ok(Response::new(TelemetryInfoResponse { tracking_id: None })); + } match self.get_tracking_id().await? { Some(tracking_id) => Ok(Response::new(TelemetryInfoResponse { tracking_id: Some(tracking_id.into()), diff --git a/src/meta/src/telemetry.rs b/src/meta/src/telemetry.rs index 36cda3698feb6..7ee33a2c14d88 100644 --- a/src/meta/src/telemetry.rs +++ b/src/meta/src/telemetry.rs @@ -156,6 +156,10 @@ impl MetaTelemetryInfoFetcher { #[async_trait::async_trait] impl TelemetryInfoFetcher for MetaTelemetryInfoFetcher { async fn fetch_telemetry_info(&self) -> TelemetryResult> { + // the err here means building cluster on test env, so we don't need to report telemetry + if telemetry_cluster_type_from_env_var().is_err() { + return Ok(None); + } Ok(Some(self.tracking_id.clone().into())) } } @@ -224,7 +228,8 @@ impl TelemetryReportCreator for MetaReportCreator { streaming_job_count, meta_backend: MetaBackend::Sql, job_desc: stream_job_desc, - cluster_type: telemetry_cluster_type_from_env_var(), + // it blocks the report if the cluster type is not valid or leak from test env + cluster_type: telemetry_cluster_type_from_env_var()?, object_store_media_type: self.object_store_media_type, }) }