Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a library for gRPC trace propagation #690

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-690.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: feature
feature:
description: Added a library to propagate traces through gRPC calls.
links:
- https://github.com/palantir/tracing-java/pull/690
1 change: 1 addition & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

- **com.palantir.tracing:tracing** - The key `Tracer` class, which stores trace information in a ThreadLocal. Also includes classes for convenient integration with SLF4J and executor services.
- **com.palantir.tracing:tracing-api** - constants and pure data objects
- **com.palantir.tracing:tracing-grpc** - `TracingClientInterceptor`, which adds appropriate headers to outgoing gRPC requests and `TracingServerInterceptor`, which reads headers from incoming gRPC requests.
- **com.palantir.tracing:tracing-jaxrs** - utilities to wrap `StreamingOutput` responses with a new trace.
- **com.palantir.tracing:tracing-okhttp3** - `OkhttpTraceInterceptor`, which adds the appropriate headers to outgoing requests.
- **com.palantir.tracing:tracing-jersey** - `TraceEnrichingFilter`, a jaxrs filter which reads headers from incoming requests and writes headers to outgoing responses. A traceId is stored in the jaxrs request context under the key `com.palantir.tracing.traceId`.
Expand Down
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ include 'tracing'
include 'tracing-api'
include 'tracing-benchmarks'
include 'tracing-demos'
include 'tracing-grpc'
include 'tracing-jaxrs'
include 'tracing-jersey'
include 'tracing-okhttp3'
Expand Down
42 changes: 42 additions & 0 deletions tracing-grpc/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* (c) Copyright 2021 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

apply from: "${rootDir}/gradle/publish-jar.gradle"
apply plugin: 'com.palantir.revapi'

dependencies {
api project(':tracing')
api('io.grpc:grpc-api') {
// gRPC pulls in the android version of guava, which we don't want
exclude group: 'com.google.guava', module: 'guava'
}

implementation project(':tracing-api')

testImplementation 'junit:junit'
testImplementation 'org.assertj:assertj-core'
testImplementation 'org.mockito:mockito-core'

testImplementation('io.grpc:grpc-testing') {
exclude group: 'com.google.guava', module: 'guava'
}
testImplementation('io.grpc:grpc-testing-proto') {
exclude group: 'com.google.guava', module: 'guava'
}

compileOnly "org.immutables:value::annotations"
testCompileOnly "org.immutables:value::annotations"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* (c) Copyright 2021 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.tracing.grpc;

import com.palantir.tracing.api.TraceHttpHeaders;
import io.grpc.Metadata;

/**
* Internal utility class used to deduplicate logic between the client and server interceptors.
*
* Intentionally package-private.
*/
final class GrpcTracing {
static final Metadata.Key<String> TRACE_ID =
Metadata.Key.of(TraceHttpHeaders.TRACE_ID, Metadata.ASCII_STRING_MARSHALLER);
static final Metadata.Key<String> SPAN_ID =
Metadata.Key.of(TraceHttpHeaders.SPAN_ID, Metadata.ASCII_STRING_MARSHALLER);
static final Metadata.Key<String> IS_SAMPLED =
Metadata.Key.of(TraceHttpHeaders.IS_SAMPLED, Metadata.ASCII_STRING_MARSHALLER);

private GrpcTracing() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* (c) Copyright 2021 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.tracing.grpc;

import com.palantir.tracing.CloseableSpan;
import com.palantir.tracing.DetachedSpan;
import com.palantir.tracing.Observability;
import com.palantir.tracing.TraceMetadata;
import com.palantir.tracing.Tracer;
import com.palantir.tracing.Tracers;
import com.palantir.tracing.api.SpanType;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.Optional;

/**
* A {@link ClientInterceptor} which propagates Zipkin trace information through gRPC calls.
*/
public final class TracingClientInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new TracingClientCall<>(
next.newCall(method, callOptions), method, Tracer.maybeGetTraceMetadata(), Tracer.isTraceObservable());
}

private static final class TracingClientCall<ReqT, RespT>
extends ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT> {
private final MethodDescriptor<ReqT, RespT> method;
private final Optional<TraceMetadata> metadata;
private final boolean isTraceObservable;

TracingClientCall(
ClientCall<ReqT, RespT> delegate,
MethodDescriptor<ReqT, RespT> method,
Optional<TraceMetadata> metadata,
boolean isTraceObservable) {
super(delegate);
this.method = method;
this.metadata = metadata;
this.isTraceObservable = isTraceObservable;
}

@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
DetachedSpan span = detachedSpan();

// the only way to get at the metadata of a detached span is to make an attached child :(
try (CloseableSpan propagationSpan = span.childSpan("grpc: start", SpanType.CLIENT_OUTGOING)) {
TraceMetadata propagationMetadata =
Tracer.maybeGetTraceMetadata().get();
headers.put(GrpcTracing.TRACE_ID, propagationMetadata.getTraceId());
headers.put(GrpcTracing.SPAN_ID, propagationMetadata.getSpanId());
headers.put(GrpcTracing.IS_SAMPLED, Tracer.isTraceObservable() ? "1" : "0");
}

super.start(new TracingClientCallListener<>(responseListener, span), headers);
}

private DetachedSpan detachedSpan() {
return DetachedSpan.start(
getObservability(),
metadata.isPresent() ? metadata.get().getTraceId() : Tracers.randomId(),
metadata.map(TraceMetadata::getSpanId),
method.getFullMethodName(),
SpanType.LOCAL);
}

private Observability getObservability() {
if (!metadata.isPresent()) {
return Observability.UNDECIDED;
} else if (isTraceObservable) {
return Observability.SAMPLE;
} else {
return Observability.DO_NOT_SAMPLE;
}
}
}

private static final class TracingClientCallListener<RespT>
extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT> {
private final DetachedSpan span;

TracingClientCallListener(ClientCall.Listener<RespT> delegate, DetachedSpan span) {
super(delegate);
this.span = span;
}

@Override
public void onClose(Status status, Metadata trailers) {
span.complete();
super.onClose(status, trailers);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* (c) Copyright 2021 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.tracing.grpc;

import com.google.common.base.Strings;
import com.palantir.tracing.CloseableSpan;
import com.palantir.tracing.DetachedSpan;
import com.palantir.tracing.Observability;
import com.palantir.tracing.Tracers;
import com.palantir.tracing.api.SpanType;
import io.grpc.ForwardingServerCall;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/**
* A {@link ServerInterceptor} which extracts Zipkin trace data propagated from the gRPC client and wraps the
* execution of the request in spans.
*
* <p>Depending on the style of gRPC request, the actual server handler code will run in the
* {@link #interceptCall(ServerCall, Metadata, ServerCallHandler)} method of the {@link TracingServerInterceptor}, or
* in the {@link TracingServerCallListener#onHalfClose()} method of the {@link TracingServerCallListener}. Certain
* user callbacks can also be invoked in other {@link TracingServerCallListener} methods, so they are all spanned.
*
* <p>The request is considered completed when the {@link ServerCall#close(Status, Metadata)} method is invoked.
* Since the ordering of a close call and certain terminal {@link ServerCall.Listener} callbacks are not specified,
* there's some extra logic to try to avoid creating child spans after the root span has already been completed.
*/
public final class TracingServerInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
String maybeTraceId = headers.get(GrpcTracing.TRACE_ID);
boolean newTraceId = maybeTraceId == null;
String traceId = newTraceId ? Tracers.randomId() : maybeTraceId;

DetachedSpan span = detachedSpan(newTraceId, traceId, call, headers);
AtomicReference<DetachedSpan> spanRef = new AtomicReference<>(span);

try (CloseableSpan guard = span.childSpan("grpc: interceptCall")) {
return new TracingServerCallListener<>(
next.startCall(new TracingServerCall<>(call, spanRef), headers), spanRef);
}
}

private DetachedSpan detachedSpan(boolean newTrace, String traceId, ServerCall<?, ?> call, Metadata headers) {
return DetachedSpan.start(
getObservabilityFromHeader(headers),
traceId,
newTrace ? Optional.empty() : Optional.ofNullable(headers.get(GrpcTracing.SPAN_ID)),
call.getMethodDescriptor().getFullMethodName(),
SpanType.SERVER_INCOMING);
}

private Observability getObservabilityFromHeader(Metadata headers) {
String header = headers.get(GrpcTracing.IS_SAMPLED);
if (Strings.isNullOrEmpty(header)) {
return Observability.UNDECIDED;
} else {
return "1".equals(header) ? Observability.SAMPLE : Observability.DO_NOT_SAMPLE;
}
}

private static final class TracingServerCall<ReqT, RespT>
extends ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> {
private final AtomicReference<DetachedSpan> span;

TracingServerCall(ServerCall<ReqT, RespT> delegate, AtomicReference<DetachedSpan> span) {
super(delegate);
this.span = span;
}

@Override
public void close(Status status, Metadata trailers) {
DetachedSpan maybeSpan = span.getAndSet(null);
if (maybeSpan != null) {
maybeSpan.complete();
}
super.close(status, trailers);
}
}

private static final class TracingServerCallListener<ReqT>
extends ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {
private final AtomicReference<DetachedSpan> span;

TracingServerCallListener(Listener<ReqT> delegate, AtomicReference<DetachedSpan> span) {
super(delegate);
this.span = span;
}

@Override
public void onMessage(ReqT message) {
maybeSpanned("grpc: onMessage", () -> super.onMessage(message));
}

@Override
public void onHalfClose() {
maybeSpanned("grpc: onHalfClose", super::onHalfClose);
}

@Override
public void onCancel() {
maybeSpanned("grpc: onCancel", super::onCancel);
}

@Override
public void onComplete() {
maybeSpanned("grpc: onComplete", super::onComplete);
}

@Override
public void onReady() {
maybeSpanned("grpc: onReady", super::onReady);
}

/**
* Wraps a callback in a span if the root span has not already been closed. The gRPC glue can call listener
* methods after the ServerCall has already been closed in some cases, and we want to avoid log spam from the
* tracing internals warning about making a child off a completed span.
*/
private void maybeSpanned(String spanName, Runnable runnable) {
DetachedSpan maybeSpan = span.get();
if (maybeSpan == null) {
runnable.run();
} else {
try (CloseableSpan guard = maybeSpan.childSpan(spanName)) {
runnable.run();
}
}
}
}
}
Loading