diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java
index 9efea5e5c..500ee5eca 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java
@@ -500,6 +500,150 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception {
assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES));
}
+ /*
+ Temporarily disabled due to https://github.com/googleapis/java-pubsub/issues/1861.
+ TODO(maitrimangal): Enable once resolved.
+ @Test
+ /**
+ * Make sure that resume publishing works as expected:
+ *
+ *
+ * - publish with key orderA which returns a failure.
+ *
- publish with key orderA again, which should fail immediately
+ *
- publish with key orderB, which should succeed
+ *
- resume publishing on key orderA
+ *
- publish with key orderA, which should now succeed
+ *
+ */
+ public void testResumePublish() throws Exception {
+ Publisher publisher =
+ getTestPublisherBuilder()
+ .setBatchingSettings(
+ Publisher.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setElementCountThreshold(2L)
+ .build())
+ .setEnableMessageOrdering(true)
+ .build();
+
+ ApiFuture future1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA");
+ ApiFuture future2 = sendTestMessageWithOrderingKey(publisher, "m2", "orderA");
+
+ fakeExecutor.advanceTime(Duration.ZERO);
+ assertFalse(future1.isDone());
+ assertFalse(future2.isDone());
+
+ // This exception should stop future publishing to the same key
+ testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT));
+
+ fakeExecutor.advanceTime(Duration.ZERO);
+
+ try {
+ future1.get();
+ fail("This should fail.");
+ } catch (ExecutionException e) {
+ }
+
+ try {
+ future2.get();
+ fail("This should fail.");
+ } catch (ExecutionException e) {
+ }
+
+ // Submit new requests with orderA that should fail.
+ ApiFuture future3 = sendTestMessageWithOrderingKey(publisher, "m3", "orderA");
+ ApiFuture future4 = sendTestMessageWithOrderingKey(publisher, "m4", "orderA");
+
+ try {
+ future3.get();
+ fail("This should fail.");
+ } catch (ExecutionException e) {
+ assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause());
+ }
+
+ try {
+ future4.get();
+ fail("This should fail.");
+ } catch (ExecutionException e) {
+ assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause());
+ }
+
+ // Submit a new request with orderB, which should succeed
+ ApiFuture future5 = sendTestMessageWithOrderingKey(publisher, "m5", "orderB");
+ ApiFuture future6 = sendTestMessageWithOrderingKey(publisher, "m6", "orderB");
+
+ testPublisherServiceImpl.addPublishResponse(
+ PublishResponse.newBuilder().addMessageIds("5").addMessageIds("6"));
+
+ Assert.assertEquals("5", future5.get());
+ Assert.assertEquals("6", future6.get());
+
+ // Resume publishing of "orderA", which should now succeed
+ publisher.resumePublish("orderA");
+
+ ApiFuture future7 = sendTestMessageWithOrderingKey(publisher, "m7", "orderA");
+ ApiFuture future8 = sendTestMessageWithOrderingKey(publisher, "m8", "orderA");
+
+ testPublisherServiceImpl.addPublishResponse(
+ PublishResponse.newBuilder().addMessageIds("7").addMessageIds("8"));
+
+ Assert.assertEquals("7", future7.get());
+ Assert.assertEquals("8", future8.get());
+
+ shutdownTestPublisher(publisher);
+ }
+
+ @Test
+ public void testPublishThrowExceptionForUnsubmittedOrderingKeyMessage() throws Exception {
+ Publisher publisher =
+ getTestPublisherBuilder()
+ .setExecutorProvider(SINGLE_THREAD_EXECUTOR)
+ .setBatchingSettings(
+ Publisher.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setElementCountThreshold(2L)
+ .setDelayThreshold(Duration.ofSeconds(500))
+ .build())
+ .setEnableMessageOrdering(true)
+ .build();
+
+ // Send two messages that will fulfill the first batch, which will return a failure.
+ testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT));
+ ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "A", "a");
+ ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "B", "a");
+
+ // A third message will fail because the first attempt to publish failed.
+ ApiFuture publishFuture3 = sendTestMessageWithOrderingKey(publisher, "C", "a");
+
+ try {
+ publishFuture1.get();
+ fail("Should have failed.");
+ } catch (ExecutionException e) {
+ }
+
+ try {
+ publishFuture2.get();
+ fail("Should have failed.");
+ } catch (ExecutionException e) {
+ }
+
+ try {
+ publishFuture3.get();
+ fail("Should have failed.");
+ } catch (ExecutionException e) {
+ assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause());
+ }
+
+ // A subsequent attempt fails immediately.
+ ApiFuture publishFuture4 = sendTestMessageWithOrderingKey(publisher, "D", "a");
+ try {
+ publishFuture4.get();
+ fail("Should have failed.");
+ } catch (ExecutionException e) {
+ assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause());
+ }
+ }*/
+
private ApiFuture sendTestMessageWithOrderingKey(
Publisher publisher, String data, String orderingKey) {
return publisher.publish(