diff --git a/broker/src/main/java/io/moquette/persistence/MemorySessionStore.java b/broker/src/main/java/io/moquette/persistence/MemorySessionStore.java index bd3e83e24..b9a3bac34 100755 --- a/broker/src/main/java/io/moquette/persistence/MemorySessionStore.java +++ b/broker/src/main/java/io/moquette/persistence/MemorySessionStore.java @@ -531,7 +531,7 @@ public Session sessionForClientAndUser(String username, String clientID) { if (session.getUsername().equals(username)) { return session; } else { - cleanSession(clientID); + cleanSession(null, clientID); } } return null; @@ -739,9 +739,9 @@ public ErrorCode kickoffPCClient(String operator, String pcClientId) { @Override - public void disableSession(String clientId) { + public void disableSession(String userId, String clientId) { Session session = sessions.get(clientId); - if (session != null && session.getDeleted() == 0) { + if (session != null && session.getDeleted() == 0 && (userId == null || session.getUsername().equals(userId))) { databaseStore.updateSessionDeleted(session.getUsername(), clientId, 1); ConcurrentSkipListSet<String> sessionSet = getUserSessionSet(session.username); sessionSet.remove(clientId); @@ -750,7 +750,7 @@ public void disableSession(String clientId) { } @Override - public void cleanSession(String clientID) { + public void cleanSession(String userId, String clientID) { LOG.info("Fooooooooo <{}>", clientID); Session session = sessions.get(clientID); @@ -758,6 +758,11 @@ public void cleanSession(String clientID) { LOG.error("Can't find the session for client <{}>", clientID); return; } + + if(userId != null && !session.getUsername().equals(userId)) { + return; + } + ConcurrentSkipListSet<String> sessionSet = getUserSessionSet(session.username); sessionSet.remove(clientID); diff --git a/broker/src/main/java/io/moquette/spi/ClientSession.java b/broker/src/main/java/io/moquette/spi/ClientSession.java index 653cddf20..22a48bfd5 100755 --- a/broker/src/main/java/io/moquette/spi/ClientSession.java +++ b/broker/src/main/java/io/moquette/spi/ClientSession.java @@ -109,19 +109,19 @@ public String toString() { } - public void disconnect(boolean cleanSession, boolean disableSession) { + public void disconnect(String userId, boolean cleanSession, boolean disableSession) { if (cleanSession) { LOG.info("Client disconnected. Removing its subscriptions. ClientId={}", clientID); // cleanup topic subscriptions - cleanSession(); + cleanSession(userId); } else if(disableSession) { LOG.info("Client disconnected. disable session. ClientId={}", clientID); - m_sessionsStore.disableSession(this.clientID); + m_sessionsStore.disableSession(userId, this.clientID); } } - public void cleanSession() { - m_sessionsStore.cleanSession(this.clientID); + public void cleanSession(String userId) { + m_sessionsStore.cleanSession(userId, this.clientID); } diff --git a/broker/src/main/java/io/moquette/spi/ISessionsStore.java b/broker/src/main/java/io/moquette/spi/ISessionsStore.java index 324d65000..11afe1d06 100755 --- a/broker/src/main/java/io/moquette/spi/ISessionsStore.java +++ b/broker/src/main/java/io/moquette/spi/ISessionsStore.java @@ -150,8 +150,8 @@ public interface ISessionsStore { */ int getSecondPhaseAckPendingMessages(String clientID); - void disableSession(String clientId); - void cleanSession(String clientID); + void disableSession(String userId, String clientId); + void cleanSession(String userId, String clientID); boolean isMultiEndpointSupported(); diff --git a/broker/src/main/java/io/moquette/spi/impl/ProtocolProcessor.java b/broker/src/main/java/io/moquette/spi/impl/ProtocolProcessor.java index f09217bf8..265466cd5 100755 --- a/broker/src/main/java/io/moquette/spi/impl/ProtocolProcessor.java +++ b/broker/src/main/java/io/moquette/spi/impl/ProtocolProcessor.java @@ -44,12 +44,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import win.liyufan.im.HttpUtils; +import win.liyufan.im.IMTopic; import win.liyufan.im.Utility; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static cn.wildfirechat.common.ErrorCode.ERROR_CODE_SUCCESS; import static io.moquette.spi.impl.InternalRepublisher.createPublishForQos; import static cn.wildfirechat.common.IMExceptionEvent.EventType.EVENT_CALLBACK_Exception; import static io.moquette.spi.impl.Utils.readBytesAndRewind; @@ -582,7 +584,7 @@ public void processDisconnect(Channel channel, boolean isDup, boolean isRetain) //disconnect the session - m_sessionsStore.sessionForClient(clientID).disconnect(isDup, isRetain); + m_sessionsStore.sessionForClient(clientID).disconnect(session.getUsername(), isDup, isRetain); } diff --git a/broker/src/main/java/win/liyufan/im/IMTopic.java b/broker/src/main/java/win/liyufan/im/IMTopic.java index da638b1b4..6d1cf98bb 100755 --- a/broker/src/main/java/win/liyufan/im/IMTopic.java +++ b/broker/src/main/java/win/liyufan/im/IMTopic.java @@ -80,4 +80,6 @@ public interface IMTopic { String LoadRemoteMessagesTopic = "LRM"; String KickoffPCClientTopic = "KPCC"; + + String ClearSessionTopic = "CST"; }