diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java index 01118d65137..c480ad1d454 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java @@ -17,6 +17,9 @@ package org.apache.inlong.sort.sqlserver; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.SourceExactlyMetric; + import com.ververica.cdc.debezium.Validator; import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer; import com.ververica.cdc.debezium.internal.DebeziumOffset; @@ -61,6 +64,8 @@ import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; @@ -199,17 +204,24 @@ public class DebeziumSourceFunction extends RichSourceFunction /** Buffer the events from the source and record the errors from the debezium. */ private transient Handover handover; + private transient SourceExactlyMetric sourceExactlyMetric; + + private final MetricOption metricOption; + + private transient Map checkpointStartTimeMap; + // --------------------------------------------------------------------------------------- public DebeziumSourceFunction( DebeziumDeserializationSchema deserializer, Properties properties, @Nullable DebeziumOffset specificOffset, - Validator validator) { + Validator validator, MetricOption metricOption) { this.deserializer = deserializer; this.properties = properties; this.specificOffset = specificOffset; this.validator = validator; + this.metricOption = metricOption; } @Override @@ -222,6 +234,14 @@ public void open(Configuration parameters) throws Exception { this.executor = Executors.newSingleThreadExecutor(threadFactory); this.handover = new Handover(); this.changeConsumer = new DebeziumChangeConsumer(handover); + if (metricOption != null) { + sourceExactlyMetric = new SourceExactlyMetric(metricOption, getRuntimeContext().getMetricGroup()); + } + if (deserializer instanceof RowDataDebeziumDeserializeSchema) { + ((RowDataDebeziumDeserializeSchema) deserializer) + .setSourceExactlyMetric(sourceExactlyMetric); + } + this.checkpointStartTimeMap = new HashMap<>(); } // ------------------------------------------------------------------------ @@ -306,17 +326,33 @@ private void restoreHistoryRecordsState() throws Exception { @Override public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { - if (handover.hasError()) { - LOG.debug("snapshotState() called on closed source"); - throw new FlinkRuntimeException( - "Call snapshotState() on closed source, checkpoint failed."); - } else { - snapshotOffsetState(functionSnapshotContext.getCheckpointId()); - snapshotHistoryRecordsState(); - } - if (deserializer instanceof RowDataDebeziumDeserializeSchema) { - ((RowDataDebeziumDeserializeSchema) deserializer) - .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId()); + try { + if (handover.hasError()) { + LOG.debug("snapshotState() called on closed source"); + throw new FlinkRuntimeException( + "Call snapshotState() on closed source, checkpoint failed."); + } else { + snapshotOffsetState(functionSnapshotContext.getCheckpointId()); + snapshotHistoryRecordsState(); + } + if (deserializer instanceof RowDataDebeziumDeserializeSchema) { + ((RowDataDebeziumDeserializeSchema) deserializer) + .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId()); + } + if (checkpointStartTimeMap != null) { + checkpointStartTimeMap.put(functionSnapshotContext.getCheckpointId(), System.currentTimeMillis()); + } else { + LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint"); + } + + if (sourceExactlyMetric != null) { + sourceExactlyMetric.incNumSnapshotCreate(); + } + } catch (Exception e) { + if (sourceExactlyMetric != null) { + sourceExactlyMetric.incNumSnapshotCreate(); + } + throw e; } } @@ -498,6 +534,16 @@ public void notifyCheckpointComplete(long checkpointId) { schema.flushAudit(); schema.updateLastCheckpointId(checkpointId); } + if (checkpointStartTimeMap != null) { + Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId); + if (snapShotStartTimeById != null && sourceExactlyMetric != null) { + sourceExactlyMetric.incNumSnapshotComplete(); + sourceExactlyMetric.recordSnapshotToCheckpointDelay( + System.currentTimeMillis() - snapShotStartTimeById); + } + } else { + LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint"); + } } catch (Exception e) { // ignore exception if we are no longer running LOG.warn("Ignore error when committing offset to database.", e); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java index d90f4705131..394ee0297be 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java @@ -139,37 +139,49 @@ public void open() { @Override public void deserialize(SourceRecord record, Collector out) throws Exception { - Envelope.Operation op = Envelope.operationFor(record); - Struct value = (Struct) record.value(); - Schema valueSchema = record.valueSchema(); - if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) { - GenericRowData insert = extractAfterRow(value, valueSchema); - validator.validate(insert, RowKind.INSERT); - insert.setRowKind(RowKind.INSERT); - if (sourceExactlyMetric != null) { - out = new MetricsCollector<>(out, sourceExactlyMetric); + long deserializeStartTime = System.currentTimeMillis(); + try { + Envelope.Operation op = Envelope.operationFor(record); + Struct value = (Struct) record.value(); + Schema valueSchema = record.valueSchema(); + if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) { + GenericRowData insert = extractAfterRow(value, valueSchema); + validator.validate(insert, RowKind.INSERT); + insert.setRowKind(RowKind.INSERT); + if (sourceExactlyMetric != null) { + out = new MetricsCollector<>(out, sourceExactlyMetric); + } + emit(record, insert, out); + } else if (op == Envelope.Operation.DELETE) { + GenericRowData delete = extractBeforeRow(value, valueSchema); + validator.validate(delete, RowKind.DELETE); + delete.setRowKind(RowKind.DELETE); + emit(record, delete, out); + } else { + if (changelogMode == DebeziumChangelogMode.ALL) { + GenericRowData before = extractBeforeRow(value, valueSchema); + validator.validate(before, RowKind.UPDATE_BEFORE); + before.setRowKind(RowKind.UPDATE_BEFORE); + emit(record, before, out); + } + + GenericRowData after = extractAfterRow(value, valueSchema); + validator.validate(after, RowKind.UPDATE_AFTER); + after.setRowKind(RowKind.UPDATE_AFTER); + if (sourceExactlyMetric != null) { + out = new MetricsCollector<>(out, sourceExactlyMetric); + } + emit(record, after, out); } - emit(record, insert, out); - } else if (op == Envelope.Operation.DELETE) { - GenericRowData delete = extractBeforeRow(value, valueSchema); - validator.validate(delete, RowKind.DELETE); - delete.setRowKind(RowKind.DELETE); - emit(record, delete, out); - } else { - if (changelogMode == DebeziumChangelogMode.ALL) { - GenericRowData before = extractBeforeRow(value, valueSchema); - validator.validate(before, RowKind.UPDATE_BEFORE); - before.setRowKind(RowKind.UPDATE_BEFORE); - emit(record, before, out); + if (sourceExactlyMetric != null) { + sourceExactlyMetric.incNumDeserializeSuccess(); + sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime); } - - GenericRowData after = extractAfterRow(value, valueSchema); - validator.validate(after, RowKind.UPDATE_AFTER); - after.setRowKind(RowKind.UPDATE_AFTER); + } catch (Exception e) { if (sourceExactlyMetric != null) { - out = new MetricsCollector<>(out, sourceExactlyMetric); + sourceExactlyMetric.incNumDeserializeError(); } - emit(record, after, out); + throw e; } } @@ -697,4 +709,9 @@ public void updateLastCheckpointId(long checkpointId) { sourceExactlyMetric.updateLastCheckpointId(checkpointId); } } + + /** allow DebeziumSourceFunction to set the SourceExactlyMetric */ + public void setSourceExactlyMetric(SourceExactlyMetric sourceExactlyMetric) { + this.sourceExactlyMetric = sourceExactlyMetric; + } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerSource.java index 6a094521a5d..92353bf0cf6 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerSource.java @@ -17,6 +17,8 @@ package org.apache.inlong.sort.sqlserver; +import org.apache.inlong.sort.base.metric.MetricOption; + import com.ververica.cdc.connectors.sqlserver.SqlServerValidator; import com.ververica.cdc.connectors.sqlserver.table.StartupOptions; import io.debezium.connector.sqlserver.SqlServerConnector; @@ -51,6 +53,7 @@ public static class Builder { private Properties dbzProperties; private StartupOptions startupOptions = StartupOptions.initial(); private DebeziumDeserializationSchema deserializer; + private MetricOption metricOption; public Builder hostname(String hostname) { this.hostname = hostname; @@ -114,6 +117,12 @@ public Builder startupOptions(StartupOptions startupOptions) { return this; } + /** metricOption used to instantiate SourceExactlyMetric when inlong.metric.labels is present in flink sql */ + public Builder metricOption(MetricOption metricOption) { + this.metricOption = metricOption; + return this; + } + public DebeziumSourceFunction build() { Properties props = new Properties(); props.setProperty("connector.class", SqlServerConnector.class.getCanonicalName()); @@ -154,7 +163,7 @@ public DebeziumSourceFunction build() { } return new DebeziumSourceFunction<>( - deserializer, props, null, new SqlServerValidator(props)); + deserializer, props, null, new SqlServerValidator(props), metricOption); } } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java index c49dd9747af..87defcedca4 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java @@ -144,6 +144,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .debeziumProperties(dbzProperties) .startupOptions(startupOptions) .deserializer(deserializer) + .metricOption(metricOption) .build(); return SourceFunctionProvider.of(sourceFunction, false); }