Skip to content

Commit

Permalink
feat: Refactor OpenTelemetry implementation to use a context aware wr…
Browse files Browse the repository at this point in the history
…apper for the tracer and a PubsubTracer interface
  • Loading branch information
michaelpri10 committed Sep 12, 2024
1 parent 6c5e03c commit d257b6b
Show file tree
Hide file tree
Showing 10 changed files with 799 additions and 632 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public SettableApiFuture<AckResponse> getMessageFutureIfExists() {
*/
public PubsubMessageWrapper getMessageWrapper() {
if (this.messageWrapper == null) {
return PubsubMessageWrapper.newBuilder(null, null, false).build();
return PubsubMessageWrapper.newBuilder(null, null).build();
}
return messageWrapper;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import io.opentelemetry.api.trace.Tracer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -107,7 +106,7 @@ class MessageDispatcher {

private final String subscriptionName;
private final boolean enableOpenTelemetryTracing;
private final Tracer tracer;
private final PubsubTracer tracer;

/** Internal representation of a reply to a Pubsub message, to be sent back to the service. */
public enum AckReply {
Expand Down Expand Up @@ -162,7 +161,7 @@ public void onFailure(Throwable t) {
t);
this.ackRequestData.setResponse(AckResponse.OTHER, false);
pendingNacks.add(this.ackRequestData);
this.ackRequestData.getMessageWrapper().endSubscribeProcessSpan("nack");
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack");
forget();
}

Expand All @@ -175,11 +174,11 @@ public void onSuccess(AckReply reply) {
ackLatencyDistribution.record(
Ints.saturatedCast(
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
this.ackRequestData.getMessageWrapper().endSubscribeProcessSpan("ack");
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "ack");
break;
case NACK:
pendingNacks.add(this.ackRequestData);
this.ackRequestData.getMessageWrapper().endSubscribeProcessSpan("nack");
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack");
break;
default:
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
Expand Down Expand Up @@ -409,11 +408,10 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
message.getMessage(),
subscriptionName,
message.getAckId(),
message.getDeliveryAttempt(),
enableOpenTelemetryTracing)
message.getDeliveryAttempt())
.build();
builder.setMessageWrapper(messageWrapper);
messageWrapper.startSubscriberSpan(tracer, this.exactlyOnceDeliveryEnabled.get());
tracer.startSubscriberSpan(messageWrapper, this.exactlyOnceDeliveryEnabled.get());

AckRequestData ackRequestData = builder.build();
AckHandler ackHandler =
Expand Down Expand Up @@ -482,13 +480,14 @@ private void processBatch(List<OutstandingMessage> batch) {
for (OutstandingMessage message : batch) {
// This is a blocking flow controller. We have already incremented messagesWaiter, so
// shutdown will block on processing of all these messages anyway.
message.messageWrapper().startSubscribeConcurrencyControlSpan(tracer);
tracer.startSubscribeConcurrencyControlSpan(message.messageWrapper());
try {
flowController.reserve(1, message.messageWrapper().getPubsubMessage().getSerializedSize());
message.messageWrapper().endSubscribeConcurrencyControlSpan();
tracer.endSubscribeConcurrencyControlSpan(message.messageWrapper());
} catch (FlowControlException unexpectedException) {
// This should be a blocking flow controller and never throw an exception.
message.messageWrapper().setSubscribeConcurrencyControlSpanException(unexpectedException);
tracer.setSubscribeConcurrencyControlSpanException(
message.messageWrapper(), unexpectedException);
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
addDeliveryInfoCount(message.messageWrapper());
Expand Down Expand Up @@ -533,10 +532,10 @@ public void run() {
// so it was probably sent to someone else. Don't work on it.
// Don't nack it either, because we'd be nacking someone else's message.
ackHandler.forget();
messageWrapper.setSubscriberSpanExpirationResult();
tracer.setSubscriberSpanExpirationResult(messageWrapper);
return;
}
messageWrapper.startSubscribeProcessSpan(tracer);
tracer.startSubscribeProcessSpan(messageWrapper);
if (shouldSetMessageFuture()) {
// This is the message future that is propagated to the user
SettableApiFuture<AckResponse> messageFuture =
Expand All @@ -557,9 +556,9 @@ public void run() {
if (!messageOrderingEnabled.get() || message.getOrderingKey().isEmpty()) {
executor.execute(deliverMessageTask);
} else {
messageWrapper.startSubscribeSchedulerSpan(tracer);
tracer.startSubscribeSchedulerSpan(messageWrapper);
sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
messageWrapper.endSubscribeSchedulerSpan();
tracer.endSubscribeSchedulerSpan(messageWrapper);
}
}

Expand Down Expand Up @@ -687,7 +686,7 @@ public static final class Builder {

private String subscriptionName;
private boolean enableOpenTelemetryTracing;
private Tracer tracer;
private PubsubTracer tracer;

protected Builder(MessageReceiver receiver) {
this.receiver = receiver;
Expand Down Expand Up @@ -769,7 +768,7 @@ public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing)
return this;
}

public Builder setTracer(Tracer tracer) {
public Builder setTracer(PubsubTracer tracer) {
this.tracer = tracer;
return this;
}
Expand Down
Loading

0 comments on commit d257b6b

Please sign in to comment.