diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 2fadcbbe62..e5f861b763 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -108,10 +108,9 @@ func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex }) tableSchemaInput := &protos.GetTableSchemaBatchInput{ - PeerConnectionConfig: q.config.SourcePeer, - TableIdentifiers: []string{q.config.WatermarkTable}, - FlowName: q.config.FlowJobName, - SkipPkeyAndReplicaCheck: true, + PeerConnectionConfig: q.config.SourcePeer, + TableIdentifiers: []string{q.config.WatermarkTable}, + FlowName: q.config.FlowJobName, } future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput) diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index d5820a5cb0..c2a1a5f306 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -182,10 +182,9 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( sort.Strings(sourceTables) tableSchemaInput := &protos.GetTableSchemaBatchInput{ - PeerConnectionConfig: flowConnectionConfigs.Source, - TableIdentifiers: sourceTables, - FlowName: s.cdcFlowName, - SkipPkeyAndReplicaCheck: flowConnectionConfigs.InitialSnapshotOnly, + PeerConnectionConfig: flowConnectionConfigs.Source, + TableIdentifiers: sourceTables, + FlowName: s.cdcFlowName, } future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput)