From 3fe0afda3f208ee48845290d9ca55fe443dacb73 Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 12 Jan 2024 09:27:55 -0800 Subject: [PATCH] Update PublisherImplTest.java --- .../cloud/pubsub/v1/PublisherImplTest.java | 69 ++++++++++++++++--- 1 file changed, 60 insertions(+), 9 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index f553be35e..6877bdcaf 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -365,15 +365,6 @@ public void testBatchedMessagesWithOrderingKeyByNum() throws Exception { shutdownTestPublisher(publisher); } - private ApiFuture sendTestMessageWithOrderingKey( - Publisher publisher, String data, String orderingKey) { - return publisher.publish( - PubsubMessage.newBuilder() - .setOrderingKey(orderingKey) - .setData(ByteString.copyFromUtf8(data)) - .build()); - } - @Test public void testBatchedMessagesWithOrderingKeyByDuration() throws Exception { // Limit the batching timeout to 100 seconds. @@ -510,6 +501,66 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); } + @Test + public void testPublishThrowExceptionForUnsubmittedOrderingKeyMessage() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(2L) + .setDelayThreshold(Duration.ofSeconds(500)) + .build()) + .setEnableMessageOrdering(true) + .build(); + + // Send two messages that will fulfill the first batch, which will return a failure. + testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); + ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "A", "a"); + ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "B", "a"); + + // A third message will fail because the first attempt to publish failed. + ApiFuture publishFuture3 = sendTestMessageWithOrderingKey(publisher, "C", "a"); + + try { + publishFuture1.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + } + + try { + publishFuture2.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + } + + try { + publishFuture3.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + + // A subsequent attempt fails immediately. + ApiFuture publishFuture4 = sendTestMessageWithOrderingKey(publisher, "D", "a"); + try { + publishFuture4.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + } + + private ApiFuture sendTestMessageWithOrderingKey( + Publisher publisher, String data, String orderingKey) { + return publisher.publish( + PubsubMessage.newBuilder() + .setOrderingKey(orderingKey) + .setData(ByteString.copyFromUtf8(data)) + .build()); + } + @Test public void testErrorPropagation() throws Exception { Publisher publisher =