From 03f21b3b3237486fd099b47c47f2b26276f70b45 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 1 Mar 2024 16:02:14 -0500 Subject: [PATCH] Fix column exclusion (initial-load) (#1420) --- flow/workflows/cdc_flow.go | 7 ++++++- flow/workflows/snapshot_flow.go | 11 ++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index e0629d5233..1891d7f6c1 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -383,7 +383,12 @@ func CDCFlowWorkflow( WaitForCancellation: true, } snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts) - snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg) + snapshotFlowFuture := workflow.ExecuteChildWorkflow( + snapshotFlowCtx, + SnapshotFlowWorkflow, + cfg, + state.SyncFlowOptions.TableNameSchemaMapping, + ) if err := snapshotFlowFuture.Get(snapshotFlowCtx, nil); err != nil { w.logger.Error("snapshot flow failed", slog.Any("error", err)) return state, fmt.Errorf("failed to execute snapshot workflow: %w", err) diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index a42110f769..050bc604e5 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -238,10 +238,15 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot( return nil } -func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs) error { +func SnapshotFlowWorkflow( + ctx workflow.Context, + config *protos.FlowConnectionConfigs, + tableNameSchemaMapping map[string]*protos.TableSchema, +) error { se := &SnapshotFlowExecution{ - config: config, - logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)), + config: config, + tableNameSchemaMapping: tableNameSchemaMapping, + logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)), } numTablesInParallel := int(max(config.SnapshotNumTablesInParallel, 1))