Skip to content

Commit

Permalink
for xmin return only 1 partition
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Oct 25, 2023
1 parent deffa20 commit d1ff2c2
Showing 1 changed file with 37 additions and 0 deletions.
37 changes: 37 additions & 0 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,42 @@ func (c *PostgresConnector) getNumRowsPartitions(
quotedWatermarkColumn := fmt.Sprintf("\"%s\"", config.WatermarkColumn)
if config.WatermarkColumn == "xmin" {
quotedWatermarkColumn = fmt.Sprintf("%s::text::bigint", quotedWatermarkColumn)

minVal, maxVal, err := c.getMinMaxValues(tx, config, last)
if err != nil {
return nil, fmt.Errorf("failed to get min max values for xmin: %w", err)
}

// we will only return 1 partition for xmin:
// if there is no last partition, we will return a partition with the min and max values
// if there is a last partition, we will return a partition with the last partition's end value + 1 and the max value
if last != nil && last.Range != nil {
mvInt, ok := maxVal.(int64)
if !ok {
return nil, fmt.Errorf("expected max value to be int64, got %T", maxVal)
}
minVal = mvInt + 1
}

log.WithFields(log.Fields{
"flowName": config.FlowJobName,
"min": minVal,
"max": maxVal,
}).Infof("single xmin partition range: %v - %v", minVal, maxVal)

partition := &protos.QRepPartition{
PartitionId: uuid.New().String(),
Range: &protos.PartitionRange{
Range: &protos.PartitionRange_IntRange{
IntRange: &protos.IntPartitionRange{
Start: minVal.(int64),
End: maxVal.(int64),
},
},
},
}

return []*protos.QRepPartition{partition}, nil
}

whereClause := ""
Expand Down Expand Up @@ -201,6 +237,7 @@ func (c *PostgresConnector) getMinMaxValues(
if config.WatermarkColumn == "xmin" {
quotedWatermarkColumn = fmt.Sprintf("%s::text::bigint", quotedWatermarkColumn)
}

// Get the maximum value from the database
maxQuery := fmt.Sprintf("SELECT MAX(%[1]s) FROM %[2]s", quotedWatermarkColumn, config.WatermarkTable)
row := tx.QueryRow(c.ctx, maxQuery)
Expand Down

0 comments on commit d1ff2c2

Please sign in to comment.