From 92bba72ff6ecc014497af7c3a9fb97d283186f21 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Fri, 13 Dec 2024 22:22:22 +0530 Subject: [PATCH] change better and now twice --- flow/shared/schema_helpers.go | 43 +++++++++++++++++++++------------ flow/workflows/snapshot_flow.go | 8 +++--- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/flow/shared/schema_helpers.go b/flow/shared/schema_helpers.go index 7c1d787e0..5927ee184 100644 --- a/flow/shared/schema_helpers.go +++ b/flow/shared/schema_helpers.go @@ -49,25 +49,27 @@ func BuildProcessedSchemaMapping( for _, mapping := range tableMappings { if mapping.SourceTableIdentifier == srcTableName { dstTableName = mapping.DestinationTableIdentifier - columns := make([]*protos.FieldDescription, 0, len(tableSchema.Columns)) - pkeyColumns := make([]string, 0, len(tableSchema.PrimaryKeyColumns)) - for _, column := range tableSchema.Columns { - if !slices.Contains(mapping.Exclude, column.Name) && !strings.Contains(column.Name, "-") { - columns = append(columns, column) + if len(mapping.Exclude) != 0 || TableSchemaHasColumnsWithSubstr(tableSchema, "-") { + columns := make([]*protos.FieldDescription, 0, len(tableSchema.Columns)) + pkeyColumns := make([]string, 0, len(tableSchema.PrimaryKeyColumns)) + for _, column := range tableSchema.Columns { + if !slices.Contains(mapping.Exclude, column.Name) && !strings.Contains(column.Name, "-") { + columns = append(columns, column) + } + if slices.Contains(tableSchema.PrimaryKeyColumns, column.Name) && + !slices.Contains(mapping.Exclude, column.Name) { + pkeyColumns = append(pkeyColumns, column.Name) + } } - if slices.Contains(tableSchema.PrimaryKeyColumns, column.Name) && - !slices.Contains(mapping.Exclude, column.Name) { - pkeyColumns = append(pkeyColumns, column.Name) + tableSchema = &protos.TableSchema{ + TableIdentifier: tableSchema.TableIdentifier, + PrimaryKeyColumns: pkeyColumns, + IsReplicaIdentityFull: tableSchema.IsReplicaIdentityFull, + NullableEnabled: tableSchema.NullableEnabled, + System: tableSchema.System, + Columns: columns, } } - tableSchema = &protos.TableSchema{ - TableIdentifier: tableSchema.TableIdentifier, - PrimaryKeyColumns: pkeyColumns, - IsReplicaIdentityFull: tableSchema.IsReplicaIdentityFull, - NullableEnabled: tableSchema.NullableEnabled, - System: tableSchema.System, - Columns: columns, - } break } } @@ -79,3 +81,12 @@ func BuildProcessedSchemaMapping( } return processedSchemaMapping } + +func TableSchemaHasColumnsWithSubstr(tableSchema *protos.TableSchema, substr string) bool { + for _, column := range tableSchema.Columns { + if strings.Contains(column.Name, substr) { + return true + } + } + return false +} diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 1db3b6d60..93b7bc045 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -141,10 +141,10 @@ func (s *SnapshotFlowExecution) cloneTable( return fmt.Errorf("unable to parse source table: %w", err) } from := "*" - if len(mapping.Exclude) != 0 { - if err := initTableSchema(); err != nil { - return err - } + if err := initTableSchema(); err != nil { + return err + } + if len(mapping.Exclude) != 0 || shared.TableSchemaHasColumnsWithSubstr(tableSchema, "-") { quotedColumns := make([]string, 0, len(tableSchema.Columns)) for _, col := range tableSchema.Columns { if !slices.Contains(mapping.Exclude, col.Name) {