diff --git a/broker/src/main/java/io/moquette/persistence/DatabaseStore.java b/broker/src/main/java/io/moquette/persistence/DatabaseStore.java index f557b2d72..07818d4e5 100644 --- a/broker/src/main/java/io/moquette/persistence/DatabaseStore.java +++ b/broker/src/main/java/io/moquette/persistence/DatabaseStore.java @@ -914,20 +914,24 @@ boolean loadRemoteMessagesFromTable(String user, WFCMessage.Conversation convers return false; } - void persistUserMessage(final String userId, final long messageId, final long messageSeq, final int messageContentType) { + void persistUserMessage(final String userId, final long messageId, final long messageSeq, int type, String target, int line, boolean directing, final int messageContentType) { mScheduler.execute(()->{ Connection connection = null; PreparedStatement statement = null; try { connection = DBUtil.getConnection(); - String sql = "insert into " + getUserMessageTable(userId) + " (`_mid`, `_uid`, `_seq`, `_cont_type`) values(?, ?, ?, ?)"; + String sql = "insert into " + getUserMessageTable(userId) + " (`_mid`, `_uid`, `_seq`, `_type`, `_target`, `_line`, `_directing`, `_cont_type`) values(?, ?, ?, ?, ?, ?, ?, ?)"; statement = connection.prepareStatement(sql); int index = 1; statement.setLong(index++, messageId); statement.setString(index++, userId); statement.setLong(index++, messageSeq); + statement.setInt(index++, type); + statement.setString(index++, target); + statement.setInt(index++, line); + statement.setInt(index++, directing ? 1 :0); statement.setInt(index++, messageContentType); int count = statement.executeUpdate(); LOG.info("Update rows {}", count); diff --git a/broker/src/main/java/io/moquette/persistence/MemoryMessagesStore.java b/broker/src/main/java/io/moquette/persistence/MemoryMessagesStore.java index b25fefb0e..43e2af885 100755 --- a/broker/src/main/java/io/moquette/persistence/MemoryMessagesStore.java +++ b/broker/src/main/java/io/moquette/persistence/MemoryMessagesStore.java @@ -866,7 +866,7 @@ public WFCMessage.PullMessageResult fetchChatroomMessage(String fromUser, String } @Override - public long insertUserMessages(String sender, int conversationType, String target, int line, int messageContentType, String user, long messageId) { + public long insertUserMessages(String sender, int conversationType, String target, int line, int messageContentType, String user, long messageId, boolean directing) { // messageId是全局的,messageSeq是跟个人相关的,理论上messageId的增长数度远远大于seq。 // 考虑到一种情况,当服务器发生变化,用户发生迁移后,messageSeq还需要保持有序。 要么把Seq持久化,要么在迁移后Seq取一个肯定比以前更大的数字(这个数字就是messageId) // 这里选择使用后面一种情况 @@ -906,7 +906,7 @@ public long insertUserMessages(String sender, int conversationType, String targe mWriteLock.unlock(); } - databaseStore.persistUserMessage(user, messageId, messageSeq, messageContentType); + databaseStore.persistUserMessage(user, messageId, messageSeq, conversationType, target, line, directing, messageContentType); return messageSeq; } diff --git a/broker/src/main/java/io/moquette/spi/IMessagesStore.java b/broker/src/main/java/io/moquette/spi/IMessagesStore.java index 8cc493e8d..6043d28b8 100755 --- a/broker/src/main/java/io/moquette/spi/IMessagesStore.java +++ b/broker/src/main/java/io/moquette/spi/IMessagesStore.java @@ -106,7 +106,7 @@ public String toString() { Set getAllEnds(); WFCMessage.PullMessageResult fetchMessage(String user, String exceptClientId, long fromMessageId, int pullType); WFCMessage.PullMessageResult loadRemoteMessages(String user, WFCMessage.Conversation conversation, long beforeUid, int count, Collection contentTypes); - long insertUserMessages(String sender, int conversationType, String target, int line, int messageContentType, String userId, long messageId); + long insertUserMessages(String sender, int conversationType, String target, int line, int messageContentType, String userId, long messageId, boolean directing); WFCMessage.GroupInfo createGroup(String operator, WFCMessage.GroupInfo groupInfo, List memberList, String memberExtra, boolean isAdmin); ErrorCode addGroupMembers(String operator, boolean isAdmin, String groupId, List memberList, String extra); ErrorCode kickoffGroupMembers(String operator, boolean isAdmin, String groupId, List memberList); diff --git a/broker/src/main/java/io/moquette/spi/impl/MessagesPublisher.java b/broker/src/main/java/io/moquette/spi/impl/MessagesPublisher.java index 53aec0626..d197409cd 100755 --- a/broker/src/main/java/io/moquette/spi/impl/MessagesPublisher.java +++ b/broker/src/main/java/io/moquette/spi/impl/MessagesPublisher.java @@ -160,7 +160,7 @@ public void publish2ChatroomReceiversDirectly(String user, String clientId, long } } - private void publish2Receivers(String sender, int conversationType, String target, int line, long messageId, Collection receivers, String pushContent, String pushData, String exceptClientId, int pullType, int messageContentType, long serverTime, int mentionType, List mentionTargets, int persistFlag) { + private void publish2Receivers(String sender, int conversationType, String target, int line, long messageId, Collection receivers, String pushContent, String pushData, String exceptClientId, int pullType, int messageContentType, long serverTime, int mentionType, List mentionTargets, int persistFlag, boolean directing) { if (persistFlag == Transparent) { publishTransparentMessage2Receivers(messageId, receivers, pullType, exceptClientId); return; @@ -191,7 +191,7 @@ private void publish2Receivers(String sender, int conversationType, String targe } long messageSeq; if (pullType != ProtoConstants.PullType.Pull_ChatRoom) { - messageSeq = m_messagesStore.insertUserMessages(sender, conversationType, target, line, messageContentType, user, messageId); + messageSeq = m_messagesStore.insertUserMessages(sender, conversationType, target, line, messageContentType, user, messageId, directing); } else { messageSeq = m_messagesStore.insertChatroomMessages(user, line, messageId); } @@ -646,7 +646,7 @@ public void publish2Receivers(WFCMessage.Message message, Set receivers, message.getConversation().getType(), message.getConversation().getTarget(), message.getConversation().getLine(), messageId, receivers, - pushContent, message.getContent().getPushData(), exceptClientId, pullType, message.getContent().getType(), message.getServerTimestamp(), message.getContent().getMentionedType(), message.getContent().getMentionedTargetList(), message.getContent().getPersistFlag()); + pushContent, message.getContent().getPushData(), exceptClientId, pullType, message.getContent().getType(), message.getServerTimestamp(), message.getContent().getMentionedType(), message.getContent().getMentionedTargetList(), message.getContent().getPersistFlag(), message.hasToUser() || !message.getToList().isEmpty()); }