Skip to content

Commit

Permalink
fix once
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Oct 25, 2023
1 parent 651137a commit da16389
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,21 +92,27 @@ func (c *PostgresConnector) getNumRowsPartitions(
return nil, fmt.Errorf("failed to get min max values for xmin: %w", err)
}

// we know these are int64s so we can just cast them
minValInt := maxVal.(int64)
maxValInt := maxVal.(int64)

// we will only return 1 partition for xmin:
// if there is no last partition, we will return a partition with the min and max values
// if there is a last partition, we will return a partition with the last partition's end value + 1 and the max value
if last != nil && last.Range != nil {
mvInt, ok := maxVal.(int64)
if !ok {
return nil, fmt.Errorf("expected max value to be int64, got %T", maxVal)
}
minVal = mvInt + 1
minVal = minValInt + 1
}

if minValInt > maxValInt {
// log and return an empty partition
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("xmin min value is greater than max value, returning empty partition")
return make([]*protos.QRepPartition, 0), nil
}

log.WithFields(log.Fields{
"flowName": config.FlowJobName,
"min": minVal,
"max": maxVal,
}).Infof("single xmin partition range: %v - %v", minVal, maxVal)

partition := &protos.QRepPartition{
Expand Down

0 comments on commit da16389

Please sign in to comment.