From b433095717d7f63f03500b071630845869629c6d Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Wed, 7 Feb 2024 10:30:31 -0500 Subject: [PATCH 1/5] chore: change assignees for issues and PRs to michaelpri10 --- .github/blunderbuss.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/blunderbuss.yml b/.github/blunderbuss.yml index 44ab091e1..97998e463 100644 --- a/.github/blunderbuss.yml +++ b/.github/blunderbuss.yml @@ -1,9 +1,9 @@ # Configuration for the Blunderbuss GitHub app. For more info see # https://github.com/googleapis/repo-automation-bots/tree/main/packages/blunderbuss assign_issues: - - maitrimangal + - michaelpri10 assign_prs: - - maitrimangal + - michaelpri10 assign_prs_by: - labels: - samples From 0af60d75449d06a0af276578a07d246e62570e46 Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Thu, 8 Feb 2024 19:57:53 -0500 Subject: [PATCH 2/5] feat: Add setUniverseDomain option for Publisher and Subscriber --- .../main/java/com/google/cloud/pubsub/v1/Publisher.java | 8 ++++++++ .../main/java/com/google/cloud/pubsub/v1/Subscriber.java | 8 ++++++++ 2 files changed, 16 insertions(+) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index d0437cb58..c7d252c85 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -185,6 +185,7 @@ private Publisher(Builder builder) throws IOException { .setExecutorProvider(FixedExecutorProvider.create(executor)) .setTransportChannelProvider(builder.channelProvider) .setEndpoint(builder.endpoint) + .setUniverseDomain(builder.universeDomain) .setHeaderProvider(builder.headerProvider); stubSettings .publishSettings() @@ -718,6 +719,7 @@ public static final class Builder { String topicName; private String endpoint = PublisherStubSettings.getDefaultEndpoint(); + private String universeDomain = null; // Batching options BatchingSettings batchingSettings = DEFAULT_BATCHING_SETTINGS; @@ -857,6 +859,12 @@ public Builder setEndpoint(String endpoint) { return this; } + /** Gives the ability to override the universe domain. */ + public Builder setUniverseDomain(String universeDomain) { + this.universeDomain = universeDomain; + return this; + } + /** Gives the ability to enable transport compression. */ public Builder setEnableCompression(boolean enableCompression) { this.enableCompression = enableCompression; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 1d1017dd4..f5eddd2e1 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -192,6 +192,7 @@ private Subscriber(Builder builder) { .setTransportChannelProvider(channelProvider) .setHeaderProvider(builder.headerProvider) .setEndpoint(builder.endpoint) + .setUniverseDomain(builder.universeDomain) .build(); // TODO(pongad): what about internal header?? } catch (Exception e) { @@ -492,6 +493,7 @@ public static final class Builder { private Optional clock = Optional.absent(); private int parallelPullCount = 1; private String endpoint = SubscriberStubSettings.getDefaultEndpoint(); + private String universeDomain = null; Builder(String subscription, MessageReceiver receiver) { this.subscription = subscription; @@ -670,6 +672,12 @@ public Builder setEndpoint(String endpoint) { return this; } + /** Gives the ability to override the universe domain. */ + public Builder setUniverseDomain(String universeDomain) { + this.universeDomain = universeDomain; + return this; + } + /** Gives the ability to set a custom clock. */ Builder setClock(ApiClock clock) { this.clock = Optional.of(clock); From 678a3aa9e4a0ad1152c094c27a3eb5af93036e26 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Thu, 8 Feb 2024 20:28:12 +0000 Subject: [PATCH 3/5] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- .../snippets/src/main/java/pubsub/EodSub.java | 97 +++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 samples/snippets/src/main/java/pubsub/EodSub.java diff --git a/samples/snippets/src/main/java/pubsub/EodSub.java b/samples/snippets/src/main/java/pubsub/EodSub.java new file mode 100644 index 000000000..d644db849 --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/EodSub.java @@ -0,0 +1,97 @@ +package pubsub; + +import com.google.cloud.pubsub.v1.AckReplyConsumerWithResponse; +import com.google.cloud.pubsub.v1.AckResponse; +import com.google.cloud.pubsub.v1.MessageReceiverWithAckResponse; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.threeten.bp.Duration; + +public class EodSub { + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "cloud-pubsub-experiments"; + String subscriptionId = "mike-java-sub"; + + subscribeWithExactlyOnceConsumerWithResponseExample(projectId, subscriptionId); + } + + public static void subscribeWithExactlyOnceConsumerWithResponseExample( + String projectId, String subscriptionId) { + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of("cloud-pubsub-experiments", subscriptionId); + + // Instantiate an asynchronous message receiver using `AckReplyConsumerWithResponse` + // instead of `AckReplyConsumer` to get a future that tracks the result of the ack call. + // When exactly once delivery is enabled on the subscription, the message is guaranteed + // to not be delivered again if the ack future succeeds. + MessageReceiverWithAckResponse receiverWithResponse = + (PubsubMessage message, AckReplyConsumerWithResponse consumerWithResponse) -> { + try { + // Handle incoming message, then ack the message, and receive an ack response. + // System.out.println("Message received: " + message.getData().toStringUtf8()); + Future ackResponseFuture = consumerWithResponse.ack(); + + // Retrieve the completed future for the ack response from the server. + AckResponse ackResponse = ackResponseFuture.get(); + + switch (ackResponse) { + case SUCCESSFUL: + // Success code means that this MessageID will not be delivered again. + if (message.getData().toStringUtf8().equals("hello #499")) { + System.out.println("Message successfully acked: " + message.getData().toStringUtf8()); + } + break; + case INVALID: + System.out.println( + "Message failed to ack with a response of Invalid. Id: " + + message.getMessageId()); + break; + case PERMISSION_DENIED: + System.out.println( + "Message failed to ack with a response of Permission Denied. Id: " + + message.getMessageId()); + break; + case FAILED_PRECONDITION: + System.out.println( + "Message failed to ack with a response of Failed Precondition. Id: " + + message.getMessageId()); + break; + case OTHER: + System.out.println( + "Message failed to ack with a response of Other. Id: " + + message.getMessageId()); + break; + default: + break; + } + } catch (InterruptedException | ExecutionException e) { + System.out.println( + "MessageId: " + message.getMessageId() + " failed when retrieving future"); + } catch (Throwable t) { + System.out.println("Throwable caught" + t.getMessage()); + } + }; + + Subscriber subscriber = null; + try { + subscriber = Subscriber.newBuilder(subscriptionName, receiverWithResponse) + .setMaxDurationPerAckExtension(Duration.ofSeconds(600)) + .setMinDurationPerAckExtension(Duration.ofSeconds(300)) + .build(); + // Start the subscriber. + subscriber.startAsync().awaitRunning(); + System.out.printf("Listening for messages on %s:\n", subscriptionName.toString()); + // Allow the subscriber to run for 30s unless an unrecoverable error occurs. + subscriber.awaitTerminated(7200, TimeUnit.SECONDS); + } catch (TimeoutException timeoutException) { + // Shut down the subscriber after 30s. Stop receiving messages. + subscriber.stopAsync(); + } + } +} From 9b37268f58cbc17763b1d4815aac3241ad0fb5a0 Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Mon, 12 Feb 2024 19:20:11 +0000 Subject: [PATCH 4/5] feat: Set default endpoint as null for Subscribers and Publishers --- .../src/main/java/com/google/cloud/pubsub/v1/Publisher.java | 2 +- .../src/main/java/com/google/cloud/pubsub/v1/Subscriber.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index c7d252c85..efaba6cf1 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -718,7 +718,7 @@ public static final class Builder { static final long DEFAULT_COMPRESSION_BYTES_THRESHOLD = 240L; String topicName; - private String endpoint = PublisherStubSettings.getDefaultEndpoint(); + private String endpoint = null; private String universeDomain = null; // Batching options diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index f5eddd2e1..1723c72b1 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -492,7 +492,7 @@ public static final class Builder { SubscriptionAdminSettings.defaultCredentialsProviderBuilder().build(); private Optional clock = Optional.absent(); private int parallelPullCount = 1; - private String endpoint = SubscriberStubSettings.getDefaultEndpoint(); + private String endpoint = null; private String universeDomain = null; Builder(String subscription, MessageReceiver receiver) { From 9aec34d0f7e69cb10bcfaa550b73c853e21a6c5c Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Mon, 12 Feb 2024 19:42:23 +0000 Subject: [PATCH 5/5] chore: Remove unneeded sample snippet --- .../snippets/src/main/java/pubsub/EodSub.java | 97 ------------------- 1 file changed, 97 deletions(-) delete mode 100644 samples/snippets/src/main/java/pubsub/EodSub.java diff --git a/samples/snippets/src/main/java/pubsub/EodSub.java b/samples/snippets/src/main/java/pubsub/EodSub.java deleted file mode 100644 index d644db849..000000000 --- a/samples/snippets/src/main/java/pubsub/EodSub.java +++ /dev/null @@ -1,97 +0,0 @@ -package pubsub; - -import com.google.cloud.pubsub.v1.AckReplyConsumerWithResponse; -import com.google.cloud.pubsub.v1.AckResponse; -import com.google.cloud.pubsub.v1.MessageReceiverWithAckResponse; -import com.google.cloud.pubsub.v1.Subscriber; -import com.google.pubsub.v1.ProjectSubscriptionName; -import com.google.pubsub.v1.PubsubMessage; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import org.threeten.bp.Duration; - -public class EodSub { - public static void main(String... args) throws Exception { - // TODO(developer): Replace these variables before running the sample. - String projectId = "cloud-pubsub-experiments"; - String subscriptionId = "mike-java-sub"; - - subscribeWithExactlyOnceConsumerWithResponseExample(projectId, subscriptionId); - } - - public static void subscribeWithExactlyOnceConsumerWithResponseExample( - String projectId, String subscriptionId) { - ProjectSubscriptionName subscriptionName = - ProjectSubscriptionName.of("cloud-pubsub-experiments", subscriptionId); - - // Instantiate an asynchronous message receiver using `AckReplyConsumerWithResponse` - // instead of `AckReplyConsumer` to get a future that tracks the result of the ack call. - // When exactly once delivery is enabled on the subscription, the message is guaranteed - // to not be delivered again if the ack future succeeds. - MessageReceiverWithAckResponse receiverWithResponse = - (PubsubMessage message, AckReplyConsumerWithResponse consumerWithResponse) -> { - try { - // Handle incoming message, then ack the message, and receive an ack response. - // System.out.println("Message received: " + message.getData().toStringUtf8()); - Future ackResponseFuture = consumerWithResponse.ack(); - - // Retrieve the completed future for the ack response from the server. - AckResponse ackResponse = ackResponseFuture.get(); - - switch (ackResponse) { - case SUCCESSFUL: - // Success code means that this MessageID will not be delivered again. - if (message.getData().toStringUtf8().equals("hello #499")) { - System.out.println("Message successfully acked: " + message.getData().toStringUtf8()); - } - break; - case INVALID: - System.out.println( - "Message failed to ack with a response of Invalid. Id: " - + message.getMessageId()); - break; - case PERMISSION_DENIED: - System.out.println( - "Message failed to ack with a response of Permission Denied. Id: " - + message.getMessageId()); - break; - case FAILED_PRECONDITION: - System.out.println( - "Message failed to ack with a response of Failed Precondition. Id: " - + message.getMessageId()); - break; - case OTHER: - System.out.println( - "Message failed to ack with a response of Other. Id: " - + message.getMessageId()); - break; - default: - break; - } - } catch (InterruptedException | ExecutionException e) { - System.out.println( - "MessageId: " + message.getMessageId() + " failed when retrieving future"); - } catch (Throwable t) { - System.out.println("Throwable caught" + t.getMessage()); - } - }; - - Subscriber subscriber = null; - try { - subscriber = Subscriber.newBuilder(subscriptionName, receiverWithResponse) - .setMaxDurationPerAckExtension(Duration.ofSeconds(600)) - .setMinDurationPerAckExtension(Duration.ofSeconds(300)) - .build(); - // Start the subscriber. - subscriber.startAsync().awaitRunning(); - System.out.printf("Listening for messages on %s:\n", subscriptionName.toString()); - // Allow the subscriber to run for 30s unless an unrecoverable error occurs. - subscriber.awaitTerminated(7200, TimeUnit.SECONDS); - } catch (TimeoutException timeoutException) { - // Shut down the subscriber after 30s. Stop receiving messages. - subscriber.stopAsync(); - } - } -}