From 543099bd0c13873f0d4f5961eb1263e9e547d6da Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 12 Jan 2024 11:22:39 -0800 Subject: [PATCH] fix: temporarily remove publisher tests causing timeouts --- .../cloud/pubsub/v1/PublisherImplTest.java | 144 ++++++++++++++++++ 1 file changed, 144 insertions(+) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 9efea5e5c..500ee5eca 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -500,6 +500,150 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } + /* + Temporarily disabled due to https://github.com/googleapis/java-pubsub/issues/1861. + TODO(maitrimangal): Enable once resolved. + @Test + /** + * Make sure that resume publishing works as expected: + * + * <ol> + * <li>publish with key orderA which returns a failure. + * <li>publish with key orderA again, which should fail immediately + * <li>publish with key orderB, which should succeed + * <li>resume publishing on key orderA + * <li>publish with key orderA, which should now succeed + * </ol> + */ + public void testResumePublish() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(2L) + .build()) + .setEnableMessageOrdering(true) + .build(); + + ApiFuture<String> future1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); + ApiFuture<String> future2 = sendTestMessageWithOrderingKey(publisher, "m2", "orderA"); + + fakeExecutor.advanceTime(Duration.ZERO); + assertFalse(future1.isDone()); + assertFalse(future2.isDone()); + + // This exception should stop future publishing to the same key + testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); + + fakeExecutor.advanceTime(Duration.ZERO); + + try { + future1.get(); + fail("This should fail."); + } catch (ExecutionException e) { + } + + try { + future2.get(); + fail("This should fail."); + } catch (ExecutionException e) { + } + + // Submit new requests with orderA that should fail. + ApiFuture<String> future3 = sendTestMessageWithOrderingKey(publisher, "m3", "orderA"); + ApiFuture<String> future4 = sendTestMessageWithOrderingKey(publisher, "m4", "orderA"); + + try { + future3.get(); + fail("This should fail."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + + try { + future4.get(); + fail("This should fail."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + + // Submit a new request with orderB, which should succeed + ApiFuture<String> future5 = sendTestMessageWithOrderingKey(publisher, "m5", "orderB"); + ApiFuture<String> future6 = sendTestMessageWithOrderingKey(publisher, "m6", "orderB"); + + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("5").addMessageIds("6")); + + Assert.assertEquals("5", future5.get()); + Assert.assertEquals("6", future6.get()); + + // Resume publishing of "orderA", which should now succeed + publisher.resumePublish("orderA"); + + ApiFuture<String> future7 = sendTestMessageWithOrderingKey(publisher, "m7", "orderA"); + ApiFuture<String> future8 = sendTestMessageWithOrderingKey(publisher, "m8", "orderA"); + + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("7").addMessageIds("8")); + + Assert.assertEquals("7", future7.get()); + Assert.assertEquals("8", future8.get()); + + shutdownTestPublisher(publisher); + } + + @Test + public void testPublishThrowExceptionForUnsubmittedOrderingKeyMessage() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(2L) + .setDelayThreshold(Duration.ofSeconds(500)) + .build()) + .setEnableMessageOrdering(true) + .build(); + + // Send two messages that will fulfill the first batch, which will return a failure. + testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); + ApiFuture<String> publishFuture1 = sendTestMessageWithOrderingKey(publisher, "A", "a"); + ApiFuture<String> publishFuture2 = sendTestMessageWithOrderingKey(publisher, "B", "a"); + + // A third message will fail because the first attempt to publish failed. + ApiFuture<String> publishFuture3 = sendTestMessageWithOrderingKey(publisher, "C", "a"); + + try { + publishFuture1.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + } + + try { + publishFuture2.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + } + + try { + publishFuture3.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + + // A subsequent attempt fails immediately. + ApiFuture<String> publishFuture4 = sendTestMessageWithOrderingKey(publisher, "D", "a"); + try { + publishFuture4.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + }*/ + private ApiFuture<String> sendTestMessageWithOrderingKey( Publisher publisher, String data, String orderingKey) { return publisher.publish(