Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support Transform as an ISM action #760

Merged
merged 38 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
9536186
Initial impl
tanqiuliu Apr 5, 2023
679f92b
Merge branch 'opensearch-project:main' into feature/ism_transform
tanqiuliu Apr 12, 2023
7dab6cf
fix style
tanqiuliu Apr 8, 2023
40c9390
end to end functional
tanqiuliu Apr 15, 2023
39cb7b5
ISM transform unit tests & integ tests
tanqiuliu Apr 17, 2023
47721af
Merge branch 'main' into feature/ism_transform
tanqiuliu May 12, 2023
02ed7df
Merge branch 'main' into feature/ism_transform
tanqiuliu Jun 3, 2023
6a75695
Merge branch 'main' into feature/ism_transform
tanqiuliu Jun 18, 2023
10ba7c5
Merge branch 'main' into feature/ism_transform
tanqiuliu Jul 1, 2023
e6699d3
Merge branch 'main' into feature/ism_transform
tanqiuliu Jul 11, 2023
e972c12
Fix after core #8157 (#857)
bowenlan-amzn Jul 19, 2023
157960d
Upgrade the backport workflow (#862)
lezzago Jul 19, 2023
98ddb8d
Added 2.9 release notes. (#851)
AWSHurneyt Jul 21, 2023
271b25b
Handle NPE in isRollupIndex (#855)
blampe Jul 22, 2023
8918a79
Fix core XcontentType refactor (#873)
Hailong-am Jul 27, 2023
60fc6f2
fix for max & min aggregations when no metric property exist (#870)
sbcd90 Jul 28, 2023
c9ba2fb
core refactor change (#884)
Hailong-am Aug 3, 2023
58153bd
update backport branch name (#885)
Hailong-am Aug 7, 2023
513553b
core refactor change (#887)
Hailong-am Aug 8, 2023
4cd7730
Fix breaking change by core refactor (#888)
Hailong-am Aug 10, 2023
d8bb834
fix core breaking (#906)
bowenlan-amzn Aug 25, 2023
3ec3c3e
Support copy alias in rollover (#907)
bowenlan-amzn Sep 1, 2023
716ca89
Set preference to _primary when searching control-center index (#911)
gaobinlong Sep 1, 2023
96ef16e
Add primary first preference to all search requests (#912)
bowenlan-amzn Sep 1, 2023
1ba75b0
fix intelliJ IDEA gradle sync error (#916)
Hailong-am Sep 5, 2023
9a7212d
make control center index as system index (#919)
Hailong-am Sep 6, 2023
a6ea2f7
Updates demo certs used in integ tests (#921)
DarshitChanpura Sep 7, 2023
1b4d9d3
Added 2.10 release notes (#925)
lezzago Sep 8, 2023
e5ff66b
Bump bwc version (#930)
bowenlan-amzn Sep 14, 2023
155f0af
fix integ tests; upgrade mappings versions
tanqiuliu Sep 25, 2023
19003c9
Fix DCO
tanqiuliu Sep 25, 2023
f9b0af4
Merge branch 'main' into feature/ism_transform
tanqiuliu Sep 25, 2023
b0c56b5
Merge branch 'main' into feature/ism_transform
tanqiuliu Oct 5, 2023
0202ea2
Addressed pr comments; Add integ test case for re-execute the same tr…
tanqiuliu Oct 5, 2023
a082860
Addressed detekt error
tanqiuliu Oct 6, 2023
857abe5
Merge branch 'main' into feature/ism_transform
tanqiuliu Oct 6, 2023
2f8c4d6
Added ISMTransform writeable test
tanqiuliu Oct 7, 2023
399afba
Addressed comments; Moved updateTransformStartTime to IndexManagement…
tanqiuliu Oct 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ However, to build the `index management` plugin project, we also use the OpenSea

### Building from the command line

1. `./gradlew build` builds and tests project.
1. `./gradlew build` builds and tests project.
2. `./gradlew run` launches a single node cluster with the index management (and job-scheduler) plugin installed.
3. `./gradlew run -PnumNodes=3` launches a multi-node cluster with the index management (and job-scheduler) plugin installed.
4. `./gradlew integTest` launches a single node cluster with the index management (and job-scheduler) plugin installed and runs all integ tests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ data class ActionProperties(
val snapshotName: String? = null,
val rollupId: String? = null,
val hasRollupFailed: Boolean? = null,
val shrinkActionProperties: ShrinkActionProperties? = null
val shrinkActionProperties: ShrinkActionProperties? = null,
val transformActionProperties: TransformActionProperties? = null
) : Writeable, ToXContentFragment {

override fun writeTo(out: StreamOutput) {
Expand All @@ -32,6 +33,7 @@ data class ActionProperties(
out.writeOptionalString(rollupId)
out.writeOptionalBoolean(hasRollupFailed)
out.writeOptionalWriteable(shrinkActionProperties)
out.writeOptionalWriteable(transformActionProperties)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
Expand All @@ -40,6 +42,7 @@ data class ActionProperties(
if (rollupId != null) builder.field(Properties.ROLLUP_ID.key, rollupId)
if (hasRollupFailed != null) builder.field(Properties.HAS_ROLLUP_FAILED.key, hasRollupFailed)
if (shrinkActionProperties != null) builder.addObject(ShrinkActionProperties.SHRINK_ACTION_PROPERTIES, shrinkActionProperties, params)
if (transformActionProperties != null) builder.addObject(TransformActionProperties.TRANSFORM_ACTION_PROPERTIES, transformActionProperties, params)
return builder
}

Expand All @@ -52,7 +55,8 @@ data class ActionProperties(
val rollupId: String? = si.readOptionalString()
val hasRollupFailed: Boolean? = si.readOptionalBoolean()
val shrinkActionProperties: ShrinkActionProperties? = si.readOptionalWriteable { ShrinkActionProperties.fromStreamInput(it) }
return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties)
val transformActionProperties: TransformActionProperties? = si.readOptionalWriteable { TransformActionProperties.fromStreamInput(it) }
return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties, transformActionProperties)
}

fun parse(xcp: XContentParser): ActionProperties {
Expand All @@ -61,6 +65,7 @@ data class ActionProperties(
var rollupId: String? = null
var hasRollupFailed: Boolean? = null
var shrinkActionProperties: ShrinkActionProperties? = null
var transformActionProperties: TransformActionProperties? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
Expand All @@ -75,10 +80,13 @@ data class ActionProperties(
ShrinkActionProperties.SHRINK_ACTION_PROPERTIES -> {
shrinkActionProperties = if (xcp.currentToken() == Token.VALUE_NULL) null else ShrinkActionProperties.parse(xcp)
}
TransformActionProperties.TRANSFORM_ACTION_PROPERTIES -> {
transformActionProperties = if (xcp.currentToken() == Token.VALUE_NULL) null else TransformActionProperties.parse(xcp)
}
}
}

return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties)
return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties, transformActionProperties)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.model

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentFragment
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser

data class TransformActionProperties(
val transformId: String?,
val hasTransformFailed: Boolean?
) : Writeable, ToXContentFragment {

override fun writeTo(out: StreamOutput) {
out.writeOptionalString(transformId)
out.writeOptionalBoolean(hasTransformFailed)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder {
if (transformId != null) builder.field(Properties.TRANSFORM_ID.key, transformId)
if (hasTransformFailed != null) builder.field(Properties.HAS_TRANSFORM_FAILED.key, hasTransformFailed)
return builder
}

companion object {
const val TRANSFORM_ACTION_PROPERTIES = "transform_action_properties"

fun fromStreamInput(sin: StreamInput): TransformActionProperties {
val transformId: String? = sin.readOptionalString()
val hasTransformFailed: Boolean? = sin.readOptionalBoolean()
return TransformActionProperties(transformId, hasTransformFailed)
}

fun parse(xcp: XContentParser): TransformActionProperties {
var transformId: String? = null
var hasTransformFailed: Boolean? = null

ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
Properties.TRANSFORM_ID.key -> transformId = xcp.text()
Properties.HAS_TRANSFORM_FAILED.key -> hasTransformFailed = xcp.booleanValue()
}
}

return TransformActionProperties(transformId, hasTransformFailed)
}
}

enum class Properties(val key: String) {
TRANSFORM_ID("transformId"),
tanqiuliu marked this conversation as resolved.
Show resolved Hide resolved
HAS_TRANSFORM_FAILED("has_transform_failed")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin

val managedIndexCoordinator = ManagedIndexCoordinator(
environment.settings(),
client, clusterService, threadPool, indexManagementIndices, indexMetadataProvider
client, clusterService, threadPool, indexManagementIndices, indexMetadataProvider, xContentRegistry
)

val smRunner = SMRunner.init(client, threadPool, settings, indexManagementIndices, clusterService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction
import org.opensearch.indexmanagement.indexstatemanagement.action.RollupActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.TransformActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry
Expand All @@ -49,7 +50,8 @@ class ISMActionsParser private constructor() {
RollupActionParser(),
RolloverActionParser(),
ShrinkActionParser(),
SnapshotActionParser()
SnapshotActionParser(),
TransformActionParser(),
)

val customActionExtensionMap = mutableMapOf<String, String>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.opensearch.common.regex.Regex
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.commons.authuser.User
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.index.Index
import org.opensearch.index.query.QueryBuilders
import org.opensearch.indexmanagement.IndexManagementIndices
Expand Down Expand Up @@ -110,7 +111,8 @@ class ManagedIndexCoordinator(
private val clusterService: ClusterService,
private val threadPool: ThreadPool,
indexManagementIndices: IndexManagementIndices,
private val indexMetadataProvider: IndexMetadataProvider
private val indexMetadataProvider: IndexMetadataProvider,
private val xContentRegistry: NamedXContentRegistry
) : ClusterStateListener,
CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ManagedIndexCoordinator")),
LifecycleListener() {
Expand Down Expand Up @@ -432,7 +434,7 @@ class ManagedIndexCoordinator(

return try {
val response: SearchResponse = client.suspendUntil { search(searchRequest, it) }
parseFromSearchResponse(response = response, parse = Policy.Companion::parse)
parseFromSearchResponse(response, xContentRegistry, Policy.Companion::parse)
} catch (ex: IndexNotFoundException) {
emptyList()
} catch (ex: ClusterBlockException) {
Expand Down Expand Up @@ -613,7 +615,7 @@ class ManagedIndexCoordinator(
}
mRes.forEach {
if (it.response.isExists) {
result[it.id] = contentParser(it.response.sourceAsBytesRef).parseWithType(
result[it.id] = contentParser(it.response.sourceAsBytesRef, xContentRegistry).parseWithType(
it.response.id, it.response.seqNo, it.response.primaryTerm, ManagedIndexConfig.Companion::parse
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.indexmanagement.indexstatemanagement.step.transform.AttemptCreateTransformJobStep
import org.opensearch.indexmanagement.indexstatemanagement.step.transform.WaitForTransformCompletionStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.transform.model.ISMTransform

class TransformAction(
val ismTransform: ISMTransform,
index: Int
) : Action(name, index) {

companion object {
const val name = "transform"
const val ISM_TRANSFORM_FIELD = "ism_transform"
}

private val attemptCreateTransformJobStep = AttemptCreateTransformJobStep(this)
private val waitForTransformCompletionStep = WaitForTransformCompletionStep()
private val steps = listOf(attemptCreateTransformJobStep, waitForTransformCompletionStep)

@Suppress("ReturnCount")
override fun getStepToExecute(context: StepContext): Step {
// if stepMetaData is null, return first step
val stepMetaData = context.metadata.stepMetaData ?: return attemptCreateTransformJobStep

// if the current step has completed, return the next step
if (stepMetaData.stepStatus == Step.StepStatus.COMPLETED) {
return when (stepMetaData.name) {
AttemptCreateTransformJobStep.name -> waitForTransformCompletionStep
else -> attemptCreateTransformJobStep
}
}

return when (stepMetaData.name) {
AttemptCreateTransformJobStep.name -> attemptCreateTransformJobStep
else -> waitForTransformCompletionStep
}
}

override fun getSteps(): List<Step> = steps

override fun populateAction(builder: XContentBuilder, params: ToXContent.Params) {
builder.startObject(type)
builder.field(ISM_TRANSFORM_FIELD, ismTransform)
builder.endObject()
}

override fun populateAction(out: StreamOutput) {
ismTransform.writeTo(out)
out.writeInt(actionIndex)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
import org.opensearch.indexmanagement.transform.model.ISMTransform

class TransformActionParser : ActionParser() {
override fun fromStreamInput(sin: StreamInput): Action {
val ismTransform = ISMTransform(sin)
val index = sin.readInt()
return TransformAction(ismTransform, index)
}

override fun fromXContent(xcp: XContentParser, index: Int): Action {
var ismTransform: ISMTransform? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
TransformAction.ISM_TRANSFORM_FIELD -> ismTransform = ISMTransform.parse(xcp)
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in TransformAction.")
}
}

return TransformAction(ismTransform = requireNotNull(ismTransform) { "TransformAction transform is null." }, index)
}

override fun getActionType(): String {
return TransformAction.name
}
}
Loading