Skip to content

Commit

Permalink
重构黑名单,修改代码
Browse files Browse the repository at this point in the history
  • Loading branch information
heavyrain2012 committed Nov 18, 2019
1 parent 029b6ed commit c1b9a81
Show file tree
Hide file tree
Showing 13 changed files with 85 additions and 24 deletions.
2 changes: 2 additions & 0 deletions broker/migrate/h2/V16__add_friend_blocked.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
alter table t_friend add column `_blacked` tinyint DEFAULT 0;
update t_friend set `_blacked` = 1 where `_state` = 2;
2 changes: 2 additions & 0 deletions broker/migrate/mysql/V21__add_friend_blocked.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
alter table t_friend add column `_blacked` tinyint DEFAULT 0 COMMENT "0, normal; 1, blacked";
update t_friend set `_blacked` = 1 where `_state` = 2;
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public boolean isTransactionAction() {
public boolean action(Request request, Response response) {
if (request.getNettyRequest() instanceof FullHttpRequest) {
InputGetFriendList inputGetFriendList = getRequestBody(request.getNettyRequest(), InputGetFriendList.class);
List<FriendData> dataList = messagesStore.getFriendList(inputGetFriendList.getUserId(), 0);
List<FriendData> dataList = messagesStore.getFriendList(inputGetFriendList.getUserId(), null, 0);
List<String> list = new ArrayList<>();
for (FriendData data : dataList) {
if (data.getState() == inputGetFriendList.getStatus()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ public class FriendData implements Serializable {
private String friendUid;
private String alias;
private int state;
private int blacked;
private long timestamp;


public FriendData(String userId, String friendUid, String alias, int state, long timestamp) {
public FriendData(String userId, String friendUid, String alias, int state, int blacked, long timestamp) {
this.userId = userId;
this.friendUid = friendUid;
this.alias = alias;
this.state = state;
this.blacked = blacked;
this.timestamp = timestamp;
}

Expand Down Expand Up @@ -61,6 +63,14 @@ public void setState(int state) {
this.state = state;
}

public int getBlacked() {
return blacked;
}

public void setBlacked(int blacked) {
this.blacked = blacked;
}

public long getTimestamp() {
return timestamp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
public class FriendPullHandler extends IMHandler<WFCMessage.Version> {
@Override
public ErrorCode action(ByteBuf ackPayload, String clientID, String fromUser, boolean isAdmin, WFCMessage.Version request, Qos1PublishHandler.IMCallback callback) {
List<FriendData> friendDatas = m_messagesStore.getFriendList(fromUser, request.getVersion());
List<FriendData> friendDatas = m_messagesStore.getFriendList(fromUser, clientID, request.getVersion());
WFCMessage.GetFriendsResult.Builder builder = WFCMessage.GetFriendsResult.newBuilder();
for (FriendData data : friendDatas
) {
Expand Down
17 changes: 12 additions & 5 deletions broker/src/main/java/io/moquette/persistence/DatabaseStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ void reloadFriendsFromDB(HazelcastInstance hzInstance) {

try {
connection = DBUtil.getConnection();
String sql = "select `_uid`, `_friend_uid`, `_alias`, `_state`, `_dt` from t_friend";
String sql = "select `_uid`, `_friend_uid`, `_alias`, `_state`, `_blacked`, `_dt` from t_friend";
statement = connection.prepareStatement(sql);

int index;
Expand All @@ -375,6 +375,9 @@ void reloadFriendsFromDB(HazelcastInstance hzInstance) {
int intvalue = rs.getInt(index++);
builder.setState(intvalue);

intvalue = rs.getInt(index++);
builder.setBlacked(intvalue);

long longvalue = rs.getLong(index++);
builder.setTimestamp(longvalue);

Expand Down Expand Up @@ -2139,7 +2142,7 @@ List<FriendData> getPersistFriends(String userId) {
ResultSet rs = null;
try {
connection = DBUtil.getConnection();
String sql = "select `_friend_uid`, `_alias`, `_state`, `_dt` from t_friend where `_uid` = ?";
String sql = "select `_friend_uid`, `_alias`, `_state`, `_blacked`, `_dt` from t_friend where `_uid` = ?";
statement = connection.prepareStatement(sql);


Expand All @@ -2153,9 +2156,10 @@ List<FriendData> getPersistFriends(String userId) {
String uid = rs.getString(1);
String alias = rs.getString(2);
int state = rs.getInt(3);
long timestamp = rs.getLong(4);
int blacked = rs.getInt(4);
long timestamp = rs.getLong(5);

FriendData data = new FriendData(userId, uid, alias, state, timestamp);
FriendData data = new FriendData(userId, uid, alias, state, blacked, timestamp);
out.add(data);
}
return out;
Expand Down Expand Up @@ -2305,10 +2309,11 @@ void persistOrUpdateFriendData(final FriendData request) {
PreparedStatement statement = null;
try {
connection = DBUtil.getConnection();
String sql = "insert into t_friend (`_uid`, `_friend_uid`, `_alias`, `_state`, `_dt`) values(?, ?, ?, ?, ?)" +
String sql = "insert into t_friend (`_uid`, `_friend_uid`, `_alias`, `_state`, `_blacked`, `_dt`) values(?, ?, ?, ?, ?, ?)" +
" ON DUPLICATE KEY UPDATE " +
"`_alias` = ?," +
"`_state` = ?," +
"`_blacked` = ?," +
"`_dt` = ?";


Expand All @@ -2318,9 +2323,11 @@ void persistOrUpdateFriendData(final FriendData request) {
statement.setString(index++, request.getFriendUid());
statement.setString(index++, request.getAlias());
statement.setInt(index++, request.getState());
statement.setInt(index++, request.getBlacked());
statement.setLong(index++, request.getTimestamp());
statement.setString(index++, request.getAlias());
statement.setInt(index++, request.getState());
statement.setInt(index++, request.getBlacked());
statement.setLong(index++, request.getTimestamp());
int count = statement.executeUpdate();
LOG.info("Update rows {}", count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.moquette.spi.IMessagesStore;
import io.moquette.spi.security.Tokenor;
import io.moquette.spi.impl.subscriptions.Topic;
import io.netty.handler.codec.mqtt.MqttVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import win.liyufan.im.*;
Expand Down Expand Up @@ -1652,7 +1653,7 @@ synchronized Collection<WFCMessage.FriendRequest> loadFriendRequest(MultiMap<Str
}

@Override
public List<FriendData> getFriendList(String userId, long version) {
public List<FriendData> getFriendList(String userId, String clientId, long version) {
List<FriendData> out = new ArrayList<FriendData>();

HazelcastInstance hzInstance = m_Server.getHazelcastInstance();
Expand All @@ -1662,9 +1663,26 @@ public List<FriendData> getFriendList(String userId, long version) {
friends = loadFriend(friendsMap, userId);
}

boolean needFriendMigrate = false;

if (!StringUtil.isNullOrEmpty(clientId)) {
MemorySessionStore.Session session = m_Server.getStore().sessionsStore().getSession(clientId);
if(session != null && session.getMqttVersion() != null && session.getMqttVersion().protocolLevel() < MqttVersion.Wildfire_2.protocolLevel()) {
needFriendMigrate = true;
}
}

for (FriendData friend : friends) {
if (friend.getTimestamp() > version) {
out.add(friend);
if (needFriendMigrate) {
if (friend.getBlacked() > 0) {
friend.setState(2);
}
out.add(friend);
} else {
out.add(friend);
}

}
}

Expand Down Expand Up @@ -1719,6 +1737,24 @@ public ErrorCode saveAddFriendRequest(String userId, WFCMessage.AddFriendRequest
return ErrorCode.ERROR_CODE_FRIEND_ALREADY_REQUEST;
}
}

MultiMap<String, FriendData> friendsMap = hzInstance.getMultiMap(USER_FRIENDS);

FriendData friendData1 = null;
Collection<FriendData> friendDatas = friendsMap.get(userId);
if (friendDatas == null || friendDatas.size() == 0) {
friendDatas = loadFriend(friendsMap, userId);
}
for (FriendData fd : friendDatas) {
if (fd.getFriendUid().equals(request.getTargetUid())) {
friendData1 = fd;
break;
}
}
if (friendData1 != null && friendData1.getBlacked() > 0) {
return ErrorCode.ERROR_CODE_IN_BLACK_LIST;
}

WFCMessage.FriendRequest newRequest = WFCMessage.FriendRequest
.newBuilder()
.setFromUid(userId)
Expand Down Expand Up @@ -1770,7 +1806,7 @@ public ErrorCode handleFriendRequest(String userId, WFCMessage.HandleFriendReque
}
}
if (friendData1 == null) {
friendData1 = new FriendData(userId, request.getTargetUid(), "", request.getStatus(), System.currentTimeMillis());
friendData1 = new FriendData(userId, request.getTargetUid(), "", request.getStatus(), 0, System.currentTimeMillis());
} else {
friendData1.setState(request.getStatus());
friendData1.setTimestamp(System.currentTimeMillis());
Expand All @@ -1792,7 +1828,7 @@ public ErrorCode handleFriendRequest(String userId, WFCMessage.HandleFriendReque
}
}
if (friendData2 == null) {
friendData2 = new FriendData(request.getTargetUid(), userId, "", request.getStatus(), friendData1.getTimestamp());
friendData2 = new FriendData(request.getTargetUid(), userId, "", request.getStatus(), 0, friendData1.getTimestamp());
} else {
friendsMap.remove(request.getTargetUid(), friendData2);
friendData2.setState(request.getStatus());
Expand Down Expand Up @@ -1833,10 +1869,10 @@ public ErrorCode handleFriendRequest(String userId, WFCMessage.HandleFriendReque
databaseStore.persistOrUpdateFriendRequest(existRequest);
MultiMap<String, FriendData> friendsMap = hzInstance.getMultiMap(USER_FRIENDS);

FriendData friendData1 = new FriendData(userId, request.getTargetUid(), "", 0, System.currentTimeMillis());
FriendData friendData1 = new FriendData(userId, request.getTargetUid(), "", 0, 0, System.currentTimeMillis());
databaseStore.persistOrUpdateFriendData(friendData1);

FriendData friendData2 = new FriendData(request.getTargetUid(), userId, "", 0, friendData1.getTimestamp());
FriendData friendData2 = new FriendData(request.getTargetUid(), userId, "", 0, 0, friendData1.getTimestamp());
databaseStore.persistOrUpdateFriendData(friendData2);

requestMap.remove(userId);
Expand All @@ -1858,6 +1894,9 @@ public ErrorCode handleFriendRequest(String userId, WFCMessage.HandleFriendReque

@Override
public ErrorCode blackUserRequest(String fromUser, String targetUserId, int state, long[] heads) {
if (state == 2) {
state = 1;
}
HazelcastInstance hzInstance = m_Server.getHazelcastInstance();
MultiMap<String, FriendData> friendsMap = hzInstance.getMultiMap(USER_FRIENDS);

Expand All @@ -1875,9 +1914,9 @@ public ErrorCode blackUserRequest(String fromUser, String targetUserId, int stat
}

if (friendData == null) {
friendData = new FriendData(fromUser, targetUserId, "", state, System.currentTimeMillis());
friendData = new FriendData(fromUser, targetUserId, "", 1, state, System.currentTimeMillis());
}
friendData.setState(state);
friendData.setBlacked(state);
friendData.setTimestamp(System.currentTimeMillis());


Expand Down
2 changes: 1 addition & 1 deletion broker/src/main/java/io/moquette/spi/IMessagesStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public String toString() {
ErrorCode verifyToken(String userId, String token, List<String> serverIPs, List<Integer> ports);
ErrorCode login(String name, String password, List<String> userIdRet);

List<FriendData> getFriendList(String userId, long version);
List<FriendData> getFriendList(String userId, String clientId, long version);
List<WFCMessage.FriendRequest> getFriendRequestList(String userId, long version);

ErrorCode saveAddFriendRequest(String userId, WFCMessage.AddFriendRequest request, long[] head);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,8 @@ public void processConnect(Channel channel, MqttConnectMessage msg) {
String clientId = payload.clientIdentifier();
LOG.info("Processing CONNECT message. CId={}, username={}", clientId, payload.userName());

if (msg.variableHeader().version() != MqttVersion.MQTT_3_1.protocolLevel()
&& msg.variableHeader().version() != MqttVersion.MQTT_3_1_1.protocolLevel()
&& msg.variableHeader().version() != MqttVersion.Wildfire_1.protocolLevel()) {
if (msg.variableHeader().version() <= MqttVersion.MQTT_3_1_1.protocolLevel() ||
msg.variableHeader().version() >= MqttVersion.Wildfire_Max.protocolLevel()) {
MqttConnAckMessage badProto = connAck(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);

LOG.error("MQTT protocol version is not valid. CId={}", clientId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ void imHandler(String clientID, String fromUser, String topic, byte[] payloadCon
} else {
MemorySessionStore.Session session = m_sessionStore.getSession(clientID);
if (session != null && session.getUsername().equals(fromUser)) {
if (data.length > 7*1024 && session.getMqttVersion() == MqttVersion.Wildfire_1) {
if (data.length > 7*1024 && session.getMqttVersion().protocolLevel() >= MqttVersion.Wildfire_1.protocolLevel()) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gzip;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ static boolean isValidClientId(MqttVersion mqttVersion, String clientId) {
return clientId != null && clientId.length() >= MIN_CLIENT_ID_LENGTH &&
clientId.length() <= MAX_CLIENT_ID_LENGTH;
}
if (mqttVersion == MqttVersion.MQTT_3_1_1 || mqttVersion == MqttVersion.Wildfire_1) {
if (mqttVersion.protocolLevel() >= MqttVersion.MQTT_3_1_1.protocolLevel()) {
// In 3.1.3.1 Client Identifier of MQTT 3.1.1 specification, The Server MAY allow ClientId’s
// that contain more than 23 encoded bytes. And, The Server MAY allow zero-length ClientId.
return clientId != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ private static Result<MqttConnectVariableHeader> decodeConnectionVariableHeader(
final int willQos = (b1 & 0x18) >> 3;
final boolean willFlag = (b1 & 0x04) == 0x04;
final boolean cleanSession = (b1 & 0x02) == 0x02;
if (mqttVersion == MqttVersion.MQTT_3_1_1 || mqttVersion == MqttVersion.Wildfire_1) {
if (mqttVersion.protocolLevel() >= MqttVersion.MQTT_3_1_1.protocolLevel()) {
final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
if (!zeroReservedFlag) {
// MQTT v3.1.1: The Server MUST validate that the reserved flag in the CONNECT Control Packet is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
public enum MqttVersion {
MQTT_3_1("MQIsdp", (byte) 3),
MQTT_3_1_1("MQTT", (byte) 4),
Wildfire_1("MQTT", (byte) 5);
Wildfire_1("MQTT", (byte) 5),
Wildfire_2("MQTT", (byte) 6),
Wildfire_Max("MQTT", (byte) 7);

private final String name;
private final byte level;
Expand Down

0 comments on commit c1b9a81

Please sign in to comment.