Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added minimum for search.cancel_after_time_interval setting for rollups #1026

Merged
merged 6 commits into from
Nov 19, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import org.opensearch.core.action.ActionListener
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.action.search.SearchPhaseExecutionException
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.core.common.breaker.CircuitBreakingException
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.indexmanagement.opensearchapi.retry
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.rollup.model.Rollup
Expand Down Expand Up @@ -44,10 +46,16 @@ class RollupSearchService(
@Volatile private var retrySearchPolicy =
BackoffPolicy.constantBackoff(ROLLUP_SEARCH_BACKOFF_MILLIS.get(settings), ROLLUP_SEARCH_BACKOFF_COUNT.get(settings))

@Volatile private var cancelAfterTimeInterval = SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING.get(settings)

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(ROLLUP_SEARCH_BACKOFF_MILLIS, ROLLUP_SEARCH_BACKOFF_COUNT) { millis, count ->
retrySearchPolicy = BackoffPolicy.constantBackoff(millis, count)
}

clusterService.clusterSettings.addSettingsUpdateConsumer(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING) {
cancelAfterTimeInterval = it
}
}

// TODO: Failed shouldn't process? How to recover from failed -> how does a user retry a failed rollup
Expand Down Expand Up @@ -103,7 +111,12 @@ class RollupSearchService(
"Composite search failed for rollup, retrying [#${retryCount - 1}] -" +
" reducing page size of composite aggregation from ${job.pageSize} to $pageSize"
)
search(job.copy(pageSize = pageSize).getRollupSearchRequest(metadata), listener)

val searchRequest = job.copy(pageSize = pageSize).getRollupSearchRequest(metadata)
val timeoutMins = max(cancelAfterTimeInterval.minutes(), 10)
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved
searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes(timeoutMins)

search(searchRequest, listener)
}
}
)
Expand Down