Skip to content

Commit

Permalink
优化disconnect功能
Browse files Browse the repository at this point in the history
  • Loading branch information
heavyrain2012 committed Oct 8, 2021
1 parent 4bcf784 commit 3d7220d
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ public Session sessionForClientAndUser(String username, String clientID) {
if (session.getUsername().equals(username)) {
return session;
} else {
cleanSession(clientID);
cleanSession(null, clientID);
}
}
return null;
Expand Down Expand Up @@ -739,9 +739,9 @@ public ErrorCode kickoffPCClient(String operator, String pcClientId) {


@Override
public void disableSession(String clientId) {
public void disableSession(String userId, String clientId) {
Session session = sessions.get(clientId);
if (session != null && session.getDeleted() == 0) {
if (session != null && session.getDeleted() == 0 && (userId == null || session.getUsername().equals(userId))) {
databaseStore.updateSessionDeleted(session.getUsername(), clientId, 1);
ConcurrentSkipListSet<String> sessionSet = getUserSessionSet(session.username);
sessionSet.remove(clientId);
Expand All @@ -750,14 +750,19 @@ public void disableSession(String clientId) {
}

@Override
public void cleanSession(String clientID) {
public void cleanSession(String userId, String clientID) {
LOG.info("Fooooooooo <{}>", clientID);

Session session = sessions.get(clientID);
if (session == null) {
LOG.error("Can't find the session for client <{}>", clientID);
return;
}

if(userId != null && !session.getUsername().equals(userId)) {
return;
}

ConcurrentSkipListSet<String> sessionSet = getUserSessionSet(session.username);
sessionSet.remove(clientID);

Expand Down
10 changes: 5 additions & 5 deletions broker/src/main/java/io/moquette/spi/ClientSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,19 @@ public String toString() {
}


public void disconnect(boolean cleanSession, boolean disableSession) {
public void disconnect(String userId, boolean cleanSession, boolean disableSession) {
if (cleanSession) {
LOG.info("Client disconnected. Removing its subscriptions. ClientId={}", clientID);
// cleanup topic subscriptions
cleanSession();
cleanSession(userId);
} else if(disableSession) {
LOG.info("Client disconnected. disable session. ClientId={}", clientID);
m_sessionsStore.disableSession(this.clientID);
m_sessionsStore.disableSession(userId, this.clientID);
}
}

public void cleanSession() {
m_sessionsStore.cleanSession(this.clientID);
public void cleanSession(String userId) {
m_sessionsStore.cleanSession(userId, this.clientID);
}


Expand Down
4 changes: 2 additions & 2 deletions broker/src/main/java/io/moquette/spi/ISessionsStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ public interface ISessionsStore {
*/
int getSecondPhaseAckPendingMessages(String clientID);

void disableSession(String clientId);
void cleanSession(String clientID);
void disableSession(String userId, String clientId);
void cleanSession(String userId, String clientID);

boolean isMultiEndpointSupported();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import win.liyufan.im.HttpUtils;
import win.liyufan.im.IMTopic;
import win.liyufan.im.Utility;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static cn.wildfirechat.common.ErrorCode.ERROR_CODE_SUCCESS;
import static io.moquette.spi.impl.InternalRepublisher.createPublishForQos;
import static cn.wildfirechat.common.IMExceptionEvent.EventType.EVENT_CALLBACK_Exception;
import static io.moquette.spi.impl.Utils.readBytesAndRewind;
Expand Down Expand Up @@ -582,7 +584,7 @@ public void processDisconnect(Channel channel, boolean isDup, boolean isRetain)

//disconnect the session

m_sessionsStore.sessionForClient(clientID).disconnect(isDup, isRetain);
m_sessionsStore.sessionForClient(clientID).disconnect(session.getUsername(), isDup, isRetain);
}


Expand Down
2 changes: 2 additions & 0 deletions broker/src/main/java/win/liyufan/im/IMTopic.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,6 @@ public interface IMTopic {

String LoadRemoteMessagesTopic = "LRM";
String KickoffPCClientTopic = "KPCC";

String ClearSessionTopic = "CST";
}

0 comments on commit 3d7220d

Please sign in to comment.