diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 9686ec445..a56d2c712 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -511,7 +511,6 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { *
  • publish with key orderA, which should now succeed * */ - /* @Test public void testResumePublish() throws Exception { System.out.println("Starting testResumePublish"); @@ -532,6 +531,8 @@ public void testResumePublish() throws Exception { assertFalse(future1.isDone()); assertFalse(future2.isDone()); + System.out.println("Publish 1 and 2"); + // This exception should stop future publishing to the same key testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); @@ -541,12 +542,14 @@ public void testResumePublish() throws Exception { future1.get(); fail("This should fail."); } catch (ExecutionException e) { + System.out.println("Message 1 failed"); } try { future2.get(); fail("This should fail."); } catch (ExecutionException e) { + System.out.println("Message 2 failed"); } // Submit new requests with orderA that should fail. @@ -558,6 +561,7 @@ public void testResumePublish() throws Exception { fail("This should fail."); } catch (ExecutionException e) { assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + System.out.println("Message 3 failed"); } try { @@ -565,6 +569,7 @@ public void testResumePublish() throws Exception { fail("This should fail."); } catch (ExecutionException e) { assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + System.out.println("Message 4 failed"); } // Submit a new request with orderB, which should succeed @@ -577,26 +582,32 @@ public void testResumePublish() throws Exception { assertEquals("5", future5.get()); assertEquals("6", future6.get()); + System.out.println("Publish and receive 5 and 6"); + // Resume publishing of "orderA", which should now succeed publisher.resumePublish("orderA"); + System.out.println("Resume publishing of orderA"); + ApiFuture future7 = sendTestMessageWithOrderingKey(publisher, "m7", "orderA"); ApiFuture future8 = sendTestMessageWithOrderingKey(publisher, "m8", "orderA"); testPublisherServiceImpl.addPublishResponse( PublishResponse.newBuilder().addMessageIds("7").addMessageIds("8")); + + System.out.println("Publish of 7 and 8"); assertEquals("7", future7.get()); assertEquals("8", future8.get()); + System.out.println("Receive 7 and 8"); + shutdownTestPublisher(publisher); System.out.println("Ending testResumePublish"); } - */ @Test public void testPublishThrowExceptionForUnsubmittedOrderingKeyMessage() throws Exception { - System.out.println("Starting testPublishThrowExceptionForUnsubmittedOrderingKeyMessage"); Publisher publisher = getTestPublisherBuilder() .setExecutorProvider(SINGLE_THREAD_EXECUTOR) @@ -635,7 +646,7 @@ public void testPublishThrowExceptionForUnsubmittedOrderingKeyMessage() throws E } catch (ExecutionException e) { assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); } - fakeExecutor.advanceTime(Duration.ofSeconds(5)); + fakeExecutor.advanceTime(Duration.ZERO); // A subsequent attempt fails immediately. ApiFuture publishFuture4 = sendTestMessageWithOrderingKey(publisher, "D", "a"); @@ -645,7 +656,6 @@ public void testPublishThrowExceptionForUnsubmittedOrderingKeyMessage() throws E } catch (ExecutionException e) { assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); } - System.out.println("Ending testPublishThrowExceptionForUnsubmittedOrderingKeyMessage"); } private ApiFuture sendTestMessageWithOrderingKey(