diff --git a/samples/snippets/src/main/java/pubsub/OptimisticSubscribeExample.java b/samples/snippets/src/main/java/pubsub/OptimisticSubscribeExample.java index 4f2ba158a..26b0fe439 100644 --- a/samples/snippets/src/main/java/pubsub/OptimisticSubscribeExample.java +++ b/samples/snippets/src/main/java/pubsub/OptimisticSubscribeExample.java @@ -18,10 +18,12 @@ // [START pubsub_optimistic_subscribe] +import com.google.api.gax.rpc.NotFoundException; import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsub.v1.Subscriber; import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.common.util.concurrent.MoreExecutors; import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PushConfig; @@ -58,22 +60,35 @@ public static void optimisticSubscribeExample( Subscriber subscriber = null; try { subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); + + // Listen for resource NOT_FOUND errors and rebuild the subscriber and restart subscribing + // when the current subscriber encounters these errors. + subscriber.addListener( + new Subscriber.Listener() { + public void failed(Subscriber.State from, Throwable failure) { + System.out.println(failure.getStackTrace()); + if (failure instanceof NotFoundException) { + try (SubscriptionAdminClient subscriptionAdminClient = + SubscriptionAdminClient.create()) { + TopicName topicName = TopicName.of(projectId, topicId); + // Create a pull subscription with default acknowledgement deadline of 10 seconds. + // The client library will automatically extend acknowledgement deadlines. + Subscription subscription = + subscriptionAdminClient.createSubscription( + subscriptionName, topicName, PushConfig.getDefaultInstance(), 10); + System.out.println("Created pull subscription: " + subscription.getName()); + optimisticSubscribeExample(projectId, subscriptionId, topicId); + } catch (IOException err) { + System.out.println("Failed to create pull subscription: " + err.getMessage()); + } + } + } + }, + MoreExecutors.directExecutor()); + subscriber.startAsync().awaitRunning(); System.out.printf("Listening for messages on %s:\n", subscriptionName.toString()); subscriber.awaitTerminated(30, TimeUnit.SECONDS); - } catch (IllegalStateException e) { - try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { - TopicName topicName = TopicName.of(projectId, topicId); - // Create a pull subscription with default acknowledgement deadline of 10 seconds. - // Messages not successfully acknowledged within 10 seconds will get resent by the server. - Subscription subscription = - subscriptionAdminClient.createSubscription( - subscriptionName, topicName, PushConfig.getDefaultInstance(), 10); - System.out.println("Created pull subscription: " + subscription.getName()); - optimisticSubscribeExample(projectId, subscriptionId, topicId); - } catch (IOException err) { - System.out.println("Failed to create pull subscription: " + err.getMessage()); - } } catch (TimeoutException e) { subscriber.stopAsync(); }