diff --git a/broker/pom.xml b/broker/pom.xml index 05e14b9b1..ef19f29cc 100644 --- a/broker/pom.xml +++ b/broker/pom.xml @@ -187,6 +187,12 @@ 1.15 + + org.pcollections + pcollections + 4.0.2 + + org.fusesource.mqtt-client mqtt-client diff --git a/broker/src/main/java/io/moquette/broker/PostOffice.java b/broker/src/main/java/io/moquette/broker/PostOffice.java index 5ff719ac5..880c28460 100644 --- a/broker/src/main/java/io/moquette/broker/PostOffice.java +++ b/broker/src/main/java/io/moquette/broker/PostOffice.java @@ -58,6 +58,7 @@ import java.util.stream.Collectors; import static io.moquette.broker.Utils.messageId; +import io.moquette.broker.subscriptions.SubscriptionCollection; import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE; @@ -842,7 +843,7 @@ private RoutingResults publish2Subscribers(String publisherClientId, final boolean retainPublish = msg.fixedHeader().isRetain(); final Topic topic = new Topic(msg.variableHeader().topicName()); final MqttQoS publishingQos = msg.fixedHeader().qosLevel(); - List topicMatchingSubscriptions = subscriptions.matchQosSharpening(topic); + SubscriptionCollection topicMatchingSubscriptions = subscriptions.matchWithoutQosSharpening(topic); if (topicMatchingSubscriptions.isEmpty()) { // no matching subscriptions, clean exit LOG.trace("No matching subscriptions for topic: {}", topic); diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java index faa349aa7..a37fe05c7 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java @@ -22,6 +22,7 @@ import java.security.SecureRandom; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -33,52 +34,48 @@ import java.util.Random; import java.util.Set; import java.util.stream.Collectors; +import org.pcollections.PMap; +import org.pcollections.TreePMap; class CNode implements Comparable { public static final Random SECURE_RANDOM = new SecureRandom(); private final Token token; - private final List children; - // Sorted list of subscriptions. The sort is necessary for fast access, instead of linear scan. - private List subscriptions; + private PMap children; + // Map of subscriptions per clientId. + private PMap subscriptions; // the list of SharedSubscription is sorted. The sort is necessary for fast access, instead of linear scan. - private Map> sharedSubscriptions; + private PMap> sharedSubscriptions; CNode(Token token) { - this.children = new ArrayList<>(); - this.subscriptions = new ArrayList<>(); - this.sharedSubscriptions = new HashMap<>(); + this.children = TreePMap.empty(); + this.subscriptions = TreePMap.empty(); + this.sharedSubscriptions = TreePMap.empty(); this.token = token; } //Copy constructor - private CNode(Token token, List children, List subscriptions, Map> sharedSubscriptions) { + private CNode(Token token, PMap children, PMap subscriptions, PMap> sharedSubscriptions) { this.token = token; // keep reference, root comparison in directory logic relies on it for now. - this.subscriptions = new ArrayList<>(subscriptions); - this.sharedSubscriptions = new HashMap<>(sharedSubscriptions); - this.children = new ArrayList<>(children); + this.subscriptions = subscriptions; + this.sharedSubscriptions = sharedSubscriptions; + this.children = children; } public Token getToken() { return token; } - List allChildren() { - return new ArrayList<>(this.children); + Collection allChildren() { + return this.children.values(); } Optional childOf(Token token) { - int idx = findIndexForToken(token); - if (idx < 0) { + INode value = children.get(token.name); + if (value == null) { return Optional.empty(); } - return Optional.of(children.get(idx)); - } - - private int findIndexForToken(Token token) { - final INode tempTokenNode = new INode(new CNode(token)); - return Collections.binarySearch(children, tempTokenNode, (INode node, INode tokenHolder) -> node.mainNode().token.compareTo(tokenHolder.mainNode().token)); + return Optional.of(value); } @Override @@ -91,35 +88,23 @@ CNode copy() { } public void add(INode newINode) { - int idx = findIndexForToken(newINode.mainNode().token); - if (idx < 0) { - children.add(-1 - idx, newINode); - } else { - children.add(idx, newINode); - } + final String tokenName = newINode.mainNode().token.name; + children = children.plus(tokenName, newINode); } public INode remove(INode node) { - int idx = findIndexForToken(node.mainNode().token); - return this.children.remove(idx); + final String tokenName = node.mainNode().token.name; + INode toRemove = children.get(tokenName); + children = children.minus(tokenName); + return toRemove; } - private List sharedSubscriptions() { - List selectedSubscriptions = new ArrayList<>(sharedSubscriptions.size()); - // for each sharedSubscription related to a ShareName, select one subscription - for (Map.Entry> subsForName : sharedSubscriptions.entrySet()) { - List list = subsForName.getValue(); - final String shareName = subsForName.getKey().getShareName(); - // select a subscription randomly - int randIdx = SECURE_RANDOM.nextInt(list.size()); - SharedSubscription sub = list.get(randIdx); - selectedSubscriptions.add(sub.createSubscription()); - } - return selectedSubscriptions; + public PMap getSubscriptions() { + return subscriptions; } - List subscriptions() { - return subscriptions; + public PMap> getSharedSubscriptions() { + return sharedSubscriptions; } // Mutating operation @@ -141,25 +126,23 @@ CNode addSubscription(SubscriptionRequest request) { final Subscription newSubscription = request.subscription(); // if already contains one with same topic and same client, keep that with higher QoS - int idx = Collections.binarySearch(subscriptions, newSubscription); - if (idx >= 0) { + final Subscription existing = subscriptions.get(newSubscription.clientId); + if (existing != null) { // Subscription already exists - final Subscription existing = subscriptions.get(idx); if (needsToUpdateExistingSubscription(newSubscription, existing)) { - subscriptions.set(idx, newSubscription); + subscriptions = subscriptions.plus(newSubscription.clientId, newSubscription); } } else { // insert into the expected index so that the sorting is maintained - this.subscriptions.add(-1 - idx, newSubscription); + subscriptions = subscriptions.plus(newSubscription.clientId, newSubscription); } } return this; } private static boolean needsToUpdateExistingSubscription(Subscription newSubscription, Subscription existing) { - if ((newSubscription.hasSubscriptionIdentifier() && existing.hasSubscriptionIdentifier()) && - newSubscription.getSubscriptionIdentifier().equals(existing.getSubscriptionIdentifier()) - ) { + if ((newSubscription.hasSubscriptionIdentifier() && existing.hasSubscriptionIdentifier()) + && newSubscription.getSubscriptionIdentifier().equals(existing.getSubscriptionIdentifier())) { // if subscription identifier hasn't changed, // then check QoS but don't lower the requested QoS level return existing.option().qos().value() < newSubscription.option().qos().value(); @@ -177,8 +160,8 @@ private static boolean needsToUpdateExistingSubscription(Subscription newSubscri * AND at least one subscription is actually present for that clientId * */ boolean containsOnly(String clientId) { - for (Subscription sub : this.subscriptions) { - if (!sub.clientId.equals(clientId)) { + for (String sub : this.subscriptions.keySet()) { + if (!sub.equals(clientId)) { return false; } } @@ -207,12 +190,7 @@ private static SharedSubscription wrapKey(String clientId) { //TODO this is equivalent to negate(containsOnly(clientId)) private boolean containsSubscriptionsForClient(String clientId) { - for (Subscription sub : this.subscriptions) { - if (sub.clientId.equals(clientId)) { - return true; - } - } - return false; + return subscriptions.containsKey(clientId); } void removeSubscriptionsFor(UnsubscribeRequest request) { @@ -226,20 +204,12 @@ void removeSubscriptionsFor(UnsubscribeRequest request) { subscriptionsForName.removeAll(toRemove); if (subscriptionsForName.isEmpty()) { - this.sharedSubscriptions.remove(request.getSharedName()); + sharedSubscriptions = sharedSubscriptions.minus(request.getSharedName()); } else { - this.sharedSubscriptions.replace(request.getSharedName(), subscriptionsForName); + sharedSubscriptions = sharedSubscriptions.plus(request.getSharedName(), subscriptionsForName); } } else { - // collect Subscription instances to remove - Set toRemove = new HashSet<>(); - for (Subscription sub : this.subscriptions) { - if (sub.clientId.equals(clientId)) { - toRemove.add(sub); - } - } - // effectively remove the instances - this.subscriptions.removeAll(toRemove); + subscriptions = subscriptions.minus(clientId); } } @@ -248,11 +218,4 @@ public int compareTo(CNode o) { return token.compareTo(o.token); } - public List sharedAndNonSharedSubscriptions() { - List shared = sharedSubscriptions(); - List returnedSubscriptions = new ArrayList<>(subscriptions.size() + shared.size()); - returnedSubscriptions.addAll(subscriptions); - returnedSubscriptions.addAll(shared); - return returnedSubscriptions; - } } diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java index a71d67961..9bdb03588 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java @@ -118,6 +118,7 @@ public SubscriptionIdentifier getSubscriptionIdentifier() { * Models a request to unsubscribe a client, it's carrier for the Subscription * */ public final static class UnsubscribeRequest { + private final Topic topicFilter; private final String clientId; private boolean shared = false; @@ -231,44 +232,56 @@ private NavigationAction evaluate(Topic topicName, CNode cnode, int depth) { return NavigationAction.STOP; } - public List recursiveMatch(Topic topicName) { - return recursiveMatch(topicName, this.root, 0); + public SubscriptionCollection recursiveMatch(Topic topicName) { + SubscriptionCollection subscriptions = new SubscriptionCollection(); + recursiveMatch(topicName, this.root, 0, subscriptions); + return subscriptions; } - private List recursiveMatch(Topic topicName, INode inode, int depth) { + private void recursiveMatch(Topic topicName, INode inode, int depth, SubscriptionCollection target) { CNode cnode = inode.mainNode(); if (cnode instanceof TNode) { - return Collections.emptyList(); + return; } NavigationAction action = evaluate(topicName, cnode, depth); if (action == NavigationAction.MATCH) { - return cnode.sharedAndNonSharedSubscriptions(); + target.addNormalSubscriptions(cnode.getSubscriptions()); + target.addSharedSubscriptions(cnode.getSharedSubscriptions()); + return; } if (action == NavigationAction.STOP) { - return Collections.emptyList(); + return; } - Topic remainingTopic = (ROOT.equals(cnode.getToken())) ? topicName : topicName.exceptHeadToken(); - List subscriptions = new ArrayList<>(); + final boolean isRoot = ROOT.equals(cnode.getToken()); + final boolean isSingle = Token.SINGLE.equals(cnode.getToken()); + final boolean isMulti = Token.MULTI.equals(cnode.getToken()); + + Topic remainingTopic = isRoot + ? topicName + : (isSingle || isMulti) + ? topicName.exceptFullHeadToken() + : topicName.exceptHeadToken(); + SubscriptionCollection subscriptions = new SubscriptionCollection(); // We should only consider the maximum three children children of // type #, + or exact match Optional subInode = cnode.childOf(Token.MULTI); if (subInode.isPresent()) { - subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1)); + recursiveMatch(remainingTopic, subInode.get(), depth + 1, target); } subInode = cnode.childOf(Token.SINGLE); if (subInode.isPresent()) { - subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1)); + recursiveMatch(remainingTopic, subInode.get(), depth + 1, target); } if (remainingTopic.isEmpty()) { - subscriptions.addAll(cnode.sharedAndNonSharedSubscriptions()); + target.addNormalSubscriptions(cnode.getSubscriptions()); + target.addSharedSubscriptions(cnode.getSharedSubscriptions()); } else { subInode = cnode.childOf(remainingTopic.headToken()); if (subInode.isPresent()) { - subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1)); + recursiveMatch(remainingTopic, subInode.get(), depth + 1, target); } } - return subscriptions; } /** diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java index e8efbac08..a7c15987e 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java @@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Collections; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -78,34 +78,10 @@ public void init(ISubscriptionsRepository subscriptionsRepository) { * @return the list of matching subscriptions, or empty if not matching. */ @Override - public List matchWithoutQosSharpening(Topic topicName) { + public SubscriptionCollection matchWithoutQosSharpening(Topic topicName) { return ctrie.recursiveMatch(topicName); } - @Override - public List matchQosSharpening(Topic topicName) { - final List subscriptions = matchWithoutQosSharpening(topicName); - - // for each session select the subscription with higher QoS - return selectSubscriptionsWithHigherQoSForEachSession(subscriptions); - } - - private static List selectSubscriptionsWithHigherQoSForEachSession(List subscriptions) { - // for each session select the subscription with higher QoS - Map subsGroupedByClient = new HashMap<>(); - for (Subscription sub : subscriptions) { - // If same client is subscribed to two different shared subscription that overlaps - // then it has to return both subscriptions, because the share name made them independent. - final String key = sub.clientAndShareName(); - Subscription existingSub = subsGroupedByClient.get(key); - // update the selected subscriptions if not present or if it has a greater qos - if (existingSub == null || existingSub.qosLessThan(sub)) { - subsGroupedByClient.put(key, sub); - } - } - return new ArrayList<>(subsGroupedByClient.values()); - } - @Override public boolean add(String clientId, Topic filter, MqttSubscriptionOption option) { SubscriptionRequest subRequest = SubscriptionRequest.buildNonShared(clientId, filter, option); diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java b/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java index 3647ee0b8..09c73b2db 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java @@ -31,22 +31,21 @@ private String prettySubscriptions(CNode node) { if (node instanceof TNode) { return "TNode"; } - if (node.subscriptions().isEmpty()) { + if (node.getSubscriptions().isEmpty()) { return StringUtil.EMPTY_STRING; } StringBuilder subScriptionsStr = new StringBuilder(" ~~["); int counter = 0; - for (Subscription couple : node.subscriptions()) { + for (Subscription couple : node.getSubscriptions().values()) { subScriptionsStr .append("{filter=").append(couple.topicFilter).append(", ") .append("option=").append(couple.option()).append(", ") .append("client='").append(couple.clientId).append("'}"); counter++; - if (counter < node.subscriptions().size()) { - subScriptionsStr.append(";"); - } + subScriptionsStr.append(";"); } - return subScriptionsStr.append("]").toString(); + final int length = subScriptionsStr.length(); + return subScriptionsStr.replace(length - 1, length, "]").toString(); } private String indentTabs(int deep) { diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java b/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java index 97af38035..d6e11c5c3 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java @@ -27,9 +27,7 @@ public interface ISubscriptionsDirectory { void init(ISubscriptionsRepository sessionsRepository); - List matchWithoutQosSharpening(Topic topic); - - List matchQosSharpening(Topic topic); + SubscriptionCollection matchWithoutQosSharpening(Topic topic); boolean add(String clientId, Topic filter, MqttSubscriptionOption option); diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/ShareName.java b/broker/src/main/java/io/moquette/broker/subscriptions/ShareName.java index 2ec25b0fe..3f5f66caf 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/ShareName.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/ShareName.java @@ -21,7 +21,7 @@ * Shared subscription's name. */ // It's public because used by PostOffice -public final class ShareName { +public final class ShareName implements Comparable{ private final String shareName; public ShareName(String shareName) { @@ -36,8 +36,8 @@ public boolean equals(Object o) { return Objects.equals(shareName, (String) o); } if (getClass() != o.getClass()) return false; - ShareName shareName1 = (ShareName) o; - return Objects.equals(shareName, shareName1.shareName); + ShareName oShareName = (ShareName) o; + return Objects.equals(shareName, oShareName.shareName); } public String getShareName() { @@ -55,4 +55,9 @@ public String toString() { "shareName='" + shareName + '\'' + '}'; } + + @Override + public int compareTo(ShareName o) { + return shareName.compareTo(o.shareName); + } } diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCollection.java b/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCollection.java new file mode 100644 index 000000000..8f2065b0c --- /dev/null +++ b/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCollection.java @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2012-2018 The original author or authors + * ------------------------------------------------------ + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + */ +package io.moquette.broker.subscriptions; + +import static io.moquette.broker.subscriptions.CNode.SECURE_RANDOM; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + +/** + * A wrapper over multiple maps of normal subscriptions. + */ +public class SubscriptionCollection implements Iterable { + + private final List> normalSubscriptions = new ArrayList<>(); + private final List>> sharedSubscriptions = new ArrayList<>(); + + public boolean isEmpty() { + return normalSubscriptions.isEmpty() && sharedSubscriptions.isEmpty(); + } + + /** + * Calculates the number of subscriptions. Expensive, only use for tests! + * @return the number of subscriptions. + */ + public int size() { + int total = 0; + for (Map var : normalSubscriptions) { + total += var.size(); + } + for (Map> var : sharedSubscriptions) { + total += var.size(); + } + return total; + } + + public void addNormalSubscriptions(Map subs) { + if (subs.isEmpty()) { + return; + } + normalSubscriptions.add(subs); + } + + public void addSharedSubscriptions(Map> subs) { + if (sharedSubscriptions.isEmpty()) { + return; + } + sharedSubscriptions.add(subs); + } + + private static Subscription selectRandom(List list) { + // select a subscription randomly + int randIdx = SECURE_RANDOM.nextInt(list.size()); + return list.get(randIdx).createSubscription(); + } + + @Override + public Iterator iterator() { + return new IteratorImpl(this); + } + + private static class IteratorImpl implements Iterator { + + private Iterator> normapSubListIter; + private Iterator normalSubIter; + + private Iterator>> sharedSubMapIter; + private Iterator> sharedSubIter; + + public IteratorImpl(SubscriptionCollection parent) { + normapSubListIter = parent.normalSubscriptions.iterator(); + sharedSubMapIter = parent.sharedSubscriptions.iterator(); + } + + @Override + public boolean hasNext() { + if (normalSubIter != null && normalSubIter.hasNext()) { + return true; + } + if (sharedSubIter != null && sharedSubIter.hasNext()) { + return true; + } + if (normapSubListIter != null) { + if (normapSubListIter.hasNext()) { + // Get the next normal subscriptions iterator. + Map next = normapSubListIter.next(); + normalSubIter = next.values().iterator(); + return true; + } else { + // Reached the end of the normal subscriptions lists. + normapSubListIter = null; + } + } + if (sharedSubMapIter != null) { + if (sharedSubMapIter.hasNext()) { + Map> next = sharedSubMapIter.next(); + sharedSubIter = next.values().iterator(); + return true; + } else { + sharedSubMapIter = null; + } + } + return false; + } + + @Override + public Subscription next() { + if (normalSubIter != null) { + return normalSubIter.next(); + } + if (sharedSubIter != null) { + return selectRandom(sharedSubIter.next()); + } + throw new NoSuchElementException("Fetched past the end of Iterator, make sure to call hasNext!"); + } + } + +} diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCounterVisitor.java b/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCounterVisitor.java index 863e944c3..9d06835d9 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCounterVisitor.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCounterVisitor.java @@ -19,11 +19,11 @@ class SubscriptionCounterVisitor implements CTrie.IVisitor { - private AtomicInteger accumulator = new AtomicInteger(0); + private final AtomicInteger accumulator = new AtomicInteger(0); @Override public void visit(CNode node, int deep) { - accumulator.addAndGet(node.subscriptions().size()); + accumulator.addAndGet(node.getSubscriptions().size()); } @Override diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/Token.java b/broker/src/main/java/io/moquette/broker/subscriptions/Token.java index ca3544ed1..64bc8cf27 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/Token.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/Token.java @@ -25,15 +25,29 @@ public class Token implements Comparable { static final Token MULTI = new Token("#"); static final Token SINGLE = new Token("+"); final String name; + boolean lastSubToken; protected Token(String s) { + this(s, true); + } + + protected Token(String s, boolean isLastSub) { name = s; + lastSubToken = isLastSub; } protected String name() { return name; } + protected void setLastSubToken(boolean lastSubToken) { + this.lastSubToken = lastSubToken; + } + + protected boolean isLastSubToken() { + return lastSubToken; + } + @Override public int hashCode() { int hash = 7; diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/Topic.java b/broker/src/main/java/io/moquette/broker/subscriptions/Topic.java index 96940e116..3778f6571 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/Topic.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/Topic.java @@ -31,6 +31,8 @@ public class Topic implements Serializable, Comparable { private static final Logger LOG = LoggerFactory.getLogger(Topic.class); + public static int MAX_TOKEN_LENGTH = 4; + private static final long serialVersionUID = 2438799283749822L; private final String topic; @@ -55,7 +57,7 @@ public Topic(String topic) { Topic(List tokens) { this.tokens = tokens; - List strTokens = tokens.stream().map(Token::toString).collect(Collectors.toList()); + List strTokens = fullTokens().stream().map(Token::toString).collect(Collectors.toList()); this.topic = String.join("/", strTokens); this.valid = true; } @@ -74,7 +76,24 @@ public List getTokens() { return tokens; } - private List parseTopic(String topic) throws ParseException { + public List fullTokens() { + List fullTokens = new ArrayList<>(); + String currentToken = null; + for (Token token : getTokens()) { + if (currentToken == null) { + currentToken = token.name; + } else { + currentToken += token.name; + } + if (token.isLastSubToken()) { + fullTokens.add(new Token(currentToken, true)); + currentToken = null; + } + } + return fullTokens; + } + + private static List parseTopic(String topic) throws ParseException { if (topic.length() == 0) { throw new ParseException("Bad format of topic, topic MUST be at least 1 character [MQTT-4.7.3-1] and " + "this was empty", 0); @@ -117,7 +136,18 @@ private List parseTopic(String topic) throws ParseException { } else if (s.contains("+")) { throw new ParseException("Bad format of topic, invalid subtopic name: " + s, i); } else { - res.add(new Token(s)); + final int l = s.length(); + int start = 0; + Token token = null; + while (start < l) { + int end = Math.min(start + MAX_TOKEN_LENGTH, l); + final String subToken = s.substring(start, end); + token = new Token(subToken, false); + res.add(token); + start = end; + } + // Can't be null because s can't be empty. + token.setLastSubToken(true); } } @@ -151,6 +181,22 @@ public Topic exceptHeadToken() { return new Topic(tokensCopy); } + /** + * @return a new Topic corresponding to this less than the full head token, skipping any sub-tokens. + */ + public Topic exceptFullHeadToken() { + List tokens = getTokens(); + if (tokens.isEmpty()) { + return new Topic(Collections.emptyList()); + } + List tokensCopy = new ArrayList<>(tokens); + Token removed; + do { + removed = tokensCopy.remove(0); + } while (!removed.isLastSubToken() && !tokensCopy.isEmpty()); + return new Topic(tokensCopy); + } + public boolean isValid() { if (tokens == null) getTokens(); @@ -169,14 +215,16 @@ public boolean isValid() { public boolean match(Topic subscriptionTopic) { List msgTokens = getTokens(); List subscriptionTokens = subscriptionTopic.getTokens(); + // Due to sub-tokens and the + wildcard, indexes may differ. int i = 0; - for (; i < subscriptionTokens.size(); i++) { + int m = 0; + for (; i < subscriptionTokens.size(); i++, m++) { Token subToken = subscriptionTokens.get(i); if (!Token.MULTI.equals(subToken) && !Token.SINGLE.equals(subToken)) { - if (i >= msgTokens.size()) { + if (m >= msgTokens.size()) { return false; } - Token msgToken = msgTokens.get(i); + Token msgToken = msgTokens.get(m); if (!msgToken.equals(subToken)) { return false; } @@ -184,12 +232,20 @@ public boolean match(Topic subscriptionTopic) { if (Token.MULTI.equals(subToken)) { return true; } -// if (Token.SINGLE.equals(subToken)) { -// // skip a step forward -// } + if (m >= msgTokens.size()) { + return false; + } + if (Token.SINGLE.equals(subToken)) { + // skip to the next full token in the message topic + Token msgToken = msgTokens.get(m); + while (!msgToken.isLastSubToken()) { + m++; + msgToken = msgTokens.get(m); + } + } } } - return i == msgTokens.size(); + return m == msgTokens.size(); } @Override diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java index 5c285ad14..626e79b89 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java @@ -37,6 +37,7 @@ import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; import static io.moquette.BrokerConstants.NO_BUFFER_FLUSH; import static io.moquette.broker.PostOfficeUnsubscribeTest.CONFIG; +import io.moquette.broker.subscriptions.SubscriptionCollection; import static io.netty.handler.codec.mqtt.MqttQoS.*; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Collections.singleton; @@ -338,7 +339,7 @@ protected void subscribe(MQTTConnection connection, String topic, MqttQoS desire final String clientId = connection.getClientId(); Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), MqttSubscriptionOption.onlyFromQos(desiredQos)); - final List matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); + final SubscriptionCollection matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); final Subscription onlyMatchedSubscription = matchedSubscriptions.iterator().next(); assertEquals(expectedSubscription, onlyMatchedSubscription); diff --git a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java index b3314fb6b..dfbf46dbf 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java @@ -40,6 +40,7 @@ import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; import static io.moquette.BrokerConstants.NO_BUFFER_FLUSH; import static io.moquette.broker.PostOfficeUnsubscribeTest.CONFIG; +import io.moquette.broker.subscriptions.SubscriptionCollection; import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE; @@ -198,7 +199,7 @@ protected void subscribe(MQTTConnection connection, String topic, MqttQoS desire final String clientId = connection.getClientId(); Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), MqttSubscriptionOption.onlyFromQos(desiredQos)); - final List matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); + final SubscriptionCollection matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); final Subscription onlyMatchedSubscription = matchedSubscriptions.iterator().next(); assertEquals(expectedSubscription, onlyMatchedSubscription); diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java index f0b25dcff..8443b7bc8 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java @@ -43,6 +43,7 @@ import static io.moquette.BrokerConstants.NO_BUFFER_FLUSH; import static io.moquette.broker.PostOfficePublishTest.ALLOW_ANONYMOUS_AND_ZERO_BYTES_CLID; import static io.moquette.broker.PostOfficePublishTest.SUBSCRIBER_ID; +import io.moquette.broker.subscriptions.SubscriptionCollection; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE; import static java.util.Collections.singleton; @@ -146,7 +147,7 @@ protected void subscribe(EmbeddedChannel channel, String topic, MqttQoS desiredQ final String clientId = NettyUtils.clientID(channel); Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), MqttSubscriptionOption.onlyFromQos(desiredQos)); - final List matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); + final SubscriptionCollection matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); final Subscription onlyMatchedSubscription = matchedSubscriptions.iterator().next(); assertEquals(expectedSubscription, onlyMatchedSubscription); @@ -166,7 +167,7 @@ protected void subscribe(MQTTConnection connection, String topic, MqttQoS desire final String clientId = connection.getClientId(); Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), MqttSubscriptionOption.onlyFromQos(desiredQos)); - final List matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); + final SubscriptionCollection matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); final Subscription onlyMatchedSubscription = matchedSubscriptions.iterator().next(); assertEquals(expectedSubscription, onlyMatchedSubscription); diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java index 13bda420f..279e62995 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java @@ -39,6 +39,7 @@ import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; import static io.moquette.BrokerConstants.NO_BUFFER_FLUSH; import static io.moquette.broker.PostOfficePublishTest.PUBLISHER_ID; +import io.moquette.broker.subscriptions.SubscriptionCollection; import static io.netty.handler.codec.mqtt.MqttQoS.*; import static java.util.Collections.*; import java.util.List; @@ -125,7 +126,7 @@ protected void subscribe(MQTTConnection connection, String topic, MqttQoS desire final String clientId = connection.getClientId(); Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), MqttSubscriptionOption.onlyFromQos(desiredQos)); - final List matchedSubscriptions = subscriptions.matchQosSharpening(new Topic(topic)); + final SubscriptionCollection matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); //assertTrue(matchedSubscriptions.size() >=1); final Subscription onlyMatchedSubscription = matchedSubscriptions.iterator().next(); diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java index cb83fcb6f..bc72414fb 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java @@ -54,7 +54,7 @@ public void whenManySharedSubscriptionsOfDifferentShareNameMatchATopicThenOneSub sut.addShared("TempSensor1", new ShareName("temp_sensors"), asTopic("/livingroom"), asOption(MqttQoS.AT_MOST_ONCE)); sut.addShared("TempSensor1", new ShareName("livingroom_devices"), asTopic("/livingroom"), asOption(MqttQoS.AT_MOST_ONCE)); - List matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/livingroom")); + SubscriptionCollection matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/livingroom")); assertThat(matchingSubscriptions) .containsOnly(SubscriptionTestUtils.asSubscription("TempSensor1", "/livingroom", "temp_sensors"), SubscriptionTestUtils.asSubscription("TempSensor1", "/livingroom", "livingroom_devices")) @@ -71,7 +71,7 @@ public void givenSessionHasMultipleSharedSubscriptionWhenTheClientIsRemovedThenN sut.removeSharedSubscriptionsForClient(clientId); // Verify - List matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/livingroom")); + SubscriptionCollection matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/livingroom")); assertThat(matchingSubscriptions).isEmpty(); } @@ -82,7 +82,7 @@ public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionIsProcessedThe new SubscriptionIdentifier(1)); // verify it contains the subscription identifier - final List matchingSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + final SubscriptionCollection matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("client/test/b")); verifySubscriptionIdentifierIsPresent(matchingSubscriptions, new SubscriptionIdentifier(1), "share_temp"); // update the subscription of same clientId on same topic filter but with different subscription identifier @@ -90,11 +90,11 @@ public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionIsProcessedThe new SubscriptionIdentifier(123)); // verify the subscription identifier is updated - final List reloadedSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + final SubscriptionCollection reloadedSubscriptions = sut.matchWithoutQosSharpening(asTopic("client/test/b")); verifySubscriptionIdentifierIsPresent(reloadedSubscriptions, new SubscriptionIdentifier(123), "share_temp"); } - private static void verifySubscriptionIdentifierIsPresent(List matchingSubscriptions, SubscriptionIdentifier subscriptionIdentifier, String expectedShareName) { + private static void verifySubscriptionIdentifierIsPresent(SubscriptionCollection matchingSubscriptions, SubscriptionIdentifier subscriptionIdentifier, String expectedShareName) { assertAll("subscription contains the subscription identifier", () -> assertEquals(1, matchingSubscriptions.size()), () -> assertEquals(expectedShareName, matchingSubscriptions.iterator().next().shareName), @@ -111,13 +111,13 @@ public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionWithoutSubscri // verify it contains the subscription identifier SubscriptionIdentifier expectedSubscriptionId = new SubscriptionIdentifier(1); - verifySubscriptionIdentifierIsPresent(sut.matchQosSharpening(asTopic("client/test/b")), expectedSubscriptionId, "share_temp"); + verifySubscriptionIdentifierIsPresent(sut.matchWithoutQosSharpening(asTopic("client/test/b")), expectedSubscriptionId, "share_temp"); // update the subscription of same clientId on same topic filter but removing subscription identifier sut.addShared("client", new ShareName("share_temp"), asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE)); // verify the subscription identifier is removed - final List reloadedSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + final SubscriptionCollection reloadedSubscriptions = sut.matchWithoutQosSharpening(asTopic("client/test/b")); assertAll("subscription doesn't contain subscription identifier", () -> assertEquals(1, reloadedSubscriptions.size()), () -> assertFalse(reloadedSubscriptions.iterator().next().hasSubscriptionIdentifier()) diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java index 60660cbfd..7f732cd53 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java @@ -24,8 +24,12 @@ import java.util.Random; import io.netty.handler.codec.mqtt.MqttSubscriptionOption; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,88 +37,326 @@ public class CTrieSpeedTest { private static final Logger LOGGER = LoggerFactory.getLogger(CTrieSpeedTest.class.getName()); - private static final int MAX_DURATION_S = 5 * 60; - private static final int CHECK_INTERVAL = 50_000; - private static final int TOTAL_SUBSCRIPTIONS = 500_000; + private static final int MAX_DURATION_S = 50 * 60; + private static final int CHECK_INTERVAL = 5000_000; + private static final int TOTAL_SUBSCRIPTIONS = 1_000_000; + + private static final Map>> TEST_RESULTS_ADD = new TreeMap<>(); + private static final Map>> TEST_RESULTS_READ = new TreeMap<>(); + private static final Map>> TEST_RESULTS_REMOVE = new TreeMap<>(); + + private static void addAddResult(TestResult result) { + addResult(TEST_RESULTS_ADD, result); + } + + private static void addReadResult(TestResult result) { + addResult(TEST_RESULTS_READ, result); + } + + private static void addRemoveResult(TestResult result) { + addResult(TEST_RESULTS_REMOVE, result); + } + + private static void addResult(Map>> set, TestResult result) { + set.computeIfAbsent(result.threads, t -> new TreeMap<>()) + .computeIfAbsent(result.maxLength, t -> new ArrayList<>()) + .add(result); + } + + private static void logResults() { + LOGGER.info("Results for Adding:"); + logResults(TEST_RESULTS_ADD); + LOGGER.info("Results for Reading:"); + logResults(TEST_RESULTS_READ); + LOGGER.info("Results for Removing:"); + logResults(TEST_RESULTS_REMOVE); + } + + private static void clearResults() { + TEST_RESULTS_ADD.clear(); + TEST_RESULTS_READ.clear(); + TEST_RESULTS_REMOVE.clear(); + } + + private static void logResults(Map>> set) { + StringBuilder header = new StringBuilder(set.values().iterator().next().values().iterator().next().get(0).topicCount+" topics"); + TreeMap rowPerLength = new TreeMap<>(); + for (Map.Entry>> entry : set.entrySet()) { + Integer threads = entry.getKey(); + Map> lengthMap = entry.getValue(); + header.append(',').append(threads); + for (Map.Entry> innerEntry : lengthMap.entrySet()) { + Integer length = innerEntry.getKey(); + List results = innerEntry.getValue(); + int count = results.size(); + double durationAvg = 0; + for (TestResult result : results) { + durationAvg += 1.0 * result.durationMs / count; + } + rowPerLength.computeIfAbsent(length, t -> new StringBuilder("" + t)).append(',').append(Math.round(durationAvg)); + } + } + LOGGER.info(header.toString()); + for (StringBuilder row : rowPerLength.values()) { + LOGGER.info("{}", row); + } + } static SubscriptionRequest clientSubOnTopic(String clientID, String topicName) { return SubscriptionRequest.buildNonShared(clientID, asTopic(topicName), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); } + @Disabled @Test - @Timeout(value = MAX_DURATION_S) - public void testManyClientsFewTopics() { - List subscriptionList = prepareSubscriptionsManyClientsFewTopic(); - createSubscriptions(subscriptionList); + public void testManyClientsFewTopics() throws InterruptedException, Exception { + LOGGER.info("testManyClientsFewTopics"); + clearResults(); + Callable> subCreate = () -> prepareSubscriptionsManyClientsFewTopic(20_000); + test(subCreate); } + @Disabled @Test - @Timeout(value = MAX_DURATION_S) - public void testFlat() { - List results = prepareSubscriptionsFlat(); - createSubscriptions(results); + public void testFlat() throws Exception { + LOGGER.info("TestFlat"); + clearResults(); + Callable> subCreate = () -> prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS); + test(subCreate); } + @Disabled @Test - @Timeout(value = MAX_DURATION_S) - public void testDeep() { - List results = prepareSubscriptionsDeep(); - createSubscriptions(results); + public void testDeep() throws InterruptedException, Exception { + LOGGER.info("TestDeep"); + clearResults(); + Callable> subCreate = () -> prepareSubscriptionsDeep(TOTAL_SUBSCRIPTIONS); + test(subCreate); } - public void createSubscriptions(List results) { - int count = 0; + private void test(Callable> subCreate) throws Exception { + for (int length : new int[]{1, 2, 3, 4, 5, 6, 7, 8, 99}) { + for (int threads : new int[]{1, 2, 4, 6, 8, 12, 16}) { + test(subCreate, threads, length); + test(subCreate, threads, length); + test(subCreate, threads, length); + logResults(); + } + } + logResults(); + } + + private void test(Callable> subCreate, int threadCount, int maxLength) throws Exception { + Topic.MAX_TOKEN_LENGTH = maxLength; + final List subRequests = subCreate.call(); + CTrie tree = createSubscriptions(subRequests, threadCount); + readSubscriptions(tree, subRequests, threadCount); + removeSubscriptions(tree, subRequests, threadCount); + } + + public CTrie createSubscriptions(List subsToAdd, int threadCount) throws InterruptedException { long start = System.currentTimeMillis(); - int log = CHECK_INTERVAL; CTrie tree = new CTrie(); - for (SubscriptionRequest result : results) { - tree.addToTree(result); - count++; - log--; - if (log <= 0) { - log = CHECK_INTERVAL; - long end = System.currentTimeMillis(); - long duration = end - start; - LOGGER.info("Added {} subscriptions in {} ms ({}/ms)", count, duration, Math.round(1.0 * count / duration)); - } - if (Thread.currentThread().isInterrupted()) { - return; - } + List> subLists = new ArrayList<>(); + final int total = subsToAdd.size(); + int perList = total / threadCount; + int startIdx = 0; + for (int idx = 0; idx < threadCount - 1; idx++) { + int endIdx = startIdx + perList; + subLists.add(subsToAdd.subList(startIdx, endIdx)); + startIdx = endIdx; } + subLists.add(subsToAdd.subList(startIdx, total)); + + List threads = new ArrayList<>(); + for (int idx = 0; idx < threadCount; idx++) { + final List workList = subLists.get(idx); + threads.add(new Thread(() -> { + int log = CHECK_INTERVAL; + int count = 0; + for (SubscriptionRequest result : workList) { + tree.addToTree(result); + count++; + log--; + if (log <= 0) { + log = CHECK_INTERVAL; + long end = System.currentTimeMillis(); + long duration = end - start; + LOGGER.info("Threads {}, Subs {}, time(ms) {}, perMs {}", threadCount, count, duration, Math.round(1.0 * count / duration)); + } + if (Thread.currentThread().isInterrupted()) { + return; + } + } + })); + } + for (Thread thread : threads) { + thread.start(); + } + for (Thread thread : threads) { + thread.join(); + } + long end = System.currentTimeMillis(); long duration = end - start; - LOGGER.info("Added " + count + " subscriptions in " + duration + " ms (" + Math.round(1000.0 * count / duration) + "/s)"); + final long speed = Math.round(1000.0 * total / duration); + LOGGER.info("{}, {}, {}, {}, {}", threadCount, Topic.MAX_TOKEN_LENGTH, total, duration, speed); + addAddResult(new TestResult(threadCount, Topic.MAX_TOKEN_LENGTH, total, duration)); + return tree; } - public List prepareSubscriptionsManyClientsFewTopic() { - List subscriptionList = new ArrayList<>(TOTAL_SUBSCRIPTIONS); - for (int i = 0; i < TOTAL_SUBSCRIPTIONS; i++) { - Topic topic = asTopic("topic/test/" + new Random().nextInt(1 + i % 10) + "/test"); - subscriptionList.add(SubscriptionRequest.buildNonShared("TestClient-" + i, topic, MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_LEAST_ONCE))); + public CTrie removeSubscriptions(CTrie tree, List subsToAdd, int threadCount) throws InterruptedException { + long start = System.currentTimeMillis(); + + List> subLists = new ArrayList<>(); + final int total = subsToAdd.size(); + int perList = total / threadCount; + int startIdx = 0; + for (int idx = 0; idx < threadCount - 1; idx++) { + int endIdx = startIdx + perList; + subLists.add(subsToAdd.subList(startIdx, endIdx)); + startIdx = endIdx; } + subLists.add(subsToAdd.subList(startIdx, total)); + + List threads = new ArrayList<>(); + for (int idx = 0; idx < threadCount; idx++) { + final List workList = subLists.get(idx); + threads.add(new Thread(() -> { + int log = CHECK_INTERVAL; + int count = 0; + for (SubscriptionRequest subReq : workList) { + tree.removeFromTree(CTrie.UnsubscribeRequest.buildNonShared(subReq.getClientId(), subReq.getTopicFilter())); + count++; + log--; + if (log <= 0) { + log = CHECK_INTERVAL; + long end = System.currentTimeMillis(); + long duration = end - start; + LOGGER.info("Threads {}, Subs {}, time(ms) {}, perMs {}", threadCount, count, duration, Math.round(1.0 * count / duration)); + } + if (Thread.currentThread().isInterrupted()) { + return; + } + } + })); + } + for (Thread thread : threads) { + thread.start(); + } + for (Thread thread : threads) { + thread.join(); + } + + long end = System.currentTimeMillis(); + long duration = end - start; + final long speed = Math.round(1000.0 * total / duration); + LOGGER.info("{}, {}, {}, {}, {}", threadCount, Topic.MAX_TOKEN_LENGTH, total, duration, speed); + addRemoveResult(new TestResult(threadCount, Topic.MAX_TOKEN_LENGTH, total, duration)); + return tree; + } + + public void readSubscriptions(CTrie tree, List subsToRead, int threadCount) throws InterruptedException { + final long start1 = System.currentTimeMillis(); + List> subLists = new ArrayList<>(); + final int total = subsToRead.size(); + int perList = total / threadCount; + int startIdx = 0; + for (int idx = 0; idx < threadCount - 1; idx++) { + int endIdx = startIdx + perList; + subLists.add(subsToRead.subList(startIdx, endIdx)); + startIdx = endIdx; + } + subLists.add(subsToRead.subList(startIdx, total)); + + List threads = new ArrayList<>(); + for (int idx = 0; idx < threadCount; idx++) { + final List workList = subLists.get(idx); + threads.add(new Thread(() -> { + int log = CHECK_INTERVAL; + int count = 0; + for (SubscriptionRequest subReq : workList) { + + final Subscription subscription = subReq.subscription(); + final Topic topic = subReq.getTopicFilter(); + final SubscriptionCollection recursiveMatch = tree.recursiveMatch(topic); + boolean contains = false; + for (Subscription sub : recursiveMatch) { + if (sub.equals(subscription)) { + contains = true; + break; + } + } + Assertions.assertTrue(contains, () -> "Failed to find " + subscription + " on " + topic + " found " + recursiveMatch.size() + " matches."); + + count++; + log--; + if (log <= 0) { + log = CHECK_INTERVAL; + long end = System.currentTimeMillis(); + long duration = end - start1; + LOGGER.info("Threads {}, Subs {}, time(ms) {}, perMs {}", threadCount, count, duration, Math.round(1.0 * count / duration)); + } + if (Thread.currentThread().isInterrupted()) { + return; + } + } + })); + } + for (Thread thread : threads) { + thread.start(); + } + for (Thread thread : threads) { + thread.join(); + } + + long end = System.currentTimeMillis(); + long duration = end - start1; + final long speed = Math.round(1000.0 * total / duration); + addReadResult(new TestResult(threadCount, Topic.MAX_TOKEN_LENGTH, total, duration)); + LOGGER.info("{}, {}, {}, {}, {}", threadCount, Topic.MAX_TOKEN_LENGTH, total, duration, speed); + } + + public List prepareSubscriptionsManyClientsFewTopic(int subCount) { + List subscriptionList = new ArrayList<>(subCount); + int total = 0; + long start = System.currentTimeMillis(); + int groupSize = subCount / 10; + for (int i = 0; i < groupSize; i++) { + for (int group = 0; group < 10; group++) { + int idx = group * groupSize + i; + Topic topic = asTopic("topic/test"); + subscriptionList.add(SubscriptionRequest.buildNonShared("TestClient-" + idx, topic, MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_LEAST_ONCE))); + total++; + } + } + long end = System.currentTimeMillis(); + long duration = end - start; + LOGGER.info("Prepared {} subscriptions in {} ms on 10 topics", total, duration); return subscriptionList; } - public List prepareSubscriptionsFlat() { - List results = new ArrayList<>(TOTAL_SUBSCRIPTIONS); + public List prepareSubscriptionsFlat(int subCount) { + List results = new ArrayList<>(subCount); int count = 0; long start = System.currentTimeMillis(); - for (int topicNr = 0; topicNr < TOTAL_SUBSCRIPTIONS / 10; topicNr++) { - for (int clientNr = 0; clientNr < 10; clientNr++) { + final int clientCount = 1; + final int topicCount = subCount / clientCount; + for (int clientNr = 0; clientNr < clientCount; clientNr++) { + for (int topicNr = 0; topicNr < topicCount; topicNr++) { count++; - results.add(clientSubOnTopic("Client-" + clientNr, "mainTopic-" + topicNr)); + results.add(clientSubOnTopic("Client-" + clientNr, topicNr + "-mainTopic")); } } long end = System.currentTimeMillis(); long duration = end - start; - LOGGER.info("Prepared {} subscriptions in {} ms", count, duration); + LOGGER.debug("Prepared {} subscriptions for {} topics in {} ms", count, topicCount, duration); return results; } - public List prepareSubscriptionsDeep() { - List results = new ArrayList<>(TOTAL_SUBSCRIPTIONS); - long countPerLevel = Math.round(Math.pow(TOTAL_SUBSCRIPTIONS, 0.25)); - LOGGER.info("Preparing {} subscriptions, 4 deep with {} per level", TOTAL_SUBSCRIPTIONS, countPerLevel); + public List prepareSubscriptionsDeep(int subCount) { + List results = new ArrayList<>(subCount); + long countPerLevel = Math.round(Math.pow(subCount, 0.25)); + LOGGER.info("Preparing {} subscriptions, 4 deep with {} per level", subCount, countPerLevel); int count = 0; long start = System.currentTimeMillis(); outerloop: @@ -125,7 +367,7 @@ public List prepareSubscriptionsDeep() { count++; results.add(clientSubOnTopic("Client-" + clientNr, "mainTopic-" + firstLevelNr + "/subTopic-" + secondLevelNr + "/subSubTopic" + thirdLevelNr)); // Due to the 4th-power-root we don't get exactly the required number of subs. - if (count >= TOTAL_SUBSCRIPTIONS) { + if (count >= subCount) { break outerloop; } } @@ -138,4 +380,19 @@ public List prepareSubscriptionsDeep() { return results; } + private static class TestResult { + + public final int threads; + public final int maxLength; + public final int topicCount; + public final long durationMs; + + public TestResult(int threads, int maxLength, int topicCount, long durationMs) { + this.threads = threads; + this.maxLength = maxLength; + this.topicCount = topicCount; + this.durationMs = durationMs; + } + + } } diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java index 7d1438732..b6f31d88a 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java @@ -15,7 +15,6 @@ */ package io.moquette.broker.subscriptions; - import io.moquette.broker.ISubscriptionsRepository; import io.moquette.persistence.MemorySubscriptionsRepository; import io.netty.handler.codec.mqtt.MqttQoS; @@ -27,6 +26,7 @@ import static io.moquette.broker.subscriptions.CTrieSharedSubscriptionDirectoryMatchingTest.asOption; import static io.moquette.broker.subscriptions.Topic.asTopic; +import java.util.ArrayList; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.*; @@ -198,7 +198,7 @@ public void testOverlappingSubscriptions() { sut.add(specificSub.clientId, specificSub.topicFilter, specificSub.option()); //Exercise - final List matchingForSpecific = sut.matchQosSharpening(asTopic("a/b")); + final SubscriptionCollection matchingForSpecific = sut.matchWithoutQosSharpening(asTopic("a/b")); // Verify assertThat(matchingForSpecific.size()).isEqualTo(1); @@ -226,7 +226,7 @@ public void removeSubscription_sameClients_subscribedSameTopic() { sut.removeSubscription(asTopic("/topic"), "Sensor1"); // Verify - final List matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/topic")); + final SubscriptionCollection matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/topic")); assertThat(matchingSubscriptions).isEmpty(); } @@ -244,14 +244,17 @@ public void duplicatedSubscriptionsWithDifferentQos() { this.sut.add("client1", asTopic("client/test/b"), asOption(MqttQoS.EXACTLY_ONCE)); // Verify - List subscriptions = this.sut.matchQosSharpening(asTopic("client/test/b")); + SubscriptionCollection subscriptions = this.sut.matchWithoutQosSharpening(asTopic("client/test/b")); assertThat(subscriptions).contains(client1SubQoS2); assertThat(subscriptions).contains(client2Sub); - final Optional matchingClient1Sub = subscriptions - .stream() - .filter(s -> s.equals(client1SubQoS0)) - .findFirst(); + Optional matchingClient1Sub = Optional.empty(); + for (Subscription sub : subscriptions) { + if (sub.equals(client1SubQoS0)) { + matchingClient1Sub = Optional.of(sub); + break; + } + } assertTrue(matchingClient1Sub.isPresent()); Subscription client1Sub = matchingClient1Sub.get(); @@ -267,18 +270,18 @@ public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionIsProcessedThe sut.add("client", asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE), new SubscriptionIdentifier(1)); // verify it contains the subscription identifier - final List matchingSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + final SubscriptionCollection matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("client/test/b")); verifySubscriptionIdentifierIsPresent(matchingSubscriptions, new SubscriptionIdentifier(1)); // update the subscription of same clientId on same topic filter but with different subscription identifier sut.add("client", asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE), new SubscriptionIdentifier(123)); // verify the subscription identifier is updated - final List reloadedSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + final SubscriptionCollection reloadedSubscriptions = sut.matchWithoutQosSharpening(asTopic("client/test/b")); verifySubscriptionIdentifierIsPresent(reloadedSubscriptions, new SubscriptionIdentifier(123)); } - private static void verifySubscriptionIdentifierIsPresent(List matchingSubscriptions, SubscriptionIdentifier subscriptionIdentifier) { + private static void verifySubscriptionIdentifierIsPresent(SubscriptionCollection matchingSubscriptions, SubscriptionIdentifier subscriptionIdentifier) { assertAll("subscription contains the subscription identifier", () -> assertEquals(1, matchingSubscriptions.size()), () -> assertTrue(matchingSubscriptions.iterator().next().hasSubscriptionIdentifier()), @@ -293,13 +296,13 @@ public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionWithoutSubscri // verify it contains the subscription identifier SubscriptionIdentifier expectedSubscriptionId = new SubscriptionIdentifier(1); - verifySubscriptionIdentifierIsPresent(sut.matchQosSharpening(asTopic("client/test/b")), expectedSubscriptionId); + verifySubscriptionIdentifierIsPresent(sut.matchWithoutQosSharpening(asTopic("client/test/b")), expectedSubscriptionId); // update the subscription of same clientId on same topic filter but removing subscription identifier sut.add("client", asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE)); // verify the subscription identifier is removed - final List reloadedSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + final SubscriptionCollection reloadedSubscriptions = sut.matchWithoutQosSharpening(asTopic("client/test/b")); assertAll("subscription doesn't contain subscription identifier", () -> assertEquals(1, reloadedSubscriptions.size()), () -> assertFalse(reloadedSubscriptions.iterator().next().hasSubscriptionIdentifier()) diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java index 524aab39b..fd0c42e12 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java @@ -15,7 +15,6 @@ */ package io.moquette.broker.subscriptions; - import io.moquette.broker.subscriptions.CTrie.SubscriptionRequest; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttSubscriptionOption; @@ -26,11 +25,11 @@ import static io.moquette.broker.subscriptions.SubscriptionTestUtils.asSubscription; import static io.moquette.broker.subscriptions.Topic.asTopic; -import java.util.List; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import org.pcollections.PMap; public class CTrieTest { @@ -51,15 +50,15 @@ public void testAddOnSecondLayerWithEmptyTokenOnEmptyTree() { final Optional matchedNode = sut.lookup(asTopic("/")); assertTrue(matchedNode.isPresent(), "Node on path / must be present"); //verify structure, only root INode and the first CNode should be present - assertThat(this.sut.root.mainNode().subscriptions()).isEmpty(); + assertThat(this.sut.root.mainNode().getSubscriptions()).isEmpty(); assertThat(this.sut.root.mainNode().allChildren()).isNotEmpty(); - INode firstLayer = this.sut.root.mainNode().allChildren().get(0); - assertThat(firstLayer.mainNode().subscriptions()).isEmpty(); + INode firstLayer = this.sut.root.mainNode().allChildren().stream().findFirst().get(); + assertThat(firstLayer.mainNode().getSubscriptions()).isEmpty(); assertThat(firstLayer.mainNode().allChildren()).isNotEmpty(); - INode secondLayer = firstLayer.mainNode().allChildren().get(0); - assertThat(secondLayer.mainNode().subscriptions()).isNotEmpty(); + INode secondLayer = firstLayer.mainNode().allChildren().stream().findFirst().get(); + assertThat(secondLayer.mainNode().getSubscriptions()).isNotEmpty(); assertThat(secondLayer.mainNode().allChildren()).isEmpty(); } @@ -72,7 +71,7 @@ public void testAddFirstLayerNodeOnEmptyTree() { //Verify final Optional matchedNode = sut.lookup(asTopic("/temp")); assertTrue(matchedNode.isPresent(), "Node on path /temp must be present"); - assertFalse(matchedNode.get().subscriptions().isEmpty()); + assertFalse(matchedNode.get().getSubscriptions().isEmpty()); } @Test @@ -99,8 +98,8 @@ public void testAddNewSubscriptionOnExistingNode() { //Verify final Optional matchedNode = sut.lookup(asTopic("/temp")); assertTrue(matchedNode.isPresent(), "Node on path /temp must be present"); - final List subscriptions = matchedNode.get().subscriptions(); - assertTrue(subscriptions.contains(asSubscription("TempSensor2", "/temp"))); + final PMap subscriptions = matchedNode.get().getSubscriptions(); + assertTrue(subscriptions.containsValue(asSubscription("TempSensor2", "/temp"))); } @Test @@ -117,8 +116,8 @@ public void testAddNewDeepNodes() { //Verify final Optional matchedNode = sut.lookup(asTopic("/italy/happiness")); assertTrue(matchedNode.isPresent(), "Node on path /italy/happiness must be present"); - final List subscriptions = matchedNode.get().subscriptions(); - assertTrue(subscriptions.contains(asSubscription("HappinessSensor", "/italy/happiness"))); + final PMap subscriptions = matchedNode.get().getSubscriptions(); + assertTrue(subscriptions.containsValue(asSubscription("HappinessSensor", "/italy/happiness"))); } static SubscriptionRequest clientSubOnTopic(String clientID, String topicFilter) { @@ -191,7 +190,7 @@ public void givenTreeWithSomeNodeHierarchyWhenRemoveContainedSubscriptionThenNod sut.removeFromTree(CTrie.UnsubscribeRequest.buildNonShared("TempSensor1", asTopic("/temp/1"))); sut.removeFromTree(CTrie.UnsubscribeRequest.buildNonShared("TempSensor1", asTopic("/temp/1"))); - final List matchingSubs = sut.recursiveMatch(asTopic("/temp/2")); + final SubscriptionCollection matchingSubs = sut.recursiveMatch(asTopic("/temp/2")); //Verify final Subscription expectedMatchingsub = new Subscription("TempSensor1", asTopic("/temp/2"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); @@ -208,8 +207,8 @@ public void givenTreeWithSomeNodeHierarchWhenRemoveContainedSubscriptionSmallerT //Exercise sut.removeFromTree(CTrie.UnsubscribeRequest.buildNonShared("TempSensor1", asTopic("/temp"))); - final List matchingSubs1 = sut.recursiveMatch(asTopic("/temp/1")); - final List matchingSubs2 = sut.recursiveMatch(asTopic("/temp/2")); + final SubscriptionCollection matchingSubs1 = sut.recursiveMatch(asTopic("/temp/1")); + final SubscriptionCollection matchingSubs2 = sut.recursiveMatch(asTopic("/temp/2")); //Verify // not clear to me, but I believe /temp unsubscribe should not unsub you from downstream /temp/1 or /temp/2 @@ -237,7 +236,7 @@ public void testMatchSubscriptionNoWildcards() { sut.addToTree(newSubscription); //Exercise - final List matchingSubs = sut.recursiveMatch(asTopic("/temp")); + final SubscriptionCollection matchingSubs = sut.recursiveMatch(asTopic("/temp")); //Verify final Subscription expectedMatchingsub = new Subscription("TempSensor1", asTopic("/temp"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); @@ -252,8 +251,8 @@ public void testRemovalInnerTopicOffRootSameClient() { sut.addToTree(newSubscription); //Exercise - final List matchingSubs1 = sut.recursiveMatch(asTopic("temp")); - final List matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); + final SubscriptionCollection matchingSubs1 = sut.recursiveMatch(asTopic("temp")); + final SubscriptionCollection matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); //Verify final Subscription expectedMatchingsub1 = new Subscription("TempSensor1", asTopic("temp"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); @@ -265,8 +264,8 @@ public void testRemovalInnerTopicOffRootSameClient() { sut.removeFromTree(CTrie.UnsubscribeRequest.buildNonShared("TempSensor1", asTopic("temp"))); //Exercise - final List matchingSubs3 = sut.recursiveMatch(asTopic("temp")); - final List matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); + final SubscriptionCollection matchingSubs3 = sut.recursiveMatch(asTopic("temp")); + final SubscriptionCollection matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); assertThat(matchingSubs3).doesNotContain(expectedMatchingsub1); assertThat(matchingSubs4).contains(expectedMatchingsub2); @@ -280,8 +279,8 @@ public void testRemovalInnerTopicOffRootDiffClient() { sut.addToTree(newSubscription); //Exercise - final List matchingSubs1 = sut.recursiveMatch(asTopic("temp")); - final List matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); + final SubscriptionCollection matchingSubs1 = sut.recursiveMatch(asTopic("temp")); + final SubscriptionCollection matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); //Verify final Subscription expectedMatchingsub1 = new Subscription("TempSensor1", asTopic("temp"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); @@ -293,8 +292,8 @@ public void testRemovalInnerTopicOffRootDiffClient() { sut.removeFromTree(CTrie.UnsubscribeRequest.buildNonShared("TempSensor1", asTopic("temp"))); //Exercise - final List matchingSubs3 = sut.recursiveMatch(asTopic("temp")); - final List matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); + final SubscriptionCollection matchingSubs3 = sut.recursiveMatch(asTopic("temp")); + final SubscriptionCollection matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); assertThat(matchingSubs3).doesNotContain(expectedMatchingsub1); assertThat(matchingSubs4).contains(expectedMatchingsub2); @@ -308,8 +307,8 @@ public void testRemovalOuterTopicOffRootDiffClient() { sut.addToTree(newSubscription); //Exercise - final List matchingSubs1 = sut.recursiveMatch(asTopic("temp")); - final List matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); + final SubscriptionCollection matchingSubs1 = sut.recursiveMatch(asTopic("temp")); + final SubscriptionCollection matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); //Verify final Subscription expectedMatchingsub1 = new Subscription("TempSensor1", asTopic("temp"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); @@ -321,8 +320,8 @@ public void testRemovalOuterTopicOffRootDiffClient() { sut.removeFromTree(CTrie.UnsubscribeRequest.buildNonShared("TempSensor2", asTopic("temp/1"))); //Exercise - final List matchingSubs3 = sut.recursiveMatch(asTopic("temp")); - final List matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); + final SubscriptionCollection matchingSubs3 = sut.recursiveMatch(asTopic("temp")); + final SubscriptionCollection matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); assertThat(matchingSubs3).contains(expectedMatchingsub1); assertThat(matchingSubs4).doesNotContain(expectedMatchingsub2); diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/TopicTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/TopicTest.java index 134213f6a..49ee4d565 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/TopicTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/TopicTest.java @@ -145,7 +145,7 @@ public TopicAssert doesNotMatch(String topic) { } public TopicAssert containsToken(Object... tokens) { - Assertions.assertThat(actual.getTokens()).containsExactly(asArray(tokens)); + Assertions.assertThat(actual.fullTokens()).containsExactly(asArray(tokens)); return myself; } diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java index 1eafab0ad..12ea43c8d 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java @@ -35,9 +35,13 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RequestResponseTest extends AbstractServerIntegrationWithoutClientFixture { + private static final Logger LOGGER = LoggerFactory.getLogger(RequestResponseTest.class.getName()); + @Test public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReply() throws InterruptedException { final Mqtt5BlockingClient requester = createHiveBlockingClient("requester"); @@ -60,9 +64,11 @@ private static void responderRepliesToRequesterPublish(Mqtt5BlockingClient respo .topicFilter("requester/door/open") .qos(MqttQos.AT_LEAST_ONCE) .build(); + LOGGER.info("Subscribing to on requester/door/open"); responder.toAsync().subscribe(subscribeToRequest, (Mqtt5Publish pub) -> { assertTrue(pub.getResponseTopic().isPresent(), "Response topic MUST defined in request publish"); + LOGGER.info("Responding on {}", pub.getResponseTopic().get()); Mqtt5PublishResult responseResult = responder.publishWith() .topic(pub.getResponseTopic().get()) .payload("OK".getBytes(StandardCharsets.UTF_8))