From 92b752d7c20c5e3dd2b744016620f74e298a74ef Mon Sep 17 00:00:00 2001
From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com>
Date: Mon, 5 Feb 2024 22:54:45 +0530
Subject: [PATCH] Postgres streaming using CopyFromFunc (#1204)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

In the Postgres connector, the sync side doesn't take advantage of record
streaming over Go channels: it loads all records into an array and only
processes them once the pull side closes the channel. This PR addresses
that by having Postgres COPY from a function invocation instead of an
array, so records are copied while they are still being read from the
channel. This leads to lower memory utilization (no massive buffer) and
improved parallelism.

Also changes some `Info` logs back to `Debug` to prevent log spam.

---------

Co-authored-by: Philip Dubé
---
 flow/connectors/postgres/cdc.go      |   6 +-
 flow/connectors/postgres/postgres.go | 163 ++++++++++++++-------
 2 files changed, 85 insertions(+), 84 deletions(-)

diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index 36d04d1ef7..148d0fdd7c 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -548,7 +548,7 @@ func (p *PostgresCDCSource) processInsertMessage(
 	}
 
 	// log lsn and relation id for debugging
-	p.logger.Info(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
+	p.logger.Debug(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
 		lsn, relID, tableName))
 
 	rel, ok := p.relationMessageMapping[relID]
@@ -583,7 +583,7 @@ func (p *PostgresCDCSource) processUpdateMessage(
 	}
 
 	// log lsn and relation id for debugging
-	p.logger.Info(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
+	p.logger.Debug(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
 		lsn, relID, tableName))
 
 	rel, ok := p.relationMessageMapping[relID]
@@ -626,7 +626,7 @@ func (p *PostgresCDCSource) processDeleteMessage(
 	}
 
 	// log lsn and relation id for debugging
-	p.logger.Info(fmt.Sprintf("DeleteMessage => LSN: %d, RelationID: %d, Relation Name: %s",
+	p.logger.Debug(fmt.Sprintf("DeleteMessage => LSN: %d, RelationID: %d, Relation Name: %s",
 		lsn, relID, tableName))
 
 	rel, ok := p.relationMessageMapping[relID]
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index e778b7c5fa..dc51546070 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -257,80 +257,88 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 	rawTableIdentifier := getRawTableIdentifier(req.FlowJobName)
 	c.logger.Info(fmt.Sprintf("pushing records to Postgres table %s via COPY", rawTableIdentifier))
 
-	records := make([][]interface{}, 0)
+	numRecords := 0
 	tableNameRowsMapping := make(map[string]uint32)
 
-	for record := range req.Records.GetRecords() {
-		switch typedRecord := record.(type) {
-		case *model.InsertRecord:
-			itemsJSON, err := typedRecord.Items.ToJSONWithOptions(&model.ToJSONOptions{
-				UnnestColumns: map[string]struct{}{},
-				HStoreAsJSON:  false,
-			})
-			if err != nil {
-				return nil, fmt.Errorf("failed to serialize insert record items to JSON: %w", err)
-			}
-
-			records = append(records, []interface{}{
-				uuid.New().String(),
-				time.Now().UnixNano(),
-				typedRecord.DestinationTableName,
-				itemsJSON,
-				0,
-				"{}",
-				req.SyncBatchID,
-				"",
-			})
-			tableNameRowsMapping[typedRecord.DestinationTableName] += 1
-		case *model.UpdateRecord:
-			newItemsJSON, err := typedRecord.NewItems.ToJSONWithOptions(&model.ToJSONOptions{
-				UnnestColumns: map[string]struct{}{},
-				HStoreAsJSON:  false,
-			})
-			if err != nil {
-				return nil, fmt.Errorf("failed to serialize update record new items to JSON: %w", err)
-			}
-			oldItemsJSON, err := typedRecord.OldItems.ToJSONWithOptions(&model.ToJSONOptions{
-				UnnestColumns: map[string]struct{}{},
-				HStoreAsJSON:  false,
-			})
-			if err != nil {
-				return nil, fmt.Errorf("failed to serialize update record old items to JSON: %w", err)
-			}
-
-			records = append(records, []interface{}{
-				uuid.New().String(),
-				time.Now().UnixNano(),
-				typedRecord.DestinationTableName,
-				newItemsJSON,
-				1,
-				oldItemsJSON,
-				req.SyncBatchID,
-				utils.KeysToString(typedRecord.UnchangedToastColumns),
-			})
-			tableNameRowsMapping[typedRecord.DestinationTableName] += 1
-		case *model.DeleteRecord:
-			itemsJSON, err := typedRecord.Items.ToJSONWithOptions(&model.ToJSONOptions{
-				UnnestColumns: map[string]struct{}{},
-				HStoreAsJSON:  false,
-			})
-			if err != nil {
-				return nil, fmt.Errorf("failed to serialize delete record items to JSON: %w", err)
+	streamReadFunc := func() ([]any, error) {
+		record, ok := <-req.Records.GetRecords()
+
+		if !ok {
+			return nil, nil
+		} else {
+			var row []any
+			switch typedRecord := record.(type) {
+			case *model.InsertRecord:
+				itemsJSON, err := typedRecord.Items.ToJSONWithOptions(&model.ToJSONOptions{
+					UnnestColumns: map[string]struct{}{},
+					HStoreAsJSON:  false,
+				})
+				if err != nil {
+					return nil, fmt.Errorf("failed to serialize insert record items to JSON: %w", err)
+				}
+
+				row = []any{
+					uuid.New().String(),
+					time.Now().UnixNano(),
+					typedRecord.DestinationTableName,
+					itemsJSON,
+					0,
+					"{}",
+					req.SyncBatchID,
+					"",
+				}
+			case *model.UpdateRecord:
+				newItemsJSON, err := typedRecord.NewItems.ToJSONWithOptions(&model.ToJSONOptions{
+					UnnestColumns: map[string]struct{}{},
+					HStoreAsJSON:  false,
+				})
+				if err != nil {
+					return nil, fmt.Errorf("failed to serialize update record new items to JSON: %w", err)
+				}
+				oldItemsJSON, err := typedRecord.OldItems.ToJSONWithOptions(&model.ToJSONOptions{
+					UnnestColumns: map[string]struct{}{},
+					HStoreAsJSON:  false,
+				})
+				if err != nil {
+					return nil, fmt.Errorf("failed to serialize update record old items to JSON: %w", err)
+				}
+
+				row = []any{
+					uuid.New().String(),
+					time.Now().UnixNano(),
+					typedRecord.DestinationTableName,
+					newItemsJSON,
+					1,
+					oldItemsJSON,
+					req.SyncBatchID,
+					utils.KeysToString(typedRecord.UnchangedToastColumns),
+				}
+			case *model.DeleteRecord:
+				itemsJSON, err := typedRecord.Items.ToJSONWithOptions(&model.ToJSONOptions{
+					UnnestColumns: map[string]struct{}{},
+					HStoreAsJSON:  false,
+				})
+				if err != nil {
+					return nil, fmt.Errorf("failed to serialize delete record items to JSON: %w", err)
+				}
+
+				row = []any{
+					uuid.New().String(),
+					time.Now().UnixNano(),
+					typedRecord.DestinationTableName,
+					itemsJSON,
+					2,
+					itemsJSON,
+					req.SyncBatchID,
+					"",
+				}
+			default:
+				return nil, fmt.Errorf("unsupported record type for Postgres flow connector: %T", typedRecord)
 			}
 
-			records = append(records, []interface{}{
-				uuid.New().String(),
-				time.Now().UnixNano(),
-				typedRecord.DestinationTableName,
-				itemsJSON,
-				2,
-				itemsJSON,
-				req.SyncBatchID,
-				"",
-			})
-			tableNameRowsMapping[typedRecord.DestinationTableName] += 1
-		default:
-			return nil, fmt.Errorf("unsupported record type for Postgres flow connector: %T", typedRecord)
+			numRecords += 1
+			tableNameRowsMapping[record.GetDestinationTableName()] += 1
+			return row, nil
 		}
 	}
 
@@ -339,13 +347,6 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 		return nil, fmt.Errorf("failed to sync schema changes: %w", err)
 	}
-	if len(records) == 0 {
-		return &model.SyncResponse{
-			LastSyncedCheckpointID: 0,
-			NumRecordsSynced:       0,
-		}, nil
-	}
-
 	syncRecordsTx, err := c.conn.Begin(c.ctx)
 	if err != nil {
 		return nil, fmt.Errorf("error starting transaction for syncing records: %w", err)
 	}
@@ -362,13 +363,13 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 			"_peerdb_uid", "_peerdb_timestamp", "_peerdb_destination_table_name", "_peerdb_data",
 			"_peerdb_record_type", "_peerdb_match_data", "_peerdb_batch_id", "_peerdb_unchanged_toast_columns",
 		},
-		pgx.CopyFromRows(records))
+		pgx.CopyFromFunc(streamReadFunc))
 	if err != nil {
 		return nil, fmt.Errorf("error syncing records: %w", err)
 	}
-	if syncedRecordsCount != int64(len(records)) {
+	if syncedRecordsCount != int64(numRecords) {
 		return nil, fmt.Errorf("error syncing records: expected %d records to be synced, but %d were synced",
-			len(records), syncedRecordsCount)
+			numRecords, syncedRecordsCount)
 	}
 
 	c.logger.Info(fmt.Sprintf("synced %d records to Postgres table %s via COPY",
@@ -392,7 +393,7 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 
 	return &model.SyncResponse{
 		LastSyncedCheckpointID: lastCP,
-		NumRecordsSynced:       int64(len(records)),
+		NumRecordsSynced:       int64(numRecords),
 		CurrentSyncBatchID:     req.SyncBatchID,
 		TableNameRowsMapping:   tableNameRowsMapping,
 		TableSchemaDeltas:      req.Records.SchemaDeltas,
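
For context, a minimal standalone sketch of the pgx.CopyFromFunc pattern this
patch adopts, separate from the connector code. It assumes pgx v5, where
CopyFromFunc wraps a func() ([]any, error) and a (nil, nil) return signals end
of stream. The demo_table name, its id/val columns, and the DATABASE_URL
environment variable are illustrative assumptions, not part of this PR:

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()

	// Assumed connection string, e.g. postgres://user:pass@localhost:5432/db
	conn, err := pgx.Connect(ctx, os.Getenv("DATABASE_URL"))
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	// Producer side: rows arrive over a channel, as on the connector's pull
	// side; closing the channel signals that no more rows are coming.
	rows := make(chan []any)
	go func() {
		defer close(rows)
		for i := 0; i < 3; i++ {
			rows <- []any{i, fmt.Sprintf("value-%d", i)}
		}
	}()

	// Consumer side: CopyFromFunc calls this once per row while the COPY is
	// in flight, so rows stream from the channel straight into Postgres
	// without being buffered in a slice first.
	next := func() ([]any, error) {
		row, ok := <-rows
		if !ok {
			return nil, nil // channel closed: end the COPY cleanly
		}
		return row, nil
	}

	// demo_table(id int, val text) is assumed to exist already.
	copied, err := conn.CopyFrom(ctx,
		pgx.Identifier{"demo_table"},
		[]string{"id", "val"},
		pgx.CopyFromFunc(next))
	if err != nil {
		panic(err)
	}
	fmt.Printf("copied %d rows\n", copied)
}

As with streamReadFunc above, COPY pulls each row on demand, so memory use is
bounded by what is in flight on the channel rather than by the size of the
batch.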