Skip to content

Commit

Permalink
Fix column exclusion (initial-load) (#1420)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Mar 1, 2024
1 parent 54ad0ef commit 03f21b3
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 4 deletions.
7 changes: 6 additions & 1 deletion flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,12 @@ func CDCFlowWorkflow(
WaitForCancellation: true,
}
snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts)
snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg)
snapshotFlowFuture := workflow.ExecuteChildWorkflow(
snapshotFlowCtx,
SnapshotFlowWorkflow,
cfg,
state.SyncFlowOptions.TableNameSchemaMapping,
)
if err := snapshotFlowFuture.Get(snapshotFlowCtx, nil); err != nil {
w.logger.Error("snapshot flow failed", slog.Any("error", err))
return state, fmt.Errorf("failed to execute snapshot workflow: %w", err)
Expand Down
11 changes: 8 additions & 3 deletions flow/workflows/snapshot_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,15 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot(
return nil
}

func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs) error {
func SnapshotFlowWorkflow(
ctx workflow.Context,
config *protos.FlowConnectionConfigs,
tableNameSchemaMapping map[string]*protos.TableSchema,
) error {
se := &SnapshotFlowExecution{
config: config,
logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)),
config: config,
tableNameSchemaMapping: tableNameSchemaMapping,
logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)),
}

numTablesInParallel := int(max(config.SnapshotNumTablesInParallel, 1))
Expand Down

0 comments on commit 03f21b3

Please sign in to comment.