diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index 36d04d1ef7..148d0fdd7c 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -548,7 +548,7 @@ func (p *PostgresCDCSource) processInsertMessage(
 	}
 
 	// log lsn and relation id for debugging
-	p.logger.Info(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
+	p.logger.Debug(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
 		lsn, relID, tableName))
 
 	rel, ok := p.relationMessageMapping[relID]
@@ -583,7 +583,7 @@ func (p *PostgresCDCSource) processUpdateMessage(
 	}
 
 	// log lsn and relation id for debugging
-	p.logger.Info(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
+	p.logger.Debug(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
 		lsn, relID, tableName))
 
 	rel, ok := p.relationMessageMapping[relID]
@@ -626,7 +626,7 @@ func (p *PostgresCDCSource) processDeleteMessage(
 	}
 
 	// log lsn and relation id for debugging
-	p.logger.Info(fmt.Sprintf("DeleteMessage => LSN: %d, RelationID: %d, Relation Name: %s",
+	p.logger.Debug(fmt.Sprintf("DeleteMessage => LSN: %d, RelationID: %d, Relation Name: %s",
 		lsn, relID, tableName))
 
 	rel, ok := p.relationMessageMapping[relID]
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index e778b7c5fa..dc51546070 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -257,80 +257,88 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 	rawTableIdentifier := getRawTableIdentifier(req.FlowJobName)
 	c.logger.Info(fmt.Sprintf("pushing records to Postgres table %s via COPY", rawTableIdentifier))
 
-	records := make([][]interface{}, 0)
+	numRecords := 0
 	tableNameRowsMapping := make(map[string]uint32)
 
-	for record := range req.Records.GetRecords() {
-		switch typedRecord := record.(type) {
-		case *model.InsertRecord:
-			itemsJSON, err := typedRecord.Items.ToJSONWithOptions(&model.ToJSONOptions{
-				UnnestColumns: map[string]struct{}{},
-				HStoreAsJSON:  false,
-			})
-			if err != nil {
-				return nil, fmt.Errorf("failed to serialize insert record items to JSON: %w", err)
-			}
-
-			records = append(records, []interface{}{
-				uuid.New().String(),
-				time.Now().UnixNano(),
-				typedRecord.DestinationTableName,
-				itemsJSON,
-				0,
-				"{}",
-				req.SyncBatchID,
-				"",
-			})
-			tableNameRowsMapping[typedRecord.DestinationTableName] += 1
-		case *model.UpdateRecord:
-			newItemsJSON, err := typedRecord.NewItems.ToJSONWithOptions(&model.ToJSONOptions{
-				UnnestColumns: map[string]struct{}{},
-				HStoreAsJSON:  false,
-			})
-			if err != nil {
-				return nil, fmt.Errorf("failed to serialize update record new items to JSON: %w", err)
-			}
-			oldItemsJSON, err := typedRecord.OldItems.ToJSONWithOptions(&model.ToJSONOptions{
-				UnnestColumns: map[string]struct{}{},
-				HStoreAsJSON:  false,
-			})
-			if err != nil {
-				return nil, fmt.Errorf("failed to serialize update record old items to JSON: %w", err)
-			}
-
-			records = append(records, []interface{}{
-				uuid.New().String(),
-				time.Now().UnixNano(),
-				typedRecord.DestinationTableName,
-				newItemsJSON,
-				1,
-				oldItemsJSON,
-				req.SyncBatchID,
-				utils.KeysToString(typedRecord.UnchangedToastColumns),
-			})
-			tableNameRowsMapping[typedRecord.DestinationTableName] += 1
-		case *model.DeleteRecord:
-			itemsJSON, err := typedRecord.Items.ToJSONWithOptions(&model.ToJSONOptions{
-				UnnestColumns: map[string]struct{}{},
-				HStoreAsJSON:  false,
-			})
-			if err != nil {
-				return nil, fmt.Errorf("failed to serialize delete record items to JSON: %w", err)
+	streamReadFunc := func() ([]any, error) {
+		record, ok := <-req.Records.GetRecords()
+
+		if !ok {
+			return nil, nil
+		} else {
+			var row []any
+			switch typedRecord := record.(type) {
+			case *model.InsertRecord:
+				itemsJSON, err := typedRecord.Items.ToJSONWithOptions(&model.ToJSONOptions{
+					UnnestColumns: map[string]struct{}{},
+					HStoreAsJSON:  false,
+				})
+				if err != nil {
+					return nil, fmt.Errorf("failed to serialize insert record items to JSON: %w", err)
+				}
+
+				row = []any{
+					uuid.New().String(),
+					time.Now().UnixNano(),
+					typedRecord.DestinationTableName,
+					itemsJSON,
+					0,
+					"{}",
+					req.SyncBatchID,
+					"",
+				}
+			case *model.UpdateRecord:
+				newItemsJSON, err := typedRecord.NewItems.ToJSONWithOptions(&model.ToJSONOptions{
+					UnnestColumns: map[string]struct{}{},
+					HStoreAsJSON:  false,
+				})
+				if err != nil {
+					return nil, fmt.Errorf("failed to serialize update record new items to JSON: %w", err)
+				}
+				oldItemsJSON, err := typedRecord.OldItems.ToJSONWithOptions(&model.ToJSONOptions{
+					UnnestColumns: map[string]struct{}{},
+					HStoreAsJSON:  false,
+				})
+				if err != nil {
+					return nil, fmt.Errorf("failed to serialize update record old items to JSON: %w", err)
+				}
+
+				row = []any{
+					uuid.New().String(),
+					time.Now().UnixNano(),
+					typedRecord.DestinationTableName,
+					newItemsJSON,
+					1,
+					oldItemsJSON,
+					req.SyncBatchID,
+					utils.KeysToString(typedRecord.UnchangedToastColumns),
+				}
+			case *model.DeleteRecord:
+				itemsJSON, err := typedRecord.Items.ToJSONWithOptions(&model.ToJSONOptions{
+					UnnestColumns: map[string]struct{}{},
+					HStoreAsJSON:  false,
+				})
+				if err != nil {
+					return nil, fmt.Errorf("failed to serialize delete record items to JSON: %w", err)
+				}
+
+				row = []any{
+					uuid.New().String(),
+					time.Now().UnixNano(),
+					typedRecord.DestinationTableName,
+					itemsJSON,
+					2,
+					itemsJSON,
+					req.SyncBatchID,
+					"",
+				}
+			default:
+				return nil, fmt.Errorf("unsupported record type for Postgres flow connector: %T", typedRecord)
 			}
-
-			records = append(records, []interface{}{
-				uuid.New().String(),
-				time.Now().UnixNano(),
-				typedRecord.DestinationTableName,
-				itemsJSON,
-				2,
-				itemsJSON,
-				req.SyncBatchID,
-				"",
-			})
-			tableNameRowsMapping[typedRecord.DestinationTableName] += 1
-		default:
-			return nil, fmt.Errorf("unsupported record type for Postgres flow connector: %T", typedRecord)
+			numRecords += 1
+			tableNameRowsMapping[record.GetDestinationTableName()] += 1
+			return row, nil
 		}
 	}
 
@@ -339,13 +347,6 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 		return nil, fmt.Errorf("failed to sync schema changes: %w", err)
 	}
 
-	if len(records) == 0 {
-		return &model.SyncResponse{
-			LastSyncedCheckpointID: 0,
-			NumRecordsSynced:       0,
-		}, nil
-	}
-
 	syncRecordsTx, err := c.conn.Begin(c.ctx)
 	if err != nil {
 		return nil, fmt.Errorf("error starting transaction for syncing records: %w", err)
@@ -362,13 +363,13 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 		"_peerdb_uid", "_peerdb_timestamp", "_peerdb_destination_table_name", "_peerdb_data",
 		"_peerdb_record_type", "_peerdb_match_data", "_peerdb_batch_id", "_peerdb_unchanged_toast_columns",
 	},
-		pgx.CopyFromRows(records))
+		pgx.CopyFromFunc(streamReadFunc))
 	if err != nil {
 		return nil, fmt.Errorf("error syncing records: %w", err)
 	}
-	if syncedRecordsCount != int64(len(records)) {
+	if syncedRecordsCount != int64(numRecords) {
 		return nil, fmt.Errorf("error syncing records: expected %d records to be synced, but %d were synced",
-			len(records), syncedRecordsCount)
+			numRecords, syncedRecordsCount)
 	}
 
 	c.logger.Info(fmt.Sprintf("synced %d records to Postgres table %s via COPY",
@@ -392,7 +393,7 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 
 	return &model.SyncResponse{
 		LastSyncedCheckpointID: lastCP,
-		NumRecordsSynced:       int64(len(records)),
+		NumRecordsSynced:       int64(numRecords),
 		CurrentSyncBatchID:     req.SyncBatchID,
 		TableNameRowsMapping:   tableNameRowsMapping,
 		TableSchemaDeltas:      req.Records.SchemaDeltas,
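
Reviewer note on the core pgx change above: pgx.CopyFromRows takes a fully materialized [][]any, so every record in a batch had to be buffered in memory before the COPY started. pgx.CopyFromFunc (a newer pgx v5 helper) instead pulls one row per callback invocation and ends the COPY when the callback returns (nil, nil), which is what lets streamReadFunc feed rows straight off the records channel. Below is a minimal, self-contained sketch of the same pattern, assuming a local Postgres with a hypothetical "events" table (CREATE TABLE events (id int, data text)); the DSN and the channel are illustrative stand-ins for PeerDB's record stream, not PeerDB code:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()

	// Assumed DSN; point this at a real database to run the sketch.
	conn, err := pgx.Connect(ctx, "postgres://localhost:5432/postgres")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close(ctx)

	// A channel stands in for PeerDB's req.Records.GetRecords() stream.
	rows := make(chan []any)
	go func() {
		defer close(rows)
		for i := 0; i < 3; i++ {
			rows <- []any{i, fmt.Sprintf("payload-%d", i)}
		}
	}()

	// CopyFromFunc calls this once per row; returning (nil, nil) ends the
	// COPY, mirroring the closed-channel check in streamReadFunc.
	numRows := 0
	src := pgx.CopyFromFunc(func() ([]any, error) {
		row, ok := <-rows
		if !ok {
			return nil, nil // stream drained: signal end of data
		}
		numRows++
		return row, nil
	})

	copied, err := conn.CopyFrom(ctx, pgx.Identifier{"events"}, []string{"id", "data"}, src)
	if err != nil {
		log.Fatal(err)
	}
	if copied != int64(numRows) {
		log.Fatalf("expected %d rows to be copied, got %d", numRows, copied)
	}
	fmt.Printf("copied %d rows\n", copied)
}

One knock-on effect worth noting in review: with a callback source there is no slice whose length can be checked up front, which is presumably why the len(records) == 0 early return was dropped. An empty batch now just runs a COPY that reports zero rows, and the count check against numRecords still holds.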