diff --git a/src/main/java/net/openhft/chronicle/network/cluster/ConnectionManager.java b/src/main/java/net/openhft/chronicle/network/cluster/ConnectionManager.java index 944b0ee847..843cfa2853 100644 --- a/src/main/java/net/openhft/chronicle/network/cluster/ConnectionManager.java +++ b/src/main/java/net/openhft/chronicle/network/cluster/ConnectionManager.java @@ -21,20 +21,24 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.util.IdentityHashMap; -import java.util.Set; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static java.util.Collections.newSetFromMap; - public class ConnectionManager> { private static final int EMPTY_SEQUENCE = -1; - private final Set> connectionListeners = newSetFromMap(new IdentityHashMap<>()); + private final List> connectionListeners = new ArrayList<>(); private final AtomicInteger lastListenerAddedSequence = new AtomicInteger(EMPTY_SEQUENCE); public synchronized void addListener(@NotNull ConnectionListener connectionListener) { + //noinspection ForLoopReplaceableByForEach + for (int i = 0; i < connectionListeners.size(); i++) { + if (connectionListeners.get(i).connectionListener() == connectionListener) { + return; + } + } connectionListeners.add(new ConnectionListenerHolder<>(lastListenerAddedSequence.incrementAndGet(), connectionListener)); } @@ -75,16 +79,18 @@ public EventEmitterToken onConnectionChanged(boolean isConnected, private synchronized void executeListenersWithSequenceGreaterThan(int lowerSequenceLimit, @NotNull final T nc, @NotNull EventEmitterToken token) { - connectionListeners.forEach(l -> { + //noinspection ForLoopReplaceableByForEach + for (int i = 0; i < connectionListeners.size(); i++) { + ConnectionListenerHolder l = connectionListeners.get(i); if (l.sequence > lowerSequenceLimit) { try { - ((ConnectionListenerHolder)l).connectionListener.onConnectionChange(nc, token.connected.get()); + l.connectionListener.onConnectionChange(nc, token.connected.get()); } catch (IllegalStateException ignore) { // this is already logged } token.latestSequenceExecuted = Math.max(token.latestSequenceExecuted, l.sequence); } - }); + } } @FunctionalInterface @@ -105,10 +111,14 @@ private static final class ConnectionListenerHolder> private final int sequence; private final ConnectionListener connectionListener; - public ConnectionListenerHolder(int sequence, ConnectionListener connectionListener) { + public ConnectionListenerHolder(int sequence, @NotNull ConnectionListener connectionListener) { this.sequence = sequence; this.connectionListener = connectionListener; } + + public ConnectionListener connectionListener() { + return connectionListener; + } } /** diff --git a/src/test/java/net/openhft/chronicle/network/cluster/ConnectionManagerTest.java b/src/test/java/net/openhft/chronicle/network/cluster/ConnectionManagerTest.java index b39c066b5a..9dc5ce3286 100644 --- a/src/test/java/net/openhft/chronicle/network/cluster/ConnectionManagerTest.java +++ b/src/test/java/net/openhft/chronicle/network/cluster/ConnectionManagerTest.java @@ -130,6 +130,14 @@ void executeNewListenersWorksWhenThereAreNoListeners() { verify(listener1).onConnectionChange(networkContext, true); } + @Test + void addConnectionListenerIsIdempotent() { + connectionManager.addListener(listener1); + connectionManager.addListener(listener1); + connectionManager.onConnectionChanged(true, networkContext, null); + verify(listener1).onConnectionChange(networkContext, true); + } + static class TestNetworkContext extends VanillaNetworkContext { } } \ No newline at end of file