From d4ce6cfd8c3df279c7c2ce62f85df845b3e065e9 Mon Sep 17 00:00:00 2001 From: Jackie Han Date: Mon, 3 Feb 2025 12:49:29 -0800 Subject: [PATCH] update put route when flattening result index (#1409) * update put route when flattening result index Signed-off-by: Jackie Han * clean up unused code Signed-off-by: Jackie Han --------- Signed-off-by: Jackie Han --- .../AbstractTimeSeriesActionHandler.java | 105 ++++++++---------- .../timeseries/util/RestHandlerUtils.java | 20 ---- .../ad/rest/AnomalyDetectorRestApiIT.java | 30 +++-- 3 files changed, 64 insertions(+), 91 deletions(-) diff --git a/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java b/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java index d5d6f76eb..be16e2e0a 100644 --- a/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java +++ b/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java @@ -16,6 +16,7 @@ import java.io.IOException; import java.time.Clock; import java.util.*; +import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.commons.lang.StringUtils; @@ -467,18 +468,10 @@ private void handlePutRequest(boolean indexingDryRun, ActionListener listener private void handlePostRequest(boolean indexingDryRun, ActionListener listener) { createConfig(indexingDryRun, ActionListener.wrap(createConfigResponse -> { if (shouldHandleFlattening(indexingDryRun)) { - String configId = RestHandlerUtils.getConfigIdFromIndexResponse(createConfigResponse); String flattenedResultIndexAlias = config.getFlattenResultIndexAlias(); - timeSeriesIndices - .initFlattenedResultIndex( - flattenedResultIndexAlias, - ActionListener - .wrap( - initResponse -> setupIngestPipeline(flattenedResultIndexAlias, configId, listener, createConfigResponse), - listener::onFailure - ) - ); + initAndSetupPipeline(flattenedResultIndexAlias, listener, l -> l.onResponse(createConfigResponse)); + } else { listener.onResponse(createConfigResponse); } @@ -491,23 +484,25 @@ private boolean shouldHandleFlattening(boolean indexingDryRun) { return !indexingDryRun && config.getCustomResultIndexOrAlias() != null && Boolean.TRUE.equals(flattenResultIndexMapping); } - protected void setupIngestPipeline( - String flattenedResultIndexAlias, - String configId, - ActionListener listener, - T createConfigResponse - ) { + private void initAndSetupPipeline(String flattenedResultIndexAlias, ActionListener listener, Consumer> onSuccess) { + timeSeriesIndices + .initFlattenedResultIndex( + flattenedResultIndexAlias, + ActionListener + .wrap(initResponse -> setupIngestPipeline(flattenedResultIndexAlias, listener, onSuccess), listener::onFailure) + ); + } + + private void setupIngestPipeline(String flattenedResultIndexAlias, ActionListener listener, Consumer> onSuccess) { String pipelineId = config.getFlattenResultIndexIngestPipelineName(); try { BytesReference pipelineSource = createPipelineDefinition(flattenedResultIndexAlias); - PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, pipelineSource, XContentType.JSON); client.admin().cluster().putPipeline(putPipelineRequest, ActionListener.wrap(putPipelineResponse -> { logger.info("Ingest pipeline created successfully for pipelineId: {}", pipelineId); - bindIngestPipelineWithFlattenedResultIndex(pipelineId, configId, flattenedResultIndexAlias, listener, createConfigResponse); - + bindIngestPipelineWithFlattenedResultIndex(pipelineId, flattenedResultIndexAlias, listener, onSuccess); }, exception -> { logger.error("Error while creating ingest pipeline for pipelineId: {}", pipelineId, exception); listener.onFailure(exception); @@ -519,6 +514,23 @@ protected void setupIngestPipeline( } } + private void bindIngestPipelineWithFlattenedResultIndex( + String pipelineId, + String flattenedResultIndexAlias, + ActionListener listener, + Consumer> onSuccess + ) { + UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(flattenedResultIndexAlias, pipelineId); + + client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(updateSettingsResponse -> { + logger.info("Successfully updated settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId); + onSuccess.accept(listener); + }, exception -> { + logger.error("Failed to update settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId, exception); + listener.onFailure(exception); + })); + } + private BytesReference createPipelineDefinition(String indexName) throws IOException { XContentBuilder pipelineBuilder = XContentFactory.jsonBuilder(); pipelineBuilder.startObject(); @@ -544,11 +556,7 @@ private BytesReference createPipelineDefinition(String indexName) throws IOExcep return BytesReference.bytes(pipelineBuilder); } - private UpdateSettingsRequest buildUpdateSettingsRequest( - String flattenedResultIndexAlias, - String defaultPipelineName, - String configId - ) { + private UpdateSettingsRequest buildUpdateSettingsRequest(String flattenedResultIndexAlias, String defaultPipelineName) { UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(); updateSettingsRequest.indices(flattenedResultIndexAlias); @@ -560,24 +568,6 @@ private UpdateSettingsRequest buildUpdateSettingsRequest( return updateSettingsRequest; } - protected void bindIngestPipelineWithFlattenedResultIndex( - String pipelineId, - String configId, - String flattenedResultIndexAlias, - ActionListener listener, - T createConfigResponse - ) { - UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(flattenedResultIndexAlias, pipelineId, configId); - - client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(updateSettingsResponse -> { - logger.info("Successfully updated settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId); - listener.onResponse(createConfigResponse); - }, exception -> { - logger.error("Failed to update settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId, exception); - listener.onFailure(exception); - })); - } - protected void updateConfig(String id, boolean indexingDryRun, ActionListener listener) { GetRequest request = new GetRequest(CommonName.CONFIG_INDEX, id); client @@ -619,18 +609,6 @@ private void onGetConfigResponse(GetResponse response, boolean indexingDryRun, S ); return; } - if (!existingConfig.getFlattenResultIndexMapping() - && config.getFlattenResultIndexMapping() - && existingConfig.getCustomResultIndexOrAlias() != null) { - // customers can choose to use a flattened result index for newly created detectors and disable it for those detectors. - // however, since enabling the flattened result index creates additional resources and due to bwc concerns, - // we do not allow customers to enable this feature for existing running detectors. - listener - .onFailure( - new OpenSearchStatusException(CommonMessages.CAN_NOT_CHANGE_FLATTEN_RESULT_INDEX, RestStatus.BAD_REQUEST) - ); - return; - } } else { if (!ParseUtils.listEqualsWithoutConsideringOrder(existingConfig.getCategoryFields(), config.getCategoryFields()) || !Objects.equals(existingConfig.getCustomResultIndexOrAlias(), config.getCustomResultIndexOrAlias())) { @@ -650,6 +628,19 @@ private void onGetConfigResponse(GetResponse response, boolean indexingDryRun, S listener::onFailure ); + } else if (!existingConfig.getFlattenResultIndexMapping() + && config.getFlattenResultIndexMapping() + && existingConfig.getCustomResultIndexOrAlias() != null) { + confirmBatchRunningListener = ActionListener + .wrap( + r -> initAndSetupPipeline( + config.getFlattenResultIndexAlias(), + listener, + l -> searchConfigInputIndices(id, indexingDryRun, l) + ), + listener::onFailure + ); + } else { confirmBatchRunningListener = ActionListener .wrap( @@ -673,11 +664,7 @@ private void unbindIngestPipelineWithFlattenedResultIndex( boolean indexingDryRun ) { // The pipeline name _none specifies that the index does not have an ingest pipeline. - UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest( - existingConfig.getFlattenResultIndexAlias(), - "_none", - existingConfig.getId() - ); + UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(existingConfig.getFlattenResultIndexAlias(), "_none"); client .admin() .indices() diff --git a/src/main/java/org/opensearch/timeseries/util/RestHandlerUtils.java b/src/main/java/org/opensearch/timeseries/util/RestHandlerUtils.java index 4d9924c60..ec45ff02c 100644 --- a/src/main/java/org/opensearch/timeseries/util/RestHandlerUtils.java +++ b/src/main/java/org/opensearch/timeseries/util/RestHandlerUtils.java @@ -26,20 +26,17 @@ import org.opensearch.OpenSearchStatusException; import org.opensearch.action.search.SearchPhaseExecutionException; import org.opensearch.action.search.ShardSearchFailure; -import org.opensearch.ad.transport.IndexAnomalyDetectorResponse; import org.opensearch.common.Nullable; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.Strings; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.forecast.transport.IndexForecasterResponse; import org.opensearch.index.IndexNotFoundException; import org.opensearch.indices.InvalidIndexNameException; import org.opensearch.rest.RestChannel; @@ -293,21 +290,4 @@ public static Entity buildEntity(RestRequest request, String detectorId) throws // not a valid profile request with correct entity information return null; } - - public static String getConfigIdFromIndexResponse(ActionResponse actionResponse) { - String configId; - if (actionResponse instanceof IndexAnomalyDetectorResponse) { - IndexAnomalyDetectorResponse response = (IndexAnomalyDetectorResponse) actionResponse; - configId = response.getId(); - logger.info("Handling IndexAnomalyDetectorResponse for configId: {}", configId); - } else if (actionResponse instanceof IndexForecasterResponse) { - IndexForecasterResponse response = (IndexForecasterResponse) actionResponse; - configId = response.getId(); - logger.info("Handling IndexForecasterResponse for configId: {}", configId); - } else { - throw new IllegalStateException("Unexpected response type: " + actionResponse.getClass().getName()); - } - return configId; - } - } diff --git a/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java b/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java index 0ae223c7b..1261f8358 100644 --- a/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java +++ b/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java @@ -445,19 +445,25 @@ public void testUpdateAnomalyDetectorFlattenResultIndexField() throws Exception detector.getLastBreakingUIChangeTime() ); - Exception ex = expectThrows( - ResponseException.class, - () -> TestHelpers - .makeRequest( - client(), - "PUT", - TestHelpers.AD_BASE_DETECTORS_URI + "/" + id + "?refresh=true", - ImmutableMap.of(), - TestHelpers.toHttpEntity(newDetector), - null - ) + Response updateResponse = TestHelpers + .makeRequest( + client(), + "PUT", + TestHelpers.AD_BASE_DETECTORS_URI + "/" + id + "?refresh=true", + ImmutableMap.of(), + TestHelpers.toHttpEntity(newDetector), + null + ); + + assertEquals("Update anomaly detector failed", RestStatus.OK, TestHelpers.restStatus(updateResponse)); + String expectedPipelineId = "flatten_result_index_ingest_pipeline_" + detector.getName().toLowerCase(Locale.ROOT); + String getIngestPipelineEndpoint = String.format(Locale.ROOT, "_ingest/pipeline/%s", expectedPipelineId); + Response getPipelineResponse = TestHelpers.makeRequest(client(), "GET", getIngestPipelineEndpoint, ImmutableMap.of(), "", null); + assertEquals( + "Expected 200 response but got: " + getPipelineResponse.getStatusLine().getStatusCode(), + 200, + getPipelineResponse.getStatusLine().getStatusCode() ); - assertThat(ex.getMessage(), containsString(CommonMessages.CAN_NOT_CHANGE_FLATTEN_RESULT_INDEX)); } public void testCreateAnomalyDetector() throws Exception {