From f79cbe4d24d9f6c2eee4ed9314f92f53c226d91e Mon Sep 17 00:00:00 2001 From: weihu Date: Thu, 8 Jun 2023 11:36:20 +0800 Subject: [PATCH 1/6] Adding null does not update --- .../apache/rocketmq/broker/controller/ReplicasManager.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java index abae7cdb01a..6543db66f49 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java @@ -32,7 +32,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.out.BrokerOuterAPI; @@ -801,7 +801,10 @@ private void scanAvailableControllerAddresses() { private void updateControllerAddr() { if (brokerConfig.isFetchControllerAddrByDnsLookup()) { - this.controllerAddresses = brokerOuterAPI.dnsLookupAddressByDomain(this.brokerConfig.getControllerAddr()); + List addrs = brokerOuterAPI.dnsLookupAddressByDomain(this.brokerConfig.getControllerAddr()); + if (CollectionUtils.isNotEmpty(addrs)) { + this.controllerAddresses = addrs; + } } else { final String controllerPaths = this.brokerConfig.getControllerAddr(); final String[] controllers = controllerPaths.split(";"); From 9eb378eb6655c588ed316534c9b85946ac2309eb Mon Sep 17 00:00:00 2001 From: weihu Date: Wed, 22 May 2024 16:06:21 +0800 Subject: [PATCH 2/6] rolling back --- .../apache/rocketmq/broker/controller/ReplicasManager.java | 7 ++----- .../java/org/apache/rocketmq/common/stats/StatsItem.java | 1 - distribution/bin/os.sh | 2 +- .../java/org/apache/rocketmq/example/simple/Producer.java | 2 +- 4 files changed, 4 insertions(+), 8 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java index 1c8e8988030..a1d711cb275 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java @@ -30,7 +30,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import org.apache.commons.collections.CollectionUtils; + import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.out.BrokerOuterAPI; @@ -803,10 +803,7 @@ private void scanAvailableControllerAddresses() { private void updateControllerAddr() { if (brokerConfig.isFetchControllerAddrByDnsLookup()) { - List addrs = brokerOuterAPI.dnsLookupAddressByDomain(this.brokerConfig.getControllerAddr()); - if (CollectionUtils.isNotEmpty(addrs)) { - this.controllerAddresses = addrs; - } + this.controllerAddresses = brokerOuterAPI.dnsLookupAddressByDomain(this.brokerConfig.getControllerAddr()); } else { final String controllerPaths = this.brokerConfig.getControllerAddr(); final String[] controllers = controllerPaths.split(";"); diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java index 1d2ac32a218..8307c20aa68 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.common.stats; -import java.util.Collections; import java.util.LinkedList; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; diff --git a/distribution/bin/os.sh b/distribution/bin/os.sh index 1c9c717b2b2..1a8a6cdfbb2 100644 --- a/distribution/bin/os.sh +++ b/distribution/bin/os.sh @@ -50,7 +50,7 @@ sysctl vm.max_map_count sysctl vm.dirty_background_ratio sysctl vm.dirty_ratio sysctl vm.dirty_writeback_centisecs -sysctl vm.page-clusterH +sysctl vm.page-cluster sysctl vm.swappiness su - admin -c 'ulimit -n' diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java index e3475528662..920d481b939 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java @@ -34,7 +34,7 @@ public static void main(String[] args) throws MQClientException, InterruptedExce DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP); // Uncomment the following line while debugging, namesrvAddr should be set to your local address - producer.setNamesrvAddr(DEFAULT_NAMESRVADDR); + //producer.setNamesrvAddr(DEFAULT_NAMESRVADDR); producer.start(); for (int i = 0; i < 128; i++) { From dfe56dff1abd2d6c28fe017d39ad00ab6509fecf Mon Sep 17 00:00:00 2001 From: weihu Date: Thu, 21 Nov 2024 19:24:10 +0800 Subject: [PATCH 3/6] Remove redundant heartbeats --- .../client/impl/factory/MQClientInstance.java | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index ad0676d091c..e3eeed5af38 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -176,19 +176,6 @@ public void onChannelIdle(String remoteAddr, Channel channel) { @Override public void onChannelActive(String remoteAddr, Channel channel) { - for (Map.Entry> addressEntry : brokerAddrTable.entrySet()) { - for (Map.Entry entry : addressEntry.getValue().entrySet()) { - String addr = entry.getValue(); - if (addr.equals(remoteAddr)) { - long id = entry.getKey(); - String brokerName = addressEntry.getKey(); - if (sendHeartbeatToBroker(id, brokerName, addr)) { - rebalanceImmediately(); - } - break; - } - } - } } }; } else { From 39ef20bc3cabf5984d73fd304f29e16df54ae8a8 Mon Sep 17 00:00:00 2001 From: weihu Date: Fri, 22 Nov 2024 11:38:34 +0800 Subject: [PATCH 4/6] Adding Connection Records --- .../client/impl/factory/MQClientInstance.java | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index e3eeed5af38..80ebea62c82 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.client.impl.factory; +import com.alibaba.fastjson.JSON; import io.netty.channel.Channel; import java.util.Collections; import java.util.HashMap; @@ -24,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -35,7 +37,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import com.alibaba.fastjson.JSON; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.admin.MQAdminExtInner; @@ -66,6 +67,8 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueueAssignment; import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.HeartbeatV2Result; @@ -83,8 +86,6 @@ import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.QueueData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; -import org.apache.rocketmq.logging.org.slf4j.Logger; -import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import static org.apache.rocketmq.remoting.rpc.ClientMetadata.topicRouteData2EndpointsForStaticTopic; @@ -158,6 +159,10 @@ public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String cli if (clientConfig.isEnableHeartbeatChannelEventListener()) { channelEventListener = new ChannelEventListener() { private final ConcurrentMap> brokerAddrTable = MQClientInstance.this.brokerAddrTable; + + private final ConcurrentMap brokerConnectionIsFirstTable = new ConcurrentHashMap<>(); + + @Override public void onChannelConnect(String remoteAddr, Channel channel) { } @@ -176,6 +181,25 @@ public void onChannelIdle(String remoteAddr, Channel channel) { @Override public void onChannelActive(String remoteAddr, Channel channel) { + + if (Objects.isNull(brokerConnectionIsFirstTable.get(remoteAddr))) { + brokerConnectionIsFirstTable.put(remoteAddr, Boolean.TRUE); + } else { + for (Map.Entry> addressEntry : brokerAddrTable.entrySet()) { + for (Map.Entry entry : addressEntry.getValue().entrySet()) { + String addr = entry.getValue(); + if (addr.equals(remoteAddr)) { + long id = entry.getKey(); + String brokerName = addressEntry.getKey(); + if (sendHeartbeatToBroker(id, brokerName, addr)) { + rebalanceImmediately(); + } + break; + } + } + } + } + } }; } else { From a60c62e4b443841f2dce585220f0ab01363ff57a Mon Sep 17 00:00:00 2001 From: weihu Date: Wed, 27 Nov 2024 14:16:46 +0800 Subject: [PATCH 5/6] ignore tryLock fail --- .../client/impl/factory/MQClientInstance.java | 44 ++++++++++--------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 80ebea62c82..4163c9162bb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -160,9 +160,6 @@ public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String cli channelEventListener = new ChannelEventListener() { private final ConcurrentMap> brokerAddrTable = MQClientInstance.this.brokerAddrTable; - private final ConcurrentMap brokerConnectionIsFirstTable = new ConcurrentHashMap<>(); - - @Override public void onChannelConnect(String remoteAddr, Channel channel) { } @@ -181,25 +178,16 @@ public void onChannelIdle(String remoteAddr, Channel channel) { @Override public void onChannelActive(String remoteAddr, Channel channel) { - - if (Objects.isNull(brokerConnectionIsFirstTable.get(remoteAddr))) { - brokerConnectionIsFirstTable.put(remoteAddr, Boolean.TRUE); - } else { - for (Map.Entry> addressEntry : brokerAddrTable.entrySet()) { - for (Map.Entry entry : addressEntry.getValue().entrySet()) { - String addr = entry.getValue(); - if (addr.equals(remoteAddr)) { - long id = entry.getKey(); - String brokerName = addressEntry.getKey(); - if (sendHeartbeatToBroker(id, brokerName, addr)) { - rebalanceImmediately(); - } - break; + + brokerAddrTable.forEach((brokerName, addrMap) -> { + addrMap.forEach((brokerId, addr) -> { + if (Objects.equals(addr, remoteAddr)) { + if (sendHeartbeatToBroker(brokerId, brokerName, addr, false)) { + rebalanceImmediately(); } } - } - } - + }); + }); } }; } else { @@ -602,6 +590,18 @@ private boolean isBrokerAddrExistInTopicRouteTable(final String addr) { } public boolean sendHeartbeatToBroker(long id, String brokerName, String addr) { + return sendHeartbeatToBroker(id, brokerName, addr, true); + } + + /** + * @param id + * @param brokerName + * @param addr + * @param strictLockMode When the connection is initially established, sending a heartbeat will simultaneously trigger the onChannelActive event to acquire the lock again, causing an exception. Therefore, + * the exception that occurs when sending the heartbeat during the initial onChannelActive event can be ignored. + * @return + */ + public boolean sendHeartbeatToBroker(long id, String brokerName, String addr, boolean strictLockMode) { if (this.lockHeartbeat.tryLock()) { final HeartbeatData heartbeatDataWithSub = this.prepareHeartbeatData(false); final boolean producerEmpty = heartbeatDataWithSub.getProducerDataSet().isEmpty(); @@ -626,7 +626,9 @@ public boolean sendHeartbeatToBroker(long id, String brokerName, String addr) { this.lockHeartbeat.unlock(); } } else { - log.warn("lock heartBeat, but failed. [{}]", this.clientId); + if (strictLockMode) { + log.warn("lock heartBeat, but failed. [{}]", this.clientId); + } } return false; } From 2bed031e0bcbcea0a28f8e7d8dd738e3352e2b5a Mon Sep 17 00:00:00 2001 From: weihu Date: Wed, 27 Nov 2024 14:22:00 +0800 Subject: [PATCH 6/6] ignore tryLock fail --- .../client/impl/factory/MQClientInstance.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 4163c9162bb..82b5fb8cbb2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Objects; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -158,6 +157,7 @@ public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String cli ChannelEventListener channelEventListener; if (clientConfig.isEnableHeartbeatChannelEventListener()) { channelEventListener = new ChannelEventListener() { + private final ConcurrentMap> brokerAddrTable = MQClientInstance.this.brokerAddrTable; @Override @@ -178,16 +178,19 @@ public void onChannelIdle(String remoteAddr, Channel channel) { @Override public void onChannelActive(String remoteAddr, Channel channel) { - - brokerAddrTable.forEach((brokerName, addrMap) -> { - addrMap.forEach((brokerId, addr) -> { - if (Objects.equals(addr, remoteAddr)) { - if (sendHeartbeatToBroker(brokerId, brokerName, addr, false)) { + for (Map.Entry> addressEntry : brokerAddrTable.entrySet()) { + for (Map.Entry entry : addressEntry.getValue().entrySet()) { + String addr = entry.getValue(); + if (addr.equals(remoteAddr)) { + long id = entry.getKey(); + String brokerName = addressEntry.getKey(); + if (sendHeartbeatToBroker(id, brokerName, addr, false)) { rebalanceImmediately(); } + break; } - }); - }); + } + } } }; } else {