Skip to content

Commit d9ed3dc

Browse files
authored
Merge branch 'main' into remove-temporal-testsuite
2 parents 5ed27f4 + 03f21b3 commit d9ed3dc

File tree

2 files changed

+14
-4
lines changed

2 files changed

+14
-4
lines changed

flow/workflows/cdc_flow.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,12 @@ func CDCFlowWorkflow(
383383
WaitForCancellation: true,
384384
}
385385
snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts)
386-
snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg)
386+
snapshotFlowFuture := workflow.ExecuteChildWorkflow(
387+
snapshotFlowCtx,
388+
SnapshotFlowWorkflow,
389+
cfg,
390+
state.SyncFlowOptions.TableNameSchemaMapping,
391+
)
387392
if err := snapshotFlowFuture.Get(snapshotFlowCtx, nil); err != nil {
388393
w.logger.Error("snapshot flow failed", slog.Any("error", err))
389394
return state, fmt.Errorf("failed to execute snapshot workflow: %w", err)

flow/workflows/snapshot_flow.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -238,10 +238,15 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot(
238238
return nil
239239
}
240240

241-
func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs) error {
241+
func SnapshotFlowWorkflow(
242+
ctx workflow.Context,
243+
config *protos.FlowConnectionConfigs,
244+
tableNameSchemaMapping map[string]*protos.TableSchema,
245+
) error {
242246
se := &SnapshotFlowExecution{
243-
config: config,
244-
logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)),
247+
config: config,
248+
tableNameSchemaMapping: tableNameSchemaMapping,
249+
logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)),
245250
}
246251

247252
numTablesInParallel := int(max(config.SnapshotNumTablesInParallel, 1))

0 commit comments

Comments
 (0)