pgvalue-qrep-copy

serprex committed May 22, 2024
1 parent 349fee4 commit c138f2a

Showing 3 changed files with 274 additions and 73 deletions.
22 changes: 8 additions & 14 deletions flow/connectors/postgres/qrep.go
@@ -56,14 +6 @@ func (c *PostgresConnector) GetQRepPartitions(
return nil, fmt.Errorf("failed to set transaction snapshot: %w", err)
}

// TODO re-enable locking of the watermark table.
// // lock the table while we get the partitions.
// lockQuery := fmt.Sprintf("LOCK %s IN EXCLUSIVE MODE", config.WatermarkTable)
// if _, err = tx.Exec(c.ctx, lockQuery); err != nil {
// // if we aren't able to lock, just log the error and continue
// log.Warnf("failed to lock table %s: %v", config.WatermarkTable, err)
// }

return c.getNumRowsPartitions(ctx, getPartitionsTx, config, last)
}

@@ -335,7 +327,7 @@ func corePullQRepRecords(
ctx context.Context,
config *protos.QRepConfig,
partition *protos.QRepPartition,
sink QuerySinkWrite,
sink QuerySinkWriter,
) (int, error) {
partitionIdLog := slog.String(string(shared.PartitionIDKey), partition.PartitionId)
if partition.FullTablePartition {
@@ -416,7 +408,7 @@ func syncQRepRecords(
ctx context.Context,
config *protos.QRepConfig,
partition *protos.QRepPartition,
sink QuerySinkRead,
sink QuerySinkReader,
) (int, error) {
dstTable, err := utils.ParseSchemaTable(config.DestinationTableIdentifier)
if err != nil {
@@ -642,14 +634,16 @@ func (c *PostgresConnector) PullXminRecordStream(
partition *protos.QRepPartition,
stream *model.QRecordStream,
) (int, int64, error) {
return pullXminRecordStream(c, ctx, config, partition, stream)
return pullXminRecordStream(c, ctx, config, partition, RecordStreamSink{
QRecordStream: stream,
})
}

func (c *PostgresConnector) PullXminPgRecordStream(
ctx context.Context,
config *protos.QRepConfig,
partition *protos.QRepPartition,
pipe *io.PipeWriter,
pipe PgCopyWriter,
) (int, int64, error) {
return pullXminRecordStream(c, ctx, config, partition, pipe)
}
@@ -659,7 +653,7 @@ func pullXminRecordStream(
ctx context.Context,
config *protos.QRepConfig,
partition *protos.QRepPartition,
sink QuerySinkWrite,
sink QuerySinkWriter,
) (int, int64, error) {
var currentSnapshotXmin int64
query := config.Query
@@ -675,7 +669,7 @@
var err error
var numRecords int
if partition.Range != nil {
numRecords, currentSnapshotXmin, err = executor.executeAndProcessQueryStreamGettingCurrentSnapshotXmin(
numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentSnapshotXmin(
ctx,
sink,
query,
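For context on the qrep.go hunks above: the pull path now accepts a query sink (QuerySinkWriter) instead of a concrete stream or pipe, with callers either wrapping a *model.QRecordStream in RecordStreamSink or passing a PgCopyWriter. The Go program below is only a minimal, self-contained sketch of that shape — every identifier in it (sinkWriter, streamSink, copySink, pullRows) is an illustrative stand-in, not the PeerDB API.

package main

import (
	"context"
	"fmt"
	"io"
)

// sinkWriter mirrors the idea of a QuerySinkWriter: the puller hands rows to
// whichever sink the caller chose.
type sinkWriter interface {
	Write(ctx context.Context, row string) error
	Close(err error)
}

// streamSink plays the role of an in-memory record-stream sink.
type streamSink struct{ rows chan string }

func (s streamSink) Write(_ context.Context, row string) error { s.rows <- row; return nil }
func (s streamSink) Close(error)                                { close(s.rows) }

// copySink plays the role of a COPY pipe writer: rows go to one half of an io.Pipe.
type copySink struct{ w *io.PipeWriter }

func (c copySink) Write(_ context.Context, row string) error {
	_, err := io.WriteString(c.w, row+"\n")
	return err
}
func (c copySink) Close(err error) { c.w.CloseWithError(err) }

// pullRows is the analogue of a pull function: it only knows the interface.
func pullRows(ctx context.Context, sink sinkWriter, rows []string) (int, error) {
	defer sink.Close(nil)
	for _, r := range rows {
		if err := sink.Write(ctx, r); err != nil {
			return 0, err
		}
	}
	return len(rows), nil
}

func main() {
	ctx := context.Background()

	// Buffered channel so the two writes complete before we drain it.
	stream := streamSink{rows: make(chan string, 4)}
	if _, err := pullRows(ctx, stream, []string{"a", "b"}); err != nil {
		panic(err)
	}
	for row := range stream.rows {
		fmt.Println("stream sink got:", row)
	}

	pr, pw := io.Pipe()
	go func() {
		if _, err := pullRows(ctx, copySink{w: pw}, []string{"c", "d"}); err != nil {
			fmt.Println("copy sink error:", err)
		}
	}()
	out, _ := io.ReadAll(pr)
	fmt.Print("copy sink got:\n", string(out))
}

The point is only that the pulling code stays agnostic to whether rows feed an in-memory record stream or a COPY pipe.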
117 changes: 58 additions & 59 deletions flow/connectors/postgres/qrep_query_executor.go
@@ -3,7 +3,6 @@ package connpostgres
import (
"context"
"fmt"
"io"
"log/slog"

"github.com/jackc/pgx/v5"
@@ -25,45 +24,6 @@ type QRepQueryExecutor struct {
partitionID string
}

type QuerySinkWrite interface {
Close(error)
ExecuteQuery(context.Context, *QRepQueryExecutor, string, ...interface{}) (int, error)
ExecuteQueryWithTx(context.Context, *QRepQueryExecutor, pgx.Tx, string, ...interface{}) (int, error)
}

type QuerySinkRead interface {
GetColumnNames() []string
CopyInto(context.Context, pgx.Tx, pgx.Identifier) (int64, error)
}

type RecordStreamSink struct {
*model.QRecordStream
}

type PgCopyShared struct {
schema []pgconn.FieldDescription
schemaSet bool
schemaLatch chan struct{}
err error
}

type PgCopyWriter struct {
*io.PipeWriter
schema *PgCopyShared
}

type PgCopyReader struct {
*io.PipeReader
schema *PgCopyShared
}

func NewPgCopyPipe() (PgCopyReader, PgCopyWriter) {
read, write := io.Pipe()
schema := PgCopyShared{}
return PgCopyReader{PipeReader: read, schema: &schema},
PgCopyWriter{PipeWriter: write, schema: &schema}
}

func (c *PostgresConnector) NewQRepQueryExecutor(flowJobName string, partitionID string) *QRepQueryExecutor {
return c.NewQRepQueryExecutorSnapshot("", flowJobName, partitionID)
}
@@ -169,8 +129,7 @@ func (qe *QRepQueryExecutor) ProcessRows(
return batch, nil
}

func processRowsStream(
qe *QRepQueryExecutor,
func (qe *QRepQueryExecutor) processRowsStream(
ctx context.Context,
cursorName string,
stream *model.QRecordStream,
@@ -206,8 +165,7 @@ func processFetchedRows(
return numRows, nil
}

func processFetchedRows(
qe *QRepQueryExecutor,
func (qe *QRepQueryExecutor) processFetchedRows(
ctx context.Context,
query string,
tx pgx.Tx,
@@ -230,7 +188,7 @@
stream.SetSchema(qe.fieldDescriptionsToSchema(fieldDescriptions))
}

numRows, err := processRowsStream(qe, ctx, cursorName, stream, rows, fieldDescriptions)
numRows, err := qe.processRowsStream(ctx, cursorName, stream, rows, fieldDescriptions)
if err != nil {
qe.logger.Error("[pg_query_executor] failed to process rows", slog.Any("error", err))
return 0, fmt.Errorf("failed to process rows: %w", err)
@@ -258,7 +216,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery(
// must wait on errors to close before returning to maintain qe.conn exclusion
go func() {
defer close(errors)
_, err := sink.ExecuteQuery(ctx, qe, query, args...)
_, err := qe.ExecuteAndProcessQueryStream(ctx, stream, query, args...)
if err != nil {
qe.logger.Error("[pg_query_executor] failed to execute and process query stream", slog.Any("error", err))
errors <- err
@@ -286,17 +244,30 @@ }
}
}

func (qe *QRepQueryExecutor) ExecuteAndProcessQuery(
func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStream(
ctx context.Context,
sink QuerySinkWrite,
stream *model.QRecordStream,
query string,
args ...interface{},
) (*model.QRecordBatch, error)
) (int, error) {
qe.logger.Info("Executing and processing query stream", slog.String("query", query))
defer stream.Close(nil)

tx, err := qe.conn.BeginTx(ctx, pgx.TxOptions{
AccessMode: pgx.ReadOnly,
IsoLevel: pgx.RepeatableRead,
})
if err != nil {
qe.logger.Error("[pg_query_executor] failed to begin transaction", slog.Any("error", err))
return 0, fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
}

return qe.ExecuteAndProcessQueryStreamWithTx(ctx, tx, stream, query, args...)
}

func executeAndProcessQueryStream(
qe *QRepQueryExecutor,
func (qe *QRepQueryExecutor) ExecuteQueryIntoSink(
ctx context.Context,
sink QuerySinkWrite,
sink QuerySinkWriter,
query string,
args ...interface{},
) (int, error) {
@@ -315,9 +286,38 @@
return sink.ExecuteQueryWithTx(ctx, qe, tx, query, args...)
}

func (qe *QRepQueryExecutor) executeAndProcessQueryStreamGettingCurrentSnapshotXmin(
func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamGettingCurrentSnapshotXmin(
ctx context.Context,
sink QuerySinkWrite,
stream *model.QRecordStream,
query string,
args ...interface{},
) (int, int64, error) {
var currentSnapshotXmin pgtype.Int8
qe.logger.Info("Executing and processing query stream", slog.String("query", query))
defer stream.Close(nil)

tx, err := qe.conn.BeginTx(ctx, pgx.TxOptions{
AccessMode: pgx.ReadOnly,
IsoLevel: pgx.RepeatableRead,
})
if err != nil {
qe.logger.Error("[pg_query_executor] failed to begin transaction", slog.Any("error", err))
return 0, currentSnapshotXmin.Int64, fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
}

err = tx.QueryRow(ctx, "select txid_snapshot_xmin(txid_current_snapshot())").Scan(&currentSnapshotXmin)
if err != nil {
qe.logger.Error("[pg_query_executor] failed to get current snapshot xmin", slog.Any("error", err))
return 0, currentSnapshotXmin.Int64, err
}

totalRecordsFetched, err := qe.ExecuteAndProcessQueryStreamWithTx(ctx, tx, stream, query, args...)
return totalRecordsFetched, currentSnapshotXmin.Int64, err
}

func (qe *QRepQueryExecutor) ExecuteQueryIntoSinkGettingCurrentSnapshotXmin(
ctx context.Context,
sink QuerySinkWriter,
query string,
args ...interface{},
) (int, int64, error) {
@@ -344,10 +344,10 @@ func (qe *QRepQueryExecutor) executeAndProcessQueryStreamGettingCurrentSnapshotX
return totalRecordsFetched, currentSnapshotXmin.Int64, err
}

func (stream RecordStreamSink) ExecuteQueryWithTx(
func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
ctx context.Context,
qe *QRepQueryExecutor,
tx pgx.Tx,
stream *model.QRecordStream,
query string,
args ...interface{},
) (int, error) {
@@ -389,7 +389,7 @@ func (stream RecordStreamSink) ExecuteQueryWithTx(

totalRecordsFetched := 0
for {
numRows, err := processFetchedRows(qe, ctx, query, tx, cursorName, fetchSize, stream.QRecordStream)
numRows, err := qe.processFetchedRows(ctx, query, tx, cursorName, fetchSize, stream)
if err != nil {
qe.logger.Error("[pg_query_executor] failed to process fetched rows", slog.Any("error", err))
return 0, err
@@ -404,8 +404,7 @@
}

qe.logger.Info("Committing transaction")
err = tx.Commit(ctx)
if err != nil {
if err := tx.Commit(ctx); err != nil {
qe.logger.Error("[pg_query_executor] failed to commit transaction", slog.Any("error", err))
err = fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err)
stream.Close(err)
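The sink type block this diff drops from the top of qrep_query_executor.go (PgCopyShared, PgCopyWriter, PgCopyReader, NewPgCopyPipe — presumably relocated, since PgCopyWriter is still referenced in qrep.go) couples an io.Pipe with a latch-guarded shared schema, so the COPY reader can wait until the writer has published column metadata before consuming data. The sketch below reconstructs only that pattern under stand-in names and a sync.Once; it is not the PeerDB implementation.

package main

import (
	"fmt"
	"io"
	"sync"
)

// copyShared plays the role of PgCopyShared: the writer publishes the column
// schema once, and the reader blocks on the latch until it is available.
type copyShared struct {
	schema      []string
	schemaLatch chan struct{}
	setOnce     sync.Once
}

type copyWriter struct {
	*io.PipeWriter
	shared *copyShared
}

type copyReader struct {
	*io.PipeReader
	shared *copyShared
}

// newCopyPipe wires one io.Pipe and one shared schema latch into a reader/writer pair.
func newCopyPipe() (copyReader, copyWriter) {
	r, w := io.Pipe()
	shared := &copyShared{schemaLatch: make(chan struct{})}
	return copyReader{PipeReader: r, shared: shared},
		copyWriter{PipeWriter: w, shared: shared}
}

// SetSchema publishes the schema exactly once and releases the latch.
func (w copyWriter) SetSchema(cols []string) {
	w.shared.setOnce.Do(func() {
		w.shared.schema = cols
		close(w.shared.schemaLatch)
	})
}

// WaitSchema blocks until the writer has published the schema.
func (r copyReader) WaitSchema() []string {
	<-r.shared.schemaLatch
	return r.shared.schema
}

func main() {
	reader, writer := newCopyPipe()

	go func() {
		writer.SetSchema([]string{"id", "value"})
		// Stand-in for the COPY data the writer side would stream.
		_, _ = io.WriteString(writer.PipeWriter, "1\tfoo\n2\tbar\n")
		_ = writer.Close()
	}()

	fmt.Println("columns:", reader.WaitSchema())
	data, _ := io.ReadAll(reader.PipeReader)
	fmt.Print(string(data))
}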
(diff for the third changed file did not render and is not shown)
