diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index e0629d5233..1891d7f6c1 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -383,7 +383,12 @@ func CDCFlowWorkflow( WaitForCancellation: true, } snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts) - snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg) + snapshotFlowFuture := workflow.ExecuteChildWorkflow( + snapshotFlowCtx, + SnapshotFlowWorkflow, + cfg, + state.SyncFlowOptions.TableNameSchemaMapping, + ) if err := snapshotFlowFuture.Get(snapshotFlowCtx, nil); err != nil { w.logger.Error("snapshot flow failed", slog.Any("error", err)) return state, fmt.Errorf("failed to execute snapshot workflow: %w", err) diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index a42110f769..050bc604e5 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -238,10 +238,15 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot( return nil } -func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs) error { +func SnapshotFlowWorkflow( + ctx workflow.Context, + config *protos.FlowConnectionConfigs, + tableNameSchemaMapping map[string]*protos.TableSchema, +) error { se := &SnapshotFlowExecution{ - config: config, - logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)), + config: config, + tableNameSchemaMapping: tableNameSchemaMapping, + logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)), } numTablesInParallel := int(max(config.SnapshotNumTablesInParallel, 1))