Skip to content

Commit

Permalink
Adding Connection Records
Browse files Browse the repository at this point in the history
  • Loading branch information
weihubeats committed Nov 22, 2024
1 parent dfe56df commit 39ef20b
Showing 1 changed file with 27 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.impl.factory;

import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -24,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -35,7 +37,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
Expand Down Expand Up @@ -66,6 +67,8 @@
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueAssignment;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.HeartbeatV2Result;
Expand All @@ -83,8 +86,6 @@
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.QueueData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;

import static org.apache.rocketmq.remoting.rpc.ClientMetadata.topicRouteData2EndpointsForStaticTopic;

Expand Down Expand Up @@ -158,6 +159,10 @@ public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String cli
if (clientConfig.isEnableHeartbeatChannelEventListener()) {
channelEventListener = new ChannelEventListener() {
private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = MQClientInstance.this.brokerAddrTable;

private final ConcurrentMap<String, Boolean> brokerConnectionIsFirstTable = new ConcurrentHashMap<>();


@Override
public void onChannelConnect(String remoteAddr, Channel channel) {
}
Expand All @@ -176,6 +181,25 @@ public void onChannelIdle(String remoteAddr, Channel channel) {

@Override
public void onChannelActive(String remoteAddr, Channel channel) {

if (Objects.isNull(brokerConnectionIsFirstTable.get(remoteAddr))) {
brokerConnectionIsFirstTable.put(remoteAddr, Boolean.TRUE);
} else {
for (Map.Entry<String, HashMap<Long, String>> addressEntry : brokerAddrTable.entrySet()) {
for (Map.Entry<Long, String> entry : addressEntry.getValue().entrySet()) {
String addr = entry.getValue();
if (addr.equals(remoteAddr)) {
long id = entry.getKey();
String brokerName = addressEntry.getKey();
if (sendHeartbeatToBroker(id, brokerName, addr)) {
rebalanceImmediately();
}
break;
}
}
}
}

}
};
} else {
Expand Down

0 comments on commit 39ef20b

Please sign in to comment.