Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inject trace context into AWS Step Functions input #7585

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions dd-java-agent/instrumentation/aws-java-sfn-2.0/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
muzzle {
pass {
group = "software.amazon.awssdk"
module = "sfn"
versions = "[2.15.35,)"
assertInverse = true
}
}

apply from: "$rootDir/gradle/java.gradle"

addTestSuiteForDir('latestDepTest', 'test')
addTestSuiteExtendingForDir('latestDepForkedTest', 'latestDepTest', 'test')

dependencies {
compileOnly group: 'software.amazon.awssdk', name: 'sfn', version: '2.15.35'

// Include httpclient instrumentation for testing because it is a dependency for aws-sdk.
testImplementation project(':dd-java-agent:instrumentation:apache-httpclient-4')
testImplementation project(':dd-java-agent:instrumentation:aws-java-sdk-2.2')
testImplementation 'software.amazon.awssdk:sfn:2.15.35'
testImplementation libs.testcontainers

latestDepTestImplementation group: 'software.amazon.awssdk', name: 'sfn', version: '+'
}

tasks.withType(Test).configureEach {
usesService(testcontainersLimit)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package datadog.trace.instrumentation.aws.v2.sfn;

import datadog.trace.bootstrap.JsonBuffer;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;

public class InputAttributeInjector {
public static String buildTraceContext(AgentSpan span) {
// Extract span tags
JsonBuffer spanTagsJSON = new JsonBuffer();
spanTagsJSON.beginObject();
span.getTags()
.forEach((tagKey, tagValue) -> spanTagsJSON.name(tagKey).value(tagValue.toString()));
spanTagsJSON.endObject();
Copy link

@avedmala avedmala Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this JSON format for the x-datadog-tags expected? Not urgent but need to know at some point if we need to handle this format or not.

The value from this impl looks like this {"component":"java-aws-sdk","thread":{"name":"main","id":"1"},"env":"dev","span":{"kind":"client"}}

Usually we expect something like _dd.p.dm=-0,_dd.p.tid=658b547000000000

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it does seem like this was written as a json encoding of the tags from the start of this pr. and reading the other associated PR's, it's not entirely clear if they expect the tags to be json-encoded or comma-separated-key=values ... will dig further.


// Build DD trace context object
JsonBuffer ddTraceContextJSON = new JsonBuffer();
ddTraceContextJSON
.beginObject()
.name("_datadog")
.beginObject()
.name("x-datadog-trace-id")
.value(span.getTraceId().toString())
.name("x-datadog-parent-id")
.value(String.valueOf(span.getSpanId()))
.name("x-datadog-tags")
.value(spanTagsJSON)
.endObject()
.endObject();

return ddTraceContextJSON.toString();
}

public static String getModifiedInput(String request, String ddTraceContextJSON) {
StringBuilder modifiedInput = new StringBuilder(request);
int startPos = modifiedInput.indexOf("{");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 Code Quality Violation

Suggested change
int startPos = modifiedInput.indexOf("{");
int startPos = modifiedInput.indexOf('{');
Do not use a string with a single letter (...read more)

When using indexOf with only one character, use a character and not a string as it executes faster.

View in Datadog  Leave us feedback  Documentation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm StringBuilder.indexOf() does not take a char type

int endPos = modifiedInput.lastIndexOf("}");
String inputContent = modifiedInput.substring(startPos + 1, endPos);
if (inputContent.isEmpty()) {
modifiedInput.insert(endPos, ddTraceContextJSON);
} else {
modifiedInput.insert(
endPos, ",".concat(ddTraceContextJSON)); // prepend comma to existing input
}
return modifiedInput.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package datadog.trace.instrumentation.aws.v2.sfn;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import java.util.List;
import net.bytebuddy.asm.Advice;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;

/** AWS SDK v2 Step Function instrumentation */
@AutoService(InstrumenterModule.class)
public final class SfnClientInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForSingleType {

public SfnClientInstrumentation() {
super("sfn", "aws-sdk");
}

@Override
public String instrumentedType() {
return "software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod().and(named("resolveExecutionInterceptors")),
SfnClientInstrumentation.class.getName() + "$AwsSfnBuilderAdvice");
}

@Override
public String[] helperClassNames() {
return new String[] {packageName + ".SfnInterceptor", packageName + ".InputAttributeInjector"};
}

public static class AwsSfnBuilderAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void addHandler(@Advice.Return final List<ExecutionInterceptor> interceptors) {
for (ExecutionInterceptor interceptor : interceptors) {
if (interceptor instanceof SfnInterceptor) {
return;
}
}
interceptors.add(new SfnInterceptor());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package datadog.trace.instrumentation.aws.v2.sfn;

import datadog.trace.bootstrap.InstanceStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttribute;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.services.sfn.model.StartExecutionRequest;
import software.amazon.awssdk.services.sfn.model.StartSyncExecutionRequest;

public class SfnInterceptor implements ExecutionInterceptor {

public static final ExecutionAttribute<AgentSpan> SPAN_ATTRIBUTE =
InstanceStore.of(ExecutionAttribute.class)
.putIfAbsent("DatadogSpan", () -> new ExecutionAttribute<>("DatadogSpan"));

public SfnInterceptor() {}

@Override
public SdkRequest modifyRequest(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably guard against any exceptions escaping from our listeners.
I'd suggest moving the logic into a modifyRequestImpl and then have modifyRequest wrap modifyRequestImpl and catch any exceptions.

Context.ModifyRequest context, ExecutionAttributes executionAttributes) {
try {
return modifyRequestImpl(context, executionAttributes);
} catch (Exception e) {
return context.request();
}
}

public SdkRequest modifyRequestImpl(
Context.ModifyRequest context, ExecutionAttributes executionAttributes) {
final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
// StartExecutionRequest
if (context.request() instanceof StartExecutionRequest) {
StartExecutionRequest request = (StartExecutionRequest) context.request();
if (request.input() == null) {
return request;
}
return injectTraceContext(span, request);
}

// StartSyncExecutionRequest
if (context.request() instanceof StartSyncExecutionRequest) {
StartSyncExecutionRequest request = (StartSyncExecutionRequest) context.request();
if (request.input() == null) {
return request;
}
return injectTraceContext(span, request);
}

return context.request();
}

private SdkRequest injectTraceContext(AgentSpan span, StartExecutionRequest request) {
String ddTraceContextJSON = InputAttributeInjector.buildTraceContext(span);
// Inject the trace context into the StartExecutionRequest input
String modifiedInput =
InputAttributeInjector.getModifiedInput(request.input(), ddTraceContextJSON);

return request.toBuilder().input(modifiedInput).build();
}

private SdkRequest injectTraceContext(AgentSpan span, StartSyncExecutionRequest request) {
String ddTraceContextJSON = InputAttributeInjector.buildTraceContext(span);
// Inject the trace context into the StartSyncExecutionRequest input
String modifiedInput =
InputAttributeInjector.getModifiedInput(request.input(), ddTraceContextJSON);

return request.toBuilder().input(modifiedInput).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import datadog.trace.agent.test.naming.VersionedNamingTestBase
import datadog.trace.agent.test.utils.TraceUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.bootstrap.instrumentation.api.Tags
import groovy.json.JsonSlurper
import org.testcontainers.containers.GenericContainer
import org.testcontainers.utility.DockerImageName
import software.amazon.awssdk.services.sfn.SfnClient
import software.amazon.awssdk.services.sfn.model.StartExecutionResponse
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import spock.lang.Shared

import java.time.Duration

import static datadog.trace.agent.test.utils.TraceUtils.basicSpan


abstract class SfnClientTest extends VersionedNamingTestBase {
@Shared GenericContainer localStack
@Shared SfnClient sfnClient
DylanLovesCoffee marked this conversation as resolved.
Show resolved Hide resolved
@Shared String testStateMachineARN
@Shared Object endPoint

def setupSpec() {
localStack = new GenericContainer(DockerImageName.parse("localstack/localstack"))
.withExposedPorts(4566)
.withEnv("SERVICES", "stepfunctions")
.withReuse(true)
.withStartupTimeout(Duration.ofSeconds(120))
localStack.start()
endPoint = "http://" + localStack.getHost() + ":" + localStack.getMappedPort(4566)
sfnClient = SfnClient.builder()
.endpointOverride(URI.create(endPoint))
.region(Region.US_EAST_1)
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test")))
.build()

def response = sfnClient.createStateMachine { builder ->
builder.name("testStateMachine")
.definition("{\"StartAt\": \"HelloWorld\", \"States\": {\"HelloWorld\": {\"Type\": \"Pass\", \"End\": true}}}")
.build()
}
testStateMachineARN = response.stateMachineArn()
}

def cleanupSpec() {
sfnClient.close()
localStack.stop()
}

def "Step Functions span is created"() {
when:
StartExecutionResponse response
TraceUtils.runUnderTrace('parent', {
response = sfnClient.startExecution { builder ->
builder.stateMachineArn(testStateMachineARN)
.input("{\"key\": \"value\"}")
.build()
}
})

then:
assertTraces(1) {
trace(2) {
basicSpan(it, "parent")
span {
serviceName service()
operationName operation()
resourceName "Sfn.StartExecution"
spanType DDSpanTypes.HTTP_CLIENT
errored false
measured true
childOf(span(0))
tags {
"$Tags.COMPONENT" "java-aws-sdk"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.HTTP_URL" endPoint+'/'
"$Tags.HTTP_METHOD" "POST"
"$Tags.HTTP_STATUS" 200
"$Tags.PEER_PORT" localStack.getMappedPort(4566)
"$Tags.PEER_HOSTNAME" localStack.getHost()
"aws.service" "Sfn"
"aws.operation" "StartExecution"
"aws.agent" "java-aws-sdk"
"aws.requestId" response.responseMetadata().requestId()
"aws_service" "Sfn"
defaultTags()
}
}
}
}
}

def "Trace context is injected to Step Functions input"() {
when:
StartExecutionResponse response
TraceUtils.runUnderTrace('parent', {
response = sfnClient.startExecution { builder ->
builder.stateMachineArn(testStateMachineARN)
.input("{\"key\": \"value\"}")
.build()
}
})

then:
def execution = sfnClient.describeExecution { builder ->
builder.executionArn(response.executionArn())
.build()
}
def input = new JsonSlurper().parseText(execution.input())
input["key"] == "value"
input["_datadog"]["x-datadog-trace-id"] != null
input["_datadog"]["x-datadog-parent-id"] != null
input["_datadog"]["x-datadog-tags"] != null
}
}

class SfnClientV0Test extends SfnClientTest {
@Override
int version() {
0
}

@Override
String service() {
return "java-aws-sdk"
}

@Override
String operation() {
return "aws.http"
}
}
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ include ':dd-java-agent:instrumentation:aws-java-sns-1.0'
include ':dd-java-agent:instrumentation:aws-java-sns-2.0'
include ':dd-java-agent:instrumentation:aws-java-sqs-1.0'
include ':dd-java-agent:instrumentation:aws-java-sqs-2.0'
include ':dd-java-agent:instrumentation:aws-java-sfn-2.0'
include ':dd-java-agent:instrumentation:aws-lambda-handler'
include ':dd-java-agent:instrumentation:axis-2'
include ':dd-java-agent:instrumentation:axway-api'
Expand Down
Loading