diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java
index 01118d65137..c480ad1d454 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.sort.sqlserver;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
+
 import com.ververica.cdc.debezium.Validator;
 import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
 import com.ververica.cdc.debezium.internal.DebeziumOffset;
@@ -61,6 +64,8 @@
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -199,17 +204,24 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
     /** Buffer the events from the source and record the errors from the debezium. */
     private transient Handover handover;
 
+    private transient SourceExactlyMetric sourceExactlyMetric;
+
+    private final MetricOption metricOption;
+
+    private transient Map<Long, Long> checkpointStartTimeMap;
+
     // ---------------------------------------------------------------------------------------
 
     public DebeziumSourceFunction(
             DebeziumDeserializationSchema<T> deserializer,
             Properties properties,
             @Nullable DebeziumOffset specificOffset,
-            Validator validator) {
+            Validator validator, MetricOption metricOption) {
         this.deserializer = deserializer;
         this.properties = properties;
         this.specificOffset = specificOffset;
         this.validator = validator;
+        this.metricOption = metricOption;
     }
 
     @Override
@@ -222,6 +234,14 @@ public void open(Configuration parameters) throws Exception {
         this.executor = Executors.newSingleThreadExecutor(threadFactory);
         this.handover = new Handover();
         this.changeConsumer = new DebeziumChangeConsumer(handover);
+        if (metricOption != null) {
+            sourceExactlyMetric = new SourceExactlyMetric(metricOption, getRuntimeContext().getMetricGroup());
+        }
+        if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
+            ((RowDataDebeziumDeserializeSchema) deserializer)
+                    .setSourceExactlyMetric(sourceExactlyMetric);
+        }
+        this.checkpointStartTimeMap = new HashMap<>();
     }
 
     // ------------------------------------------------------------------------
@@ -306,17 +326,33 @@ private void restoreHistoryRecordsState() throws Exception {
 
     @Override
     public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
-        if (handover.hasError()) {
-            LOG.debug("snapshotState() called on closed source");
-            throw new FlinkRuntimeException(
-                    "Call snapshotState() on closed source, checkpoint failed.");
-        } else {
-            snapshotOffsetState(functionSnapshotContext.getCheckpointId());
-            snapshotHistoryRecordsState();
-        }
-        if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
-            ((RowDataDebeziumDeserializeSchema) deserializer)
-                    .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
+        try {
+            if (handover.hasError()) {
+                LOG.debug("snapshotState() called on closed source");
+                throw new FlinkRuntimeException(
+                        "Call snapshotState() on closed source, checkpoint failed.");
+            } else {
+                snapshotOffsetState(functionSnapshotContext.getCheckpointId());
+                snapshotHistoryRecordsState();
+            }
+            if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
+                ((RowDataDebeziumDeserializeSchema) deserializer)
+                        .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
+            }
+            if (checkpointStartTimeMap != null) {
+                checkpointStartTimeMap.put(functionSnapshotContext.getCheckpointId(), System.currentTimeMillis());
+            } else {
+                LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint");
+            }
+
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumSnapshotCreate();
+            }
+        } catch (Exception e) {
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumSnapshotCreate();
+            }
+            throw e;
         }
     }
 
@@ -498,6 +534,16 @@ public void notifyCheckpointComplete(long checkpointId) {
                 schema.flushAudit();
                 schema.updateLastCheckpointId(checkpointId);
             }
+            if (checkpointStartTimeMap != null) {
+                Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
+                if (snapShotStartTimeById != null && sourceExactlyMetric != null) {
+                    sourceExactlyMetric.incNumSnapshotComplete();
+                    sourceExactlyMetric.recordSnapshotToCheckpointDelay(
+                            System.currentTimeMillis() - snapShotStartTimeById);
+                }
+            } else {
+                LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint");
+            }
         } catch (Exception e) {
             // ignore exception if we are no longer running
             LOG.warn("Ignore error when committing offset to database.", e);
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
index d90f4705131..394ee0297be 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
@@ -139,37 +139,49 @@ public void open() {
 
     @Override
     public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
-        Envelope.Operation op = Envelope.operationFor(record);
-        Struct value = (Struct) record.value();
-        Schema valueSchema = record.valueSchema();
-        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
-            GenericRowData insert = extractAfterRow(value, valueSchema);
-            validator.validate(insert, RowKind.INSERT);
-            insert.setRowKind(RowKind.INSERT);
-            if (sourceExactlyMetric != null) {
-                out = new MetricsCollector<>(out, sourceExactlyMetric);
+        long deserializeStartTime = System.currentTimeMillis();
+        try {
+            Envelope.Operation op = Envelope.operationFor(record);
+            Struct value = (Struct) record.value();
+            Schema valueSchema = record.valueSchema();
+            if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
+                GenericRowData insert = extractAfterRow(value, valueSchema);
+                validator.validate(insert, RowKind.INSERT);
+                insert.setRowKind(RowKind.INSERT);
+                if (sourceExactlyMetric != null) {
+                    out = new MetricsCollector<>(out, sourceExactlyMetric);
+                }
+                emit(record, insert, out);
+            } else if (op == Envelope.Operation.DELETE) {
+                GenericRowData delete = extractBeforeRow(value, valueSchema);
+                validator.validate(delete, RowKind.DELETE);
+                delete.setRowKind(RowKind.DELETE);
+                emit(record, delete, out);
+            } else {
+                if (changelogMode == DebeziumChangelogMode.ALL) {
+                    GenericRowData before = extractBeforeRow(value, valueSchema);
+                    validator.validate(before, RowKind.UPDATE_BEFORE);
+                    before.setRowKind(RowKind.UPDATE_BEFORE);
+                    emit(record, before, out);
+                }
+
+                GenericRowData after = extractAfterRow(value, valueSchema);
+                validator.validate(after, RowKind.UPDATE_AFTER);
+                after.setRowKind(RowKind.UPDATE_AFTER);
+                if (sourceExactlyMetric != null) {
+                    out = new MetricsCollector<>(out, sourceExactlyMetric);
+                }
+                emit(record, after, out);
             }
-            emit(record, insert, out);
-        } else if (op == Envelope.Operation.DELETE) {
-            GenericRowData delete = extractBeforeRow(value, valueSchema);
-            validator.validate(delete, RowKind.DELETE);
-            delete.setRowKind(RowKind.DELETE);
-            emit(record, delete, out);
-        } else {
-            if (changelogMode == DebeziumChangelogMode.ALL) {
-                GenericRowData before = extractBeforeRow(value, valueSchema);
-                validator.validate(before, RowKind.UPDATE_BEFORE);
-                before.setRowKind(RowKind.UPDATE_BEFORE);
-                emit(record, before, out);
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumDeserializeSuccess();
+                sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime);
             }
-
-            GenericRowData after = extractAfterRow(value, valueSchema);
-            validator.validate(after, RowKind.UPDATE_AFTER);
-            after.setRowKind(RowKind.UPDATE_AFTER);
+        } catch (Exception e) {
             if (sourceExactlyMetric != null) {
-                out = new MetricsCollector<>(out, sourceExactlyMetric);
+                sourceExactlyMetric.incNumDeserializeError();
             }
-            emit(record, after, out);
+            throw e;
         }
     }
 
@@ -697,4 +709,9 @@ public void updateLastCheckpointId(long checkpointId) {
             sourceExactlyMetric.updateLastCheckpointId(checkpointId);
         }
     }
+
+    /** allow DebeziumSourceFunction to set the SourceExactlyMetric */
+    public void setSourceExactlyMetric(SourceExactlyMetric sourceExactlyMetric) {
+        this.sourceExactlyMetric = sourceExactlyMetric;
+    }
 }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerSource.java
index 6a094521a5d..92353bf0cf6 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerSource.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.sqlserver;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
+
 import com.ververica.cdc.connectors.sqlserver.SqlServerValidator;
 import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
 import io.debezium.connector.sqlserver.SqlServerConnector;
@@ -51,6 +53,7 @@ public static class Builder<T> {
         private Properties dbzProperties;
         private StartupOptions startupOptions = StartupOptions.initial();
         private DebeziumDeserializationSchema<T> deserializer;
+        private MetricOption metricOption;
 
         public Builder<T> hostname(String hostname) {
             this.hostname = hostname;
@@ -114,6 +117,12 @@ public Builder<T> startupOptions(StartupOptions startupOptions) {
             return this;
         }
 
+        /** metricOption used to instantiate SourceExactlyMetric when inlong.metric.labels is present in flink sql */
+        public Builder<T> metricOption(MetricOption metricOption) {
+            this.metricOption = metricOption;
+            return this;
+        }
+
         public DebeziumSourceFunction<T> build() {
             Properties props = new Properties();
             props.setProperty("connector.class", SqlServerConnector.class.getCanonicalName());
@@ -154,7 +163,7 @@ public DebeziumSourceFunction<T> build() {
             }
 
             return new DebeziumSourceFunction<>(
-                    deserializer, props, null, new SqlServerValidator(props));
+                    deserializer, props, null, new SqlServerValidator(props), metricOption);
         }
     }
 }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java
index c49dd9747af..87defcedca4 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java
@@ -144,6 +144,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
                         .debeziumProperties(dbzProperties)
                         .startupOptions(startupOptions)
                         .deserializer(deserializer)
+                        .metricOption(metricOption)
                         .build();
         return SourceFunctionProvider.of(sourceFunction, false);
     }