column exclusion - properly handle schema changes #1512

Merged: 2 commits merged on Mar 20, 2024
11 changes: 7 additions & 4 deletions flow/connectors/postgres/cdc.go
@@ -739,10 +739,13 @@ func (p *PostgresCDCSource) processRelationMessage(
 	for _, column := range currRel.Columns {
 		// not present in previous relation message, but in current one, so added.
 		if _, ok := prevRelMap[column.Name]; !ok {
-			schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.DeltaAddedColumn{
-				ColumnName: column.Name,
-				ColumnType: string(currRelMap[column.Name]),
-			})
+			// only add to delta if not excluded
+			if _, ok := p.tableNameMapping[p.srcTableIDNameMapping[currRel.RelationID]].Exclude[column.Name]; !ok {
+				schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.DeltaAddedColumn{
+					ColumnName: column.Name,
+					ColumnType: string(currRelMap[column.Name]),
+				})
+			}
 			// present in previous and current relation messages, but data types have changed.
 			// so we add it to AddedColumns and DroppedColumns, knowing that we process DroppedColumns first.
 		} else if prevRelMap[column.Name] != currRelMap[column.Name] {
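For context on the new check: it resolves the relation ID to the source table name, then consults that table's exclusion set before recording the added column in the schema delta. Below is a minimal, self-contained sketch of that lookup chain; the map shapes, table names, and column names are illustrative stand-ins for PostgresCDCSource's srcTableIDNameMapping and tableNameMapping fields, not the connector's exact types.

package main

import "fmt"

// NameAndExclude stands in for the connector's per-table mapping entry:
// the destination table name plus a set of excluded column names.
type NameAndExclude struct {
	Name    string
	Exclude map[string]struct{}
}

func main() {
	// relation ID -> source table name (stand-in for p.srcTableIDNameMapping)
	srcTableIDNameMapping := map[uint32]string{16385: "public.users"}
	// source table name -> destination + exclusions (stand-in for p.tableNameMapping)
	tableNameMapping := map[string]NameAndExclude{
		"public.users": {Name: "users_dst", Exclude: map[string]struct{}{"ssn": {}}},
	}

	relationID := uint32(16385)
	for _, col := range []string{"email", "ssn"} {
		// same shape as the new cdc.go check: skip columns the table mapping excludes
		if _, excluded := tableNameMapping[srcTableIDNameMapping[relationID]].Exclude[col]; !excluded {
			fmt.Println("recorded in schema delta:", col)
		} else {
			fmt.Println("skipped excluded column:", col)
		}
	}
}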
22 changes: 4 additions & 18 deletions flow/model/cdc_record_stream.go
@@ -76,22 +76,8 @@ func (r *CDCRecordStream) GetRecords() <-chan Record {
 	return r.records
 }
 
-func (r *CDCRecordStream) AddSchemaDelta(tableNameMapping map[string]NameAndExclude, delta *protos.TableSchemaDelta) {
-	if tm, ok := tableNameMapping[delta.SrcTableName]; ok && len(tm.Exclude) != 0 {
-		added := make([]*protos.DeltaAddedColumn, 0, len(delta.AddedColumns))
-		for _, column := range delta.AddedColumns {
-			if _, has := tm.Exclude[column.ColumnName]; !has {
-				added = append(added, column)
-			}
-		}
-		if len(added) != 0 {
-			r.SchemaDeltas = append(r.SchemaDeltas, &protos.TableSchemaDelta{
-				SrcTableName: delta.SrcTableName,
-				DstTableName: delta.DstTableName,
-				AddedColumns: added,
-			})
-		}
-	} else {
-		r.SchemaDeltas = append(r.SchemaDeltas, delta)
-	}
+func (r *CDCRecordStream) AddSchemaDelta(tableNameMapping map[string]NameAndExclude,
+	delta *protos.TableSchemaDelta,
+) {
+	r.SchemaDeltas = append(r.SchemaDeltas, delta)
 }
26 changes: 0 additions & 26 deletions flow/shared/additional_tables.go

This file was deleted.

76 changes: 76 additions & 0 deletions flow/shared/schema_helpers.go
@@ -0,0 +1,76 @@
package shared

import (
	"log/slog"
	"slices"

	"go.temporal.io/sdk/log"

Check failure on line 7 in flow/shared/schema_helpers.go (GitHub Actions / lint): File is not `gci`-ed with --skip-generated -s standard -s prefix(github.com/PeerDB-io) -s default (gci)

	"github.com/PeerDB-io/peer-flow/generated/protos"
	"golang.org/x/exp/maps"

Check failure on line 10 in flow/shared/schema_helpers.go (GitHub Actions / lint): File is not `gci`-ed with --skip-generated -s standard -s prefix(github.com/PeerDB-io) -s default (gci)
)

func AdditionalTablesHasOverlap(currentTableMappings []*protos.TableMapping,
	additionalTableMappings []*protos.TableMapping,
) bool {
	currentSrcTables := make([]string, 0, len(currentTableMappings))
	currentDstTables := make([]string, 0, len(currentTableMappings))
	additionalSrcTables := make([]string, 0, len(additionalTableMappings))
	additionalDstTables := make([]string, 0, len(additionalTableMappings))

	for _, currentTableMapping := range currentTableMappings {
		currentSrcTables = append(currentSrcTables, currentTableMapping.SourceTableIdentifier)
		currentDstTables = append(currentDstTables, currentTableMapping.DestinationTableIdentifier)
	}
	for _, additionalTableMapping := range additionalTableMappings {
		additionalSrcTables = append(additionalSrcTables, additionalTableMapping.SourceTableIdentifier)
		additionalDstTables = append(additionalDstTables, additionalTableMapping.DestinationTableIdentifier)
	}

	return ArraysHaveOverlap(currentSrcTables, additionalSrcTables) ||
		ArraysHaveOverlap(currentDstTables, additionalDstTables)
}

// given the output of GetTableSchema, processes it to be used by CDCFlow
// 1) changes the map key to be the destination table name instead of the source table name
// 2) performs column exclusion using protos.TableMapping as input.
func BuildProcessedSchemaMapping(tableMappings []*protos.TableMapping,
	tableNameSchemaMapping map[string]*protos.TableSchema,
	logger log.Logger,
) map[string]*protos.TableSchema {
	processedSchemaMapping := make(map[string]*protos.TableSchema)
	sortedSourceTables := maps.Keys(tableNameSchemaMapping)
	slices.Sort(sortedSourceTables)

	for _, srcTableName := range sortedSourceTables {
		tableSchema := tableNameSchemaMapping[srcTableName]
		var dstTableName string
		for _, mapping := range tableMappings {
			if mapping.SourceTableIdentifier == srcTableName {
				dstTableName = mapping.DestinationTableIdentifier
				if len(mapping.Exclude) != 0 {
					columnCount := len(tableSchema.Columns)
					columns := make([]*protos.FieldDescription, 0, columnCount)
					for _, column := range tableSchema.Columns {
						if !slices.Contains(mapping.Exclude, column.Name) {
							columns = append(columns, column)
						}
					}
					tableSchema = &protos.TableSchema{
						TableIdentifier:       tableSchema.TableIdentifier,
						PrimaryKeyColumns:     tableSchema.PrimaryKeyColumns,
						IsReplicaIdentityFull: tableSchema.IsReplicaIdentityFull,
						Columns:               columns,
					}
				}
				break
			}
		}
		processedSchemaMapping[dstTableName] = tableSchema

		logger.Info("normalized table schema",
			slog.String("table", dstTableName),
			slog.Any("schema", tableSchema))
	}
	return processedSchemaMapping
}
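To make the new helper's effect concrete, here is a small, self-contained sketch of the transformation BuildProcessedSchemaMapping performs: re-key the schema map by destination table and drop excluded columns. It uses simplified stand-in structs rather than the generated protos types, omits the sorting and logging of the real function, and the table and column names are hypothetical.

package main

import (
	"fmt"
	"slices"
)

// simplified stand-ins for protos.TableMapping and protos.TableSchema
type tableMapping struct {
	SourceTableIdentifier      string
	DestinationTableIdentifier string
	Exclude                    []string
}

type tableSchema struct {
	TableIdentifier string
	Columns         []string
}

// mirrors the core of shared.BuildProcessedSchemaMapping:
// key the result by destination table and filter out excluded columns.
func buildProcessedSchemaMapping(mappings []tableMapping, schemas map[string]tableSchema) map[string]tableSchema {
	out := make(map[string]tableSchema, len(schemas))
	for srcTableName, schema := range schemas {
		for _, mapping := range mappings {
			if mapping.SourceTableIdentifier == srcTableName {
				kept := make([]string, 0, len(schema.Columns))
				for _, column := range schema.Columns {
					if !slices.Contains(mapping.Exclude, column) {
						kept = append(kept, column)
					}
				}
				out[mapping.DestinationTableIdentifier] = tableSchema{
					TableIdentifier: schema.TableIdentifier,
					Columns:         kept,
				}
				break
			}
		}
	}
	return out
}

func main() {
	mappings := []tableMapping{{
		SourceTableIdentifier:      "public.users",
		DestinationTableIdentifier: "users_dst",
		Exclude:                    []string{"ssn"},
	}}
	schemas := map[string]tableSchema{
		"public.users": {TableIdentifier: "public.users", Columns: []string{"id", "email", "ssn"}},
	}
	fmt.Printf("%+v\n", buildProcessedSchemaMapping(mappings, schemas))
	// map[users_dst:{TableIdentifier:public.users Columns:[id email]}]
}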
31 changes: 2 additions & 29 deletions flow/workflows/setup_flow.go
@@ -3,7 +3,6 @@ package peerflow
 import (
 	"fmt"
 	"log/slog"
-	"slices"
 	"sort"
 	"time"
 
@@ -201,34 +200,8 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables(
 	sort.Strings(sortedSourceTables)
 
 	s.logger.Info("setting up normalized tables for peer flow")
-	normalizedTableMapping := make(map[string]*protos.TableSchema)
-	for _, srcTableName := range sortedSourceTables {
-		tableSchema := tableNameSchemaMapping[srcTableName]
-		normalizedTableName := s.tableNameMapping[srcTableName]
-		for _, mapping := range flowConnectionConfigs.TableMappings {
-			if mapping.SourceTableIdentifier == srcTableName {
-				if len(mapping.Exclude) != 0 {
-					columnCount := len(tableSchema.Columns)
-					columns := make([]*protos.FieldDescription, 0, columnCount)
-					for _, column := range tableSchema.Columns {
-						if !slices.Contains(mapping.Exclude, column.Name) {
-							columns = append(columns, column)
-						}
-					}
-					tableSchema = &protos.TableSchema{
-						TableIdentifier:       tableSchema.TableIdentifier,
-						PrimaryKeyColumns:     tableSchema.PrimaryKeyColumns,
-						IsReplicaIdentityFull: tableSchema.IsReplicaIdentityFull,
-						Columns:               columns,
-					}
-				}
-				break
-			}
-		}
-		normalizedTableMapping[normalizedTableName] = tableSchema
-
-		s.logger.Info("normalized table schema", slog.String("table", normalizedTableName), slog.Any("schema", tableSchema))
-	}
+	normalizedTableMapping := shared.BuildProcessedSchemaMapping(flowConnectionConfigs.TableMappings,
+		tableNameSchemaMapping, s.logger)
 
 	// now setup the normalized tables on the destination peer
 	setupConfig := &protos.SetupNormalizedTableBatchInput{
12 changes: 5 additions & 7 deletions flow/workflows/sync_flow.go
@@ -6,6 +6,7 @@ import (
 
 	"go.temporal.io/sdk/log"
 	"go.temporal.io/sdk/workflow"
+	"golang.org/x/exp/maps"
 
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model"
@@ -139,12 +140,10 @@ func SyncFlowWorkflow(
 			tableSchemaDeltasCount := len(childSyncFlowRes.TableSchemaDeltas)
 
 			// slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes.
-			if tableSchemaDeltasCount != 0 {
+			if tableSchemaDeltasCount > 0 {
 				modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount)
-				modifiedDstTables := make([]string, 0, tableSchemaDeltasCount)
 				for _, tableSchemaDelta := range childSyncFlowRes.TableSchemaDeltas {
 					modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName)
-					modifiedDstTables = append(modifiedDstTables, tableSchemaDelta.DstTableName)
 				}
 
 				getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
@@ -167,10 +166,9 @@
 					nil,
 				).Get(ctx, nil)
 			} else {
-				for i, srcTable := range modifiedSrcTables {
-					dstTable := modifiedDstTables[i]
-					options.TableNameSchemaMapping[dstTable] = getModifiedSchemaRes.TableNameSchemaMapping[srcTable]
-				}
+				processedSchemaMapping := shared.BuildProcessedSchemaMapping(options.TableMappings,
+					getModifiedSchemaRes.TableNameSchemaMapping, logger)
+				maps.Copy(options.TableNameSchemaMapping, processedSchemaMapping)
 			}
 		}
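The else branch above refreshes the workflow's cached, destination-keyed schema map with the recomputed entries. A tiny illustrative sketch of that cache update follows; it uses the standard library maps package (Go 1.21+), whose Copy behaves like the golang.org/x/exp/maps Copy used in the PR, and plain string values stand in for *protos.TableSchema.

package main

import (
	"fmt"
	"maps" // stdlib (Go 1.21+); the PR imports golang.org/x/exp/maps, whose Copy behaves the same
)

func main() {
	// cached destination-table -> schema mapping carried in the workflow options
	cached := map[string]string{"users_dst": "id,email", "orders_dst": "id,total"}
	// freshly recomputed entries for tables whose schema changed, already
	// re-keyed by destination name with excluded columns dropped
	refreshed := map[string]string{"users_dst": "id,email,signup_date"}

	// overwrite only the refreshed entries; untouched tables keep their cached schema
	maps.Copy(cached, refreshed)
	fmt.Println(cached) // map[orders_dst:id,total users_dst:id,email,signup_date]
}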

Expand Down
Loading