From 7a36b1bf92e0cf07a1f82ae624b9f24a383b66c7 Mon Sep 17 00:00:00 2001 From: Matthias Kaemmer Date: Wed, 3 Jan 2024 11:12:16 +0100 Subject: [PATCH] Add PubSub batch initialization to improve start up time in multi tenant instances Fix PubSub tenant topic/subscription clean up on tenant delete Signed-off-by: Matthias Kaemmer --- .../api/config/PubSubConstants.java | 16 +- .../InternalMessageSubscriber.java | 7 + .../InternalTopicManagerImpl.java | 152 ++++++- .../PubSubBasedAdminClientManager.java | 422 ++++++++++++++++++ .../PubSubBasedAdminClientManagerFactory.java | 2 - ...SubBasedAdminClientManagerFactoryImpl.java | 1 - .../service/communication/PubSubService.java | 79 ++-- .../core/app/InternalMessagingConfig.java | 8 + .../src/main/resources/application.yaml | 7 +- .../InternalTopicManagerImplTest.java | 46 +- .../communication/PubSubServiceTest.java | 22 +- 11 files changed, 664 insertions(+), 98 deletions(-) create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/PubSubBasedAdminClientManager.java diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/config/PubSubConstants.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/config/PubSubConstants.java index c6adb524..4486a71e 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/config/PubSubConstants.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/config/PubSubConstants.java @@ -29,20 +29,30 @@ public final class PubSubConstants { public static final String EVENT_STATES_SUBTOPIC_ENDPOINT = "event.state"; public static final String COMMAND_ENDPOINT = "command"; public static final String COMMAND_RESPONSE_ENDPOINT = "command_response"; + public static final String COMMUNICATION_API_SUBSCRIPTION_NAME = "%s-communication-api"; private PubSubConstants() { } /** - * Gets the list of all topics need to be created per tenant. + * Gets the list of all endpoints for which a topic per tenant has to be created. * - * @return List of all topics. + * @return List of all topic endpoints. */ - public static List getTenantTopics() { + public static List getTenantEndpoints() { return List.of(EVENT_ENDPOINT, COMMAND_ENDPOINT, COMMAND_RESPONSE_ENDPOINT, EVENT_STATES_SUBTOPIC_ENDPOINT, TELEMETRY_ENDPOINT); } + + /** + * Gets the list of all endpoints for which an additional subscription per tenant has to be created. + * + * @return List of all endpoints that need an additional subscription. + */ + public static List getEndpointsWithAdditionalSubscription() { + return List.of(EVENT_ENDPOINT, COMMAND_RESPONSE_ENDPOINT, EVENT_STATES_SUBTOPIC_ENDPOINT); + } } diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalMessageSubscriber.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalMessageSubscriber.java index 18892363..82394616 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalMessageSubscriber.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalMessageSubscriber.java @@ -30,4 +30,11 @@ public interface InternalMessageSubscriber { * @param callbackHandler The function to be called when a message is received */ void subscribe(String topic, MessageReceiver callbackHandler); + + /** + * Closes all active subscribers for the given tenant. + * + * @param tenant The tenant whose active subscribers should be closed. + */ + void closeSubscribersForTenant(String tenant); } diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalTopicManagerImpl.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalTopicManagerImpl.java index 3c21627c..86a10842 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalTopicManagerImpl.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalTopicManagerImpl.java @@ -17,9 +17,13 @@ package org.eclipse.hono.communication.api.service.communication; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; -import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager; +import org.apache.commons.lang3.StringUtils; import org.eclipse.hono.client.pubsub.PubSubMessageHelper; import org.eclipse.hono.communication.api.config.PubSubConstants; import org.eclipse.hono.communication.api.handler.CommandTopicEventHandler; @@ -37,6 +41,7 @@ import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.common.base.Strings; import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.SubscriptionName; import com.google.pubsub.v1.TopicName; @@ -96,16 +101,115 @@ public InternalTopicManagerImpl(final DeviceRepository deviceRepository, @Override public void initPubSub() { log.info("Initialize tenant topics and subscriptions."); - internalMessaging.subscribe(PubSubConstants.TENANT_NOTIFICATIONS, this::onTenantChanges); + vertx.executeBlocking(promise -> { + internalMessaging.subscribe(PubSubConstants.TENANT_NOTIFICATIONS, this::onTenantChanges); + promise.complete(); + }); deviceRepository.listDistinctTenants() .onFailure(err -> log.error("Error getting tenants for topic creation: {}", err.getMessage())) .onSuccess(tenants -> { + if (tenants.size() < internalMessagingConfig.getBatchInitTenantThreshold()) { + for (String tenant : tenants) { + initPubSubForTenant(tenant); + } + } else { + batchInitPubSubResources(tenants); + } + }); + } + + private void batchInitPubSubResources(final List tenants) { + log.info("Start batchInitPubSubResources"); + findMissingPubSubResources(tenants).compose(map -> { + final PubSubBasedAdminClientManager adminClientManager = adminClientManagerFactory + .createAdminClientManager(); + final List pubsubRequests = new ArrayList<>(); + for (Map.Entry missingTopic : map.get("missingTopics").entrySet()) { + final TopicName topic = TopicName.parse(missingTopic.getKey()); + pubsubRequests.add(adminClientManager.createTopic(topic)); + } + for (Map.Entry missingSubscription : map.get("missingSubscriptions").entrySet()) { + final SubscriptionName subscription = SubscriptionName.parse(missingSubscription.getKey()); + final TopicName topic = TopicName.parse(missingSubscription.getValue()); + pubsubRequests.add(adminClientManager.createSubscription(subscription, topic)); + } + for (Map.Entry faultySubscription : map.get("faultySubscriptions").entrySet()) { + final SubscriptionName subscription = SubscriptionName.parse(faultySubscription.getKey()); + final TopicName topic = TopicName.parse(faultySubscription.getValue()); + pubsubRequests.add(adminClientManager.updateSubscriptionTopic(subscription, topic)); + } + return CompositeFuture.join(pubsubRequests).onComplete(i -> { + adminClientManager.closeAdminClients(); + log.info("Finished batchInitPubSubResources"); + }); + }).onFailure(e -> log.error("batchInitPubSubResources failed", e)) + .onSuccess(i -> { for (String tenant : tenants) { - initPubSubForTenant(tenant); + vertx.executeBlocking(promise -> subscribeToTenantTopics(tenant)); } }); } + private Future>> findMissingPubSubResources(final List tenants) { + final PubSubBasedAdminClientManager pubSubBasedAdminClientManager = adminClientManagerFactory + .createAdminClientManager(); + final Future> topicsFuture = pubSubBasedAdminClientManager.listTopics(); + final Future> subscriptionsFuture = pubSubBasedAdminClientManager.listSubscriptions(); + final Map missingTopics = new HashMap<>(); + final Map missingSubscriptions = new HashMap<>(); + final Map faultySubscriptions = new HashMap<>(); + return CompositeFuture.join(topicsFuture, subscriptionsFuture).map(i -> { + final Set topicSet = topicsFuture.result(); + final Set subscriptionSet = subscriptionsFuture.result(); + final Map subscriptionMap = new HashMap<>(); + subscriptionSet.forEach(s -> subscriptionMap.put(s.getName(), s)); + for (String tenantEndpoint : PubSubConstants.getTenantEndpoints()) { + for (String tenant : tenants) { + final String topic = String + .valueOf(TopicName.of(projectId, PubSubMessageHelper.getTopicName(tenantEndpoint, tenant))); + addMissingTopicToMap(topicSet, topic, missingTopics); + final String subscription = String.valueOf( + SubscriptionName.of(projectId, PubSubMessageHelper.getTopicName(tenantEndpoint, tenant))); + addMissingOrFaultySubscription(subscriptionMap, missingSubscriptions, faultySubscriptions, + subscription, topic); + if (PubSubConstants.getEndpointsWithAdditionalSubscription().contains(tenantEndpoint)) { + final String apiSubscription = String.valueOf(SubscriptionName.of(projectId, + PubSubMessageHelper.getTopicName( + String.format(PubSubConstants.COMMUNICATION_API_SUBSCRIPTION_NAME, + tenantEndpoint), + tenant))); + addMissingOrFaultySubscription(subscriptionMap, missingSubscriptions, faultySubscriptions, + apiSubscription, topic); + } + } + } + return Map.of("missingTopics", missingTopics, "missingSubscriptions", missingSubscriptions, + "faultySubscriptions", faultySubscriptions); + }).onFailure(thr -> log.error("Cannot find missing Pub/Sub resources", thr)) + .onComplete(i -> pubSubBasedAdminClientManager.closeAdminClients()); + } + + private void addMissingTopicToMap(final Set topicSet, final String topic, + final Map missingTopics) { + if (!topicSet.contains(topic)) { + missingTopics.put(topic, ""); + } + } + + private void addMissingOrFaultySubscription(final Map subscriptionMap, + final Map missingSubscriptions, final Map faultySubscriptions, + final String subscription, final String topic) { + if (!subscriptionMap.containsKey(subscription)) { + missingSubscriptions.put(subscription, topic); + return; + } + final String deletedTopic = "_deleted-topic_"; + if (subscriptionMap.get(subscription) != null + && StringUtils.equals(subscriptionMap.get(subscription).getTopic(), deletedTopic)) { + faultySubscriptions.put(subscription, topic); + } + } + /** * Handle incoming tenant CREATE notifications. * @@ -165,12 +269,17 @@ private Future createPubSubResourceForTenant(final String tenantId, final PubSubResourceType pubSubResourceType, final PubSubBasedAdminClientManager pubSubBasedAdminClientManager) { final List futureList = new ArrayList<>(); - final List topics = PubSubConstants.getTenantTopics(); - for (String topic : topics) { + for (String tenantEndpoint : PubSubConstants.getTenantEndpoints()) { if (pubSubResourceType == PubSubResourceType.TOPIC) { - futureList.add(pubSubBasedAdminClientManager.getOrCreateTopic(topic, tenantId)); + futureList.add(pubSubBasedAdminClientManager.getOrCreateTopic(tenantEndpoint, tenantId)); } else { - futureList.add(pubSubBasedAdminClientManager.getOrCreateSubscription(topic, tenantId)); + futureList.add(pubSubBasedAdminClientManager.getOrCreateSubscription(tenantEndpoint, tenantEndpoint, + tenantId)); + if (PubSubConstants.getEndpointsWithAdditionalSubscription().contains(tenantEndpoint)) { + futureList.add(pubSubBasedAdminClientManager.getOrCreateSubscription(tenantEndpoint, + String.format(PubSubConstants.COMMUNICATION_API_SUBSCRIPTION_NAME, tenantEndpoint), + tenantId)); + } } } return CompositeFuture.join(futureList) @@ -198,18 +307,31 @@ private void subscribeToTenantTopics(final String tenant) { } private void cleanupPubSubResources(final String tenant) { - final List pubSubTopicsToDelete = PubSubConstants.getTenantTopics().stream() - .map(id -> TopicName.of(projectId, "%s.%s".formatted(tenant, id)).toString()).toList(); - final List pubSubSubscriptionsToDelete = PubSubConstants.getTenantTopics().stream() - .map(id -> SubscriptionName.of(projectId, "%s.%s".formatted(tenant, id)).toString()).toList(); + final List pubSubTopicsToDelete = PubSubConstants.getTenantEndpoints().stream().map( + endpoint -> String.valueOf(TopicName.of(projectId, PubSubMessageHelper.getTopicName(endpoint, tenant)))) + .toList(); + final List pubSubSubscriptionsToDelete = Stream.concat(PubSubConstants.getTenantEndpoints().stream() + .map(endpoint -> String + .valueOf(SubscriptionName.of(projectId, PubSubMessageHelper.getTopicName(endpoint, tenant)))), + PubSubConstants.getEndpointsWithAdditionalSubscription().stream() + .map(endpoint -> String.valueOf(SubscriptionName.of(projectId, + PubSubMessageHelper.getTopicName( + String.format(PubSubConstants.COMMUNICATION_API_SUBSCRIPTION_NAME, endpoint), + tenant))))) + .toList(); + internalMessaging.closeSubscribersForTenant(tenant); PubSubMessageHelper.getCredentialsProvider() .ifPresentOrElse(provider -> { final PubSubBasedAdminClientManager pubSubBasedAdminClientManager = adminClientManagerFactory .createAdminClientManager(); - pubSubBasedAdminClientManager.deleteTopics(pubSubTopicsToDelete); - pubSubBasedAdminClientManager.deleteSubscriptions(pubSubSubscriptionsToDelete); - log.info("All topics and subscriptions for tenant {} were deleted successfully.", tenant); - pubSubBasedAdminClientManager.closeAdminClients(); + CompositeFuture.join(pubSubBasedAdminClientManager.deleteTopics(pubSubTopicsToDelete), + pubSubBasedAdminClientManager.deleteSubscriptions(pubSubSubscriptionsToDelete)) + .onSuccess(compFuture -> log.info( + "All topics and subscriptions of tenant {} were deleted successfully.", tenant)) + .onFailure(throwable -> log.warn( + "Some topics or subscriptions of tenant {} could not be deleted.", tenant, + throwable)) + .onComplete(compFuture -> pubSubBasedAdminClientManager.closeAdminClients()); }, () -> log.error("credentials provider is empty")); } diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/PubSubBasedAdminClientManager.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/PubSubBasedAdminClientManager.java new file mode 100644 index 00000000..bd75e954 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/PubSubBasedAdminClientManager.java @@ -0,0 +1,422 @@ +/******************************************************************************* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.communication.api.service.communication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.eclipse.hono.client.pubsub.PubSubConfigProperties; +import org.eclipse.hono.client.pubsub.PubSubMessageHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.NotFoundException; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient.ListSubscriptionsPagedResponse; +import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminClient.ListTopicsPagedResponse; +import com.google.cloud.pubsub.v1.TopicAdminSettings; +import com.google.protobuf.util.Durations; +import com.google.pubsub.v1.ExpirationPolicy; +import com.google.pubsub.v1.ProjectName; +import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; + +import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; +import io.vertx.core.Vertx; + +/** + * A Pub/Sub based admin client manager to manage topics and subscriptions. Wraps a TopicAdminClient and a + * SubscriptionAdminClient. + */ +public class PubSubBasedAdminClientManager { + + private static final Logger LOG = LoggerFactory.getLogger(PubSubBasedAdminClientManager.class); + + /** + * The message retention in milliseconds for a Pub/Sub subscription. + */ + private static final long MESSAGE_RETENTION = 600000; + private final String projectId; + private final CredentialsProvider credentialsProvider; + private final Vertx vertx; + private SubscriptionAdminClient subscriptionAdminClient; + private TopicAdminClient topicAdminClient; + + /** + * Creates a new PubSubBasedAdminClientManager. + * + * @param pubSubConfigProperties The Pub/Sub config properties containing the Google project ID. + * @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service. + * @param vertx The Vert.x instance to use. + * @throws NullPointerException if vertx, credentialsProvider or projectId is {@code null}. + */ + public PubSubBasedAdminClientManager(final PubSubConfigProperties pubSubConfigProperties, + final CredentialsProvider credentialsProvider, final Vertx vertx) { + Objects.requireNonNull(pubSubConfigProperties); + this.projectId = Objects.requireNonNull(pubSubConfigProperties.getProjectId()); + this.credentialsProvider = Objects.requireNonNull(credentialsProvider); + this.vertx = Objects.requireNonNull(vertx); + } + + private Future getOrCreateTopicAdminClient() { + if (topicAdminClient != null) { + return Future.succeededFuture(topicAdminClient); + } + try { + final TopicAdminSettings adminSettings = TopicAdminSettings + .newBuilder() + .setCredentialsProvider(credentialsProvider) + .build(); + topicAdminClient = TopicAdminClient.create(adminSettings); + return Future.succeededFuture(topicAdminClient); + } catch (IOException e) { + LOG.error("Error initializing topic admin client: {}", e.getMessage()); + return Future.failedFuture("Error creating topic admin client"); + } + } + + private Future getOrCreateSubscriptionAdminClient() { + if (subscriptionAdminClient != null) { + return Future.succeededFuture(subscriptionAdminClient); + } + try { + final SubscriptionAdminSettings adminSettings = SubscriptionAdminSettings + .newBuilder() + .setCredentialsProvider(credentialsProvider) + .build(); + subscriptionAdminClient = SubscriptionAdminClient.create(adminSettings); + return Future.succeededFuture(subscriptionAdminClient); + } catch (IOException e) { + LOG.error("Error initializing subscription admin client: {}", e.getMessage()); + return Future.failedFuture("Error creating subscription admin client"); + } + } + + /** + * Gets an existing topic or creates a new one on Pub/Sub based on the given topic endpoint and prefix. + * + * @param endpoint The endpoint name of the topic, e.g. command_internal. + * @param prefix The prefix of the topic, e.g. the adapter instance ID. + * @return A succeeded Future if the topic is successfully created or already exists, or a failed Future if it could + * not be created. + */ + public Future getOrCreateTopic(final String endpoint, final String prefix) { + final TopicName topicName = TopicName.of(projectId, PubSubMessageHelper.getTopicName(endpoint, prefix)); + + return getOrCreateTopicAdminClient() + .compose(client -> getTopic(topicName, client) + .recover(thr -> { + if (thr instanceof NotFoundException) { + return createTopic(topicName, client); + } else { + return Future.failedFuture(thr); + } + })); + } + + private Future getTopic(final TopicName topicName, final TopicAdminClient client) { + return vertx.executeBlocking(promise -> { + try { + final Topic topic = client.getTopic(topicName); + promise.complete(topic.getName()); + } catch (ApiException e) { + promise.fail(e); + } + }); + } + + /** + * Creates a new topic on Pub/Sub based on the given topicName. + * + * @param topicName The name of the topic, e.g. tenant.command. + * @return A succeeded Future if the topic is successfully created or a failed Future if it could not be created. + */ + public Future createTopic(final TopicName topicName) { + return getOrCreateTopicAdminClient() + .compose(client -> createTopic(topicName, client)); + } + + private Future createTopic(final TopicName topicName, final TopicAdminClient client) { + final Future createdTopic = vertx + .executeBlocking(promise -> { + try { + final Topic topic = client.createTopic(topicName); + promise.complete(topic.getName()); + } catch (ApiException e) { + promise.fail(e); + } + }); + createdTopic.onSuccess(top -> LOG.info("Topic {} created successfully.", topicName)) + .onFailure(thr -> LOG.warn("Creating topic failed [topic: {}, projectId: {}]", topicName, projectId)); + return createdTopic; + } + + /** + * Gets an existing subscription or creates a new one on Pub/Sub based on the given prefix, topic and subscription + * endpoint. + * + * @param topicEndpoint The endpoint name of the topic, e.g. command_internal. + * @param subscriptionEndpoint The endpoint name of the subscription, e.g. command_internal. + * @param prefix The prefix of the topic and subscription, e.g. the adapter instance ID. + * @return A succeeded Future if the subscription is successfully created or already exists, or a failed Future if + * it could not be created. + */ + public Future getOrCreateSubscription(final String topicEndpoint, final String subscriptionEndpoint, + final String prefix) { + final String topic = PubSubMessageHelper.getTopicName(topicEndpoint, prefix); + final String subscription = PubSubMessageHelper.getTopicName(subscriptionEndpoint, prefix); + final TopicName topicName = TopicName.of(projectId, topic); + final SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscription); + + return getOrCreateSubscriptionAdminClient() + .compose(client -> getSubscription(subscriptionName, client) + .recover(thr -> { + if (thr instanceof NotFoundException) { + return createSubscription(subscriptionName, topicName, client); + } else { + return Future.failedFuture(thr); + } + })); + } + + private Future getSubscription(final SubscriptionName subscriptionName, + final SubscriptionAdminClient client) { + return vertx.executeBlocking(promise -> { + try { + final Subscription subscription = client.getSubscription(subscriptionName); + promise.complete(subscription.getName()); + } catch (ApiException e) { + promise.fail(e); + } + }); + } + + /** + * Creates a new subscription on Pub/Sub based on the given subscriptionName and topicName. + * + * @param subscriptionName The name of the subscription, e.g. tenant.event. + * @param topicName The name of the topic, e.g. tenant.event. + * @return A succeeded Future if the subscription is successfully created or a failed Future if it could not be + * created. + */ + public Future createSubscription(final SubscriptionName subscriptionName, final TopicName topicName) { + return getOrCreateSubscriptionAdminClient() + .compose(client -> createSubscription(subscriptionName, topicName, client)); + } + + private Future createSubscription(final SubscriptionName subscriptionName, final TopicName topicName, + final SubscriptionAdminClient client) { + final Subscription request = Subscription.newBuilder() + .setName(subscriptionName.toString()) + .setTopic(topicName.toString()) + .setPushConfig(PushConfig.getDefaultInstance()) + .setMessageRetentionDuration(Durations.fromMillis(MESSAGE_RETENTION)) + .setExpirationPolicy(ExpirationPolicy.getDefaultInstance()) + .build(); + final Future createdSubscription = vertx + .executeBlocking(promise -> { + try { + final Subscription subscription = client.createSubscription(request); + promise.complete(subscription.getName()); + } catch (ApiException e) { + promise.fail(e); + } + }); + createdSubscription.onSuccess(sub -> LOG.info("Subscription {} created successfully.", subscriptionName)) + .onFailure( + thr -> LOG.warn("Creating subscription failed [subscription: {}, topic: {}, project: {}]", + subscriptionName, topicName, projectId)); + return createdSubscription; + } + + /** + * Updates the topic of a subscription on Pub/Sub based on the given subscriptionName and topicName. + * + * @param subscriptionName The name of the subscription, e.g. tenant.command. + * @param topicName The name of the new topic, e.g. tenant.command. + * @return A succeeded Future if the topic of the subscription is successfully updated or a failed Future if it + * could not be updated. + */ + public Future updateSubscriptionTopic(final SubscriptionName subscriptionName, final TopicName topicName) { + return getOrCreateSubscriptionAdminClient() + .compose(client -> deleteSubscription(subscriptionName, client) + .compose(promise -> createSubscription(subscriptionName, topicName, client))) + .onSuccess(sub -> LOG.info("Subscription {} updated successfully.", sub)) + .onFailure( + thr -> LOG.warn("Updating subscription failed [subscription: {}, topic: {}, project: {}]", + subscriptionName, topicName, projectId)); + } + + /** + * Lists subscriptions and adds its names in a set. + * + * @return A future containing a set of subscription names. + */ + public Future> listSubscriptions() { + return getOrCreateSubscriptionAdminClient() + .compose(client -> { + final Set allSubscriptions = new HashSet<>(); + final ProjectName projectName = ProjectName.of(projectId); + + return vertx.executeBlocking(promise -> { + try { + final ListSubscriptionsPagedResponse pagedResponse = client.listSubscriptions( + projectName); + Optional.ofNullable(pagedResponse) + .ifPresent(p -> p.iterateAll().forEach(allSubscriptions::add)); + promise.complete(allSubscriptions); + } catch (Exception e) { + LOG.error("Error listing subscriptions on project {}", projectId, e); + promise.fail("Error listing subscriptions"); + } + }); + }); + } + + /** + * Lists topics and adds its names in a set. + * + * @return A future containing a set of topic names. + */ + public Future> listTopics() { + return getOrCreateTopicAdminClient() + .compose(client -> { + final Set allTopics = new HashSet<>(); + final ProjectName projectName = ProjectName.of(projectId); + + return vertx.executeBlocking(promise -> { + try { + final ListTopicsPagedResponse pagedResponse = client.listTopics(projectName); + Optional.ofNullable(pagedResponse) + .ifPresent( + p -> pagedResponse.iterateAll().forEach(t -> allTopics.add(t.getName()))); + promise.complete(allTopics); + } catch (Exception e) { + LOG.error("Error listing topics on project {}", projectId, e); + promise.fail("Error listing topics"); + } + }); + }); + } + + private Future deleteTopic(final TopicName topicName, final TopicAdminClient client) { + return vertx.executeBlocking(promise -> { + try { + client.deleteTopic(topicName); + promise.complete(); + } catch (ApiException e) { + LOG.warn("Could not delete topic {}", topicName, e); + promise.fail(e); + } + }); + } + + private Future deleteSubscription(final SubscriptionName subscriptionName, + final SubscriptionAdminClient client) { + return vertx.executeBlocking(promise -> { + try { + client.deleteSubscription(subscriptionName); + promise.complete(); + } catch (ApiException e) { + LOG.warn("Could not delete subscription {}", subscriptionName, e); + promise.fail(e); + } + }); + } + + /** + * Deletes the topics with the provided names in the list. + * + * @param topicsToDelete A list containing the topic names that should be deleted. The list must contain topics with + * the format `"projects/{project}/topics/{topic}"` + * @return A succeeded future if the topics could be deleted successfully, a failed future if an error occurs. + */ + public Future deleteTopics(final List topicsToDelete) { + return getOrCreateTopicAdminClient() + .compose(client -> { + final List futureList = new ArrayList<>(); + for (final String topic : topicsToDelete) { + futureList.add(deleteTopic(TopicName.parse(topic), client)); + } + return CompositeFuture.join(futureList); + }); + } + + /** + * Deletes the subscriptions with the provided names in the list. + * + * @param subscriptionsToDelete A list containing the subscription names that should be deleted. The list must + * contain subscriptions with the format `"projects/{project}/subscriptions/{subscription}"` + * @return A succeeded future if the subscriptions could be deleted successfully, a failed future if an error + * occurs. + */ + public Future deleteSubscriptions(final List subscriptionsToDelete) { + return getOrCreateSubscriptionAdminClient() + .compose(client -> { + final List futureList = new ArrayList<>(); + for (final String subscription : subscriptionsToDelete) { + futureList.add(deleteSubscription(SubscriptionName.parse(subscription), client)); + } + return CompositeFuture.join(futureList); + }); + } + + /** + * Closes the TopicAdminClient and the SubscriptionAdminClient if they exist. This method is expected to be invoked + * as soon as the TopicAdminClient and the SubscriptionAdminClient is no longer needed. This method will block the + * current thread for up to 10 seconds! + */ + public void closeAdminClients() { + closeSubscriptionAdminClient(); + closeTopicAdminClient(); + } + + private void closeSubscriptionAdminClient() { + if (subscriptionAdminClient != null) { + subscriptionAdminClient.shutdown(); + try { + subscriptionAdminClient.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.debug("Resources are not freed properly, error", e); + Thread.currentThread().interrupt(); + } + } + } + + private void closeTopicAdminClient() { + if (topicAdminClient != null) { + topicAdminClient.shutdown(); + try { + topicAdminClient.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.debug("Resources are not freed properly, error", e); + Thread.currentThread().interrupt(); + } + } + } +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/PubSubBasedAdminClientManagerFactory.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/PubSubBasedAdminClientManagerFactory.java index b02d139e..1df57542 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/PubSubBasedAdminClientManagerFactory.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/PubSubBasedAdminClientManagerFactory.java @@ -14,8 +14,6 @@ package org.eclipse.hono.communication.api.service.communication; -import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager; - /** * A factory to create PubSubBasedAdminClientManager instances. */ diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/PubSubBasedAdminClientManagerFactoryImpl.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/PubSubBasedAdminClientManagerFactoryImpl.java index 16c9c2bd..d0f85dad 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/PubSubBasedAdminClientManagerFactoryImpl.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/PubSubBasedAdminClientManagerFactoryImpl.java @@ -15,7 +15,6 @@ import java.util.Objects; -import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager; import org.eclipse.hono.client.pubsub.PubSubConfigProperties; import com.google.api.gax.core.CredentialsProvider; diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/PubSubService.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/PubSubService.java index 24e53063..a18e2721 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/PubSubService.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/PubSubService.java @@ -16,13 +16,12 @@ package org.eclipse.hono.communication.api.service.communication; -import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import java.util.concurrent.TimeUnit; -import java.util.stream.StreamSupport; +import java.util.concurrent.TimeoutException; +import org.eclipse.hono.communication.api.config.PubSubConstants; import org.eclipse.hono.communication.core.app.InternalMessagingConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,15 +32,10 @@ import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsub.v1.Publisher; import com.google.cloud.pubsub.v1.Subscriber; -import com.google.cloud.pubsub.v1.SubscriptionAdminClient; -import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; -import com.google.pubsub.v1.ProjectName; import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.PubsubMessage; -import com.google.pubsub.v1.PushConfig; -import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.TopicName; import jakarta.annotation.PreDestroy; @@ -53,12 +47,10 @@ @ApplicationScoped public class PubSubService implements InternalMessaging { - public static final String COMMUNICATION_API_SUBSCRIPTION_NAME = "%s-communication-api"; private final Logger log = LoggerFactory.getLogger(PubSubService.class); private final Map activeSubscriptions = new HashMap<>(); private final String projectId; - private TopicName topicName; /** * Creates a new PubSubService. @@ -74,14 +66,19 @@ public PubSubService(final InternalMessagingConfig configs) { */ @PreDestroy void destroy() { + activeSubscriptions.forEach((topic, subscriber) -> closeSubscriber(subscriber)); + activeSubscriptions.clear(); + } - activeSubscriptions.forEach((topic, subscriber) -> { - if (subscriber != null) { - subscriber.stopAsync(); + private void closeSubscriber(final Subscriber subscriber) { + if (subscriber != null) { + subscriber.stopAsync(); + try { + subscriber.awaitTerminated(30, TimeUnit.SECONDS); + } catch (TimeoutException e) { + throw new RuntimeException(e); } - }); - - activeSubscriptions.clear(); + } } @Override @@ -117,53 +114,31 @@ public void onFailure(final Throwable t) { @Override public void subscribe(final String topic, final MessageReceiver callbackHandler) { - if (activeSubscriptions.containsKey(topic)) { + final String subscription = String.format(PubSubConstants.COMMUNICATION_API_SUBSCRIPTION_NAME, topic); + if (activeSubscriptions.containsKey(subscription)) { return; } - topicName = TopicName.of(projectId, topic); final ProjectSubscriptionName subscriptionName; try { - subscriptionName = initSubscription(topic); + subscriptionName = ProjectSubscriptionName.of( + projectId, + subscription); final Subscriber subscriber = Subscriber.newBuilder(subscriptionName, callbackHandler).build(); subscriber.startAsync().awaitRunning(); - activeSubscriptions.put(topic, subscriber); - log.info("Successfully subscribe to topic: {}.", topicName.getTopic()); + activeSubscriptions.put(subscription, subscriber); + log.info("Successfully subscribe to topic: {}.", topic); } catch (Exception ex) { log.error("Error subscribe to topic {}: {}", topic, ex.getMessage()); } } - /** - * If the subscription doesn't exist creates a new one. - * - * @param topic Topic name, it will be used for creating the subscription topic_name-sub - * @return The ProjectSubscriptionName object - * @throws IOException if subscription can't be created - */ - ProjectSubscriptionName initSubscription(final String topic) throws IOException { - final var subscriptionName = ProjectSubscriptionName.of( - projectId, - String.format(COMMUNICATION_API_SUBSCRIPTION_NAME, topic)); - final var subscriptionAdminSettings = SubscriptionAdminSettings.newBuilder() - .build(); - try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient - .create(subscriptionAdminSettings)) { - final var subscriptions = subscriptionAdminClient.listSubscriptions(ProjectName.of(projectId)) - .iterateAll(); - final Optional existing = StreamSupport - .stream(subscriptions.spliterator(), false) - .filter(sub -> sub.getName().equals(subscriptionName.toString())) - .findFirst(); - - if (existing.isEmpty()) { - - subscriptionAdminClient.createSubscription( - subscriptionName.toString(), - topicName, - PushConfig.getDefaultInstance(), - 50); - } + @Override + public void closeSubscribersForTenant(final String tenant) { + for (String endpoint : PubSubConstants.getEndpointsWithAdditionalSubscription()) { + final String subscription = String.format(PubSubConstants.COMMUNICATION_API_SUBSCRIPTION_NAME, + String.format("%s.%s", tenant, endpoint)); + final Subscriber subscriber = activeSubscriptions.get(subscription); + closeSubscriber(subscriber); } - return subscriptionName; } } diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/core/app/InternalMessagingConfig.java b/device-communication/src/main/java/org/eclipse/hono/communication/core/app/InternalMessagingConfig.java index eb24d710..0570edf2 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/core/app/InternalMessagingConfig.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/core/app/InternalMessagingConfig.java @@ -60,6 +60,10 @@ public class InternalMessagingConfig { @ConfigProperty(name = "app.internalMessaging.command.topicFormat") String commandTopicFormat; + // PubSub + @ConfigProperty(name = "app.internalMessaging.pubsub.batchInitTenantThreshold") + int batchInitTenantThreshold; + public String getCommandTopicFormat() { return commandTopicFormat; } @@ -107,4 +111,8 @@ public String getDeliveryFailureNotificationContentType() { public String getStateTopicFormat() { return stateTopicFormat; } + + public int getBatchInitTenantThreshold() { + return batchInitTenantThreshold; + } } diff --git a/device-communication/src/main/resources/application.yaml b/device-communication/src/main/resources/application.yaml index 478a83f3..176ddc3d 100644 --- a/device-communication/src/main/resources/application.yaml +++ b/device-communication/src/main/resources/application.yaml @@ -16,13 +16,14 @@ app: event: topicFormat: ${COM_EVENT_ON_CONNECT_TOPIC_FORMAT:%s.event} # TENANT_NAME.event emptyNotificationEventContentType: ${COM_EVENT_EMPTY_NOTIFICATION_CONTENT_TYPE:application/vnd.eclipse-hono-empty-notification} - state: topicFormat: ${COM_STATE_TOPIC_FORMAT:%s.event.state} # TENANT_NAME.event.state command: topicFormat: ${COM_COMMAND_TOPIC_FORMAT:%s.command} # TENANT_NAME.command ackTopic: ${COM_CONFIG_ACK_TOPIC:%s.command_response} # TENANT_NAME.command_response commandConfigAckDelay: ${COM_COMMAND_CONFIG_ACK_DELAY:5000} + pubsub: + batchInitTenantThreshold: ${COM_PUBSUB_BATCH_INIT_TENANT_THRESHOLD:3} vertx: openapi: @@ -63,5 +64,5 @@ quarkus: container-image: builder: docker build: true - push: true - image: "gcr.io/sotec-iot-core-dev/hono-device-communication:2.5.1-SNAPSHOT-test" \ No newline at end of file + push: false + image: "gcr.io/sotec-iot-core-dev/hono-device-communication:2.6.0-SNAPSHOT-dev" \ No newline at end of file diff --git a/device-communication/src/test/java/org/eclipse/hono/communication/api/service/communication/InternalTopicManagerImplTest.java b/device-communication/src/test/java/org/eclipse/hono/communication/api/service/communication/InternalTopicManagerImplTest.java index 3d12bde7..9edb84cc 100644 --- a/device-communication/src/test/java/org/eclipse/hono/communication/api/service/communication/InternalTopicManagerImplTest.java +++ b/device-communication/src/test/java/org/eclipse/hono/communication/api/service/communication/InternalTopicManagerImplTest.java @@ -18,6 +18,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.times; @@ -29,7 +30,6 @@ import java.time.Instant; import java.util.Optional; -import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager; import org.eclipse.hono.client.pubsub.PubSubMessageHelper; import org.eclipse.hono.communication.api.handler.CommandTopicEventHandler; import org.eclipse.hono.communication.api.handler.ConfigTopicEventHandler; @@ -68,10 +68,10 @@ class InternalTopicManagerImplTest { private final CommandTopicEventHandler commandTopicEventHandler; private final ConfigTopicEventHandler configTopicEventHandler; private final StateTopicEventHandler stateTopicEventHandler; - private final InternalTopicManagerImpl internalTopicManager; private final PubSubBasedAdminClientManagerFactory adminClientManagerFactory; private final PubSubBasedAdminClientManager adminClientManager; private final Vertx vertxMock; + private InternalTopicManagerImpl internalTopicManager; InternalTopicManagerImplTest() { this.deviceRepositoryMock = mock(DeviceRepository.class); @@ -87,10 +87,6 @@ class InternalTopicManagerImplTest { this.stateTopicEventHandler = mock(StateTopicEventHandler.class); this.adminClientManagerFactory = mock(PubSubBasedAdminClientManagerFactory.class); this.vertxMock = mock(Vertx.class); - this.internalTopicManager = new InternalTopicManagerImpl(deviceRepositoryMock, commandTopicEventHandler, - configTopicEventHandler, stateTopicEventHandler, internalCommunicationMock, internalMessagingConfigMock, - adminClientManagerFactory, - vertxMock); this.adminClientManager = mock(PubSubBasedAdminClientManager.class); } @@ -98,6 +94,10 @@ class InternalTopicManagerImplTest { void setup() { when(pubsubMessageMock.getData()).thenReturn(byteStringMock); when(internalMessagingConfigMock.getProjectId()).thenReturn("project_ID"); + this.internalTopicManager = new InternalTopicManagerImpl(deviceRepositoryMock, commandTopicEventHandler, + configTopicEventHandler, stateTopicEventHandler, internalCommunicationMock, internalMessagingConfigMock, + adminClientManagerFactory, + vertxMock); } @AfterEach @@ -158,7 +158,7 @@ public void testOnTenantChanges_success() throws IOException { when(byteStringMock.toStringUtf8()).thenReturn(new ObjectMapper().writeValueAsString(notification)); when(adminClientManagerFactory.createAdminClientManager()).thenReturn(adminClientManager); when(adminClientManager.getOrCreateTopic(anyString(), anyString())).thenReturn(Future.succeededFuture()); - when(adminClientManager.getOrCreateSubscription(anyString(), anyString())) + when(adminClientManager.getOrCreateSubscription(anyString(), anyString(), anyString())) .thenReturn(Future.succeededFuture()); internalTopicManager.onTenantChanges(pubsubMessageMock, ackReplyConsumerMock); @@ -169,8 +169,38 @@ public void testOnTenantChanges_success() throws IOException { verify(byteStringMock).toStringUtf8(); verify(adminClientManagerFactory).createAdminClientManager(); verify(adminClientManager, times(5)).getOrCreateTopic(anyString(), anyString()); - verify(adminClientManager, times(5)).getOrCreateSubscription(anyString(), anyString()); + verify(adminClientManager, times(8)).getOrCreateSubscription(anyString(), anyString(), anyString()); verify(vertxMock, times(2)).executeBlocking(any()); } } + + @Test + void testOnTenantChange_tenantDelete_success() throws IOException { + try (MockedStatic mockedPubSubMessageHelper = mockStatic(PubSubMessageHelper.class)) { + final var credMock = mock(FixedCredentialsProvider.class); + mockedPubSubMessageHelper.when(() -> PubSubMessageHelper.getTopicName(anyString(), anyString())) + .thenReturn("test"); + mockedPubSubMessageHelper.when(PubSubMessageHelper::getCredentialsProvider) + .thenReturn(Optional.of(credMock)); + final TenantChangeNotification notification = new TenantChangeNotification(LifecycleChange.DELETE, tenantId, + Instant.now(), false, true); + doNothing().when(internalCommunicationMock).closeSubscribersForTenant(anyString()); + when(byteStringMock.toStringUtf8()).thenReturn(new ObjectMapper().writeValueAsString(notification)); + when(adminClientManagerFactory.createAdminClientManager()).thenReturn(adminClientManager); + when(adminClientManager.deleteTopics(any())).thenReturn(Future.succeededFuture()); + when(adminClientManager.deleteSubscriptions(any())).thenReturn(Future.succeededFuture()); + + internalTopicManager.onTenantChanges(pubsubMessageMock, ackReplyConsumerMock); + + verify(internalMessagingConfigMock).getProjectId(); + verify(ackReplyConsumerMock).ack(); + verify(pubsubMessageMock).getData(); + verify(internalCommunicationMock).closeSubscribersForTenant(anyString()); + verify(byteStringMock).toStringUtf8(); + verify(adminClientManagerFactory).createAdminClientManager(); + verify(adminClientManager).closeAdminClients(); + verify(adminClientManager).deleteTopics(any()); + verify(adminClientManager).deleteSubscriptions(any()); + } + } } diff --git a/device-communication/src/test/java/org/eclipse/hono/communication/api/service/communication/PubSubServiceTest.java b/device-communication/src/test/java/org/eclipse/hono/communication/api/service/communication/PubSubServiceTest.java index 53ecae0b..c758ab68 100644 --- a/device-communication/src/test/java/org/eclipse/hono/communication/api/service/communication/PubSubServiceTest.java +++ b/device-communication/src/test/java/org/eclipse/hono/communication/api/service/communication/PubSubServiceTest.java @@ -16,7 +16,6 @@ package org.eclipse.hono.communication.api.service.communication; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; @@ -26,11 +25,11 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.eclipse.hono.communication.api.config.PubSubConstants; import org.eclipse.hono.communication.core.app.InternalMessagingConfig; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -144,51 +143,46 @@ void testPublish_failed_null_pubSUb_message() throws Exception { } @Test - public void testSubscribe_success() throws Exception { - final String subscriptionName = "my-sub"; + public void testSubscribe_success() { final ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of(PROJECT_ID, - subscriptionName); + String.format(PubSubConstants.COMMUNICATION_API_SUBSCRIPTION_NAME, topic)); try (MockedStatic subscriberMockedStatic = mockStatic(Subscriber.class)) { subscriberMockedStatic.when(() -> Subscriber.newBuilder(projectSubscriptionName, messageReceiverMock)) .thenReturn(subscriberBuilderMock); when(configMock.getProjectId()).thenReturn(PROJECT_ID); when(subscriberBuilderMock.build()).thenReturn(subscriberMock); final PubSubService pubSubServiceSpyClient = spy(new PubSubService(configMock)); - doReturn(projectSubscriptionName).when(pubSubServiceSpyClient).initSubscription(topic); pubSubServiceSpyClient.subscribe(topic, messageReceiverMock); verify(configMock).getProjectId(); verify(subscriberBuilderMock).build(); - verify(pubSubServiceSpyClient, times(1)).initSubscription(topic); subscriberMockedStatic.verify(() -> Subscriber.newBuilder(projectSubscriptionName, messageReceiverMock)); verify(subscriberMock, times(1)).startAsync(); verify(pubSubServiceSpyClient).subscribe(topic, messageReceiverMock); verifyNoMoreInteractions(pubSubServiceSpyClient, subscriberMock); subscriberMockedStatic.verifyNoMoreInteractions(); - } - } @Test - public void testSubscribe_failed() throws Exception { - final String subscriptionName = "my-sub"; + public void testSubscribe_failed() { final ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of(PROJECT_ID, - subscriptionName); + String.format(PubSubConstants.COMMUNICATION_API_SUBSCRIPTION_NAME, topic)); try (MockedStatic subscriberMockedStatic = mockStatic(Subscriber.class)) { subscriberMockedStatic.when(() -> Subscriber.newBuilder(projectSubscriptionName, messageReceiverMock)) .thenReturn(subscriberBuilderMock); when(configMock.getProjectId()).thenReturn(PROJECT_ID); when(subscriberBuilderMock.build()).thenReturn(subscriberMock); + doThrow(new NullPointerException()).when(subscriberBuilderMock).build(); final PubSubService pubSubServiceSpyClient = spy(new PubSubService(configMock)); - doThrow(new IOException()).when(pubSubServiceSpyClient).initSubscription(topic); pubSubServiceSpyClient.subscribe(topic, messageReceiverMock); verify(configMock, times(1)).getProjectId(); - verify(pubSubServiceSpyClient, times(1)).initSubscription(topic); + subscriberMockedStatic.verify(() -> Subscriber.newBuilder(projectSubscriptionName, messageReceiverMock)); + verify(subscriberBuilderMock).build(); verify(pubSubServiceSpyClient, times(1)).subscribe(topic, messageReceiverMock); verifyNoMoreInteractions(pubSubServiceSpyClient, subscriberMock);