Skip to content

Commit

Permalink
more cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Oct 25, 2023
1 parent 43fbdfc commit 9a5a2ac
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 4 deletions.
6 changes: 6 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,12 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
waitBetweenBatches = time.Duration(config.WaitBetweenBatchesSeconds) * time.Second
}

if config.WatermarkColumn == "xmin" {
// for xmin we ignore the wait between batches, as seq scan time is
// extremely slow.
waitBetweenBatches = 10 * time.Second
}

srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
return fmt.Errorf("failed to get qrep source connector: %w", err)
Expand Down
7 changes: 3 additions & 4 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ func (c *PostgresConnector) getNumRowsPartitions(
// if there is a last partition, we will return a partition with the last partition's end value + 1 and the max value
if last != nil && last.Range != nil {
minValInt += 1
minVal = minValInt
}

if minValInt > maxValInt {
Expand All @@ -114,15 +113,15 @@ func (c *PostgresConnector) getNumRowsPartitions(

log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("single xmin partition range: %v - %v", minVal, maxVal)
}).Infof("single xmin partition range: %v - %v", minValInt, maxValInt)

partition := &protos.QRepPartition{
PartitionId: uuid.New().String(),
Range: &protos.PartitionRange{
Range: &protos.PartitionRange_IntRange{
IntRange: &protos.IntPartitionRange{
Start: minVal.(int64),
End: maxVal.(int64),
Start: minValInt,
End: maxValInt,
},
},
},
Expand Down

0 comments on commit 9a5a2ac

Please sign in to comment.