Skip to content

Commit

Permalink
Add PubSub batch initialization to improve start up time in multi ten…
Browse files Browse the repository at this point in the history
…ant instances

Fix PubSub tenant topic/subscription clean up on tenant delete

Signed-off-by: Matthias Kaemmer <[email protected]>
  • Loading branch information
mattkaem committed Feb 20, 2024
1 parent ed7c646 commit 7a36b1b
Show file tree
Hide file tree
Showing 11 changed files with 664 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,30 @@ public final class PubSubConstants {
public static final String EVENT_STATES_SUBTOPIC_ENDPOINT = "event.state";
public static final String COMMAND_ENDPOINT = "command";
public static final String COMMAND_RESPONSE_ENDPOINT = "command_response";
public static final String COMMUNICATION_API_SUBSCRIPTION_NAME = "%s-communication-api";

private PubSubConstants() {
}

/**
* Gets the list of all topics need to be created per tenant.
* Gets the list of all endpoints for which a topic per tenant has to be created.
*
* @return List of all topics.
* @return List of all topic endpoints.
*/
public static List<String> getTenantTopics() {
public static List<String> getTenantEndpoints() {
return List.of(EVENT_ENDPOINT,
COMMAND_ENDPOINT,
COMMAND_RESPONSE_ENDPOINT,
EVENT_STATES_SUBTOPIC_ENDPOINT,
TELEMETRY_ENDPOINT);
}

/**
* Gets the list of all endpoints for which an additional subscription per tenant has to be created.
*
* @return List of all endpoints that need an additional subscription.
*/
public static List<String> getEndpointsWithAdditionalSubscription() {
return List.of(EVENT_ENDPOINT, COMMAND_RESPONSE_ENDPOINT, EVENT_STATES_SUBTOPIC_ENDPOINT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,11 @@ public interface InternalMessageSubscriber {
* @param callbackHandler The function to be called when a message is received
*/
void subscribe(String topic, MessageReceiver callbackHandler);

/**
* Closes all active subscribers for the given tenant.
*
* @param tenant The tenant whose active subscribers should be closed.
*/
void closeSubscribersForTenant(String tenant);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
package org.eclipse.hono.communication.api.service.communication;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.hono.client.pubsub.PubSubMessageHelper;
import org.eclipse.hono.communication.api.config.PubSubConstants;
import org.eclipse.hono.communication.api.handler.CommandTopicEventHandler;
Expand All @@ -37,6 +41,7 @@
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.common.base.Strings;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;

Expand Down Expand Up @@ -96,16 +101,115 @@ public InternalTopicManagerImpl(final DeviceRepository deviceRepository,
@Override
public void initPubSub() {
log.info("Initialize tenant topics and subscriptions.");
internalMessaging.subscribe(PubSubConstants.TENANT_NOTIFICATIONS, this::onTenantChanges);
vertx.executeBlocking(promise -> {
internalMessaging.subscribe(PubSubConstants.TENANT_NOTIFICATIONS, this::onTenantChanges);
promise.complete();
});
deviceRepository.listDistinctTenants()
.onFailure(err -> log.error("Error getting tenants for topic creation: {}", err.getMessage()))
.onSuccess(tenants -> {
if (tenants.size() < internalMessagingConfig.getBatchInitTenantThreshold()) {
for (String tenant : tenants) {
initPubSubForTenant(tenant);
}
} else {
batchInitPubSubResources(tenants);
}
});
}

private void batchInitPubSubResources(final List<String> tenants) {
log.info("Start batchInitPubSubResources");
findMissingPubSubResources(tenants).compose(map -> {
final PubSubBasedAdminClientManager adminClientManager = adminClientManagerFactory
.createAdminClientManager();
final List<Future> pubsubRequests = new ArrayList<>();
for (Map.Entry<String, String> missingTopic : map.get("missingTopics").entrySet()) {
final TopicName topic = TopicName.parse(missingTopic.getKey());
pubsubRequests.add(adminClientManager.createTopic(topic));
}
for (Map.Entry<String, String> missingSubscription : map.get("missingSubscriptions").entrySet()) {
final SubscriptionName subscription = SubscriptionName.parse(missingSubscription.getKey());
final TopicName topic = TopicName.parse(missingSubscription.getValue());
pubsubRequests.add(adminClientManager.createSubscription(subscription, topic));
}
for (Map.Entry<String, String> faultySubscription : map.get("faultySubscriptions").entrySet()) {
final SubscriptionName subscription = SubscriptionName.parse(faultySubscription.getKey());
final TopicName topic = TopicName.parse(faultySubscription.getValue());
pubsubRequests.add(adminClientManager.updateSubscriptionTopic(subscription, topic));
}
return CompositeFuture.join(pubsubRequests).onComplete(i -> {
adminClientManager.closeAdminClients();
log.info("Finished batchInitPubSubResources");
});
}).onFailure(e -> log.error("batchInitPubSubResources failed", e))
.onSuccess(i -> {
for (String tenant : tenants) {
initPubSubForTenant(tenant);
vertx.executeBlocking(promise -> subscribeToTenantTopics(tenant));
}
});
}

private Future<Map<String, Map<String, String>>> findMissingPubSubResources(final List<String> tenants) {
final PubSubBasedAdminClientManager pubSubBasedAdminClientManager = adminClientManagerFactory
.createAdminClientManager();
final Future<Set<String>> topicsFuture = pubSubBasedAdminClientManager.listTopics();
final Future<Set<Subscription>> subscriptionsFuture = pubSubBasedAdminClientManager.listSubscriptions();
final Map<String, String> missingTopics = new HashMap<>();
final Map<String, String> missingSubscriptions = new HashMap<>();
final Map<String, String> faultySubscriptions = new HashMap<>();
return CompositeFuture.join(topicsFuture, subscriptionsFuture).map(i -> {
final Set<String> topicSet = topicsFuture.result();
final Set<Subscription> subscriptionSet = subscriptionsFuture.result();
final Map<String, Subscription> subscriptionMap = new HashMap<>();
subscriptionSet.forEach(s -> subscriptionMap.put(s.getName(), s));
for (String tenantEndpoint : PubSubConstants.getTenantEndpoints()) {
for (String tenant : tenants) {
final String topic = String
.valueOf(TopicName.of(projectId, PubSubMessageHelper.getTopicName(tenantEndpoint, tenant)));
addMissingTopicToMap(topicSet, topic, missingTopics);
final String subscription = String.valueOf(
SubscriptionName.of(projectId, PubSubMessageHelper.getTopicName(tenantEndpoint, tenant)));
addMissingOrFaultySubscription(subscriptionMap, missingSubscriptions, faultySubscriptions,
subscription, topic);
if (PubSubConstants.getEndpointsWithAdditionalSubscription().contains(tenantEndpoint)) {
final String apiSubscription = String.valueOf(SubscriptionName.of(projectId,
PubSubMessageHelper.getTopicName(
String.format(PubSubConstants.COMMUNICATION_API_SUBSCRIPTION_NAME,
tenantEndpoint),
tenant)));
addMissingOrFaultySubscription(subscriptionMap, missingSubscriptions, faultySubscriptions,
apiSubscription, topic);
}
}
}
return Map.of("missingTopics", missingTopics, "missingSubscriptions", missingSubscriptions,
"faultySubscriptions", faultySubscriptions);
}).onFailure(thr -> log.error("Cannot find missing Pub/Sub resources", thr))
.onComplete(i -> pubSubBasedAdminClientManager.closeAdminClients());
}

private void addMissingTopicToMap(final Set<String> topicSet, final String topic,
final Map<String, String> missingTopics) {
if (!topicSet.contains(topic)) {
missingTopics.put(topic, "");
}
}

private void addMissingOrFaultySubscription(final Map<String, Subscription> subscriptionMap,
final Map<String, String> missingSubscriptions, final Map<String, String> faultySubscriptions,
final String subscription, final String topic) {
if (!subscriptionMap.containsKey(subscription)) {
missingSubscriptions.put(subscription, topic);
return;
}
final String deletedTopic = "_deleted-topic_";
if (subscriptionMap.get(subscription) != null
&& StringUtils.equals(subscriptionMap.get(subscription).getTopic(), deletedTopic)) {
faultySubscriptions.put(subscription, topic);
}
}

/**
* Handle incoming tenant CREATE notifications.
*
Expand Down Expand Up @@ -165,12 +269,17 @@ private Future<Void> createPubSubResourceForTenant(final String tenantId,
final PubSubResourceType pubSubResourceType,
final PubSubBasedAdminClientManager pubSubBasedAdminClientManager) {
final List<Future> futureList = new ArrayList<>();
final List<String> topics = PubSubConstants.getTenantTopics();
for (String topic : topics) {
for (String tenantEndpoint : PubSubConstants.getTenantEndpoints()) {
if (pubSubResourceType == PubSubResourceType.TOPIC) {
futureList.add(pubSubBasedAdminClientManager.getOrCreateTopic(topic, tenantId));
futureList.add(pubSubBasedAdminClientManager.getOrCreateTopic(tenantEndpoint, tenantId));
} else {
futureList.add(pubSubBasedAdminClientManager.getOrCreateSubscription(topic, tenantId));
futureList.add(pubSubBasedAdminClientManager.getOrCreateSubscription(tenantEndpoint, tenantEndpoint,
tenantId));
if (PubSubConstants.getEndpointsWithAdditionalSubscription().contains(tenantEndpoint)) {
futureList.add(pubSubBasedAdminClientManager.getOrCreateSubscription(tenantEndpoint,
String.format(PubSubConstants.COMMUNICATION_API_SUBSCRIPTION_NAME, tenantEndpoint),
tenantId));
}
}
}
return CompositeFuture.join(futureList)
Expand Down Expand Up @@ -198,18 +307,31 @@ private void subscribeToTenantTopics(final String tenant) {
}

private void cleanupPubSubResources(final String tenant) {
final List<String> pubSubTopicsToDelete = PubSubConstants.getTenantTopics().stream()
.map(id -> TopicName.of(projectId, "%s.%s".formatted(tenant, id)).toString()).toList();
final List<String> pubSubSubscriptionsToDelete = PubSubConstants.getTenantTopics().stream()
.map(id -> SubscriptionName.of(projectId, "%s.%s".formatted(tenant, id)).toString()).toList();
final List<String> pubSubTopicsToDelete = PubSubConstants.getTenantEndpoints().stream().map(
endpoint -> String.valueOf(TopicName.of(projectId, PubSubMessageHelper.getTopicName(endpoint, tenant))))
.toList();
final List<String> pubSubSubscriptionsToDelete = Stream.concat(PubSubConstants.getTenantEndpoints().stream()
.map(endpoint -> String
.valueOf(SubscriptionName.of(projectId, PubSubMessageHelper.getTopicName(endpoint, tenant)))),
PubSubConstants.getEndpointsWithAdditionalSubscription().stream()
.map(endpoint -> String.valueOf(SubscriptionName.of(projectId,
PubSubMessageHelper.getTopicName(
String.format(PubSubConstants.COMMUNICATION_API_SUBSCRIPTION_NAME, endpoint),
tenant)))))
.toList();
internalMessaging.closeSubscribersForTenant(tenant);
PubSubMessageHelper.getCredentialsProvider()
.ifPresentOrElse(provider -> {
final PubSubBasedAdminClientManager pubSubBasedAdminClientManager = adminClientManagerFactory
.createAdminClientManager();
pubSubBasedAdminClientManager.deleteTopics(pubSubTopicsToDelete);
pubSubBasedAdminClientManager.deleteSubscriptions(pubSubSubscriptionsToDelete);
log.info("All topics and subscriptions for tenant {} were deleted successfully.", tenant);
pubSubBasedAdminClientManager.closeAdminClients();
CompositeFuture.join(pubSubBasedAdminClientManager.deleteTopics(pubSubTopicsToDelete),
pubSubBasedAdminClientManager.deleteSubscriptions(pubSubSubscriptionsToDelete))
.onSuccess(compFuture -> log.info(
"All topics and subscriptions of tenant {} were deleted successfully.", tenant))
.onFailure(throwable -> log.warn(
"Some topics or subscriptions of tenant {} could not be deleted.", tenant,
throwable))
.onComplete(compFuture -> pubSubBasedAdminClientManager.closeAdminClients());
}, () -> log.error("credentials provider is empty"));
}

Expand Down
Loading

0 comments on commit 7a36b1b

Please sign in to comment.