Skip to content

Commit

Permalink
Merge branch 'main' into tpc-java-support
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelpri10 committed Feb 12, 2024
2 parents e4c1070 + 678a3aa commit bf48660
Show file tree
Hide file tree
Showing 14 changed files with 163 additions and 53 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## [1.126.4](https://github.com/googleapis/java-pubsub/compare/v1.126.3...v1.126.4) (2024-02-09)


### Bug Fixes

* Message ordering fix for [#1889](https://github.com/googleapis/java-pubsub/issues/1889) ([#1903](https://github.com/googleapis/java-pubsub/issues/1903)) ([22a87c6](https://github.com/googleapis/java-pubsub/commit/22a87c67f07b55266e277f83f5ceb17d9f32f67e))

## [1.126.3](https://github.com/googleapis/java-pubsub/compare/v1.126.2...v1.126.3) (2024-02-08)


Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ If you are using Maven without the BOM, add this to your dependencies:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>1.126.3</version>
<version>1.126.4</version>
</dependency>

```
Expand All @@ -59,13 +59,13 @@ implementation 'com.google.cloud:google-cloud-pubsub'
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-pubsub:1.126.3'
implementation 'com.google.cloud:google-cloud-pubsub:1.126.4'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.126.3"
libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.126.4"
```
<!-- {x-version-update-end} -->

Expand Down Expand Up @@ -409,7 +409,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-pubsub/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-pubsub.svg
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-pubsub/1.126.3
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-pubsub/1.126.4
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
Expand Down
8 changes: 4 additions & 4 deletions google-cloud-pubsub-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub-bom</artifactId>
<version>1.126.4-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsub:current} -->
<version>1.126.5-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsub:current} -->
<packaging>pom</packaging>
<parent>
<groupId>com.google.cloud</groupId>
Expand Down Expand Up @@ -52,17 +52,17 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>1.126.4-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsub:current} -->
<version>1.126.5-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsub:current} -->
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-cloud-pubsub-v1</artifactId>
<version>1.108.4-SNAPSHOT</version><!-- {x-version-update:grpc-google-cloud-pubsub-v1:current} -->
<version>1.108.5-SNAPSHOT</version><!-- {x-version-update:grpc-google-cloud-pubsub-v1:current} -->
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-pubsub-v1</artifactId>
<version>1.108.4-SNAPSHOT</version><!-- {x-version-update:proto-google-cloud-pubsub-v1:current} -->
<version>1.108.5-SNAPSHOT</version><!-- {x-version-update:proto-google-cloud-pubsub-v1:current} -->
</dependency>
</dependencies>
</dependencyManagement>
Expand Down
4 changes: 2 additions & 2 deletions google-cloud-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>1.126.4-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsub:current} -->
<version>1.126.5-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsub:current} -->
<packaging>jar</packaging>
<name>Google Cloud Pub/Sub</name>
<url>https://github.com/googleapis/java-pubsub</url>
<description>Java idiomatic client for Google Cloud Pub/Sub</description>
<parent>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub-parent</artifactId>
<version>1.126.4-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsub:current} -->
<version>1.126.5-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsub:current} -->
</parent>
<properties>
<site.installationModule>google-cloud-pubsub</site.installationModule>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -92,8 +93,8 @@ class MessageDispatcher {
private final LinkedBlockingQueue<AckRequestData> pendingAcks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<AckRequestData> pendingNacks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<AckRequestData> pendingReceipts = new LinkedBlockingQueue<>();
private final ConcurrentMap<String, ReceiptCompleteData> outstandingReceipts =
new ConcurrentHashMap<String, ReceiptCompleteData>();
private final LinkedHashMap<String, ReceiptCompleteData> outstandingReceipts =
new LinkedHashMap<String, ReceiptCompleteData>();
private final AtomicInteger messageDeadlineSeconds = new AtomicInteger();
private final AtomicBoolean extendDeadline = new AtomicBoolean(true);
private final Lock jobLock;
Expand Down Expand Up @@ -397,7 +398,9 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
if (this.exactlyOnceDeliveryEnabled.get()) {
// For exactly once deliveries we don't add to outstanding batch because we first
// process the receipt modack. If that is successful then we process the message.
outstandingReceipts.put(message.getAckId(), new ReceiptCompleteData(outstandingMessage));
synchronized (outstandingReceipts) {
outstandingReceipts.put(message.getAckId(), new ReceiptCompleteData(outstandingMessage));
}
} else if (pendingMessages.putIfAbsent(message.getAckId(), ackHandler) != null) {
// putIfAbsent puts ackHandler if ackID isn't previously mapped, then return the
// previously-mapped element.
Expand All @@ -417,33 +420,36 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
}

void notifyAckSuccess(AckRequestData ackRequestData) {

if (outstandingReceipts.containsKey(ackRequestData.getAckId())) {
outstandingReceipts.get(ackRequestData.getAckId()).notifyReceiptComplete();
List<OutstandingMessage> outstandingBatch = new ArrayList<>();

for (Iterator<Entry<String, ReceiptCompleteData>> it =
outstandingReceipts.entrySet().iterator();
it.hasNext(); ) {
Map.Entry<String, ReceiptCompleteData> receipt = it.next();
// If receipt is complete then add to outstandingBatch to process the batch
if (receipt.getValue().isReceiptComplete()) {
it.remove();
if (pendingMessages.putIfAbsent(
receipt.getKey(), receipt.getValue().getOutstandingMessage().ackHandler)
== null) {
outstandingBatch.add(receipt.getValue().getOutstandingMessage());
synchronized (outstandingReceipts) {
if (outstandingReceipts.containsKey(ackRequestData.getAckId())) {
outstandingReceipts.get(ackRequestData.getAckId()).notifyReceiptComplete();
List<OutstandingMessage> outstandingBatch = new ArrayList<>();

for (Iterator<Entry<String, ReceiptCompleteData>> it =
outstandingReceipts.entrySet().iterator();
it.hasNext(); ) {
Map.Entry<String, ReceiptCompleteData> receipt = it.next();
// If receipt is complete then add to outstandingBatch to process the batch
if (receipt.getValue().isReceiptComplete()) {
it.remove();
if (pendingMessages.putIfAbsent(
receipt.getKey(), receipt.getValue().getOutstandingMessage().ackHandler)
== null) {
outstandingBatch.add(receipt.getValue().getOutstandingMessage());
}
} else {
break;
}
} else {
break;
}
processBatch(outstandingBatch);
}
processBatch(outstandingBatch);
}
}

void notifyAckFailed(AckRequestData ackRequestData) {
outstandingReceipts.remove(ackRequestData.getAckId());
synchronized (outstandingReceipts) {
outstandingReceipts.remove(ackRequestData.getAckId());
}
}

private void processBatch(List<OutstandingMessage> batch) {
Expand Down
4 changes: 2 additions & 2 deletions grpc-google-cloud-pubsub-v1/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-cloud-pubsub-v1</artifactId>
<version>1.108.4-SNAPSHOT</version><!-- {x-version-update:grpc-google-cloud-pubsub-v1:current} -->
<version>1.108.5-SNAPSHOT</version><!-- {x-version-update:grpc-google-cloud-pubsub-v1:current} -->
<name>grpc-google-cloud-pubsub-v1</name>
<description>GRPC library for grpc-google-cloud-pubsub-v1</description>
<parent>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub-parent</artifactId>
<version>1.126.4-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsub:current} -->
<version>1.126.5-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsub:current} -->
</parent>
<dependencies>
<dependency>
Expand Down
10 changes: 5 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub-parent</artifactId>
<packaging>pom</packaging>
<version>1.126.4-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsub:current} -->
<version>1.126.5-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsub:current} -->
<name>Google Cloud Pub/Sub Parent</name>
<url>https://github.com/googleapis/java-pubsub</url>
<description>
Expand Down Expand Up @@ -69,17 +69,17 @@
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-pubsub-v1</artifactId>
<version>1.108.4-SNAPSHOT</version><!-- {x-version-update:proto-google-cloud-pubsub-v1:current} -->
<version>1.108.5-SNAPSHOT</version><!-- {x-version-update:proto-google-cloud-pubsub-v1:current} -->
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-cloud-pubsub-v1</artifactId>
<version>1.108.4-SNAPSHOT</version><!-- {x-version-update:grpc-google-cloud-pubsub-v1:current} -->
<version>1.108.5-SNAPSHOT</version><!-- {x-version-update:grpc-google-cloud-pubsub-v1:current} -->
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>1.126.4-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsub:current} -->
<version>1.126.5-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsub:current} -->
</dependency>

<!-- Test dependencies -->
Expand All @@ -104,7 +104,7 @@
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
<version>1.3.0</version>
<version>1.4.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
Expand Down
4 changes: 2 additions & 2 deletions proto-google-cloud-pubsub-v1/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-pubsub-v1</artifactId>
<version>1.108.4-SNAPSHOT</version><!-- {x-version-update:proto-google-cloud-pubsub-v1:current} -->
<version>1.108.5-SNAPSHOT</version><!-- {x-version-update:proto-google-cloud-pubsub-v1:current} -->
<name>proto-google-cloud-pubsub-v1</name>
<description>PROTO library for proto-google-cloud-pubsub-v1</description>
<parent>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub-parent</artifactId>
<version>1.126.4-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsub:current} -->
<version>1.126.5-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsub:current} -->
</parent>
<dependencies>
<dependency>
Expand Down
4 changes: 2 additions & 2 deletions samples/install-without-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>1.126.3</version>
<version>1.126.4</version>
</dependency>
<!-- [END pubsub_install_without_bom] -->

Expand All @@ -69,7 +69,7 @@
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
<version>1.3.0</version>
<version>1.4.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
6 changes: 3 additions & 3 deletions samples/native-image-sample/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
<version>1.3.0</version>
<version>1.4.0</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down Expand Up @@ -107,7 +107,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<version>5.10.1</version>
<version>5.10.2</version>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -134,7 +134,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<plugin>
<groupId>org.graalvm.buildtools</groupId>
<artifactId>native-maven-plugin</artifactId>
<version>0.9.28</version>
<version>0.10.0</version>
<extensions>true</extensions>
<configuration>
<mainClass>pubsub.NativeImagePubSubSample
Expand Down
4 changes: 2 additions & 2 deletions samples/snapshot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>1.126.4-SNAPSHOT</version>
<version>1.126.5-SNAPSHOT</version>
</dependency>
<!-- {x-version-update-end} -->

Expand Down Expand Up @@ -73,7 +73,7 @@
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
<version>1.3.0</version>
<version>1.4.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion samples/snippets/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
<version>1.3.0</version>
<version>1.4.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Loading

0 comments on commit bf48660

Please sign in to comment.