Skip to content

Commit

Permalink
Resolve deadlock in subscription registry (#6632)
Browse files Browse the repository at this point in the history
* Resolve deadlock in subscription registry

* Add changelog
  • Loading branch information
jamesagnew authored Jan 17, 2025
1 parent ababa42 commit 15c792b
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
type: fix
issue: 6632
title: "A race condition in the SubscriptionRegistry could cause an occasional deadlock
during shutdown."
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;
import com.google.common.collect.Multimaps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -48,8 +49,8 @@ public class SubscriptionChannelRegistry {
// This map is a reference count so we know to destroy the channel when there are no more active subscriptions using
// it
// Key Channel Name, Value Subscription Id
private final Multimap<String, String> myActiveSubscriptionByChannelName =
MultimapBuilder.hashKeys().arrayListValues().build();
private final Multimap<String, String> myActiveSubscriptionByChannelName = Multimaps.synchronizedMultimap(
MultimapBuilder.hashKeys().arrayListValues().build());
private final Map<String, IChannelProducer> myChannelNameToSender = new ConcurrentHashMap<>();

@Autowired
Expand Down Expand Up @@ -120,7 +121,7 @@ protected IChannelProducer newSendingChannel(ProducingChannelParameters theParam
return mySubscriptionDeliveryChannelFactory.newDeliverySendingChannel(theParameters.getChannelName(), settings);
}

public synchronized void remove(ActiveSubscription theActiveSubscription) {
public void remove(ActiveSubscription theActiveSubscription) {
String channelName = theActiveSubscription.getChannelName();
ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId(), channelName);
boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,34 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import static org.apache.commons.lang3.StringUtils.isBlank;

/**
* Thread-safety: This class is thread-safe.
*/
class ActiveSubscriptionCache {
private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscriptionCache.class);

private final Map<String, ActiveSubscription> myCache = new ConcurrentHashMap<>();
private final Map<String, ActiveSubscription> myCache = new HashMap<>();

public ActiveSubscription get(String theIdPart) {
public synchronized ActiveSubscription get(String theIdPart) {
return myCache.get(theIdPart);
}

public Collection<ActiveSubscription> getAll() {
return Collections.unmodifiableCollection(myCache.values());
public synchronized Collection<ActiveSubscription> getAll() {
return Collections.unmodifiableCollection(new ArrayList<>(myCache.values()));
}

public int size() {
public synchronized int size() {
return myCache.size();
}

public void put(String theSubscriptionId, ActiveSubscription theActiveSubscription) {
public synchronized void put(String theSubscriptionId, ActiveSubscription theActiveSubscription) {
myCache.put(theSubscriptionId, theActiveSubscription);
}

Expand All @@ -66,9 +69,10 @@ public synchronized ActiveSubscription remove(String theSubscriptionId) {
return activeSubscription;
}

List<String> markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(Collection<String> theAllIds) {
synchronized List<String> markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(
Collection<String> theAllIds) {
List<String> retval = new ArrayList<>();
for (String next : new ArrayList<>(myCache.keySet())) {
for (String next : myCache.keySet()) {
ActiveSubscription activeSubscription = myCache.get(next);
if (theAllIds.contains(next)) {
// In case we got a false positive from a race condition on a previous sync, unset the flag.
Expand All @@ -90,15 +94,15 @@ List<String> markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(
* @param theTopic
* @return a list of all subscriptions that are subscribed to the given topic
*/
public List<ActiveSubscription> getTopicSubscriptionsForTopic(String theTopic) {
public synchronized List<ActiveSubscription> getTopicSubscriptionsForTopic(String theTopic) {
assert !isBlank(theTopic);
return getAll().stream()
.filter(as -> as.getSubscription().isTopicSubscription())
.filter(as -> theTopic.equals(as.getSubscription().getTopic()))
.collect(Collectors.toList());
}

public List<ActiveSubscription> getAllNonTopicSubscriptions() {
public synchronized List<ActiveSubscription> getAllNonTopicSubscriptions() {
return getAll().stream()
.filter(as -> !as.getSubscription().isTopicSubscription())
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,21 @@ public SubscriptionRegistry() {
super();
}

public synchronized ActiveSubscription get(String theIdPart) {
public ActiveSubscription get(String theIdPart) {
return myActiveSubscriptionCache.get(theIdPart);
}

public synchronized Collection<ActiveSubscription> getAll() {
public Collection<ActiveSubscription> getAll() {
return myActiveSubscriptionCache.getAll();
}

public synchronized List<ActiveSubscription> getTopicSubscriptionsByTopic(String theTopic) {
public List<ActiveSubscription> getTopicSubscriptionsByTopic(String theTopic) {
return myActiveSubscriptionCache.getTopicSubscriptionsForTopic(theTopic);
}

private Optional<CanonicalSubscription> hasSubscription(IIdType theId) {
Validate.notNull(theId);
Validate.notBlank(theId.getIdPart());
Validate.notNull(theId, "theId must not be null");
Validate.notBlank(theId.getIdPart(), "theId must have an ID part");
Optional<ActiveSubscription> activeSubscription =
Optional.ofNullable(myActiveSubscriptionCache.get(theId.getIdPart()));
return activeSubscription.map(ActiveSubscription::getSubscription);
Expand All @@ -92,8 +92,7 @@ private Optional<CanonicalSubscription> hasSubscription(IIdType theId) {
/**
* Extracts the retry configuration settings from the CanonicalSubscription object.
*
* Returns the configuration, or null, if no retry (or a bad retry value)
* is specified.
* @return the configuration, or null, if no retry (or a bad retry value) is specified.
*/
private ChannelRetryConfiguration getRetryConfigurationFromSubscriptionExtensions(
CanonicalSubscription theSubscription) {
Expand All @@ -115,10 +114,10 @@ private ChannelRetryConfiguration getRetryConfigurationFromSubscriptionExtension
}

private void registerSubscription(IIdType theId, CanonicalSubscription theCanonicalSubscription) {
Validate.notNull(theId);
Validate.notNull(theId, "theId must not be null");
String subscriptionId = theId.getIdPart();
Validate.notBlank(subscriptionId);
Validate.notNull(theCanonicalSubscription);
Validate.notBlank(subscriptionId, "theId must have an ID part");
Validate.notNull(theCanonicalSubscription, "theCanonicalSubscription must not be null");

String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(theCanonicalSubscription);

Expand All @@ -143,8 +142,8 @@ private void registerSubscription(IIdType theId, CanonicalSubscription theCanoni
myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, params);
}

public synchronized void unregisterSubscriptionIfRegistered(String theSubscriptionId) {
Validate.notNull(theSubscriptionId);
public void unregisterSubscriptionIfRegistered(String theSubscriptionId) {
Validate.notNull(theSubscriptionId, "theSubscriptionId must not be null");

ActiveSubscription activeSubscription = myActiveSubscriptionCache.remove(theSubscriptionId);
if (activeSubscription != null) {
Expand All @@ -161,14 +160,14 @@ public synchronized void unregisterSubscriptionIfRegistered(String theSubscripti
}

@PreDestroy
public synchronized void unregisterAllSubscriptions() {
public void unregisterAllSubscriptions() {
// Once to set flag
unregisterAllSubscriptionsNotInCollection(Collections.emptyList());
// Twice to remove
unregisterAllSubscriptionsNotInCollection(Collections.emptyList());
}

synchronized void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) {
void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) {

List<String> idsToDelete =
myActiveSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(theAllIds);
Expand All @@ -178,7 +177,7 @@ synchronized void unregisterAllSubscriptionsNotInCollection(Collection<String> t
}

public synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) {
Validate.notNull(theSubscription);
Validate.notNull(theSubscription, "theSubscription must not be null");
Optional<CanonicalSubscription> existingSubscription = hasSubscription(theSubscription.getIdElement());
CanonicalSubscription newSubscription = mySubscriptionCanonicalizer.canonicalize(theSubscription);

Expand Down Expand Up @@ -208,10 +207,10 @@ public synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseRes

private void updateSubscription(IBaseResource theSubscription) {
IIdType theId = theSubscription.getIdElement();
Validate.notNull(theId);
Validate.notBlank(theId.getIdPart());
Validate.notNull(theId, "theId must not be null");
Validate.notBlank(theId.getIdPart(), "theId must have an ID part");
ActiveSubscription activeSubscription = myActiveSubscriptionCache.get(theId.getIdPart());
Validate.notNull(activeSubscription);
Validate.notNull(activeSubscription, "Subscription with ID %s not found in cache", theId.getIdPart());
CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription);
activeSubscription.setSubscription(canonicalized);

Expand All @@ -229,7 +228,7 @@ public int size() {
return myActiveSubscriptionCache.size();
}

public synchronized List<ActiveSubscription> getAllNonTopicSubscriptions() {
public List<ActiveSubscription> getAllNonTopicSubscriptions() {
return myActiveSubscriptionCache.getAllNonTopicSubscriptions();
}
}

0 comments on commit 15c792b

Please sign in to comment.