diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 7b5a7a173c..4084a19c22 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -668,6 +668,12 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context, waitBetweenBatches = time.Duration(config.WaitBetweenBatchesSeconds) * time.Second } + if config.WatermarkColumn == "xmin" { + // for xmin we ignore the wait between batches, as seq scan time is + // extremely slow. + waitBetweenBatches = 10 * time.Second + } + srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer) if err != nil { return fmt.Errorf("failed to get qrep source connector: %w", err) diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 56d9cddcf7..69f2957a4b 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -101,7 +101,6 @@ func (c *PostgresConnector) getNumRowsPartitions( // if there is a last partition, we will return a partition with the last partition's end value + 1 and the max value if last != nil && last.Range != nil { minValInt += 1 - minVal = minValInt } if minValInt > maxValInt { @@ -114,15 +113,15 @@ func (c *PostgresConnector) getNumRowsPartitions( log.WithFields(log.Fields{ "flowName": config.FlowJobName, - }).Infof("single xmin partition range: %v - %v", minVal, maxVal) + }).Infof("single xmin partition range: %v - %v", minValInt, maxValInt) partition := &protos.QRepPartition{ PartitionId: uuid.New().String(), Range: &protos.PartitionRange{ Range: &protos.PartitionRange_IntRange{ IntRange: &protos.IntPartitionRange{ - Start: minVal.(int64), - End: maxVal.(int64), + Start: minValInt, + End: maxValInt, }, }, },