Skip to content

Commit

Permalink
Update PublisherImplTest.java
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahrogers-google authored Jan 12, 2024
1 parent 58a8441 commit 3fe0afd
Showing 1 changed file with 60 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -365,15 +365,6 @@ public void testBatchedMessagesWithOrderingKeyByNum() throws Exception {
shutdownTestPublisher(publisher);
}

private ApiFuture<String> sendTestMessageWithOrderingKey(
Publisher publisher, String data, String orderingKey) {
return publisher.publish(
PubsubMessage.newBuilder()
.setOrderingKey(orderingKey)
.setData(ByteString.copyFromUtf8(data))
.build());
}

@Test
public void testBatchedMessagesWithOrderingKeyByDuration() throws Exception {
// Limit the batching timeout to 100 seconds.
Expand Down Expand Up @@ -510,6 +501,66 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception {
assertTrue(publisher.awaitTermination(10, TimeUnit.SECONDS));
}

@Test
public void testPublishThrowExceptionForUnsubmittedOrderingKeyMessage() throws Exception {
Publisher publisher =
getTestPublisherBuilder()
.setExecutorProvider(SINGLE_THREAD_EXECUTOR)
.setBatchingSettings(
Publisher.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(2L)
.setDelayThreshold(Duration.ofSeconds(500))
.build())
.setEnableMessageOrdering(true)
.build();

// Send two messages that will fulfill the first batch, which will return a failure.
testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT));
ApiFuture<String> publishFuture1 = sendTestMessageWithOrderingKey(publisher, "A", "a");
ApiFuture<String> publishFuture2 = sendTestMessageWithOrderingKey(publisher, "B", "a");

// A third message will fail because the first attempt to publish failed.
ApiFuture<String> publishFuture3 = sendTestMessageWithOrderingKey(publisher, "C", "a");

try {
publishFuture1.get();
fail("Should have failed.");
} catch (ExecutionException e) {
}

try {
publishFuture2.get();
fail("Should have failed.");
} catch (ExecutionException e) {
}

try {
publishFuture3.get();
fail("Should have failed.");
} catch (ExecutionException e) {
assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause());
}

// A subsequent attempt fails immediately.
ApiFuture<String> publishFuture4 = sendTestMessageWithOrderingKey(publisher, "D", "a");
try {
publishFuture4.get();
fail("Should have failed.");
} catch (ExecutionException e) {
assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause());
}
}

private ApiFuture<String> sendTestMessageWithOrderingKey(
Publisher publisher, String data, String orderingKey) {
return publisher.publish(
PubsubMessage.newBuilder()
.setOrderingKey(orderingKey)
.setData(ByteString.copyFromUtf8(data))
.build());
}

@Test
public void testErrorPropagation() throws Exception {
Publisher publisher =
Expand Down

0 comments on commit 3fe0afd

Please sign in to comment.