Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #8970] Remove redundant heartbeats #8971

Open
wants to merge 22 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f79cbe4
Adding null does not update
weihubeats Jun 8, 2023
0d4e93e
Merge remote-tracking branch 'apache_master/develop' into weihubeats/…
weihubeats Jul 26, 2023
b621456
Merge remote-tracking branch 'apache_master/develop' into weihubeats/…
weihubeats Aug 17, 2023
35a7d4c
Merge remote-tracking branch 'apache_master/develop' into weihubeats/…
weihubeats Oct 16, 2023
199999e
Merge remote-tracking branch 'apache_master/develop' into weihubeats/…
weihubeats Oct 18, 2023
5fbbed2
merge
weihubeats Feb 27, 2024
f52d41e
Merge remote-tracking branch 'apache_master/develop' into weihubeats/…
weihubeats Mar 20, 2024
11f0867
Merge remote-tracking branch 'apache_master/develop' into weihubeats/…
weihubeats Apr 19, 2024
c930b08
Merge remote-tracking branch 'refs/remotes/apache_master/develop' int…
weihubeats May 22, 2024
9eb378e
rolling back
weihubeats May 22, 2024
842fec8
Merge remote-tracking branch 'apache_master/develop' into weihubeats/…
weihubeats Jun 11, 2024
1eba5d6
Merge remote-tracking branch 'refs/remotes/apache_master/develop' int…
weihubeats Jun 27, 2024
acb4ede
Merge remote-tracking branch 'refs/remotes/apache_master/develop' int…
weihubeats Jul 3, 2024
db1ae3f
Merge remote-tracking branch 'upstream/develop' into develop
weihubeats Jul 8, 2024
5b6637e
Merge remote-tracking branch 'apache_master/develop' into weihubeats/…
weihubeats Sep 2, 2024
215a82b
Merge remote-tracking branch 'weihubeats_master/develop' into weihube…
weihubeats Sep 2, 2024
21f2a71
Merge remote-tracking branch 'refs/remotes/apache_master/develop' int…
weihubeats Oct 22, 2024
40a90b4
Merge remote-tracking branch 'apache_master/develop' into weihubeats/…
weihubeats Nov 20, 2024
dfe56df
Remove redundant heartbeats
weihubeats Nov 21, 2024
39ef20b
Adding Connection Records
weihubeats Nov 22, 2024
a60c62e
ignore tryLock fail
weihubeats Nov 27, 2024
2bed031
ignore tryLock fail
weihubeats Nov 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.impl.factory;

import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -35,7 +36,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
Expand Down Expand Up @@ -66,6 +66,8 @@
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueAssignment;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.HeartbeatV2Result;
Expand All @@ -83,8 +85,6 @@
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.QueueData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;

import static org.apache.rocketmq.remoting.rpc.ClientMetadata.topicRouteData2EndpointsForStaticTopic;

Expand Down Expand Up @@ -157,7 +157,9 @@ public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String cli
ChannelEventListener channelEventListener;
if (clientConfig.isEnableHeartbeatChannelEventListener()) {
channelEventListener = new ChannelEventListener() {

private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = MQClientInstance.this.brokerAddrTable;

@Override
public void onChannelConnect(String remoteAddr, Channel channel) {
}
Expand All @@ -182,7 +184,7 @@ public void onChannelActive(String remoteAddr, Channel channel) {
if (addr.equals(remoteAddr)) {
long id = entry.getKey();
String brokerName = addressEntry.getKey();
if (sendHeartbeatToBroker(id, brokerName, addr)) {
if (sendHeartbeatToBroker(id, brokerName, addr, false)) {
rebalanceImmediately();
}
break;
Expand Down Expand Up @@ -591,6 +593,18 @@ private boolean isBrokerAddrExistInTopicRouteTable(final String addr) {
}

public boolean sendHeartbeatToBroker(long id, String brokerName, String addr) {
return sendHeartbeatToBroker(id, brokerName, addr, true);
}

/**
* @param id
* @param brokerName
* @param addr
* @param strictLockMode When the connection is initially established, sending a heartbeat will simultaneously trigger the onChannelActive event to acquire the lock again, causing an exception. Therefore,
* the exception that occurs when sending the heartbeat during the initial onChannelActive event can be ignored.
* @return
*/
public boolean sendHeartbeatToBroker(long id, String brokerName, String addr, boolean strictLockMode) {
if (this.lockHeartbeat.tryLock()) {
final HeartbeatData heartbeatDataWithSub = this.prepareHeartbeatData(false);
final boolean producerEmpty = heartbeatDataWithSub.getProducerDataSet().isEmpty();
Expand All @@ -615,7 +629,9 @@ public boolean sendHeartbeatToBroker(long id, String brokerName, String addr) {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed. [{}]", this.clientId);
if (strictLockMode) {
log.warn("lock heartBeat, but failed. [{}]", this.clientId);
}
}
return false;
}
Expand Down
Loading