Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang committed Jan 27, 2025
1 parent 9fced72 commit 2fd6bc2
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 27 deletions.
21 changes: 21 additions & 0 deletions src/main/java/org/opensearch/ad/indices/ADIndexManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.IOException;
import java.util.EnumMap;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -45,6 +46,8 @@
import org.opensearch.timeseries.indices.IndexManagement;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
* This class provides utility methods for various anomaly detection indices.
*/
Expand All @@ -57,6 +60,8 @@ public class ADIndexManagement extends IndexManagement<ADIndex> {
// The index name pattern to query all AD result, history and current AD result
public static final String ALL_AD_RESULTS_INDEX_PATTERN = ".opendistro-anomaly-results*";

// private static final ObjectMapper objectMapper = new ObjectMapper();

/**
* Constructor function
*
Expand Down Expand Up @@ -122,6 +127,22 @@ public static String getResultMappings() throws IOException {
return getMappings(ANOMALY_RESULTS_INDEX_MAPPING_FILE);
}

/**
* Retrieves the JSON mapping for the flattened result index with the "dynamic" field set to true
* @return JSON mapping for the flattened result index.
* @throws IOException if the mapping file cannot be read.
*/
public static String getFlattenedResultMappings() throws IOException {
ObjectMapper objectMapper = new ObjectMapper();

Map<String, Object> mapping = objectMapper
.readValue(ADIndexManagement.class.getClassLoader().getResourceAsStream(ANOMALY_RESULTS_INDEX_MAPPING_FILE), Map.class);

mapping.put("dynamic", true);

return objectMapper.writeValueAsString(mapping);
}

/**
* Get anomaly detector state index mapping json content.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

package org.opensearch.timeseries.indices;

import static org.opensearch.ad.settings.AnomalyDetectorSettings.FLATTENED_ANOMALY_RESULTS_INDEX_MAPPING_FILE;
import static org.opensearch.ad.indices.ADIndexManagement.getFlattenedResultMappings;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.timeseries.util.RestHandlerUtils.createXContentParserFromRegistry;

Expand Down Expand Up @@ -90,6 +90,7 @@
import org.opensearch.timeseries.settings.TimeSeriesSettings;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.io.Resources;

Expand Down Expand Up @@ -137,6 +138,7 @@ public abstract class IndexManagement<IndexType extends Enum<IndexType> & TimeSe
private NamedXContentRegistry xContentRegistry;
protected BiCheckedFunction<XContentParser, String, ? extends Config, IOException> configParser;
protected String customResultIndexPrefix;
private final ObjectMapper objectMapper = new ObjectMapper();

protected class IndexState {
// keep track of whether the mapping version is up-to-date
Expand Down Expand Up @@ -1016,21 +1018,28 @@ public <T> void initCustomResultIndexAndExecute(String resultIndexOrAlias, Execu

/**
* creates flattened result index
* @param indexName the index name
* @param flattenedResultIndexAlias the flattened result index alias
* @param actionListener the action listener
* @throws IOException
*/
public void initFlattenedResultIndex(String indexName, ActionListener<CreateIndexResponse> actionListener) throws IOException {
public void initFlattenedResultIndex(String flattenedResultIndexAlias, ActionListener<CreateIndexResponse> actionListener)
throws IOException {
String indexName = getCustomResultIndexPattern(flattenedResultIndexAlias);
logger.info("Initializing flattened result index: {}", indexName);

CreateIndexRequest request = new CreateIndexRequest(indexName)
.mapping(getFlattenedResultIndexMappings(), XContentType.JSON)
.mapping(getFlattenedResultMappings(), XContentType.JSON)
.settings(settings);

if (flattenedResultIndexAlias != null) {
request.alias(new Alias(flattenedResultIndexAlias));
}

choosePrimaryShards(request, false);

adminClient.indices().create(request, ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
logger.info("Successfully created flattened result index: {}", indexName);
logger.info("Successfully created flattened result index: {} with alias: {}", indexName, flattenedResultIndexAlias);
actionListener.onResponse(response);
} else {
String errorMsg = "Index creation not acknowledged for index: " + indexName;
Expand All @@ -1043,15 +1052,6 @@ public void initFlattenedResultIndex(String indexName, ActionListener<CreateInde
}));
}

/**
* Get flattened result index mapping json content
* @return flattened result index mapping
* @throws IOException
*/
public String getFlattenedResultIndexMappings() throws IOException {
return getMappings(FLATTENED_ANOMALY_RESULTS_INDEX_MAPPING_FILE);
}

public <T> void validateCustomIndexForBackendJob(
String resultIndexOrAlias,
String securityLogId,
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/timeseries/model/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public abstract class Config implements Writeable, ToXContentObject {
public static final String RESULT_INDEX_FIELD_MIN_SIZE = "result_index_min_size";
public static final String RESULT_INDEX_FIELD_MIN_AGE = "result_index_min_age";
public static final String RESULT_INDEX_FIELD_TTL = "result_index_ttl";
public static final String FLATTEN_RESULT_INDEX_MAPPING = "flatten_result_index_mapping";
public static final String FLATTEN_RESULT_INDEX_MAPPING = "flatten_custom_result_index";
// Changing categorical field, feature attributes, interval, windowDelay, time field, horizon, indices,
// result index would force us to display results only from the most recent update. Otherwise,
// the UI appear cluttered and unclear.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,14 @@ protected void prepareConfigIndexing(boolean indexingDryRun, ActionListener<T> l
}
}

private String getFlattenedResultIndexAlias(String configId) {
return config.getCustomResultIndexOrAlias() + "_flattened_" + configId.toLowerCase(Locale.ROOT);
}

private String getFlattenResultIndexIngestPipelineId(String configId) {
return "flatten_result_index_ingest_pipeline" + configId.toLowerCase(Locale.ROOT);
}

private void handlePutRequest(boolean indexingDryRun, ActionListener<T> listener) {
handler.confirmJobRunning(clusterService, client, id, listener, () -> {
handleFlattenResultIndexMappingUpdate(listener);
Expand All @@ -465,17 +473,17 @@ private void handlePostRequest(boolean indexingDryRun, ActionListener<T> listene
createConfig(indexingDryRun, ActionListener.wrap(createConfigResponse -> {
if (shouldHandleFlattening(indexingDryRun, createConfigResponse)) {
IndexAnomalyDetectorResponse response = (IndexAnomalyDetectorResponse) createConfigResponse;
String detectorId = response.getId();
String indexName = config.getCustomResultIndexOrAlias() + "_flattened_" + detectorId.toLowerCase(Locale.ROOT);
String pipelineId = "anomaly_detection_ingest_pipeline_" + detectorId.toLowerCase(Locale.ROOT);
String configId = response.getId();
String flattenedResultIndexAlias = getFlattenedResultIndexAlias(configId);
String pipelineId = getFlattenResultIndexIngestPipelineId(configId);

timeSeriesIndices
.initFlattenedResultIndex(
indexName,
ActionListener.wrap(initResponse -> setupIngestPipeline(detectorId, ActionListener.wrap(pipelineResponse -> {
flattenedResultIndexAlias,
ActionListener.wrap(initResponse -> setupIngestPipeline(configId, ActionListener.wrap(pipelineResponse -> {
updateResultIndexSetting(
pipelineId,
indexName,
flattenedResultIndexAlias,
ActionListener.wrap(updateResponse -> listener.onResponse(createConfigResponse), listener::onFailure)
);
}, listener::onFailure)), listener::onFailure)
Expand All @@ -493,12 +501,12 @@ private boolean shouldHandleFlattening(boolean indexingDryRun, Object createConf
&& createConfigResponse instanceof IndexAnomalyDetectorResponse;
}

protected void setupIngestPipeline(String detectorId, ActionListener<T> listener) {
String indexName = config.getCustomResultIndexOrAlias() + "_flattened_" + detectorId.toLowerCase(Locale.ROOT);
String pipelineId = "anomaly_detection_ingest_pipeline_" + detectorId.toLowerCase(Locale.ROOT);
protected void setupIngestPipeline(String configId, ActionListener<T> listener) {
String flattenedResultIndexAlias = getFlattenedResultIndexAlias(configId);
String pipelineId = getFlattenResultIndexIngestPipelineId(configId);

try {
BytesReference pipelineSource = createPipelineDefinition(indexName);
BytesReference pipelineSource = createPipelineDefinition(flattenedResultIndexAlias);

PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, pipelineSource, XContentType.JSON);

Expand Down Expand Up @@ -576,10 +584,9 @@ private void handleFlattenResultIndexMappingUpdate(ActionListener<T> listener) {
return;
}
if (config.getFlattenResultIndexMapping() != null && config.getFlattenResultIndexMapping()) {
// if field value is true, create the pipeline. No need to get and compare with previous value
setupIngestPipeline(id, listener);
} else {
String pipelineId = "anomaly_detection_ingest_pipeline_" + config.getId();
String pipelineId = getFlattenResultIndexIngestPipelineId(config.getId());
client.admin().cluster().deletePipeline(new DeletePipelineRequest(pipelineId), new ActionListener<AcknowledgedResponse>() {

@Override
Expand Down

0 comments on commit 2fd6bc2

Please sign in to comment.