From 15c792bfb95ac7f4ec627e906f0b15dfcb25c3a9 Mon Sep 17 00:00:00 2001 From: James Agnew Date: Fri, 17 Jan 2025 15:37:52 -0500 Subject: [PATCH] Resolve deadlock in subscription registry (#6632) * Resolve deadlock in subscription registry * Add changelog --- ...lve-deadlock-in-subscription-registry.yaml | 5 +++ .../SubscriptionChannelRegistry.java | 7 ++-- .../registry/ActiveSubscriptionCache.java | 26 +++++++------ .../match/registry/SubscriptionRegistry.java | 37 +++++++++---------- 4 files changed, 42 insertions(+), 33 deletions(-) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_8_0/6632-resolve-deadlock-in-subscription-registry.yaml diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_8_0/6632-resolve-deadlock-in-subscription-registry.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_8_0/6632-resolve-deadlock-in-subscription-registry.yaml new file mode 100644 index 00000000000..e3897a1b0ec --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_8_0/6632-resolve-deadlock-in-subscription-registry.yaml @@ -0,0 +1,5 @@ +--- +type: fix +issue: 6632 +title: "A race condition in the SubscriptionRegistry could cause an occasional deadlock + during shutdown." diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java index c34265fda4d..3193b14bcbc 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java @@ -31,6 +31,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Multimap; import com.google.common.collect.MultimapBuilder; +import com.google.common.collect.Multimaps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -48,8 +49,8 @@ public class SubscriptionChannelRegistry { // This map is a reference count so we know to destroy the channel when there are no more active subscriptions using // it // Key Channel Name, Value Subscription Id - private final Multimap myActiveSubscriptionByChannelName = - MultimapBuilder.hashKeys().arrayListValues().build(); + private final Multimap myActiveSubscriptionByChannelName = Multimaps.synchronizedMultimap( + MultimapBuilder.hashKeys().arrayListValues().build()); private final Map myChannelNameToSender = new ConcurrentHashMap<>(); @Autowired @@ -120,7 +121,7 @@ protected IChannelProducer newSendingChannel(ProducingChannelParameters theParam return mySubscriptionDeliveryChannelFactory.newDeliverySendingChannel(theParameters.getChannelName(), settings); } - public synchronized void remove(ActiveSubscription theActiveSubscription) { + public void remove(ActiveSubscription theActiveSubscription) { String channelName = theActiveSubscription.getChannelName(); ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId(), channelName); boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId()); diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/ActiveSubscriptionCache.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/ActiveSubscriptionCache.java index fbdd547f2b2..95d4377a79c 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/ActiveSubscriptionCache.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/ActiveSubscriptionCache.java @@ -26,31 +26,34 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import static org.apache.commons.lang3.StringUtils.isBlank; +/** + * Thread-safety: This class is thread-safe. + */ class ActiveSubscriptionCache { private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscriptionCache.class); - private final Map myCache = new ConcurrentHashMap<>(); + private final Map myCache = new HashMap<>(); - public ActiveSubscription get(String theIdPart) { + public synchronized ActiveSubscription get(String theIdPart) { return myCache.get(theIdPart); } - public Collection getAll() { - return Collections.unmodifiableCollection(myCache.values()); + public synchronized Collection getAll() { + return Collections.unmodifiableCollection(new ArrayList<>(myCache.values())); } - public int size() { + public synchronized int size() { return myCache.size(); } - public void put(String theSubscriptionId, ActiveSubscription theActiveSubscription) { + public synchronized void put(String theSubscriptionId, ActiveSubscription theActiveSubscription) { myCache.put(theSubscriptionId, theActiveSubscription); } @@ -66,9 +69,10 @@ public synchronized ActiveSubscription remove(String theSubscriptionId) { return activeSubscription; } - List markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(Collection theAllIds) { + synchronized List markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete( + Collection theAllIds) { List retval = new ArrayList<>(); - for (String next : new ArrayList<>(myCache.keySet())) { + for (String next : myCache.keySet()) { ActiveSubscription activeSubscription = myCache.get(next); if (theAllIds.contains(next)) { // In case we got a false positive from a race condition on a previous sync, unset the flag. @@ -90,7 +94,7 @@ List markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete( * @param theTopic * @return a list of all subscriptions that are subscribed to the given topic */ - public List getTopicSubscriptionsForTopic(String theTopic) { + public synchronized List getTopicSubscriptionsForTopic(String theTopic) { assert !isBlank(theTopic); return getAll().stream() .filter(as -> as.getSubscription().isTopicSubscription()) @@ -98,7 +102,7 @@ public List getTopicSubscriptionsForTopic(String theTopic) { .collect(Collectors.toList()); } - public List getAllNonTopicSubscriptions() { + public synchronized List getAllNonTopicSubscriptions() { return getAll().stream() .filter(as -> !as.getSubscription().isTopicSubscription()) .collect(Collectors.toList()); diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionRegistry.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionRegistry.java index 55962928129..f99d900b5e0 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionRegistry.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionRegistry.java @@ -69,21 +69,21 @@ public SubscriptionRegistry() { super(); } - public synchronized ActiveSubscription get(String theIdPart) { + public ActiveSubscription get(String theIdPart) { return myActiveSubscriptionCache.get(theIdPart); } - public synchronized Collection getAll() { + public Collection getAll() { return myActiveSubscriptionCache.getAll(); } - public synchronized List getTopicSubscriptionsByTopic(String theTopic) { + public List getTopicSubscriptionsByTopic(String theTopic) { return myActiveSubscriptionCache.getTopicSubscriptionsForTopic(theTopic); } private Optional hasSubscription(IIdType theId) { - Validate.notNull(theId); - Validate.notBlank(theId.getIdPart()); + Validate.notNull(theId, "theId must not be null"); + Validate.notBlank(theId.getIdPart(), "theId must have an ID part"); Optional activeSubscription = Optional.ofNullable(myActiveSubscriptionCache.get(theId.getIdPart())); return activeSubscription.map(ActiveSubscription::getSubscription); @@ -92,8 +92,7 @@ private Optional hasSubscription(IIdType theId) { /** * Extracts the retry configuration settings from the CanonicalSubscription object. * - * Returns the configuration, or null, if no retry (or a bad retry value) - * is specified. + * @return the configuration, or null, if no retry (or a bad retry value) is specified. */ private ChannelRetryConfiguration getRetryConfigurationFromSubscriptionExtensions( CanonicalSubscription theSubscription) { @@ -115,10 +114,10 @@ private ChannelRetryConfiguration getRetryConfigurationFromSubscriptionExtension } private void registerSubscription(IIdType theId, CanonicalSubscription theCanonicalSubscription) { - Validate.notNull(theId); + Validate.notNull(theId, "theId must not be null"); String subscriptionId = theId.getIdPart(); - Validate.notBlank(subscriptionId); - Validate.notNull(theCanonicalSubscription); + Validate.notBlank(subscriptionId, "theId must have an ID part"); + Validate.notNull(theCanonicalSubscription, "theCanonicalSubscription must not be null"); String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(theCanonicalSubscription); @@ -143,8 +142,8 @@ private void registerSubscription(IIdType theId, CanonicalSubscription theCanoni myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, params); } - public synchronized void unregisterSubscriptionIfRegistered(String theSubscriptionId) { - Validate.notNull(theSubscriptionId); + public void unregisterSubscriptionIfRegistered(String theSubscriptionId) { + Validate.notNull(theSubscriptionId, "theSubscriptionId must not be null"); ActiveSubscription activeSubscription = myActiveSubscriptionCache.remove(theSubscriptionId); if (activeSubscription != null) { @@ -161,14 +160,14 @@ public synchronized void unregisterSubscriptionIfRegistered(String theSubscripti } @PreDestroy - public synchronized void unregisterAllSubscriptions() { + public void unregisterAllSubscriptions() { // Once to set flag unregisterAllSubscriptionsNotInCollection(Collections.emptyList()); // Twice to remove unregisterAllSubscriptionsNotInCollection(Collections.emptyList()); } - synchronized void unregisterAllSubscriptionsNotInCollection(Collection theAllIds) { + void unregisterAllSubscriptionsNotInCollection(Collection theAllIds) { List idsToDelete = myActiveSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(theAllIds); @@ -178,7 +177,7 @@ synchronized void unregisterAllSubscriptionsNotInCollection(Collection t } public synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) { - Validate.notNull(theSubscription); + Validate.notNull(theSubscription, "theSubscription must not be null"); Optional existingSubscription = hasSubscription(theSubscription.getIdElement()); CanonicalSubscription newSubscription = mySubscriptionCanonicalizer.canonicalize(theSubscription); @@ -208,10 +207,10 @@ public synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseRes private void updateSubscription(IBaseResource theSubscription) { IIdType theId = theSubscription.getIdElement(); - Validate.notNull(theId); - Validate.notBlank(theId.getIdPart()); + Validate.notNull(theId, "theId must not be null"); + Validate.notBlank(theId.getIdPart(), "theId must have an ID part"); ActiveSubscription activeSubscription = myActiveSubscriptionCache.get(theId.getIdPart()); - Validate.notNull(activeSubscription); + Validate.notNull(activeSubscription, "Subscription with ID %s not found in cache", theId.getIdPart()); CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription); activeSubscription.setSubscription(canonicalized); @@ -229,7 +228,7 @@ public int size() { return myActiveSubscriptionCache.size(); } - public synchronized List getAllNonTopicSubscriptions() { + public List getAllNonTopicSubscriptions() { return myActiveSubscriptionCache.getAllNonTopicSubscriptions(); } }