diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 1f749818a7..bf2a83f0c3 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -302,9 +302,14 @@ func (q *QRepFlowExecution) consolidatePartitions(ctx workflow.Context) error { func (q *QRepFlowExecution) waitForNewRows(ctx workflow.Context, lastPartition *protos.QRepPartition) error { q.logger.Info("idling until new rows are detected") + waitActivityTimeout := 5 * time.Minute + if q.config.WaitBetweenBatchesSeconds > 300 { + timeGap := 60 + waitActivityTimeout = time.Duration(q.config.WaitBetweenBatchesSeconds+uint32(timeGap)) * time.Second + } ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 16 * 365 * 24 * time.Hour, // 16 years - HeartbeatTimeout: 5 * time.Minute, + HeartbeatTimeout: waitActivityTimeout, }) if err := workflow.ExecuteActivity(ctx, flowable.QRepWaitUntilNewRows, q.config,