diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 3c2fe14cd..929b541e2 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -61,6 +61,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; @@ -127,9 +128,7 @@ public class Publisher implements PublisherInterface { private final GrpcCallContext publishContext; private final GrpcCallContext publishContextWithCompression; - - private OpenTelemetry openTelemetry; - private Tracer openTelemetryTracer; + private Optional tracer = Optional.empty(); /** The maximum number of messages in one request. Defined by the API. */ public static long getApiMaxRequestElementCount() { @@ -214,10 +213,9 @@ private Publisher(Builder builder) throws IOException { this.publishContextWithCompression = GrpcCallContext.createDefault() .withCallOptions(CallOptions.DEFAULT.withCompression(GZIP_COMPRESSION)); - this.openTelemetry = builder.openTelemetry; - if (this.openTelemetry != null) { + if (builder.openTelemetry.isPresent()) { // Create a tracer if we have an instance of OpenTelemetry - this.openTelemetryTracer = this.openTelemetry.getTracer(OPEN_TELEMETRY_TRACER_NAME); + this.tracer = Optional.of(builder.openTelemetry.get().getTracer(OPEN_TELEMETRY_TRACER_NAME)); } } @@ -262,7 +260,6 @@ public String getTopicNameString() { @Override public ApiFuture publish(PubsubMessage message) { Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher."); - final String orderingKey = message.getOrderingKey(); Preconditions.checkState( orderingKey.isEmpty() || enableMessageOrdering, @@ -270,17 +267,25 @@ public ApiFuture publish(PubsubMessage message) { + "Publisher client. Please create a Publisher client with " + "setEnableMessageOrdering(true) in the builder."); - final OutstandingPublish outstandingPublish = - new OutstandingPublish(messageTransform.apply(message)); + PubsubMessageWrapper pubsubMessageWrapper = + PubsubMessageWrapper.newBuilder(this.messageTransform.apply(message)) + .setTopicName(this.topicName) + .build(); + pubsubMessageWrapper.startPublishSpan(this.tracer); + + final OutstandingPublish outstandingPublish = new OutstandingPublish(pubsubMessageWrapper); if (flowController != null) { + outstandingPublish.pubsubMessageWrapper.startPublishFlowControlSpan(this.tracer); try { flowController.acquire(outstandingPublish.messageSize); + outstandingPublish.pubsubMessageWrapper.endPublishFlowControlSpan(); } catch (FlowController.FlowControlException e) { if (!orderingKey.isEmpty()) { sequentialExecutor.stopPublish(orderingKey); } outstandingPublish.publishResult.setException(e); + outstandingPublish.pubsubMessageWrapper.setPublishFlowControlSpanException(e); return outstandingPublish.publishResult; } } @@ -288,9 +293,12 @@ public ApiFuture publish(PubsubMessage message) { List batchesToSend; messagesBatchLock.lock(); try { + outstandingPublish.pubsubMessageWrapper.startPublishSchedulerSpan(this.tracer); if (!orderingKey.isEmpty() && sequentialExecutor.keyHasError(orderingKey)) { outstandingPublish.publishResult.setException( SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION); + outstandingPublish.pubsubMessageWrapper.setPublishSchedulerException( + SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION); return outstandingPublish.publishResult; } MessagesBatch messagesBatch = messagesBatches.get(orderingKey); @@ -463,12 +471,22 @@ private ApiFuture publishCall(OutstandingBatch outstandingBatch if (enableCompression && outstandingBatch.batchSizeBytes >= compressionBytesThreshold) { context = publishContextWithCompression; } + + int numMessagesInBatch = outstandingBatch.size(); + List pubsubMessageList = new ArrayList(numMessagesInBatch); + + for (PubsubMessageWrapper pubsubMessageWrapper : outstandingBatch.getMessageWrappers()) { + pubsubMessageWrapper.endPublishSchedulerSpan(); + pubsubMessageWrapper.startPublishRpcSpan(this.tracer, numMessagesInBatch); + pubsubMessageList.add(pubsubMessageWrapper.getPubsubMessage()); + } + return publisherStub .publishCallable() .futureCall( PublishRequest.newBuilder() .setTopic(topicName) - .addAllMessages(outstandingBatch.getMessages()) + .addAllMessages(pubsubMessageList) .build(), context); } @@ -478,6 +496,7 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { logger.log(Level.WARNING, "Attempted to publish batch with zero messages."); return; } + final ApiFutureCallback futureCallback = new ApiFutureCallback() { @Override @@ -566,10 +585,10 @@ int size() { return outstandingPublishes.size(); } - private List getMessages() { - List results = new ArrayList<>(outstandingPublishes.size()); + private List getMessageWrappers() { + List results = new ArrayList<>(outstandingPublishes.size()); for (OutstandingPublish outstandingPublish : outstandingPublishes) { - results.add(outstandingPublish.message); + results.add(outstandingPublish.pubsubMessageWrapper); } return results; } @@ -579,31 +598,36 @@ private void onFailure(Throwable t) { if (flowController != null) { flowController.release(outstandingPublish.messageSize); } + outstandingPublish.pubsubMessageWrapper.setPublishRpcSpanException(t); outstandingPublish.publishResult.setException(t); } } private void onSuccess(Iterable results) { Iterator messagesResultsIt = outstandingPublishes.iterator(); + for (String messageId : results) { OutstandingPublish nextPublish = messagesResultsIt.next(); if (flowController != null) { flowController.release(nextPublish.messageSize); } nextPublish.publishResult.set(messageId); + nextPublish.pubsubMessageWrapper.setPublishSpanMessageIdAttribute(messageId); + nextPublish.pubsubMessageWrapper.endPublishRpcSpan(); + nextPublish.pubsubMessageWrapper.endPublishSpan(); } } } private static final class OutstandingPublish { final SettableApiFuture publishResult; - final PubsubMessage message; + final PubsubMessageWrapper pubsubMessageWrapper; final int messageSize; - OutstandingPublish(PubsubMessage message) { + OutstandingPublish(PubsubMessageWrapper pubsubMessageWrapper) { this.publishResult = SettableApiFuture.create(); - this.message = message; - this.messageSize = message.getSerializedSize(); + this.pubsubMessageWrapper = pubsubMessageWrapper; + this.messageSize = pubsubMessageWrapper.getPubsubMessage().getSerializedSize(); } } @@ -759,7 +783,7 @@ public PubsubMessage apply(PubsubMessage input) { private boolean enableCompression = DEFAULT_ENABLE_COMPRESSION; private long compressionBytesThreshold = DEFAULT_COMPRESSION_BYTES_THRESHOLD; - private OpenTelemetry openTelemetry; + private Optional openTelemetry = Optional.empty(); private Builder(String topic) { this.topicName = Preconditions.checkNotNull(topic); @@ -893,7 +917,7 @@ public static BatchingSettings getDefaultBatchingSettings() { /** Sets the Open Telemetry Instance to allow for tracing. */ public Builder setOpenTelemetry(OpenTelemetry openTelemetry) { - this.openTelemetry = openTelemetry; + this.openTelemetry = Optional.of(openTelemetry); return this; } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java index 7361cbf01..117909673 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java @@ -19,6 +19,7 @@ import com.google.common.base.Preconditions; import com.google.pubsub.v1.PubsubMessage; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; import java.util.Optional; @@ -26,12 +27,16 @@ public class PubsubMessageWrapper { private final PubsubMessage pubsubMessage; - private static final String MESSAGE_ATTRIBUTE_PREFIX = "googclient_"; + // Pubsub Message attribute + private static final String PUBSUB_MESSAGE_ATTRIBUTE_PREFIX = "googclient_"; + + private String topicName; + private String subscriptionName; /** * Publish Spans are hierarchical - they must be open and closed in the following order: * - *

Publish -> (optional) Flow Control -> (optional) Scheduler -> PublishRpc + *

Publish -> (optional) Flow Control / (optional) Scheduler / PublishRpc */ private static final String SEND = "send"; @@ -40,6 +45,20 @@ public class PubsubMessageWrapper { private static final String PUBLISH_SCHEDULER_SPAN_NAME = "publish scheduler"; private static final String PUBLISH_RPC_SPAN_NAME = "send Publish"; + // Open Telemetry Span attributes + private static final String PUBLISH_SPAN_SYSTEM_ATTRIBUTE_KEY = "messaging.system"; + private static final String PUBLISH_SPAN_SYSTEM_ATTRIBUTE_VALUE = "pubsub"; + private static final String PUBLISH_SPAN_DESTINATION_ATTRIBUTE_KEY = "messaging.destination"; + private static final String PUBLISH_SPAN_DESTINATION_KIND_ATTRIBUTE_KEY = + "messaging.destination_kind"; + private static final String PUBLISH_SPAN_DESTINATION_KIND_ATTRIBUTE_VALUE = "topic"; + private static final String PUBLISH_SPAN_MESSAGE_ID_ATTRIBUTE_KEY = "messaging.message_id"; + private static final String PUBLISH_SPAN_MESSAGE_PAYLOAD_SIZE_BYTES_ATTRIBUTE_KEY = + "messaging.message_payload_size_bytes"; + private static final String PUBLISH_SPAN_ORDERING_KEY_ATTRIBUTE_KEY = "messaging.ordering_key"; + private static final String PUBLISH_RPC_SPAN_NUM_MESSAGES_IN_BATCH_ATTRIBUTE_KEY = + "messaging.pubsub.num_messages_in_batch"; + private Optional publishSpan = Optional.empty(); private Optional publishFlowControlSpan = Optional.empty(); private Optional publishSchedulerSpan = Optional.empty(); @@ -48,8 +67,8 @@ public class PubsubMessageWrapper { /** * Subscribe Spans are hierarchical - they must be open and closed in the following order: * - *

Receive -> (optional) Flow Control -> (optional) Scheduler -> Process -> ModifyAckDeadline - * -> Acknowledgement OR Negative Acknowledgement + *

Receive -> (optional) Flow Control / (optional) Scheduler -> Process -> ModifyAckDeadline -> + * Acknowledgement OR Negative Acknowledgement */ private static final String RECEIVE = "receive"; @@ -74,18 +93,43 @@ protected PubsubMessageWrapper(Builder builder) { this.pubsubMessage = builder.pubsubMessage; if (builder.topicName.isPresent()) { + this.topicName = builder.topicName.get(); this.PUBLISH_SPAN_NAME = builder.topicName.get() + " " + this.SEND; } if (builder.subscriptionName.isPresent()) { + this.subscriptionName = builder.subscriptionName.get(); this.RECEIVE_SPAN_NAME = builder.subscriptionName.get() + " " + this.RECEIVE; this.PROCESS_SPAN_NAME = builder.subscriptionName.get() + " " + this.PROCESS; } } + public PubsubMessage getPubsubMessage() { + return pubsubMessage; + } + public void startPublishSpan(Optional tracer) { if (tracer.isPresent()) { this.publishSpan = Optional.of(createAndStartSpan(tracer.get(), PUBLISH_SPAN_NAME)); + // Set required span attribute(s) + this.publishSpan + .get() + .setAttribute(PUBLISH_SPAN_SYSTEM_ATTRIBUTE_KEY, PUBLISH_SPAN_SYSTEM_ATTRIBUTE_VALUE); + this.publishSpan.get().setAttribute(PUBLISH_SPAN_DESTINATION_ATTRIBUTE_KEY, this.topicName); + this.publishSpan + .get() + .setAttribute( + PUBLISH_SPAN_DESTINATION_KIND_ATTRIBUTE_KEY, + PUBLISH_SPAN_DESTINATION_KIND_ATTRIBUTE_VALUE); + this.publishSpan + .get() + .setAttribute( + PUBLISH_SPAN_MESSAGE_PAYLOAD_SIZE_BYTES_ATTRIBUTE_KEY, + this.pubsubMessage.getSerializedSize()); + this.publishSpan + .get() + .setAttribute( + PUBLISH_SPAN_ORDERING_KEY_ATTRIBUTE_KEY, this.pubsubMessage.getOrderingKey()); } } @@ -95,6 +139,16 @@ public void endPublishSpan() { } } + /** + * Set the MessageId attribute for the Publish Span. This must be set AFTER the publish is done as + * we receive a messageId from the server + */ + public void setPublishSpanMessageIdAttribute(String messageId) { + if (this.publishSpan.isPresent()) { + this.publishSpan.get().setAttribute(PUBLISH_SPAN_MESSAGE_ID_ATTRIBUTE_KEY, messageId); + } + } + /** (Optional) Start Publish Flow Control Span */ public void startPublishFlowControlSpan(Optional tracer) { if (tracer.isPresent()) { @@ -112,16 +166,23 @@ public void endPublishFlowControlSpan() { } } + public void setPublishFlowControlSpanException(Throwable throwable) { + if (this.publishFlowControlSpan.isPresent()) { + this.publishFlowControlSpan + .get() + .setStatus(StatusCode.ERROR, "Publish flow control exception caught."); + this.publishFlowControlSpan.get().recordException(throwable); + this.endAllPublishSpans(); + } + } + /** (Optional) Start Flow Control Span */ public void startPublishSchedulerSpan(Optional tracer) { if (tracer.isPresent()) { - // Check for optional parent - Span parent = - this.publishFlowControlSpan.isPresent() - ? this.publishFlowControlSpan.get() - : this.publishSpan.get(); this.publishSchedulerSpan = - Optional.of(this.createAndStartSpan(tracer.get(), PUBLISH_SCHEDULER_SPAN_NAME, parent)); + Optional.of( + this.createAndStartSpan( + tracer.get(), PUBLISH_SCHEDULER_SPAN_NAME, this.publishSpan.get())); } } @@ -132,18 +193,25 @@ public void endPublishSchedulerSpan() { } } - public void startPublishRpcSpan(Optional tracer) { + public void setPublishSchedulerException(Throwable throwable) { + if (this.publishSchedulerSpan.isPresent()) { + this.publishSchedulerSpan + .get() + .setStatus(StatusCode.ERROR, "Publish scheduler exception caught."); + this.publishSchedulerSpan.get().recordException(throwable); + this.endAllPublishSpans(); + } + } + + public void startPublishRpcSpan(Optional tracer, int numMessagesInBatch) { if (tracer.isPresent()) { - Span parent; - if (this.publishSchedulerSpan.isPresent()) { - parent = this.publishSchedulerSpan.get(); - } else if (this.publishFlowControlSpan.isPresent()) { - parent = this.publishFlowControlSpan.get(); - } else { - parent = this.publishSpan.get(); - } this.publishRpcSpan = - Optional.of(createAndStartSpan(tracer.get(), PUBLISH_RPC_SPAN_NAME, parent)); + Optional.of( + createAndStartSpan(tracer.get(), PUBLISH_RPC_SPAN_NAME, this.publishSpan.get())); + // Set required span attribute(s) + this.publishRpcSpan + .get() + .setAttribute(PUBLISH_RPC_SPAN_NUM_MESSAGES_IN_BATCH_ATTRIBUTE_KEY, numMessagesInBatch); } } @@ -153,6 +221,16 @@ public void endPublishRpcSpan() { } } + public void setPublishRpcSpanException(Throwable throwable) { + if (this.publishRpcSpan.isPresent()) { + this.publishRpcSpan + .get() + .setStatus(StatusCode.ERROR, "Publish flow control exception caught."); + this.publishRpcSpan.get().recordException(throwable); + this.endAllPublishSpans(); + } + } + public void startSubscribeReceiveSpan(Optional tracer) { if (tracer.isPresent()) { this.subscribeSpan = Optional.of(createAndStartSpan(tracer.get(), RECEIVE_SPAN_NAME)); @@ -268,6 +346,14 @@ private Span createAndStartSpan(Tracer tracer, String spanName, Span parent) { return tracer.spanBuilder(spanName).setParent(Context.current().with(parent)).startSpan(); } + /** Helper function used after setting an exception to end all publish spans. */ + private void endAllPublishSpans() { + this.endPublishRpcSpan(); + this.endPublishSchedulerSpan(); + this.endPublishFlowControlSpan(); + this.endPublishSpan(); + } + public static PubsubMessageWrapper.Builder newBuilder(PubsubMessage pubsubMessage) { return new Builder(pubsubMessage); } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 1add33117..b8e671475 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -22,8 +22,12 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.google.api.core.ApiFuture; import com.google.api.gax.batching.BatchingSettings; @@ -44,6 +48,8 @@ import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PubsubMessage; +import com.google.testing.junit.testparameterinjector.TestParameter; +import com.google.testing.junit.testparameterinjector.TestParameterInjector; import io.grpc.ManagedChannel; import io.grpc.Server; import io.grpc.Status; @@ -51,6 +57,9 @@ import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -63,10 +72,9 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; import org.threeten.bp.Duration; -@RunWith(JUnit4.class) +@RunWith(TestParameterInjector.class) public class PublisherImplTest { private static final ProjectTopicName TEST_TOPIC = @@ -86,6 +94,34 @@ public class PublisherImplTest { private Server testServer; + // Open Telemetry constants + private static final String OPEN_TELEMETRY_TRACER_NAME = "com.google.pubsub.v1"; + private static final String PUBLISH_SPAN_NAME = TEST_TOPIC + " send"; + private static final String PUBLISH_FLOW_CONTROL_SPAN_NAME = "publish flow control"; + private static final String PUBLISH_SCHEDULER_SPAN_NAME = "publish scheduler"; + private static final String PUBLISH_RPC_SPAN_NAME = "send Publish"; + + // Open Telemetry Span attribute contants + private static final String PUBLISH_SPAN_SYSTEM_ATTRIBUTE_KEY = "messaging.system"; + private static final String PUBLISH_SPAN_SYSTEM_ATTRIBUTE_VALUE = "pubsub"; + private static final String PUBLISH_SPAN_DESTINATION_ATTRIBUTE_KEY = "messaging.destination"; + private static final String PUBLISH_SPAN_DESTINATION_KIND_ATTRIBUTE_KEY = + "messaging.destination_kind"; + private static final String PUBLISH_SPAN_DESTINATION_KIND_ATTRIBUTE_VALUE = "topic"; + private static final String PUBLISH_SPAN_MESSAGE_ID_ATTRIBUTE_KEY = "messaging.message_id"; + private static final String PUBLISH_SPAN_MESSAGE_PAYLOAD_SIZE_BYTES_ATTRIBUTE_KEY = + "messaging.message_payload_size_bytes"; + private static final String PUBLISH_SPAN_ORDERING_KEY_ATTRIBUTE_KEY = "messaging.ordering_key"; + private static final String PUBLISH_RPC_SPAN_NUM_MESSAGES_IN_BATCH_ATTRIBUTE_KEY = + "messaging.pubsub.num_messages_in_batch"; + + private OpenTelemetry mockOpenTelemetry; + private Tracer mockTracer; + private Span mockPublishSpan; + private Span mockFlowControlSpan; + private Span mockSchedulerSpan; + private Span mockPublishRpcSpan; + @Before public void setUp() throws Exception { testPublisherServiceImpl = new FakePublisherServiceImpl(); @@ -97,6 +133,13 @@ public void setUp() throws Exception { testServer.start(); fakeExecutor = new FakeScheduledExecutorService(); + + this.mockOpenTelemetry = mock(OpenTelemetry.class, RETURNS_DEEP_STUBS); + this.mockTracer = mock(Tracer.class, RETURNS_DEEP_STUBS); + this.mockPublishSpan = mock(Span.class); + this.mockFlowControlSpan = mock(Span.class); + this.mockSchedulerSpan = mock(Span.class); + this.mockPublishRpcSpan = mock(Span.class); } @After @@ -106,17 +149,22 @@ public void tearDown() throws Exception { } @Test - public void testPublishByDuration() throws Exception { - Publisher publisher = + public void testPublishByDuration(@TestParameter boolean useTracer) throws Exception { + Builder publisherBuilder = getTestPublisherBuilder() - // To demonstrate that reaching duration will trigger publish .setBatchingSettings( Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setDelayThreshold(Duration.ofSeconds(5)) .setElementCountThreshold(10L) - .build()) - .build(); + .build()); + + if (useTracer) { + configureOpenTelemetryMocks(false, false); + publisherBuilder.setOpenTelemetry(this.mockOpenTelemetry); + } + + Publisher publisher = publisherBuilder.build(); testPublisherServiceImpl.addPublishResponse( PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2")); @@ -131,22 +179,34 @@ public void testPublishByDuration() throws Exception { assertEquals("1", publishFuture1.get()); assertEquals("2", publishFuture2.get()); - assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount()); shutdownTestPublisher(publisher); + + verify(this.mockPublishSpan, times(useTracer ? 2 : 0)).end(); + verify(this.mockPublishRpcSpan, times(useTracer ? 2 : 0)).end(); + verify(this.mockPublishSpan, times(useTracer ? 1 : 0)) + .setAttribute(PUBLISH_SPAN_MESSAGE_ID_ATTRIBUTE_KEY, "1"); + verify(this.mockPublishSpan, times(useTracer ? 1 : 0)) + .setAttribute(PUBLISH_SPAN_MESSAGE_ID_ATTRIBUTE_KEY, "2"); } @Test - public void testPublishByNumBatchedMessages() throws Exception { - Publisher publisher = + public void testPublishByNumBatchedMessages(@TestParameter boolean useTracer) throws Exception { + Builder publisherBuilder = getTestPublisherBuilder() .setBatchingSettings( Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(2L) .setDelayThreshold(Duration.ofSeconds(100)) - .build()) - .build(); + .build()); + + if (useTracer) { + configureOpenTelemetryMocks(true, false); + publisherBuilder.setOpenTelemetry(this.mockOpenTelemetry); + } + + Publisher publisher = publisherBuilder.build(); testPublisherServiceImpl .addPublishResponse(PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2")) @@ -170,22 +230,33 @@ public void testPublishByNumBatchedMessages() throws Exception { assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount()); assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(1).getMessagesCount()); - fakeExecutor.advanceTime(Duration.ofSeconds(100)); shutdownTestPublisher(publisher); + + verify(this.mockPublishSpan, times(useTracer ? 4 : 0)).end(); + verify(this.mockSchedulerSpan, times(useTracer ? 4 : 0)).end(); + verify(this.mockPublishRpcSpan, times(useTracer ? 4 : 0)).end(); + verify(this.mockPublishRpcSpan, times(useTracer ? 4 : 0)) + .setAttribute(PUBLISH_RPC_SPAN_NUM_MESSAGES_IN_BATCH_ATTRIBUTE_KEY, 2); } @Test - public void testSinglePublishByNumBytes() throws Exception { - Publisher publisher = + public void testSinglePublishByNumBytes(@TestParameter boolean useTracer) throws Exception { + Builder publisherBuilder = getTestPublisherBuilder() .setBatchingSettings( Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(2L) .setDelayThreshold(Duration.ofSeconds(100)) - .build()) - .build(); + .build()); + + if (useTracer) { + configureOpenTelemetryMocks(true, false); + publisherBuilder.setOpenTelemetry(this.mockOpenTelemetry); + } + + Publisher publisher = publisherBuilder.build(); testPublisherServiceImpl .addPublishResponse(PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2")) @@ -199,9 +270,11 @@ public void testSinglePublishByNumBytes() throws Exception { assertEquals("1", publishFuture1.get()); assertEquals("2", publishFuture2.get()); + assertFalse(publishFuture3.isDone()); ApiFuture publishFuture4 = sendTestMessage(publisher, "D"); + assertEquals("3", publishFuture3.get()); assertEquals("4", publishFuture4.get()); @@ -209,6 +282,12 @@ public void testSinglePublishByNumBytes() throws Exception { fakeExecutor.advanceTime(Duration.ofSeconds(100)); shutdownTestPublisher(publisher); + + verify(this.mockPublishSpan, times(useTracer ? 4 : 0)).end(); + verify(this.mockSchedulerSpan, times(useTracer ? 4 : 0)).end(); + verify(this.mockPublishRpcSpan, times(useTracer ? 4 : 0)).end(); + verify(this.mockPublishRpcSpan, times(useTracer ? 4 : 0)) + .setAttribute(PUBLISH_RPC_SPAN_NUM_MESSAGES_IN_BATCH_ATTRIBUTE_KEY, 2); } @Test @@ -258,6 +337,7 @@ public void testPublishMixedSizeAndDuration() throws Exception { testPublisherServiceImpl.addPublishResponse( PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2")); + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("3")); ApiFuture publishFuture1 = sendTestMessage(publisher, "A"); @@ -310,9 +390,70 @@ public void testPublishWithCompression() throws Exception { shutdownTestPublisher(publisher); } - private ApiFuture sendTestMessage(Publisher publisher, String data) { - return publisher.publish( - PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(data)).build()); + @Test + public void testBatchedMessagesWithOrderingKeyByNum(@TestParameter boolean useTracer) + throws Exception { + // Limit the number of maximum elements in a single batch to 3. + Builder publisherBuilder = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(3L) + .setDelayThreshold(Duration.ofSeconds(100)) + .build()) + .setEnableMessageOrdering(true); + + if (useTracer) { + configureOpenTelemetryMocks(true, false); + publisherBuilder.setOpenTelemetry(this.mockOpenTelemetry); + } + + Publisher publisher = publisherBuilder.build(); + testPublisherServiceImpl.setAutoPublishResponse(true); + + // Publish two messages with ordering key, "OrderA", and other two messages with "OrderB". + ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "OrderA"); + ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "m2", "OrderB"); + ApiFuture publishFuture3 = sendTestMessageWithOrderingKey(publisher, "m3", "OrderA"); + ApiFuture publishFuture4 = sendTestMessageWithOrderingKey(publisher, "m4", "OrderB"); + + // Verify that none of them were published since the batching size is 3. + assertFalse(publishFuture1.isDone()); + assertFalse(publishFuture2.isDone()); + assertFalse(publishFuture3.isDone()); + assertFalse(publishFuture4.isDone()); + + // One of the batches reaches the limit. + ApiFuture publishFuture5 = sendTestMessageWithOrderingKey(publisher, "m5", "OrderA"); + // Verify that they were delivered in order per ordering key. + assertTrue(Integer.parseInt(publishFuture1.get()) < Integer.parseInt(publishFuture3.get())); + assertTrue(Integer.parseInt(publishFuture3.get()) < Integer.parseInt(publishFuture5.get())); + + // The other batch reaches the limit. + ApiFuture publishFuture6 = sendTestMessageWithOrderingKey(publisher, "m6", "OrderB"); + assertTrue(Integer.parseInt(publishFuture2.get()) < Integer.parseInt(publishFuture4.get())); + assertTrue(Integer.parseInt(publishFuture4.get()) < Integer.parseInt(publishFuture6.get())); + + // Verify that every message within the same batch has the same ordering key. + List requests = testPublisherServiceImpl.getCapturedRequests(); + for (PublishRequest request : requests) { + if (request.getMessagesCount() > 1) { + String orderingKey = request.getMessages(0).getOrderingKey(); + for (PubsubMessage message : request.getMessagesList()) { + assertEquals(message.getOrderingKey(), orderingKey); + } + } + } + + fakeExecutor.advanceTime(Duration.ofSeconds(100)); + shutdownTestPublisher(publisher); + + verify(this.mockPublishSpan, times(useTracer ? 6 : 0)).end(); + verify(this.mockSchedulerSpan, times(useTracer ? 6 : 0)).end(); + verify(this.mockPublishRpcSpan, times(useTracer ? 6 : 0)).end(); + verify(this.mockPublishRpcSpan, times(useTracer ? 6 : 0)) + .setAttribute(PUBLISH_RPC_SPAN_NUM_MESSAGES_IN_BATCH_ATTRIBUTE_KEY, 3); } @Test @@ -494,6 +635,7 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1")); ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); @@ -645,18 +787,9 @@ public void testPublishThrowExceptionForUnsubmittedOrderingKeyMessage() throws E } } - private ApiFuture sendTestMessageWithOrderingKey( - Publisher publisher, String data, String orderingKey) { - return publisher.publish( - PubsubMessage.newBuilder() - .setOrderingKey(orderingKey) - .setData(ByteString.copyFromUtf8(data)) - .build()); - } - @Test - public void testErrorPropagation() throws Exception { - Publisher publisher = + public void testErrorPropagation(@TestParameter boolean useTracer) throws Exception { + Builder publisherBuilder = getTestPublisherBuilder() .setExecutorProvider(SINGLE_THREAD_EXECUTOR) .setBatchingSettings( @@ -664,8 +797,15 @@ public void testErrorPropagation() throws Exception { .toBuilder() .setElementCountThreshold(1L) .setDelayThreshold(Duration.ofSeconds(5)) - .build()) - .build(); + .build()); + + if (useTracer) { + configureOpenTelemetryMocks(true, false); + publisherBuilder.setOpenTelemetry(this.mockOpenTelemetry); + } + + Publisher publisher = publisherBuilder.build(); + testPublisherServiceImpl.addPublishError(Status.DATA_LOSS.asException()); try { sendTestMessage(publisher, "A").get(); @@ -673,6 +813,8 @@ public void testErrorPropagation() throws Exception { } catch (ExecutionException e) { assertThat(e.getCause()).isInstanceOf(DataLossException.class); } + verify(this.mockPublishRpcSpan, times(useTracer ? 1 : 0)) + .recordException(any(DataLossException.class)); } @Test @@ -689,6 +831,7 @@ public void testPublishFailureRetries() throws Exception { .build(); // To demonstrate that reaching duration will trigger publish testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1")); ApiFuture publishFuture1 = sendTestMessage(publisher, "A"); @@ -700,8 +843,9 @@ public void testPublishFailureRetries() throws Exception { } @Test(expected = ExecutionException.class) - public void testPublishFailureRetries_retriesDisabled() throws Exception { - Publisher publisher = + public void testPublishFailureRetries_retriesDisabled(@TestParameter boolean useTracer) + throws Exception { + Builder publisherBuilder = getTestPublisherBuilder() .setExecutorProvider(SINGLE_THREAD_EXECUTOR) .setRetrySettings( @@ -709,8 +853,14 @@ public void testPublishFailureRetries_retriesDisabled() throws Exception { .toBuilder() .setTotalTimeout(Duration.ofSeconds(10)) .setMaxAttempts(1) - .build()) - .build(); + .build()); + + if (useTracer) { + configureOpenTelemetryMocks(true, false); + publisherBuilder.setOpenTelemetry(this.mockOpenTelemetry); + } + + Publisher publisher = publisherBuilder.build(); testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); @@ -722,6 +872,8 @@ public void testPublishFailureRetries_retriesDisabled() throws Exception { assertSame(testPublisherServiceImpl.getCapturedRequests().size(), 1); shutdownTestPublisher(publisher); } + verify(this.mockPublishRpcSpan, times(useTracer ? 1 : 0)) + .recordException(any(DataLossException.class)); } @Test @@ -739,6 +891,7 @@ public void testPublishFailureRetries_maxRetriesSetup() throws Exception { testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1")); ApiFuture publishFuture1 = sendTestMessage(publisher, "A"); @@ -764,6 +917,7 @@ public void testPublishFailureRetries_maxRetriesSetUnlimited() throws Exception testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1")); ApiFuture publishFuture1 = sendTestMessage(publisher, "A"); @@ -776,8 +930,9 @@ public void testPublishFailureRetries_maxRetriesSetUnlimited() throws Exception } @Test(expected = ExecutionException.class) - public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exception { - Publisher publisher = + public void testPublishFailureRetries_nonRetryableFailsImmediately( + @TestParameter boolean useTracer) throws Exception { + Builder publisherBuilder = getTestPublisherBuilder() .setExecutorProvider(SINGLE_THREAD_EXECUTOR) .setRetrySettings( @@ -790,8 +945,15 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce .toBuilder() .setElementCountThreshold(1L) .setDelayThreshold(Duration.ofSeconds(5)) - .build()) - .build(); // To demonstrate that reaching duration will trigger publish + .build()); + // To demonstrate that reaching duration will trigger publish + + if (useTracer) { + configureOpenTelemetryMocks(true, false); + publisherBuilder.setOpenTelemetry(this.mockOpenTelemetry); + } + + Publisher publisher = publisherBuilder.build(); testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); ApiFuture publishFuture1 = sendTestMessage(publisher, "A"); @@ -803,26 +965,8 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce publisher.shutdown(); assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } - } - - @Test - public void testPublishOpenTelemetry() throws Exception { - OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class, RETURNS_DEEP_STUBS); - Publisher.Builder builder = getTestPublisherBuilder(); - Publisher publisher = - getTestPublisherBuilder() - .setOpenTelemetry(mockOpenTelemetry) - .setBatchingSettings( - Publisher.Builder.getDefaultBatchingSettings() - .toBuilder() - .setElementCountThreshold(1L) - .build()) - .build(); - - PublishResponse publishResponse = PublishResponse.newBuilder().addMessageIds("1").build(); - testPublisherServiceImpl.addPublishResponse(publishResponse); - ApiFuture publishFuture = sendTestMessage(publisher, "A"); - assertEquals("1", publishFuture.get()); + verify(this.mockPublishRpcSpan, times(useTracer ? 1 : 0)) + .recordException(any(DataLossException.class)); } @Test @@ -1145,8 +1289,9 @@ public void testMessageExceedsFlowControlLimits_throwException() throws Exceptio } @Test - public void testPublishFlowControl_throwException() throws Exception { - Publisher publisher = + public void testPublishFlowControl_throwException(@TestParameter boolean useTracer) + throws Exception { + Builder publisherBuilder = getTestPublisherBuilder() .setExecutorProvider(SINGLE_THREAD_EXECUTOR) .setBatchingSettings( @@ -1161,8 +1306,14 @@ public void testPublishFlowControl_throwException() throws Exception { .setMaxOutstandingElementCount(1L) .setMaxOutstandingRequestBytes(10L) .build()) - .build()) - .build(); + .build()); + + if (useTracer) { + configureOpenTelemetryMocks(true, true); + publisherBuilder.setOpenTelemetry(this.mockOpenTelemetry); + } + + Publisher publisher = publisherBuilder.build(); // Sending a message less than the byte limit succeeds. ApiFuture publishFuture1 = sendTestMessage(publisher, "AAAA"); @@ -1182,13 +1333,17 @@ public void testPublishFlowControl_throwException() throws Exception { // Sending another message succeeds. ApiFuture publishFuture4 = sendTestMessage(publisher, "AAAA"); + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("2")); assertEquals("2", publishFuture4.get()); + verify(this.mockFlowControlSpan, times(useTracer ? 1 : 0)) + .recordException(any(FlowController.MaxOutstandingElementCountReachedException.class)); } @Test - public void testPublishFlowControl_throwExceptionWithOrderingKey() throws Exception { - Publisher publisher = + public void testPublishFlowControl_throwExceptionWithOrderingKey(@TestParameter boolean useTracer) + throws Exception { + Builder publisherBuilder = getTestPublisherBuilder() .setExecutorProvider(SINGLE_THREAD_EXECUTOR) .setBatchingSettings( @@ -1204,8 +1359,14 @@ public void testPublishFlowControl_throwExceptionWithOrderingKey() throws Except .setMaxOutstandingRequestBytes(10L) .build()) .build()) - .setEnableMessageOrdering(true) - .build(); + .setEnableMessageOrdering(true); + + if (useTracer) { + configureOpenTelemetryMocks(true, true); + publisherBuilder.setOpenTelemetry(this.mockOpenTelemetry); + } + + Publisher publisher = publisherBuilder.build(); // Sending a message less than the byte limit succeeds. ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "AAAA", "a"); @@ -1231,6 +1392,8 @@ public void testPublishFlowControl_throwExceptionWithOrderingKey() throws Except } catch (ExecutionException e) { assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); } + verify(this.mockFlowControlSpan, times(useTracer ? 1 : 0)) + .recordException(any(FlowController.MaxOutstandingElementCountReachedException.class)); } @Test @@ -1318,12 +1481,56 @@ public void run() { }); publish3Completed.await(); + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("3")); response3Sent.countDown(); publish4Completed.await(); } + /** Helper function to set up our Open Telemetry mocks */ + private void configureOpenTelemetryMocks(boolean useScheduler, boolean useFlowControl) { + when(this.mockOpenTelemetry.getTracer(OPEN_TELEMETRY_TRACER_NAME)).thenReturn(this.mockTracer); + when(this.mockTracer.spanBuilder(PUBLISH_SPAN_NAME).startSpan()) + .thenReturn(this.mockPublishSpan); + + if (useScheduler) { + when(this.mockTracer + .spanBuilder(PUBLISH_SCHEDULER_SPAN_NAME) + .setParent(Context.current().with(this.mockPublishSpan)) + .startSpan()) + .thenReturn(this.mockSchedulerSpan); + } + + if (useFlowControl) { + when(this.mockTracer + .spanBuilder(PUBLISH_FLOW_CONTROL_SPAN_NAME) + .setParent(Context.current().with(this.mockPublishSpan)) + .startSpan()) + .thenReturn(this.mockFlowControlSpan); + } + + when(this.mockTracer + .spanBuilder(PUBLISH_RPC_SPAN_NAME) + .setParent(Context.current().with(this.mockPublishSpan)) + .startSpan()) + .thenReturn(this.mockPublishRpcSpan); + } + + private ApiFuture sendTestMessage(Publisher publisher, String data) { + return publisher.publish( + PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(data)).build()); + } + + private ApiFuture sendTestMessageWithOrderingKey( + Publisher publisher, String data, String orderingKey) { + return publisher.publish( + PubsubMessage.newBuilder() + .setOrderingKey(orderingKey) + .setData(ByteString.copyFromUtf8(data)) + .build()); + } + private Builder getTestPublisherBuilder() { return Publisher.newBuilder(TEST_TOPIC) .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PubsubMessageWrapperTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PubsubMessageWrapperTest.java index 4bf6f2a6e..db7f01e21 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PubsubMessageWrapperTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PubsubMessageWrapperTest.java @@ -77,9 +77,6 @@ public void testPublishSpans( mockTracer = Optional.of(mock(Tracer.class, RETURNS_DEEP_STUBS)); when(mockTracer.get().spanBuilder(PUBLISH_SPAN_NAME).startSpan()).thenReturn(mockPublishSpan); - // Need to set up our parent span(s) for the optional spans - Span publishRpcSpanParent = mockPublishSpan; - if (useFlowControl) { when(mockTracer .get() @@ -87,24 +84,21 @@ public void testPublishSpans( .setParent(Context.current().with(mockPublishSpan)) .startSpan()) .thenReturn(mockFlowControlSpan); - publishRpcSpanParent = mockFlowControlSpan; } if (useScheduler) { - Span schedulerSpanParent = useFlowControl ? mockFlowControlSpan : mockPublishSpan; when(mockTracer .get() .spanBuilder(PUBLISH_SCHEDULER_SPAN_NAME) - .setParent(Context.current().with(schedulerSpanParent)) + .setParent(Context.current().with(mockPublishSpan)) .startSpan()) .thenReturn(mockSchedulerSpan); - publishRpcSpanParent = schedulerSpanParent; } when(mockTracer .get() .spanBuilder(PUBLISH_RPC_SPAN_NAME) - .setParent(Context.current().with(mockSchedulerSpan)) + .setParent(Context.current().with(mockPublishSpan)) .startSpan()) .thenReturn(mockPublishRpcSpan); } @@ -118,7 +112,7 @@ public void testPublishSpans( pubsubMessageWrapper.startPublishSchedulerSpan(mockTracer); } - pubsubMessageWrapper.startPublishRpcSpan(mockTracer); + pubsubMessageWrapper.startPublishRpcSpan(mockTracer, 1); pubsubMessageWrapper.endPublishRpcSpan(); pubsubMessageWrapper.endPublishSchedulerSpan();