Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add a feature that flattens custom result index when enabled #1401

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/main/java/org/opensearch/ad/indices/ADIndexManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.IOException;
import java.util.EnumMap;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -45,6 +46,8 @@
import org.opensearch.timeseries.indices.IndexManagement;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
* This class provides utility methods for various anomaly detection indices.
*/
Expand Down Expand Up @@ -122,6 +125,22 @@ public static String getResultMappings() throws IOException {
return getMappings(ANOMALY_RESULTS_INDEX_MAPPING_FILE);
}

/**
* Retrieves the JSON mapping for the flattened result index with the "dynamic" field set to true
* @return JSON mapping for the flattened result index.
* @throws IOException if the mapping file cannot be read.
*/
public static String getFlattenedResultMappings() throws IOException {
ObjectMapper objectMapper = new ObjectMapper();

Map<String, Object> mapping = objectMapper
.readValue(ADIndexManagement.class.getClassLoader().getResourceAsStream(ANOMALY_RESULTS_INDEX_MAPPING_FILE), Map.class);

mapping.put("dynamic", true);

return objectMapper.writeValueAsString(mapping);
}

/**
* Get anomaly detector state index mapping json content.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ public static AnomalyDetector parse(
case RESULT_INDEX_FIELD_TTL:
customResultIndexTTL = onlyParseNumberValue(parser);
break;
case FLATTEN_RESULT_INDEX_MAPPING:
case FLATTEN_CUSTOM_RESULT_INDEX:
flattenResultIndexMapping = onlyParseBooleanValue(parser);
break;
case BREAKING_UI_CHANGE_TIME:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ public ADResultWriteRequest(
String detectorId,
RequestPriority priority,
AnomalyResult result,
String resultIndex
String resultIndex,
String flattenResultIndex
) {
super(expirationEpochMs, detectorId, priority, result, resultIndex);
super(expirationEpochMs, detectorId, priority, result, resultIndex, flattenResultIndex);
}

public ADResultWriteRequest(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ protected ADResultWriteRequest createResultWriteRequest(
String configId,
RequestPriority priority,
AnomalyResult result,
String resultIndex
String resultIndex,
String flattenResultIndex
) {
return new ADResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex);
return new ADResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex, flattenResultIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public void saveResult(AnomalyResult result, Config config) {
config.getId(),
result.getAnomalyGrade() > 0 ? RequestPriority.HIGH : RequestPriority.MEDIUM,
result,
config.getCustomResultIndexOrAlias()
config.getCustomResultIndexOrAlias(),
config.getFlattenResultIndexAlias()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
public class ADResultBulkTransportAction extends ResultBulkTransportAction<AnomalyResult, ADResultWriteRequest, ADResultBulkRequest> {

private static final Logger LOG = LogManager.getLogger(ADResultBulkTransportAction.class);
private final ClusterService clusterService;
private final Client client;

@Inject
public ADResultBulkTransportAction(
Expand All @@ -61,39 +63,77 @@ public ADResultBulkTransportAction(
ADCommonName.ANOMALY_RESULT_INDEX_ALIAS,
ADResultBulkRequest::new
);
this.clusterService = clusterService;
this.client = client;
clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_INDEX_PRESSURE_SOFT_LIMIT, it -> softLimit = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_INDEX_PRESSURE_HARD_LIMIT, it -> hardLimit = it);
}

/**
* Prepares a {@link BulkRequest} for indexing anomaly detection results.
*
* This method processes a list of anomaly detection results provided in the {@link ADResultBulkRequest}.
* Each result is evaluated based on the current indexing pressure and result priority. If a flattened
* result index exists for the result, the result is also added to the flattened index.
*
* @param indexingPressurePercent the current percentage of indexing pressure. This value influences
* whether a result is indexed based on predefined thresholds and probabilities.
* @param request the {@link ADResultBulkRequest} containing anomaly detection results
* to be processed.
* @return a {@link BulkRequest} containing all results that are eligible for indexing.
*
* <p><b>Behavior:</b></p>
* <ul>
* <li>Results are added to the bulk request if the indexing pressure is within acceptable limits
* or the result has high priority.</li>
* <li>If a flattened result index exists for a result, it is added to the flattened index in addition
* to the primary index.</li>
* </ul>
*
* <p><b>Indexing Pressure Thresholds:</b></p>
* <ul>
* <li>Below the soft limit: All results are added.</li>
* <li>Between the soft limit and the hard limit: High-priority results are always added, and
* other results are added based on a probability that decreases with increasing pressure.</li>
* <li>Above the hard limit: Only high-priority results are added.</li>
* </ul>
*
* @see ADResultBulkRequest
* @see BulkRequest
* @see ADResultWriteRequest
*/
@Override
protected BulkRequest prepareBulkRequest(float indexingPressurePercent, ADResultBulkRequest request) {
BulkRequest bulkRequest = new BulkRequest();
List<ADResultWriteRequest> results = request.getResults();

if (indexingPressurePercent <= softLimit) {
for (ADResultWriteRequest resultWriteRequest : results) {
addResult(bulkRequest, resultWriteRequest.getResult(), resultWriteRequest.getResultIndex());
for (ADResultWriteRequest resultWriteRequest : results) {
AnomalyResult result = resultWriteRequest.getResult();
String resultIndex = resultWriteRequest.getResultIndex();

if (shouldAddResult(indexingPressurePercent, result)) {
addResult(bulkRequest, result, resultIndex);
if (resultWriteRequest.getFlattenResultIndex() != null) {
addResult(bulkRequest, result, resultWriteRequest.getFlattenResultIndex());
}
}
}

return bulkRequest;
}

private boolean shouldAddResult(float indexingPressurePercent, AnomalyResult result) {
if (indexingPressurePercent <= softLimit) {
// Always add when below soft limit
return true;
} else if (indexingPressurePercent <= hardLimit) {
// exceed soft limit (60%) but smaller than hard limit (90%)
float acceptProbability = 1 - indexingPressurePercent;
for (ADResultWriteRequest resultWriteRequest : results) {
AnomalyResult result = resultWriteRequest.getResult();
if (result.isHighPriority() || random.nextFloat() < acceptProbability) {
addResult(bulkRequest, result, resultWriteRequest.getResultIndex());
}
}
return result.isHighPriority() || random.nextFloat() < acceptProbability;
} else {
// if exceeding hard limit, only index non-zero grade or error result
for (ADResultWriteRequest resultWriteRequest : results) {
AnomalyResult result = resultWriteRequest.getResult();
if (result.isHighPriority()) {
addResult(bulkRequest, result, resultWriteRequest.getResultIndex());
}
}
return result.isHighPriority();
}

return bulkRequest;
}

private void addResult(BulkRequest bulkRequest, AnomalyResult result, String resultIndex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public ADResultWriteRequest createResultWriteRequest(Config config, AnomalyResul
config.getId(),
RequestPriority.MEDIUM,
result,
config.getCustomResultIndexOrAlias()
config.getCustomResultIndexOrAlias(),
config.getFlattenResultIndexAlias()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ public static Forecaster parse(
case RESULT_INDEX_FIELD_TTL:
customResultIndexTTL = parser.intValue();
break;
case FLATTEN_RESULT_INDEX_MAPPING:
case FLATTEN_CUSTOM_RESULT_INDEX:
flattenResultIndexMapping = parser.booleanValue();
break;
case BREAKING_UI_CHANGE_TIME:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ public ForecastResultWriteRequest(
String forecasterId,
RequestPriority priority,
ForecastResult result,
String resultIndex
String resultIndex,
String flattenResultIndex
) {
super(expirationEpochMs, forecasterId, priority, result, resultIndex);
super(expirationEpochMs, forecasterId, priority, result, resultIndex, flattenResultIndex);
}

public ForecastResultWriteRequest(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ protected ForecastResultWriteRequest createResultWriteRequest(
String configId,
RequestPriority priority,
ForecastResult result,
String resultIndex
String resultIndex,
String flattenResultIndex
) {
return new ForecastResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex);
return new ForecastResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex, flattenResultIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public void saveResult(ForecastResult result, Config config) {
config.getId(),
RequestPriority.MEDIUM,
result,
config.getCustomResultIndexOrAlias()
config.getCustomResultIndexOrAlias(),
config.getFlattenResultIndexAlias()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public ForecastResultWriteRequest createResultWriteRequest(Config config, Foreca
config.getId(),
RequestPriority.MEDIUM,
result,
config.getCustomResultIndexOrAlias()
config.getCustomResultIndexOrAlias(),
config.getFlattenResultIndexAlias()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public static String getTooManyCategoricalFieldErr(int limit) {
public static String FAIL_TO_FIND_CONFIG_MSG = "Can't find config with id: ";
public static final String CAN_NOT_CHANGE_CATEGORY_FIELD = "Can't change category field";
public static final String CAN_NOT_CHANGE_CUSTOM_RESULT_INDEX = "Can't change custom result index";
public static final String CAN_NOT_CHANGE_FLATTEN_RESULT_INDEX = "Can't change flatten result index";
public static final String CATEGORICAL_FIELD_TYPE_ERR_MSG = "Categorical field %s must be of type keyword or ip.";
// Modifying message for FEATURE below may break the parseADValidationException method of ValidateAnomalyDetectorTransportAction
public static final String FEATURE_INVALID_MSG_PREFIX = "Feature has an invalid query";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package org.opensearch.timeseries.indices;

import static org.opensearch.ad.indices.ADIndexManagement.getFlattenedResultMappings;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.timeseries.util.RestHandlerUtils.createXContentParserFromRegistry;

Expand Down Expand Up @@ -89,6 +90,7 @@
import org.opensearch.timeseries.settings.TimeSeriesSettings;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.io.Resources;

Expand Down Expand Up @@ -136,6 +138,7 @@ public abstract class IndexManagement<IndexType extends Enum<IndexType> & TimeSe
private NamedXContentRegistry xContentRegistry;
protected BiCheckedFunction<XContentParser, String, ? extends Config, IOException> configParser;
protected String customResultIndexPrefix;
private final ObjectMapper objectMapper = new ObjectMapper();

protected class IndexState {
// keep track of whether the mapping version is up-to-date
Expand Down Expand Up @@ -272,6 +275,11 @@ protected static String getMappings(String mappingFileRelativePath) throws IOExc
return Resources.toString(url, Charsets.UTF_8);
}

public static String getScripts(String scriptFileRelativePath) throws IOException {
URL url = IndexManagement.class.getClassLoader().getResource(scriptFileRelativePath);
return Resources.toString(url, Charsets.UTF_8);
}

protected void choosePrimaryShards(CreateIndexRequest request, boolean hiddenIndex) {
request
.settings(
Expand Down Expand Up @@ -1008,6 +1016,45 @@ public <T> void initCustomResultIndexAndExecute(String resultIndexOrAlias, Execu
}
}

/**
jackiehanyang marked this conversation as resolved.
Show resolved Hide resolved
* creates flattened result index
* @param flattenedResultIndexAlias the flattened result index alias
* @param actionListener the action listener
*/
public void initFlattenedResultIndex(String flattenedResultIndexAlias, ActionListener<CreateIndexResponse> actionListener) {
try {
String indexName = getCustomResultIndexPattern(flattenedResultIndexAlias);
logger.info("Initializing flattened result index: {}", indexName);

CreateIndexRequest request = new CreateIndexRequest(indexName)
.mapping(getFlattenedResultMappings(), XContentType.JSON)
.settings(settings);

if (flattenedResultIndexAlias != null) {
request.alias(new Alias(flattenedResultIndexAlias));
}

choosePrimaryShards(request, false);

adminClient.indices().create(request, ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
logger.info("Successfully created flattened result index: {} with alias: {}", indexName, flattenedResultIndexAlias);
actionListener.onResponse(response);
} else {
String errorMsg = "Index creation not acknowledged for index: " + indexName;
logger.error(errorMsg);
actionListener.onFailure(new IllegalStateException(errorMsg));
}
}, exception -> {
logger.error("Failed to create flattened result index: {}", indexName, exception);
actionListener.onFailure(exception);
}));
} catch (Exception e) {
logger.error("Error while initializing flattened result index: {}", flattenedResultIndexAlias, e);
actionListener.onFailure(e);
}
}

public <T> void validateCustomIndexForBackendJob(
String resultIndexOrAlias,
String securityLogId,
Expand Down Expand Up @@ -1252,15 +1299,18 @@ protected void rolloverAndDeleteHistoryIndex(
}

// perform rollover and delete on found custom result index alias
candidateResultAliases.forEach(config -> handleCustomResultIndex(config, resultIndex));
candidateResultAliases.forEach(config -> {
handleResultIndexRolloverAndDelete(config.getCustomResultIndexOrAlias(), config, resultIndex);
if (config.getFlattenResultIndexMapping()) {
String flattenedResultIndexAlias = config.getFlattenResultIndexAlias();
handleResultIndexRolloverAndDelete(flattenedResultIndexAlias, config, resultIndex);
}
});
}, e -> { logger.error("Failed to get configs with custom result index alias.", e); }));
}

private void handleCustomResultIndex(Config config, IndexType resultIndex) {
RolloverRequest rolloverRequest = buildRolloverRequest(
config.getCustomResultIndexOrAlias(),
getCustomResultIndexPattern(config.getCustomResultIndexOrAlias())
);
private void handleResultIndexRolloverAndDelete(String indexAlias, Config config, IndexType resultIndex) {
RolloverRequest rolloverRequest = buildRolloverRequest(indexAlias, getCustomResultIndexPattern(indexAlias));

// add rollover conditions if found in config
if (config.getCustomResultIndexMinAge() != null) {
Expand All @@ -1272,9 +1322,9 @@ private void handleCustomResultIndex(Config config, IndexType resultIndex) {

// perform rollover and delete on custom result index alias
proceedWithRolloverAndDelete(
config.getCustomResultIndexOrAlias(),
indexAlias,
rolloverRequest,
getAllCustomResultIndexPattern(config.getCustomResultIndexOrAlias()),
getAllCustomResultIndexPattern(indexAlias),
resultIndex,
config.getCustomResultIndexTTL()
);
Expand Down
Loading
Loading