diff --git a/changelog/@unreleased/pr-690.v2.yml b/changelog/@unreleased/pr-690.v2.yml new file mode 100644 index 000000000..b50fa26be --- /dev/null +++ b/changelog/@unreleased/pr-690.v2.yml @@ -0,0 +1,5 @@ +type: feature +feature: + description: Added a library to propagate traces through gRPC calls. + links: + - https://github.com/palantir/tracing-java/pull/690 diff --git a/readme.md b/readme.md index b803ff657..dc8a69d0e 100644 --- a/readme.md +++ b/readme.md @@ -8,6 +8,7 @@ - **com.palantir.tracing:tracing** - The key `Tracer` class, which stores trace information in a ThreadLocal. Also includes classes for convenient integration with SLF4J and executor services. - **com.palantir.tracing:tracing-api** - constants and pure data objects +- **com.palantir.tracing:tracing-grpc** - `TracingClientInterceptor`, which adds appropriate headers to outgoing gRPC requests and `TracingServerInterceptor`, which reads headers from incoming gRPC requests. - **com.palantir.tracing:tracing-jaxrs** - utilities to wrap `StreamingOutput` responses with a new trace. - **com.palantir.tracing:tracing-okhttp3** - `OkhttpTraceInterceptor`, which adds the appropriate headers to outgoing requests. - **com.palantir.tracing:tracing-jersey** - `TraceEnrichingFilter`, a jaxrs filter which reads headers from incoming requests and writes headers to outgoing responses. A traceId is stored in the jaxrs request context under the key `com.palantir.tracing.traceId`. diff --git a/settings.gradle b/settings.gradle index 8640fb1b2..329f44d92 100644 --- a/settings.gradle +++ b/settings.gradle @@ -4,6 +4,7 @@ include 'tracing' include 'tracing-api' include 'tracing-benchmarks' include 'tracing-demos' +include 'tracing-grpc' include 'tracing-jaxrs' include 'tracing-jersey' include 'tracing-okhttp3' diff --git a/tracing-grpc/build.gradle b/tracing-grpc/build.gradle new file mode 100644 index 000000000..4aa025e81 --- /dev/null +++ b/tracing-grpc/build.gradle @@ -0,0 +1,42 @@ +/* + * (c) Copyright 2021 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +apply from: "${rootDir}/gradle/publish-jar.gradle" +apply plugin: 'com.palantir.revapi' + +dependencies { + api project(':tracing') + api('io.grpc:grpc-api') { + // gRPC pulls in the android version of guava, which we don't want + exclude group: 'com.google.guava', module: 'guava' + } + + implementation project(':tracing-api') + + testImplementation 'junit:junit' + testImplementation 'org.assertj:assertj-core' + testImplementation 'org.mockito:mockito-core' + + testImplementation('io.grpc:grpc-testing') { + exclude group: 'com.google.guava', module: 'guava' + } + testImplementation('io.grpc:grpc-testing-proto') { + exclude group: 'com.google.guava', module: 'guava' + } + + compileOnly "org.immutables:value::annotations" + testCompileOnly "org.immutables:value::annotations" +} diff --git a/tracing-grpc/src/main/java/com/palantir/tracing/grpc/GrpcTracing.java b/tracing-grpc/src/main/java/com/palantir/tracing/grpc/GrpcTracing.java new file mode 100644 index 000000000..7458a2cdc --- /dev/null +++ b/tracing-grpc/src/main/java/com/palantir/tracing/grpc/GrpcTracing.java @@ -0,0 +1,36 @@ +/* + * (c) Copyright 2021 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.tracing.grpc; + +import com.palantir.tracing.api.TraceHttpHeaders; +import io.grpc.Metadata; + +/** + * Internal utility class used to deduplicate logic between the client and server interceptors. + * + * Intentionally package-private. + */ +final class GrpcTracing { + static final Metadata.Key TRACE_ID = + Metadata.Key.of(TraceHttpHeaders.TRACE_ID, Metadata.ASCII_STRING_MARSHALLER); + static final Metadata.Key SPAN_ID = + Metadata.Key.of(TraceHttpHeaders.SPAN_ID, Metadata.ASCII_STRING_MARSHALLER); + static final Metadata.Key IS_SAMPLED = + Metadata.Key.of(TraceHttpHeaders.IS_SAMPLED, Metadata.ASCII_STRING_MARSHALLER); + + private GrpcTracing() {} +} diff --git a/tracing-grpc/src/main/java/com/palantir/tracing/grpc/TracingClientInterceptor.java b/tracing-grpc/src/main/java/com/palantir/tracing/grpc/TracingClientInterceptor.java new file mode 100644 index 000000000..3b510e8d9 --- /dev/null +++ b/tracing-grpc/src/main/java/com/palantir/tracing/grpc/TracingClientInterceptor.java @@ -0,0 +1,116 @@ +/* + * (c) Copyright 2021 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.tracing.grpc; + +import com.palantir.tracing.CloseableSpan; +import com.palantir.tracing.DetachedSpan; +import com.palantir.tracing.Observability; +import com.palantir.tracing.TraceMetadata; +import com.palantir.tracing.Tracer; +import com.palantir.tracing.Tracers; +import com.palantir.tracing.api.SpanType; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; +import io.grpc.ForwardingClientCallListener; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import java.util.Optional; + +/** + * A {@link ClientInterceptor} which propagates Zipkin trace information through gRPC calls. + */ +public final class TracingClientInterceptor implements ClientInterceptor { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return new TracingClientCall<>( + next.newCall(method, callOptions), method, Tracer.maybeGetTraceMetadata(), Tracer.isTraceObservable()); + } + + private static final class TracingClientCall + extends ForwardingClientCall.SimpleForwardingClientCall { + private final MethodDescriptor method; + private final Optional metadata; + private final boolean isTraceObservable; + + TracingClientCall( + ClientCall delegate, + MethodDescriptor method, + Optional metadata, + boolean isTraceObservable) { + super(delegate); + this.method = method; + this.metadata = metadata; + this.isTraceObservable = isTraceObservable; + } + + @Override + public void start(Listener responseListener, Metadata headers) { + DetachedSpan span = detachedSpan(); + + // the only way to get at the metadata of a detached span is to make an attached child :( + try (CloseableSpan propagationSpan = span.childSpan("grpc: start", SpanType.CLIENT_OUTGOING)) { + TraceMetadata propagationMetadata = + Tracer.maybeGetTraceMetadata().get(); + headers.put(GrpcTracing.TRACE_ID, propagationMetadata.getTraceId()); + headers.put(GrpcTracing.SPAN_ID, propagationMetadata.getSpanId()); + headers.put(GrpcTracing.IS_SAMPLED, Tracer.isTraceObservable() ? "1" : "0"); + } + + super.start(new TracingClientCallListener<>(responseListener, span), headers); + } + + private DetachedSpan detachedSpan() { + return DetachedSpan.start( + getObservability(), + metadata.isPresent() ? metadata.get().getTraceId() : Tracers.randomId(), + metadata.map(TraceMetadata::getSpanId), + method.getFullMethodName(), + SpanType.LOCAL); + } + + private Observability getObservability() { + if (!metadata.isPresent()) { + return Observability.UNDECIDED; + } else if (isTraceObservable) { + return Observability.SAMPLE; + } else { + return Observability.DO_NOT_SAMPLE; + } + } + } + + private static final class TracingClientCallListener + extends ForwardingClientCallListener.SimpleForwardingClientCallListener { + private final DetachedSpan span; + + TracingClientCallListener(ClientCall.Listener delegate, DetachedSpan span) { + super(delegate); + this.span = span; + } + + @Override + public void onClose(Status status, Metadata trailers) { + span.complete(); + super.onClose(status, trailers); + } + } +} diff --git a/tracing-grpc/src/main/java/com/palantir/tracing/grpc/TracingServerInterceptor.java b/tracing-grpc/src/main/java/com/palantir/tracing/grpc/TracingServerInterceptor.java new file mode 100644 index 000000000..a6400c1df --- /dev/null +++ b/tracing-grpc/src/main/java/com/palantir/tracing/grpc/TracingServerInterceptor.java @@ -0,0 +1,153 @@ +/* + * (c) Copyright 2021 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.tracing.grpc; + +import com.google.common.base.Strings; +import com.palantir.tracing.CloseableSpan; +import com.palantir.tracing.DetachedSpan; +import com.palantir.tracing.Observability; +import com.palantir.tracing.Tracers; +import com.palantir.tracing.api.SpanType; +import io.grpc.ForwardingServerCall; +import io.grpc.ForwardingServerCallListener; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCall.Listener; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A {@link ServerInterceptor} which extracts Zipkin trace data propagated from the gRPC client and wraps the + * execution of the request in spans. + * + *

Depending on the style of gRPC request, the actual server handler code will run in the + * {@link #interceptCall(ServerCall, Metadata, ServerCallHandler)} method of the {@link TracingServerInterceptor}, or + * in the {@link TracingServerCallListener#onHalfClose()} method of the {@link TracingServerCallListener}. Certain + * user callbacks can also be invoked in other {@link TracingServerCallListener} methods, so they are all spanned. + * + *

The request is considered completed when the {@link ServerCall#close(Status, Metadata)} method is invoked. + * Since the ordering of a close call and certain terminal {@link ServerCall.Listener} callbacks are not specified, + * there's some extra logic to try to avoid creating child spans after the root span has already been completed. + */ +public final class TracingServerInterceptor implements ServerInterceptor { + @Override + public Listener interceptCall( + ServerCall call, Metadata headers, ServerCallHandler next) { + String maybeTraceId = headers.get(GrpcTracing.TRACE_ID); + boolean newTraceId = maybeTraceId == null; + String traceId = newTraceId ? Tracers.randomId() : maybeTraceId; + + DetachedSpan span = detachedSpan(newTraceId, traceId, call, headers); + AtomicReference spanRef = new AtomicReference<>(span); + + try (CloseableSpan guard = span.childSpan("grpc: interceptCall")) { + return new TracingServerCallListener<>( + next.startCall(new TracingServerCall<>(call, spanRef), headers), spanRef); + } + } + + private DetachedSpan detachedSpan(boolean newTrace, String traceId, ServerCall call, Metadata headers) { + return DetachedSpan.start( + getObservabilityFromHeader(headers), + traceId, + newTrace ? Optional.empty() : Optional.ofNullable(headers.get(GrpcTracing.SPAN_ID)), + call.getMethodDescriptor().getFullMethodName(), + SpanType.SERVER_INCOMING); + } + + private Observability getObservabilityFromHeader(Metadata headers) { + String header = headers.get(GrpcTracing.IS_SAMPLED); + if (Strings.isNullOrEmpty(header)) { + return Observability.UNDECIDED; + } else { + return "1".equals(header) ? Observability.SAMPLE : Observability.DO_NOT_SAMPLE; + } + } + + private static final class TracingServerCall + extends ForwardingServerCall.SimpleForwardingServerCall { + private final AtomicReference span; + + TracingServerCall(ServerCall delegate, AtomicReference span) { + super(delegate); + this.span = span; + } + + @Override + public void close(Status status, Metadata trailers) { + DetachedSpan maybeSpan = span.getAndSet(null); + if (maybeSpan != null) { + maybeSpan.complete(); + } + super.close(status, trailers); + } + } + + private static final class TracingServerCallListener + extends ForwardingServerCallListener.SimpleForwardingServerCallListener { + private final AtomicReference span; + + TracingServerCallListener(Listener delegate, AtomicReference span) { + super(delegate); + this.span = span; + } + + @Override + public void onMessage(ReqT message) { + maybeSpanned("grpc: onMessage", () -> super.onMessage(message)); + } + + @Override + public void onHalfClose() { + maybeSpanned("grpc: onHalfClose", super::onHalfClose); + } + + @Override + public void onCancel() { + maybeSpanned("grpc: onCancel", super::onCancel); + } + + @Override + public void onComplete() { + maybeSpanned("grpc: onComplete", super::onComplete); + } + + @Override + public void onReady() { + maybeSpanned("grpc: onReady", super::onReady); + } + + /** + * Wraps a callback in a span if the root span has not already been closed. The gRPC glue can call listener + * methods after the ServerCall has already been closed in some cases, and we want to avoid log spam from the + * tracing internals warning about making a child off a completed span. + */ + private void maybeSpanned(String spanName, Runnable runnable) { + DetachedSpan maybeSpan = span.get(); + if (maybeSpan == null) { + runnable.run(); + } else { + try (CloseableSpan guard = maybeSpan.childSpan(spanName)) { + runnable.run(); + } + } + } + } +} diff --git a/tracing-grpc/src/test/java/com/palantir/tracing/grpc/TracingClientInterceptorTest.java b/tracing-grpc/src/test/java/com/palantir/tracing/grpc/TracingClientInterceptorTest.java new file mode 100644 index 000000000..c34c6e4ca --- /dev/null +++ b/tracing-grpc/src/test/java/com/palantir/tracing/grpc/TracingClientInterceptorTest.java @@ -0,0 +1,134 @@ +/* + * (c) Copyright 2021 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.tracing.grpc; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.palantir.tracing.Observability; +import com.palantir.tracing.TraceSampler; +import com.palantir.tracing.Tracer; +import com.palantir.tracing.api.SpanType; +import io.grpc.ClientInterceptors; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCall.Listener; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.ServerInterceptors; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.testing.protobuf.SimpleRequest; +import io.grpc.testing.protobuf.SimpleResponse; +import io.grpc.testing.protobuf.SimpleServiceGrpc; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TracingClientInterceptorTest { + private static final SimpleServiceGrpc.SimpleServiceImplBase SERVICE = + new SimpleServiceGrpc.SimpleServiceImplBase() { + @Override + public void unaryRpc(SimpleRequest _request, StreamObserver responseObserver) { + responseObserver.onNext(SimpleResponse.newBuilder().build()); + responseObserver.onCompleted(); + } + }; + + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + + @Captor + private ArgumentCaptor metadataCaptor; + + @Mock + private TraceSampler traceSampler; + + private final ServerInterceptor mockServerInterceptor = + mock(ServerInterceptor.class, delegatesTo(new ServerInterceptor() { + @Override + public Listener interceptCall( + ServerCall call, Metadata headers, ServerCallHandler next) { + return next.startCall(call, headers); + } + })); + + private SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub; + + @Before + public void before() throws Exception { + Tracer.setSampler(traceSampler); + + when(traceSampler.sample()).thenReturn(true); + + String serverName = InProcessServerBuilder.generateName(); + grpcCleanup.register(InProcessServerBuilder.forName(serverName) + .addService(ServerInterceptors.intercept(SERVICE, mockServerInterceptor)) + .build() + .start()); + ManagedChannel channel = + grpcCleanup.register(InProcessChannelBuilder.forName(serverName).build()); + blockingStub = SimpleServiceGrpc.newBlockingStub( + ClientInterceptors.intercept(channel, new TracingClientInterceptor())); + } + + @Test + public void traceIsPropagated_whenNoTraceIsPresent() { + Tracer.getAndClearTrace(); + blockingStub.unaryRpc(SimpleRequest.newBuilder().build()); + + verify(mockServerInterceptor).interceptCall(any(), metadataCaptor.capture(), any()); + verifyNoMoreInteractions(mockServerInterceptor); + + Metadata intercepted = metadataCaptor.getValue(); + assertThat(intercepted.getAll(GrpcTracing.TRACE_ID)).hasSize(1); + assertThat(intercepted.getAll(GrpcTracing.SPAN_ID)).hasSize(1); + assertThat(intercepted.getAll(GrpcTracing.IS_SAMPLED)).hasSize(1); + } + + @Test + public void traceIsPropagated_whenTraceIsPresent() { + Tracer.initTraceWithSpan(Observability.SAMPLE, "id", "operation", SpanType.LOCAL); + try { + blockingStub.unaryRpc(SimpleRequest.newBuilder().build()); + } finally { + Tracer.fastCompleteSpan(); + } + + verify(mockServerInterceptor).interceptCall(any(), metadataCaptor.capture(), any()); + verifyNoMoreInteractions(mockServerInterceptor); + + Metadata intercepted = metadataCaptor.getValue(); + assertThat(intercepted.getAll(GrpcTracing.TRACE_ID)).containsExactly("id"); + assertThat(intercepted.getAll(GrpcTracing.SPAN_ID)).hasSize(1); + assertThat(intercepted.getAll(GrpcTracing.IS_SAMPLED)).containsExactly("1"); + } +} diff --git a/tracing-grpc/src/test/java/com/palantir/tracing/grpc/TracingServerInterceptorTest.java b/tracing-grpc/src/test/java/com/palantir/tracing/grpc/TracingServerInterceptorTest.java new file mode 100644 index 000000000..577eb88b3 --- /dev/null +++ b/tracing-grpc/src/test/java/com/palantir/tracing/grpc/TracingServerInterceptorTest.java @@ -0,0 +1,208 @@ +/* + * (c) Copyright 2021 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.tracing.grpc; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.palantir.tracing.TraceSampler; +import com.palantir.tracing.Tracer; +import com.palantir.tracing.api.Span; +import com.palantir.tracing.api.SpanObserver; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ClientInterceptors; +import io.grpc.ForwardingClientCall; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.ServerInterceptors; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.internal.testing.StreamRecorder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.testing.protobuf.SimpleRequest; +import io.grpc.testing.protobuf.SimpleResponse; +import io.grpc.testing.protobuf.SimpleServiceGrpc; +import java.util.List; +import java.util.Optional; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TracingServerInterceptorTest { + private static final SimpleServiceGrpc.SimpleServiceImplBase SERVICE = + new SimpleServiceGrpc.SimpleServiceImplBase() { + @Override + public void unaryRpc(SimpleRequest _request, StreamObserver responseObserver) { + Tracer.fastStartSpan("handler"); + Tracer.fastCompleteSpan(); + responseObserver.onNext(SimpleResponse.newBuilder().build()); + responseObserver.onCompleted(); + } + + @Override + public StreamObserver clientStreamingRpc( + StreamObserver responseObserver) { + Tracer.fastStartSpan("handler"); + Tracer.fastCompleteSpan(); + + return new StreamObserver() { + @Override + public void onNext(SimpleRequest _value) { + Tracer.fastStartSpan("observer"); + Tracer.fastCompleteSpan(); + } + + @Override + public void onError(Throwable _throwable) {} + + @Override + public void onCompleted() { + responseObserver.onNext(SimpleResponse.newBuilder().build()); + responseObserver.onCompleted(); + } + }; + } + }; + + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + + @Mock + private TraceSampler traceSampler; + + @Mock + private SpanObserver spanObserver; + + @Captor + private ArgumentCaptor spanCaptor; + + private ManagedChannel channel; + + @Before + public void before() throws Exception { + Tracer.subscribe("", spanObserver); + Tracer.setSampler(traceSampler); + + String serverName = InProcessServerBuilder.generateName(); + grpcCleanup.register(InProcessServerBuilder.forName(serverName) + .addService(ServerInterceptors.intercept(SERVICE, new TracingServerInterceptor())) + .build() + .start()); + channel = + grpcCleanup.register(InProcessChannelBuilder.forName(serverName).build()); + + when(traceSampler.sample()).thenReturn(true); + } + + @After + public void after() { + Tracer.unsubscribe(""); + } + + @Test + public void whenNoTraceIsInHeader_generatesNewTrace() { + SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub = SimpleServiceGrpc.newBlockingStub(channel); + + blockingStub.unaryRpc(SimpleRequest.newBuilder().build()); + + verify(traceSampler).sample(); + verifyNoMoreInteractions(traceSampler); + + verify(spanObserver, atLeastOnce()).consume(spanCaptor.capture()); + List spans = spanCaptor.getAllValues(); + assertThat(spans).map(Span::getTraceId).containsOnly(spans.get(0).getTraceId()); + } + + @Test + public void whenTraceIsInHeader_usesGivenTraceId() { + SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub = + SimpleServiceGrpc.newBlockingStub(ClientInterceptors.intercept(channel, new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return new ForwardingClientCall.SimpleForwardingClientCall( + next.newCall(method, callOptions)) { + @Override + public void start(ClientCall.Listener responseListener, Metadata headers) { + headers.put(GrpcTracing.TRACE_ID, "traceId"); + headers.put(GrpcTracing.SPAN_ID, "spanId"); + headers.put(GrpcTracing.IS_SAMPLED, "1"); + super.start(responseListener, headers); + } + }; + } + })); + + blockingStub.unaryRpc(SimpleRequest.newBuilder().build()); + + verifyNoInteractions(traceSampler); + + verify(spanObserver, atLeastOnce()).consume(spanCaptor.capture()); + List spans = spanCaptor.getAllValues(); + assertThat(spans).map(Span::getTraceId).containsOnly("traceId"); + assertThat(spans).map(Span::getParentSpanId).contains(Optional.of("spanId")); + } + + @Test + public void unaryCallHandlerIsSpanned() { + SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub = SimpleServiceGrpc.newBlockingStub(channel); + + blockingStub.unaryRpc(SimpleRequest.newBuilder().build()); + + verify(traceSampler).sample(); + verifyNoMoreInteractions(traceSampler); + + verify(spanObserver, atLeastOnce()).consume(spanCaptor.capture()); + List spans = spanCaptor.getAllValues(); + assertThat(spans).map(Span::getOperation).contains("handler"); + } + + @Test + public void serverStreamObserverIsSpanned() throws Exception { + SimpleServiceGrpc.SimpleServiceStub stub = SimpleServiceGrpc.newStub(channel); + + StreamRecorder responseObserver = StreamRecorder.create(); + StreamObserver requestObserver = stub.clientStreamingRpc(responseObserver); + requestObserver.onNext(SimpleRequest.newBuilder().build()); + requestObserver.onCompleted(); + responseObserver.awaitCompletion(); + + verify(traceSampler).sample(); + verifyNoMoreInteractions(traceSampler); + + verify(spanObserver, atLeastOnce()).consume(spanCaptor.capture()); + List spans = spanCaptor.getAllValues(); + assertThat(spans).map(Span::getOperation).contains("handler", "observer"); + assertThat(spans).map(Span::getTraceId).containsOnly(spans.get(0).getTraceId()); + } +} diff --git a/versions.lock b/versions.lock index 86e4b7aeb..a82dce44e 100644 --- a/versions.lock +++ b/versions.lock @@ -4,8 +4,8 @@ com.fasterxml.jackson.core:jackson-core:2.10.4 (16 constraints: fe3a68dc) com.fasterxml.jackson.core:jackson-databind:2.10.4 (22 constraints: 19998e7a) com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.10.4 (3 constraints: 4130b135) com.fasterxml.jackson.module:jackson-module-afterburner:2.10.4 (3 constraints: 4130b135) -com.google.code.findbugs:jsr305:3.0.2 (16 constraints: 50e8dc30) -com.google.errorprone:error_prone_annotations:2.3.4 (3 constraints: 1b2b3ae9) +com.google.code.findbugs:jsr305:3.0.2 (22 constraints: 011f6dfc) +com.google.errorprone:error_prone_annotations:2.4.0 (9 constraints: c361e74c) com.google.guava:failureaccess:1.0.1 (1 constraints: 140ae1b4) com.google.guava:guava:28.2-jre (6 constraints: 5c5ba522) com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava (1 constraints: bd17c918) @@ -14,6 +14,8 @@ com.palantir.safe-logging:preconditions:1.8.2 (2 constraints: 3c19cfb5) com.palantir.safe-logging:safe-logging:1.8.2 (3 constraints: 422aeb21) com.squareup.okhttp3:okhttp:3.9.0 (1 constraints: 0e051536) com.squareup.okio:okio:1.13.0 (1 constraints: 7f0cb509) +io.grpc:grpc-api:1.35.0 (5 constraints: b32bd923) +io.grpc:grpc-context:1.35.0 (2 constraints: 6311050a) io.undertow:undertow-core:2.0.17.Final (1 constraints: 54072861) jakarta.annotation:jakarta.annotation-api:1.3.5 (4 constraints: c93b8593) jakarta.servlet:jakarta.servlet-api:4.0.4 (9 constraints: e4866913) @@ -22,6 +24,7 @@ jakarta.ws.rs:jakarta.ws.rs-api:2.1.6 (11 constraints: 21bc0ae5) net.sf.jopt-simple:jopt-simple:4.6 (1 constraints: 610a91b7) org.apache.commons:commons-math3:3.2 (1 constraints: 5c0a8ab7) org.checkerframework:checker-qual:3.4.0 (2 constraints: 411a465e) +org.codehaus.mojo:animal-sniffer-annotations:1.19 (6 constraints: b3359899) org.glassfish.hk2:osgi-resource-locator:1.0.3 (2 constraints: 7d23b265) org.glassfish.hk2.external:jakarta.inject:2.6.1 (11 constraints: 6ab14a97) org.glassfish.jersey.core:jersey-client:2.31 (4 constraints: 78509cb2) @@ -51,7 +54,11 @@ com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:2.10.4 (2 constraints: 1 com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.10.4 (1 constraints: 76175d3d) com.fasterxml.jackson.module:jackson-module-parameter-names:2.10.4 (1 constraints: b10e595e) com.github.ben-manes.caffeine:caffeine:2.8.4 (4 constraints: 633c8aa1) +com.google.android:annotations:4.1.1.4 (1 constraints: 6e08d68b) +com.google.api.grpc:proto-google-common-protos:2.0.1 (1 constraints: d109cda9) com.google.code.findbugs:annotations:3.0.1 (1 constraints: 9e0aafc3) +com.google.code.gson:gson:2.8.6 (1 constraints: 16083e7b) +com.google.protobuf:protobuf-java:3.12.0 (1 constraints: 040adeb3) com.helger:profiler:1.1.1 (1 constraints: e21053b8) com.palantir.conjure.java.runtime:conjure-java-jackson-serialization:4.19.0 (1 constraints: 4005563b) com.palantir.safe-logging:preconditions-assertj:1.8.2 (1 constraints: 0d050a36) @@ -80,12 +87,20 @@ io.dropwizard.metrics:metrics-json:4.1.9 (1 constraints: ed106ab8) io.dropwizard.metrics:metrics-jvm:4.1.9 (2 constraints: 341ee147) io.dropwizard.metrics:metrics-logback:4.1.9 (1 constraints: 860ee64f) io.dropwizard.metrics:metrics-servlets:4.1.9 (1 constraints: 480d411f) +io.grpc:grpc-core:1.35.0 (1 constraints: 940908a7) +io.grpc:grpc-protobuf:1.35.0 (1 constraints: f50b14f5) +io.grpc:grpc-protobuf-lite:1.35.0 (1 constraints: 070ae3b3) +io.grpc:grpc-stub:1.35.0 (2 constraints: 8815e6f4) +io.grpc:grpc-testing:1.35.0 (1 constraints: 3b05403b) +io.grpc:grpc-testing-proto:1.35.0 (1 constraints: 3b05403b) +io.opencensus:opencensus-api:0.28.0 (1 constraints: 950907a7) +io.perfmark:perfmark-api:0.23.0 (1 constraints: 3b085f83) io.zipkin.java:zipkin:1.13.1 (1 constraints: 3805333b) jakarta.activation:jakarta.activation-api:1.2.2 (3 constraints: 4d37e9b5) jakarta.xml.bind:jakarta.xml.bind-api:2.3.3 (2 constraints: c6277400) javax.servlet:javax.servlet-api:3.1.0 (1 constraints: 830dcc28) joda-time:joda-time:2.10.6 (2 constraints: 622437b1) -junit:junit:4.13.2 (4 constraints: b052e308) +junit:junit:4.13.2 (5 constraints: e35b87b2) net.bytebuddy:byte-buddy:1.10.20 (1 constraints: 6e0ba2e9) net.bytebuddy:byte-buddy-agent:1.10.20 (1 constraints: 6e0ba2e9) net.jcip:jcip-annotations:1.0 (1 constraints: 560ff165) diff --git a/versions.props b/versions.props index 47e5517e0..2f8c403cc 100644 --- a/versions.props +++ b/versions.props @@ -8,6 +8,7 @@ com.palantir.safe-logging:* = 1.8.2 com.squareup.okhttp3:okhttp = 3.9.0 io.dropwizard:dropwizard-* = 2.0.10 io.undertow:undertow-core = 2.0.17.Final +io.grpc:* = 1.35.0 io.zipkin.java:* = 1.13.1 jakarta.servlet:jakarta.servlet-api = 4.0.4 jakarta.ws.rs:jakarta.ws.rs-api = 2.1.6