Skip to content

Commit

Permalink
PartitionId needs to be unique, so make it random uuid, move xid info…
Browse files Browse the repository at this point in the history
… into range
  • Loading branch information
serprex committed Dec 4, 2023
1 parent 8190a94 commit 469fc51
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 11 deletions.
17 changes: 11 additions & 6 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -248,14 +249,18 @@ func (h *FlowRequestHandler) CreateQRepFlow(
}

state := peerflow.NewQRepFlowState()
watermarkColumnParts := strings.Split(strings.ToLower(cfg.WatermarkColumn), "::")
preColon, postColon, hasColon := strings.Cut(cfg.WatermarkColumn, "::")
var workflowFn interface{}
if cfg.SourcePeer.Type == protos.DBType_POSTGRES &&
watermarkColumnParts[0] == "xmin" {
state.LastPartition.PartitionId = ""
if len(watermarkColumnParts) == 2 {
txid := watermarkColumnParts[1]
state.LastPartition.PartitionId = txid
preColon == "xmin" {
state.LastPartition.PartitionId = uuid.New().String()
if hasColon {
// hack to facilitate migrating from existing xmin sync
txid, err := strconv.ParseInt(postColon, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid xmin txid for xmin rep: %w", err)
}
state.LastPartition.Range = &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: txid}}}
}

workflowFn = peerflow.XminFlowWorkflow
Expand Down
9 changes: 6 additions & 3 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connpostgres
import (
"bytes"
"fmt"
"strconv"
"text/template"
"time"

Expand Down Expand Up @@ -556,7 +557,9 @@ func (c *PostgresConnector) PullXminRecordStream(
) (int, int64, error) {
var currentSnapshotXmin int64
query := config.Query
if partition.PartitionId != "" {
oldxid := ""
if partition.Range != nil {
oldxid = strconv.FormatInt(partition.Range.Range.(*protos.PartitionRange_IntRange).IntRange.Start&0xffffffff, 10)
query += " WHERE age(xmin) > 0 AND age(xmin) <= age($1::xid)"
}

Expand All @@ -567,8 +570,8 @@ func (c *PostgresConnector) PullXminRecordStream(
}

var numRecords int
if partition.PartitionId != "" {
numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentTxid(stream, query, partition.PartitionId)
if partition.Range != nil {
numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentTxid(stream, query, oldxid)
} else {
numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentTxid(stream, query)
}
Expand Down
6 changes: 4 additions & 2 deletions flow/workflows/xmin_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package peerflow

import (
"fmt"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -239,7 +238,10 @@ func XminFlowWorkflow(
return fmt.Errorf("xmin replication failed: %w", err)
}

state.LastPartition = &protos.QRepPartition{PartitionId: strconv.FormatInt(lastPartition&0xffffffff, 10)}
state.LastPartition = &protos.QRepPartition{
PartitionId: uuid.New().String(),
Range: &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: lastPartition}}},
}

if config.InitialCopyOnly {
q.logger.Info("initial copy completed for peer flow - ", config.FlowJobName)
Expand Down

0 comments on commit 469fc51

Please sign in to comment.