Skip to content

Commit

Permalink
Opentelemetry subscribe (#2100)
Browse files Browse the repository at this point in the history
* feat: Add OpenTelemetry tracing to the SubscriberClient

* feat: Add link to publisher create span in the subscribe process span

* feat: Add Ack/Nack/ModAck RPC spans to the subscribe
  • Loading branch information
michaelpri10 authored Jul 2, 2024
1 parent e275efa commit 456ac83
Show file tree
Hide file tree
Showing 10 changed files with 646 additions and 76 deletions.
4 changes: 2 additions & 2 deletions google-cloud-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>1.38.0</version>
<version>1.39.0</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
<version>1.38.0</version>
<version>1.39.0</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
public class AckRequestData {
private final String ackId;
private final Optional<SettableApiFuture<AckResponse>> messageFuture;
private PubsubMessageWrapper messageWrapper;

protected AckRequestData(Builder builder) {
this.ackId = builder.ackId;
this.messageFuture = builder.messageFuture;
this.messageWrapper = builder.messageWrapper;
}

public String getAckId() {
Expand All @@ -36,6 +38,10 @@ public SettableApiFuture<AckResponse> getMessageFutureIfExists() {
return this.messageFuture.orElse(null);
}

public PubsubMessageWrapper getMessageWrapper() {
return messageWrapper;
}

public AckRequestData setResponse(AckResponse ackResponse, boolean setResponseOnSuccess) {
if (this.messageFuture.isPresent() && !this.messageFuture.get().isDone()) {
switch (ackResponse) {
Expand Down Expand Up @@ -68,6 +74,7 @@ public static Builder newBuilder(String ackId) {
protected static final class Builder {
private final String ackId;
private Optional<SettableApiFuture<AckResponse>> messageFuture = Optional.empty();
private PubsubMessageWrapper messageWrapper;

protected Builder(String ackId) {
this.ackId = ackId;
Expand All @@ -78,6 +85,11 @@ public Builder setMessageFuture(SettableApiFuture<AckResponse> messageFuture) {
return this;
}

public Builder setMessageWrapper(PubsubMessageWrapper messageWrapper) {
this.messageWrapper = messageWrapper;
return this;
}

public AckRequestData build() {
return new AckRequestData(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import com.google.pubsub.v1.SubscriptionName;
import io.opentelemetry.api.trace.Tracer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -104,6 +106,10 @@ class MessageDispatcher {
// To keep track of number of seconds the receiver takes to process messages.
private final Distribution ackLatencyDistribution;

private final String subscriptionName;
private final boolean enableOpenTelemetryTracing;
private final Tracer tracer;

/** Internal representation of a reply to a Pubsub message, to be sent back to the service. */
public enum AckReply {
ACK,
Expand Down Expand Up @@ -157,6 +163,7 @@ public void onFailure(Throwable t) {
t);
this.ackRequestData.setResponse(AckResponse.OTHER, false);
pendingNacks.add(this.ackRequestData);
this.ackRequestData.getMessageWrapper().endSubscribeProcessSpan("nack");
forget();
}

Expand All @@ -169,9 +176,11 @@ public void onSuccess(AckReply reply) {
ackLatencyDistribution.record(
Ints.saturatedCast(
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
this.ackRequestData.getMessageWrapper().endSubscribeProcessSpan("ack");
break;
case NACK:
pendingNacks.add(this.ackRequestData);
this.ackRequestData.getMessageWrapper().endSubscribeProcessSpan("nack");
break;
default:
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
Expand Down Expand Up @@ -217,6 +226,10 @@ private MessageDispatcher(Builder builder) {
jobLock = new ReentrantLock();
messagesWaiter = new Waiter();
sequentialExecutor = new SequentialExecutorService.AutoExecutor(builder.executor);

subscriptionName = builder.subscriptionName;
enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing;
tracer = builder.tracer;
}

private boolean shouldSetMessageFuture() {
Expand Down Expand Up @@ -351,13 +364,15 @@ void setMessageOrderingEnabled(boolean messageOrderingEnabled) {
}

private static class OutstandingMessage {
private final ReceivedMessage receivedMessage;
private final AckHandler ackHandler;

private OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
this.receivedMessage = receivedMessage;
private OutstandingMessage(AckHandler ackHandler) {
this.ackHandler = ackHandler;
}

public PubsubMessageWrapper messageWrapper() {
return this.ackHandler.ackRequestData.getMessageWrapper();
}
}

private static class ReceiptCompleteData {
Expand Down Expand Up @@ -390,10 +405,21 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
if (shouldSetMessageFuture()) {
builder.setMessageFuture(SettableApiFuture.create());
}
PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(
message.getMessage(),
SubscriptionName.parse(subscriptionName),
message.getAckId(),
message.getDeliveryAttempt(),
enableOpenTelemetryTracing)
.build();
builder.setMessageWrapper(messageWrapper);
messageWrapper.startSubscriberSpan(tracer, this.exactlyOnceDeliveryEnabled.get());

AckRequestData ackRequestData = builder.build();
AckHandler ackHandler =
new AckHandler(ackRequestData, message.getMessage().getSerializedSize(), totalExpiration);
OutstandingMessage outstandingMessage = new OutstandingMessage(message, ackHandler);
OutstandingMessage outstandingMessage = new OutstandingMessage(ackHandler);

if (this.exactlyOnceDeliveryEnabled.get()) {
// For exactly once deliveries we don't add to outstanding batch because we first
Expand Down Expand Up @@ -457,30 +483,39 @@ private void processBatch(List<OutstandingMessage> batch) {
for (OutstandingMessage message : batch) {
// This is a blocking flow controller. We have already incremented messagesWaiter, so
// shutdown will block on processing of all these messages anyway.
message.messageWrapper().startSubscribeConcurrencyControlSpan(tracer);
try {
flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize());
flowController.reserve(1, message.messageWrapper().getPubsubMessage().getSerializedSize());
message.messageWrapper().endSubscribeConcurrencyControlSpan();
} catch (FlowControlException unexpectedException) {
// This should be a blocking flow controller and never throw an exception.
message.messageWrapper().setSubscribeConcurrencyControlSpanException(unexpectedException);
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
processOutstandingMessage(addDeliveryInfoCount(message.receivedMessage), message.ackHandler);
addDeliveryInfoCount(message.messageWrapper());
processOutstandingMessage(message.ackHandler);
}
}

private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) {
PubsubMessage originalMessage = receivedMessage.getMessage();
int deliveryAttempt = receivedMessage.getDeliveryAttempt();
private void addDeliveryInfoCount(PubsubMessageWrapper messageWrapper) {
PubsubMessage originalMessage = messageWrapper.getPubsubMessage();
int deliveryAttempt = messageWrapper.getDeliveryAttempt();
// Delivery Attempt will be set to 0 if DeadLetterPolicy is not set on the subscription. In
// this case, do not populate the PubsubMessage with the delivery attempt attribute.
if (deliveryAttempt > 0) {
return PubsubMessage.newBuilder(originalMessage)
.putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
.build();
messageWrapper.setPubsubMessage(
PubsubMessage.newBuilder(originalMessage)
.putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
.build());
}
return originalMessage;
}

private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) {
private void processOutstandingMessage(final AckHandler ackHandler) {
// Get the PubsubMessageWrapper and the PubsubMessage it wraps that are stored withing the
// AckHandler object.
PubsubMessageWrapper messageWrapper = ackHandler.ackRequestData.getMessageWrapper();
PubsubMessage message = messageWrapper.getPubsubMessage();

// This future is for internal bookkeeping to be sent to the StreamingSubscriberConnection
// use below in the consumers
SettableApiFuture<AckReply> ackReplySettableApiFuture = SettableApiFuture.create();
Expand All @@ -499,8 +534,10 @@ public void run() {
// so it was probably sent to someone else. Don't work on it.
// Don't nack it either, because we'd be nacking someone else's message.
ackHandler.forget();
messageWrapper.setSubscriberSpanExpirationResult();
return;
}
messageWrapper.startSubscribeProcessSpan(tracer);
if (shouldSetMessageFuture()) {
// This is the message future that is propagated to the user
SettableApiFuture<AckResponse> messageFuture =
Expand All @@ -521,7 +558,9 @@ public void run() {
if (!messageOrderingEnabled.get() || message.getOrderingKey().isEmpty()) {
executor.execute(deliverMessageTask);
} else {
messageWrapper.startSubscribeSchedulerSpan(tracer);
sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
messageWrapper.endSubscribeSchedulerSpan();
}
}

Expand Down Expand Up @@ -607,8 +646,10 @@ void processOutstandingOperations() {
List<AckRequestData> ackRequestDataReceipts = new ArrayList<AckRequestData>();
pendingReceipts.drainTo(ackRequestDataReceipts);
if (!ackRequestDataReceipts.isEmpty()) {
modackRequestData.add(
new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts));
ModackRequestData receiptModack =
new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts);
receiptModack.setIsReceiptModack(true);
modackRequestData.add(receiptModack);
}
logger.log(Level.FINER, "Sending {0} receipts", ackRequestDataReceipts.size());

Expand Down Expand Up @@ -645,6 +686,10 @@ public static final class Builder {
private ScheduledExecutorService systemExecutor;
private ApiClock clock;

private String subscriptionName;
private boolean enableOpenTelemetryTracing;
private Tracer tracer;

protected Builder(MessageReceiver receiver) {
this.receiver = receiver;
}
Expand Down Expand Up @@ -715,6 +760,21 @@ public Builder setApiClock(ApiClock clock) {
return this;
}

public Builder setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
return this;
}

public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) {
this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
return this;
}

public Builder setTracer(Tracer tracer) {
this.tracer = tracer;
return this;
}

public MessageDispatcher build() {
return new MessageDispatcher(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
class ModackRequestData {
private final int deadlineExtensionSeconds;
private List<AckRequestData> ackRequestData;
private boolean isReceiptModack;

ModackRequestData(int deadlineExtensionSeconds) {
this.deadlineExtensionSeconds = deadlineExtensionSeconds;
Expand All @@ -45,8 +46,17 @@ public List<AckRequestData> getAckRequestData() {
return ackRequestData;
}

public boolean getIsReceiptModack() {
return isReceiptModack;
}

public ModackRequestData addAckRequestData(AckRequestData ackRequestData) {
this.ackRequestData.add(ackRequestData);
return this;
}

public ModackRequestData setIsReceiptModack(boolean isReceiptModack) {
this.isReceiptModack = isReceiptModack;
return this;
}
}
Loading

0 comments on commit 456ac83

Please sign in to comment.