Postgres streaming using CopyFromFunc (#1204)
In the Postgres connector, the sync side doesn't take advantage of
record streaming over Go channels: it loads all records into an array
and only processes them once the channel is closed from the pull side.
This PR addresses that; Postgres now copies from a function invocation
instead of an array, so records are copied while they are still being
read from the channel.

This leads to lower memory utilization (no massive buffer) and improved
parallelism.
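
For readers unfamiliar with the pgx API: `pgx.CopyFromFunc` (available in recent pgx v5 releases) adapts a `func() ([]any, error)` into a `CopyFromSource`, so `CopyFrom` pulls one row per callback invocation and a channel can feed COPY directly. A minimal sketch of the pattern — the `events` table and column list are hypothetical, not PeerDB's actual raw-table schema:

```go
package example

import (
	"context"

	"github.com/jackc/pgx/v5"
)

// copyFromChannel streams rows from a channel into a table without
// first buffering them in a slice. Returning (nil, nil) from the
// callback tells pgx the data is exhausted and ends the COPY cleanly.
func copyFromChannel(ctx context.Context, conn *pgx.Conn, rows <-chan []any) (int64, error) {
	source := pgx.CopyFromFunc(func() ([]any, error) {
		row, ok := <-rows
		if !ok {
			return nil, nil // channel closed by the producer
		}
		return row, nil
	})
	// Table and column names are illustrative only.
	return conn.CopyFrom(ctx, pgx.Identifier{"events"}, []string{"id", "payload"}, source)
}
```

Because `CopyFrom` itself drives the callback, backpressure propagates to the producer and memory use is bounded by the channel's capacity rather than the batch size.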

Also changes some `Info` logs back to `Debug` to prevent log spam.

---------

Co-authored-by: Philip Dubé <[email protected]>
heavycrystal and serprex authored Feb 5, 2024
1 parent 42cc3a0 commit 92b752d
Showing 2 changed files with 85 additions and 84 deletions.
6 changes: 3 additions & 3 deletions flow/connectors/postgres/cdc.go
@@ -548,7 +548,7 @@ func (p *PostgresCDCSource) processInsertMessage(
 	}
 
 	// log lsn and relation id for debugging
-	p.logger.Info(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
+	p.logger.Debug(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
 		lsn, relID, tableName))
 
 	rel, ok := p.relationMessageMapping[relID]
@@ -583,7 +583,7 @@ func (p *PostgresCDCSource) processUpdateMessage(
 	}
 
 	// log lsn and relation id for debugging
-	p.logger.Info(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
+	p.logger.Debug(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
 		lsn, relID, tableName))
 
 	rel, ok := p.relationMessageMapping[relID]
@@ -626,7 +626,7 @@ func (p *PostgresCDCSource) processDeleteMessage(
 	}
 
 	// log lsn and relation id for debugging
-	p.logger.Info(fmt.Sprintf("DeleteMessage => LSN: %d, RelationID: %d, Relation Name: %s",
+	p.logger.Debug(fmt.Sprintf("DeleteMessage => LSN: %d, RelationID: %d, Relation Name: %s",
 		lsn, relID, tableName))
 
 	rel, ok := p.relationMessageMapping[relID]
163 changes: 82 additions & 81 deletions flow/connectors/postgres/postgres.go
@@ -257,80 +257,88 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 	rawTableIdentifier := getRawTableIdentifier(req.FlowJobName)
 	c.logger.Info(fmt.Sprintf("pushing records to Postgres table %s via COPY", rawTableIdentifier))
 
-	records := make([][]interface{}, 0)
+	numRecords := 0
 	tableNameRowsMapping := make(map[string]uint32)
 
-	for record := range req.Records.GetRecords() {
-		switch typedRecord := record.(type) {
-		case *model.InsertRecord:
-			itemsJSON, err := typedRecord.Items.ToJSONWithOptions(&model.ToJSONOptions{
-				UnnestColumns: map[string]struct{}{},
-				HStoreAsJSON:  false,
-			})
-			if err != nil {
-				return nil, fmt.Errorf("failed to serialize insert record items to JSON: %w", err)
-			}
-
-			records = append(records, []interface{}{
-				uuid.New().String(),
-				time.Now().UnixNano(),
-				typedRecord.DestinationTableName,
-				itemsJSON,
-				0,
-				"{}",
-				req.SyncBatchID,
-				"",
-			})
-			tableNameRowsMapping[typedRecord.DestinationTableName] += 1
-		case *model.UpdateRecord:
-			newItemsJSON, err := typedRecord.NewItems.ToJSONWithOptions(&model.ToJSONOptions{
-				UnnestColumns: map[string]struct{}{},
-				HStoreAsJSON:  false,
-			})
-			if err != nil {
-				return nil, fmt.Errorf("failed to serialize update record new items to JSON: %w", err)
-			}
-			oldItemsJSON, err := typedRecord.OldItems.ToJSONWithOptions(&model.ToJSONOptions{
-				UnnestColumns: map[string]struct{}{},
-				HStoreAsJSON:  false,
-			})
-			if err != nil {
-				return nil, fmt.Errorf("failed to serialize update record old items to JSON: %w", err)
-			}
-
-			records = append(records, []interface{}{
-				uuid.New().String(),
-				time.Now().UnixNano(),
-				typedRecord.DestinationTableName,
-				newItemsJSON,
-				1,
-				oldItemsJSON,
-				req.SyncBatchID,
-				utils.KeysToString(typedRecord.UnchangedToastColumns),
-			})
-			tableNameRowsMapping[typedRecord.DestinationTableName] += 1
-		case *model.DeleteRecord:
-			itemsJSON, err := typedRecord.Items.ToJSONWithOptions(&model.ToJSONOptions{
-				UnnestColumns: map[string]struct{}{},
-				HStoreAsJSON:  false,
-			})
-			if err != nil {
-				return nil, fmt.Errorf("failed to serialize delete record items to JSON: %w", err)
-			}
-
-			records = append(records, []interface{}{
-				uuid.New().String(),
-				time.Now().UnixNano(),
-				typedRecord.DestinationTableName,
-				itemsJSON,
-				2,
-				itemsJSON,
-				req.SyncBatchID,
-				"",
-			})
-			tableNameRowsMapping[typedRecord.DestinationTableName] += 1
-		default:
-			return nil, fmt.Errorf("unsupported record type for Postgres flow connector: %T", typedRecord)
+	streamReadFunc := func() ([]any, error) {
+		record, ok := <-req.Records.GetRecords()
+
+		if !ok {
+			return nil, nil
+		} else {
+			var row []any
+			switch typedRecord := record.(type) {
+			case *model.InsertRecord:
+				itemsJSON, err := typedRecord.Items.ToJSONWithOptions(&model.ToJSONOptions{
+					UnnestColumns: map[string]struct{}{},
+					HStoreAsJSON:  false,
+				})
+				if err != nil {
+					return nil, fmt.Errorf("failed to serialize insert record items to JSON: %w", err)
+				}
+
+				row = []any{
+					uuid.New().String(),
+					time.Now().UnixNano(),
+					typedRecord.DestinationTableName,
+					itemsJSON,
+					0,
+					"{}",
+					req.SyncBatchID,
+					"",
+				}
+			case *model.UpdateRecord:
+				newItemsJSON, err := typedRecord.NewItems.ToJSONWithOptions(&model.ToJSONOptions{
+					UnnestColumns: map[string]struct{}{},
+					HStoreAsJSON:  false,
+				})
+				if err != nil {
+					return nil, fmt.Errorf("failed to serialize update record new items to JSON: %w", err)
+				}
+				oldItemsJSON, err := typedRecord.OldItems.ToJSONWithOptions(&model.ToJSONOptions{
+					UnnestColumns: map[string]struct{}{},
+					HStoreAsJSON:  false,
+				})
+				if err != nil {
+					return nil, fmt.Errorf("failed to serialize update record old items to JSON: %w", err)
+				}
+
+				row = []any{
+					uuid.New().String(),
+					time.Now().UnixNano(),
+					typedRecord.DestinationTableName,
+					newItemsJSON,
+					1,
+					oldItemsJSON,
+					req.SyncBatchID,
+					utils.KeysToString(typedRecord.UnchangedToastColumns),
+				}
+			case *model.DeleteRecord:
+				itemsJSON, err := typedRecord.Items.ToJSONWithOptions(&model.ToJSONOptions{
+					UnnestColumns: map[string]struct{}{},
+					HStoreAsJSON:  false,
+				})
+				if err != nil {
+					return nil, fmt.Errorf("failed to serialize delete record items to JSON: %w", err)
+				}
+
+				row = []any{
+					uuid.New().String(),
+					time.Now().UnixNano(),
+					typedRecord.DestinationTableName,
+					itemsJSON,
+					2,
+					itemsJSON,
+					req.SyncBatchID,
+					"",
+				}
+			default:
+				return nil, fmt.Errorf("unsupported record type for Postgres flow connector: %T", typedRecord)
+			}
+
+			numRecords += 1
+			tableNameRowsMapping[record.GetDestinationTableName()] += 1
+			return row, nil
 		}
 	}

@@ -339,13 +347,6 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 		return nil, fmt.Errorf("failed to sync schema changes: %w", err)
 	}
 
-	if len(records) == 0 {
-		return &model.SyncResponse{
-			LastSyncedCheckpointID: 0,
-			NumRecordsSynced:       0,
-		}, nil
-	}
-
 	syncRecordsTx, err := c.conn.Begin(c.ctx)
 	if err != nil {
 		return nil, fmt.Errorf("error starting transaction for syncing records: %w", err)
@@ -362,13 +363,13 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 			"_peerdb_uid", "_peerdb_timestamp", "_peerdb_destination_table_name", "_peerdb_data",
 			"_peerdb_record_type", "_peerdb_match_data", "_peerdb_batch_id", "_peerdb_unchanged_toast_columns",
 		},
-		pgx.CopyFromRows(records))
+		pgx.CopyFromFunc(streamReadFunc))
 	if err != nil {
 		return nil, fmt.Errorf("error syncing records: %w", err)
 	}
-	if syncedRecordsCount != int64(len(records)) {
+	if syncedRecordsCount != int64(numRecords) {
 		return nil, fmt.Errorf("error syncing records: expected %d records to be synced, but %d were synced",
-			len(records), syncedRecordsCount)
+			numRecords, syncedRecordsCount)
 	}
 
 	c.logger.Info(fmt.Sprintf("synced %d records to Postgres table %s via COPY",
@@ -392,7 +393,7 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 
 	return &model.SyncResponse{
 		LastSyncedCheckpointID: lastCP,
-		NumRecordsSynced:       int64(len(records)),
+		NumRecordsSynced:       int64(numRecords),
 		CurrentSyncBatchID:     req.SyncBatchID,
 		TableNameRowsMapping:   tableNameRowsMapping,
 		TableSchemaDeltas:      req.Records.SchemaDeltas,
