Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Convert-Index-To-Remote Action for issue #808 #1302

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ integTest {
excludeTestsMatching "org.opensearch.indexmanagement.indexstatemanagement.action.NotificationActionIT"
}
exclude 'org/opensearch/indexmanagement/indexstatemanagement/MetadataRegressionIT.class'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/ConvertIndexToRemoteIT.class'
}

// TODO: raise issue in Core, this is because of the test framework
Expand Down Expand Up @@ -647,6 +648,7 @@ task integTestRemote(type: RestIntegTestTask) {

// Snapshot action integration tests rely on node level setting path.repo which we can't set remotely
exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/SnapshotActionIT.class'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/ConvertIndexToRemoteIT.class'
}

// === Set up BWC tests ===
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.indexmanagement.indexstatemanagement.action.AliasActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.AllocationActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.CloseActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.ForceMergeActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.IndexPriorityActionParser
Expand Down Expand Up @@ -52,6 +53,7 @@ class ISMActionsParser private constructor() {
ShrinkActionParser(),
SnapshotActionParser(),
TransformActionParser(),
ConvertIndexToRemoteActionParser(),
)

val customActionExtensionMap = mutableMapOf<String, String>()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.indexmanagement.indexstatemanagement.step.restore.AttemptRestoreStep
import org.opensearch.indexmanagement.indexstatemanagement.step.restore.WaitForRestoreStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext

/**
 * ISM action that runs an [AttemptRestoreStep] followed by a [WaitForRestoreStep].
 *
 * @property repository name of the snapshot repository; it is read by [AttemptRestoreStep]
 *   when it lists snapshots and issues the restore request.
 * @param index position of this action, forwarded to the [Action] base class.
 */
class ConvertIndexToRemoteAction(
    val repository: String,
    index: Int,
) : Action(name, index) {

    private val attemptRestoreStep = AttemptRestoreStep(this)
    private val waitForRestoreStep = WaitForRestoreStep()

    // Ordered list of the steps that make up this action.
    private val steps = listOf(attemptRestoreStep, waitForRestoreStep)

    override fun getSteps(): List<Step> = steps

    /**
     * Picks the step to run next based on the step recorded in the managed index metadata.
     *
     * - No recorded step: start with the attempt step.
     * - Recorded step completed: move from the attempt step to the wait step; any other
     *   completed step falls back to the attempt step.
     * - Recorded step not completed: run the same step again (anything that is not the
     *   attempt step is treated as the wait step).
     */
    @Suppress("ReturnCount")
    override fun getStepToExecute(context: StepContext): Step {
        val recordedStep = context.metadata.stepMetaData ?: return attemptRestoreStep

        val recordedIsAttempt = recordedStep.name == AttemptRestoreStep.name
        val recordedIsDone = recordedStep.stepStatus == Step.StepStatus.COMPLETED

        return when {
            recordedIsDone && recordedIsAttempt -> waitForRestoreStep
            recordedIsDone -> attemptRestoreStep
            recordedIsAttempt -> attemptRestoreStep
            else -> waitForRestoreStep
        }
    }

    /** Writes this action as an object keyed by the action type, holding only the repository. */
    override fun populateAction(builder: XContentBuilder, params: ToXContent.Params) {
        builder.startObject(type)
            .field(REPOSITORY_FIELD, repository)
            .endObject()
    }

    /** Serializes the repository and then the action index; [fromStreamInput] reads them in the same order. */
    override fun populateAction(out: StreamOutput) {
        out.writeString(repository)
        out.writeInt(actionIndex)
    }

    companion object {
        const val name = "convert_index_to_remote"
        const val REPOSITORY_FIELD = "repository"

        /** Rebuilds the action from a stream: repository string first, then the action index. */
        @JvmStatic
        fun fromStreamInput(si: StreamInput): ConvertIndexToRemoteAction {
            val repositoryName = si.readString()
            val actionPosition = si.readInt()
            return ConvertIndexToRemoteAction(repositoryName, actionPosition)
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParser.Token
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteAction.Companion.REPOSITORY_FIELD
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser

/**
 * Parser that builds [ConvertIndexToRemoteAction] instances from either a stream or XContent.
 */
class ConvertIndexToRemoteActionParser : ActionParser() {
    override fun getActionType(): String = ConvertIndexToRemoteAction.name

    override fun fromStreamInput(sin: StreamInput): Action = ConvertIndexToRemoteAction.fromStreamInput(sin)

    /**
     * Parses the action body, which must be an object whose only permitted field is the repository.
     *
     * @param xcp parser positioned on the START_OBJECT token of the action body.
     * @param index position of the action, passed through to the created action.
     * @throws IllegalArgumentException if an unknown field is present or the repository is missing.
     */
    override fun fromXContent(xcp: XContentParser, index: Int): Action {
        ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)

        var parsedRepository: String? = null
        while (xcp.nextToken() != Token.END_OBJECT) {
            val field = xcp.currentName()
            // Advance from the field name to its value.
            xcp.nextToken()

            if (field != REPOSITORY_FIELD) {
                throw IllegalArgumentException("Invalid field: [$field] found in ConvertIndexToRemoteAction.")
            }
            parsedRepository = xcp.text()
        }

        val repository = requireNotNull(parsedRepository) { "ConvertIndexToRemoteAction repository must be specified" }
        return ConvertIndexToRemoteAction(repository, index)
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.step.restore

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse
import org.opensearch.core.rest.RestStatus
import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteAction
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.snapshots.SnapshotException
import org.opensearch.snapshots.SnapshotState
import org.opensearch.transport.RemoteTransportException

class AttemptRestoreStep(private val action: ConvertIndexToRemoteAction) : Step(name) {

private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null
private var snapshotName: String? = null

@Suppress("TooGenericExceptionCaught", "ComplexMethod", "ReturnCount", "LongMethod")
override suspend fun execute(): Step {
val context = this.context ?: return this
val indexName = context.metadata.index
val repository = action.repository

try {
val mutableInfo = mutableMapOf<String, String>()

// List snapshots matching the pattern
val getSnapshotsRequest = GetSnapshotsRequest()
.repository(repository)
.snapshots(arrayOf("$indexName*"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this will retrieve snapshots whose names start with the index name. What if the snapshot covers many indexes (e.g. the user has snapshot management taking a snapshot of a group of indexes) and its name does not start with a specific index name?

I think $indexName* can be a good default value, but we had better provide a parameter so the user can specify a different snapshot prefix.

.ignoreUnavailable(true)
.verbose(true)

val getSnapshotsResponse: GetSnapshotsResponse = context.client.admin().cluster().suspendUntil {
getSnapshots(getSnapshotsRequest, it)
}
val snapshots = getSnapshotsResponse.snapshots
if (snapshots.isNullOrEmpty()) {
val message = getFailedMessage(indexName, "No snapshots found matching pattern [$indexName*]")
stepStatus = StepStatus.FAILED
info = mapOf("message" to message)
return this
}

val successfulSnapshots = snapshots.filter { it.state() == SnapshotState.SUCCESS }

if (successfulSnapshots.isEmpty()) {
val message = getFailedMessage(
indexName,
"No successful snapshots found matching pattern [$indexName*]",
)
stepStatus = StepStatus.FAILED
info = mapOf("message" to message)
return this
}

// Select the latest snapshot
val latestSnapshotInfo = successfulSnapshots.maxByOrNull { it.endTime() }!!
logger.info("Restoring snapshot info: $latestSnapshotInfo")

// Use the snapshot name from the selected SnapshotInfo
snapshotName = latestSnapshotInfo.snapshotId().name

// Proceed with the restore operation
val restoreSnapshotRequest = RestoreSnapshotRequest(repository, snapshotName)
.indices(indexName)
.storageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
.renamePattern("^(.*)\$")
.renameReplacement("$1_remote")
.waitForCompletion(false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not an expert on searchable snapshots, but I can see that it won't download the data, only some cluster state, so I feel safe just setting wait-for-completion to true here, as it probably won't take long. And this would simplify the workflow by about half 😜

@kotwanikunal to have a second opinion on this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was debating a lot in my mind whether to wait for completion, which makes the thread block but simplifies the workflow, or to just set it to false and wait for the restore to complete.

val response: RestoreSnapshotResponse = context.client.admin().cluster().suspendUntil {
restoreSnapshot(restoreSnapshotRequest, it)
}

when (response.status()) {
RestStatus.ACCEPTED, RestStatus.OK -> {
stepStatus = StepStatus.COMPLETED
mutableInfo["message"] = getSuccessMessage(indexName)
}
else -> {
val message = getFailedMessage(indexName, "Unexpected response status: ${response.status()}")
logger.warn("$message - $response")
stepStatus = StepStatus.FAILED
mutableInfo["message"] = message
mutableInfo["cause"] = response.toString()
}
}
info = mutableInfo.toMap()
} catch (e: RemoteTransportException) {
val cause = ExceptionsHelper.unwrapCause(e)
if (cause is SnapshotException) {
handleRestoreException(indexName, cause)
} else {
handleException(indexName, cause as Exception)
}
} catch (e: SnapshotException) {
handleRestoreException(indexName, e)
} catch (e: Exception) {
handleException(indexName, e)
}

return this
}

/**
 * Marks the step as failed after a [SnapshotException] and records the failure in [info].
 *
 * The info map always carries the "message" entry; a "cause" entry is added only when the
 * exception has a message.
 */
private fun handleRestoreException(indexName: String, e: SnapshotException) {
    val failureMessage = getFailedRestoreMessage(indexName)
    logger.debug(failureMessage, e)
    stepStatus = StepStatus.FAILED

    val details: Map<String, Any> = e.message
        ?.let { causeText -> mapOf("message" to failureMessage, "cause" to causeText) }
        ?: mapOf("message" to failureMessage)
    info = details
}

/**
 * Marks the step as failed after a generic exception, logs it at error level and records
 * the failure in [info].
 *
 * The exception message (or "Unknown error" when absent) is embedded in the "message" entry;
 * a separate "cause" entry is added only when the exception has a message.
 */
private fun handleException(indexName: String, e: Exception) {
    val causeText = e.message
    val failureMessage = getFailedMessage(indexName, causeText ?: "Unknown error")
    logger.error(failureMessage, e)
    stepStatus = StepStatus.FAILED

    val details = mutableMapOf<String, Any>()
    details["message"] = failureMessage
    if (causeText != null) {
        details["cause"] = causeText
    }
    info = details.toMap()
}

/**
 * Returns a copy of [currentMetadata] reflecting the outcome of this step.
 *
 * When action metadata is present, its action properties are replaced with ones carrying the
 * selected snapshot name (which may still be null if no snapshot was chosen). The step
 * metadata is rebuilt with the current status, the transition target is cleared and [info]
 * is attached.
 */
override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
    val updatedActionMetaData = currentMetadata.actionMetaData?.let { existing ->
        existing.copy(actionProperties = ActionProperties(snapshotName = snapshotName))
    }
    val updatedStepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus)

    return currentMetadata.copy(
        actionMetaData = updatedActionMetaData,
        stepMetaData = updatedStepMetaData,
        transitionTo = null,
        info = info,
    )
}

/**
 * Reports this step as not idempotent.
 * NOTE(review): presumably the ISM runner uses this to decide whether the step may be
 * re-run safely — confirm against the Step SPI.
 */
override fun isIdempotent(): Boolean {
    return false
}

companion object {
    /** Step name recorded in the step metadata. */
    const val name = "attempt_restore"

    val validTopContextFields = setOf("index", "indexUuid")

    /** Message used when the restore request was accepted. */
    fun getSuccessMessage(index: String): String {
        return "Successfully started restore for [index=$index]"
    }

    /** Message used when starting the restore failed for the given [cause]. */
    fun getFailedMessage(index: String, cause: String): String {
        return "Failed to start restore for [index=$index], cause: $cause"
    }

    /** Message used when the restore failed with a snapshot exception. */
    fun getFailedRestoreMessage(index: String): String {
        return "Failed to start restore due to concurrent restore or snapshot in progress [index=$index]"
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.step.restore

import org.apache.logging.log4j.LogManager
import org.opensearch.cluster.RestoreInProgress
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.transport.RemoteTransportException

/**
 * Step that waits until no restore for the managed index is listed in the cluster state's
 * [RestoreInProgress] custom.
 *
 * The step reports CONDITION_NOT_MET while a matching restore entry exists, COMPLETED once
 * none is found, and FAILED if reading the cluster state throws.
 */
class WaitForRestoreStep : Step(name) {

    private val logger = LogManager.getLogger(javaClass)
    private var stepStatus = StepStatus.STARTING
    private var info: Map<String, Any>? = null

    /**
     * Checks the current cluster state for an in-progress restore that targets this index.
     *
     * @return this step, with its status and info updated for [getUpdatedManagedIndexMetadata].
     */
    override suspend fun execute(): Step {
        val context = this.context ?: return this
        val indexName = context.metadata.index

        // The attempt step restores the index under a renamed target ("<index>_remote"), so
        // matching only the original name would miss the in-flight restore and complete this
        // step immediately. Match both the original and the renamed target.
        // NOTE(review): relies on RestoreInProgress entries listing the renamed target
        // indices — confirm against RestoreService.
        val trackedIndices = setOf(indexName, indexName + RESTORED_INDEX_SUFFIX)

        try {
            val clusterState = context.clusterService.state()
            val restoreInProgress = clusterState.custom<RestoreInProgress>(RestoreInProgress.TYPE)

            // A missing custom means no restore is running at all.
            val restoreOngoing = restoreInProgress?.any { entry ->
                entry.indices().any { it in trackedIndices }
            } ?: false

            if (restoreOngoing) {
                stepStatus = StepStatus.CONDITION_NOT_MET
                info = mapOf("message" to getPendingMessage(indexName))
            } else {
                // No matching restore entry is left in the cluster state.
                stepStatus = StepStatus.COMPLETED
                info = mapOf("message" to getSuccessMessage(indexName))
            }
        } catch (e: Exception) {
            handleException(indexName, e)
        }

        return this
    }

    /**
     * Marks the step as failed, logs the exception and records it in [info].
     *
     * For a [RemoteTransportException] the "cause" entry prefers the message of the wrapped
     * cause and falls back to the exception's own message.
     */
    private fun handleException(indexName: String, e: Exception) {
        val message = getFailedMessage(indexName, e.message ?: "Unknown error")
        logger.error(message, e)
        stepStatus = StepStatus.FAILED
        val mutableInfo = mutableMapOf("message" to message)
        val cause = (e as? RemoteTransportException)?.cause

        val errorMessage = cause?.message ?: e.message
        if (errorMessage != null) mutableInfo["cause"] = errorMessage
        info = mutableInfo.toMap()
    }

    /**
     * Returns a copy of [currentMetadata] with the step metadata rebuilt from the current
     * status, the transition target cleared and [info] attached.
     */
    override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
        return currentMetadata.copy(
            stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
            transitionTo = null,
            info = info,
        )
    }

    override fun isIdempotent(): Boolean = true

    companion object {
        const val name = "wait_for_restore"

        // Suffix appended to the restored index by AttemptRestoreStep's rename replacement
        // ("$1_remote"); keep the two in sync.
        private const val RESTORED_INDEX_SUFFIX = "_remote"

        fun getFailedMessage(index: String, cause: String) = "Failed to check restore status for [index=$index], cause: $cause"
        fun getPendingMessage(index: String) = "Restore not complete for [index=$index], retrying..."
        fun getSuccessMessage(index: String) = "Restore complete for [index=$index]"
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class ActionValidation(
"read_write" -> ValidateReadWrite(settings, clusterService, jvmService).execute(indexName)
"replica_count" -> ValidateReplicaCount(settings, clusterService, jvmService).execute(indexName)
"snapshot" -> ValidateSnapshot(settings, clusterService, jvmService).execute(indexName)
"convert_index_to_remote" -> ValidateConvertIndexToRemote(settings, clusterService, jvmService).execute(indexName)
"transition" -> ValidateTransition(settings, clusterService, jvmService).execute(indexName)
"close" -> ValidateClose(settings, clusterService, jvmService).execute(indexName)
"index_priority" -> ValidateIndexPriority(settings, clusterService, jvmService).execute(indexName)
Expand Down
Loading
Loading