Skip to content

Commit

Permalink
docs(samples): Use an error listener instead of catching an exception…
Browse files Browse the repository at this point in the history
… for the OptimisticSubscribeExample
  • Loading branch information
michaelpri10 committed Jun 25, 2024
1 parent b75b471 commit 6002fb4
Showing 1 changed file with 28 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

// [START pubsub_optimistic_subscribe]

import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
Expand Down Expand Up @@ -58,22 +60,35 @@ public static void optimisticSubscribeExample(
Subscriber subscriber = null;
try {
subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

// Listen for resource NOT_FOUND errors and rebuild the subscriber and restart subscribing
// when the current subscriber encounters these errors.
subscriber.addListener(
new Subscriber.Listener() {
public void failed(Subscriber.State from, Throwable failure) {
System.out.println(failure.getStackTrace());
if (failure instanceof NotFoundException) {
try (SubscriptionAdminClient subscriptionAdminClient =
SubscriptionAdminClient.create()) {
TopicName topicName = TopicName.of(projectId, topicId);
// Create a pull subscription with default acknowledgement deadline of 10 seconds.
// The client library will automatically extend acknowledgement deadlines.
Subscription subscription =
subscriptionAdminClient.createSubscription(
subscriptionName, topicName, PushConfig.getDefaultInstance(), 10);
System.out.println("Created pull subscription: " + subscription.getName());
optimisticSubscribeExample(projectId, subscriptionId, topicId);
} catch (IOException err) {
System.out.println("Failed to create pull subscription: " + err.getMessage());
}
}
}
},
MoreExecutors.directExecutor());

subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (IllegalStateException e) {
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
TopicName topicName = TopicName.of(projectId, topicId);
// Create a pull subscription with default acknowledgement deadline of 10 seconds.
// Messages not successfully acknowledged within 10 seconds will get resent by the server.
Subscription subscription =
subscriptionAdminClient.createSubscription(
subscriptionName, topicName, PushConfig.getDefaultInstance(), 10);
System.out.println("Created pull subscription: " + subscription.getName());
optimisticSubscribeExample(projectId, subscriptionId, topicId);
} catch (IOException err) {
System.out.println("Failed to create pull subscription: " + err.getMessage());
}
} catch (TimeoutException e) {
subscriber.stopAsync();
}
Expand Down

0 comments on commit 6002fb4

Please sign in to comment.