Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-11357][Sort] Add new source metrics for sort-connector-sqlserver-cdc-v1.15 #11358

Merged
merged 1 commit into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.inlong.sort.sqlserver;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.SourceExactlyMetric;

import com.ververica.cdc.debezium.Validator;
import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
import com.ververica.cdc.debezium.internal.DebeziumOffset;
Expand Down Expand Up @@ -61,6 +64,8 @@
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -199,17 +204,24 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
/** Buffer the events from the source and record the errors from the debezium. */
private transient Handover handover;

private transient SourceExactlyMetric sourceExactlyMetric;

private final MetricOption metricOption;

private transient Map<Long, Long> checkpointStartTimeMap;

// ---------------------------------------------------------------------------------------

public DebeziumSourceFunction(
DebeziumDeserializationSchema<T> deserializer,
Properties properties,
@Nullable DebeziumOffset specificOffset,
Validator validator) {
Validator validator, MetricOption metricOption) {
this.deserializer = deserializer;
this.properties = properties;
this.specificOffset = specificOffset;
this.validator = validator;
this.metricOption = metricOption;
}

@Override
Expand All @@ -222,6 +234,14 @@ public void open(Configuration parameters) throws Exception {
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.handover = new Handover();
this.changeConsumer = new DebeziumChangeConsumer(handover);
if (metricOption != null) {
sourceExactlyMetric = new SourceExactlyMetric(metricOption, getRuntimeContext().getMetricGroup());
}
if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
((RowDataDebeziumDeserializeSchema) deserializer)
.setSourceExactlyMetric(sourceExactlyMetric);
}
this.checkpointStartTimeMap = new HashMap<>();
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -306,17 +326,33 @@ private void restoreHistoryRecordsState() throws Exception {

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
if (handover.hasError()) {
LOG.debug("snapshotState() called on closed source");
throw new FlinkRuntimeException(
"Call snapshotState() on closed source, checkpoint failed.");
} else {
snapshotOffsetState(functionSnapshotContext.getCheckpointId());
snapshotHistoryRecordsState();
}
if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
((RowDataDebeziumDeserializeSchema) deserializer)
.updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
try {
if (handover.hasError()) {
LOG.debug("snapshotState() called on closed source");
throw new FlinkRuntimeException(
"Call snapshotState() on closed source, checkpoint failed.");
} else {
snapshotOffsetState(functionSnapshotContext.getCheckpointId());
snapshotHistoryRecordsState();
}
if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
((RowDataDebeziumDeserializeSchema) deserializer)
.updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
}
if (checkpointStartTimeMap != null) {
checkpointStartTimeMap.put(functionSnapshotContext.getCheckpointId(), System.currentTimeMillis());
} else {
LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint");
}

if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotCreate();
}
} catch (Exception e) {
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotCreate();
}
throw e;
}
}

Expand Down Expand Up @@ -498,6 +534,16 @@ public void notifyCheckpointComplete(long checkpointId) {
schema.flushAudit();
schema.updateLastCheckpointId(checkpointId);
}
if (checkpointStartTimeMap != null) {
Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
if (snapShotStartTimeById != null && sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotComplete();
sourceExactlyMetric.recordSnapshotToCheckpointDelay(
System.currentTimeMillis() - snapShotStartTimeById);
}
} else {
LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint");
}
} catch (Exception e) {
// ignore exception if we are no longer running
LOG.warn("Ignore error when committing offset to database.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,37 +139,49 @@ public void open() {

@Override
public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
Envelope.Operation op = Envelope.operationFor(record);
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
GenericRowData insert = extractAfterRow(value, valueSchema);
validator.validate(insert, RowKind.INSERT);
insert.setRowKind(RowKind.INSERT);
if (sourceExactlyMetric != null) {
out = new MetricsCollector<>(out, sourceExactlyMetric);
long deserializeStartTime = System.currentTimeMillis();
try {
Envelope.Operation op = Envelope.operationFor(record);
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
GenericRowData insert = extractAfterRow(value, valueSchema);
validator.validate(insert, RowKind.INSERT);
insert.setRowKind(RowKind.INSERT);
if (sourceExactlyMetric != null) {
out = new MetricsCollector<>(out, sourceExactlyMetric);
}
emit(record, insert, out);
} else if (op == Envelope.Operation.DELETE) {
GenericRowData delete = extractBeforeRow(value, valueSchema);
validator.validate(delete, RowKind.DELETE);
delete.setRowKind(RowKind.DELETE);
emit(record, delete, out);
} else {
if (changelogMode == DebeziumChangelogMode.ALL) {
GenericRowData before = extractBeforeRow(value, valueSchema);
validator.validate(before, RowKind.UPDATE_BEFORE);
before.setRowKind(RowKind.UPDATE_BEFORE);
emit(record, before, out);
}

GenericRowData after = extractAfterRow(value, valueSchema);
validator.validate(after, RowKind.UPDATE_AFTER);
after.setRowKind(RowKind.UPDATE_AFTER);
if (sourceExactlyMetric != null) {
out = new MetricsCollector<>(out, sourceExactlyMetric);
}
emit(record, after, out);
}
emit(record, insert, out);
} else if (op == Envelope.Operation.DELETE) {
GenericRowData delete = extractBeforeRow(value, valueSchema);
validator.validate(delete, RowKind.DELETE);
delete.setRowKind(RowKind.DELETE);
emit(record, delete, out);
} else {
if (changelogMode == DebeziumChangelogMode.ALL) {
GenericRowData before = extractBeforeRow(value, valueSchema);
validator.validate(before, RowKind.UPDATE_BEFORE);
before.setRowKind(RowKind.UPDATE_BEFORE);
emit(record, before, out);
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumDeserializeSuccess();
sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime);
}

GenericRowData after = extractAfterRow(value, valueSchema);
validator.validate(after, RowKind.UPDATE_AFTER);
after.setRowKind(RowKind.UPDATE_AFTER);
} catch (Exception e) {
if (sourceExactlyMetric != null) {
out = new MetricsCollector<>(out, sourceExactlyMetric);
sourceExactlyMetric.incNumDeserializeError();
}
emit(record, after, out);
throw e;
}
}

Expand Down Expand Up @@ -697,4 +709,9 @@ public void updateLastCheckpointId(long checkpointId) {
sourceExactlyMetric.updateLastCheckpointId(checkpointId);
}
}

/** allow DebeziumSourceFunction to set the SourceExactlyMetric */
public void setSourceExactlyMetric(SourceExactlyMetric sourceExactlyMetric) {
this.sourceExactlyMetric = sourceExactlyMetric;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sort.sqlserver;

import org.apache.inlong.sort.base.metric.MetricOption;

import com.ververica.cdc.connectors.sqlserver.SqlServerValidator;
import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
import io.debezium.connector.sqlserver.SqlServerConnector;
Expand Down Expand Up @@ -51,6 +53,7 @@ public static class Builder<T> {
private Properties dbzProperties;
private StartupOptions startupOptions = StartupOptions.initial();
private DebeziumDeserializationSchema<T> deserializer;
private MetricOption metricOption;

public Builder<T> hostname(String hostname) {
this.hostname = hostname;
Expand Down Expand Up @@ -114,6 +117,12 @@ public Builder<T> startupOptions(StartupOptions startupOptions) {
return this;
}

/** metricOption used to instantiate SourceExactlyMetric when inlong.metric.labels is present in flink sql */
public Builder<T> metricOption(MetricOption metricOption) {
this.metricOption = metricOption;
return this;
}

public DebeziumSourceFunction<T> build() {
Properties props = new Properties();
props.setProperty("connector.class", SqlServerConnector.class.getCanonicalName());
Expand Down Expand Up @@ -154,7 +163,7 @@ public DebeziumSourceFunction<T> build() {
}

return new DebeziumSourceFunction<>(
deserializer, props, null, new SqlServerValidator(props));
deserializer, props, null, new SqlServerValidator(props), metricOption);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.debeziumProperties(dbzProperties)
.startupOptions(startupOptions)
.deserializer(deserializer)
.metricOption(metricOption)
.build();
return SourceFunctionProvider.of(sourceFunction, false);
}
Expand Down
Loading