Skip to content

Commit

Permalink
Merge branch 'main' into ui/remove-pub-load-check
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj authored Mar 20, 2024
2 parents 748d20b + af39551 commit 243a29b
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 84 deletions.
11 changes: 7 additions & 4 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,10 +739,13 @@ func (p *PostgresCDCSource) processRelationMessage(
for _, column := range currRel.Columns {
// not present in previous relation message, but in current one, so added.
if _, ok := prevRelMap[column.Name]; !ok {
schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.DeltaAddedColumn{
ColumnName: column.Name,
ColumnType: string(currRelMap[column.Name]),
})
// only add to delta if not excluded
if _, ok := p.tableNameMapping[p.srcTableIDNameMapping[currRel.RelationID]].Exclude[column.Name]; !ok {
schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.DeltaAddedColumn{
ColumnName: column.Name,
ColumnType: string(currRelMap[column.Name]),
})
}
// present in previous and current relation messages, but data types have changed.
// so we add it to AddedColumns and DroppedColumns, knowing that we process DroppedColumns first.
} else if prevRelMap[column.Name] != currRelMap[column.Name] {
Expand Down
22 changes: 4 additions & 18 deletions flow/model/cdc_record_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,8 @@ func (r *CDCRecordStream) GetRecords() <-chan Record {
return r.records
}

func (r *CDCRecordStream) AddSchemaDelta(tableNameMapping map[string]NameAndExclude, delta *protos.TableSchemaDelta) {
if tm, ok := tableNameMapping[delta.SrcTableName]; ok && len(tm.Exclude) != 0 {
added := make([]*protos.DeltaAddedColumn, 0, len(delta.AddedColumns))
for _, column := range delta.AddedColumns {
if _, has := tm.Exclude[column.ColumnName]; !has {
added = append(added, column)
}
}
if len(added) != 0 {
r.SchemaDeltas = append(r.SchemaDeltas, &protos.TableSchemaDelta{
SrcTableName: delta.SrcTableName,
DstTableName: delta.DstTableName,
AddedColumns: added,
})
}
} else {
r.SchemaDeltas = append(r.SchemaDeltas, delta)
}
func (r *CDCRecordStream) AddSchemaDelta(tableNameMapping map[string]NameAndExclude,
delta *protos.TableSchemaDelta,
) {
r.SchemaDeltas = append(r.SchemaDeltas, delta)
}
26 changes: 0 additions & 26 deletions flow/shared/additional_tables.go

This file was deleted.

76 changes: 76 additions & 0 deletions flow/shared/schema_helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package shared

import (
"log/slog"
"slices"

"go.temporal.io/sdk/log"
"golang.org/x/exp/maps"

"github.com/PeerDB-io/peer-flow/generated/protos"
)

func AdditionalTablesHasOverlap(currentTableMappings []*protos.TableMapping,
additionalTableMappings []*protos.TableMapping,
) bool {
currentSrcTables := make([]string, 0, len(currentTableMappings))
currentDstTables := make([]string, 0, len(currentTableMappings))
additionalSrcTables := make([]string, 0, len(additionalTableMappings))
additionalDstTables := make([]string, 0, len(additionalTableMappings))

for _, currentTableMapping := range currentTableMappings {
currentSrcTables = append(currentSrcTables, currentTableMapping.SourceTableIdentifier)
currentDstTables = append(currentDstTables, currentTableMapping.DestinationTableIdentifier)
}
for _, additionalTableMapping := range additionalTableMappings {
additionalSrcTables = append(additionalSrcTables, additionalTableMapping.SourceTableIdentifier)
additionalDstTables = append(additionalDstTables, additionalTableMapping.DestinationTableIdentifier)
}

return ArraysHaveOverlap(currentSrcTables, additionalSrcTables) ||
ArraysHaveOverlap(currentDstTables, additionalDstTables)
}

// given the output of GetTableSchema, processes it to be used by CDCFlow
// 1) changes the map key to be the destination table name instead of the source table name
// 2) performs column exclusion using protos.TableMapping as input.
func BuildProcessedSchemaMapping(tableMappings []*protos.TableMapping,
tableNameSchemaMapping map[string]*protos.TableSchema,
logger log.Logger,
) map[string]*protos.TableSchema {
processedSchemaMapping := make(map[string]*protos.TableSchema)
sortedSourceTables := maps.Keys(tableNameSchemaMapping)
slices.Sort(sortedSourceTables)

for _, srcTableName := range sortedSourceTables {
tableSchema := tableNameSchemaMapping[srcTableName]
var dstTableName string
for _, mapping := range tableMappings {
if mapping.SourceTableIdentifier == srcTableName {
dstTableName = mapping.DestinationTableIdentifier
if len(mapping.Exclude) != 0 {
columnCount := len(tableSchema.Columns)
columns := make([]*protos.FieldDescription, 0, columnCount)
for _, column := range tableSchema.Columns {
if !slices.Contains(mapping.Exclude, column.Name) {
columns = append(columns, column)
}
}
tableSchema = &protos.TableSchema{
TableIdentifier: tableSchema.TableIdentifier,
PrimaryKeyColumns: tableSchema.PrimaryKeyColumns,
IsReplicaIdentityFull: tableSchema.IsReplicaIdentityFull,
Columns: columns,
}
}
break
}
}
processedSchemaMapping[dstTableName] = tableSchema

logger.Info("normalized table schema",
slog.String("table", dstTableName),
slog.Any("schema", tableSchema))
}
return processedSchemaMapping
}
31 changes: 2 additions & 29 deletions flow/workflows/setup_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package peerflow
import (
"fmt"
"log/slog"
"slices"
"sort"
"time"

Expand Down Expand Up @@ -201,34 +200,8 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables(
sort.Strings(sortedSourceTables)

s.logger.Info("setting up normalized tables for peer flow")
normalizedTableMapping := make(map[string]*protos.TableSchema)
for _, srcTableName := range sortedSourceTables {
tableSchema := tableNameSchemaMapping[srcTableName]
normalizedTableName := s.tableNameMapping[srcTableName]
for _, mapping := range flowConnectionConfigs.TableMappings {
if mapping.SourceTableIdentifier == srcTableName {
if len(mapping.Exclude) != 0 {
columnCount := len(tableSchema.Columns)
columns := make([]*protos.FieldDescription, 0, columnCount)
for _, column := range tableSchema.Columns {
if !slices.Contains(mapping.Exclude, column.Name) {
columns = append(columns, column)
}
}
tableSchema = &protos.TableSchema{
TableIdentifier: tableSchema.TableIdentifier,
PrimaryKeyColumns: tableSchema.PrimaryKeyColumns,
IsReplicaIdentityFull: tableSchema.IsReplicaIdentityFull,
Columns: columns,
}
}
break
}
}
normalizedTableMapping[normalizedTableName] = tableSchema

s.logger.Info("normalized table schema", slog.String("table", normalizedTableName), slog.Any("schema", tableSchema))
}
normalizedTableMapping := shared.BuildProcessedSchemaMapping(flowConnectionConfigs.TableMappings,
tableNameSchemaMapping, s.logger)

// now setup the normalized tables on the destination peer
setupConfig := &protos.SetupNormalizedTableBatchInput{
Expand Down
12 changes: 5 additions & 7 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"go.temporal.io/sdk/log"
"go.temporal.io/sdk/workflow"
"golang.org/x/exp/maps"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
Expand Down Expand Up @@ -139,12 +140,10 @@ func SyncFlowWorkflow(
tableSchemaDeltasCount := len(childSyncFlowRes.TableSchemaDeltas)

// slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes.
if tableSchemaDeltasCount != 0 {
if tableSchemaDeltasCount > 0 {
modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount)
modifiedDstTables := make([]string, 0, tableSchemaDeltasCount)
for _, tableSchemaDelta := range childSyncFlowRes.TableSchemaDeltas {
modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName)
modifiedDstTables = append(modifiedDstTables, tableSchemaDelta.DstTableName)
}

getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
Expand All @@ -167,10 +166,9 @@ func SyncFlowWorkflow(
nil,
).Get(ctx, nil)
} else {
for i, srcTable := range modifiedSrcTables {
dstTable := modifiedDstTables[i]
options.TableNameSchemaMapping[dstTable] = getModifiedSchemaRes.TableNameSchemaMapping[srcTable]
}
processedSchemaMapping := shared.BuildProcessedSchemaMapping(options.TableMappings,
getModifiedSchemaRes.TableNameSchemaMapping, logger)
maps.Copy(options.TableNameSchemaMapping, processedSchemaMapping)
}
}

Expand Down

0 comments on commit 243a29b

Please sign in to comment.