Skip to content

Commit

Permalink
for xmin return only 1 partition (#575)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Oct 25, 2023
1 parent deffa20 commit c060dd8
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
6 changes: 6 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,12 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
waitBetweenBatches = time.Duration(config.WaitBetweenBatchesSeconds) * time.Second
}

if config.WatermarkColumn == "xmin" {
// for xmin we ignore the configured wait between batches and use a fixed
// 10 second wait instead, as the seq scan itself is extremely slow.
waitBetweenBatches = 10 * time.Second
}

srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
return fmt.Errorf("failed to get qrep source connector: %w", err)
Expand Down
48 changes: 48 additions & 0 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,48 @@ func (c *PostgresConnector) getNumRowsPartitions(
quotedWatermarkColumn := fmt.Sprintf("\"%s\"", config.WatermarkColumn)
if config.WatermarkColumn == "xmin" {
quotedWatermarkColumn = fmt.Sprintf("%s::text::bigint", quotedWatermarkColumn)

minVal, maxVal, err := c.getMinMaxValues(tx, config, last)
if err != nil {
return nil, fmt.Errorf("failed to get min max values for xmin: %w", err)
}

// these are expected to be int64s, so we type-assert them directly (an unchecked assertion panics on any other type)
minValInt := minVal.(int64)
maxValInt := maxVal.(int64)

// we will only return 1 partition for xmin:
// if there is no last partition, we will return a partition with the min and max values
// if there is a last partition, we will return a partition with the last partition's end value + 1 and the max value
if last != nil && last.Range != nil {
minValInt += 1
}

if minValInt > maxValInt {
// log and return an empty list of partitions
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("xmin min value is greater than max value, returning empty partition")
return make([]*protos.QRepPartition, 0), nil
}

log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("single xmin partition range: %v - %v", minValInt, maxValInt)

partition := &protos.QRepPartition{
PartitionId: uuid.New().String(),
Range: &protos.PartitionRange{
Range: &protos.PartitionRange_IntRange{
IntRange: &protos.IntPartitionRange{
Start: minValInt,
End: maxValInt,
},
},
},
}

return []*protos.QRepPartition{partition}, nil
}

whereClause := ""
Expand Down Expand Up @@ -201,6 +243,7 @@ func (c *PostgresConnector) getMinMaxValues(
if config.WatermarkColumn == "xmin" {
quotedWatermarkColumn = fmt.Sprintf("%s::text::bigint", quotedWatermarkColumn)
}

// Get the maximum value from the database
maxQuery := fmt.Sprintf("SELECT MAX(%[1]s) FROM %[2]s", quotedWatermarkColumn, config.WatermarkTable)
row := tx.QueryRow(c.ctx, maxQuery)
Expand Down Expand Up @@ -268,6 +311,11 @@ func (c *PostgresConnector) getMinMaxValues(

func (c *PostgresConnector) CheckForUpdatedMaxValue(config *protos.QRepConfig,
last *protos.QRepPartition) (bool, error) {
// for xmin, let's always assume there are updates
if config.WatermarkColumn == "xmin" {
return true, nil
}

tx, err := c.pool.Begin(c.ctx)
if err != nil {
return false, fmt.Errorf("unable to begin transaction for getting max value: %w", err)
Expand Down

0 comments on commit c060dd8

Please sign in to comment.