Skip to content

Commit

Permalink
change better and now twice
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Dec 13, 2024
1 parent 5da46f1 commit 92bba72
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 20 deletions.
43 changes: 27 additions & 16 deletions flow/shared/schema_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,27 @@ func BuildProcessedSchemaMapping(
for _, mapping := range tableMappings {
if mapping.SourceTableIdentifier == srcTableName {
dstTableName = mapping.DestinationTableIdentifier
columns := make([]*protos.FieldDescription, 0, len(tableSchema.Columns))
pkeyColumns := make([]string, 0, len(tableSchema.PrimaryKeyColumns))
for _, column := range tableSchema.Columns {
if !slices.Contains(mapping.Exclude, column.Name) && !strings.Contains(column.Name, "-") {
columns = append(columns, column)
if len(mapping.Exclude) != 0 || TableSchemaHasColumnsWithSubstr(tableSchema, "-") {
columns := make([]*protos.FieldDescription, 0, len(tableSchema.Columns))
pkeyColumns := make([]string, 0, len(tableSchema.PrimaryKeyColumns))
for _, column := range tableSchema.Columns {
if !slices.Contains(mapping.Exclude, column.Name) && !strings.Contains(column.Name, "-") {
columns = append(columns, column)
}
if slices.Contains(tableSchema.PrimaryKeyColumns, column.Name) &&
!slices.Contains(mapping.Exclude, column.Name) {
pkeyColumns = append(pkeyColumns, column.Name)
}
}
if slices.Contains(tableSchema.PrimaryKeyColumns, column.Name) &&
!slices.Contains(mapping.Exclude, column.Name) {
pkeyColumns = append(pkeyColumns, column.Name)
tableSchema = &protos.TableSchema{
TableIdentifier: tableSchema.TableIdentifier,
PrimaryKeyColumns: pkeyColumns,
IsReplicaIdentityFull: tableSchema.IsReplicaIdentityFull,
NullableEnabled: tableSchema.NullableEnabled,
System: tableSchema.System,
Columns: columns,
}
}
tableSchema = &protos.TableSchema{
TableIdentifier: tableSchema.TableIdentifier,
PrimaryKeyColumns: pkeyColumns,
IsReplicaIdentityFull: tableSchema.IsReplicaIdentityFull,
NullableEnabled: tableSchema.NullableEnabled,
System: tableSchema.System,
Columns: columns,
}
break
}
}
Expand All @@ -79,3 +81,12 @@ func BuildProcessedSchemaMapping(
}
return processedSchemaMapping
}

func TableSchemaHasColumnsWithSubstr(tableSchema *protos.TableSchema, substr string) bool {
for _, column := range tableSchema.Columns {
if strings.Contains(column.Name, substr) {
return true
}
}
return false
}
8 changes: 4 additions & 4 deletions flow/workflows/snapshot_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ func (s *SnapshotFlowExecution) cloneTable(
return fmt.Errorf("unable to parse source table: %w", err)
}
from := "*"
if len(mapping.Exclude) != 0 {
if err := initTableSchema(); err != nil {
return err
}
if err := initTableSchema(); err != nil {
return err
}
if len(mapping.Exclude) != 0 || shared.TableSchemaHasColumnsWithSubstr(tableSchema, "-") {
quotedColumns := make([]string, 0, len(tableSchema.Columns))
for _, col := range tableSchema.Columns {
if !slices.Contains(mapping.Exclude, col.Name) {
Expand Down

0 comments on commit 92bba72

Please sign in to comment.