-
Notifications
You must be signed in to change notification settings - Fork 290
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Inject trace context into AWS Step Functions input #7585
base: master
Are you sure you want to change the base?
Changes from all commits
7f8721d
a974597
7ecfa8c
fa36832
22c652b
eb642b0
0778f3d
f9d0eea
421856b
895341a
9d76dec
aba1119
e56f2d3
3039780
370df6d
a644a8b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
muzzle { | ||
pass { | ||
group = "software.amazon.awssdk" | ||
module = "sfn" | ||
versions = "[2.15.35,)" | ||
assertInverse = true | ||
} | ||
} | ||
|
||
apply from: "$rootDir/gradle/java.gradle" | ||
|
||
addTestSuiteForDir('latestDepTest', 'test') | ||
addTestSuiteExtendingForDir('latestDepForkedTest', 'latestDepTest', 'test') | ||
|
||
dependencies { | ||
compileOnly group: 'software.amazon.awssdk', name: 'sfn', version: '2.15.35' | ||
|
||
// Include httpclient instrumentation for testing because it is a dependency for aws-sdk. | ||
testImplementation project(':dd-java-agent:instrumentation:apache-httpclient-4') | ||
testImplementation project(':dd-java-agent:instrumentation:aws-java-sdk-2.2') | ||
testImplementation 'software.amazon.awssdk:sfn:2.15.35' | ||
testImplementation libs.testcontainers | ||
|
||
latestDepTestImplementation group: 'software.amazon.awssdk', name: 'sfn', version: '+' | ||
} | ||
|
||
tasks.withType(Test).configureEach { | ||
usesService(testcontainersLimit) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
package datadog.trace.instrumentation.aws.v2.sfn; | ||
|
||
import datadog.trace.bootstrap.JsonBuffer; | ||
import datadog.trace.bootstrap.instrumentation.api.AgentSpan; | ||
|
||
public class InputAttributeInjector { | ||
public static String buildTraceContext(AgentSpan span) { | ||
// Extract span tags | ||
JsonBuffer spanTagsJSON = new JsonBuffer(); | ||
spanTagsJSON.beginObject(); | ||
span.getTags() | ||
.forEach((tagKey, tagValue) -> spanTagsJSON.name(tagKey).value(tagValue.toString())); | ||
spanTagsJSON.endObject(); | ||
|
||
// Build DD trace context object | ||
JsonBuffer ddTraceContextJSON = new JsonBuffer(); | ||
ddTraceContextJSON | ||
.beginObject() | ||
.name("_datadog") | ||
.beginObject() | ||
.name("x-datadog-trace-id") | ||
.value(span.getTraceId().toString()) | ||
.name("x-datadog-parent-id") | ||
.value(String.valueOf(span.getSpanId())) | ||
.name("x-datadog-tags") | ||
.value(spanTagsJSON) | ||
.endObject() | ||
.endObject(); | ||
|
||
return ddTraceContextJSON.toString(); | ||
} | ||
|
||
public static String getModifiedInput(String request, String ddTraceContextJSON) { | ||
StringBuilder modifiedInput = new StringBuilder(request); | ||
int startPos = modifiedInput.indexOf("{"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm StringBuilder.indexOf() does not take a char type |
||
int endPos = modifiedInput.lastIndexOf("}"); | ||
String inputContent = modifiedInput.substring(startPos + 1, endPos); | ||
if (inputContent.isEmpty()) { | ||
modifiedInput.insert(endPos, ddTraceContextJSON); | ||
} else { | ||
modifiedInput.insert( | ||
endPos, ",".concat(ddTraceContextJSON)); // prepend comma to existing input | ||
} | ||
return modifiedInput.toString(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
package datadog.trace.instrumentation.aws.v2.sfn; | ||
|
||
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; | ||
import static net.bytebuddy.matcher.ElementMatchers.isMethod; | ||
|
||
import com.google.auto.service.AutoService; | ||
import datadog.trace.agent.tooling.Instrumenter; | ||
import datadog.trace.agent.tooling.InstrumenterModule; | ||
import java.util.List; | ||
import net.bytebuddy.asm.Advice; | ||
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; | ||
|
||
/** AWS SDK v2 Step Function instrumentation */ | ||
@AutoService(InstrumenterModule.class) | ||
public final class SfnClientInstrumentation extends InstrumenterModule.Tracing | ||
implements Instrumenter.ForSingleType { | ||
|
||
public SfnClientInstrumentation() { | ||
super("sfn", "aws-sdk"); | ||
} | ||
|
||
@Override | ||
public String instrumentedType() { | ||
return "software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder"; | ||
} | ||
|
||
@Override | ||
public void methodAdvice(MethodTransformer transformer) { | ||
transformer.applyAdvice( | ||
isMethod().and(named("resolveExecutionInterceptors")), | ||
SfnClientInstrumentation.class.getName() + "$AwsSfnBuilderAdvice"); | ||
} | ||
|
||
@Override | ||
public String[] helperClassNames() { | ||
return new String[] {packageName + ".SfnInterceptor", packageName + ".InputAttributeInjector"}; | ||
} | ||
|
||
public static class AwsSfnBuilderAdvice { | ||
@Advice.OnMethodExit(suppress = Throwable.class) | ||
public static void addHandler(@Advice.Return final List<ExecutionInterceptor> interceptors) { | ||
for (ExecutionInterceptor interceptor : interceptors) { | ||
if (interceptor instanceof SfnInterceptor) { | ||
return; | ||
} | ||
} | ||
interceptors.add(new SfnInterceptor()); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
package datadog.trace.instrumentation.aws.v2.sfn; | ||
|
||
import datadog.trace.bootstrap.InstanceStore; | ||
import datadog.trace.bootstrap.instrumentation.api.AgentSpan; | ||
import software.amazon.awssdk.core.SdkRequest; | ||
import software.amazon.awssdk.core.interceptor.Context; | ||
import software.amazon.awssdk.core.interceptor.ExecutionAttribute; | ||
import software.amazon.awssdk.core.interceptor.ExecutionAttributes; | ||
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; | ||
import software.amazon.awssdk.services.sfn.model.StartExecutionRequest; | ||
import software.amazon.awssdk.services.sfn.model.StartSyncExecutionRequest; | ||
|
||
public class SfnInterceptor implements ExecutionInterceptor { | ||
|
||
public static final ExecutionAttribute<AgentSpan> SPAN_ATTRIBUTE = | ||
InstanceStore.of(ExecutionAttribute.class) | ||
.putIfAbsent("DatadogSpan", () -> new ExecutionAttribute<>("DatadogSpan")); | ||
|
||
public SfnInterceptor() {} | ||
|
||
@Override | ||
public SdkRequest modifyRequest( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should probably guard against any exceptions escaping from our listeners. |
||
Context.ModifyRequest context, ExecutionAttributes executionAttributes) { | ||
try { | ||
return modifyRequestImpl(context, executionAttributes); | ||
} catch (Exception e) { | ||
return context.request(); | ||
} | ||
} | ||
|
||
public SdkRequest modifyRequestImpl( | ||
Context.ModifyRequest context, ExecutionAttributes executionAttributes) { | ||
final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE); | ||
// StartExecutionRequest | ||
if (context.request() instanceof StartExecutionRequest) { | ||
StartExecutionRequest request = (StartExecutionRequest) context.request(); | ||
if (request.input() == null) { | ||
return request; | ||
} | ||
return injectTraceContext(span, request); | ||
} | ||
|
||
// StartSyncExecutionRequest | ||
if (context.request() instanceof StartSyncExecutionRequest) { | ||
StartSyncExecutionRequest request = (StartSyncExecutionRequest) context.request(); | ||
if (request.input() == null) { | ||
return request; | ||
} | ||
return injectTraceContext(span, request); | ||
} | ||
|
||
return context.request(); | ||
} | ||
|
||
private SdkRequest injectTraceContext(AgentSpan span, StartExecutionRequest request) { | ||
String ddTraceContextJSON = InputAttributeInjector.buildTraceContext(span); | ||
// Inject the trace context into the StartExecutionRequest input | ||
String modifiedInput = | ||
InputAttributeInjector.getModifiedInput(request.input(), ddTraceContextJSON); | ||
|
||
return request.toBuilder().input(modifiedInput).build(); | ||
} | ||
|
||
private SdkRequest injectTraceContext(AgentSpan span, StartSyncExecutionRequest request) { | ||
String ddTraceContextJSON = InputAttributeInjector.buildTraceContext(span); | ||
// Inject the trace context into the StartSyncExecutionRequest input | ||
String modifiedInput = | ||
InputAttributeInjector.getModifiedInput(request.input(), ddTraceContextJSON); | ||
|
||
return request.toBuilder().input(modifiedInput).build(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
import datadog.trace.agent.test.naming.VersionedNamingTestBase | ||
import datadog.trace.agent.test.utils.TraceUtils | ||
import datadog.trace.api.DDSpanTypes | ||
import datadog.trace.bootstrap.instrumentation.api.Tags | ||
import groovy.json.JsonSlurper | ||
import org.testcontainers.containers.GenericContainer | ||
import org.testcontainers.utility.DockerImageName | ||
import software.amazon.awssdk.services.sfn.SfnClient | ||
import software.amazon.awssdk.services.sfn.model.StartExecutionResponse | ||
import software.amazon.awssdk.regions.Region | ||
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider | ||
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials | ||
import spock.lang.Shared | ||
|
||
import java.time.Duration | ||
|
||
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan | ||
|
||
|
||
abstract class SfnClientTest extends VersionedNamingTestBase { | ||
@Shared GenericContainer localStack | ||
@Shared SfnClient sfnClient | ||
DylanLovesCoffee marked this conversation as resolved.
Show resolved
Hide resolved
|
||
@Shared String testStateMachineARN | ||
@Shared Object endPoint | ||
|
||
def setupSpec() { | ||
localStack = new GenericContainer(DockerImageName.parse("localstack/localstack")) | ||
.withExposedPorts(4566) | ||
.withEnv("SERVICES", "stepfunctions") | ||
.withReuse(true) | ||
.withStartupTimeout(Duration.ofSeconds(120)) | ||
localStack.start() | ||
endPoint = "http://" + localStack.getHost() + ":" + localStack.getMappedPort(4566) | ||
sfnClient = SfnClient.builder() | ||
.endpointOverride(URI.create(endPoint)) | ||
.region(Region.US_EAST_1) | ||
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test"))) | ||
.build() | ||
|
||
def response = sfnClient.createStateMachine { builder -> | ||
builder.name("testStateMachine") | ||
.definition("{\"StartAt\": \"HelloWorld\", \"States\": {\"HelloWorld\": {\"Type\": \"Pass\", \"End\": true}}}") | ||
.build() | ||
} | ||
testStateMachineARN = response.stateMachineArn() | ||
} | ||
|
||
def cleanupSpec() { | ||
sfnClient.close() | ||
localStack.stop() | ||
} | ||
|
||
def "Step Functions span is created"() { | ||
when: | ||
StartExecutionResponse response | ||
TraceUtils.runUnderTrace('parent', { | ||
response = sfnClient.startExecution { builder -> | ||
builder.stateMachineArn(testStateMachineARN) | ||
.input("{\"key\": \"value\"}") | ||
.build() | ||
} | ||
}) | ||
|
||
then: | ||
assertTraces(1) { | ||
trace(2) { | ||
basicSpan(it, "parent") | ||
span { | ||
serviceName service() | ||
operationName operation() | ||
resourceName "Sfn.StartExecution" | ||
spanType DDSpanTypes.HTTP_CLIENT | ||
errored false | ||
measured true | ||
childOf(span(0)) | ||
tags { | ||
"$Tags.COMPONENT" "java-aws-sdk" | ||
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT | ||
"$Tags.HTTP_URL" endPoint+'/' | ||
"$Tags.HTTP_METHOD" "POST" | ||
"$Tags.HTTP_STATUS" 200 | ||
"$Tags.PEER_PORT" localStack.getMappedPort(4566) | ||
"$Tags.PEER_HOSTNAME" localStack.getHost() | ||
"aws.service" "Sfn" | ||
"aws.operation" "StartExecution" | ||
"aws.agent" "java-aws-sdk" | ||
"aws.requestId" response.responseMetadata().requestId() | ||
"aws_service" "Sfn" | ||
defaultTags() | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
def "Trace context is injected to Step Functions input"() { | ||
when: | ||
StartExecutionResponse response | ||
TraceUtils.runUnderTrace('parent', { | ||
response = sfnClient.startExecution { builder -> | ||
builder.stateMachineArn(testStateMachineARN) | ||
.input("{\"key\": \"value\"}") | ||
.build() | ||
} | ||
}) | ||
|
||
then: | ||
def execution = sfnClient.describeExecution { builder -> | ||
builder.executionArn(response.executionArn()) | ||
.build() | ||
} | ||
def input = new JsonSlurper().parseText(execution.input()) | ||
input["key"] == "value" | ||
input["_datadog"]["x-datadog-trace-id"] != null | ||
input["_datadog"]["x-datadog-parent-id"] != null | ||
input["_datadog"]["x-datadog-tags"] != null | ||
} | ||
} | ||
|
||
class SfnClientV0Test extends SfnClientTest { | ||
@Override | ||
int version() { | ||
0 | ||
} | ||
|
||
@Override | ||
String service() { | ||
return "java-aws-sdk" | ||
} | ||
|
||
@Override | ||
String operation() { | ||
return "aws.http" | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this JSON format for the
x-datadog-tags
expected? Not urgent but need to know at some point if we need to handle this format or not.The value from this impl looks like this
{"component":"java-aws-sdk","thread":{"name":"main","id":"1"},"env":"dev","span":{"kind":"client"}}
Usually we expect something like
_dd.p.dm=-0,_dd.p.tid=658b547000000000
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it does seem like this was written as a json encoding of the tags from the start of this pr. and reading the other associated PR's, it's not entirely clear if they expect the tags to be json-encoded or comma-separated-key=values ... will dig further.