Skip to content

Commit

Permalink
获取远程消息支持按照类型来获取
Browse files Browse the repository at this point in the history
  • Loading branch information
heavyrain2012 committed Nov 23, 2021
1 parent 7824fb3 commit 92c59d2
Show file tree
Hide file tree
Showing 7 changed files with 452 additions and 21 deletions.
1 change: 1 addition & 0 deletions broker/migrate/h2/V40__add_user_messages_cont_type.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
alter table `t_user_messages` add column `_cont_type` int(11) NOT NULL DEFAULT '0';
257 changes: 257 additions & 0 deletions broker/migrate/mysql/V45__add_user_messages_cont_type.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
alter table `t_user_messages` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_0` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_1` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_2` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_3` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_4` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_5` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_6` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_7` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_8` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_9` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_10` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_11` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_12` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_13` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_14` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_15` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_16` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_17` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_18` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_19` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_20` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_21` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_22` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_23` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_24` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_25` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_26` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_27` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_28` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_29` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_30` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_31` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_32` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_33` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_34` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_35` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_36` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_37` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_38` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_39` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_40` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_41` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_42` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_43` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_44` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_45` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_46` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_47` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_48` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_49` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_50` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_51` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_52` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_53` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_54` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_55` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_56` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_57` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_58` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_59` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_60` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_61` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_62` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_63` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_64` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_65` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_66` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_67` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_68` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_69` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_70` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_71` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_72` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_73` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_74` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_75` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_76` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_77` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_78` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_79` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_80` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_81` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_82` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_83` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_84` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_85` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_86` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_87` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_88` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_89` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_90` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_91` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_92` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_93` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_94` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_95` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_96` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_97` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_98` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_99` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_100` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_101` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_102` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_103` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_104` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_105` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_106` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_107` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_108` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_109` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_110` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_111` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_112` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_113` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_114` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_115` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_116` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_117` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_118` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_119` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_120` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_121` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_122` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_123` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_124` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_125` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_126` add column `_cont_type` int(11) NOT NULL DEFAULT '0';

alter table `t_user_messages_127` add column `_cont_type` int(11) NOT NULL DEFAULT '0';
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public ErrorCode action(ByteBuf ackPayload, String clientID, String fromUser, bo
}
}

WFCMessage.PullMessageResult result = m_messagesStore.loadRemoteMessages(fromUser, request.getConversation(), beforeUid, request.getCount());
WFCMessage.PullMessageResult result = m_messagesStore.loadRemoteMessages(fromUser, request.getConversation(), beforeUid, request.getCount(), request.getContentTypeList());
byte[] data = result.toByteArray();
LOG.info("User {} load message with count({}), payload size({})", fromUser, result.getMessageCount(), data.length);
ackPayload.ensureWritable(data.length).writeBytes(data);
Expand Down
31 changes: 23 additions & 8 deletions broker/src/main/java/io/moquette/persistence/DatabaseStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -784,13 +784,13 @@ void deleteMessage(long messageId) {
}
}

List<WFCMessage.Message> loadRemoteMessages(String user, WFCMessage.Conversation conversation, long beforeUid, int count) {
List<WFCMessage.Message> loadRemoteMessages(String user, WFCMessage.Conversation conversation, long beforeUid, int count, Collection<Integer> contentTypes) {
List<WFCMessage.Message> messages = new ArrayList<>();
long[] before = new long[1];
before[0] = beforeUid;
boolean hasMore = loadRemoteMessagesFromTable(user, conversation, before, count, MessageShardingUtil.getMessageTable(beforeUid), messages);
boolean hasMore = loadRemoteMessagesFromTable(user, conversation, before, count, MessageShardingUtil.getMessageTable(beforeUid), messages, contentTypes);
while (messages.size() < count && hasMore) {
hasMore = loadRemoteMessagesFromTable(user, conversation, before, count - messages.size(), MessageShardingUtil.getMessageTable(beforeUid), messages);
hasMore = loadRemoteMessagesFromTable(user, conversation, before, count - messages.size(), MessageShardingUtil.getMessageTable(beforeUid), messages, contentTypes);
}

int month = 0;
Expand All @@ -800,7 +800,7 @@ List<WFCMessage.Message> loadRemoteMessages(String user, WFCMessage.Conversation
int size = messages.size();
hasMore = true;
while (size == messages.size() && hasMore) {
hasMore = loadRemoteMessagesFromTable(user, conversation, before, count - messages.size(), nexTable, messages);
hasMore = loadRemoteMessagesFromTable(user, conversation, before, count - messages.size(), nexTable, messages, contentTypes);
}

if (size < messages.size()) {
Expand All @@ -811,7 +811,7 @@ List<WFCMessage.Message> loadRemoteMessages(String user, WFCMessage.Conversation
return messages;
}

boolean loadRemoteMessagesFromTable(String user, WFCMessage.Conversation conversation, long[] before, int count, String table, List<WFCMessage.Message> messages) {
boolean loadRemoteMessagesFromTable(String user, WFCMessage.Conversation conversation, long[] before, int count, String table, List<WFCMessage.Message> messages, Collection<Integer> contentTypes) {
long beforeUid = before[0];
String sql = "select `_mid`, `_from`, `_type`, `_target`, `_line`, `_data`, `_dt`, `_to` from " + table +" where";
if (conversation.getType() == ProtoConstants.ConversationType.ConversationType_Private) {
Expand All @@ -820,7 +820,21 @@ boolean loadRemoteMessagesFromTable(String user, WFCMessage.Conversation convers
sql += " _type = ? and _line = ? and _mid < ? and _target = ?";
}

sql += "order by `_mid` DESC limit ?";
if(contentTypes != null && !contentTypes.isEmpty()) {
sql += " and _content_type in (";
boolean first = true;
for (int i:contentTypes) {
if(first) {
first = false;
} else {
sql += ",";
}
sql += i;
}
sql += ")";
}

sql += " order by `_mid` DESC limit ?";

Connection connection = null;
PreparedStatement statement = null;
Expand Down Expand Up @@ -893,20 +907,21 @@ boolean loadRemoteMessagesFromTable(String user, WFCMessage.Conversation convers
return false;
}

void persistUserMessage(final String userId, final long messageId, final long messageSeq) {
void persistUserMessage(final String userId, final long messageId, final long messageSeq, final int messageContentType) {
mScheduler.execute(()->{
Connection connection = null;
PreparedStatement statement = null;
try {
connection = DBUtil.getConnection();

String sql = "insert into " + getUserMessageTable(userId) + " (`_mid`, `_uid`, `_seq`) values(?, ?, ?)";
String sql = "insert into " + getUserMessageTable(userId) + " (`_mid`, `_uid`, `_seq`, `_cont_type`) values(?, ?, ?, ?)";

statement = connection.prepareStatement(sql);
int index = 1;
statement.setLong(index++, messageId);
statement.setString(index++, userId);
statement.setLong(index++, messageSeq);
statement.setInt(index++, messageContentType);
int count = statement.executeUpdate();
LOG.info("Update rows {}", count);
} catch (SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ public WFCMessage.PullMessageResult fetchMessage(String user, String exceptClien
}

@Override
public WFCMessage.PullMessageResult loadRemoteMessages(String user, WFCMessage.Conversation conversation, long beforeUid, int count) {
public WFCMessage.PullMessageResult loadRemoteMessages(String user, WFCMessage.Conversation conversation, long beforeUid, int count, Collection<Integer> contentTypes) {
WFCMessage.PullMessageResult.Builder builder = WFCMessage.PullMessageResult.newBuilder();
List<WFCMessage.Message> messages;
boolean loadMessage = IS_MESSAGE_REMOTE_HISTORY_MESSAGE;
Expand All @@ -741,7 +741,7 @@ public WFCMessage.PullMessageResult loadRemoteMessages(String user, WFCMessage.C
}

if (loadMessage) {
messages = databaseStore.loadRemoteMessages(user, conversation, beforeUid, count);
messages = databaseStore.loadRemoteMessages(user, conversation, beforeUid, count, contentTypes);
} else {
messages = new ArrayList<>();
}
Expand Down Expand Up @@ -871,7 +871,7 @@ public long insertUserMessages(String sender, int conversationType, String targe
mWriteLock.unlock();
}

databaseStore.persistUserMessage(user, messageId, messageSeq);
databaseStore.persistUserMessage(user, messageId, messageSeq, messageContentType);
return messageSeq;
}

Expand Down
2 changes: 1 addition & 1 deletion broker/src/main/java/io/moquette/spi/IMessagesStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public String toString() {
int getNotifyReceivers(String fromUser, WFCMessage.Message.Builder message, Set<String> notifyReceivers);
Set<String> getAllEnds();
WFCMessage.PullMessageResult fetchMessage(String user, String exceptClientId, long fromMessageId, int pullType);
WFCMessage.PullMessageResult loadRemoteMessages(String user, WFCMessage.Conversation conversation, long beforeUid, int count);
WFCMessage.PullMessageResult loadRemoteMessages(String user, WFCMessage.Conversation conversation, long beforeUid, int count, Collection<Integer> contentTypes);
long insertUserMessages(String sender, int conversationType, String target, int line, int messageContentType, String userId, long messageId);
WFCMessage.GroupInfo createGroup(String operator, WFCMessage.GroupInfo groupInfo, List<WFCMessage.GroupMember> memberList, String memberExtra, boolean isAdmin);
ErrorCode addGroupMembers(String operator, boolean isAdmin, String groupId, List<WFCMessage.GroupMember> memberList, String extra);
Expand Down
Loading

0 comments on commit 92c59d2

Please sign in to comment.