Skip to content

Commit

Permalink
update put route when flattening result index (#1409)
Browse files Browse the repository at this point in the history
* update put route when flattening result index

Signed-off-by: Jackie Han <[email protected]>

* clean up unused code

Signed-off-by: Jackie Han <[email protected]>

---------

Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang authored Feb 3, 2025
1 parent 70eb089 commit d4ce6cf
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.io.IOException;
import java.time.Clock;
import java.util.*;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -467,18 +468,10 @@ private void handlePutRequest(boolean indexingDryRun, ActionListener<T> listener
private void handlePostRequest(boolean indexingDryRun, ActionListener<T> listener) {
createConfig(indexingDryRun, ActionListener.wrap(createConfigResponse -> {
if (shouldHandleFlattening(indexingDryRun)) {
String configId = RestHandlerUtils.getConfigIdFromIndexResponse(createConfigResponse);
String flattenedResultIndexAlias = config.getFlattenResultIndexAlias();

timeSeriesIndices
.initFlattenedResultIndex(
flattenedResultIndexAlias,
ActionListener
.wrap(
initResponse -> setupIngestPipeline(flattenedResultIndexAlias, configId, listener, createConfigResponse),
listener::onFailure
)
);
initAndSetupPipeline(flattenedResultIndexAlias, listener, l -> l.onResponse(createConfigResponse));

} else {
listener.onResponse(createConfigResponse);
}
Expand All @@ -491,23 +484,25 @@ private boolean shouldHandleFlattening(boolean indexingDryRun) {
return !indexingDryRun && config.getCustomResultIndexOrAlias() != null && Boolean.TRUE.equals(flattenResultIndexMapping);
}

protected void setupIngestPipeline(
String flattenedResultIndexAlias,
String configId,
ActionListener<T> listener,
T createConfigResponse
) {
private void initAndSetupPipeline(String flattenedResultIndexAlias, ActionListener<T> listener, Consumer<ActionListener<T>> onSuccess) {
timeSeriesIndices
.initFlattenedResultIndex(
flattenedResultIndexAlias,
ActionListener
.wrap(initResponse -> setupIngestPipeline(flattenedResultIndexAlias, listener, onSuccess), listener::onFailure)
);
}

private void setupIngestPipeline(String flattenedResultIndexAlias, ActionListener<T> listener, Consumer<ActionListener<T>> onSuccess) {
String pipelineId = config.getFlattenResultIndexIngestPipelineName();

try {
BytesReference pipelineSource = createPipelineDefinition(flattenedResultIndexAlias);

PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, pipelineSource, XContentType.JSON);

client.admin().cluster().putPipeline(putPipelineRequest, ActionListener.wrap(putPipelineResponse -> {
logger.info("Ingest pipeline created successfully for pipelineId: {}", pipelineId);
bindIngestPipelineWithFlattenedResultIndex(pipelineId, configId, flattenedResultIndexAlias, listener, createConfigResponse);

bindIngestPipelineWithFlattenedResultIndex(pipelineId, flattenedResultIndexAlias, listener, onSuccess);
}, exception -> {
logger.error("Error while creating ingest pipeline for pipelineId: {}", pipelineId, exception);
listener.onFailure(exception);
Expand All @@ -519,6 +514,23 @@ protected void setupIngestPipeline(
}
}

private void bindIngestPipelineWithFlattenedResultIndex(
String pipelineId,
String flattenedResultIndexAlias,
ActionListener<T> listener,
Consumer<ActionListener<T>> onSuccess
) {
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(flattenedResultIndexAlias, pipelineId);

client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(updateSettingsResponse -> {
logger.info("Successfully updated settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId);
onSuccess.accept(listener);
}, exception -> {
logger.error("Failed to update settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId, exception);
listener.onFailure(exception);
}));
}

private BytesReference createPipelineDefinition(String indexName) throws IOException {
XContentBuilder pipelineBuilder = XContentFactory.jsonBuilder();
pipelineBuilder.startObject();
Expand All @@ -544,11 +556,7 @@ private BytesReference createPipelineDefinition(String indexName) throws IOExcep
return BytesReference.bytes(pipelineBuilder);
}

private UpdateSettingsRequest buildUpdateSettingsRequest(
String flattenedResultIndexAlias,
String defaultPipelineName,
String configId
) {
private UpdateSettingsRequest buildUpdateSettingsRequest(String flattenedResultIndexAlias, String defaultPipelineName) {
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest();
updateSettingsRequest.indices(flattenedResultIndexAlias);

Expand All @@ -560,24 +568,6 @@ private UpdateSettingsRequest buildUpdateSettingsRequest(
return updateSettingsRequest;
}

protected void bindIngestPipelineWithFlattenedResultIndex(
String pipelineId,
String configId,
String flattenedResultIndexAlias,
ActionListener<T> listener,
T createConfigResponse
) {
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(flattenedResultIndexAlias, pipelineId, configId);

client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(updateSettingsResponse -> {
logger.info("Successfully updated settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId);
listener.onResponse(createConfigResponse);
}, exception -> {
logger.error("Failed to update settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId, exception);
listener.onFailure(exception);
}));
}

protected void updateConfig(String id, boolean indexingDryRun, ActionListener<T> listener) {
GetRequest request = new GetRequest(CommonName.CONFIG_INDEX, id);
client
Expand Down Expand Up @@ -619,18 +609,6 @@ private void onGetConfigResponse(GetResponse response, boolean indexingDryRun, S
);
return;
}
if (!existingConfig.getFlattenResultIndexMapping()
&& config.getFlattenResultIndexMapping()
&& existingConfig.getCustomResultIndexOrAlias() != null) {
// customers can choose to use a flattened result index for newly created detectors and disable it for those detectors.
// however, since enabling the flattened result index creates additional resources and due to bwc concerns,
// we do not allow customers to enable this feature for existing running detectors.
listener
.onFailure(
new OpenSearchStatusException(CommonMessages.CAN_NOT_CHANGE_FLATTEN_RESULT_INDEX, RestStatus.BAD_REQUEST)
);
return;
}
} else {
if (!ParseUtils.listEqualsWithoutConsideringOrder(existingConfig.getCategoryFields(), config.getCategoryFields())
|| !Objects.equals(existingConfig.getCustomResultIndexOrAlias(), config.getCustomResultIndexOrAlias())) {
Expand All @@ -650,6 +628,19 @@ private void onGetConfigResponse(GetResponse response, boolean indexingDryRun, S
listener::onFailure
);

} else if (!existingConfig.getFlattenResultIndexMapping()
&& config.getFlattenResultIndexMapping()
&& existingConfig.getCustomResultIndexOrAlias() != null) {
confirmBatchRunningListener = ActionListener
.wrap(
r -> initAndSetupPipeline(
config.getFlattenResultIndexAlias(),
listener,
l -> searchConfigInputIndices(id, indexingDryRun, l)
),
listener::onFailure
);

} else {
confirmBatchRunningListener = ActionListener
.wrap(
Expand All @@ -673,11 +664,7 @@ private void unbindIngestPipelineWithFlattenedResultIndex(
boolean indexingDryRun
) {
// The pipeline name _none specifies that the index does not have an ingest pipeline.
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(
existingConfig.getFlattenResultIndexAlias(),
"_none",
existingConfig.getId()
);
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(existingConfig.getFlattenResultIndexAlias(), "_none");
client
.admin()
.indices()
Expand Down
20 changes: 0 additions & 20 deletions src/main/java/org/opensearch/timeseries/util/RestHandlerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,17 @@
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.search.SearchPhaseExecutionException;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.ad.transport.IndexAnomalyDetectorResponse;
import org.opensearch.common.Nullable;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.forecast.transport.IndexForecasterResponse;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.indices.InvalidIndexNameException;
import org.opensearch.rest.RestChannel;
Expand Down Expand Up @@ -293,21 +290,4 @@ public static Entity buildEntity(RestRequest request, String detectorId) throws
// not a valid profile request with correct entity information
return null;
}

public static String getConfigIdFromIndexResponse(ActionResponse actionResponse) {
String configId;
if (actionResponse instanceof IndexAnomalyDetectorResponse) {
IndexAnomalyDetectorResponse response = (IndexAnomalyDetectorResponse) actionResponse;
configId = response.getId();
logger.info("Handling IndexAnomalyDetectorResponse for configId: {}", configId);
} else if (actionResponse instanceof IndexForecasterResponse) {
IndexForecasterResponse response = (IndexForecasterResponse) actionResponse;
configId = response.getId();
logger.info("Handling IndexForecasterResponse for configId: {}", configId);
} else {
throw new IllegalStateException("Unexpected response type: " + actionResponse.getClass().getName());
}
return configId;
}

}
30 changes: 18 additions & 12 deletions src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -445,19 +445,25 @@ public void testUpdateAnomalyDetectorFlattenResultIndexField() throws Exception
detector.getLastBreakingUIChangeTime()
);

Exception ex = expectThrows(
ResponseException.class,
() -> TestHelpers
.makeRequest(
client(),
"PUT",
TestHelpers.AD_BASE_DETECTORS_URI + "/" + id + "?refresh=true",
ImmutableMap.of(),
TestHelpers.toHttpEntity(newDetector),
null
)
Response updateResponse = TestHelpers
.makeRequest(
client(),
"PUT",
TestHelpers.AD_BASE_DETECTORS_URI + "/" + id + "?refresh=true",
ImmutableMap.of(),
TestHelpers.toHttpEntity(newDetector),
null
);

assertEquals("Update anomaly detector failed", RestStatus.OK, TestHelpers.restStatus(updateResponse));
String expectedPipelineId = "flatten_result_index_ingest_pipeline_" + detector.getName().toLowerCase(Locale.ROOT);
String getIngestPipelineEndpoint = String.format(Locale.ROOT, "_ingest/pipeline/%s", expectedPipelineId);
Response getPipelineResponse = TestHelpers.makeRequest(client(), "GET", getIngestPipelineEndpoint, ImmutableMap.of(), "", null);
assertEquals(
"Expected 200 response but got: " + getPipelineResponse.getStatusLine().getStatusCode(),
200,
getPipelineResponse.getStatusLine().getStatusCode()
);
assertThat(ex.getMessage(), containsString(CommonMessages.CAN_NOT_CHANGE_FLATTEN_RESULT_INDEX));
}

public void testCreateAnomalyDetector() throws Exception {
Expand Down

0 comments on commit d4ce6cf

Please sign in to comment.