Skip to content

Commit

Permalink
use snapshot xmin not current txid
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 4, 2023
1 parent 55731f3 commit 8190a94
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 17 deletions.
6 changes: 3 additions & 3 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,12 +878,12 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,

stream := model.NewQRecordStream(bufferSize)

var currentTxid int64
var currentSnapshotXmin int64
errGroup.Go(func() error {
pgConn := srcConn.(*connpostgres.PostgresConnector)
var pullErr error
var numRecords int
numRecords, currentTxid, pullErr = pgConn.PullXminRecordStream(config, partition, stream)
numRecords, currentSnapshotXmin, pullErr = pgConn.PullXminRecordStream(config, partition, stream)
if pullErr != nil {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
Expand Down Expand Up @@ -930,5 +930,5 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
return 0, err
}

return currentTxid, nil
return currentSnapshotXmin, nil
}
15 changes: 6 additions & 9 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,35 +554,32 @@ func (c *PostgresConnector) PullXminRecordStream(
partition *protos.QRepPartition,
stream *model.QRecordStream,
) (int, int64, error) {
var currentTxid int64
var currentSnapshotXmin int64
query := config.Query
// TODO if initial replication uses xmin with empty partition id, need to do full scan,
// but prevents parallelism without chunking from qrep code.
// Maybe strategy should be qrep initial load, then populate PartitionId for xmin to takeover.
if partition.PartitionId != "" {
query += " WHERE age(xmin) > 0 AND age(xmin) <= age($1::xid)"
}

executor, err := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, currentTxid, err
return 0, currentSnapshotXmin, err
}

var numRecords int
if partition.PartitionId != "" {
numRecords, currentTxid, err = executor.ExecuteAndProcessQueryStreamGettingCurrentTxid(stream, query, partition.PartitionId)
numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentTxid(stream, query, partition.PartitionId)
} else {
numRecords, currentTxid, err = executor.ExecuteAndProcessQueryStreamGettingCurrentTxid(stream, query)
numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentTxid(stream, query)
}
if err != nil {
return 0, currentTxid, err
return 0, currentSnapshotXmin, err
}

log.WithFields(log.Fields{
"partition": partition.PartitionId,
}).Infof("pulled %d records for flow job %s", numRecords, config.FlowJobName)
return numRecords, currentTxid, nil
return numRecords, currentSnapshotXmin, nil
}

func BuildQuery(query string, flowJobName string) (string, error) {
Expand Down
10 changes: 5 additions & 5 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamGettingCurrentTxid(
query string,
args ...interface{},
) (int, int64, error) {
var currentTxid int64
var currentSnapshotXmin int64
log.WithFields(log.Fields{
"flowName": qe.flowJobName,
"partitionID": qe.partitionID,
Expand All @@ -346,16 +346,16 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamGettingCurrentTxid(
"flowName": qe.flowJobName,
"partitionID": qe.partitionID,
}).Errorf("[pg_query_executor] failed to begin transaction: %v", err)
return 0, currentTxid, fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
return 0, currentSnapshotXmin, fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
}

err = tx.QueryRow(qe.ctx, "select txid_current()").Scan(&currentTxid)
err = tx.QueryRow(qe.ctx, "select txid_snapshot_xmin(txid_current_snapshot())").Scan(&currentSnapshotXmin)
if err != nil {
return 0, currentTxid, err
return 0, currentSnapshotXmin, err
}

totalRecordsFetched, err := qe.ExecuteAndProcessQueryStreamWithTx(tx, stream, query, args...)
return totalRecordsFetched, currentTxid, err
return totalRecordsFetched, currentSnapshotXmin, err
}

func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
Expand Down

0 comments on commit 8190a94

Please sign in to comment.