Skip to content

Commit

Permalink
Update PublisherImplTest.java
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahrogers-google authored Jan 12, 2024
1 parent 3fe0afd commit 3ac2b93
Showing 1 changed file with 100 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void setUp() throws Exception {

@After
public void tearDown() throws Exception {
testServer.shutdownNow().awaitTermination(10, TimeUnit.SECONDS);
testServer.shutdownNow().awaitTermination(1, TimeUnit.MINUTES);
testChannel.shutdown();
}

Expand Down Expand Up @@ -237,7 +237,7 @@ public void testPublishByShutdown() throws Exception {
assertEquals("2", publishFuture2.get());

fakeExecutor.advanceTime(Duration.ofSeconds(100));
publisher.awaitTermination(10, TimeUnit.SECONDS);
publisher.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
Expand Down Expand Up @@ -498,7 +498,97 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception {

assertEquals(4, testPublisherServiceImpl.getCapturedRequests().size());
publisher.shutdown();
assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS));
assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES));
}

@Test
/**
* Make sure that resume publishing works as expected:
*
* <ol>
* <li>publish with key orderA which returns a failure.
* <li>publish with key orderA again, which should fail immediately
* <li>publish with key orderB, which should succeed
* <li>resume publishing on key orderA
* <li>publish with key orderA, which should now succeed
* </ol>
*/
public void testResumePublish() throws Exception {
Publisher publisher =
getTestPublisherBuilder()
.setBatchingSettings(
Publisher.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(2L)
.build())
.setEnableMessageOrdering(true)
.build();

ApiFuture<String> future1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA");
ApiFuture<String> future2 = sendTestMessageWithOrderingKey(publisher, "m2", "orderA");

fakeExecutor.advanceTime(Duration.ofSeconds(1));
assertFalse(future1.isDone());
assertFalse(future2.isDone());

// This exception should stop future publishing to the same key
testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT));

fakeExecutor.advanceTime(Duration.ofSeconds(1));

try {
future1.get();
Assert.fail("This should fail.");
} catch (ExecutionException e) {
}

try {
future2.get();
Assert.fail("This should fail.");
} catch (ExecutionException e) {
}

// Submit new requests with orderA that should fail.
ApiFuture<String> future3 = sendTestMessageWithOrderingKey(publisher, "m3", "orderA");
ApiFuture<String> future4 = sendTestMessageWithOrderingKey(publisher, "m4", "orderA");

try {
future3.get();
Assert.fail("This should fail.");
} catch (ExecutionException e) {
assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause());
}

try {
future4.get();
Assert.fail("This should fail.");
} catch (ExecutionException e) {
assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause());
}

// Submit a new request with orderB, which should succeed
ApiFuture<String> future5 = sendTestMessageWithOrderingKey(publisher, "m5", "orderB");
ApiFuture<String> future6 = sendTestMessageWithOrderingKey(publisher, "m6", "orderB");

testPublisherServiceImpl.addPublishResponse(
PublishResponse.newBuilder().addMessageIds("5").addMessageIds("6"));

Assert.assertEquals("5", future5.get());
Assert.assertEquals("6", future6.get());

// Resume publishing of "orderA", which should now succeed
publisher.resumePublish("orderA");

ApiFuture<String> future7 = sendTestMessageWithOrderingKey(publisher, "m7", "orderA");
ApiFuture<String> future8 = sendTestMessageWithOrderingKey(publisher, "m8", "orderA");

testPublisherServiceImpl.addPublishResponse(
PublishResponse.newBuilder().addMessageIds("7").addMessageIds("8"));

Assert.assertEquals("7", future7.get());
Assert.assertEquals("8", future8.get());

shutdownTestPublisher(publisher);
}

@Test
Expand Down Expand Up @@ -679,7 +769,7 @@ public void testPublishFailureRetries_maxRetriesSetUnlimited() throws Exception

assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size());
publisher.shutdown();
assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS));
assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES));
}

@Test(expected = ExecutionException.class)
Expand Down Expand Up @@ -708,7 +798,7 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce
} finally {
assertTrue(testPublisherServiceImpl.getCapturedRequests().size() >= 1);
publisher.shutdown();
assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS));
assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES));
}
}

Expand All @@ -732,7 +822,7 @@ public void testPublisherGetters() throws Exception {
assertEquals(Duration.ofMillis(11), publisher.getBatchingSettings().getDelayThreshold());
assertEquals(12, (long) publisher.getBatchingSettings().getElementCountThreshold());
publisher.shutdown();
assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS));
assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES));
}

@Test
Expand Down Expand Up @@ -933,7 +1023,7 @@ public void testAwaitTermination() throws Exception {
.build();
ApiFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
publisher.shutdown();
assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS));
assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES));
}

@Test
Expand All @@ -944,13 +1034,13 @@ public void testShutDown() throws Exception {
publisher.publish(
PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("A")).build()))
.andReturn(apiFuture);
EasyMock.expect(publisher.awaitTermination(10, TimeUnit.SECONDS)).andReturn(true);
EasyMock.expect(publisher.awaitTermination(1, TimeUnit.MINUTES)).andReturn(true);
publisher.shutdown();
EasyMock.expectLastCall().once();
EasyMock.replay(publisher);
sendTestMessage(publisher, "A");
publisher.shutdown();
assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS));
assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES));
}

@Test
Expand Down Expand Up @@ -1222,6 +1312,6 @@ private Builder getTestPublisherBuilder() {
private void shutdownTestPublisher(Publisher publisher) throws InterruptedException {
publisher.shutdown();
fakeExecutor.advanceTime(Duration.ofSeconds(10));
assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS));
assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES));
}
}

0 comments on commit 3ac2b93

Please sign in to comment.