Skip to content

Commit

Permalink
feat: Add Ack/Nack/ModAck RPC spans to the subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelpri10 committed Jul 2, 2024
1 parent 0cdfd2f commit 496ef8a
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -646,8 +646,10 @@ void processOutstandingOperations() {
List<AckRequestData> ackRequestDataReceipts = new ArrayList<AckRequestData>();
pendingReceipts.drainTo(ackRequestDataReceipts);
if (!ackRequestDataReceipts.isEmpty()) {
modackRequestData.add(
new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts));
ModackRequestData receiptModack =
new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts);
receiptModack.setIsReceiptModack(true);
modackRequestData.add(receiptModack);
}
logger.log(Level.FINER, "Sending {0} receipts", ackRequestDataReceipts.size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
class ModackRequestData {
private final int deadlineExtensionSeconds;
private List<AckRequestData> ackRequestData;
private boolean isReceiptModack;

ModackRequestData(int deadlineExtensionSeconds) {
this.deadlineExtensionSeconds = deadlineExtensionSeconds;
Expand All @@ -45,8 +46,17 @@ public List<AckRequestData> getAckRequestData() {
return ackRequestData;
}

public boolean getIsReceiptModack() {
return isReceiptModack;
}

public ModackRequestData addAckRequestData(AckRequestData ackRequestData) {
this.ackRequestData.add(ackRequestData);
return this;
}

public ModackRequestData setIsReceiptModack(boolean isReceiptModack) {
this.isReceiptModack = isReceiptModack;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.pubsub.v1;

import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
Expand All @@ -28,7 +29,9 @@

public class OpenTelemetryUtil {
private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub";
private static final String PROJECT_ATTR_KEY = "gcp_pubsub.project_id";
private static final String PROJECT_ATTR_KEY = "gcp.project_id";
private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline";
private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack";

private static final String PUBLISH_RPC_SPAN_SUFFIX = " publish";

Expand All @@ -50,7 +53,7 @@ public static final AttributesBuilder createCommonSpanAttributesBuilder(

/**
* Creates, starts, and returns a publish RPC span for the given message batch. Bi-directional
* links with the publisher parent span are created for each message in the batch.
* links with the publisher parent span are created for sampled messages in the batch.
*/
public static final Span startPublishRpcSpan(
Tracer tracer,
Expand All @@ -71,11 +74,13 @@ public static final Span startPublishRpcSpan(
.startSpan();

for (PubsubMessageWrapper message : messages) {
Attributes linkAttributes =
Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, "publish").build();
publishRpcSpan.addLink(message.getPublisherSpan().getSpanContext(), linkAttributes);
message.getPublisherSpan().addLink(publishRpcSpan.getSpanContext(), linkAttributes);
message.addPublishStartEvent();
if (message.getPublisherSpan().getSpanContext().isSampled()) {
Attributes linkAttributes =
Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, "publish").build();
publishRpcSpan.addLink(message.getPublisherSpan().getSpanContext(), linkAttributes);
message.getPublisherSpan().addLink(publishRpcSpan.getSpanContext(), linkAttributes);
message.addPublishStartEvent();
}
}
return publishRpcSpan;
}
Expand All @@ -102,4 +107,76 @@ public static final void setPublishRpcSpanException(
endPublishRpcSpan(publishRpcSpan, enableOpenTelemetryTracing);
}
}

/**
* Creates, starts, and returns spans for ModAck, Nack, and Ack RPC requests. Bi-directional links
* to parent subscribe span for sampled messages are added.
*/
public static final Span startSubscribeRpcSpan(
Tracer tracer,
SubscriptionName subscriptionName,
String rpcOperation,
List<PubsubMessageWrapper> messages,
int ackDeadline,
boolean isReceiptModack,
boolean enableOpenTelemetryTracing) {
if (enableOpenTelemetryTracing && tracer != null) {
String codeFunction =
rpcOperation == "ack"
? "StreamingSubscriberConnection.sendAckOperations"
: "StreamingSubscriberConnection.sendModAckOperations";
AttributesBuilder attributesBuilder =
createCommonSpanAttributesBuilder(
subscriptionName.getSubscription(),
subscriptionName.getProject(),
codeFunction,
rpcOperation)
.put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size());

// Ack deadline and receipt modack are specific to the modack operation
if (rpcOperation == "modack") {
attributesBuilder
.put(ACK_DEADLINE_ATTR_KEY, ackDeadline)
.put(RECEIPT_MODACK_ATTR_KEY, isReceiptModack);
}

Span rpcSpan =
tracer
.spanBuilder(subscriptionName.getSubscription() + " " + rpcOperation)
.setSpanKind(SpanKind.CLIENT)
.setAllAttributes(attributesBuilder.build())
.startSpan();

for (PubsubMessageWrapper message : messages) {
if (message.getSubscriberSpan().getSpanContext().isSampled()) {
Attributes linkAttributes =
Attributes.builder()
.put(SemanticAttributes.MESSAGING_OPERATION, rpcOperation)
.build();
rpcSpan.addLink(message.getSubscriberSpan().getSpanContext(), linkAttributes);
message.getSubscriberSpan().addLink(rpcSpan.getSpanContext(), linkAttributes);
switch (rpcOperation) {
case "ack":
message.addAckStartEvent();
break;
case "modack":
message.addModAckStartEvent();
break;
case "nack":
message.addNackStartEvent();
break;
}
}
}
return rpcSpan;
}
return null;
}

/** Ends the given subscribe RPC span if it exists. */
public static final void endSubscribeRpcSpan(Span rpcSpan, boolean enableOpenTelemetryTracing) {
if (enableOpenTelemetryTracing && rpcSpan != null) {
rpcSpan.end();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ public class PubsubMessageWrapper {
"subscriber concurrency control";
private static final String SUBSCRIBE_SCHEDULER_SPAN_NAME = "subscriber scheduler";
private String SUBSCRIBE_PROCESS_SPAN_NAME;
private static final String MODACK_START_EVENT = "modack start";
private static final String MODACK_END_EVENT = "modack end";
private static final String NACK_START_EVENT = "nack start";
private static final String NACK_END_EVENT = "nack end";
private static final String ACK_START_EVENT = "ack start";
private static final String ACK_END_EVENT = "ack start";

private static final String GOOGCLIENT_PREFIX = "googclient_";

Expand Down Expand Up @@ -116,11 +122,16 @@ public PubsubMessage getPubsubMessage() {
return message;
}

/** Returns the parent span for this message wrapper. */
/** Returns the parent publisher span for this message wrapper. */
public Span getPublisherSpan() {
return publisherSpan;
}

/** Returns the parent subscriber span for this message wrapper. */
public Span getSubscriberSpan() {
return subscriberSpan;
}

/** Returns the delivery attempt for the received PubsubMessage. */
public int getDeliveryAttempt() {
return deliveryAttempt;
Expand Down Expand Up @@ -302,6 +313,56 @@ public void startSubscribeProcessSpan(Tracer tracer) {
}
}

/**
* Creates start and end events for ModAcks, Nacks, and Acks that are tied to the corresponding
* RPC span start and end times.
*/
public void addModAckStartEvent() {
if (enableOpenTelemetryTracing && subscriberSpan != null) {
subscriberSpan.addEvent(MODACK_START_EVENT);
}
}

public void addModAckEndEvent() {
if (enableOpenTelemetryTracing && subscriberSpan != null) {
subscriberSpan.addEvent(MODACK_END_EVENT);
}
}

public void addNackStartEvent() {
if (enableOpenTelemetryTracing && subscriberSpan != null) {
subscriberSpan.addEvent(NACK_START_EVENT);
}
}

public void addNackEndEvent() {
if (enableOpenTelemetryTracing && subscriberSpan != null) {
subscriberSpan.addEvent(NACK_END_EVENT);
}
}

public void addAckStartEvent() {
if (enableOpenTelemetryTracing && subscriberSpan != null) {
subscriberSpan.addEvent(ACK_START_EVENT);
}
}

public void addAckEndEvent() {
if (enableOpenTelemetryTracing && subscriberSpan != null) {
subscriberSpan.addEvent(ACK_END_EVENT);
}
}

public void addEndRpcEvent(boolean isModack, int ackDeadline) {
if (!isModack) {
addAckEndEvent();
} else if (ackDeadline == 0) {
addNackEndEvent();
} else {
addModAckEndEvent();
}
}

/** Ends the subscriber parent span if exists. */
public void endSubscriberSpan() {
if (enableOpenTelemetryTracing && subscriberSpan != null) {
Expand Down Expand Up @@ -335,6 +396,15 @@ public void endSubscribeProcessSpan(String action) {
}
}

/** Sets an exception on the subscriber span during Ack/ModAck/Nack failures */
public void setSubscriberSpanException(Throwable t, String exception) {
if (enableOpenTelemetryTracing && subscriberSpan != null) {
subscriberSpan.setStatus(StatusCode.ERROR, exception);
subscriberSpan.recordException(t);
endAllSubscribeSpans();
}
}

/** Sets result of the parent subscriber span to expired and ends its. */
public void setSubscriberSpanExpirationResult() {
if (enableOpenTelemetryTracing && subscriberSpan != null) {
Expand Down
Loading

0 comments on commit 496ef8a

Please sign in to comment.