From be78e64f9fdfc9ddf0790189311fac673754e219 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Sat, 12 Oct 2024 01:10:18 +0200 Subject: [PATCH] feat: track batch size using serialized size of PublishRequest (#2113) * feat: track batch size using serialized size of PublishRequest * fix: compare against batchedBytes instead of messageSize in flush condition * fix: also count static overhead in flow control * fix: adjust thresholds in tests * fix: clean up merge issue * fix: revert use of topicNameSize in MessageFlowController * fix: store topicNameSize as initialBatchedBytes in MessagesBatch --------- Co-authored-by: Mike Prieto --- .../com/google/cloud/pubsub/v1/Publisher.java | 21 ++++++++++++++----- .../cloud/pubsub/v1/PublisherImplTest.java | 8 +++---- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 99d0be17b..3713cf69b 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -46,6 +46,7 @@ import com.google.cloud.pubsub.v1.stub.PublisherStub; import com.google.cloud.pubsub.v1.stub.PublisherStubSettings; import com.google.common.base.Preconditions; +import com.google.protobuf.CodedOutputStream; import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PubsubMessage; @@ -99,6 +100,7 @@ public class Publisher implements PublisherInterface { private static final String OPEN_TELEMETRY_TRACER_NAME = "com.google.cloud.pubsub.v1"; private final String topicName; + private final int topicNameSize; private final BatchingSettings batchingSettings; private final boolean enableMessageOrdering; @@ -145,6 +147,8 @@ public static long getApiMaxRequestBytes() { private Publisher(Builder builder) throws IOException { topicName = builder.topicName; + topicNameSize = + CodedOutputStream.computeStringSize(PublishRequest.TOPIC_FIELD_NUMBER, this.topicName); this.batchingSettings = builder.batchingSettings; FlowControlSettings flowControl = this.batchingSettings.getFlowControlSettings(); @@ -309,7 +313,7 @@ public ApiFuture publish(PubsubMessage message) { } MessagesBatch messagesBatch = messagesBatches.get(orderingKey); if (messagesBatch == null) { - messagesBatch = new MessagesBatch(batchingSettings, orderingKey); + messagesBatch = new MessagesBatch(batchingSettings, topicNameSize, orderingKey); messagesBatches.put(orderingKey, messagesBatch); } @@ -636,7 +640,9 @@ private static final class OutstandingPublish { OutstandingPublish(PubsubMessageWrapper messageWrapper) { this.publishResult = SettableApiFuture.create(); this.messageWrapper = messageWrapper; - this.messageSize = messageWrapper.getPubsubMessage().getSerializedSize(); + this.messageSize = + CodedOutputStream.computeMessageSize( + PublishRequest.MESSAGES_FIELD_NUMBER, messageWrapper.getPubsubMessage()); } } @@ -1093,12 +1099,15 @@ void release(long messageSize) { private class MessagesBatch { private List messages; + private int initialBatchedBytes; private int batchedBytes; private String orderingKey; private final BatchingSettings batchingSettings; - private MessagesBatch(BatchingSettings batchingSettings, String orderingKey) { + private MessagesBatch( + BatchingSettings batchingSettings, int initialBatchedBytes, String orderingKey) { this.batchingSettings = batchingSettings; + this.initialBatchedBytes = initialBatchedBytes; this.orderingKey = orderingKey; reset(); } @@ -1111,7 +1120,7 @@ private OutstandingBatch popOutstandingBatch() { private void reset() { messages = new LinkedList<>(); - batchedBytes = 0; + batchedBytes = initialBatchedBytes; } private boolean isEmpty() { @@ -1150,7 +1159,9 @@ && getBatchedBytes() + outstandingPublish.messageSize >= getMaxBatchBytes()) { // immediately. // Alternatively if after adding the message we have reached the batch max messages then we // have a batch to send. - if ((hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes()) + // Note that exceeding {@link Publisher#getApiMaxRequestBytes()} will result in failed + // publishes without compression and may yet fail if a request is not sufficiently compressed. + if ((hasBatchingBytes() && getBatchedBytes() >= getMaxBatchBytes()) || getMessagesCount() == batchingSettings.getElementCountThreshold()) { batchesToSend.add(popOutstandingBatch()); } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 219326db5..411b61d15 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -447,7 +447,7 @@ public void testLargeMessagesDoNotReorderBatches() throws Exception { Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(10L) - .setRequestByteThreshold(20L) + .setRequestByteThreshold(64L) .setDelayThreshold(Duration.ofSeconds(100)) .build()) .setEnableMessageOrdering(true) @@ -1150,7 +1150,7 @@ public void testPublishFlowControl_throwException() throws Exception { .setLimitExceededBehavior( FlowController.LimitExceededBehavior.ThrowException) .setMaxOutstandingElementCount(1L) - .setMaxOutstandingRequestBytes(10L) + .setMaxOutstandingRequestBytes(13L) .build()) .build()) .build(); @@ -1192,7 +1192,7 @@ public void testPublishFlowControl_throwExceptionWithOrderingKey() throws Except .setLimitExceededBehavior( FlowController.LimitExceededBehavior.ThrowException) .setMaxOutstandingElementCount(1L) - .setMaxOutstandingRequestBytes(10L) + .setMaxOutstandingRequestBytes(13L) .build()) .build()) .setEnableMessageOrdering(true) @@ -1238,7 +1238,7 @@ public void testPublishFlowControl_block() throws Exception { FlowControlSettings.newBuilder() .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) .setMaxOutstandingElementCount(2L) - .setMaxOutstandingRequestBytes(10L) + .setMaxOutstandingRequestBytes(13L) .build()) .build()) .build();