diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 70d3376fb4..299097eff2 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -76,13 +76,14 @@ func (s *SnapshotFlowExecution) closeSlotKeepAlive( func (s *SnapshotFlowExecution) cloneTable( childCtx workflow.Context, snapshotName string, - sourceTable string, + sourceTableName string, destinationTableName string, ) (workflow.Future, error) { flowName := s.config.FlowJobName - + srcName := sourceTableName + dstName := destinationTableName childWorkflowIDSideEffect := workflow.SideEffect(childCtx, func(ctx workflow.Context) interface{} { - childWorkflowID := fmt.Sprintf("clone_%s_%s_%s", flowName, destinationTableName, uuid.New().String()) + childWorkflowID := fmt.Sprintf("clone_%s_%s_%s", flowName, dstName, uuid.New().String()) reg := regexp.MustCompile("[^a-zA-Z0-9]+") return reg.ReplaceAllString(childWorkflowID, "_") }) @@ -93,14 +94,14 @@ func (s *SnapshotFlowExecution) cloneTable( "flowName": flowName, "snapshotName": snapshotName, }).Errorf("failed to get child id for source table %s and destination table %s", - sourceTable, destinationTableName) + srcName, dstName) return nil, fmt.Errorf("failed to get child workflow ID: %w", err) } logrus.WithFields(logrus.Fields{ "flowName": flowName, "snapshotName": snapshotName, }).Infof("Obtained child id %s for source table %s and destination table %s", - childWorkflowID, sourceTable, destinationTableName) + childWorkflowID, srcName, dstName) childCtx = workflow.WithChildOptions(childCtx, workflow.ChildWorkflowOptions{ WorkflowID: childWorkflowID, @@ -118,7 +119,7 @@ func (s *SnapshotFlowExecution) cloneTable( sourcePostgres := s.config.Source sourcePostgres.GetPostgresConfig().TransactionSnapshot = snapshotName - query := fmt.Sprintf("SELECT * FROM %s WHERE ctid BETWEEN {{.start}} AND {{.end}}", sourceTable) + query := fmt.Sprintf("SELECT * FROM %s WHERE ctid BETWEEN {{.start}} AND {{.end}}", srcName) numWorkers := uint32(8) if s.config.SnapshotMaxParallelWorkers > 0 { @@ -136,9 +137,9 @@ func (s *SnapshotFlowExecution) cloneTable( DestinationPeer: s.config.Destination, Query: query, WatermarkColumn: "ctid", - WatermarkTable: sourceTable, + WatermarkTable: srcName, InitialCopyOnly: true, - DestinationTableIdentifier: destinationTableName, + DestinationTableIdentifier: dstName, NumRowsPerPartition: numRowsPerPartition, SyncMode: s.config.SnapshotSyncMode, MaxParallelWorkers: numWorkers,