Skip to content

Commit

Permalink
feat: Add OpenTelemetry tracing to the Publisher and Subscriber (#2086)
Browse files Browse the repository at this point in the history
* feat: Initial publish side Open Telemetry support

* feat: Publish-side trace context injection

* feat: Tests and improvements to publish side OTel tracing

* feat: More tests and refactoring for publish-side OpenTelemetry

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: Formatting files

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: Publisher test changes

* test: Fix OpenTelemetry test

* Feat: Use OpenTelemetry semconv

* test: Fix some dependency issues

* feat: Test fix

* feat: Add comment for setter in builder

* Opentelemetry subscribe (#2100)

* feat: Add OpenTelemetry tracing to the SubscriberClient

* feat: Add link to publisher create span in the subscribe process span

* feat: Add Ack/Nack/ModAck RPC spans to the subscribe

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Opentelemetry subscribe (#2101)

* feat: Add OpenTelemetry tracing to the SubscriberClient

* feat: Add link to publisher create span in the subscribe process span

* feat: Add Ack/Nack/ModAck RPC spans to the subscribe

* fix: Fix test errors caused by otel changes

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: Fix build errors in Publisher

* test: Ignore org.assertj:assertj-core which is required for OTel testing assertions

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* test: Add tests for subscriber OTel functions

* feat: Changes to OpenTelemetry implementation to add links earlier and prevent methods from being exposed to users

* feat: Refactor OpenTelemetry implementation to use a context aware wrapper for the tracer and a  PubsubTracer interface

* feat: Initialize default no-op PubsubTracer in Publisher and Subscriber

* feat: Ensure SubscriberStreamingConnection and MessageDispatcher have default no-op tracers by default for tests

* samples: Add OpenTelemetry publisher and subscriber samples

* feat: Add additional sampling checks to the Otel implementation

* samples: Update pom.xml for samples with Cloud Trace exporter

* feat: Make OTel classes/methods package-private and remove non-generic PubsubTracer interface

* feat: Lint fixes for Pub/Sub

* feat: Use MessagingIncubatingAttributes for gcp_pubsub attribute names

* feat: Format OTel changes

* Revert "feat: Use MessagingIncubatingAttributes for gcp_pubsub attribute names"

This reverts commit 305610e.

* feat: trigger build

* chore: generate libraries at Mon Sep 30 20:37:03 UTC 2024

* feat: trigger build

* feat: Fix file overwrite from bad merge

* chore: generate libraries at Mon Sep 30 20:49:40 UTC 2024

* Revert "chore: generate libraries at Mon Sep 30 20:49:40 UTC 2024"

This reverts commit 5ebbbf9.

* chore: generate libraries at Mon Sep 30 21:03:31 UTC 2024

* Revert "chore: generate libraries at Mon Sep 30 21:03:31 UTC 2024"

This reverts commit 23f3a70.

* chore: generate libraries at Mon Sep 30 21:14:11 UTC 2024

* feat: Prevent new files for OpenTelemetry from being overwritten

* feat: Revert automated file deletion for OpenTelemetry changes

* feat: Remove OpenTelemetry samples as the samples use a released library version to run

* chore: generate libraries at Mon Sep 30 22:11:14 UTC 2024

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
Co-authored-by: cloud-java-bot <[email protected]>
  • Loading branch information
3 people authored Oct 1, 2024
1 parent c126927 commit db522b6
Show file tree
Hide file tree
Showing 16 changed files with 2,008 additions and 31 deletions.
3 changes: 3 additions & 0 deletions .github/.OwlBot-hermetic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ deep-preserve-regex:
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDataMatcher.java"
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java"
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenCensusUtilTest.java"
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java"
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java"
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java"
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StatusUtilTest.java"
Expand All @@ -51,8 +52,10 @@ deep-preserve-regex:
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageReceiverWithAckResponse.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PublisherInterface.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StatusUtil.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java"
Expand Down
28 changes: 28 additions & 0 deletions google-cloud-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,18 @@
<artifactId>google-http-client</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down Expand Up @@ -142,6 +154,21 @@
<artifactId>opencensus-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<!-- Need testing utility classes for generated gRPC clients tests -->
<dependency>
<groupId>com.google.api</groupId>
Expand Down Expand Up @@ -174,6 +201,7 @@
<ignoredUnusedDeclaredDependency>com.google.auth:google-auth-library-oauth2-http:jar</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>io.opencensus:opencensus-impl</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>javax.annotation:javax.annotation-api</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>org.assertj:assertj-core</ignoredUnusedDeclaredDependency>
</ignoredUnusedDeclaredDependencies>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
public class AckRequestData {
private final String ackId;
private final Optional<SettableApiFuture<AckResponse>> messageFuture;
private PubsubMessageWrapper messageWrapper;

protected AckRequestData(Builder builder) {
this.ackId = builder.ackId;
this.messageFuture = builder.messageFuture;
this.messageWrapper = builder.messageWrapper;
}

public String getAckId() {
Expand All @@ -36,6 +38,17 @@ public SettableApiFuture<AckResponse> getMessageFutureIfExists() {
return this.messageFuture.orElse(null);
}

/**
* Returns an empty PubsubMessageWrapper with OpenTelemetry tracing disabled. This allows methods
* that use this method to be unit tested.
*/
public PubsubMessageWrapper getMessageWrapper() {
if (this.messageWrapper == null) {
return PubsubMessageWrapper.newBuilder(null, null).build();
}
return messageWrapper;
}

public AckRequestData setResponse(AckResponse ackResponse, boolean setResponseOnSuccess) {
if (this.messageFuture.isPresent() && !this.messageFuture.get().isDone()) {
switch (ackResponse) {
Expand Down Expand Up @@ -68,6 +81,7 @@ public static Builder newBuilder(String ackId) {
protected static final class Builder {
private final String ackId;
private Optional<SettableApiFuture<AckResponse>> messageFuture = Optional.empty();
private PubsubMessageWrapper messageWrapper;

protected Builder(String ackId) {
this.ackId = ackId;
Expand All @@ -78,6 +92,11 @@ public Builder setMessageFuture(SettableApiFuture<AckResponse> messageFuture) {
return this;
}

public Builder setMessageWrapper(PubsubMessageWrapper messageWrapper) {
this.messageWrapper = messageWrapper;
return this;
}

public AckRequestData build() {
return new AckRequestData(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ class MessageDispatcher {
// To keep track of number of seconds the receiver takes to process messages.
private final Distribution ackLatencyDistribution;

private final String subscriptionName;
private final boolean enableOpenTelemetryTracing;
private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false);

/** Internal representation of a reply to a Pubsub message, to be sent back to the service. */
public enum AckReply {
ACK,
Expand Down Expand Up @@ -157,6 +161,7 @@ public void onFailure(Throwable t) {
t);
this.ackRequestData.setResponse(AckResponse.OTHER, false);
pendingNacks.add(this.ackRequestData);
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack");
forget();
}

Expand All @@ -169,9 +174,11 @@ public void onSuccess(AckReply reply) {
ackLatencyDistribution.record(
Ints.saturatedCast(
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "ack");
break;
case NACK:
pendingNacks.add(this.ackRequestData);
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack");
break;
default:
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
Expand Down Expand Up @@ -217,6 +224,12 @@ private MessageDispatcher(Builder builder) {
jobLock = new ReentrantLock();
messagesWaiter = new Waiter();
sequentialExecutor = new SequentialExecutorService.AutoExecutor(builder.executor);

subscriptionName = builder.subscriptionName;
enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing;
if (builder.tracer != null) {
tracer = builder.tracer;
}
}

private boolean shouldSetMessageFuture() {
Expand Down Expand Up @@ -351,13 +364,15 @@ void setMessageOrderingEnabled(boolean messageOrderingEnabled) {
}

private static class OutstandingMessage {
private final ReceivedMessage receivedMessage;
private final AckHandler ackHandler;

private OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
this.receivedMessage = receivedMessage;
private OutstandingMessage(AckHandler ackHandler) {
this.ackHandler = ackHandler;
}

public PubsubMessageWrapper messageWrapper() {
return this.ackHandler.ackRequestData.getMessageWrapper();
}
}

private static class ReceiptCompleteData {
Expand Down Expand Up @@ -390,10 +405,20 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
if (shouldSetMessageFuture()) {
builder.setMessageFuture(SettableApiFuture.create());
}
PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(
message.getMessage(),
subscriptionName,
message.getAckId(),
message.getDeliveryAttempt())
.build();
builder.setMessageWrapper(messageWrapper);
tracer.startSubscriberSpan(messageWrapper, this.exactlyOnceDeliveryEnabled.get());

AckRequestData ackRequestData = builder.build();
AckHandler ackHandler =
new AckHandler(ackRequestData, message.getMessage().getSerializedSize(), totalExpiration);
OutstandingMessage outstandingMessage = new OutstandingMessage(message, ackHandler);
OutstandingMessage outstandingMessage = new OutstandingMessage(ackHandler);

if (this.exactlyOnceDeliveryEnabled.get()) {
// For exactly once deliveries we don't add to outstanding batch because we first
Expand Down Expand Up @@ -457,30 +482,40 @@ private void processBatch(List<OutstandingMessage> batch) {
for (OutstandingMessage message : batch) {
// This is a blocking flow controller. We have already incremented messagesWaiter, so
// shutdown will block on processing of all these messages anyway.
tracer.startSubscribeConcurrencyControlSpan(message.messageWrapper());
try {
flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize());
flowController.reserve(1, message.messageWrapper().getPubsubMessage().getSerializedSize());
tracer.endSubscribeConcurrencyControlSpan(message.messageWrapper());
} catch (FlowControlException unexpectedException) {
// This should be a blocking flow controller and never throw an exception.
tracer.setSubscribeConcurrencyControlSpanException(
message.messageWrapper(), unexpectedException);
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
processOutstandingMessage(addDeliveryInfoCount(message.receivedMessage), message.ackHandler);
addDeliveryInfoCount(message.messageWrapper());
processOutstandingMessage(message.ackHandler);
}
}

private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) {
PubsubMessage originalMessage = receivedMessage.getMessage();
int deliveryAttempt = receivedMessage.getDeliveryAttempt();
private void addDeliveryInfoCount(PubsubMessageWrapper messageWrapper) {
PubsubMessage originalMessage = messageWrapper.getPubsubMessage();
int deliveryAttempt = messageWrapper.getDeliveryAttempt();
// Delivery Attempt will be set to 0 if DeadLetterPolicy is not set on the subscription. In
// this case, do not populate the PubsubMessage with the delivery attempt attribute.
if (deliveryAttempt > 0) {
return PubsubMessage.newBuilder(originalMessage)
.putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
.build();
messageWrapper.setPubsubMessage(
PubsubMessage.newBuilder(originalMessage)
.putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
.build());
}
return originalMessage;
}

private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) {
private void processOutstandingMessage(final AckHandler ackHandler) {
// Get the PubsubMessageWrapper and the PubsubMessage it wraps that are stored withing the
// AckHandler object.
PubsubMessageWrapper messageWrapper = ackHandler.ackRequestData.getMessageWrapper();
PubsubMessage message = messageWrapper.getPubsubMessage();

// This future is for internal bookkeeping to be sent to the StreamingSubscriberConnection
// use below in the consumers
SettableApiFuture<AckReply> ackReplySettableApiFuture = SettableApiFuture.create();
Expand All @@ -499,8 +534,10 @@ public void run() {
// so it was probably sent to someone else. Don't work on it.
// Don't nack it either, because we'd be nacking someone else's message.
ackHandler.forget();
tracer.setSubscriberSpanExpirationResult(messageWrapper);
return;
}
tracer.startSubscribeProcessSpan(messageWrapper);
if (shouldSetMessageFuture()) {
// This is the message future that is propagated to the user
SettableApiFuture<AckResponse> messageFuture =
Expand All @@ -521,7 +558,9 @@ public void run() {
if (!messageOrderingEnabled.get() || message.getOrderingKey().isEmpty()) {
executor.execute(deliverMessageTask);
} else {
tracer.startSubscribeSchedulerSpan(messageWrapper);
sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
tracer.endSubscribeSchedulerSpan(messageWrapper);
}
}

Expand Down Expand Up @@ -607,8 +646,10 @@ void processOutstandingOperations() {
List<AckRequestData> ackRequestDataReceipts = new ArrayList<AckRequestData>();
pendingReceipts.drainTo(ackRequestDataReceipts);
if (!ackRequestDataReceipts.isEmpty()) {
modackRequestData.add(
new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts));
ModackRequestData receiptModack =
new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts);
receiptModack.setIsReceiptModack(true);
modackRequestData.add(receiptModack);
}
logger.log(Level.FINER, "Sending {0} receipts", ackRequestDataReceipts.size());

Expand Down Expand Up @@ -645,6 +686,10 @@ public static final class Builder {
private ScheduledExecutorService systemExecutor;
private ApiClock clock;

private String subscriptionName;
private boolean enableOpenTelemetryTracing;
private OpenTelemetryPubsubTracer tracer;

protected Builder(MessageReceiver receiver) {
this.receiver = receiver;
}
Expand Down Expand Up @@ -715,6 +760,21 @@ public Builder setApiClock(ApiClock clock) {
return this;
}

public Builder setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
return this;
}

public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) {
this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
return this;
}

public Builder setTracer(OpenTelemetryPubsubTracer tracer) {
this.tracer = tracer;
return this;
}

public MessageDispatcher build() {
return new MessageDispatcher(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
class ModackRequestData {
private final int deadlineExtensionSeconds;
private List<AckRequestData> ackRequestData;
private boolean isReceiptModack;

ModackRequestData(int deadlineExtensionSeconds) {
this.deadlineExtensionSeconds = deadlineExtensionSeconds;
Expand All @@ -45,8 +46,17 @@ public List<AckRequestData> getAckRequestData() {
return ackRequestData;
}

public boolean getIsReceiptModack() {
return isReceiptModack;
}

public ModackRequestData addAckRequestData(AckRequestData ackRequestData) {
this.ackRequestData.add(ackRequestData);
return this;
}

public ModackRequestData setIsReceiptModack(boolean isReceiptModack) {
this.isReceiptModack = isReceiptModack;
return this;
}
}
Loading

0 comments on commit db522b6

Please sign in to comment.