Skip to content

Commit

Permalink
Add step function sync execution (#66)
Browse files Browse the repository at this point in the history
Co-authored-by: Sameer Ahmad <mohammadsameer.ahmad@capitalone.com>
  • Loading branch information
samblighty and Sameer Ahmad authored May 21, 2024
1 parent 2308bc7 commit aeb817a
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 1 deletion.
4 changes: 4 additions & 0 deletions src/main/scala/dev/joss/gatling/sfn/SfnDsl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ trait SfnDsl {
builder: StartExecutionDslBuilder
): ActionBuilder = builder.build

implicit def sfnDslBuilder2ActionBuilder(
builder: StartSyncExecutionActionBuilder
): ActionBuilder = builder.build

implicit def sfnDslBuilder2ActionBuilder(
builder: CheckSucceededDslBuilder
): ActionBuilder = builder.build
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package uk.co.capitalone.services.decision.processor.utils

import dev.joss.gatling.sfn.action.SfnActionBase
import dev.joss.gatling.sfn.request.attributes.SfnExecuteAttributes
import io.gatling.commons.util.Clock
import io.gatling.core.CoreComponents
import io.gatling.core.action.Action
import io.gatling.core.session._
import io.gatling.core.stats.StatsEngine
import software.amazon.awssdk.services.sfn.SfnClient
import software.amazon.awssdk.services.sfn.model.StartSyncExecutionRequest

case class StartSyncExecutionAction(
sfnClient: SfnClient,
coreComponents: CoreComponents,
next: Action,
id: String,
attributes: SfnExecuteAttributes
) extends SfnActionBase {
override def statsEngine: StatsEngine = coreComponents.statsEngine
override def clock: Clock = coreComponents.clock
override def name: String = "Step Function Sync Execution"
override def execute(session: Session): Unit = {
startSyncExecution(session)
}

private def startSyncExecution(
session: Session
): Unit = {
val request = StartSyncExecutionRequest.builder()

var stateMachineArn = ""
attributes.stateMachineArn(session).map { arn =>
request.stateMachineArn(arn)
stateMachineArn = arn
}

attributes.input(session).map { arn =>
request.input(arn)
}

val tryStartExecutionResponse =
trySfnRequest(() => sfnClient.startSyncExecution(request.build()))

val startTime = tryStartExecutionResponse.get.startDate().toEpochMilli
val endTime = tryStartExecutionResponse.get.stopDate().toEpochMilli
if (tryStartExecutionResponse.get.status().toString.equals("SUCCEEDED")) {
logSuccess(
name,
session,
startTime,
endTime,
"The step function successfully completed!"
)
} else {
val failedMessage: String =
tryStartExecutionResponse.failed.get.getMessage
logFailure(
name,
session,
startTime,
endTime,
s"Could not execute step function with ARN: $stateMachineArn. Reason: $failedMessage"
)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package uk.co.capitalone.services.decision.processor.utils

import dev.joss.gatling.sfn.action.SfnActionBuilderBase
import dev.joss.gatling.sfn.request.attributes.SfnExecuteAttributes
import io.gatling.core.action.Action
import io.gatling.core.structure.ScenarioContext
import io.gatling.core.util.NameGen

case class StartSyncExecutionActionBuilder(attributes: SfnExecuteAttributes)
extends SfnActionBuilderBase
with NameGen {
override def build(ctx: ScenarioContext, next: Action): Action = {
val protocol = getProtocol(ctx)
val client = protocol.sfnClient
StartSyncExecutionAction(
client,
ctx.coreComponents,
next,
genName(""),
attributes
)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package dev.joss.gatling.sfn.request
import dev.joss.gatling.sfn.action.{
CheckStateSucceededActionBuilder,
CheckSucceededActionBuilder,
StartExecutionActionBuilder
StartExecutionActionBuilder,
StartSyncExecutionActionBuilder
}
import dev.joss.gatling.sfn.request.attributes.{
SfnCheckStateAttributes,
Expand All @@ -16,6 +17,8 @@ import software.amazon.awssdk.services.sfn.model.HistoryEventType
final class SfnDslBuilderBase(requestName: Expression[String]) {
def startExecution: StartExecutionDslBuilder.Arn =
new StartExecutionDslBuilder.Arn(requestName)
def startSyncExecution: StartSyncExecutionDslBuilder.Arn =
new StartSyncExecutionDslBuilder.Arn(requestName)

def checkSucceeded: CheckSucceededDslBuilder =
CheckSucceededDslBuilder(requestName, None)
Expand All @@ -27,6 +30,31 @@ final class SfnDslBuilderBase(requestName: Expression[String]) {
new CheckStateSucceededDslBuilder.StateName(requestName)
}

object StartSyncExecutionDslBuilder {
final class Arn(requestName: Expression[String]) {
def arn(arn: Expression[String]): Payload =
new Payload(requestName, arn)
}

final class Payload(
requestName: Expression[String],
executionArn: Expression[String]
) {
def payload(payload: Expression[String]): StartSyncExecutionDslBuilder =
StartSyncExecutionDslBuilder(
SfnExecuteAttributes(requestName, executionArn, payload),
StartSyncExecutionActionBuilder
)
}
}

final case class StartSyncExecutionDslBuilder(
attributes: SfnExecuteAttributes,
factory: SfnExecuteAttributes => StartSyncExecutionActionBuilder
) {
def build: ActionBuilder = factory(attributes)
}

object StartExecutionDslBuilder {
final class Arn(requestName: Expression[String]) {
def arn(arn: Expression[String]): Payload =
Expand Down

0 comments on commit aeb817a

Please sign in to comment.