From 1c2fae622a60ed8f4ea41303c97e29ac9ec90e23 Mon Sep 17 00:00:00 2001 From: itsankit-google Date: Fri, 27 Oct 2023 08:04:19 +0000 Subject: [PATCH] unable to emit metric should not fail the pipeline --- pom.xml | 2 +- .../plugin/gcp/bigquery/action/BigQueryExecute.java | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 29c6f8017d..c6cc48e3e3 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ io.cdap.plugin google-cloud - 0.22.4 + 0.22.5 Google Cloud Plugins jar Plugins for Google Big Query diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java index 714b360621..1a27c25445 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java @@ -169,6 +169,17 @@ public void run(ActionContext context) throws Exception { } } + context.getMetrics().gauge(RECORDS_PROCESSED, rows); + try { + recordBytesProcessedMetric(context, queryJob); + } catch (Exception exception) { + // log the exception but not fail the pipeline + LOG.warn("Exception while trying to emit bytes processed metric.", + exception); + } + } + + private void recordBytesProcessedMetric(ActionContext context, Job queryJob) { long processedBytes = ((JobStatistics.QueryStatistics) queryJob.getStatistics()).getTotalBytesProcessed(); LOG.info("Job {} processed {} bytes", queryJob.getJobId(), processedBytes); @@ -176,7 +187,6 @@ public void run(ActionContext context) throws Exception { .put(Constants.Metrics.Tag.APP_ENTITY_TYPE, Action.PLUGIN_TYPE) .put(Constants.Metrics.Tag.APP_ENTITY_TYPE_NAME, BigQueryExecute.NAME) .build(); - context.getMetrics().gauge(RECORDS_PROCESSED, rows); context.getMetrics().child(tags).countLong(BigQuerySinkUtils.BYTES_PROCESSED_METRIC, processedBytes); }