diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java index a0b71c554c8..a5252402c80 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java @@ -372,6 +372,12 @@ public void incNumDeserializeSuccess() { } } + public void decNumDeserializeSuccess() { + if (numDeserializeSuccess != null) { + numDeserializeSuccess.dec(); + } + } + public void incNumDeserializeError() { if (numDeserializeError != null) { numDeserializeError.inc(); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java index 83234daae8f..5fef1d95c52 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java @@ -96,6 +96,10 @@ public void deserialize(Message message, Collector collector) throws IOException { try { long deserializeStartTime = System.currentTimeMillis(); + // increase the number of deserialize success first, if deserialize failed, decrease it + if (sourceExactlyMetric != null) { + sourceExactlyMetric.incNumDeserializeSuccess(); + } // Get the key row data List keyRowData = new ArrayList<>(); if (keyDeserialization != null) { @@ -119,11 +123,11 @@ public void deserialize(Message message, Collector collector) message, keyRowData, valueRowData, metricsCollector); if (sourceExactlyMetric != null) { sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime); - sourceExactlyMetric.incNumDeserializeSuccess(); } } catch (Exception e) { if (sourceExactlyMetric != null) { sourceExactlyMetric.incNumDeserializeError(); + sourceExactlyMetric.decNumDeserializeSuccess(); } throw e; }