diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java index d6de2d1f171..4edb52b9c7c 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java @@ -209,7 +209,7 @@ public class DebeziumSourceFunction extends RichSourceFunction private final MetricOption metricOption; /** The map to store the start time of each checkpoint. */ - private transient Map checkpointStartTimeMap = new HashMap<>(); + private transient Map checkpointStartTimeMap; // --------------------------------------------------------------------------------------- @@ -236,7 +236,7 @@ public void open(Configuration parameters) throws Exception { this.executor = Executors.newSingleThreadExecutor(threadFactory); this.handover = new Handover(); this.changeConsumer = new DebeziumChangeConsumer(handover); - if (sourceExactlyMetric == null && metricOption != null) { + if (metricOption != null) { sourceExactlyMetric = new SourceExactlyMetric(metricOption, getRuntimeContext().getMetricGroup()); } if (deserializer instanceof RowDataDebeziumDeserializeSchema) { @@ -244,9 +244,7 @@ public void open(Configuration parameters) throws Exception { .setSourceExactlyMetric(sourceExactlyMetric); } // instantiate checkpointStartTimeMap after restoring from checkpoint - if (checkpointStartTimeMap == null) { - checkpointStartTimeMap = new HashMap<>(); - } + checkpointStartTimeMap = new HashMap<>(); } // ------------------------------------------------------------------------ @@ -361,7 +359,9 @@ private void doSnapshotState(FunctionSnapshotContext functionSnapshotContext) th } else { LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint"); } - sourceExactlyMetric.incNumSnapshotCreate(); + if (sourceExactlyMetric != null) { + sourceExactlyMetric.incNumSnapshotCreate(); + } } private void snapshotOffsetState(long checkpointId) throws Exception { @@ -544,7 +544,7 @@ public void notifyCheckpointComplete(long checkpointId) { // get the start time of the currently completed checkpoint if (checkpointStartTimeMap != null) { Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId); - if (snapShotStartTimeById != null) { + if (snapShotStartTimeById != null && sourceExactlyMetric != null) { sourceExactlyMetric.incNumCompletedSnapshots(); sourceExactlyMetric .recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById);