Skip to content

Commit

Permalink
Xmin replication to account for wrap-around (#747)
Browse files Browse the repository at this point in the history
QRep based xmin replication has a fault: xmin isn't monotonic & has
wraparound issues

Wraparound workaround: grab all records with `0 < age(xmin) <= age(last
snapshot xmin)` which'll include some records from before wraparound but
so it goes

Logic is copied from qrep code

---------

Co-authored-by: phdub <[email protected]>
Co-authored-by: Amogh-Bharadwaj <[email protected]>
  • Loading branch information
3 people authored Dec 5, 2023
1 parent 7798482 commit 53235aa
Show file tree
Hide file tree
Showing 9 changed files with 490 additions and 63 deletions.
100 changes: 94 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
}, nil
}

// ReplicateQRepPartition replicates a QRepPartition from the source to the destination.
// ReplicateQRepPartitions spawns multiple ReplicateQRepPartition
func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
config *protos.QRepConfig,
partitions *protos.QRepPartitionBatch,
Expand Down Expand Up @@ -537,11 +537,12 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
"flowName": config.FlowJobName,
}).Errorf("failed to pull records: %v", err)
goroutineErr = err
}
err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords)
if err != nil {
log.Errorf("%v", err)
goroutineErr = err
} else {
err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords)
if err != nil {
log.Errorf("%v", err)
goroutineErr = err
}
}
wg.Done()
}
Expand Down Expand Up @@ -845,3 +846,90 @@ func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *pr
}
return nil, fmt.Errorf("create tables from existing is only supported on snowflake and bigquery")
}

// ReplicateXminPartition replicates a XminPartition from the source to the destination.
func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
config *protos.QRepConfig,
partition *protos.QRepPartition,
runUUID string,
) (int64, error) {
err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition)
if err != nil {
return 0, fmt.Errorf("failed to update start time for partition: %w", err)
}

srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
return 0, fmt.Errorf("failed to get qrep source connector: %w", err)
}
defer connectors.CloseConnector(srcConn)

dstConn, err := connectors.GetQRepSyncConnector(ctx, config.DestinationPeer)
if err != nil {
return 0, fmt.Errorf("failed to get qrep destination connector: %w", err)
}
defer connectors.CloseConnector(dstConn)

log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Info("replicating xmin\n")

bufferSize := shared.FetchAndChannelSize
errGroup, errCtx := errgroup.WithContext(ctx)

stream := model.NewQRecordStream(bufferSize)

var currentSnapshotXmin int64
errGroup.Go(func() error {
pgConn := srcConn.(*connpostgres.PostgresConnector)
var pullErr error
var numRecords int
numRecords, currentSnapshotXmin, pullErr = pgConn.PullXminRecordStream(config, partition, stream)
if pullErr != nil {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Errorf("failed to pull records: %v", err)
return err
}
err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(errCtx, runUUID, partition, int64(numRecords))
if err != nil {
log.Errorf("%v", err)
return err
}
return nil
})

shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string {
return "syncing xmin."
})

defer func() {
shutdown <- true
}()

res, err := dstConn.SyncQRepRecords(config, partition, stream)
if err != nil {
return 0, fmt.Errorf("failed to sync records: %w", err)
}

if res == 0 {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Info("no records to push for xmin\n")
} else {
err := errGroup.Wait()
if err != nil {
return 0, err
}
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("pushed %d records\n", res)
}

err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition)
if err != nil {
return 0, err
}

return currentSnapshotXmin, nil
}
27 changes: 20 additions & 7 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -248,13 +249,25 @@ func (h *FlowRequestHandler) CreateQRepFlow(
}

state := peerflow.NewQRepFlowState()
_, err := h.temporalClient.ExecuteWorkflow(
ctx, // context
workflowOptions, // workflow start options
peerflow.QRepFlowWorkflow, // workflow function
cfg, // workflow input
state,
)
preColon, postColon, hasColon := strings.Cut(cfg.WatermarkColumn, "::")
var workflowFn interface{}
if cfg.SourcePeer.Type == protos.DBType_POSTGRES &&
preColon == "xmin" {
state.LastPartition.PartitionId = uuid.New().String()
if hasColon {
// hack to facilitate migrating from existing xmin sync
txid, err := strconv.ParseInt(postColon, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid xmin txid for xmin rep: %w", err)
}
state.LastPartition.Range = &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: txid}}}
}

workflowFn = peerflow.XminFlowWorkflow
} else {
workflowFn = peerflow.QRepFlowWorkflow
}
_, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, state)
if err != nil {
return nil, fmt.Errorf("unable to start QRepFlow workflow: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func WorkerMain(opts *WorkerOptions) error {
w.RegisterWorkflow(peerflow.NormalizeFlowWorkflow)
w.RegisterWorkflow(peerflow.QRepFlowWorkflow)
w.RegisterWorkflow(peerflow.QRepPartitionWorkflow)
w.RegisterWorkflow(peerflow.XminFlowWorkflow)
w.RegisterWorkflow(peerflow.DropFlowWorkflow)
w.RegisterWorkflow(peerflow.HeartbeatFlowWorkflow)
w.RegisterActivity(&activities.FlowableActivity{
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ type QRepPullConnector interface {
// GetQRepPartitions returns the partitions for a given table that haven't been synced yet.
GetQRepPartitions(config *protos.QRepConfig, last *protos.QRepPartition) ([]*protos.QRepPartition, error)

// GetQRepRecords returns the records for a given partition.
// PullQRepRecords returns the records for a given partition.
PullQRepRecords(config *protos.QRepConfig, partition *protos.QRepPartition) (*model.QRecordBatch, error)
}

Expand Down
83 changes: 36 additions & 47 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connpostgres
import (
"bytes"
"fmt"
"strconv"
"text/template"
"time"

Expand Down Expand Up @@ -84,50 +85,6 @@ func (c *PostgresConnector) getNumRowsPartitions(
var err error
numRowsPerPartition := int64(config.NumRowsPerPartition)
quotedWatermarkColumn := fmt.Sprintf("\"%s\"", config.WatermarkColumn)
if config.WatermarkColumn == "xmin" {
minVal, maxVal, err := c.getMinMaxValues(tx, config, last)
if err != nil {
return nil, fmt.Errorf("failed to get min max values for xmin: %w", err)
}

// we know these are int64s so we can just cast them
minValInt := minVal.(int64)
maxValInt := maxVal.(int64)

// we will only return 1 partition for xmin:
// if there is no last partition, we will return a partition with the min and max values
// if there is a last partition, we will return a partition with the last partition's
// end value + 1 and the max value
if last != nil && last.Range != nil {
minValInt += 1
}

if minValInt > maxValInt {
// log and return an empty partition
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("xmin min value is greater than max value, returning empty partition")
return make([]*protos.QRepPartition, 0), nil
}

log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("single xmin partition range: %v - %v", minValInt, maxValInt)

partition := &protos.QRepPartition{
PartitionId: uuid.New().String(),
Range: &protos.PartitionRange{
Range: &protos.PartitionRange_IntRange{
IntRange: &protos.IntPartitionRange{
Start: minValInt,
End: maxValInt,
},
},
},
}

return []*protos.QRepPartition{partition}, nil
}

whereClause := ""
if last != nil && last.Range != nil {
Expand Down Expand Up @@ -244,9 +201,6 @@ func (c *PostgresConnector) getMinMaxValues(
) (interface{}, interface{}, error) {
var minValue, maxValue interface{}
quotedWatermarkColumn := fmt.Sprintf("\"%s\"", config.WatermarkColumn)
if config.WatermarkColumn == "xmin" {
quotedWatermarkColumn = fmt.Sprintf("%s::text::bigint", quotedWatermarkColumn)
}

parsedWatermarkTable, err := utils.ParseSchemaTable(config.WatermarkTable)
if err != nil {
Expand Down Expand Up @@ -596,6 +550,41 @@ func (c *PostgresConnector) SetupQRepMetadataTables(config *protos.QRepConfig) e
return nil
}

func (c *PostgresConnector) PullXminRecordStream(
config *protos.QRepConfig,
partition *protos.QRepPartition,
stream *model.QRecordStream,
) (int, int64, error) {
var currentSnapshotXmin int64
query := config.Query
oldxid := ""
if partition.Range != nil {
oldxid = strconv.FormatInt(partition.Range.Range.(*protos.PartitionRange_IntRange).IntRange.Start&0xffffffff, 10)
query += " WHERE age(xmin) > 0 AND age(xmin) <= age($1::xid)"
}

executor, err := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, currentSnapshotXmin, err
}

var numRecords int
if partition.Range != nil {
numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentTxid(stream, query, oldxid)
} else {
numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentTxid(stream, query)
}
if err != nil {
return 0, currentSnapshotXmin, err
}

log.WithFields(log.Fields{
"partition": partition.PartitionId,
}).Infof("pulled %d records for flow job %s", numRecords, config.FlowJobName)
return numRecords, currentSnapshotXmin, nil
}

func BuildQuery(query string, flowJobName string) (string, error) {
tmpl, err := template.New("query").Parse(query)
if err != nil {
Expand Down
48 changes: 48 additions & 0 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,51 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStream(
return 0, fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
}

totalRecordsFetched, err := qe.ExecuteAndProcessQueryStreamWithTx(tx, stream, query, args...)
return totalRecordsFetched, err
}

func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamGettingCurrentTxid(
stream *model.QRecordStream,
query string,
args ...interface{},
) (int, int64, error) {
var currentSnapshotXmin int64
log.WithFields(log.Fields{
"flowName": qe.flowJobName,
"partitionID": qe.partitionID,
}).Infof("Executing and processing query stream '%s'", query)
defer close(stream.Records)

tx, err := qe.pool.BeginTx(qe.ctx, pgx.TxOptions{
AccessMode: pgx.ReadOnly,
IsoLevel: pgx.RepeatableRead,
})
if err != nil {
log.WithFields(log.Fields{
"flowName": qe.flowJobName,
"partitionID": qe.partitionID,
}).Errorf("[pg_query_executor] failed to begin transaction: %v", err)
return 0, currentSnapshotXmin, fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
}

err = tx.QueryRow(qe.ctx, "select txid_snapshot_xmin(txid_current_snapshot())").Scan(&currentSnapshotXmin)
if err != nil {
return 0, currentSnapshotXmin, err
}

totalRecordsFetched, err := qe.ExecuteAndProcessQueryStreamWithTx(tx, stream, query, args...)
return totalRecordsFetched, currentSnapshotXmin, err
}

func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
tx pgx.Tx,
stream *model.QRecordStream,
query string,
args ...interface{},
) (int, error) {
var err error

defer func() {
err := tx.Rollback(qe.ctx)
if err != nil && err != pgx.ErrTxClosed {
Expand Down Expand Up @@ -397,6 +442,9 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStream(
qe.recordHeartbeat("#%d fetched %d rows", numFetchOpsComplete, numRows)
}

log.WithFields(log.Fields{
"flowName": qe.flowJobName,
}).Info("Committing transaction")
err = tx.Commit(qe.ctx)
if err != nil {
stream.Records <- &model.QRecordOrError{
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/snowflake/qrep_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() {

dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName)

query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE xmin::text::bigint BETWEEN {{.start}} AND {{.end}}",
query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s",
s.pgSuffix, tblName)

qrepConfig, err := e2e.CreateQRepWorkflowConfig(
Expand Down
Loading

0 comments on commit 53235aa

Please sign in to comment.