Skip to content

Commit

Permalink
修复存储用户消息错误问题
Browse files Browse the repository at this point in the history
  • Loading branch information
imndx committed Jan 5, 2022
1 parent c7dbf45 commit 546110f
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -914,20 +914,24 @@ boolean loadRemoteMessagesFromTable(String user, WFCMessage.Conversation convers
return false;
}

void persistUserMessage(final String userId, final long messageId, final long messageSeq, final int messageContentType) {
void persistUserMessage(final String userId, final long messageId, final long messageSeq, int type, String target, int line, boolean directing, final int messageContentType) {
mScheduler.execute(()->{
Connection connection = null;
PreparedStatement statement = null;
try {
connection = DBUtil.getConnection();

String sql = "insert into " + getUserMessageTable(userId) + " (`_mid`, `_uid`, `_seq`, `_cont_type`) values(?, ?, ?, ?)";
String sql = "insert into " + getUserMessageTable(userId) + " (`_mid`, `_uid`, `_seq`, `_type`, `_target`, `_line`, `_directing`, `_cont_type`) values(?, ?, ?, ?, ?, ?, ?, ?)";

statement = connection.prepareStatement(sql);
int index = 1;
statement.setLong(index++, messageId);
statement.setString(index++, userId);
statement.setLong(index++, messageSeq);
statement.setInt(index++, type);
statement.setString(index++, target);
statement.setInt(index++, line);
statement.setInt(index++, directing ? 1 :0);
statement.setInt(index++, messageContentType);
int count = statement.executeUpdate();
LOG.info("Update rows {}", count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,7 @@ public WFCMessage.PullMessageResult fetchChatroomMessage(String fromUser, String
}

@Override
public long insertUserMessages(String sender, int conversationType, String target, int line, int messageContentType, String user, long messageId) {
public long insertUserMessages(String sender, int conversationType, String target, int line, int messageContentType, String user, long messageId, boolean directing) {
// messageId是全局的,messageSeq是跟个人相关的,理论上messageId的增长数度远远大于seq。
// 考虑到一种情况,当服务器发生变化,用户发生迁移后,messageSeq还需要保持有序。 要么把Seq持久化,要么在迁移后Seq取一个肯定比以前更大的数字(这个数字就是messageId)
// 这里选择使用后面一种情况
Expand Down Expand Up @@ -906,7 +906,7 @@ public long insertUserMessages(String sender, int conversationType, String targe
mWriteLock.unlock();
}

databaseStore.persistUserMessage(user, messageId, messageSeq, messageContentType);
databaseStore.persistUserMessage(user, messageId, messageSeq, conversationType, target, line, directing, messageContentType);
return messageSeq;
}

Expand Down
2 changes: 1 addition & 1 deletion broker/src/main/java/io/moquette/spi/IMessagesStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public String toString() {
Set<String> getAllEnds();
WFCMessage.PullMessageResult fetchMessage(String user, String exceptClientId, long fromMessageId, int pullType);
WFCMessage.PullMessageResult loadRemoteMessages(String user, WFCMessage.Conversation conversation, long beforeUid, int count, Collection<Integer> contentTypes);
long insertUserMessages(String sender, int conversationType, String target, int line, int messageContentType, String userId, long messageId);
long insertUserMessages(String sender, int conversationType, String target, int line, int messageContentType, String userId, long messageId, boolean directing);
WFCMessage.GroupInfo createGroup(String operator, WFCMessage.GroupInfo groupInfo, List<WFCMessage.GroupMember> memberList, String memberExtra, boolean isAdmin);
ErrorCode addGroupMembers(String operator, boolean isAdmin, String groupId, List<WFCMessage.GroupMember> memberList, String extra);
ErrorCode kickoffGroupMembers(String operator, boolean isAdmin, String groupId, List<String> memberList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void publish2ChatroomReceiversDirectly(String user, String clientId, long
}
}

private void publish2Receivers(String sender, int conversationType, String target, int line, long messageId, Collection<String> receivers, String pushContent, String pushData, String exceptClientId, int pullType, int messageContentType, long serverTime, int mentionType, List<String> mentionTargets, int persistFlag) {
private void publish2Receivers(String sender, int conversationType, String target, int line, long messageId, Collection<String> receivers, String pushContent, String pushData, String exceptClientId, int pullType, int messageContentType, long serverTime, int mentionType, List<String> mentionTargets, int persistFlag, boolean directing) {
if (persistFlag == Transparent) {
publishTransparentMessage2Receivers(messageId, receivers, pullType, exceptClientId);
return;
Expand Down Expand Up @@ -191,7 +191,7 @@ private void publish2Receivers(String sender, int conversationType, String targe
}
long messageSeq;
if (pullType != ProtoConstants.PullType.Pull_ChatRoom) {
messageSeq = m_messagesStore.insertUserMessages(sender, conversationType, target, line, messageContentType, user, messageId);
messageSeq = m_messagesStore.insertUserMessages(sender, conversationType, target, line, messageContentType, user, messageId, directing);
} else {
messageSeq = m_messagesStore.insertChatroomMessages(user, line, messageId);
}
Expand Down Expand Up @@ -646,7 +646,7 @@ public void publish2Receivers(WFCMessage.Message message, Set<String> receivers,
message.getConversation().getType(), message.getConversation().getTarget(), message.getConversation().getLine(),
messageId,
receivers,
pushContent, message.getContent().getPushData(), exceptClientId, pullType, message.getContent().getType(), message.getServerTimestamp(), message.getContent().getMentionedType(), message.getContent().getMentionedTargetList(), message.getContent().getPersistFlag());
pushContent, message.getContent().getPushData(), exceptClientId, pullType, message.getContent().getType(), message.getServerTimestamp(), message.getContent().getMentionedType(), message.getContent().getMentionedTargetList(), message.getContent().getPersistFlag(), message.hasToUser() || !message.getToList().isEmpty());

}

Expand Down

0 comments on commit 546110f

Please sign in to comment.