diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8721be429..92946f184 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,12 @@
# Changelog
+## [1.126.4](https://github.com/googleapis/java-pubsub/compare/v1.126.3...v1.126.4) (2024-02-09)
+
+
+### Bug Fixes
+
+* Message ordering fix for [#1889](https://github.com/googleapis/java-pubsub/issues/1889) ([#1903](https://github.com/googleapis/java-pubsub/issues/1903)) ([22a87c6](https://github.com/googleapis/java-pubsub/commit/22a87c67f07b55266e277f83f5ceb17d9f32f67e))
+
## [1.126.3](https://github.com/googleapis/java-pubsub/compare/v1.126.2...v1.126.3) (2024-02-08)
diff --git a/README.md b/README.md
index 89680132d..e5617be6d 100644
--- a/README.md
+++ b/README.md
@@ -44,7 +44,7 @@ If you are using Maven without the BOM, add this to your dependencies:
com.google.cloud
google-cloud-pubsub
- 1.126.3
+ 1.126.4
```
@@ -59,13 +59,13 @@ implementation 'com.google.cloud:google-cloud-pubsub'
If you are using Gradle without BOM, add this to your dependencies:
```Groovy
-implementation 'com.google.cloud:google-cloud-pubsub:1.126.3'
+implementation 'com.google.cloud:google-cloud-pubsub:1.126.4'
```
If you are using SBT, add this to your dependencies:
```Scala
-libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.126.3"
+libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.126.4"
```
@@ -409,7 +409,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-pubsub/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-pubsub.svg
-[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-pubsub/1.126.3
+[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-pubsub/1.126.4
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
diff --git a/google-cloud-pubsub-bom/pom.xml b/google-cloud-pubsub-bom/pom.xml
index 276983e5d..53f63069f 100644
--- a/google-cloud-pubsub-bom/pom.xml
+++ b/google-cloud-pubsub-bom/pom.xml
@@ -3,7 +3,7 @@
4.0.0
com.google.cloud
google-cloud-pubsub-bom
- 1.126.4-SNAPSHOT
+ 1.126.5-SNAPSHOT
pom
com.google.cloud
@@ -52,17 +52,17 @@
com.google.cloud
google-cloud-pubsub
- 1.126.4-SNAPSHOT
+ 1.126.5-SNAPSHOT
com.google.api.grpc
grpc-google-cloud-pubsub-v1
- 1.108.4-SNAPSHOT
+ 1.108.5-SNAPSHOT
com.google.api.grpc
proto-google-cloud-pubsub-v1
- 1.108.4-SNAPSHOT
+ 1.108.5-SNAPSHOT
diff --git a/google-cloud-pubsub/pom.xml b/google-cloud-pubsub/pom.xml
index 860c849af..567acd183 100644
--- a/google-cloud-pubsub/pom.xml
+++ b/google-cloud-pubsub/pom.xml
@@ -3,7 +3,7 @@
4.0.0
com.google.cloud
google-cloud-pubsub
- 1.126.4-SNAPSHOT
+ 1.126.5-SNAPSHOT
jar
Google Cloud Pub/Sub
https://github.com/googleapis/java-pubsub
@@ -11,7 +11,7 @@
com.google.cloud
google-cloud-pubsub-parent
- 1.126.4-SNAPSHOT
+ 1.126.5-SNAPSHOT
google-cloud-pubsub
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java
index 635bc92d5..1810badd2 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java
@@ -31,6 +31,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -92,8 +93,8 @@ class MessageDispatcher {
private final LinkedBlockingQueue pendingAcks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue pendingNacks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue pendingReceipts = new LinkedBlockingQueue<>();
- private final ConcurrentMap outstandingReceipts =
- new ConcurrentHashMap();
+ private final LinkedHashMap outstandingReceipts =
+ new LinkedHashMap();
private final AtomicInteger messageDeadlineSeconds = new AtomicInteger();
private final AtomicBoolean extendDeadline = new AtomicBoolean(true);
private final Lock jobLock;
@@ -397,7 +398,9 @@ void processReceivedMessages(List messages) {
if (this.exactlyOnceDeliveryEnabled.get()) {
// For exactly once deliveries we don't add to outstanding batch because we first
// process the receipt modack. If that is successful then we process the message.
- outstandingReceipts.put(message.getAckId(), new ReceiptCompleteData(outstandingMessage));
+ synchronized (outstandingReceipts) {
+ outstandingReceipts.put(message.getAckId(), new ReceiptCompleteData(outstandingMessage));
+ }
} else if (pendingMessages.putIfAbsent(message.getAckId(), ackHandler) != null) {
// putIfAbsent puts ackHandler if ackID isn't previously mapped, then return the
// previously-mapped element.
@@ -417,33 +420,36 @@ void processReceivedMessages(List messages) {
}
void notifyAckSuccess(AckRequestData ackRequestData) {
-
- if (outstandingReceipts.containsKey(ackRequestData.getAckId())) {
- outstandingReceipts.get(ackRequestData.getAckId()).notifyReceiptComplete();
- List outstandingBatch = new ArrayList<>();
-
- for (Iterator> it =
- outstandingReceipts.entrySet().iterator();
- it.hasNext(); ) {
- Map.Entry receipt = it.next();
- // If receipt is complete then add to outstandingBatch to process the batch
- if (receipt.getValue().isReceiptComplete()) {
- it.remove();
- if (pendingMessages.putIfAbsent(
- receipt.getKey(), receipt.getValue().getOutstandingMessage().ackHandler)
- == null) {
- outstandingBatch.add(receipt.getValue().getOutstandingMessage());
+ synchronized (outstandingReceipts) {
+ if (outstandingReceipts.containsKey(ackRequestData.getAckId())) {
+ outstandingReceipts.get(ackRequestData.getAckId()).notifyReceiptComplete();
+ List outstandingBatch = new ArrayList<>();
+
+ for (Iterator> it =
+ outstandingReceipts.entrySet().iterator();
+ it.hasNext(); ) {
+ Map.Entry receipt = it.next();
+ // If receipt is complete then add to outstandingBatch to process the batch
+ if (receipt.getValue().isReceiptComplete()) {
+ it.remove();
+ if (pendingMessages.putIfAbsent(
+ receipt.getKey(), receipt.getValue().getOutstandingMessage().ackHandler)
+ == null) {
+ outstandingBatch.add(receipt.getValue().getOutstandingMessage());
+ }
+ } else {
+ break;
}
- } else {
- break;
}
+ processBatch(outstandingBatch);
}
- processBatch(outstandingBatch);
}
}
void notifyAckFailed(AckRequestData ackRequestData) {
- outstandingReceipts.remove(ackRequestData.getAckId());
+ synchronized (outstandingReceipts) {
+ outstandingReceipts.remove(ackRequestData.getAckId());
+ }
}
private void processBatch(List batch) {
diff --git a/grpc-google-cloud-pubsub-v1/pom.xml b/grpc-google-cloud-pubsub-v1/pom.xml
index 2b9af8e22..5eed7d4f0 100644
--- a/grpc-google-cloud-pubsub-v1/pom.xml
+++ b/grpc-google-cloud-pubsub-v1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
grpc-google-cloud-pubsub-v1
- 1.108.4-SNAPSHOT
+ 1.108.5-SNAPSHOT
grpc-google-cloud-pubsub-v1
GRPC library for grpc-google-cloud-pubsub-v1
com.google.cloud
google-cloud-pubsub-parent
- 1.126.4-SNAPSHOT
+ 1.126.5-SNAPSHOT
diff --git a/pom.xml b/pom.xml
index 8c5700e1e..4271f426d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
com.google.cloud
google-cloud-pubsub-parent
pom
- 1.126.4-SNAPSHOT
+ 1.126.5-SNAPSHOT
Google Cloud Pub/Sub Parent
https://github.com/googleapis/java-pubsub
@@ -69,17 +69,17 @@
com.google.api.grpc
proto-google-cloud-pubsub-v1
- 1.108.4-SNAPSHOT
+ 1.108.5-SNAPSHOT
com.google.api.grpc
grpc-google-cloud-pubsub-v1
- 1.108.4-SNAPSHOT
+ 1.108.5-SNAPSHOT
com.google.cloud
google-cloud-pubsub
- 1.126.4-SNAPSHOT
+ 1.126.5-SNAPSHOT
@@ -104,7 +104,7 @@
com.google.truth
truth
- 1.3.0
+ 1.4.0
test
diff --git a/proto-google-cloud-pubsub-v1/pom.xml b/proto-google-cloud-pubsub-v1/pom.xml
index 0ad7b2a8f..f0c1bcae9 100644
--- a/proto-google-cloud-pubsub-v1/pom.xml
+++ b/proto-google-cloud-pubsub-v1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
proto-google-cloud-pubsub-v1
- 1.108.4-SNAPSHOT
+ 1.108.5-SNAPSHOT
proto-google-cloud-pubsub-v1
PROTO library for proto-google-cloud-pubsub-v1
com.google.cloud
google-cloud-pubsub-parent
- 1.126.4-SNAPSHOT
+ 1.126.5-SNAPSHOT
diff --git a/samples/install-without-bom/pom.xml b/samples/install-without-bom/pom.xml
index b6169a62c..79d2e3bd2 100644
--- a/samples/install-without-bom/pom.xml
+++ b/samples/install-without-bom/pom.xml
@@ -44,7 +44,7 @@
com.google.cloud
google-cloud-pubsub
- 1.126.3
+ 1.126.4
@@ -69,7 +69,7 @@
com.google.truth
truth
- 1.3.0
+ 1.4.0
test
diff --git a/samples/native-image-sample/pom.xml b/samples/native-image-sample/pom.xml
index 3a75434c4..e114bce2d 100644
--- a/samples/native-image-sample/pom.xml
+++ b/samples/native-image-sample/pom.xml
@@ -56,7 +56,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
com.google.truth
truth
- 1.3.0
+ 1.4.0
test
@@ -107,7 +107,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
org.junit.vintage
junit-vintage-engine
- 5.10.1
+ 5.10.2
test
@@ -134,7 +134,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
org.graalvm.buildtools
native-maven-plugin
- 0.9.28
+ 0.10.0
true
pubsub.NativeImagePubSubSample
diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml
index 94159b432..62ac26115 100644
--- a/samples/snapshot/pom.xml
+++ b/samples/snapshot/pom.xml
@@ -43,7 +43,7 @@
com.google.cloud
google-cloud-pubsub
- 1.126.4-SNAPSHOT
+ 1.126.5-SNAPSHOT
@@ -73,7 +73,7 @@
com.google.truth
truth
- 1.3.0
+ 1.4.0
test
diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml
index dec1fffa3..34c2c4bde 100644
--- a/samples/snippets/pom.xml
+++ b/samples/snippets/pom.xml
@@ -92,7 +92,7 @@
com.google.truth
truth
- 1.3.0
+ 1.4.0
test
diff --git a/samples/snippets/src/main/java/pubsub/EodSub.java b/samples/snippets/src/main/java/pubsub/EodSub.java
new file mode 100644
index 000000000..d644db849
--- /dev/null
+++ b/samples/snippets/src/main/java/pubsub/EodSub.java
@@ -0,0 +1,97 @@
+package pubsub;
+
+import com.google.cloud.pubsub.v1.AckReplyConsumerWithResponse;
+import com.google.cloud.pubsub.v1.AckResponse;
+import com.google.cloud.pubsub.v1.MessageReceiverWithAckResponse;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PubsubMessage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.threeten.bp.Duration;
+
+public class EodSub {
+ public static void main(String... args) throws Exception {
+ // TODO(developer): Replace these variables before running the sample.
+ String projectId = "cloud-pubsub-experiments";
+ String subscriptionId = "mike-java-sub";
+
+ subscribeWithExactlyOnceConsumerWithResponseExample(projectId, subscriptionId);
+ }
+
+ public static void subscribeWithExactlyOnceConsumerWithResponseExample(
+ String projectId, String subscriptionId) {
+ ProjectSubscriptionName subscriptionName =
+ ProjectSubscriptionName.of("cloud-pubsub-experiments", subscriptionId);
+
+ // Instantiate an asynchronous message receiver using `AckReplyConsumerWithResponse`
+ // instead of `AckReplyConsumer` to get a future that tracks the result of the ack call.
+ // When exactly once delivery is enabled on the subscription, the message is guaranteed
+ // to not be delivered again if the ack future succeeds.
+ MessageReceiverWithAckResponse receiverWithResponse =
+ (PubsubMessage message, AckReplyConsumerWithResponse consumerWithResponse) -> {
+ try {
+ // Handle incoming message, then ack the message, and receive an ack response.
+ // System.out.println("Message received: " + message.getData().toStringUtf8());
+ Future ackResponseFuture = consumerWithResponse.ack();
+
+ // Retrieve the completed future for the ack response from the server.
+ AckResponse ackResponse = ackResponseFuture.get();
+
+ switch (ackResponse) {
+ case SUCCESSFUL:
+ // Success code means that this MessageID will not be delivered again.
+ if (message.getData().toStringUtf8().equals("hello #499")) {
+ System.out.println("Message successfully acked: " + message.getData().toStringUtf8());
+ }
+ break;
+ case INVALID:
+ System.out.println(
+ "Message failed to ack with a response of Invalid. Id: "
+ + message.getMessageId());
+ break;
+ case PERMISSION_DENIED:
+ System.out.println(
+ "Message failed to ack with a response of Permission Denied. Id: "
+ + message.getMessageId());
+ break;
+ case FAILED_PRECONDITION:
+ System.out.println(
+ "Message failed to ack with a response of Failed Precondition. Id: "
+ + message.getMessageId());
+ break;
+ case OTHER:
+ System.out.println(
+ "Message failed to ack with a response of Other. Id: "
+ + message.getMessageId());
+ break;
+ default:
+ break;
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ System.out.println(
+ "MessageId: " + message.getMessageId() + " failed when retrieving future");
+ } catch (Throwable t) {
+ System.out.println("Throwable caught" + t.getMessage());
+ }
+ };
+
+ Subscriber subscriber = null;
+ try {
+ subscriber = Subscriber.newBuilder(subscriptionName, receiverWithResponse)
+ .setMaxDurationPerAckExtension(Duration.ofSeconds(600))
+ .setMinDurationPerAckExtension(Duration.ofSeconds(300))
+ .build();
+ // Start the subscriber.
+ subscriber.startAsync().awaitRunning();
+ System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
+ // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
+ subscriber.awaitTerminated(7200, TimeUnit.SECONDS);
+ } catch (TimeoutException timeoutException) {
+ // Shut down the subscriber after 30s. Stop receiving messages.
+ subscriber.stopAsync();
+ }
+ }
+}
diff --git a/versions.txt b/versions.txt
index a8c7ca076..cff6d8a28 100644
--- a/versions.txt
+++ b/versions.txt
@@ -1,6 +1,6 @@
# Format:
# module:released-version:current-version
-google-cloud-pubsub:1.126.3:1.126.4-SNAPSHOT
-grpc-google-cloud-pubsub-v1:1.108.3:1.108.4-SNAPSHOT
-proto-google-cloud-pubsub-v1:1.108.3:1.108.4-SNAPSHOT
+google-cloud-pubsub:1.126.4:1.126.5-SNAPSHOT
+grpc-google-cloud-pubsub-v1:1.108.4:1.108.5-SNAPSHOT
+proto-google-cloud-pubsub-v1:1.108.4:1.108.5-SNAPSHOT