-
Notifications
You must be signed in to change notification settings - Fork 113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Convert-Index-To-Remote Action for issue #808 #1302
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.indexmanagement.indexstatemanagement.action | ||
|
||
import org.opensearch.core.common.io.stream.StreamInput | ||
import org.opensearch.core.common.io.stream.StreamOutput | ||
import org.opensearch.core.xcontent.ToXContent | ||
import org.opensearch.core.xcontent.XContentBuilder | ||
import org.opensearch.indexmanagement.indexstatemanagement.step.restore.AttemptRestoreStep | ||
import org.opensearch.indexmanagement.indexstatemanagement.step.restore.WaitForRestoreStep | ||
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action | ||
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step | ||
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext | ||
|
||
/**
 * Action registered under the type `convert_index_to_remote`.
 *
 * It carries the name of a snapshot [repository] and is made of two steps, run in order:
 * [AttemptRestoreStep] followed by [WaitForRestoreStep].
 *
 * @property repository name of the snapshot repository handed to [AttemptRestoreStep]
 * @param index position of this action, forwarded to the [Action] base class
 */
class ConvertIndexToRemoteAction(
    val repository: String,
    index: Int,
) : Action(name, index) {

    private val attemptRestoreStep = AttemptRestoreStep(this)
    private val waitForRestoreStep = WaitForRestoreStep()

    // Ordered list of the steps that make up this action.
    private val steps = listOf(attemptRestoreStep, waitForRestoreStep)

    /**
     * Picks the step to run next, based on the step recorded in the managed-index metadata.
     *
     * - no recorded step: start with the attempt step
     * - recorded step COMPLETED: the attempt step is followed by the wait step; anything else
     *   falls back to the attempt step
     * - recorded step not COMPLETED: run the same step again (attempt stays attempt, anything
     *   else is treated as the wait step)
     */
    @Suppress("ReturnCount")
    override fun getStepToExecute(context: StepContext): Step {
        val recorded = context.metadata.stepMetaData ?: return attemptRestoreStep

        val finished = recorded.stepStatus == Step.StepStatus.COMPLETED
        val wasAttempt = recorded.name == AttemptRestoreStep.name

        return when {
            finished && wasAttempt -> waitForRestoreStep
            finished -> attemptRestoreStep // Default to the first step
            wasAttempt -> attemptRestoreStep
            else -> waitForRestoreStep
        }
    }

    override fun getSteps(): List<Step> = steps

    /** Writes this action as an object keyed by its type, containing only the repository field. */
    override fun populateAction(builder: XContentBuilder, params: ToXContent.Params) {
        builder
            .startObject(type)
            .field(REPOSITORY_FIELD, repository)
            .endObject()
    }

    /** Serializes the repository and then the action index; [fromStreamInput] reads them in the same order. */
    override fun populateAction(out: StreamOutput) {
        with(out) {
            writeString(repository)
            writeInt(actionIndex)
        }
    }

    companion object {
        const val name = "convert_index_to_remote"
        const val REPOSITORY_FIELD = "repository"

        /** Rebuilds the action from a stream: the repository string first, then the action index. */
        @JvmStatic
        fun fromStreamInput(si: StreamInput): ConvertIndexToRemoteAction =
            ConvertIndexToRemoteAction(repository = si.readString(), index = si.readInt())
    }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.indexmanagement.indexstatemanagement.action | ||
|
||
import org.opensearch.core.common.io.stream.StreamInput | ||
import org.opensearch.core.xcontent.XContentParser | ||
import org.opensearch.core.xcontent.XContentParser.Token | ||
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken | ||
import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteAction.Companion.REPOSITORY_FIELD | ||
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action | ||
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser | ||
|
||
/**
 * [ActionParser] for [ConvertIndexToRemoteAction]: rebuilds the action either from a stream
 * or from its XContent object form.
 */
class ConvertIndexToRemoteActionParser : ActionParser() {

    override fun fromStreamInput(sin: StreamInput): Action = ConvertIndexToRemoteAction.fromStreamInput(sin)

    /**
     * Parses the action body. The parser must be positioned on the START_OBJECT token.
     *
     * @param xcp parser positioned at the start of the action object
     * @param index action index passed through to the created action
     * @throws IllegalArgumentException if a field other than the repository field is present,
     *   or if the repository field is missing
     */
    override fun fromXContent(xcp: XContentParser, index: Int): Action {
        ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)

        var parsedRepository: String? = null
        while (xcp.nextToken() != Token.END_OBJECT) {
            val key = xcp.currentName()
            // Advance from the field name to its value.
            xcp.nextToken()

            require(key == REPOSITORY_FIELD) { "Invalid field: [$key] found in ConvertIndexToRemoteAction." }
            parsedRepository = xcp.text()
        }

        val repositoryName = requireNotNull(parsedRepository) {
            "ConvertIndexToRemoteAction repository must be specified"
        }
        return ConvertIndexToRemoteAction(repository = repositoryName, index = index)
    }

    override fun getActionType(): String = ConvertIndexToRemoteAction.name
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.indexmanagement.indexstatemanagement.step.restore | ||
|
||
import org.apache.logging.log4j.LogManager | ||
import org.opensearch.ExceptionsHelper | ||
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest | ||
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse | ||
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest | ||
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse | ||
import org.opensearch.core.rest.RestStatus | ||
import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteAction | ||
import org.opensearch.indexmanagement.opensearchapi.suspendUntil | ||
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step | ||
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties | ||
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData | ||
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData | ||
import org.opensearch.snapshots.SnapshotException | ||
import org.opensearch.snapshots.SnapshotState | ||
import org.opensearch.transport.RemoteTransportException | ||
|
||
/**
 * First step of [ConvertIndexToRemoteAction].
 *
 * Looks up snapshots in the action's repository whose names match `<index>*`, picks the
 * successful one with the greatest end time, and submits a non-blocking restore request for
 * the managed index using the REMOTE_SNAPSHOT storage type. The request carries a rename
 * pattern/replacement of `^(.*)$` -> `$1_remote`.
 *
 * The outcome is recorded in [stepStatus] / [info] and published through
 * [getUpdatedManagedIndexMetadata]; the step never throws for the failures it catches.
 */
class AttemptRestoreStep(private val action: ConvertIndexToRemoteAction) : Step(name) {

    private val logger = LogManager.getLogger(javaClass)
    private var stepStatus = StepStatus.STARTING
    private var info: Map<String, Any>? = null
    private var snapshotName: String? = null

    /**
     * Runs the snapshot lookup and submits the restore request.
     *
     * @return this step, with its status set to COMPLETED when the restore request was accepted
     *   and FAILED otherwise; returns immediately, leaving the status untouched, when no step
     *   context has been set
     */
    @Suppress("TooGenericExceptionCaught", "ComplexMethod", "ReturnCount", "LongMethod")
    override suspend fun execute(): Step {
        val context = this.context ?: return this
        val indexName = context.metadata.index
        val repository = action.repository

        try {
            // List snapshots matching the pattern.
            // NOTE(review): this assumes snapshot names start with the index name; a snapshot that
            // covers several indices under a different name will not be found by this pattern.
            val getSnapshotsRequest = GetSnapshotsRequest()
                .repository(repository)
                .snapshots(arrayOf("$indexName*"))
                .ignoreUnavailable(true)
                .verbose(true)

            val getSnapshotsResponse: GetSnapshotsResponse = context.client.admin().cluster().suspendUntil {
                getSnapshots(getSnapshotsRequest, it)
            }
            val snapshots = getSnapshotsResponse.snapshots
            if (snapshots.isNullOrEmpty()) {
                markFailed(getFailedMessage(indexName, "No snapshots found matching pattern [$indexName*]"))
                return this
            }

            // Select the latest successful snapshot; null means none of the matches succeeded.
            val latestSnapshotInfo = snapshots
                .filter { it.state() == SnapshotState.SUCCESS }
                .maxByOrNull { it.endTime() }
            if (latestSnapshotInfo == null) {
                markFailed(
                    getFailedMessage(
                        indexName,
                        "No successful snapshots found matching pattern [$indexName*]",
                    ),
                )
                return this
            }
            logger.info("Restoring snapshot info: $latestSnapshotInfo")

            // Remember the chosen snapshot so it is written into the action properties.
            val selectedSnapshotName = latestSnapshotInfo.snapshotId().name
            snapshotName = selectedSnapshotName

            // Proceed with the restore operation.
            //
            // Review discussion kept from the pull-request thread on waitForCompletion(false):
            // a reviewer (noting they are not an expert on searchable snapshots) observed that this
            // kind of restore appears to fetch only some cluster state rather than the data, so
            // waiting for completion here would probably not take long and would simplify the
            // workflow by about half; a second opinion was requested. The author replied that they
            // had debated between blocking on the call to simplify things and submitting it
            // non-blocking and waiting for completion in a separate step.
            val restoreSnapshotRequest = RestoreSnapshotRequest(repository, selectedSnapshotName)
                .indices(indexName)
                .storageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
                .renamePattern("^(.*)\$")
                .renameReplacement("$1_remote")
                .waitForCompletion(false)

            val response: RestoreSnapshotResponse = context.client.admin().cluster().suspendUntil {
                restoreSnapshot(restoreSnapshotRequest, it)
            }

            when (response.status()) {
                RestStatus.ACCEPTED, RestStatus.OK -> {
                    stepStatus = StepStatus.COMPLETED
                    info = mapOf("message" to getSuccessMessage(indexName))
                }
                else -> {
                    val message = getFailedMessage(indexName, "Unexpected response status: ${response.status()}")
                    logger.warn("$message - $response")
                    markFailed(message, response.toString())
                }
            }
        } catch (e: RemoteTransportException) {
            // unwrapCause returns a Throwable, so it must not be cast blindly to Exception;
            // fall back to the transport exception itself when the cause is not an Exception.
            val cause = ExceptionsHelper.unwrapCause(e)
            when (cause) {
                is SnapshotException -> handleRestoreException(indexName, cause)
                is Exception -> handleException(indexName, cause)
                else -> handleException(indexName, e)
            }
        } catch (e: SnapshotException) {
            handleRestoreException(indexName, e)
        } catch (e: Exception) {
            handleException(indexName, e)
        }

        return this
    }

    /** Marks the step FAILED and stores [message], plus [cause] when one is available, in [info]. */
    private fun markFailed(message: String, cause: String? = null) {
        stepStatus = StepStatus.FAILED
        info = if (cause == null) {
            mapOf("message" to message)
        } else {
            mapOf("message" to message, "cause" to cause)
        }
    }

    /** Records a [SnapshotException] as a failed step, using the concurrent-restore message. */
    private fun handleRestoreException(indexName: String, e: SnapshotException) {
        val message = getFailedRestoreMessage(indexName)
        logger.debug(message, e)
        markFailed(message, e.message)
    }

    /** Records any other exception as a failed step and logs it at error level. */
    private fun handleException(indexName: String, e: Exception) {
        val message = getFailedMessage(indexName, e.message ?: "Unknown error")
        logger.error(message, e)
        markFailed(message, e.message)
    }

    /**
     * Copies [currentMetadata] with this step's status and info, and replaces the action
     * properties with ones that carry the selected snapshot name.
     */
    override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
        val currentActionMetaData = currentMetadata.actionMetaData
        return currentMetadata.copy(
            actionMetaData = currentActionMetaData?.copy(actionProperties = ActionProperties(snapshotName = snapshotName)),
            stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
            transitionTo = null,
            info = info,
        )
    }

    override fun isIdempotent(): Boolean = false

    companion object {
        val validTopContextFields = setOf("index", "indexUuid")
        const val name = "attempt_restore"
        fun getFailedMessage(index: String, cause: String) = "Failed to start restore for [index=$index], cause: $cause"
        fun getFailedRestoreMessage(index: String) = "Failed to start restore due to concurrent restore or snapshot in progress [index=$index]"
        fun getSuccessMessage(index: String) = "Successfully started restore for [index=$index]"
    }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.indexmanagement.indexstatemanagement.step.restore | ||
|
||
import org.apache.logging.log4j.LogManager | ||
import org.opensearch.cluster.RestoreInProgress | ||
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step | ||
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData | ||
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData | ||
import org.opensearch.transport.RemoteTransportException | ||
|
||
/**
 * Second step of the convert-index-to-remote action: polls the cluster state and reports
 * whether a restore involving the managed index is still in progress.
 *
 * The step is CONDITION_NOT_MET while a matching [RestoreInProgress] entry exists, COMPLETED
 * once none does, and FAILED if reading the cluster state throws.
 */
class WaitForRestoreStep : Step(name) {

    private val logger = LogManager.getLogger(javaClass)
    private var stepStatus = StepStatus.STARTING
    private var info: Map<String, Any>? = null

    /**
     * Checks the current cluster state for an ongoing restore of the managed index.
     *
     * @return this step with its status updated; returns immediately, leaving the status
     *   untouched, when no step context has been set
     */
    @Suppress("TooGenericExceptionCaught")
    override suspend fun execute(): Step {
        val context = this.context ?: return this
        val indexName = context.metadata.index

        // The attempt step restores with a rename replacement of "$1_remote", so the restore is
        // presumably tracked under the renamed target rather than the source index name.
        // Match either name so the wait does not finish while that restore is still running.
        // NOTE(review): confirm which name RestoreInProgress entries report for renamed restores.
        val trackedNames = setOf(indexName, indexName + RESTORED_INDEX_SUFFIX)

        try {
            val clusterState = context.clusterService.state()
            val restoreInProgress = clusterState.custom<RestoreInProgress>(RestoreInProgress.TYPE)

            // No RestoreInProgress custom in the cluster state means nothing is being restored.
            val restoreOngoing = restoreInProgress?.any { entry ->
                entry.indices().any { it in trackedNames }
            } ?: false

            if (restoreOngoing) {
                stepStatus = StepStatus.CONDITION_NOT_MET
                info = mapOf("message" to getPendingMessage(indexName))
            } else {
                // Restore is complete
                stepStatus = StepStatus.COMPLETED
                info = mapOf("message" to getSuccessMessage(indexName))
            }
        } catch (e: Exception) {
            handleException(indexName, e)
        }

        return this
    }

    /**
     * Marks the step FAILED and records the failure message; the "cause" entry prefers the
     * message of a [RemoteTransportException]'s cause over the exception's own message.
     */
    private fun handleException(indexName: String, e: Exception) {
        val message = getFailedMessage(indexName, e.message ?: "Unknown error")
        logger.error(message, e)
        stepStatus = StepStatus.FAILED
        val mutableInfo = mutableMapOf("message" to message)
        val cause = (e as? RemoteTransportException)?.cause

        val errorMessage = cause?.message ?: e.message
        if (errorMessage != null) mutableInfo["cause"] = errorMessage
        info = mutableInfo.toMap()
    }

    /** Copies [currentMetadata] with this step's status and info and clears any pending transition. */
    override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
        return currentMetadata.copy(
            stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
            transitionTo = null,
            info = info,
        )
    }

    override fun isIdempotent(): Boolean = true

    companion object {
        const val name = "wait_for_restore"

        // Must stay in sync with the rename replacement ("$1_remote") used when the restore is submitted.
        private const val RESTORED_INDEX_SUFFIX = "_remote"

        fun getFailedMessage(index: String, cause: String) = "Failed to check restore status for [index=$index], cause: $cause"
        fun getPendingMessage(index: String) = "Restore not complete for [index=$index], retrying..."
        fun getSuccessMessage(index: String) = "Restore complete for [index=$index]"
    }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems this will only retrieve snapshots whose names start with the index name. What if the snapshot covers many indexes (for example, the user has snapshot management take a snapshot of a group of indexes), so that its name does not start with a specific index name?
I think `$indexName*` can be a good default value, but we had better provide a parameter so the user can specify a different snapshot prefix.