diff --git a/broker/pom.xml b/broker/pom.xml
index 05e14b9b1..ef19f29cc 100644
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@ -187,6 +187,12 @@
1.15
+
+ org.pcollections
+ pcollections
+ 4.0.2
+
+
org.fusesource.mqtt-client
mqtt-client
diff --git a/broker/src/main/java/io/moquette/broker/PostOffice.java b/broker/src/main/java/io/moquette/broker/PostOffice.java
index 5ff719ac5..880c28460 100644
--- a/broker/src/main/java/io/moquette/broker/PostOffice.java
+++ b/broker/src/main/java/io/moquette/broker/PostOffice.java
@@ -58,6 +58,7 @@
import java.util.stream.Collectors;
import static io.moquette.broker.Utils.messageId;
+import io.moquette.broker.subscriptions.SubscriptionCollection;
import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE;
@@ -842,7 +843,7 @@ private RoutingResults publish2Subscribers(String publisherClientId,
final boolean retainPublish = msg.fixedHeader().isRetain();
final Topic topic = new Topic(msg.variableHeader().topicName());
final MqttQoS publishingQos = msg.fixedHeader().qosLevel();
- List topicMatchingSubscriptions = subscriptions.matchQosSharpening(topic);
+ SubscriptionCollection topicMatchingSubscriptions = subscriptions.matchWithoutQosSharpening(topic);
if (topicMatchingSubscriptions.isEmpty()) {
// no matching subscriptions, clean exit
LOG.trace("No matching subscriptions for topic: {}", topic);
diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java
index faa349aa7..a37fe05c7 100644
--- a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java
+++ b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java
@@ -22,6 +22,7 @@
import java.security.SecureRandom;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -33,52 +34,48 @@
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
+import org.pcollections.PMap;
+import org.pcollections.TreePMap;
class CNode implements Comparable {
public static final Random SECURE_RANDOM = new SecureRandom();
private final Token token;
- private final List children;
- // Sorted list of subscriptions. The sort is necessary for fast access, instead of linear scan.
- private List subscriptions;
+ private PMap children;
+ // Map of subscriptions per clientId.
+ private PMap subscriptions;
// the list of SharedSubscription is sorted. The sort is necessary for fast access, instead of linear scan.
- private Map> sharedSubscriptions;
+ private PMap> sharedSubscriptions;
CNode(Token token) {
- this.children = new ArrayList<>();
- this.subscriptions = new ArrayList<>();
- this.sharedSubscriptions = new HashMap<>();
+ this.children = TreePMap.empty();
+ this.subscriptions = TreePMap.empty();
+ this.sharedSubscriptions = TreePMap.empty();
this.token = token;
}
//Copy constructor
- private CNode(Token token, List children, List subscriptions, Map> sharedSubscriptions) {
+ private CNode(Token token, PMap children, PMap subscriptions, PMap> sharedSubscriptions) {
this.token = token; // keep reference, root comparison in directory logic relies on it for now.
- this.subscriptions = new ArrayList<>(subscriptions);
- this.sharedSubscriptions = new HashMap<>(sharedSubscriptions);
- this.children = new ArrayList<>(children);
+ this.subscriptions = subscriptions;
+ this.sharedSubscriptions = sharedSubscriptions;
+ this.children = children;
}
public Token getToken() {
return token;
}
- List allChildren() {
- return new ArrayList<>(this.children);
+ Collection allChildren() {
+ return this.children.values();
}
Optional childOf(Token token) {
- int idx = findIndexForToken(token);
- if (idx < 0) {
+ INode value = children.get(token.name);
+ if (value == null) {
return Optional.empty();
}
- return Optional.of(children.get(idx));
- }
-
- private int findIndexForToken(Token token) {
- final INode tempTokenNode = new INode(new CNode(token));
- return Collections.binarySearch(children, tempTokenNode, (INode node, INode tokenHolder) -> node.mainNode().token.compareTo(tokenHolder.mainNode().token));
+ return Optional.of(value);
}
@Override
@@ -91,35 +88,23 @@ CNode copy() {
}
public void add(INode newINode) {
- int idx = findIndexForToken(newINode.mainNode().token);
- if (idx < 0) {
- children.add(-1 - idx, newINode);
- } else {
- children.add(idx, newINode);
- }
+ final String tokenName = newINode.mainNode().token.name;
+ children = children.plus(tokenName, newINode);
}
public INode remove(INode node) {
- int idx = findIndexForToken(node.mainNode().token);
- return this.children.remove(idx);
+ final String tokenName = node.mainNode().token.name;
+ INode toRemove = children.get(tokenName);
+ children = children.minus(tokenName);
+ return toRemove;
}
- private List sharedSubscriptions() {
- List selectedSubscriptions = new ArrayList<>(sharedSubscriptions.size());
- // for each sharedSubscription related to a ShareName, select one subscription
- for (Map.Entry> subsForName : sharedSubscriptions.entrySet()) {
- List list = subsForName.getValue();
- final String shareName = subsForName.getKey().getShareName();
- // select a subscription randomly
- int randIdx = SECURE_RANDOM.nextInt(list.size());
- SharedSubscription sub = list.get(randIdx);
- selectedSubscriptions.add(sub.createSubscription());
- }
- return selectedSubscriptions;
+ public PMap getSubscriptions() {
+ return subscriptions;
}
- List subscriptions() {
- return subscriptions;
+ public PMap> getSharedSubscriptions() {
+ return sharedSubscriptions;
}
// Mutating operation
@@ -141,25 +126,23 @@ CNode addSubscription(SubscriptionRequest request) {
final Subscription newSubscription = request.subscription();
// if already contains one with same topic and same client, keep that with higher QoS
- int idx = Collections.binarySearch(subscriptions, newSubscription);
- if (idx >= 0) {
+ final Subscription existing = subscriptions.get(newSubscription.clientId);
+ if (existing != null) {
// Subscription already exists
- final Subscription existing = subscriptions.get(idx);
if (needsToUpdateExistingSubscription(newSubscription, existing)) {
- subscriptions.set(idx, newSubscription);
+ subscriptions = subscriptions.plus(newSubscription.clientId, newSubscription);
}
} else {
// insert into the expected index so that the sorting is maintained
- this.subscriptions.add(-1 - idx, newSubscription);
+ subscriptions = subscriptions.plus(newSubscription.clientId, newSubscription);
}
}
return this;
}
private static boolean needsToUpdateExistingSubscription(Subscription newSubscription, Subscription existing) {
- if ((newSubscription.hasSubscriptionIdentifier() && existing.hasSubscriptionIdentifier()) &&
- newSubscription.getSubscriptionIdentifier().equals(existing.getSubscriptionIdentifier())
- ) {
+ if ((newSubscription.hasSubscriptionIdentifier() && existing.hasSubscriptionIdentifier())
+ && newSubscription.getSubscriptionIdentifier().equals(existing.getSubscriptionIdentifier())) {
// if subscription identifier hasn't changed,
// then check QoS but don't lower the requested QoS level
return existing.option().qos().value() < newSubscription.option().qos().value();
@@ -177,8 +160,8 @@ private static boolean needsToUpdateExistingSubscription(Subscription newSubscri
* AND at least one subscription is actually present for that clientId
* */
boolean containsOnly(String clientId) {
- for (Subscription sub : this.subscriptions) {
- if (!sub.clientId.equals(clientId)) {
+ for (String sub : this.subscriptions.keySet()) {
+ if (!sub.equals(clientId)) {
return false;
}
}
@@ -207,12 +190,7 @@ private static SharedSubscription wrapKey(String clientId) {
//TODO this is equivalent to negate(containsOnly(clientId))
private boolean containsSubscriptionsForClient(String clientId) {
- for (Subscription sub : this.subscriptions) {
- if (sub.clientId.equals(clientId)) {
- return true;
- }
- }
- return false;
+ return subscriptions.containsKey(clientId);
}
void removeSubscriptionsFor(UnsubscribeRequest request) {
@@ -226,20 +204,12 @@ void removeSubscriptionsFor(UnsubscribeRequest request) {
subscriptionsForName.removeAll(toRemove);
if (subscriptionsForName.isEmpty()) {
- this.sharedSubscriptions.remove(request.getSharedName());
+ sharedSubscriptions = sharedSubscriptions.minus(request.getSharedName());
} else {
- this.sharedSubscriptions.replace(request.getSharedName(), subscriptionsForName);
+ sharedSubscriptions = sharedSubscriptions.plus(request.getSharedName(), subscriptionsForName);
}
} else {
- // collect Subscription instances to remove
- Set toRemove = new HashSet<>();
- for (Subscription sub : this.subscriptions) {
- if (sub.clientId.equals(clientId)) {
- toRemove.add(sub);
- }
- }
- // effectively remove the instances
- this.subscriptions.removeAll(toRemove);
+ subscriptions = subscriptions.minus(clientId);
}
}
@@ -248,11 +218,4 @@ public int compareTo(CNode o) {
return token.compareTo(o.token);
}
- public List sharedAndNonSharedSubscriptions() {
- List shared = sharedSubscriptions();
- List returnedSubscriptions = new ArrayList<>(subscriptions.size() + shared.size());
- returnedSubscriptions.addAll(subscriptions);
- returnedSubscriptions.addAll(shared);
- return returnedSubscriptions;
- }
}
diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java
index a71d67961..9bdb03588 100644
--- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java
+++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java
@@ -118,6 +118,7 @@ public SubscriptionIdentifier getSubscriptionIdentifier() {
* Models a request to unsubscribe a client, it's carrier for the Subscription
* */
public final static class UnsubscribeRequest {
+
private final Topic topicFilter;
private final String clientId;
private boolean shared = false;
@@ -231,44 +232,56 @@ private NavigationAction evaluate(Topic topicName, CNode cnode, int depth) {
return NavigationAction.STOP;
}
- public List recursiveMatch(Topic topicName) {
- return recursiveMatch(topicName, this.root, 0);
+ public SubscriptionCollection recursiveMatch(Topic topicName) {
+ SubscriptionCollection subscriptions = new SubscriptionCollection();
+ recursiveMatch(topicName, this.root, 0, subscriptions);
+ return subscriptions;
}
- private List recursiveMatch(Topic topicName, INode inode, int depth) {
+ private void recursiveMatch(Topic topicName, INode inode, int depth, SubscriptionCollection target) {
CNode cnode = inode.mainNode();
if (cnode instanceof TNode) {
- return Collections.emptyList();
+ return;
}
NavigationAction action = evaluate(topicName, cnode, depth);
if (action == NavigationAction.MATCH) {
- return cnode.sharedAndNonSharedSubscriptions();
+ target.addNormalSubscriptions(cnode.getSubscriptions());
+ target.addSharedSubscriptions(cnode.getSharedSubscriptions());
+ return;
}
if (action == NavigationAction.STOP) {
- return Collections.emptyList();
+ return;
}
- Topic remainingTopic = (ROOT.equals(cnode.getToken())) ? topicName : topicName.exceptHeadToken();
- List subscriptions = new ArrayList<>();
+ final boolean isRoot = ROOT.equals(cnode.getToken());
+ final boolean isSingle = Token.SINGLE.equals(cnode.getToken());
+ final boolean isMulti = Token.MULTI.equals(cnode.getToken());
+
+ Topic remainingTopic = isRoot
+ ? topicName
+ : (isSingle || isMulti)
+ ? topicName.exceptFullHeadToken()
+ : topicName.exceptHeadToken();
+ SubscriptionCollection subscriptions = new SubscriptionCollection();
// We should only consider the maximum three children children of
// type #, + or exact match
Optional subInode = cnode.childOf(Token.MULTI);
if (subInode.isPresent()) {
- subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1));
+ recursiveMatch(remainingTopic, subInode.get(), depth + 1, target);
}
subInode = cnode.childOf(Token.SINGLE);
if (subInode.isPresent()) {
- subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1));
+ recursiveMatch(remainingTopic, subInode.get(), depth + 1, target);
}
if (remainingTopic.isEmpty()) {
- subscriptions.addAll(cnode.sharedAndNonSharedSubscriptions());
+ target.addNormalSubscriptions(cnode.getSubscriptions());
+ target.addSharedSubscriptions(cnode.getSharedSubscriptions());
} else {
subInode = cnode.childOf(remainingTopic.headToken());
if (subInode.isPresent()) {
- subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1));
+ recursiveMatch(remainingTopic, subInode.get(), depth + 1, target);
}
}
- return subscriptions;
}
/**
diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java
index e8efbac08..a7c15987e 100644
--- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java
+++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java
@@ -24,7 +24,7 @@
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -78,34 +78,10 @@ public void init(ISubscriptionsRepository subscriptionsRepository) {
* @return the list of matching subscriptions, or empty if not matching.
*/
@Override
- public List matchWithoutQosSharpening(Topic topicName) {
+ public SubscriptionCollection matchWithoutQosSharpening(Topic topicName) {
return ctrie.recursiveMatch(topicName);
}
- @Override
- public List matchQosSharpening(Topic topicName) {
- final List subscriptions = matchWithoutQosSharpening(topicName);
-
- // for each session select the subscription with higher QoS
- return selectSubscriptionsWithHigherQoSForEachSession(subscriptions);
- }
-
- private static List selectSubscriptionsWithHigherQoSForEachSession(List subscriptions) {
- // for each session select the subscription with higher QoS
- Map subsGroupedByClient = new HashMap<>();
- for (Subscription sub : subscriptions) {
- // If same client is subscribed to two different shared subscription that overlaps
- // then it has to return both subscriptions, because the share name made them independent.
- final String key = sub.clientAndShareName();
- Subscription existingSub = subsGroupedByClient.get(key);
- // update the selected subscriptions if not present or if it has a greater qos
- if (existingSub == null || existingSub.qosLessThan(sub)) {
- subsGroupedByClient.put(key, sub);
- }
- }
- return new ArrayList<>(subsGroupedByClient.values());
- }
-
@Override
public boolean add(String clientId, Topic filter, MqttSubscriptionOption option) {
SubscriptionRequest subRequest = SubscriptionRequest.buildNonShared(clientId, filter, option);
diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java b/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java
index 3647ee0b8..09c73b2db 100644
--- a/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java
+++ b/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java
@@ -31,22 +31,21 @@ private String prettySubscriptions(CNode node) {
if (node instanceof TNode) {
return "TNode";
}
- if (node.subscriptions().isEmpty()) {
+ if (node.getSubscriptions().isEmpty()) {
return StringUtil.EMPTY_STRING;
}
StringBuilder subScriptionsStr = new StringBuilder(" ~~[");
int counter = 0;
- for (Subscription couple : node.subscriptions()) {
+ for (Subscription couple : node.getSubscriptions().values()) {
subScriptionsStr
.append("{filter=").append(couple.topicFilter).append(", ")
.append("option=").append(couple.option()).append(", ")
.append("client='").append(couple.clientId).append("'}");
counter++;
- if (counter < node.subscriptions().size()) {
- subScriptionsStr.append(";");
- }
+ subScriptionsStr.append(";");
}
- return subScriptionsStr.append("]").toString();
+ final int length = subScriptionsStr.length();
+ return subScriptionsStr.replace(length - 1, length, "]").toString();
}
private String indentTabs(int deep) {
diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java b/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java
index 97af38035..d6e11c5c3 100644
--- a/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java
+++ b/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java
@@ -27,9 +27,7 @@ public interface ISubscriptionsDirectory {
void init(ISubscriptionsRepository sessionsRepository);
- List matchWithoutQosSharpening(Topic topic);
-
- List matchQosSharpening(Topic topic);
+ SubscriptionCollection matchWithoutQosSharpening(Topic topic);
boolean add(String clientId, Topic filter, MqttSubscriptionOption option);
diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/ShareName.java b/broker/src/main/java/io/moquette/broker/subscriptions/ShareName.java
index 2ec25b0fe..3f5f66caf 100644
--- a/broker/src/main/java/io/moquette/broker/subscriptions/ShareName.java
+++ b/broker/src/main/java/io/moquette/broker/subscriptions/ShareName.java
@@ -21,7 +21,7 @@
* Shared subscription's name.
*/
// It's public because used by PostOffice
-public final class ShareName {
+public final class ShareName implements Comparable{
private final String shareName;
public ShareName(String shareName) {
@@ -36,8 +36,8 @@ public boolean equals(Object o) {
return Objects.equals(shareName, (String) o);
}
if (getClass() != o.getClass()) return false;
- ShareName shareName1 = (ShareName) o;
- return Objects.equals(shareName, shareName1.shareName);
+ ShareName oShareName = (ShareName) o;
+ return Objects.equals(shareName, oShareName.shareName);
}
public String getShareName() {
@@ -55,4 +55,9 @@ public String toString() {
"shareName='" + shareName + '\'' +
'}';
}
+
+ @Override
+ public int compareTo(ShareName o) {
+ return shareName.compareTo(o.shareName);
+ }
}
diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCollection.java b/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCollection.java
new file mode 100644
index 000000000..8f2065b0c
--- /dev/null
+++ b/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCollection.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2012-2018 The original author or authors
+ * ------------------------------------------------------
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Apache License v2.0 which accompanies this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * The Apache License v2.0 is available at
+ * http://www.opensource.org/licenses/apache2.0.php
+ *
+ * You may elect to redistribute this code under either of these licenses.
+ */
+package io.moquette.broker.subscriptions;
+
+import static io.moquette.broker.subscriptions.CNode.SECURE_RANDOM;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * A wrapper over multiple maps of normal subscriptions.
+ */
+public class SubscriptionCollection implements Iterable {
+
+ private final List