From 814bd90244bfe70845228b51deea0c4f92038d2f Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Mon, 9 Oct 2023 17:10:13 -0400 Subject: [PATCH 1/2] [postgres] Copy to destination not staging --- flow/connectors/postgres/cdc.go | 2 + flow/connectors/postgres/qrep_sync_method.go | 102 ++++--------------- 2 files changed, 24 insertions(+), 80 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 6bf823ab9b..1d19b21508 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -66,6 +66,8 @@ func (p *PostgresCDCSource) PullRecords(req *model.PullRecordsRequest) ( if p.publication != "" { pubOpt := fmt.Sprintf("publication_names '%s'", p.publication) pluginArguments = append(pluginArguments, pubOpt) + } else { + return nil, fmt.Errorf("publication name is not set") } replicationOpts := pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments} diff --git a/flow/connectors/postgres/qrep_sync_method.go b/flow/connectors/postgres/qrep_sync_method.go index 1990dd95c5..d00f15c4a7 100644 --- a/flow/connectors/postgres/qrep_sync_method.go +++ b/flow/connectors/postgres/qrep_sync_method.go @@ -3,13 +3,11 @@ package connpostgres import ( "context" "fmt" - "strings" "time" "github.com/PeerDB-io/peer-flow/connectors/utils/metrics" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" - util "github.com/PeerDB-io/peer-flow/utils" "github.com/jackc/pgx/v5" log "github.com/sirupsen/logrus" "google.golang.org/protobuf/encoding/protojson" @@ -35,38 +33,9 @@ func (s *QRepStagingTableSync) SyncQRepRecords( stream *model.QRecordStream, ) (int, error) { partitionID := partition.PartitionId - runID, err := util.RandomUInt64() - if err != nil { - return -1, fmt.Errorf("failed to generate random runID: %v", err) - } - startTime := time.Now() - pool := s.connector.pool - - // create a staging temporary table with the same schema as the destination table - stagingTable := fmt.Sprintf("_%d_staging", runID) - - // create the staging temporary table if not exists - tmpTableStmt := fmt.Sprintf( - `CREATE TEMP TABLE %s AS SELECT * FROM %s LIMIT 0;`, - stagingTable, - dstTableName.String(), - ) - _, err = pool.Exec(context.Background(), tmpTableStmt) - if err != nil { - log.WithFields(log.Fields{ - "flowName": flowJobName, - "partitionID": partitionID, - "destinationTable": dstTableName, - }).Errorf( - "failed to create staging temporary table %s, statement: '%s'. Error: %v", - stagingTable, - tmpTableStmt, - err, - ) - return 0, fmt.Errorf("failed to create staging temporary table %s: %w", stagingTable, err) - } + pool := s.connector.pool schema, err := stream.Schema() if err != nil { log.WithFields(log.Fields{ @@ -77,30 +46,13 @@ func (s *QRepStagingTableSync) SyncQRepRecords( return 0, fmt.Errorf("failed to get schema from stream: %w", err) } - // Step 2: Insert records into the staging table. 
-	copySource := model.NewQRecordBatchCopyFromSource(stream)
-
-	// Perform the COPY FROM operation
-	syncRecordsStartTime := time.Now()
-	syncedRows, err := pool.CopyFrom(
-		context.Background(),
-		pgx.Identifier{stagingTable},
-		schema.GetColumnNames(),
-		copySource,
-	)
-
-	if err != nil {
-		return -1, fmt.Errorf("failed to copy records into staging temporary table: %v", err)
-	}
-	metrics.LogQRepSyncMetrics(s.connector.ctx, flowJobName, syncedRows, time.Since(syncRecordsStartTime))
-	// Second transaction - to handle rest of the processing
-	tx2, err := pool.Begin(context.Background())
+	tx, err := pool.Begin(context.Background())
 	if err != nil {
 		return 0, fmt.Errorf("failed to begin transaction: %v", err)
 	}
 	defer func() {
-		if err := tx2.Rollback(context.Background()); err != nil {
+		if err := tx.Rollback(context.Background()); err != nil {
 			if err != pgx.ErrTxClosed {
 				log.WithFields(log.Fields{
 					"flowName": flowJobName,
@@ -111,33 +63,22 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
 		}
 	}()
 
-	colNames := schema.GetColumnNames()
-	// wrap the column names in double quotes to handle reserved keywords
-	for i, colName := range colNames {
-		colNames[i] = fmt.Sprintf("\"%s\"", colName)
-	}
-	colNamesStr := strings.Join(colNames, ", ")
-	log.WithFields(log.Fields{
-		"flowName":    flowJobName,
-		"partitionID": partitionID,
-	}).Infof("Obtained column names and quoted them in QRep sync")
-	insertFromStagingStmt := fmt.Sprintf(
-		"INSERT INTO %s (%s) SELECT %s FROM %s",
-		dstTableName.String(),
-		colNamesStr,
-		colNamesStr,
-		stagingTable,
+	// Step 2: Insert records into the destination table.
+	copySource := model.NewQRecordBatchCopyFromSource(stream)
+
+	// Perform the COPY FROM operation
+	syncRecordsStartTime := time.Now()
+	syncedRows, err := tx.CopyFrom(
+		context.Background(),
+		pgx.Identifier{dstTableName.String()},
+		schema.GetColumnNames(),
+		copySource,
 	)
-	_, err = tx2.Exec(context.Background(), insertFromStagingStmt)
 	if err != nil {
-		log.WithFields(log.Fields{
-			"flowName":         flowJobName,
-			"partitionID":      partitionID,
-			"destinationTable": dstTableName,
-		}).Errorf("failed to execute statement '%s': %v", insertFromStagingStmt, err)
-		return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err)
+		return -1, fmt.Errorf("failed to copy records into destination table: %v", err)
 	}
+	metrics.LogQRepSyncMetrics(s.connector.ctx, flowJobName, syncedRows, time.Since(syncRecordsStartTime))
 
 	// marshal the partition to json using protojson
 	pbytes, err := protojson.Marshal(partition)
@@ -155,7 +96,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
 		"partitionID":      partitionID,
 		"destinationTable": dstTableName,
 	}).Infof("Executing transaction inside Qrep sync")
-	rows, err := tx2.Exec(
+	rows, err := tx.Exec(
 		context.Background(),
 		insertMetadataStmt,
 		flowJobName,
@@ -167,6 +108,12 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
 	if err != nil {
 		return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err)
 	}
+
+	err = tx.Commit(context.Background())
+	if err != nil {
+		return -1, fmt.Errorf("failed to commit transaction: %v", err)
+	}
+
 	totalRecordsAtTarget, err := s.connector.getApproxTableCounts([]string{dstTableName.String()})
 	if err != nil {
 		return -1, fmt.Errorf("failed to get total records at target: %v", err)
@@ -174,11 +121,6 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
 	metrics.LogQRepNormalizeMetrics(s.connector.ctx, flowJobName, rows.RowsAffected(),
 		time.Since(normalizeRecordsStartTime), totalRecordsAtTarget)
 
-	err = tx2.Commit(context.Background())
-	if err != nil {
-		return -1, fmt.Errorf("failed to commit transaction: %v", err)
-	}
-
 	numRowsInserted := copySource.NumRecords()
 	log.WithFields(log.Fields{
 		"flowName": flowJobName,

From 9943f267ce30a669a800c7a0a58338f3e5c233b5 Mon Sep 17 00:00:00 2001
From: Kaushik Iska
Date: Mon, 9 Oct 2023 17:56:46 -0400
Subject: [PATCH 2/2] identifier separation

---
 flow/connectors/postgres/qrep_sync_method.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flow/connectors/postgres/qrep_sync_method.go b/flow/connectors/postgres/qrep_sync_method.go
index d00f15c4a7..3f6047d9f6 100644
--- a/flow/connectors/postgres/qrep_sync_method.go
+++ b/flow/connectors/postgres/qrep_sync_method.go
@@ -70,7 +70,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
 	syncRecordsStartTime := time.Now()
 	syncedRows, err := tx.CopyFrom(
 		context.Background(),
-		pgx.Identifier{dstTableName.String()},
+		pgx.Identifier{dstTableName.Schema, dstTableName.Table},
 		schema.GetColumnNames(),
 		copySource,
 	)
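
Reviewer note, not part of either patch: a minimal sketch in Go of the pattern the two commits land on, that is, COPY the record stream straight into the schema-qualified destination table inside a single transaction, with no staging table. The helper name copyIntoDestination and its parameters are illustrative only; the pgx/v5 calls used (pgxpool.Pool.Begin, Tx.CopyFrom, Tx.Commit, pgx.Identifier, pgx.CopyFromRows) are real API. It also shows why the second commit splits the identifier: pgx.Identifier quotes each element separately, so {"schema", "table"} renders as "schema"."table", whereas {dstTableName.String()} would quote the whole dotted name as a single identifier.

package example

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// copyIntoDestination mirrors the shape of the patched SyncQRepRecords:
// begin a transaction, COPY rows directly into the destination table,
// then commit. No staging table is involved.
func copyIntoDestination(ctx context.Context, pool *pgxpool.Pool,
	schema, table string, cols []string, rows [][]any) (int64, error) {
	tx, err := pool.Begin(ctx)
	if err != nil {
		return 0, fmt.Errorf("failed to begin transaction: %w", err)
	}
	// Rollback is a no-op once Commit has succeeded.
	defer func() { _ = tx.Rollback(ctx) }()

	// pgx.Identifier is a []string and quotes every element separately, so a
	// schema-qualified table must be passed as two elements ("schema", "table"),
	// not as one pre-joined "schema.table" string.
	copied, err := tx.CopyFrom(ctx, pgx.Identifier{schema, table}, cols,
		pgx.CopyFromRows(rows))
	if err != nil {
		return 0, fmt.Errorf("failed to copy records into destination table: %w", err)
	}

	if err := tx.Commit(ctx); err != nil {
		return 0, fmt.Errorf("failed to commit transaction: %w", err)
	}
	return copied, nil
}

Doing the COPY through the transaction (tx.CopyFrom rather than pool.CopyFrom, as the old staging path did) keeps the copied rows and the later metadata insert on one connection so both commit together.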