Skip to content

Commit

Permalink
minor oops
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Jan 22, 2024
1 parent ac89784 commit 3e6569e
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 8 deletions.
7 changes: 3 additions & 4 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,9 @@ func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex
})

tableSchemaInput := &protos.GetTableSchemaBatchInput{
PeerConnectionConfig: q.config.SourcePeer,
TableIdentifiers: []string{q.config.WatermarkTable},
FlowName: q.config.FlowJobName,
SkipPkeyAndReplicaCheck: true,
PeerConnectionConfig: q.config.SourcePeer,
TableIdentifiers: []string{q.config.WatermarkTable},
FlowName: q.config.FlowJobName,
}

future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput)
Expand Down
7 changes: 3 additions & 4 deletions flow/workflows/setup_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,9 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables(
sort.Strings(sourceTables)

tableSchemaInput := &protos.GetTableSchemaBatchInput{
PeerConnectionConfig: flowConnectionConfigs.Source,
TableIdentifiers: sourceTables,
FlowName: s.cdcFlowName,
SkipPkeyAndReplicaCheck: flowConnectionConfigs.InitialSnapshotOnly,
PeerConnectionConfig: flowConnectionConfigs.Source,
TableIdentifiers: sourceTables,
FlowName: s.cdcFlowName,
}

future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput)
Expand Down

0 comments on commit 3e6569e

Please sign in to comment.