diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index 689db44ffc..8c5b03db01 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -197,17 +197,18 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( for _, srcTableName := range sortedSourceTables { tableSchema := tableNameSchemaMapping[srcTableName] normalizedTableName := s.tableNameMapping[srcTableName] - // TODO source can be used for multiple targets, need to adjust based on destination for _, mapping := range flowConnectionConfigs.TableMappings { - if mapping.SourceTableIdentifier == srcTableName && len(mapping.Exclude) != 0 { - tableSchema = &protos.TableSchema{ - TableIdentifier: tableSchema.TableIdentifier, - Columns: maps.Clone(tableSchema.Columns), - PrimaryKeyColumns: tableSchema.PrimaryKeyColumns, - IsReplicaIdentityFull: tableSchema.IsReplicaIdentityFull, - } - for _, exclude := range mapping.Exclude { - delete(tableSchema.Columns, exclude) + if mapping.SourceTableIdentifier == srcTableName { + if len(mapping.Exclude) != 0 { + tableSchema = &protos.TableSchema{ + TableIdentifier: tableSchema.TableIdentifier, + Columns: maps.Clone(tableSchema.Columns), + PrimaryKeyColumns: tableSchema.PrimaryKeyColumns, + IsReplicaIdentityFull: tableSchema.IsReplicaIdentityFull, + } + for _, exclude := range mapping.Exclude { + delete(tableSchema.Columns, exclude) + } } break } diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index a18ef34d12..2e4622cdb8 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -138,7 +138,6 @@ func (s *SnapshotFlowExecution) cloneTable( } from := "*" if len(mapping.Exclude) != 0 { - // TODO need to verify destination is right for _, v := range s.config.TableNameSchemaMapping { if v.TableIdentifier == srcName { from = strings.Join(maps.Keys(v.Columns), ",")