diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index c3eefadfb5..1d847a3a89 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "strconv" "strings" "time" @@ -251,6 +252,10 @@ func (h *FlowRequestHandler) CreateQRepFlow( var workflowFn interface{} if cfg.SourcePeer.Type == protos.DBType_POSTGRES && cfg.WatermarkColumn == "xmin" { state.LastPartition.PartitionId = "" + if cfg.WaitBetweenBatchesSeconds > 0 { + log.Info("Starting XMIN from :", cfg.WaitBetweenBatchesSeconds) + state.LastPartition.PartitionId = strconv.FormatUint(uint64(cfg.WaitBetweenBatchesSeconds), 10) + } workflowFn = peerflow.XminFlowWorkflow } else { workflowFn = peerflow.QRepFlowWorkflow