Skip to content

Commit

Permalink
column exclusion - properly handle schema changes (#1512)
Browse files Browse the repository at this point in the history
Column exclusion works by removing columns from the schema we fetch from
the source table. This exclusion was not being applied in the code path for
schema changes (where we fetch the schema again), causing a disconnect
and causing normalize to fail.

Fixed by moving the exclusion code to a separate function and making
both code paths use it. Also, CDC now handles excluded columns earlier to
prevent spurious logs.
  • Loading branch information
heavycrystal authored Mar 20, 2024
1 parent 966e4a5 commit af39551
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 84 deletions.
11 changes: 7 additions & 4 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,10 +739,13 @@ func (p *PostgresCDCSource) processRelationMessage(
for _, column := range currRel.Columns {
// not present in previous relation message, but in current one, so added.
if _, ok := prevRelMap[column.Name]; !ok {
schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.DeltaAddedColumn{
ColumnName: column.Name,
ColumnType: string(currRelMap[column.Name]),
})
// only add to delta if not excluded
if _, ok := p.tableNameMapping[p.srcTableIDNameMapping[currRel.RelationID]].Exclude[column.Name]; !ok {
schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.DeltaAddedColumn{
ColumnName: column.Name,
ColumnType: string(currRelMap[column.Name]),
})
}
// present in previous and current relation messages, but data types have changed.
// so we add it to AddedColumns and DroppedColumns, knowing that we process DroppedColumns first.
} else if prevRelMap[column.Name] != currRelMap[column.Name] {
Expand Down
22 changes: 4 additions & 18 deletions flow/model/cdc_record_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,8 @@ func (r *CDCRecordStream) GetRecords() <-chan Record {
return r.records
}

func (r *CDCRecordStream) AddSchemaDelta(tableNameMapping map[string]NameAndExclude, delta *protos.TableSchemaDelta) {
if tm, ok := tableNameMapping[delta.SrcTableName]; ok && len(tm.Exclude) != 0 {
added := make([]*protos.DeltaAddedColumn, 0, len(delta.AddedColumns))
for _, column := range delta.AddedColumns {
if _, has := tm.Exclude[column.ColumnName]; !has {
added = append(added, column)
}
}
if len(added) != 0 {
r.SchemaDeltas = append(r.SchemaDeltas, &protos.TableSchemaDelta{
SrcTableName: delta.SrcTableName,
DstTableName: delta.DstTableName,
AddedColumns: added,
})
}
} else {
r.SchemaDeltas = append(r.SchemaDeltas, delta)
}
// AddSchemaDelta appends a table schema delta to the stream.
//
// The delta is recorded as-is: per this commit, column exclusion is performed
// upstream before deltas are produced, so no filtering happens here. The
// name/exclude mapping parameter is no longer consulted and is kept (as `_`)
// only to preserve the method signature for existing callers.
func (r *CDCRecordStream) AddSchemaDelta(_ map[string]NameAndExclude,
	delta *protos.TableSchemaDelta,
) {
	r.SchemaDeltas = append(r.SchemaDeltas, delta)
}
26 changes: 0 additions & 26 deletions flow/shared/additional_tables.go

This file was deleted.

76 changes: 76 additions & 0 deletions flow/shared/schema_helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package shared

import (
"log/slog"
"slices"

"go.temporal.io/sdk/log"
"golang.org/x/exp/maps"

"github.com/PeerDB-io/peer-flow/generated/protos"
)

// AdditionalTablesHasOverlap reports whether any source or destination table
// identifier in additionalTableMappings collides with one already present in
// currentTableMappings. Used to reject additional tables that would conflict
// with an existing mirror.
func AdditionalTablesHasOverlap(currentTableMappings []*protos.TableMapping,
	additionalTableMappings []*protos.TableMapping,
) bool {
	// collect pulls one identifier per mapping via the supplied accessor.
	collect := func(mappings []*protos.TableMapping, pick func(*protos.TableMapping) string) []string {
		names := make([]string, 0, len(mappings))
		for _, mapping := range mappings {
			names = append(names, pick(mapping))
		}
		return names
	}
	src := func(m *protos.TableMapping) string { return m.SourceTableIdentifier }
	dst := func(m *protos.TableMapping) string { return m.DestinationTableIdentifier }

	return ArraysHaveOverlap(collect(currentTableMappings, src), collect(additionalTableMappings, src)) ||
		ArraysHaveOverlap(collect(currentTableMappings, dst), collect(additionalTableMappings, dst))
}

// given the output of GetTableSchema, processes it to be used by CDCFlow
// 1) changes the map key to be the destination table name instead of the source table name
// 2) performs column exclusion using protos.TableMapping as input.
// BuildProcessedSchemaMapping takes the output of GetTableSchema and processes
// it for use by CDCFlow:
//  1. re-keys the map by destination table name instead of source table name;
//  2. performs column exclusion using the protos.TableMapping entries.
//
// Source tables that have no entry in tableMappings are skipped with a warning
// (previously they were silently stored under the empty-string key).
// Iteration is over sorted source-table names for deterministic logging.
func BuildProcessedSchemaMapping(tableMappings []*protos.TableMapping,
	tableNameSchemaMapping map[string]*protos.TableSchema,
	logger log.Logger,
) map[string]*protos.TableSchema {
	processedSchemaMapping := make(map[string]*protos.TableSchema, len(tableNameSchemaMapping))
	sortedSourceTables := maps.Keys(tableNameSchemaMapping)
	slices.Sort(sortedSourceTables)

	for _, srcTableName := range sortedSourceTables {
		tableSchema := tableNameSchemaMapping[srcTableName]

		var dstTableName string
		found := false
		for _, mapping := range tableMappings {
			if mapping.SourceTableIdentifier == srcTableName {
				found = true
				dstTableName = mapping.DestinationTableIdentifier
				if len(mapping.Exclude) != 0 {
					tableSchema = schemaWithoutColumns(tableSchema, mapping.Exclude)
				}
				break
			}
		}
		if !found {
			// No mapping for this source table: storing it under "" would
			// corrupt the result map, so skip it and make the mismatch visible.
			logger.Warn("no table mapping found for source table, skipping",
				slog.String("table", srcTableName))
			continue
		}
		processedSchemaMapping[dstTableName] = tableSchema

		logger.Info("normalized table schema",
			slog.String("table", dstTableName),
			slog.Any("schema", tableSchema))
	}
	return processedSchemaMapping
}

// schemaWithoutColumns returns a copy of schema with the named columns removed;
// the original schema is left untouched.
func schemaWithoutColumns(schema *protos.TableSchema, exclude []string) *protos.TableSchema {
	columns := make([]*protos.FieldDescription, 0, len(schema.Columns))
	for _, column := range schema.Columns {
		if !slices.Contains(exclude, column.Name) {
			columns = append(columns, column)
		}
	}
	return &protos.TableSchema{
		TableIdentifier:       schema.TableIdentifier,
		PrimaryKeyColumns:     schema.PrimaryKeyColumns,
		IsReplicaIdentityFull: schema.IsReplicaIdentityFull,
		Columns:               columns,
	}
}
31 changes: 2 additions & 29 deletions flow/workflows/setup_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package peerflow
import (
"fmt"
"log/slog"
"slices"
"sort"
"time"

Expand Down Expand Up @@ -201,34 +200,8 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables(
sort.Strings(sortedSourceTables)

s.logger.Info("setting up normalized tables for peer flow")
normalizedTableMapping := make(map[string]*protos.TableSchema)
for _, srcTableName := range sortedSourceTables {
tableSchema := tableNameSchemaMapping[srcTableName]
normalizedTableName := s.tableNameMapping[srcTableName]
for _, mapping := range flowConnectionConfigs.TableMappings {
if mapping.SourceTableIdentifier == srcTableName {
if len(mapping.Exclude) != 0 {
columnCount := len(tableSchema.Columns)
columns := make([]*protos.FieldDescription, 0, columnCount)
for _, column := range tableSchema.Columns {
if !slices.Contains(mapping.Exclude, column.Name) {
columns = append(columns, column)
}
}
tableSchema = &protos.TableSchema{
TableIdentifier: tableSchema.TableIdentifier,
PrimaryKeyColumns: tableSchema.PrimaryKeyColumns,
IsReplicaIdentityFull: tableSchema.IsReplicaIdentityFull,
Columns: columns,
}
}
break
}
}
normalizedTableMapping[normalizedTableName] = tableSchema

s.logger.Info("normalized table schema", slog.String("table", normalizedTableName), slog.Any("schema", tableSchema))
}
normalizedTableMapping := shared.BuildProcessedSchemaMapping(flowConnectionConfigs.TableMappings,
tableNameSchemaMapping, s.logger)

// now setup the normalized tables on the destination peer
setupConfig := &protos.SetupNormalizedTableBatchInput{
Expand Down
12 changes: 5 additions & 7 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"go.temporal.io/sdk/log"
"go.temporal.io/sdk/workflow"
"golang.org/x/exp/maps"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
Expand Down Expand Up @@ -139,12 +140,10 @@ func SyncFlowWorkflow(
tableSchemaDeltasCount := len(childSyncFlowRes.TableSchemaDeltas)

// slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes.
if tableSchemaDeltasCount != 0 {
if tableSchemaDeltasCount > 0 {
modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount)
modifiedDstTables := make([]string, 0, tableSchemaDeltasCount)
for _, tableSchemaDelta := range childSyncFlowRes.TableSchemaDeltas {
modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName)
modifiedDstTables = append(modifiedDstTables, tableSchemaDelta.DstTableName)
}

getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
Expand All @@ -167,10 +166,9 @@ func SyncFlowWorkflow(
nil,
).Get(ctx, nil)
} else {
for i, srcTable := range modifiedSrcTables {
dstTable := modifiedDstTables[i]
options.TableNameSchemaMapping[dstTable] = getModifiedSchemaRes.TableNameSchemaMapping[srcTable]
}
processedSchemaMapping := shared.BuildProcessedSchemaMapping(options.TableMappings,
getModifiedSchemaRes.TableNameSchemaMapping, logger)
maps.Copy(options.TableNameSchemaMapping, processedSchemaMapping)
}
}

Expand Down

0 comments on commit af39551

Please sign in to comment.