Skip to content

Commit

Permalink
Column Exclusion (#601)
Browse files Browse the repository at this point in the history
If a mirror is created with exclusions, omit those columns from the schema of the target table.

The syntax goes in the `TABLE MAPPING` of `CREATE MIRROR`: `exclude:[col1,col2]`.

Excluded columns are also ignored when they appear in schema changes.

Fixes #428
  • Loading branch information
serprex authored and iskakaushik committed Nov 17, 2023
1 parent 02a190f commit 82c3fea
Show file tree
Hide file tree
Showing 23 changed files with 921 additions and 1,991 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ jobs:
json: ${{ secrets.GCS_CREDS }}

- name: create hstore extension and increase logical replication limits
run: |
run: >
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "CREATE EXTENSION hstore;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET wal_level=logical;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_replication_slots=100;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_wal_senders=100;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_connections=1024;"
-c "ALTER SYSTEM SET wal_level=logical;"
-c "ALTER SYSTEM SET max_replication_slots=192;"
-c "ALTER SYSTEM SET max_wal_senders=256;"
-c "ALTER SYSTEM SET max_connections=2048;" &&
docker restart pg_cdc
working-directory: ./flow
env:
Expand All @@ -95,7 +95,7 @@ jobs:

- name: run tests
run: |
gotestsum --format testname -- -p 32 ./... -timeout 2400s
gotestsum --format testname -- -p 16 ./... -timeout 2400s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
Expand Down
13 changes: 6 additions & 7 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Info("pulling records...")

tblNameMapping := make(map[string]string)
tblNameMapping := make(map[string]model.NameAndExclude)
for _, v := range input.FlowConnectionConfigs.TableMappings {
tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

idleTimeout := utils.GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 10)
Expand Down Expand Up @@ -254,7 +254,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
log.WithFields(log.Fields{"flowName": input.FlowConnectionConfigs.FlowJobName}).Info("no records to push")
syncResponse := &model.SyncResponse{}
syncResponse.RelationMessageMapping = <-recordBatch.RelationMessageMapping
syncResponse.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas()
syncResponse.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas(input.FlowConnectionConfigs.TableMappings)
return syncResponse, nil
}

Expand Down Expand Up @@ -323,7 +323,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
if err != nil {
return nil, err
}
res.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas()
res.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas(input.FlowConnectionConfigs.TableMappings)
res.RelationMessageMapping = <-recordBatch.RelationMessageMapping

pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords)
Expand Down Expand Up @@ -523,7 +523,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
var stream *model.QRecordStream
bufferSize := shared.FetchAndChannelSize
var wg sync.WaitGroup
var numRecords int64

var goroutineErr error = nil
if config.SourcePeer.Type == protos.DBType_POSTGRES {
Expand All @@ -533,7 +532,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
pullPgRecords := func() {
pgConn := srcConn.(*connpostgres.PostgresConnector)
tmp, err := pgConn.PullQRepRecordStream(config, partition, stream)
numRecords = int64(tmp)
numRecords := int64(tmp)
if err != nil {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
Expand All @@ -554,7 +553,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
if err != nil {
return fmt.Errorf("failed to pull records: %w", err)
}
numRecords = int64(recordBatch.NumRecords)
numRecords := int64(recordBatch.NumRecords)
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("pulled %d records\n", len(recordBatch.Records))
Expand Down
24 changes: 14 additions & 10 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type PostgresCDCSource struct {
ctx context.Context
replPool *pgxpool.Pool
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]string
TableNameMapping map[string]model.NameAndExclude
slot string
publication string
relationMessageMapping model.RelationMessageMapping
Expand All @@ -44,7 +44,7 @@ type PostgresCDCConfig struct {
Slot string
Publication string
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]string
TableNameMapping map[string]model.NameAndExclude
RelationMessageMapping model.RelationMessageMapping
}

Expand Down Expand Up @@ -451,15 +451,15 @@ func (p *PostgresCDCSource) processInsertMessage(
}

// convert the inserted tuple into record items
items, _, err := p.convertTupleToMap(msg.Tuple, rel)
items, _, err := p.convertTupleToMap(msg.Tuple, rel, p.TableNameMapping[tableName].Exclude)
if err != nil {
return nil, fmt.Errorf("error converting tuple to map: %w", err)
}

return &model.InsertRecord{
CheckPointID: int64(lsn),
Items: items,
DestinationTableName: p.TableNameMapping[tableName],
DestinationTableName: p.TableNameMapping[tableName].Name,
SourceTableName: tableName,
}, nil
}
Expand All @@ -485,12 +485,12 @@ func (p *PostgresCDCSource) processUpdateMessage(
}

// convert the old and new tuples into record items
oldItems, _, err := p.convertTupleToMap(msg.OldTuple, rel)
oldItems, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.TableNameMapping[tableName].Exclude)
if err != nil {
return nil, fmt.Errorf("error converting old tuple to map: %w", err)
}

newItems, unchangedToastColumns, err := p.convertTupleToMap(msg.NewTuple, rel)
newItems, unchangedToastColumns, err := p.convertTupleToMap(msg.NewTuple, rel, p.TableNameMapping[tableName].Exclude)
if err != nil {
return nil, fmt.Errorf("error converting new tuple to map: %w", err)
}
Expand All @@ -499,7 +499,7 @@ func (p *PostgresCDCSource) processUpdateMessage(
CheckPointID: int64(lsn),
OldItems: oldItems,
NewItems: newItems,
DestinationTableName: p.TableNameMapping[tableName],
DestinationTableName: p.TableNameMapping[tableName].Name,
SourceTableName: tableName,
UnchangedToastColumns: unchangedToastColumns,
}, nil
Expand All @@ -526,15 +526,15 @@ func (p *PostgresCDCSource) processDeleteMessage(
}

// convert the deleted (old) tuple into record items
items, _, err := p.convertTupleToMap(msg.OldTuple, rel)
items, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.TableNameMapping[tableName].Exclude)
if err != nil {
return nil, fmt.Errorf("error converting tuple to map: %w", err)
}

return &model.DeleteRecord{
CheckPointID: int64(lsn),
Items: items,
DestinationTableName: p.TableNameMapping[tableName],
DestinationTableName: p.TableNameMapping[tableName].Name,
SourceTableName: tableName,
}, nil
}
Expand All @@ -549,6 +549,7 @@ It takes a tuple and a relation message as input and returns
func (p *PostgresCDCSource) convertTupleToMap(
tuple *pglogrepl.TupleData,
rel *protos.RelationMessage,
exclude map[string]struct{},
) (*model.RecordItems, map[string]struct{}, error) {
// if the tuple is nil, return an empty map
if tuple == nil {
Expand All @@ -561,6 +562,9 @@ func (p *PostgresCDCSource) convertTupleToMap(

for idx, col := range tuple.Columns {
colName := rel.Columns[idx].Name
if _, ok := exclude[colName]; ok {
continue
}
switch col.DataType {
case 'n': // null
val := &qvalue.QValue{Kind: qvalue.QValueKindInvalid, Value: nil}
Expand Down Expand Up @@ -681,7 +685,7 @@ func (p *PostgresCDCSource) processRelationMessage(
// set it to the source table for now, so we can update the schema on the source side
// then at the Workflow level we set it to the destination table
SrcTableName: p.SrcTableIDNameMapping[currRel.RelationId],
DstTableName: p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationId]],
DstTableName: p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationId]].Name,
AddedColumns: make([]*protos.DeltaAddedColumn, 0),
}
for _, column := range currRel.Columns {
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -206,7 +207,7 @@ func (c *PostgresConnector) createSlotAndPublication(
s *SlotCheckResult,
slot string,
publication string,
tableNameMapping map[string]string,
tableNameMapping map[string]model.NameAndExclude,
doInitialCopy bool,
) error {
/*
Expand Down
9 changes: 8 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,9 +782,16 @@ func (c *PostgresConnector) SetupReplication(signal *SlotSignal, req *protos.Set
return fmt.Errorf("error checking for replication slot and publication: %w", err)
}

tableNameMapping := make(map[string]model.NameAndExclude)
for k, v := range req.TableNameMapping {
tableNameMapping[k] = model.NameAndExclude{
Name: v,
Exclude: make(map[string]struct{}, 0),
}
}
// Create the replication slot and publication
err = c.createSlotAndPublication(signal, exists,
slotName, publicationName, req.TableNameMapping, req.DoInitialCopy)
slotName, publicationName, tableNameMapping, req.DoInitialCopy)
if err != nil {
return fmt.Errorf("error creating replication slot and publication: %w", err)
}
Expand Down
Loading

0 comments on commit 82c3fea

Please sign in to comment.