Skip to content

Commit

Permalink
fix(telemetry): not send report if cluster is in test env (#20180)
Browse files Browse the repository at this point in the history
Co-authored-by: tabversion <[email protected]>
  • Loading branch information
tabVersion and tabversion authored Jan 22, 2025
1 parent f5aed12 commit e0e0e78
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 22 deletions.
23 changes: 15 additions & 8 deletions src/common/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,27 @@ pub const TELEMETRY_CLUSTER_TYPE_HOSTED: &str = "hosted"; // hosted on RisingWav
pub const TELEMETRY_CLUSTER_TYPE_KUBERNETES: &str = "kubernetes";
pub const TELEMETRY_CLUSTER_TYPE_SINGLE_NODE: &str = "single-node";
pub const TELEMETRY_CLUSTER_TYPE_DOCKER_COMPOSE: &str = "docker-compose";
const TELEMETRY_CLUSTER_TYPE_TEST: &str = "test";

pub use risingwave_telemetry_event::get_telemetry_risingwave_cloud_uuid;

pub fn telemetry_cluster_type_from_env_var() -> PbTelemetryClusterType {
pub fn telemetry_cluster_type_from_env_var() -> TelemetryResult<PbTelemetryClusterType> {
let cluster_type = match env::var(TELEMETRY_CLUSTER_TYPE) {
Ok(cluster_type) => cluster_type,
Err(_) => return PbTelemetryClusterType::Unspecified,
Err(_) => return Ok(PbTelemetryClusterType::Unspecified),
};
match cluster_type.as_str() {
TELEMETRY_CLUSTER_TYPE_HOSTED => PbTelemetryClusterType::CloudHosted,
TELEMETRY_CLUSTER_TYPE_DOCKER_COMPOSE => PbTelemetryClusterType::DockerCompose,
TELEMETRY_CLUSTER_TYPE_KUBERNETES => PbTelemetryClusterType::Kubernetes,
TELEMETRY_CLUSTER_TYPE_SINGLE_NODE => PbTelemetryClusterType::SingleNode,
_ => PbTelemetryClusterType::Unspecified,
TELEMETRY_CLUSTER_TYPE_HOSTED => Ok(PbTelemetryClusterType::CloudHosted),
TELEMETRY_CLUSTER_TYPE_DOCKER_COMPOSE => Ok(PbTelemetryClusterType::DockerCompose),
TELEMETRY_CLUSTER_TYPE_KUBERNETES => Ok(PbTelemetryClusterType::Kubernetes),
TELEMETRY_CLUSTER_TYPE_SINGLE_NODE => Ok(PbTelemetryClusterType::SingleNode),

// block the report if the cluster is in test env
// but it only blocks the report from meta node, not other nodes
TELEMETRY_CLUSTER_TYPE_TEST => Err(TelemetryError::from(
"test cluster type should not send telemetry report",
)),
_ => Err(TelemetryError::from("invalid cluster type")),
}
}

Expand Down Expand Up @@ -156,7 +163,7 @@ pub fn report_scarf_enabled() -> bool {
telemetry_env_enabled()
&& !matches!(
telemetry_cluster_type_from_env_var(),
PbTelemetryClusterType::CloudHosted
Ok(PbTelemetryClusterType::CloudHosted)
)
}

Expand Down
27 changes: 14 additions & 13 deletions src/common/src/telemetry/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,23 @@ where
// There is only one case tracking_id updated at the runtime ---- metastore data has been
// cleaned. There is no way that metastore has been cleaned but nodes are still running
let tracking_id = {
if let Some(cloud_uuid) = get_telemetry_risingwave_cloud_uuid() {
cloud_uuid
} else {
match info_fetcher.fetch_telemetry_info().await {
Ok(Some(id)) => id,
Ok(None) => {
tracing::info!("Telemetry is disabled");
return;
}
Err(err) => {
tracing::error!("Telemetry failed to get tracking_id, err {}", err);
return;
}
match (
info_fetcher.fetch_telemetry_info().await,
get_telemetry_risingwave_cloud_uuid(),
) {
(Ok(None), _) => {
tracing::info!("Telemetry is disabled");
return;
}
(Err(err), _) => {
tracing::error!("Telemetry failed to get tracking_id, err {}", err);
return;
}
(Ok(Some(_)), Some(cloud_uuid)) => cloud_uuid,
(Ok(Some(id)), None) => id,
}
};

TELEMETRY_TRACKING_ID
.set(tracking_id.clone())
.unwrap_or_else(|_| {
Expand Down
4 changes: 4 additions & 0 deletions src/meta/service/src/telemetry_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::telemetry::telemetry_cluster_type_from_env_var;
use risingwave_meta::controller::SqlMetaStore;
use risingwave_meta_model::prelude::Cluster;
use risingwave_pb::meta::telemetry_info_service_server::TelemetryInfoService;
Expand Down Expand Up @@ -44,6 +45,9 @@ impl TelemetryInfoService for TelemetryInfoServiceImpl {
&self,
_request: Request<GetTelemetryInfoRequest>,
) -> Result<Response<TelemetryInfoResponse>, Status> {
if telemetry_cluster_type_from_env_var().is_err() {
return Ok(Response::new(TelemetryInfoResponse { tracking_id: None }));
}
match self.get_tracking_id().await? {
Some(tracking_id) => Ok(Response::new(TelemetryInfoResponse {
tracking_id: Some(tracking_id.into()),
Expand Down
7 changes: 6 additions & 1 deletion src/meta/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ impl MetaTelemetryInfoFetcher {
#[async_trait::async_trait]
impl TelemetryInfoFetcher for MetaTelemetryInfoFetcher {
async fn fetch_telemetry_info(&self) -> TelemetryResult<Option<String>> {
// the err here means building cluster on test env, so we don't need to report telemetry
if telemetry_cluster_type_from_env_var().is_err() {
return Ok(None);
}
Ok(Some(self.tracking_id.clone().into()))
}
}
Expand Down Expand Up @@ -224,7 +228,8 @@ impl TelemetryReportCreator for MetaReportCreator {
streaming_job_count,
meta_backend: MetaBackend::Sql,
job_desc: stream_job_desc,
cluster_type: telemetry_cluster_type_from_env_var(),
// it blocks the report if the cluster type is not valid or leak from test env
cluster_type: telemetry_cluster_type_from_env_var()?,
object_store_media_type: self.object_store_media_type,
})
}
Expand Down

0 comments on commit e0e0e78

Please sign in to comment.