Skip to content

Commit

Permalink
Lots of TODOs throwing code down will test later doubt any of it work…
Browse files Browse the repository at this point in the history
…s but maybe there's something to talk about here
  • Loading branch information
serprex committed Nov 3, 2023
1 parent aa72c90 commit 4c12513
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 17 deletions.
7 changes: 5 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Info("pulling records...")

tblNameMapping := make(map[string]string)
tblNameMapping := make(map[string]model.NameAndExclude)
for _, v := range input.FlowConnectionConfigs.TableMappings {
tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier
tblNameMapping[v.SourceTableIdentifier] = model.NameAndExclude {
Name: v.DestinationTableIdentifier,
Exclude: v.Exclude,
}
}

idleTimeout := utils.GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 10)
Expand Down
26 changes: 16 additions & 10 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/sha256"
"fmt"
"reflect"
"slices"
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
Expand All @@ -25,7 +26,7 @@ type PostgresCDCSource struct {
ctx context.Context
replPool *pgxpool.Pool
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]string
TableNameMapping map[string]model.NameAndExclude
slot string
publication string
relationMessageMapping model.RelationMessageMapping
Expand All @@ -44,7 +45,7 @@ type PostgresCDCConfig struct {
Slot string
Publication string
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]string
TableNameMapping map[string]model.NameAndExclude
RelationMessageMapping model.RelationMessageMapping
}

Expand Down Expand Up @@ -439,15 +440,15 @@ func (p *PostgresCDCSource) processInsertMessage(
}

// create empty map of string to interface{}
items, _, err := p.convertTupleToMap(msg.Tuple, rel)
items, _, err := p.convertTupleToMap(msg.Tuple, rel, p.TableNameMapping[tableName].Exclude)
if err != nil {
return nil, fmt.Errorf("error converting tuple to map: %w", err)
}

return &model.InsertRecord{
CheckPointID: int64(lsn),
Items: items,
DestinationTableName: p.TableNameMapping[tableName],
DestinationTableName: p.TableNameMapping[tableName].Name,
SourceTableName: tableName,
}, nil
}
Expand All @@ -473,12 +474,12 @@ func (p *PostgresCDCSource) processUpdateMessage(
}

// create empty map of string to interface{}
oldItems, _, err := p.convertTupleToMap(msg.OldTuple, rel)
oldItems, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.TableNameMapping[tableName].Exclude)
if err != nil {
return nil, fmt.Errorf("error converting old tuple to map: %w", err)
}

newItems, unchangedToastColumns, err := p.convertTupleToMap(msg.NewTuple, rel)
newItems, unchangedToastColumns, err := p.convertTupleToMap(msg.NewTuple, rel, p.TableNameMapping[tableName].Exclude)
if err != nil {
return nil, fmt.Errorf("error converting new tuple to map: %w", err)
}
Expand All @@ -487,7 +488,7 @@ func (p *PostgresCDCSource) processUpdateMessage(
CheckPointID: int64(lsn),
OldItems: oldItems,
NewItems: newItems,
DestinationTableName: p.TableNameMapping[tableName],
DestinationTableName: p.TableNameMapping[tableName].Name,
SourceTableName: tableName,
UnchangedToastColumns: unchangedToastColumns,
}, nil
Expand All @@ -514,15 +515,15 @@ func (p *PostgresCDCSource) processDeleteMessage(
}

// create empty map of string to interface{}
items, _, err := p.convertTupleToMap(msg.OldTuple, rel)
items, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.TableNameMapping[tableName].Exclude)
if err != nil {
return nil, fmt.Errorf("error converting tuple to map: %w", err)
}

return &model.DeleteRecord{
CheckPointID: int64(lsn),
Items: items,
DestinationTableName: p.TableNameMapping[tableName],
DestinationTableName: p.TableNameMapping[tableName].Name,
SourceTableName: tableName,
}, nil
}
Expand All @@ -537,6 +538,7 @@ It takes a tuple and a relation message as input and returns
func (p *PostgresCDCSource) convertTupleToMap(
tuple *pglogrepl.TupleData,
rel *protos.RelationMessage,
exclude []string,
) (*model.RecordItems, map[string]struct{}, error) {
// if the tuple is nil, return an empty map
if tuple == nil {
Expand All @@ -547,8 +549,12 @@ func (p *PostgresCDCSource) convertTupleToMap(
items := model.NewRecordItems()
unchangedToastColumns := make(map[string]struct{})

// TODO need to adjust idx by subtracting 1 from idx for each previously excluded column?
for idx, col := range tuple.Columns {
colName := rel.Columns[idx].Name
if slices.Contains(exclude, colName) {
continue
}
switch col.DataType {
case 'n': // null
val := &qvalue.QValue{Kind: qvalue.QValueKindInvalid, Value: nil}
Expand Down Expand Up @@ -669,7 +675,7 @@ func (p *PostgresCDCSource) processRelationMessage(
// set it to the source table for now, so we can update the schema on the source side
// then at the Workflow level we set it t
SrcTableName: p.SrcTableIDNameMapping[currRel.RelationId],
DstTableName: p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationId]],
DstTableName: p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationId]].Name,
AddedColumns: make([]*protos.DeltaAddedColumn, 0),
}
for _, column := range currRel.Columns {
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -207,7 +208,7 @@ func (c *PostgresConnector) createSlotAndPublication(
s *SlotCheckResult,
slot string,
publication string,
tableNameMapping map[string]string,
tableNameMapping map[string]model.NameAndExclude,
doInitialCopy bool,
) error {
/*
Expand Down
9 changes: 8 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,9 +775,16 @@ func (c *PostgresConnector) SetupReplication(signal *SlotSignal, req *protos.Set
return fmt.Errorf("error checking for replication slot and publication: %w", err)
}

tableNameMapping := make(map[string]model.NameAndExclude);
for k, v := range req.TableNameMapping {
tableNameMapping[k] = model.NameAndExclude {
Name: v,
Exclude: make([]string, 0),
}
}
// Create the replication slot and publication
err = c.createSlotAndPublication(signal, exists,
slotName, publicationName, req.TableNameMapping, req.DoInitialCopy)
slotName, publicationName, tableNameMapping, req.DoInitialCopy)
if err != nil {
return fmt.Errorf("error creating replication slot and publication: %w", err)
}
Expand Down
7 changes: 6 additions & 1 deletion flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ import (
"github.com/PeerDB-io/peer-flow/model/qvalue"
)

// NameAndExclude pairs a destination table name with the source column
// names that must be excluded when replicating into that table.
type NameAndExclude struct {
// Name is the destination table identifier.
Name string
// Exclude lists source column names to skip during replication
// (columns matching an entry are dropped from pulled records).
Exclude []string
}

type PullRecordsRequest struct {
// FlowJobName is the name of the flow job.
FlowJobName string
Expand All @@ -24,7 +29,7 @@ type PullRecordsRequest struct {
//relId to name Mapping
SrcTableIDNameMapping map[uint32]string
// source to destination table name mapping
TableNameMapping map[string]string
TableNameMapping map[string]NameAndExclude
// tablename to schema mapping
TableNameSchemaMapping map[string]*protos.TableSchema
// override publication name
Expand Down
13 changes: 13 additions & 0 deletions flow/workflows/setup_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,20 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables(
for _, srcTableName := range sortedSourceTables {
tableSchema := tableNameSchemaMapping[srcTableName]
normalizedTableName := s.tableNameMapping[srcTableName]
// TODO srcTableName lookup probably incomplete
// TODO lookup would be <O(N) ideally
// TODO source can be used for multiple targets, need to adjust based on destination
for _, mapping := range flowConnectionConfigs.TableMappings {
if mapping.SourceTableIdentifier == srcTableName && len(mapping.Exclude) != 0 {
// TODO modifying reference, need to copy tableSchema itself
tableSchema.Columns = maps.Clone(tableSchema.Columns)
for _, exclude := range mapping.Exclude {
delete(tableSchema.Columns, exclude)
}
}
}
normalizedTableMapping[normalizedTableName] = tableSchema

s.logger.Info("normalized table schema: ", normalizedTableName, " -> ", tableSchema)
}

Expand Down
23 changes: 21 additions & 2 deletions flow/workflows/snapshot_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package peerflow
import (
"fmt"
"regexp"
"slices"
"strings"
"time"

"github.com/PeerDB-io/peer-flow/concurrency"
Expand Down Expand Up @@ -139,8 +141,25 @@ func (s *SnapshotFlowExecution) cloneTable(
}).Errorf("unable to parse source table")
return fmt.Errorf("unable to parse source table: %w", err)
}
query := fmt.Sprintf("SELECT * FROM %s WHERE %s BETWEEN {{.start}} AND {{.end}}",
parsedSrcTable.String(), partitionCol)
from := "*"
if len(mapping.Exclude) != 0 {
var sb strings.Builder
// TODO what's the correct key?
for _, col := range s.config.TableNameSchemaMapping[srcName].Columns {
// TODO optimize lookup
if !slices.Contains(mapping.Exclude, col) {
if sb.Len() != 0 {
sb.WriteString(", ")
}
// TODO escape
sb.WriteString(col)
}
}
from = sb.String()
}

query := fmt.Sprintf("SELECT %s FROM %s WHERE %s BETWEEN {{.start}} AND {{.end}}",
from, parsedSrcTable.String(), partitionCol)

numWorkers := uint32(8)
if s.config.SnapshotMaxParallelWorkers > 0 {
Expand Down

0 comments on commit 4c12513

Please sign in to comment.