Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve deadlock in subscription registry #6632

Merged
merged 2 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
type: fix
issue: 6632
title: "A race condition in the SubscriptionRegistry could cause an occasional deadlock
during shutdown."
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;
import com.google.common.collect.Multimaps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -48,8 +49,8 @@ public class SubscriptionChannelRegistry {
// This map is a reference count so we know to destroy the channel when there are no more active subscriptions using
// it
// Key Channel Name, Value Subscription Id
private final Multimap<String, String> myActiveSubscriptionByChannelName =
MultimapBuilder.hashKeys().arrayListValues().build();
private final Multimap<String, String> myActiveSubscriptionByChannelName = Multimaps.synchronizedMultimap(
MultimapBuilder.hashKeys().arrayListValues().build());
private final Map<String, IChannelProducer> myChannelNameToSender = new ConcurrentHashMap<>();

@Autowired
Expand Down Expand Up @@ -120,7 +121,7 @@ protected IChannelProducer newSendingChannel(ProducingChannelParameters theParam
return mySubscriptionDeliveryChannelFactory.newDeliverySendingChannel(theParameters.getChannelName(), settings);
}

public synchronized void remove(ActiveSubscription theActiveSubscription) {
public void remove(ActiveSubscription theActiveSubscription) {
String channelName = theActiveSubscription.getChannelName();
ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId(), channelName);
boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,34 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import static org.apache.commons.lang3.StringUtils.isBlank;

/**
* Thread-safety: This class is thread-safe.
*/
class ActiveSubscriptionCache {
private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscriptionCache.class);

private final Map<String, ActiveSubscription> myCache = new ConcurrentHashMap<>();
private final Map<String, ActiveSubscription> myCache = new HashMap<>();

public ActiveSubscription get(String theIdPart) {
public synchronized ActiveSubscription get(String theIdPart) {
return myCache.get(theIdPart);
}

public Collection<ActiveSubscription> getAll() {
return Collections.unmodifiableCollection(myCache.values());
public synchronized Collection<ActiveSubscription> getAll() {
return Collections.unmodifiableCollection(new ArrayList<>(myCache.values()));
}

public int size() {
public synchronized int size() {
return myCache.size();
}

public void put(String theSubscriptionId, ActiveSubscription theActiveSubscription) {
public synchronized void put(String theSubscriptionId, ActiveSubscription theActiveSubscription) {
myCache.put(theSubscriptionId, theActiveSubscription);
}

Expand All @@ -66,9 +69,10 @@ public synchronized ActiveSubscription remove(String theSubscriptionId) {
return activeSubscription;
}

List<String> markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(Collection<String> theAllIds) {
synchronized List<String> markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(
Collection<String> theAllIds) {
List<String> retval = new ArrayList<>();
for (String next : new ArrayList<>(myCache.keySet())) {
for (String next : myCache.keySet()) {
ActiveSubscription activeSubscription = myCache.get(next);
if (theAllIds.contains(next)) {
// In case we got a false positive from a race condition on a previous sync, unset the flag.
Expand All @@ -90,15 +94,15 @@ List<String> markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(
* @param theTopic
* @return a list of all subscriptions that are subscribed to the given topic
*/
public List<ActiveSubscription> getTopicSubscriptionsForTopic(String theTopic) {
public synchronized List<ActiveSubscription> getTopicSubscriptionsForTopic(String theTopic) {
assert !isBlank(theTopic);
return getAll().stream()
.filter(as -> as.getSubscription().isTopicSubscription())
.filter(as -> theTopic.equals(as.getSubscription().getTopic()))
.collect(Collectors.toList());
}

public List<ActiveSubscription> getAllNonTopicSubscriptions() {
public synchronized List<ActiveSubscription> getAllNonTopicSubscriptions() {
return getAll().stream()
.filter(as -> !as.getSubscription().isTopicSubscription())
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,21 @@ public SubscriptionRegistry() {
super();
}

public synchronized ActiveSubscription get(String theIdPart) {
public ActiveSubscription get(String theIdPart) {
return myActiveSubscriptionCache.get(theIdPart);
}

public synchronized Collection<ActiveSubscription> getAll() {
public Collection<ActiveSubscription> getAll() {
return myActiveSubscriptionCache.getAll();
}

public synchronized List<ActiveSubscription> getTopicSubscriptionsByTopic(String theTopic) {
public List<ActiveSubscription> getTopicSubscriptionsByTopic(String theTopic) {
return myActiveSubscriptionCache.getTopicSubscriptionsForTopic(theTopic);
}

private Optional<CanonicalSubscription> hasSubscription(IIdType theId) {
Validate.notNull(theId);
Validate.notBlank(theId.getIdPart());
Validate.notNull(theId, "theId must not be null");
Validate.notBlank(theId.getIdPart(), "theId must have an ID part");
Optional<ActiveSubscription> activeSubscription =
Optional.ofNullable(myActiveSubscriptionCache.get(theId.getIdPart()));
return activeSubscription.map(ActiveSubscription::getSubscription);
Expand All @@ -92,8 +92,7 @@ private Optional<CanonicalSubscription> hasSubscription(IIdType theId) {
/**
* Extracts the retry configuration settings from the CanonicalSubscription object.
*
* Returns the configuration, or null, if no retry (or a bad retry value)
* is specified.
* @return the configuration, or null, if no retry (or a bad retry value) is specified.
*/
private ChannelRetryConfiguration getRetryConfigurationFromSubscriptionExtensions(
CanonicalSubscription theSubscription) {
Expand All @@ -115,10 +114,10 @@ private ChannelRetryConfiguration getRetryConfigurationFromSubscriptionExtension
}

private void registerSubscription(IIdType theId, CanonicalSubscription theCanonicalSubscription) {
Validate.notNull(theId);
Validate.notNull(theId, "theId must not be null");
String subscriptionId = theId.getIdPart();
Validate.notBlank(subscriptionId);
Validate.notNull(theCanonicalSubscription);
Validate.notBlank(subscriptionId, "theId must have an ID part");
Validate.notNull(theCanonicalSubscription, "theCanonicalSubscription must not be null");

String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(theCanonicalSubscription);

Expand All @@ -143,8 +142,8 @@ private void registerSubscription(IIdType theId, CanonicalSubscription theCanoni
myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, params);
}

public synchronized void unregisterSubscriptionIfRegistered(String theSubscriptionId) {
Validate.notNull(theSubscriptionId);
public void unregisterSubscriptionIfRegistered(String theSubscriptionId) {
Validate.notNull(theSubscriptionId, "theSubscriptionId must not be null");

ActiveSubscription activeSubscription = myActiveSubscriptionCache.remove(theSubscriptionId);
if (activeSubscription != null) {
Expand All @@ -161,14 +160,14 @@ public synchronized void unregisterSubscriptionIfRegistered(String theSubscripti
}

@PreDestroy
public synchronized void unregisterAllSubscriptions() {
public void unregisterAllSubscriptions() {
// Once to set flag
unregisterAllSubscriptionsNotInCollection(Collections.emptyList());
// Twice to remove
unregisterAllSubscriptionsNotInCollection(Collections.emptyList());
}

synchronized void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) {
void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) {

List<String> idsToDelete =
myActiveSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(theAllIds);
Expand All @@ -178,7 +177,7 @@ synchronized void unregisterAllSubscriptionsNotInCollection(Collection<String> t
}

public synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) {
Validate.notNull(theSubscription);
Validate.notNull(theSubscription, "theSubscription must not be null");
Optional<CanonicalSubscription> existingSubscription = hasSubscription(theSubscription.getIdElement());
CanonicalSubscription newSubscription = mySubscriptionCanonicalizer.canonicalize(theSubscription);

Expand Down Expand Up @@ -208,10 +207,10 @@ public synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseRes

private void updateSubscription(IBaseResource theSubscription) {
IIdType theId = theSubscription.getIdElement();
Validate.notNull(theId);
Validate.notBlank(theId.getIdPart());
Validate.notNull(theId, "theId must not be null");
Validate.notBlank(theId.getIdPart(), "theId must have an ID part");
ActiveSubscription activeSubscription = myActiveSubscriptionCache.get(theId.getIdPart());
Validate.notNull(activeSubscription);
Validate.notNull(activeSubscription, "Subscription with ID %s not found in cache", theId.getIdPart());
CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription);
activeSubscription.setSubscription(canonicalized);

Expand All @@ -229,7 +228,7 @@ public int size() {
return myActiveSubscriptionCache.size();
}

public synchronized List<ActiveSubscription> getAllNonTopicSubscriptions() {
public List<ActiveSubscription> getAllNonTopicSubscriptions() {
return myActiveSubscriptionCache.getAllNonTopicSubscriptions();
}
}
Loading