Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add a feature that flattens custom result index when enabled #1401

Open
wants to merge 12 commits into
base: main
Choose a base branch
from

Conversation

jackiehanyang
Copy link
Collaborator

Description

wip - sending implementation out first.

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Jackie Han <[email protected]>
Signed-off-by: Jackie Han <[email protected]>
Signed-off-by: Jackie Han <[email protected]>
@jackiehanyang
Copy link
Collaborator Author

jackiehanyang commented Jan 24, 2025

resolved
non related IT test case failed:
REPRODUCE WITH: ./gradlew ':integTest' --tests "org.opensearch.ad.rest.AnomalyDetectorRestApiIT.testSearchTopAnomalyResultsWithCustomResultIndex" -Dtests.seed=764AE145CE6907EA -Dtests.security.manager=false -Dtests.jvm.argline="-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m" -Dtests.locale=es-419 -Dtests.timezone=NZ -Druntime.java=21
org.opensearch.ad.rest.AnomalyDetectorRestApiIT > testSearchTopAnomalyResultsWithCustomResultIndex FAILED
org.opensearch.client.ResponseException: method [POST], host [http://127.0.0.1:44973/], URI [/_plugins/_anomaly_detection/detectors], status line [HTTP/1.1 500 Internal Server Error]
{"error":{"root_cause":[{"type":"status_exception","reason":"Fail to create detector"}],"type":"status_exception","reason":"Fail to create detector"},"status":500}
at __randomizedtesting.SeedInfo.seed([764AE145CE6907EA:A9D4336811E01C9A]:0)
at app//org.opensearch.client.RestClient.convertResponse(RestClient.java:501)
at app//org.opensearch.client.RestClient.performRequest(RestClient.java:384)
at app//org.opensearch.client.RestClient.performRequest(RestClient.java:359)
at app//org.opensearch.timeseries.TestHelpers.makeRequest(TestHelpers.java:231)
at app//org.opensearch.timeseries.TestHelpers.makeRequest(TestHelpers.java:204)
at app//org.opensearch.ad.AnomalyDetectorRestTestCase.createAnomalyDetector(AnomalyDetectorRestTestCase.java:133)
at app//org.opensearch.ad.rest.AnomalyDetectorRestApiIT.testSearchTopAnomalyResultsWithCustomResultIndex(AnomalyDetectorRestApiIT.java:2069)

@jackiehanyang jackiehanyang force-pushed the ingestPipelineFlatten branch 2 times, most recently from 2fd6bc2 to 8b12fd7 Compare January 27, 2025 07:22
Signed-off-by: Jackie Han <[email protected]>
Signed-off-by: Jackie Han <[email protected]>
Signed-off-by: Jackie Han <[email protected]>
@jackiehanyang
Copy link
Collaborator Author

checking the failed bwc test...

Signed-off-by: Jackie Han <[email protected]>
@jackiehanyang jackiehanyang force-pushed the ingestPipelineFlatten branch 4 times, most recently from ac8f122 to 9ffc24f Compare January 27, 2025 18:59
Signed-off-by: Jackie Han <[email protected]>
@jackiehanyang jackiehanyang force-pushed the ingestPipelineFlatten branch 3 times, most recently from 4400f44 to b876a22 Compare January 28, 2025 00:07
Signed-off-by: Jackie Han <[email protected]>
@jackiehanyang jackiehanyang force-pushed the ingestPipelineFlatten branch 2 times, most recently from f1e21cf to ad3d99c Compare January 28, 2025 09:15
Signed-off-by: Jackie Han <[email protected]>
@kaituo
Copy link
Collaborator

kaituo commented Jan 28, 2025

bwc test failure is similar to opensearch-project/OpenSearch#15234

=== Standard error of node `node{::adBwcCluster#twoThirdsUpgradedClusterTask-0}` ===
?  	at org.opensearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:393) [opensearch-2.19.0.jar:2.19.0]
?  	... 41 more
?  Caused by: java.lang.IllegalStateException: unexpected byte [0xac]
?  	at org.opensearch.core.common.io.stream.StreamInput.readBoolean(StreamInput.java:596) ~[opensearch-core-2.19.0.jar:2.19.0]
?  	at org.opensearch.core.common.io.stream.StreamInput.readOptionalBoolean(StreamInput.java:606) ~[opensearch-core-2.19.0.jar:2.19.0]
?  	at org.opensearch.search.internal.ShardSearchRequest.<init>(ShardSearchRequest.java:255) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.RequestHandlerRegistry.newRequest(RequestHandlerRegistry.java:87) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.NativeMessageHandler.newRequest(NativeMessageHandler.java:316) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.NativeMessageHandler.handleRequest(NativeMessageHandler.java:271) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.NativeMessageHandler.handleMessage(NativeMessageHandler.java:146) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.NativeMessageHandler.messageReceived(NativeMessageHandler.java:126) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundHandler.messageReceivedFromPipeline(InboundHandler.java:120) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundHandler.inboundMessage(InboundHandler.java:112) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.TcpTransport.inboundMessage(TcpTransport.java:768) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundBytesHandler.forwardFragments(InboundBytesHandler.java:137) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundBytesHandler.doHandleBytes(InboundBytesHandler.java:77) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundPipeline.doHandleBytes(InboundPipeline.java:124) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundPipeline.handleBytes(InboundPipeline.java:113) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:95) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[?:?]
?  	at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[?:?]
?  	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:107) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[?:?]
?  	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[?:?]
?  	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868) ~[?:?]
?  	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[?:?]
?  	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[?:?]
?  	at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:689) ~[?:?]
?  	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:652) ~[?:?]
?  	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[?:?]
?  	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[?:?]
?  	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[?:?]
?  	at java.lang.Thread.run(Thread.java:1583) ~[?:?]
? WARN ][r.suppressed             ] [adBwcCluster0-2] path: /_opendistro/_anomaly_detection/detectors/yHNUrJQBrgHv3CV3UwAC/_start, params: {detectorID=yHNUrJQBrgHv3CV3UwAC}
?  org.opensearch.OpenSearchStatusException: Fail to start detector
?  	at org.opensearch.timeseries.util.RestHandlerUtils.lambda$wrapRestActionListener$2(RestHandlerUtils.java:243) [opensearch-anomaly-detection-2.19.0.0.jar:2.19.0.0]
?  	at org.opensearch.core.action.ActionListener$1.onFailure(ActionListener.java:90) [opensearch-core-2.19.0.jar:2.19.0]
?  	at org.opensearch.core.action.ActionListener$1.onFailure(ActionListener.java:90) [opensearch-core-2.19.0.jar:2.19.0]
?  	at org.opensearch.timeseries.rest.handler.IndexJobActionHandler.lambda$onGetJobForWrite$10(IndexJobActionHandler.java:303) [opensearch-anomaly-detection-2.19.0.0.jar:2.19.0.0]
?  	at org.opensearch.core.action.ActionListener$1.onFailure(ActionListener.java:90) [opensearch-core-2.19.0.jar:2.19.0]
?  	at org.opensearch.timeseries.task.TaskManager.lambda$getAndExecuteOnLatestTasks$17(TaskManager.java:605) [opensearch-anomaly-detection-2.19.0.0.jar:2.19.0.0]
?  	at org.opensearch.core.action.ActionListener$1.onFailure(ActionListener.java:90) [opensearch-core-2.19.0.jar:2.19.0]
?  	at org.opensearch.action.support.TransportAction$1.onFailure(TransportAction.java:124) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.core.action.ActionListener$1.onFailure(ActionListener.java:90) [opensearch-core-2.19.0.jar:2.19.0]
?  	at org.opensearch.core.action.ActionListener$5.onFailure(ActionListener.java:277) [opensearch-core-2.19.0.jar:2.19.0]
?  	at org.opensearch.action.search.AbstractSearchAsyncAction.raisePhaseFailure(AbstractSearchAsyncAction.java:802) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.action.search.AbstractSearchAsyncAction.onPhaseFailure(AbstractSearchAsyncAction.java:775) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:395) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.action.search.AbstractSearchAsyncAction.onPhaseDone(AbstractSearchAsyncAction.java:815) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.action.search.AbstractSearchAsyncAction.onShardFailure(AbstractSearchAsyncAction.java:548) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.action.search.AbstractSearchAsyncAction$1.onFailure(AbstractSearchAsyncAction.java:316) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.action.search.SearchExecutionStatsCollector.onFailure(SearchExecutionStatsCollector.java:104) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:75) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.action.search.SearchTransportService$ConnectionCountingHandler.handleException(SearchTransportService.java:766) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.TransportService$9.handleException(TransportService.java:1741) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.TransportService$ContextRestoreResponseHandler.handleException(TransportService.java:1527) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.NativeMessageHandler.lambda$handleException$5(NativeMessageHandler.java:454) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.common.util.concurrent.OpenSearchExecutors$DirectExecutorService.execute(OpenSearchExecutors.java:343) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.NativeMessageHandler.handleException(NativeMessageHandler.java:452) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.NativeMessageHandler.handlerResponseError(NativeMessageHandler.java:444) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.NativeMessageHandler.handleMessage(NativeMessageHandler.java:172) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.NativeMessageHandler.messageReceived(NativeMessageHandler.java:126) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundHandler.messageReceivedFromPipeline(InboundHandler.java:120) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundHandler.inboundMessage(InboundHandler.java:112) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.TcpTransport.inboundMessage(TcpTransport.java:768) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundBytesHandler.forwardFragments(InboundBytesHandler.java:137) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundBytesHandler.doHandleBytes(InboundBytesHandler.java:77) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundPipeline.doHandleBytes(InboundPipeline.java:124) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundPipeline.handleBytes(InboundPipeline.java:113) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:95) [transport-netty4-client-2.19.0.jar:2.19.0]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280) [netty-handler-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:689) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:652) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) [netty-common-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.117.Final.jar:4.1.117.Final]
?  	at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
?   ? last 40 non error or warning messages from /__w/anomaly-detection/anomaly-detection/build/testclusters/adBwcCluster0-2/logs/opensearch.stdout.log ?
? [2025-01-28T09:53:13,046][DEBUG][o.o.c.c.C.CoordinatorPublication] [adBwcCluster0-2] publishing version 63 to [PublicationTarget{discoveryNode={adBwcCluster0-0}{yxG8heptSiK2eK9S9Ks46A}{mFsI26s-QTuJOWcGDLRAjQ}{127.0.0.1}{127.0.0.1:37589}{dimr}{upgraded=true, testattr=test, shard_indexing_pressure_enabled=true}, state=NOT_STARTED, ackIsPending=true}, PublicationTarget{discoveryNode={adBwcCluster0-1}{th_t3o4ySIqKWHrFfQg35g}{IePje0osQvilETYvk6-fFA}{127.0.0.1}{127.0.0.1:45277}{dimr}{upgraded=true, testattr=test, shard_indexing_pressure_enabled=true}, state=NOT_STARTED, ackIsPending=true}, PublicationTarget{discoveryNode={adBwcCluster0-2}{jpEavNb_SKyKsfA6FVKT0A}{SalIDjFNQGC9PjW8hGFlLQ}{127.0.0.1}{127.0.0.1:33951}{dimr}{testattr=test, shard_indexing_pressure_enabled=true}, state=NOT_STARTED, ackIsPending=true}]
? [2025-01-28T09:53:13,046][DEBUG][o.o.c.c.PublicationTransportHandler] [adBwcCluster0-2] received diff cluster state version [63] with uuid [_1E3aVONQ_6Q0Oj7pl9LVg], diff size [589]

@@ -1008,6 +1016,53 @@ public <T> void initCustomResultIndexAndExecute(String resultIndexOrAlias, Execu
}
}

/**
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you roll over flattened result index same as custom result index?

@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
if (acknowledgedResponse.isAcknowledged()) {
logger.info("Ingest pipeline deleted successfully for pipelineId: {}", pipelineId);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you do not seem to return listener in this branch.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, do you need to remove the index.default_pipeline setting from the index?

Map<String, Object> responseMap = entityAsMap(response);
String id = (String) responseMap.get("_id");
List<Feature> features = detector.getFeatureAttributes();
long expectedFeatures = features.stream().filter(Feature::getEnabled).count();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do we check equality of expectedFeatures?


PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, pipelineSource, XContentType.JSON);

client.admin().cluster().putPipeline(putPipelineRequest, ActionListener.wrap(response -> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think use client.admin() is fine as I cannot find other ways of put pipeline with client and core's put pipeline api used the same thing: https://github.com/opensearch-project/OpenSearch/blob/32a88eb16bb7bff15fa5430c1490cd102c6cca28/server/src/main/java/org/opensearch/rest/action/ingest/RestPutPipelineAction.java#L77

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants