diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 6877bdcaf..b60eee275 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -98,7 +98,7 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { - testServer.shutdownNow().awaitTermination(10, TimeUnit.SECONDS); + testServer.shutdownNow().awaitTermination(1, TimeUnit.MINUTES); testChannel.shutdown(); } @@ -237,7 +237,7 @@ public void testPublishByShutdown() throws Exception { assertEquals("2", publishFuture2.get()); fakeExecutor.advanceTime(Duration.ofSeconds(100)); - publisher.awaitTermination(10, TimeUnit.SECONDS); + publisher.awaitTermination(1, TimeUnit.MINUTES); } @Test @@ -498,7 +498,97 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { assertEquals(4, testPublisherServiceImpl.getCapturedRequests().size()); publisher.shutdown(); - assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); + assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); + } + + @Test + /** + * Make sure that resume publishing works as expected: + * + *
    + *
  1. publish with key orderA which returns a failure. + *
  2. publish with key orderA again, which should fail immediately + *
  3. publish with key orderB, which should succeed + *
  4. resume publishing on key orderA + *
  5. publish with key orderA, which should now succeed + *
+ */ + public void testResumePublish() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(2L) + .build()) + .setEnableMessageOrdering(true) + .build(); + + ApiFuture future1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); + ApiFuture future2 = sendTestMessageWithOrderingKey(publisher, "m2", "orderA"); + + fakeExecutor.advanceTime(Duration.ofSeconds(1)); + assertFalse(future1.isDone()); + assertFalse(future2.isDone()); + + // This exception should stop future publishing to the same key + testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); + + fakeExecutor.advanceTime(Duration.ofSeconds(1)); + + try { + future1.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + } + + try { + future2.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + } + + // Submit new requests with orderA that should fail. + ApiFuture future3 = sendTestMessageWithOrderingKey(publisher, "m3", "orderA"); + ApiFuture future4 = sendTestMessageWithOrderingKey(publisher, "m4", "orderA"); + + try { + future3.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + + try { + future4.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + + // Submit a new request with orderB, which should succeed + ApiFuture future5 = sendTestMessageWithOrderingKey(publisher, "m5", "orderB"); + ApiFuture future6 = sendTestMessageWithOrderingKey(publisher, "m6", "orderB"); + + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("5").addMessageIds("6")); + + Assert.assertEquals("5", future5.get()); + Assert.assertEquals("6", future6.get()); + + // Resume publishing of "orderA", which should now succeed + publisher.resumePublish("orderA"); + + ApiFuture future7 = sendTestMessageWithOrderingKey(publisher, "m7", "orderA"); + ApiFuture future8 = sendTestMessageWithOrderingKey(publisher, "m8", "orderA"); + + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("7").addMessageIds("8")); + + Assert.assertEquals("7", future7.get()); + Assert.assertEquals("8", future8.get()); + + shutdownTestPublisher(publisher); } @Test @@ -679,7 +769,7 @@ public void testPublishFailureRetries_maxRetriesSetUnlimited() throws Exception assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size()); publisher.shutdown(); - assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); + assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } @Test(expected = ExecutionException.class) @@ -708,7 +798,7 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce } finally { assertTrue(testPublisherServiceImpl.getCapturedRequests().size() >= 1); publisher.shutdown(); - assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); + assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } } @@ -732,7 +822,7 @@ public void testPublisherGetters() throws Exception { assertEquals(Duration.ofMillis(11), publisher.getBatchingSettings().getDelayThreshold()); assertEquals(12, (long) publisher.getBatchingSettings().getElementCountThreshold()); publisher.shutdown(); - assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); + assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } @Test @@ -933,7 +1023,7 @@ public void testAwaitTermination() throws Exception { .build(); ApiFuture publishFuture1 = sendTestMessage(publisher, "A"); publisher.shutdown(); - assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); + assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } @Test @@ -944,13 +1034,13 @@ public void testShutDown() throws Exception { publisher.publish( PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("A")).build())) .andReturn(apiFuture); - EasyMock.expect(publisher.awaitTermination(10, TimeUnit.SECONDS)).andReturn(true); + EasyMock.expect(publisher.awaitTermination(1, TimeUnit.MINUTES)).andReturn(true); publisher.shutdown(); EasyMock.expectLastCall().once(); EasyMock.replay(publisher); sendTestMessage(publisher, "A"); publisher.shutdown(); - assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); + assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } @Test @@ -1222,6 +1312,6 @@ private Builder getTestPublisherBuilder() { private void shutdownTestPublisher(Publisher publisher) throws InterruptedException { publisher.shutdown(); fakeExecutor.advanceTime(Duration.ofSeconds(10)); - assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS)); + assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } }