Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Column Exclusion #601

Merged
merged 18 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ jobs:
json: ${{ secrets.GCS_CREDS }}

- name: create hstore extension and increase logical replication limits
run: |
run: >
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "CREATE EXTENSION hstore;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET wal_level=logical;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_replication_slots=100;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_wal_senders=100;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_connections=1024;"
-c "ALTER SYSTEM SET wal_level=logical;"
-c "ALTER SYSTEM SET max_replication_slots=192;"
-c "ALTER SYSTEM SET max_wal_senders=256;"
-c "ALTER SYSTEM SET max_connections=2048;" &&
docker restart pg_cdc
working-directory: ./flow
env:
Expand All @@ -95,7 +95,7 @@ jobs:

- name: run tests
run: |
gotestsum --format testname -- -p 32 ./... -timeout 2400s
gotestsum --format testname -- -p 16 ./... -timeout 2400s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
Expand Down
13 changes: 6 additions & 7 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Info("pulling records...")

tblNameMapping := make(map[string]string)
tblNameMapping := make(map[string]model.NameAndExclude)
for _, v := range input.FlowConnectionConfigs.TableMappings {
tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

idleTimeout := utils.GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 10)
Expand Down Expand Up @@ -254,7 +254,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
log.WithFields(log.Fields{"flowName": input.FlowConnectionConfigs.FlowJobName}).Info("no records to push")
syncResponse := &model.SyncResponse{}
syncResponse.RelationMessageMapping = <-recordBatch.RelationMessageMapping
syncResponse.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas()
syncResponse.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas(input.FlowConnectionConfigs.TableMappings)
return syncResponse, nil
}

Expand Down Expand Up @@ -323,7 +323,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
if err != nil {
return nil, err
}
res.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas()
res.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas(input.FlowConnectionConfigs.TableMappings)
res.RelationMessageMapping = <-recordBatch.RelationMessageMapping

pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords)
Expand Down Expand Up @@ -523,7 +523,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
var stream *model.QRecordStream
bufferSize := shared.FetchAndChannelSize
var wg sync.WaitGroup
var numRecords int64

var goroutineErr error = nil
if config.SourcePeer.Type == protos.DBType_POSTGRES {
Expand All @@ -533,7 +532,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
pullPgRecords := func() {
pgConn := srcConn.(*connpostgres.PostgresConnector)
tmp, err := pgConn.PullQRepRecordStream(config, partition, stream)
numRecords = int64(tmp)
numRecords := int64(tmp)
if err != nil {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
Expand All @@ -554,7 +553,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
if err != nil {
return fmt.Errorf("failed to pull records: %w", err)
}
numRecords = int64(recordBatch.NumRecords)
numRecords := int64(recordBatch.NumRecords)
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("pulled %d records\n", len(recordBatch.Records))
Expand Down
24 changes: 14 additions & 10 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type PostgresCDCSource struct {
ctx context.Context
replPool *pgxpool.Pool
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]string
TableNameMapping map[string]model.NameAndExclude
slot string
publication string
relationMessageMapping model.RelationMessageMapping
Expand All @@ -44,7 +44,7 @@ type PostgresCDCConfig struct {
Slot string
Publication string
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]string
TableNameMapping map[string]model.NameAndExclude
RelationMessageMapping model.RelationMessageMapping
}

Expand Down Expand Up @@ -440,15 +440,15 @@ func (p *PostgresCDCSource) processInsertMessage(
}

// create empty map of string to interface{}
items, _, err := p.convertTupleToMap(msg.Tuple, rel)
items, _, err := p.convertTupleToMap(msg.Tuple, rel, p.TableNameMapping[tableName].Exclude)
if err != nil {
return nil, fmt.Errorf("error converting tuple to map: %w", err)
}

return &model.InsertRecord{
CheckPointID: int64(lsn),
Items: items,
DestinationTableName: p.TableNameMapping[tableName],
DestinationTableName: p.TableNameMapping[tableName].Name,
SourceTableName: tableName,
}, nil
}
Expand All @@ -474,12 +474,12 @@ func (p *PostgresCDCSource) processUpdateMessage(
}

// create empty map of string to interface{}
oldItems, _, err := p.convertTupleToMap(msg.OldTuple, rel)
oldItems, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.TableNameMapping[tableName].Exclude)
if err != nil {
return nil, fmt.Errorf("error converting old tuple to map: %w", err)
}

newItems, unchangedToastColumns, err := p.convertTupleToMap(msg.NewTuple, rel)
newItems, unchangedToastColumns, err := p.convertTupleToMap(msg.NewTuple, rel, p.TableNameMapping[tableName].Exclude)
if err != nil {
return nil, fmt.Errorf("error converting new tuple to map: %w", err)
}
Expand All @@ -488,7 +488,7 @@ func (p *PostgresCDCSource) processUpdateMessage(
CheckPointID: int64(lsn),
OldItems: oldItems,
NewItems: newItems,
DestinationTableName: p.TableNameMapping[tableName],
DestinationTableName: p.TableNameMapping[tableName].Name,
SourceTableName: tableName,
UnchangedToastColumns: unchangedToastColumns,
}, nil
Expand All @@ -515,15 +515,15 @@ func (p *PostgresCDCSource) processDeleteMessage(
}

// create empty map of string to interface{}
items, _, err := p.convertTupleToMap(msg.OldTuple, rel)
items, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.TableNameMapping[tableName].Exclude)
if err != nil {
return nil, fmt.Errorf("error converting tuple to map: %w", err)
}

return &model.DeleteRecord{
CheckPointID: int64(lsn),
Items: items,
DestinationTableName: p.TableNameMapping[tableName],
DestinationTableName: p.TableNameMapping[tableName].Name,
SourceTableName: tableName,
}, nil
}
Expand All @@ -538,6 +538,7 @@ It takes a tuple and a relation message as input and returns
func (p *PostgresCDCSource) convertTupleToMap(
tuple *pglogrepl.TupleData,
rel *protos.RelationMessage,
exclude map[string]struct{},
) (*model.RecordItems, map[string]struct{}, error) {
// if the tuple is nil, return an empty map
if tuple == nil {
Expand All @@ -550,6 +551,9 @@ func (p *PostgresCDCSource) convertTupleToMap(

for idx, col := range tuple.Columns {
colName := rel.Columns[idx].Name
if _, ok := exclude[colName]; ok {
continue
}
switch col.DataType {
case 'n': // null
val := &qvalue.QValue{Kind: qvalue.QValueKindInvalid, Value: nil}
Expand Down Expand Up @@ -670,7 +674,7 @@ func (p *PostgresCDCSource) processRelationMessage(
// set it to the source table for now, so we can update the schema on the source side
// then at the Workflow level we set it to the destination table
SrcTableName: p.SrcTableIDNameMapping[currRel.RelationId],
DstTableName: p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationId]],
DstTableName: p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationId]].Name,
AddedColumns: make([]*protos.DeltaAddedColumn, 0),
}
for _, column := range currRel.Columns {
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -206,7 +207,7 @@ func (c *PostgresConnector) createSlotAndPublication(
s *SlotCheckResult,
slot string,
publication string,
tableNameMapping map[string]string,
tableNameMapping map[string]model.NameAndExclude,
doInitialCopy bool,
) error {
/*
Expand Down
9 changes: 8 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,9 +782,16 @@ func (c *PostgresConnector) SetupReplication(signal *SlotSignal, req *protos.Set
return fmt.Errorf("error checking for replication slot and publication: %w", err)
}

tableNameMapping := make(map[string]model.NameAndExclude)
for k, v := range req.TableNameMapping {
serprex marked this conversation as resolved.
Show resolved Hide resolved
tableNameMapping[k] = model.NameAndExclude{
Name: v,
Exclude: make(map[string]struct{}, 0),
}
}
// Create the replication slot and publication
err = c.createSlotAndPublication(signal, exists,
slotName, publicationName, req.TableNameMapping, req.DoInitialCopy)
slotName, publicationName, tableNameMapping, req.DoInitialCopy)
if err != nil {
return fmt.Errorf("error creating replication slot and publication: %w", err)
}
Expand Down
Loading
Loading