Skip to content

Commit

Permalink
add more IT
Browse files Browse the repository at this point in the history
Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang committed Jan 28, 2025
1 parent edee31c commit 9b371ba
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1020,36 +1020,39 @@ public <T> void initCustomResultIndexAndExecute(String resultIndexOrAlias, Execu
* creates flattened result index
* @param flattenedResultIndexAlias the flattened result index alias
* @param actionListener the action listener
* @throws IOException
*/
public void initFlattenedResultIndex(String flattenedResultIndexAlias, ActionListener<CreateIndexResponse> actionListener)
throws IOException {
String indexName = getCustomResultIndexPattern(flattenedResultIndexAlias);
logger.info("Initializing flattened result index: {}", indexName);
public void initFlattenedResultIndex(String flattenedResultIndexAlias, ActionListener<CreateIndexResponse> actionListener) {
try {
String indexName = getCustomResultIndexPattern(flattenedResultIndexAlias);
logger.info("Initializing flattened result index: {}", indexName);

CreateIndexRequest request = new CreateIndexRequest(indexName)
.mapping(getFlattenedResultMappings(), XContentType.JSON)
.settings(settings);
CreateIndexRequest request = new CreateIndexRequest(indexName)
.mapping(getFlattenedResultMappings(), XContentType.JSON)
.settings(settings);

if (flattenedResultIndexAlias != null) {
request.alias(new Alias(flattenedResultIndexAlias));
}
if (flattenedResultIndexAlias != null) {
request.alias(new Alias(flattenedResultIndexAlias));
}

choosePrimaryShards(request, false);
choosePrimaryShards(request, false);

adminClient.indices().create(request, ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
logger.info("Successfully created flattened result index: {} with alias: {}", indexName, flattenedResultIndexAlias);
actionListener.onResponse(response);
} else {
String errorMsg = "Index creation not acknowledged for index: " + indexName;
logger.error(errorMsg);
actionListener.onFailure(new IllegalStateException(errorMsg));
}
}, exception -> {
logger.error("Failed to create flattened result index: {}", indexName, exception);
actionListener.onFailure(exception);
}));
adminClient.indices().create(request, ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
logger.info("Successfully created flattened result index: {} with alias: {}", indexName, flattenedResultIndexAlias);
actionListener.onResponse(response);
} else {
String errorMsg = "Index creation not acknowledged for index: " + indexName;
logger.error(errorMsg);
actionListener.onFailure(new IllegalStateException(errorMsg));
}
}, exception -> {
logger.error("Failed to create flattened result index: {}", indexName, exception);
actionListener.onFailure(exception);
}));
} catch (Exception e) {
logger.error("Error while initializing flattened result index: {}", flattenedResultIndexAlias, e);
actionListener.onFailure(e);
}
}

public String getFlattenedResultIndexAlias(String indexOrAliasName, String configId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.ad.transport.IndexAnomalyDetectorResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -456,16 +455,14 @@ protected void prepareConfigIndexing(boolean indexingDryRun, ActionListener<T> l

private void handlePutRequest(boolean indexingDryRun, ActionListener<T> listener) {
handler.confirmJobRunning(clusterService, client, id, listener, () -> {
handleFlattenResultIndexMappingUpdate(listener);
updateConfig(id, indexingDryRun, listener);
}, xContentRegistry);
}

private void handlePostRequest(boolean indexingDryRun, ActionListener<T> listener) {
createConfig(indexingDryRun, ActionListener.wrap(createConfigResponse -> {
if (shouldHandleFlattening(indexingDryRun, createConfigResponse)) {
IndexAnomalyDetectorResponse response = (IndexAnomalyDetectorResponse) createConfigResponse;
String configId = response.getId();
if (shouldHandleFlattening(indexingDryRun)) {
String configId = RestHandlerUtils.getConfigIdFromIndexResponse(createConfigResponse);
String flattenedResultIndexAlias = timeSeriesIndices
.getFlattenedResultIndexAlias(config.getCustomResultIndexOrAlias(), configId);
String pipelineId = timeSeriesIndices.getFlattenResultIndexIngestPipelineId(configId);
Expand All @@ -487,13 +484,10 @@ private void handlePostRequest(boolean indexingDryRun, ActionListener<T> listene
}, listener::onFailure));
}

private boolean shouldHandleFlattening(boolean indexingDryRun, Object createConfigResponse) {
private boolean shouldHandleFlattening(boolean indexingDryRun) {
Boolean flattenResultIndexMapping = config.getFlattenResultIndexMapping();

return !indexingDryRun
&& config.getCustomResultIndexOrAlias() != null
&& Boolean.TRUE.equals(flattenResultIndexMapping)
&& createConfigResponse instanceof IndexAnomalyDetectorResponse;
return !indexingDryRun && config.getCustomResultIndexOrAlias() != null && Boolean.TRUE.equals(flattenResultIndexMapping);
}

protected void setupIngestPipeline(String configId, ActionListener<T> listener) {
Expand Down Expand Up @@ -574,13 +568,13 @@ protected void updateResultIndexSetting(String pipelineId, String flattenedResul
}));
}

private void handleFlattenResultIndexMappingUpdate(ActionListener<T> listener) {
private void handleFlattenResultIndexMappingUpdate(Config existingConfig, ActionListener<T> listener) {
if (config.getCustomResultIndexOrAlias() == null) {
return;
}
if (config.getFlattenResultIndexMapping() != null && config.getFlattenResultIndexMapping()) {
setupIngestPipeline(id, listener);
} else {
if (Boolean.TRUE.equals(existingConfig.getFlattenResultIndexMapping())
&& Boolean.FALSE.equals(config.getFlattenResultIndexMapping())
&& existingConfig.getCustomResultIndexOrAlias() != null) {
String pipelineId = timeSeriesIndices.getFlattenResultIndexIngestPipelineId(config.getId());
client.admin().cluster().deletePipeline(new DeletePipelineRequest(pipelineId), new ActionListener<AcknowledgedResponse>() {

Expand Down Expand Up @@ -611,6 +605,24 @@ public void onFailure(Exception e) {
}
}
});
} else if (Boolean.FALSE.equals(existingConfig.getFlattenResultIndexMapping())
&& Boolean.TRUE.equals(config.getFlattenResultIndexMapping())
&& existingConfig.getCustomResultIndexOrAlias() != null
) {
String flattenedResultIndexAlias = timeSeriesIndices
.getFlattenedResultIndexAlias(config.getCustomResultIndexOrAlias(), config.getId());
String pipelineId = timeSeriesIndices.getFlattenResultIndexIngestPipelineId(config.getId());
timeSeriesIndices
.initFlattenedResultIndex(
flattenedResultIndexAlias,
ActionListener.wrap(initResponse -> setupIngestPipeline(config.getId(), ActionListener.wrap(pipelineResponse -> {
updateResultIndexSetting(
pipelineId,
flattenedResultIndexAlias,
ActionListener.wrap(updateResponse -> listener.onResponse(updateResponse), listener::onFailure)
);
}, listener::onFailure)), listener::onFailure)
);
}
}

Expand Down Expand Up @@ -661,6 +673,7 @@ private void onGetConfigResponse(GetResponse response, boolean indexingDryRun, S
breakingUIChange = true;
}
}
handleFlattenResultIndexMappingUpdate(existingConfig, listener);

ActionListener<Void> confirmBatchRunningListener = ActionListener
.wrap(
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/org/opensearch/timeseries/util/RestHandlerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,20 @@
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.search.SearchPhaseExecutionException;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.ad.transport.IndexAnomalyDetectorResponse;
import org.opensearch.common.Nullable;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.forecast.transport.IndexForecasterResponse;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.indices.InvalidIndexNameException;
import org.opensearch.rest.RestChannel;
Expand Down Expand Up @@ -290,4 +293,21 @@ public static Entity buildEntity(RestRequest request, String detectorId) throws
// not a valid profile request with correct entity information
return null;
}

public static String getConfigIdFromIndexResponse(ActionResponse actionResponse) {
String configId;
if (actionResponse instanceof IndexAnomalyDetectorResponse) {
IndexAnomalyDetectorResponse response = (IndexAnomalyDetectorResponse) actionResponse;
configId = response.getId();
logger.info("Handling IndexAnomalyDetectorResponse for configId: {}", configId);
} else if (actionResponse instanceof IndexForecasterResponse) {
IndexForecasterResponse response = (IndexForecasterResponse) actionResponse;
configId = response.getId();
logger.info("Handling IndexForecasterResponse for configId: {}", configId);
} else {
throw new IllegalStateException("Unexpected response type: " + actionResponse.getClass().getName());
}
return configId;
}

}
72 changes: 72 additions & 0 deletions src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,78 @@ public void testUpdateAnomalyDetector_disableFlattenResultIndex_shouldDeletePipe
assertEquals("Expected 404 response but got: " + statusCode, 404, statusCode);
}

public void testUpdateAnomalyDetector_enableFlattenResultIndex_shouldCreatePipeline() throws Exception {
AnomalyDetector detector = createIndexAndGetAnomalyDetector(
INDEX_NAME,
ImmutableList.of(TestHelpers.randomFeature("feature_bytes", "agg", true)),
false,
false
);
// test behavior when AD is enabled
updateClusterSettings(ADEnabledSetting.AD_ENABLED, true);
// create a detector with flatten result index disabled, shouldn't find related ingest pipeline
Response response = TestHelpers
.makeRequest(client(), "POST", TestHelpers.AD_BASE_DETECTORS_URI, ImmutableMap.of(), TestHelpers.toHttpEntity(detector), null);
assertEquals("Create anomaly detector without flattened result index failed", RestStatus.CREATED, TestHelpers.restStatus(response));
Map<String, Object> responseMap = entityAsMap(response);
String id = (String) responseMap.get("_id");
String expectedPipelineId = String.format(Locale.ROOT, "flatten_result_index_ingest_pipeline%s", id.toLowerCase(Locale.ROOT));
String getIngestPipelineEndpoint = String.format(Locale.ROOT, "_ingest/pipeline/%s", expectedPipelineId);
ResponseException responseException = expectThrows(
ResponseException.class,
() -> TestHelpers.makeRequest(client(), "GET", getIngestPipelineEndpoint, ImmutableMap.of(), "", null)
);
int statusCode = responseException.getResponse().getStatusLine().getStatusCode();
assertEquals("Expected 404 response but got: " + statusCode, 404, statusCode);
// update the detector with flatten result index enabled, should be able to find ingest pipeline
List<Feature> features = detector.getFeatureAttributes();
AnomalyDetector newDetector = new AnomalyDetector(
id,
detector.getVersion(),
detector.getName(),
detector.getDescription(),
detector.getTimeField(),
detector.getIndices(),
features,
detector.getFilterQuery(),
detector.getInterval(),
detector.getWindowDelay(),
detector.getShingleSize(),
detector.getUiMetadata(),
detector.getSchemaVersion(),
detector.getLastUpdateTime(),
null,
detector.getUser(),
detector.getCustomResultIndexOrAlias(),
TestHelpers.randomImputationOption(features),
randomIntBetween(1, 10000),
randomInt(TimeSeriesSettings.MAX_SHINGLE_SIZE / 2),
randomIntBetween(1, 1000),
null,
null,
null,
null,
true,
detector.getLastBreakingUIChangeTime()
);
Response updateResponse = TestHelpers
.makeRequest(
client(),
"PUT",
TestHelpers.AD_BASE_DETECTORS_URI + "/" + id + "?refresh=true",
ImmutableMap.of(),
TestHelpers.toHttpEntity(newDetector),
null
);
assertEquals("Update anomaly detector failed", RestStatus.OK, TestHelpers.restStatus(updateResponse));
Response getPipelineResponse = TestHelpers.makeRequest(client(), "GET", getIngestPipelineEndpoint, ImmutableMap.of(), "", null);
assertEquals(
"Expected 200 response but got: " + getPipelineResponse.getStatusLine().getStatusCode(),
200,
getPipelineResponse.getStatusLine().getStatusCode()
);
}

public void testCreateAnomalyDetector() throws Exception {
AnomalyDetector detector = createIndexAndGetAnomalyDetector(INDEX_NAME);
updateClusterSettings(ADEnabledSetting.AD_ENABLED, false);
Expand Down

0 comments on commit 9b371ba

Please sign in to comment.