Skip to content

Commit

Permalink
Adding the EnableLmqStats option allows monitoring of LMQ statistics …
Browse files Browse the repository at this point in the history
…at runtime (#8973)
  • Loading branch information
RongtongJin authored Nov 22, 2024
1 parent e876bed commit 715dd5a
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ public BrokerController(
this.messageStoreConfig = messageStoreConfig;
this.authConfig = authConfig;
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), getListenPort()));
this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
this.broadcastOffsetManager = new BroadcastOffsetManager(this);
if (ConfigManagerVersion.V2.getVersion().equals(brokerConfig.getConfigManagerVersion())) {
this.configStorage = new ConfigStorage(messageStoreConfig.getStorePathRootDir());
Expand Down
11 changes: 11 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,9 @@ public class BrokerConfig extends BrokerIdentity {

private boolean appendCkAsync = false;


private boolean enableLmqStats = false;

/**
* V2 is recommended in cases where LMQ feature is extensively used.
*/
Expand Down Expand Up @@ -1905,6 +1908,14 @@ public void setAppendCkAsync(boolean appendCkAsync) {
this.appendCkAsync = appendCkAsync;
}

public boolean isEnableLmqStats() {
return enableLmqStats;
}

public void setEnableLmqStats(boolean enableLmqStats) {
this.enableLmqStats = enableLmqStats;
}

public String getConfigManagerVersion() {
return configManagerVersion;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,29 @@
*/
package org.apache.rocketmq.store.stats;

import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;

public class LmqBrokerStatsManager extends BrokerStatsManager {

public LmqBrokerStatsManager(String clusterName, boolean enableQueueStat) {
super(clusterName, enableQueueStat);
private final BrokerConfig brokerConfig;

public LmqBrokerStatsManager(BrokerConfig brokerConfig) {
super(brokerConfig.getBrokerClusterName(), brokerConfig.isEnableDetailStat());
this.brokerConfig = brokerConfig;
}

@Override
public void incGroupGetNums(final String group, final String topic, final int incValue) {
String lmqGroup = group;
String lmqTopic = topic;
if (MixAll.isLmq(group)) {
lmqGroup = MixAll.LMQ_PREFIX;
}
if (MixAll.isLmq(topic)) {
lmqTopic = MixAll.LMQ_PREFIX;
if (!brokerConfig.isEnableLmqStats()) {
if (MixAll.isLmq(group)) {
lmqGroup = MixAll.LMQ_PREFIX;
}
if (MixAll.isLmq(topic)) {
lmqTopic = MixAll.LMQ_PREFIX;
}
}
super.incGroupGetNums(lmqGroup, lmqTopic, incValue);
}
Expand All @@ -41,25 +47,28 @@ public void incGroupGetNums(final String group, final String topic, final int in
public void incGroupGetSize(final String group, final String topic, final int incValue) {
String lmqGroup = group;
String lmqTopic = topic;
if (MixAll.isLmq(group)) {
lmqGroup = MixAll.LMQ_PREFIX;
}
if (MixAll.isLmq(topic)) {
lmqTopic = MixAll.LMQ_PREFIX;
if (!brokerConfig.isEnableLmqStats()) {
if (MixAll.isLmq(group)) {
lmqGroup = MixAll.LMQ_PREFIX;
}
if (MixAll.isLmq(topic)) {
lmqTopic = MixAll.LMQ_PREFIX;
}
}
super.incGroupGetSize(lmqGroup, lmqTopic, incValue);
}


@Override
public void incGroupAckNums(final String group, final String topic, final int incValue) {
String lmqGroup = group;
String lmqTopic = topic;
if (MixAll.isLmq(group)) {
lmqGroup = MixAll.LMQ_PREFIX;
}
if (MixAll.isLmq(topic)) {
lmqTopic = MixAll.LMQ_PREFIX;
if (!brokerConfig.isEnableLmqStats()) {
if (MixAll.isLmq(group)) {
lmqGroup = MixAll.LMQ_PREFIX;
}
if (MixAll.isLmq(topic)) {
lmqTopic = MixAll.LMQ_PREFIX;
}
}
super.incGroupAckNums(lmqGroup, lmqTopic, incValue);
}
Expand All @@ -68,11 +77,13 @@ public void incGroupAckNums(final String group, final String topic, final int in
public void incGroupCkNums(final String group, final String topic, final int incValue) {
String lmqGroup = group;
String lmqTopic = topic;
if (MixAll.isLmq(group)) {
lmqGroup = MixAll.LMQ_PREFIX;
}
if (MixAll.isLmq(topic)) {
lmqTopic = MixAll.LMQ_PREFIX;
if (!brokerConfig.isEnableLmqStats()) {
if (MixAll.isLmq(group)) {
lmqGroup = MixAll.LMQ_PREFIX;
}
if (MixAll.isLmq(topic)) {
lmqTopic = MixAll.LMQ_PREFIX;
}
}
super.incGroupCkNums(lmqGroup, lmqTopic, incValue);
}
Expand All @@ -81,11 +92,13 @@ public void incGroupCkNums(final String group, final String topic, final int inc
public void incGroupGetLatency(final String group, final String topic, final int queueId, final int incValue) {
String lmqGroup = group;
String lmqTopic = topic;
if (MixAll.isLmq(group)) {
lmqGroup = MixAll.LMQ_PREFIX;
}
if (MixAll.isLmq(topic)) {
lmqTopic = MixAll.LMQ_PREFIX;
if (!brokerConfig.isEnableLmqStats()) {
if (MixAll.isLmq(group)) {
lmqGroup = MixAll.LMQ_PREFIX;
}
if (MixAll.isLmq(topic)) {
lmqTopic = MixAll.LMQ_PREFIX;
}
}
super.incGroupGetLatency(lmqGroup, lmqTopic, queueId, incValue);
}
Expand All @@ -94,11 +107,13 @@ public void incGroupGetLatency(final String group, final String topic, final int
public void incSendBackNums(final String group, final String topic) {
String lmqGroup = group;
String lmqTopic = topic;
if (MixAll.isLmq(group)) {
lmqGroup = MixAll.LMQ_PREFIX;
}
if (MixAll.isLmq(topic)) {
lmqTopic = MixAll.LMQ_PREFIX;
if (!brokerConfig.isEnableLmqStats()) {
if (MixAll.isLmq(group)) {
lmqGroup = MixAll.LMQ_PREFIX;
}
if (MixAll.isLmq(topic)) {
lmqTopic = MixAll.LMQ_PREFIX;
}
}
super.incSendBackNums(lmqGroup, lmqTopic);
}
Expand All @@ -107,11 +122,13 @@ public void incSendBackNums(final String group, final String topic) {
public double tpsGroupGetNums(final String group, final String topic) {
String lmqGroup = group;
String lmqTopic = topic;
if (MixAll.isLmq(group)) {
lmqGroup = MixAll.LMQ_PREFIX;
}
if (MixAll.isLmq(topic)) {
lmqTopic = MixAll.LMQ_PREFIX;
if (!brokerConfig.isEnableLmqStats()) {
if (MixAll.isLmq(group)) {
lmqGroup = MixAll.LMQ_PREFIX;
}
if (MixAll.isLmq(topic)) {
lmqTopic = MixAll.LMQ_PREFIX;
}
}
return super.tpsGroupGetNums(lmqGroup, lmqTopic);
}
Expand All @@ -121,11 +138,13 @@ public void recordDiskFallBehindTime(final String group, final String topic, fin
final long fallBehind) {
String lmqGroup = group;
String lmqTopic = topic;
if (MixAll.isLmq(group)) {
lmqGroup = MixAll.LMQ_PREFIX;
}
if (MixAll.isLmq(topic)) {
lmqTopic = MixAll.LMQ_PREFIX;
if (!brokerConfig.isEnableLmqStats()) {
if (MixAll.isLmq(group)) {
lmqGroup = MixAll.LMQ_PREFIX;
}
if (MixAll.isLmq(topic)) {
lmqTopic = MixAll.LMQ_PREFIX;
}
}
super.recordDiskFallBehindTime(lmqGroup, lmqTopic, queueId, fallBehind);
}
Expand All @@ -135,11 +154,13 @@ public void recordDiskFallBehindSize(final String group, final String topic, fin
final long fallBehind) {
String lmqGroup = group;
String lmqTopic = topic;
if (MixAll.isLmq(group)) {
lmqGroup = MixAll.LMQ_PREFIX;
}
if (MixAll.isLmq(topic)) {
lmqTopic = MixAll.LMQ_PREFIX;
if (!brokerConfig.isEnableLmqStats()) {
if (MixAll.isLmq(group)) {
lmqGroup = MixAll.LMQ_PREFIX;
}
if (MixAll.isLmq(topic)) {
lmqTopic = MixAll.LMQ_PREFIX;
}
}
super.recordDiskFallBehindSize(lmqGroup, lmqTopic, queueId, fallBehind);
}
Expand Down

0 comments on commit 715dd5a

Please sign in to comment.