From 01401ace198d05e0b735691655bdf14ee54ad9cb Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 12 Jan 2024 08:04:53 -0800 Subject: [PATCH 01/22] Update PublisherImplTest.java --- .../test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 9985efc6b..0799cd5b4 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -66,6 +66,7 @@ @RunWith(JUnit4.class) public class PublisherImplTest { + // Testing private static final ProjectTopicName TEST_TOPIC = ProjectTopicName.of("test-project", "test-topic"); From e7c0c39cd68e707d2c77831c4706f867119b96bc Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 12 Jan 2024 08:15:20 -0800 Subject: [PATCH 02/22] Update PublisherImplTest.java --- .../cloud/pubsub/v1/PublisherImplTest.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 0799cd5b4..841d63c25 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -99,7 +99,7 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { - testServer.shutdownNow().awaitTermination(); + testServer.shutdownNow().awaitTermination(10, TimeUnit.SECONDS); testChannel.shutdown(); } @@ -238,7 +238,7 @@ public void testPublishByShutdown() throws Exception { assertEquals("2", publishFuture2.get()); fakeExecutor.advanceTime(Duration.ofSeconds(100)); - publisher.awaitTermination(1, TimeUnit.MINUTES); + publisher.awaitTermination(10, TimeUnit.SECONDS); } @Test @@ -499,7 +499,7 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { assertEquals(4, testPublisherServiceImpl.getCapturedRequests().size()); publisher.shutdown(); - assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); + assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); } @Test @@ -770,7 +770,7 @@ public void testPublishFailureRetries_maxRetriesSetUnlimited() throws Exception assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size()); publisher.shutdown(); - assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); + assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); } @Test(expected = ExecutionException.class) @@ -799,7 +799,7 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce } finally { assertTrue(testPublisherServiceImpl.getCapturedRequests().size() >= 1); publisher.shutdown(); - assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); + assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); } } @@ -823,7 +823,7 @@ public void testPublisherGetters() throws Exception { assertEquals(Duration.ofMillis(11), publisher.getBatchingSettings().getDelayThreshold()); assertEquals(12, (long) publisher.getBatchingSettings().getElementCountThreshold()); publisher.shutdown(); - assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); + assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); } @Test @@ -1024,7 +1024,7 @@ public void testAwaitTermination() throws Exception { .build(); ApiFuture publishFuture1 = sendTestMessage(publisher, "A"); publisher.shutdown(); - assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); + assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); } @Test @@ -1035,13 +1035,13 @@ public void testShutDown() throws Exception { publisher.publish( PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("A")).build())) .andReturn(apiFuture); - EasyMock.expect(publisher.awaitTermination(1, TimeUnit.MINUTES)).andReturn(true); + EasyMock.expect(publisher.awaitTermination(10, TimeUnit.SECONDS)).andReturn(true); publisher.shutdown(); EasyMock.expectLastCall().once(); EasyMock.replay(publisher); sendTestMessage(publisher, "A"); publisher.shutdown(); - assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); + assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); } @Test @@ -1313,6 +1313,6 @@ private Builder getTestPublisherBuilder() { private void shutdownTestPublisher(Publisher publisher) throws InterruptedException { publisher.shutdown(); fakeExecutor.advanceTime(Duration.ofSeconds(10)); - assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); + assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); } } From f58238c058c59c73853316f9df712d7f21c4a4e5 Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 12 Jan 2024 08:57:00 -0800 Subject: [PATCH 03/22] Update PublisherImplTest.java --- .../cloud/pubsub/v1/PublisherImplTest.java | 60 ------------------- 1 file changed, 60 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 841d63c25..78502d55f 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -592,66 +592,6 @@ public void testResumePublish() throws Exception { shutdownTestPublisher(publisher); } - @Test - public void testPublishThrowExceptionForUnsubmittedOrderingKeyMessage() throws Exception { - Publisher publisher = - getTestPublisherBuilder() - .setExecutorProvider(SINGLE_THREAD_EXECUTOR) - .setBatchingSettings( - Publisher.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(2L) - .setDelayThreshold(Duration.ofSeconds(500)) - .build()) - .setEnableMessageOrdering(true) - .build(); - - // Send two messages that will fulfill the first batch, which will return a failure. - testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); - ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "A", "a"); - ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "B", "a"); - - // A third message will fail because the first attempt to publish failed. - ApiFuture publishFuture3 = sendTestMessageWithOrderingKey(publisher, "C", "a"); - - try { - publishFuture1.get(); - fail("Should have failed."); - } catch (ExecutionException e) { - } - - try { - publishFuture2.get(); - fail("Should have failed."); - } catch (ExecutionException e) { - } - - try { - publishFuture3.get(); - fail("Should have failed."); - } catch (ExecutionException e) { - assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); - } - - // A subsequent attempt fails immediately. - ApiFuture publishFuture4 = sendTestMessageWithOrderingKey(publisher, "D", "a"); - try { - publishFuture4.get(); - fail("Should have failed."); - } catch (ExecutionException e) { - assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); - } - } - - private ApiFuture sendTestMessageWithOrderingKey( - Publisher publisher, String data, String orderingKey) { - return publisher.publish( - PubsubMessage.newBuilder() - .setOrderingKey(orderingKey) - .setData(ByteString.copyFromUtf8(data)) - .build()); - } - @Test public void testErrorPropagation() throws Exception { Publisher publisher = From 88a22eb068ac05fa16d8b28aaa64b51661e0e383 Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 12 Jan 2024 08:58:41 -0800 Subject: [PATCH 04/22] Update PublisherImplTest.java --- .../com/google/cloud/pubsub/v1/PublisherImplTest.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 78502d55f..1ad2b9884 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -366,6 +366,15 @@ public void testBatchedMessagesWithOrderingKeyByNum() throws Exception { shutdownTestPublisher(publisher); } + private ApiFuture sendTestMessageWithOrderingKey( + Publisher publisher, String data, String orderingKey) { + return publisher.publish( + PubsubMessage.newBuilder() + .setOrderingKey(orderingKey) + .setData(ByteString.copyFromUtf8(data)) + .build()); + } + @Test public void testBatchedMessagesWithOrderingKeyByDuration() throws Exception { // Limit the batching timeout to 100 seconds. From cc32d5b48ec116ceff69ac0465e526194bb93c5d Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 12 Jan 2024 09:24:09 -0800 Subject: [PATCH 05/22] Update PublisherImplTest.java --- .../cloud/pubsub/v1/PublisherImplTest.java | 90 ------------------- 1 file changed, 90 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 1ad2b9884..8b2849beb 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -511,96 +511,6 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); } - @Test - /** - * Make sure that resume publishing works as expected: - * - *
    - *
  1. publish with key orderA which returns a failure. - *
  2. publish with key orderA again, which should fail immediately - *
  3. publish with key orderB, which should succeed - *
  4. resume publishing on key orderA - *
  5. publish with key orderA, which should now succeed - *
- */ - public void testResumePublish() throws Exception { - Publisher publisher = - getTestPublisherBuilder() - .setBatchingSettings( - Publisher.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(2L) - .build()) - .setEnableMessageOrdering(true) - .build(); - - ApiFuture future1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); - ApiFuture future2 = sendTestMessageWithOrderingKey(publisher, "m2", "orderA"); - - fakeExecutor.advanceTime(Duration.ZERO); - assertFalse(future1.isDone()); - assertFalse(future2.isDone()); - - // This exception should stop future publishing to the same key - testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); - - fakeExecutor.advanceTime(Duration.ZERO); - - try { - future1.get(); - Assert.fail("This should fail."); - } catch (ExecutionException e) { - } - - try { - future2.get(); - Assert.fail("This should fail."); - } catch (ExecutionException e) { - } - - // Submit new requests with orderA that should fail. - ApiFuture future3 = sendTestMessageWithOrderingKey(publisher, "m3", "orderA"); - ApiFuture future4 = sendTestMessageWithOrderingKey(publisher, "m4", "orderA"); - - try { - future3.get(); - Assert.fail("This should fail."); - } catch (ExecutionException e) { - assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); - } - - try { - future4.get(); - Assert.fail("This should fail."); - } catch (ExecutionException e) { - assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); - } - - // Submit a new request with orderB, which should succeed - ApiFuture future5 = sendTestMessageWithOrderingKey(publisher, "m5", "orderB"); - ApiFuture future6 = sendTestMessageWithOrderingKey(publisher, "m6", "orderB"); - - testPublisherServiceImpl.addPublishResponse( - PublishResponse.newBuilder().addMessageIds("5").addMessageIds("6")); - - Assert.assertEquals("5", future5.get()); - Assert.assertEquals("6", future6.get()); - - // Resume publishing of "orderA", which should now succeed - publisher.resumePublish("orderA"); - - ApiFuture future7 = sendTestMessageWithOrderingKey(publisher, "m7", "orderA"); - ApiFuture future8 = sendTestMessageWithOrderingKey(publisher, "m8", "orderA"); - - testPublisherServiceImpl.addPublishResponse( - PublishResponse.newBuilder().addMessageIds("7").addMessageIds("8")); - - Assert.assertEquals("7", future7.get()); - Assert.assertEquals("8", future8.get()); - - shutdownTestPublisher(publisher); - } - @Test public void testErrorPropagation() throws Exception { Publisher publisher = From 58a844179435017aeb4c9e1e9cb4b960dd8e9daf Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Fri, 12 Jan 2024 17:26:26 +0000 Subject: [PATCH 06/22] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- .../test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 8b2849beb..f553be35e 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -56,7 +56,6 @@ import java.util.concurrent.TimeUnit; import org.easymock.EasyMock; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; From 3fe0afda3f208ee48845290d9ca55fe443dacb73 Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 12 Jan 2024 09:27:55 -0800 Subject: [PATCH 07/22] Update PublisherImplTest.java --- .../cloud/pubsub/v1/PublisherImplTest.java | 69 ++++++++++++++++--- 1 file changed, 60 insertions(+), 9 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index f553be35e..6877bdcaf 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -365,15 +365,6 @@ public void testBatchedMessagesWithOrderingKeyByNum() throws Exception { shutdownTestPublisher(publisher); } - private ApiFuture sendTestMessageWithOrderingKey( - Publisher publisher, String data, String orderingKey) { - return publisher.publish( - PubsubMessage.newBuilder() - .setOrderingKey(orderingKey) - .setData(ByteString.copyFromUtf8(data)) - .build()); - } - @Test public void testBatchedMessagesWithOrderingKeyByDuration() throws Exception { // Limit the batching timeout to 100 seconds. @@ -510,6 +501,66 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); } + @Test + public void testPublishThrowExceptionForUnsubmittedOrderingKeyMessage() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(2L) + .setDelayThreshold(Duration.ofSeconds(500)) + .build()) + .setEnableMessageOrdering(true) + .build(); + + // Send two messages that will fulfill the first batch, which will return a failure. + testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); + ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "A", "a"); + ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "B", "a"); + + // A third message will fail because the first attempt to publish failed. + ApiFuture publishFuture3 = sendTestMessageWithOrderingKey(publisher, "C", "a"); + + try { + publishFuture1.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + } + + try { + publishFuture2.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + } + + try { + publishFuture3.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + + // A subsequent attempt fails immediately. + ApiFuture publishFuture4 = sendTestMessageWithOrderingKey(publisher, "D", "a"); + try { + publishFuture4.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + } + + private ApiFuture sendTestMessageWithOrderingKey( + Publisher publisher, String data, String orderingKey) { + return publisher.publish( + PubsubMessage.newBuilder() + .setOrderingKey(orderingKey) + .setData(ByteString.copyFromUtf8(data)) + .build()); + } + @Test public void testErrorPropagation() throws Exception { Publisher publisher = From 3ac2b9385fb3c65c41e6bca1b531e813d4c94558 Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 12 Jan 2024 09:37:19 -0800 Subject: [PATCH 08/22] Update PublisherImplTest.java --- .../cloud/pubsub/v1/PublisherImplTest.java | 110 ++++++++++++++++-- 1 file changed, 100 insertions(+), 10 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 6877bdcaf..b60eee275 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -98,7 +98,7 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { - testServer.shutdownNow().awaitTermination(10, TimeUnit.SECONDS); + testServer.shutdownNow().awaitTermination(1, TimeUnit.MINUTES); testChannel.shutdown(); } @@ -237,7 +237,7 @@ public void testPublishByShutdown() throws Exception { assertEquals("2", publishFuture2.get()); fakeExecutor.advanceTime(Duration.ofSeconds(100)); - publisher.awaitTermination(10, TimeUnit.SECONDS); + publisher.awaitTermination(1, TimeUnit.MINUTES); } @Test @@ -498,7 +498,97 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { assertEquals(4, testPublisherServiceImpl.getCapturedRequests().size()); publisher.shutdown(); - assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); + assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); + } + + @Test + /** + * Make sure that resume publishing works as expected: + * + *
    + *
  1. publish with key orderA which returns a failure. + *
  2. publish with key orderA again, which should fail immediately + *
  3. publish with key orderB, which should succeed + *
  4. resume publishing on key orderA + *
  5. publish with key orderA, which should now succeed + *
+ */ + public void testResumePublish() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(2L) + .build()) + .setEnableMessageOrdering(true) + .build(); + + ApiFuture future1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); + ApiFuture future2 = sendTestMessageWithOrderingKey(publisher, "m2", "orderA"); + + fakeExecutor.advanceTime(Duration.ofSeconds(1)); + assertFalse(future1.isDone()); + assertFalse(future2.isDone()); + + // This exception should stop future publishing to the same key + testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); + + fakeExecutor.advanceTime(Duration.ofSeconds(1)); + + try { + future1.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + } + + try { + future2.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + } + + // Submit new requests with orderA that should fail. + ApiFuture future3 = sendTestMessageWithOrderingKey(publisher, "m3", "orderA"); + ApiFuture future4 = sendTestMessageWithOrderingKey(publisher, "m4", "orderA"); + + try { + future3.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + + try { + future4.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + + // Submit a new request with orderB, which should succeed + ApiFuture future5 = sendTestMessageWithOrderingKey(publisher, "m5", "orderB"); + ApiFuture future6 = sendTestMessageWithOrderingKey(publisher, "m6", "orderB"); + + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("5").addMessageIds("6")); + + Assert.assertEquals("5", future5.get()); + Assert.assertEquals("6", future6.get()); + + // Resume publishing of "orderA", which should now succeed + publisher.resumePublish("orderA"); + + ApiFuture future7 = sendTestMessageWithOrderingKey(publisher, "m7", "orderA"); + ApiFuture future8 = sendTestMessageWithOrderingKey(publisher, "m8", "orderA"); + + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("7").addMessageIds("8")); + + Assert.assertEquals("7", future7.get()); + Assert.assertEquals("8", future8.get()); + + shutdownTestPublisher(publisher); } @Test @@ -679,7 +769,7 @@ public void testPublishFailureRetries_maxRetriesSetUnlimited() throws Exception assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size()); publisher.shutdown(); - assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); + assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } @Test(expected = ExecutionException.class) @@ -708,7 +798,7 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce } finally { assertTrue(testPublisherServiceImpl.getCapturedRequests().size() >= 1); publisher.shutdown(); - assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); + assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } } @@ -732,7 +822,7 @@ public void testPublisherGetters() throws Exception { assertEquals(Duration.ofMillis(11), publisher.getBatchingSettings().getDelayThreshold()); assertEquals(12, (long) publisher.getBatchingSettings().getElementCountThreshold()); publisher.shutdown(); - assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); + assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } @Test @@ -933,7 +1023,7 @@ public void testAwaitTermination() throws Exception { .build(); ApiFuture publishFuture1 = sendTestMessage(publisher, "A"); publisher.shutdown(); - assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); + assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } @Test @@ -944,13 +1034,13 @@ public void testShutDown() throws Exception { publisher.publish( PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("A")).build())) .andReturn(apiFuture); - EasyMock.expect(publisher.awaitTermination(10, TimeUnit.SECONDS)).andReturn(true); + EasyMock.expect(publisher.awaitTermination(1, TimeUnit.MINUTES)).andReturn(true); publisher.shutdown(); EasyMock.expectLastCall().once(); EasyMock.replay(publisher); sendTestMessage(publisher, "A"); publisher.shutdown(); - assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); + assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } @Test @@ -1222,6 +1312,6 @@ private Builder getTestPublisherBuilder() { private void shutdownTestPublisher(Publisher publisher) throws InterruptedException { publisher.shutdown(); fakeExecutor.advanceTime(Duration.ofSeconds(10)); - assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); + assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } } From b973b2a9cbe74c31e2e22013d674a1ec2cc67b31 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Fri, 12 Jan 2024 17:39:39 +0000 Subject: [PATCH 09/22] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- .../cloud/pubsub/v1/PublisherImplTest.java | 142 +++++++++--------- 1 file changed, 71 insertions(+), 71 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index b60eee275..97182b58c 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -501,94 +501,94 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } - @Test - /** - * Make sure that resume publishing works as expected: - * - *
    - *
  1. publish with key orderA which returns a failure. - *
  2. publish with key orderA again, which should fail immediately - *
  3. publish with key orderB, which should succeed - *
  4. resume publishing on key orderA - *
  5. publish with key orderA, which should now succeed - *
- */ - public void testResumePublish() throws Exception { - Publisher publisher = - getTestPublisherBuilder() - .setBatchingSettings( - Publisher.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(2L) - .build()) - .setEnableMessageOrdering(true) - .build(); - - ApiFuture future1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); - ApiFuture future2 = sendTestMessageWithOrderingKey(publisher, "m2", "orderA"); + @Test + /** + * Make sure that resume publishing works as expected: + * + *
    + *
  1. publish with key orderA which returns a failure. + *
  2. publish with key orderA again, which should fail immediately + *
  3. publish with key orderB, which should succeed + *
  4. resume publishing on key orderA + *
  5. publish with key orderA, which should now succeed + *
+ */ + public void testResumePublish() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(2L) + .build()) + .setEnableMessageOrdering(true) + .build(); + + ApiFuture future1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); + ApiFuture future2 = sendTestMessageWithOrderingKey(publisher, "m2", "orderA"); fakeExecutor.advanceTime(Duration.ofSeconds(1)); - assertFalse(future1.isDone()); - assertFalse(future2.isDone()); + assertFalse(future1.isDone()); + assertFalse(future2.isDone()); - // This exception should stop future publishing to the same key - testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); + // This exception should stop future publishing to the same key + testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); - fakeExecutor.advanceTime(Duration.ofSeconds(1)); + fakeExecutor.advanceTime(Duration.ofSeconds(1)); - try { - future1.get(); - Assert.fail("This should fail."); - } catch (ExecutionException e) { - } + try { + future1.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + } - try { - future2.get(); - Assert.fail("This should fail."); - } catch (ExecutionException e) { - } + try { + future2.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + } - // Submit new requests with orderA that should fail. - ApiFuture future3 = sendTestMessageWithOrderingKey(publisher, "m3", "orderA"); - ApiFuture future4 = sendTestMessageWithOrderingKey(publisher, "m4", "orderA"); + // Submit new requests with orderA that should fail. + ApiFuture future3 = sendTestMessageWithOrderingKey(publisher, "m3", "orderA"); + ApiFuture future4 = sendTestMessageWithOrderingKey(publisher, "m4", "orderA"); - try { - future3.get(); - Assert.fail("This should fail."); - } catch (ExecutionException e) { - assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); - } + try { + future3.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } - try { - future4.get(); - Assert.fail("This should fail."); - } catch (ExecutionException e) { - assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); - } + try { + future4.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } - // Submit a new request with orderB, which should succeed - ApiFuture future5 = sendTestMessageWithOrderingKey(publisher, "m5", "orderB"); - ApiFuture future6 = sendTestMessageWithOrderingKey(publisher, "m6", "orderB"); + // Submit a new request with orderB, which should succeed + ApiFuture future5 = sendTestMessageWithOrderingKey(publisher, "m5", "orderB"); + ApiFuture future6 = sendTestMessageWithOrderingKey(publisher, "m6", "orderB"); - testPublisherServiceImpl.addPublishResponse( - PublishResponse.newBuilder().addMessageIds("5").addMessageIds("6")); + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("5").addMessageIds("6")); - Assert.assertEquals("5", future5.get()); - Assert.assertEquals("6", future6.get()); + Assert.assertEquals("5", future5.get()); + Assert.assertEquals("6", future6.get()); - // Resume publishing of "orderA", which should now succeed - publisher.resumePublish("orderA"); + // Resume publishing of "orderA", which should now succeed + publisher.resumePublish("orderA"); - ApiFuture future7 = sendTestMessageWithOrderingKey(publisher, "m7", "orderA"); - ApiFuture future8 = sendTestMessageWithOrderingKey(publisher, "m8", "orderA"); + ApiFuture future7 = sendTestMessageWithOrderingKey(publisher, "m7", "orderA"); + ApiFuture future8 = sendTestMessageWithOrderingKey(publisher, "m8", "orderA"); - testPublisherServiceImpl.addPublishResponse( - PublishResponse.newBuilder().addMessageIds("7").addMessageIds("8")); + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("7").addMessageIds("8")); - Assert.assertEquals("7", future7.get()); - Assert.assertEquals("8", future8.get()); + Assert.assertEquals("7", future7.get()); + Assert.assertEquals("8", future8.get()); - shutdownTestPublisher(publisher); + shutdownTestPublisher(publisher); } @Test From 42191829534c95c6f8f0e1b3011a21e11cdda20e Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 12 Jan 2024 09:43:30 -0800 Subject: [PATCH 10/22] Update PublisherImplTest.java --- .../java/com/google/cloud/pubsub/v1/PublisherImplTest.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 97182b58c..94c748aed 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -65,7 +65,6 @@ @RunWith(JUnit4.class) public class PublisherImplTest { - // Testing private static final ProjectTopicName TEST_TOPIC = ProjectTopicName.of("test-project", "test-topic"); @@ -98,7 +97,7 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { - testServer.shutdownNow().awaitTermination(1, TimeUnit.MINUTES); + testServer.shutdownNow().awaitTermination(); testChannel.shutdown(); } @@ -527,15 +526,12 @@ public void testResumePublish() throws Exception { ApiFuture future1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); ApiFuture future2 = sendTestMessageWithOrderingKey(publisher, "m2", "orderA"); - fakeExecutor.advanceTime(Duration.ofSeconds(1)); assertFalse(future1.isDone()); assertFalse(future2.isDone()); // This exception should stop future publishing to the same key testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); - fakeExecutor.advanceTime(Duration.ofSeconds(1)); - try { future1.get(); Assert.fail("This should fail."); From 0c71e42d40d6255fc6d020e10bd9568ab276cb75 Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 12 Jan 2024 09:57:10 -0800 Subject: [PATCH 11/22] Update PublisherImplTest.java --- .../test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 94c748aed..d86a26067 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -56,6 +56,7 @@ import java.util.concurrent.TimeUnit; import org.easymock.EasyMock; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; From 067706edfd594e8e82b9f69cff0848a751d4d92f Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 12 Jan 2024 10:04:53 -0800 Subject: [PATCH 12/22] Update PublisherImplTest.java --- .../com/google/cloud/pubsub/v1/PublisherImplTest.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index d86a26067..45f66b536 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -524,14 +524,10 @@ public void testResumePublish() throws Exception { .setEnableMessageOrdering(true) .build(); - ApiFuture future1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); - ApiFuture future2 = sendTestMessageWithOrderingKey(publisher, "m2", "orderA"); - - assertFalse(future1.isDone()); - assertFalse(future2.isDone()); - // This exception should stop future publishing to the same key testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); + ApiFuture future1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); + ApiFuture future2 = sendTestMessageWithOrderingKey(publisher, "m2", "orderA"); try { future1.get(); From 1ce4b9159b83a368aee1ac2e7fc80c2f8a535625 Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 12 Jan 2024 10:14:30 -0800 Subject: [PATCH 13/22] Update PublisherImplTest.java --- .../google/cloud/pubsub/v1/PublisherImplTest.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 45f66b536..474572691 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -524,11 +524,18 @@ public void testResumePublish() throws Exception { .setEnableMessageOrdering(true) .build(); - // This exception should stop future publishing to the same key - testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); ApiFuture future1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); ApiFuture future2 = sendTestMessageWithOrderingKey(publisher, "m2", "orderA"); + fakeExecutor.advanceTime(Duration.ZERO); + assertFalse(future1.isDone()); + assertFalse(future2.isDone()); + + // This exception should stop future publishing to the same key + testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); + + fakeExecutor.advanceTime(Duration.ZERO); + try { future1.get(); Assert.fail("This should fail."); @@ -633,6 +640,8 @@ public void testPublishThrowExceptionForUnsubmittedOrderingKeyMessage() throws E } catch (ExecutionException e) { assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); } + + shutdownTestPublisher(publisher); } private ApiFuture sendTestMessageWithOrderingKey( From 93dcab0c9fd6671ab50e3765825278ab31b5cdf8 Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 12 Jan 2024 10:26:18 -0800 Subject: [PATCH 14/22] Update PublisherImplTest.java --- .../test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 474572691..b59d44596 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -587,8 +587,6 @@ public void testResumePublish() throws Exception { Assert.assertEquals("7", future7.get()); Assert.assertEquals("8", future8.get()); - - shutdownTestPublisher(publisher); } @Test From 7b92be770b74774a76de5c0770f3e4272ecb64a9 Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 12 Jan 2024 10:26:55 -0800 Subject: [PATCH 15/22] Update PublisherImplTest.java --- .../test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index b59d44596..4223b3bb6 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -638,8 +638,6 @@ public void testPublishThrowExceptionForUnsubmittedOrderingKeyMessage() throws E } catch (ExecutionException e) { assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); } - - shutdownTestPublisher(publisher); } private ApiFuture sendTestMessageWithOrderingKey( From 45211cb50208e8c2af5c1f0c2ad90003bbdba6de Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 12 Jan 2024 10:48:51 -0800 Subject: [PATCH 16/22] Update PublisherImplTest.java --- .../com/google/cloud/pubsub/v1/PublisherImplTest.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 4223b3bb6..e452548b1 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -550,7 +550,6 @@ public void testResumePublish() throws Exception { // Submit new requests with orderA that should fail. ApiFuture future3 = sendTestMessageWithOrderingKey(publisher, "m3", "orderA"); - ApiFuture future4 = sendTestMessageWithOrderingKey(publisher, "m4", "orderA"); try { future3.get(); @@ -559,13 +558,6 @@ public void testResumePublish() throws Exception { assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); } - try { - future4.get(); - Assert.fail("This should fail."); - } catch (ExecutionException e) { - assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); - } - // Submit a new request with orderB, which should succeed ApiFuture future5 = sendTestMessageWithOrderingKey(publisher, "m5", "orderB"); ApiFuture future6 = sendTestMessageWithOrderingKey(publisher, "m6", "orderB"); From 5d6b1096b68de5cc4a452a132fbfa4dc92e28c44 Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 12 Jan 2024 10:52:29 -0800 Subject: [PATCH 17/22] Update PublisherImplTest.java --- .../google/cloud/pubsub/v1/PublisherImplTest.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index e452548b1..84fc50cfd 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -516,10 +516,12 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { public void testResumePublish() throws Exception { Publisher publisher = getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) .setBatchingSettings( Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(2L) + .setDelayThreshold(Duration.ofSeconds(100)) .build()) .setEnableMessageOrdering(true) .build(); @@ -527,15 +529,12 @@ public void testResumePublish() throws Exception { ApiFuture future1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); ApiFuture future2 = sendTestMessageWithOrderingKey(publisher, "m2", "orderA"); - fakeExecutor.advanceTime(Duration.ZERO); assertFalse(future1.isDone()); assertFalse(future2.isDone()); // This exception should stop future publishing to the same key testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); - fakeExecutor.advanceTime(Duration.ZERO); - try { future1.get(); Assert.fail("This should fail."); @@ -550,6 +549,7 @@ public void testResumePublish() throws Exception { // Submit new requests with orderA that should fail. ApiFuture future3 = sendTestMessageWithOrderingKey(publisher, "m3", "orderA"); + ApiFuture future4 = sendTestMessageWithOrderingKey(publisher, "m4", "orderA"); try { future3.get(); @@ -558,6 +558,13 @@ public void testResumePublish() throws Exception { assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); } + try { + future4.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + // Submit a new request with orderB, which should succeed ApiFuture future5 = sendTestMessageWithOrderingKey(publisher, "m5", "orderB"); ApiFuture future6 = sendTestMessageWithOrderingKey(publisher, "m6", "orderB"); From 7acbabfee691d9bb196fcd7b29e19caa1f0c4ed6 Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 12 Jan 2024 10:55:04 -0800 Subject: [PATCH 18/22] Update PublisherImplTest.java --- .../cloud/pubsub/v1/PublisherImplTest.java | 87 ------------------- 1 file changed, 87 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 84fc50cfd..d143bfd65 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -501,93 +501,6 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } - @Test - /** - * Make sure that resume publishing works as expected: - * - *
    - *
  1. publish with key orderA which returns a failure. - *
  2. publish with key orderA again, which should fail immediately - *
  3. publish with key orderB, which should succeed - *
  4. resume publishing on key orderA - *
  5. publish with key orderA, which should now succeed - *
- */ - public void testResumePublish() throws Exception { - Publisher publisher = - getTestPublisherBuilder() - .setExecutorProvider(SINGLE_THREAD_EXECUTOR) - .setBatchingSettings( - Publisher.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(2L) - .setDelayThreshold(Duration.ofSeconds(100)) - .build()) - .setEnableMessageOrdering(true) - .build(); - - ApiFuture future1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); - ApiFuture future2 = sendTestMessageWithOrderingKey(publisher, "m2", "orderA"); - - assertFalse(future1.isDone()); - assertFalse(future2.isDone()); - - // This exception should stop future publishing to the same key - testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); - - try { - future1.get(); - Assert.fail("This should fail."); - } catch (ExecutionException e) { - } - - try { - future2.get(); - Assert.fail("This should fail."); - } catch (ExecutionException e) { - } - - // Submit new requests with orderA that should fail. - ApiFuture future3 = sendTestMessageWithOrderingKey(publisher, "m3", "orderA"); - ApiFuture future4 = sendTestMessageWithOrderingKey(publisher, "m4", "orderA"); - - try { - future3.get(); - Assert.fail("This should fail."); - } catch (ExecutionException e) { - assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); - } - - try { - future4.get(); - Assert.fail("This should fail."); - } catch (ExecutionException e) { - assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); - } - - // Submit a new request with orderB, which should succeed - ApiFuture future5 = sendTestMessageWithOrderingKey(publisher, "m5", "orderB"); - ApiFuture future6 = sendTestMessageWithOrderingKey(publisher, "m6", "orderB"); - - testPublisherServiceImpl.addPublishResponse( - PublishResponse.newBuilder().addMessageIds("5").addMessageIds("6")); - - Assert.assertEquals("5", future5.get()); - Assert.assertEquals("6", future6.get()); - - // Resume publishing of "orderA", which should now succeed - publisher.resumePublish("orderA"); - - ApiFuture future7 = sendTestMessageWithOrderingKey(publisher, "m7", "orderA"); - ApiFuture future8 = sendTestMessageWithOrderingKey(publisher, "m8", "orderA"); - - testPublisherServiceImpl.addPublishResponse( - PublishResponse.newBuilder().addMessageIds("7").addMessageIds("8")); - - Assert.assertEquals("7", future7.get()); - Assert.assertEquals("8", future8.get()); - } - @Test public void testPublishThrowExceptionForUnsubmittedOrderingKeyMessage() throws Exception { Publisher publisher = From d4230d2563d643f2c2bb35d3a84229e9a1b58ce4 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Fri, 12 Jan 2024 18:57:23 +0000 Subject: [PATCH 19/22] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- .../test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index d143bfd65..e050dc963 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -56,7 +56,6 @@ import java.util.concurrent.TimeUnit; import org.easymock.EasyMock; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; From 4f46d6845eebf0da0e0478e49a985a196c14359e Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 12 Jan 2024 11:06:15 -0800 Subject: [PATCH 20/22] Update PublisherImplTest.java --- .../cloud/pubsub/v1/PublisherImplTest.java | 51 ------------------- 1 file changed, 51 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index e050dc963..9efea5e5c 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -500,57 +500,6 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } - @Test - public void testPublishThrowExceptionForUnsubmittedOrderingKeyMessage() throws Exception { - Publisher publisher = - getTestPublisherBuilder() - .setExecutorProvider(SINGLE_THREAD_EXECUTOR) - .setBatchingSettings( - Publisher.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(2L) - .setDelayThreshold(Duration.ofSeconds(500)) - .build()) - .setEnableMessageOrdering(true) - .build(); - - // Send two messages that will fulfill the first batch, which will return a failure. - testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); - ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "A", "a"); - ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "B", "a"); - - // A third message will fail because the first attempt to publish failed. - ApiFuture publishFuture3 = sendTestMessageWithOrderingKey(publisher, "C", "a"); - - try { - publishFuture1.get(); - fail("Should have failed."); - } catch (ExecutionException e) { - } - - try { - publishFuture2.get(); - fail("Should have failed."); - } catch (ExecutionException e) { - } - - try { - publishFuture3.get(); - fail("Should have failed."); - } catch (ExecutionException e) { - assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); - } - - // A subsequent attempt fails immediately. - ApiFuture publishFuture4 = sendTestMessageWithOrderingKey(publisher, "D", "a"); - try { - publishFuture4.get(); - fail("Should have failed."); - } catch (ExecutionException e) { - assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); - } - } - private ApiFuture sendTestMessageWithOrderingKey( Publisher publisher, String data, String orderingKey) { return publisher.publish( From 543099bd0c13873f0d4f5961eb1263e9e547d6da Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 12 Jan 2024 11:22:39 -0800 Subject: [PATCH 21/22] fix: temporarily remove publisher tests causing timeouts --- .../cloud/pubsub/v1/PublisherImplTest.java | 144 ++++++++++++++++++ 1 file changed, 144 insertions(+) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 9efea5e5c..500ee5eca 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -500,6 +500,150 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } + /* + Temporarily disabled due to https://github.com/googleapis/java-pubsub/issues/1861. + TODO(maitrimangal): Enable once resolved. + @Test + /** + * Make sure that resume publishing works as expected: + * + *
    + *
  1. publish with key orderA which returns a failure. + *
  2. publish with key orderA again, which should fail immediately + *
  3. publish with key orderB, which should succeed + *
  4. resume publishing on key orderA + *
  5. publish with key orderA, which should now succeed + *
+ */ + public void testResumePublish() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(2L) + .build()) + .setEnableMessageOrdering(true) + .build(); + + ApiFuture future1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); + ApiFuture future2 = sendTestMessageWithOrderingKey(publisher, "m2", "orderA"); + + fakeExecutor.advanceTime(Duration.ZERO); + assertFalse(future1.isDone()); + assertFalse(future2.isDone()); + + // This exception should stop future publishing to the same key + testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); + + fakeExecutor.advanceTime(Duration.ZERO); + + try { + future1.get(); + fail("This should fail."); + } catch (ExecutionException e) { + } + + try { + future2.get(); + fail("This should fail."); + } catch (ExecutionException e) { + } + + // Submit new requests with orderA that should fail. + ApiFuture future3 = sendTestMessageWithOrderingKey(publisher, "m3", "orderA"); + ApiFuture future4 = sendTestMessageWithOrderingKey(publisher, "m4", "orderA"); + + try { + future3.get(); + fail("This should fail."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + + try { + future4.get(); + fail("This should fail."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + + // Submit a new request with orderB, which should succeed + ApiFuture future5 = sendTestMessageWithOrderingKey(publisher, "m5", "orderB"); + ApiFuture future6 = sendTestMessageWithOrderingKey(publisher, "m6", "orderB"); + + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("5").addMessageIds("6")); + + Assert.assertEquals("5", future5.get()); + Assert.assertEquals("6", future6.get()); + + // Resume publishing of "orderA", which should now succeed + publisher.resumePublish("orderA"); + + ApiFuture future7 = sendTestMessageWithOrderingKey(publisher, "m7", "orderA"); + ApiFuture future8 = sendTestMessageWithOrderingKey(publisher, "m8", "orderA"); + + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("7").addMessageIds("8")); + + Assert.assertEquals("7", future7.get()); + Assert.assertEquals("8", future8.get()); + + shutdownTestPublisher(publisher); + } + + @Test + public void testPublishThrowExceptionForUnsubmittedOrderingKeyMessage() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(2L) + .setDelayThreshold(Duration.ofSeconds(500)) + .build()) + .setEnableMessageOrdering(true) + .build(); + + // Send two messages that will fulfill the first batch, which will return a failure. + testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); + ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "A", "a"); + ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "B", "a"); + + // A third message will fail because the first attempt to publish failed. + ApiFuture publishFuture3 = sendTestMessageWithOrderingKey(publisher, "C", "a"); + + try { + publishFuture1.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + } + + try { + publishFuture2.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + } + + try { + publishFuture3.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + + // A subsequent attempt fails immediately. + ApiFuture publishFuture4 = sendTestMessageWithOrderingKey(publisher, "D", "a"); + try { + publishFuture4.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + }*/ + private ApiFuture sendTestMessageWithOrderingKey( Publisher publisher, String data, String orderingKey) { return publisher.publish( From c6a619c356fff7bdb80b22b461a18feec4de7ecc Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 12 Jan 2024 11:27:20 -0800 Subject: [PATCH 22/22] Update PublisherImplTest.java --- .../com/google/cloud/pubsub/v1/PublisherImplTest.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 500ee5eca..9785b7716 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -500,10 +500,6 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } - /* - Temporarily disabled due to https://github.com/googleapis/java-pubsub/issues/1861. - TODO(maitrimangal): Enable once resolved. - @Test /** * Make sure that resume publishing works as expected: * @@ -515,6 +511,10 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { *
  • publish with key orderA, which should now succeed * */ + /* + Temporarily disabled due to https://github.com/googleapis/java-pubsub/issues/1861. + TODO(maitrimangal): Enable once resolved. + @Test public void testResumePublish() throws Exception { Publisher publisher = getTestPublisherBuilder() @@ -642,7 +642,8 @@ public void testPublishThrowExceptionForUnsubmittedOrderingKeyMessage() throws E } catch (ExecutionException e) { assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); } - }*/ + } + */ private ApiFuture sendTestMessageWithOrderingKey( Publisher publisher, String data, String orderingKey) {