diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java index 7962b6f4cdb..01118d65137 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java @@ -314,6 +314,10 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throw snapshotOffsetState(functionSnapshotContext.getCheckpointId()); snapshotHistoryRecordsState(); } + if (deserializer instanceof RowDataDebeziumDeserializeSchema) { + ((RowDataDebeziumDeserializeSchema) deserializer) + .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId()); + } } private void snapshotOffsetState(long checkpointId) throws Exception { @@ -488,6 +492,12 @@ public void notifyCheckpointComplete(long checkpointId) { DebeziumOffset offset = DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets); changeConsumer.commitOffset(offset); + + if (deserializer instanceof RowDataDebeziumDeserializeSchema) { + RowDataDebeziumDeserializeSchema schema = (RowDataDebeziumDeserializeSchema) deserializer; + schema.flushAudit(); + schema.updateLastCheckpointId(checkpointId); + } } catch (Exception e) { // ignore exception if we are no longer running LOG.warn("Ignore error when committing offset to database.", e); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java index 2e632c1cb2e..d90f4705131 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java @@ -19,7 +19,7 @@ import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricsCollector; -import org.apache.inlong.sort.base.metric.SourceMetricData; +import org.apache.inlong.sort.base.metric.SourceExactlyMetric; import com.ververica.cdc.debezium.table.AppendMetadataCollector; import com.ververica.cdc.debezium.table.DebeziumChangelogMode; @@ -101,7 +101,7 @@ public interface ValueValidator extends Serializable { /** Changelog Mode to use for encoding changes in Flink internal data structure. */ private final DebeziumChangelogMode changelogMode; private final MetricOption metricOption; - private SourceMetricData sourceMetricData; + private SourceExactlyMetric sourceExactlyMetric; /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */ public static Builder newBuilder() { @@ -133,7 +133,7 @@ public static Builder newBuilder() { @Override public void open() { if (metricOption != null) { - sourceMetricData = new SourceMetricData(metricOption); + sourceExactlyMetric = new SourceExactlyMetric(metricOption); } } @@ -146,8 +146,8 @@ public void deserialize(SourceRecord record, Collector out) throws Exce GenericRowData insert = extractAfterRow(value, valueSchema); validator.validate(insert, RowKind.INSERT); insert.setRowKind(RowKind.INSERT); - if (sourceMetricData != null) { - out = new MetricsCollector<>(out, sourceMetricData); + if (sourceExactlyMetric != null) { + out = new MetricsCollector<>(out, sourceExactlyMetric); } emit(record, insert, out); } else if (op == Envelope.Operation.DELETE) { @@ -166,8 +166,8 @@ public void deserialize(SourceRecord record, Collector out) throws Exce GenericRowData after = extractAfterRow(value, valueSchema); validator.validate(after, RowKind.UPDATE_AFTER); after.setRowKind(RowKind.UPDATE_AFTER); - if (sourceMetricData != null) { - out = new MetricsCollector<>(out, sourceMetricData); + if (sourceExactlyMetric != null) { + out = new MetricsCollector<>(out, sourceExactlyMetric); } emit(record, after, out); } @@ -679,4 +679,22 @@ public Object convert(Object dbzObj, Schema schema) throws Exception { } }; } + + public void flushAudit() { + if (sourceExactlyMetric != null) { + sourceExactlyMetric.flushAudit(); + } + } + + public void updateCurrentCheckpointId(long checkpointId) { + if (sourceExactlyMetric != null) { + sourceExactlyMetric.updateCurrentCheckpointId(checkpointId); + } + } + + public void updateLastCheckpointId(long checkpointId) { + if (sourceExactlyMetric != null) { + sourceExactlyMetric.updateLastCheckpointId(checkpointId); + } + } }