Skip to content

Commit

Permalink
feat: Cluster description shall include a summary section (#488)
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored Oct 27, 2023
1 parent 715352a commit 8381020
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 7 deletions.
27 changes: 24 additions & 3 deletions cli/src/main/java/com/automq/rocketmq/cli/ConsoleHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.automq.rocketmq.cli;

import apache.rocketmq.controller.v1.Cluster;
import apache.rocketmq.controller.v1.ClusterSummary;
import apache.rocketmq.controller.v1.MessageQueueAssignment;
import apache.rocketmq.controller.v1.Node;
import apache.rocketmq.controller.v1.OngoingMessageQueueReassignment;
Expand Down Expand Up @@ -53,9 +54,30 @@ public static void printCluster(Cluster cluster) {
return;
}

CWC_LongestLine cwc = new CWC_LongestLine();

// Cluster Summary
AsciiTable summary = new AsciiTable();
summary.addRule();
AT_Row row = summary.addRow(null, null, null, null, "CLUSTER SUMMARY");
alignCentral(row);
summary.addRule();
row = summary.addRow("NODE QUANTITY", "TOPIC QUANTITY", "QUEUE QUANTITY", "STREAM QUANTITY",
"GROUP QUANTITY");
alignCentral(row);
ClusterSummary cs = cluster.getSummary();
row = summary.addRow(cs.getNodeQuantity(), cs.getTopicQuantity(), cs.getQueueQuantity(),
cs.getStreamQuantity(), cs.getGroupQuantity());
alignCentral(row);
summary.addRule();
summary.getRenderer().setCWC(cwc);
String render = summary.render();
System.out.println(render);

// Nodes List
AsciiTable nodeTable = new AsciiTable();
nodeTable.addRule();
AT_Row row = nodeTable.addRow("NODE ID", "NODE NAME", "TOPIC QUANTITY", "QUEUE QUANTITY",
row = nodeTable.addRow("NODE ID", "NODE NAME", "TOPIC QUANTITY", "QUEUE QUANTITY",
"STREAM QUANTITY", "LAST HEARTBEAT", "ROLE", "EPOCH", "EXPIRATION");

alignCentral(row);
Expand All @@ -70,9 +92,8 @@ public static void printCluster(Cluster cluster) {
}
nodeTable.addRule();

CWC_LongestLine cwc = new CWC_LongestLine();
nodeTable.getRenderer().setCWC(cwc);
String render = nodeTable.render();
render = nodeTable.render();
System.out.println(render);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import apache.rocketmq.controller.v1.AssignmentStatus;
import apache.rocketmq.controller.v1.CloseStreamRequest;
import apache.rocketmq.controller.v1.Cluster;
import apache.rocketmq.controller.v1.ClusterSummary;
import apache.rocketmq.controller.v1.Code;
import apache.rocketmq.controller.v1.ConsumerGroup;
import apache.rocketmq.controller.v1.CreateTopicRequest;
Expand Down Expand Up @@ -219,14 +220,23 @@ public CompletableFuture<Cluster> describeCluster(DescribeClusterRequest request
.setEpoch(lease.getEpoch())
.setNodeId(lease.getNodeId())
.setExpirationTimestamp(toTimestamp(lease.getExpirationTime())).build());

builder.setSummary(ClusterSummary.newBuilder()
.setNodeQuantity(nodes.size())
.setTopicQuantity(topicManager.topicQuantity())
.setQueueQuantity(topicManager.queueQuantity())
.setStreamQuantity(topicManager.streamQuantity())
.setGroupQuantity(groupManager.groupCache.groupQuantity())
.build());

for (Map.Entry<Integer, BrokerNode> entry : nodes.entrySet()) {
BrokerNode brokerNode = entry.getValue();
apache.rocketmq.controller.v1.Node node = apache.rocketmq.controller.v1.Node.newBuilder()
.setId(entry.getKey())
.setName(brokerNode.getNode().getName())
.setLastHeartbeat(toTimestamp(brokerNode.lastKeepAliveTime(config)))
.setTopicNum(topicManager.topicNumOfNode(entry.getKey()))
.setQueueNum(topicManager.topicNumOfNode(entry.getKey()))
.setQueueNum(topicManager.queueNumOfNode(entry.getKey()))
.setStreamNum(topicManager.streamNumOfNode(entry.getKey()))
.setGoingAway(brokerNode.isGoingAway())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,18 @@ public TopicManager(MetadataStore metadataStore) {
this.topicNameRequests = new ConcurrentHashMap<>();
}

public int topicQuantity() {
return topicCache.topicQuantity();
}

public int queueQuantity() {
return assignmentCache.queueQuantity();
}

public int streamQuantity() {
return streamCache.streamQuantity();
}

public int topicNumOfNode(int nodeId) {
return assignmentCache.topicNumOfNode(nodeId);
}
Expand All @@ -100,7 +112,6 @@ public int streamNumOfNode(int nodeId) {
return streamCache.streamNumOfNode(nodeId);
}


public CompletableFuture<Long> createTopic(CreateTopicRequest request) {
CompletableFuture<Long> future = new CompletableFuture<>();
for (; ; ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,8 @@ public int queueNumOfNode(int nodeId) {
}
return count;
}

public int queueQuantity() {
return assignments.values().stream().map(Map::size).reduce(0, Integer::sum);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,8 @@ private void refresh(Group group) {
}
}

public int groupQuantity() {
return groups.size();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,8 @@ private void cacheItem(Stream stream) {
streams.put(stream.getId(), stream);
}
}

public int streamQuantity() {
return streams.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,8 @@ private void cacheItem(Topic topic) {
}
}
}

public int topicQuantity() {
return topics.size();
}
}
27 changes: 25 additions & 2 deletions proto/src/main/proto/admin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,30 @@ message Node {
google.protobuf.Timestamp last_heartbeat = 7;
}

message ClusterSummary {
// Total quantity of topics
int32 topic_quantity = 1;

// Total quantity of queues
int32 queue_quantity = 2;

// Total quantity of streams
int32 stream_quantity = 3;

// Total quantity of nodes
int32 node_quantity = 4;

// Total quantity of groups
int32 group_quantity = 5;
}

message Cluster {
Lease lease = 1;
repeated Node nodes = 2;
// Overall cluster summary
ClusterSummary summary = 1;

// Cluster leadership lease
Lease lease = 2;

// List of cluster nodes
repeated Node nodes = 3;
}

0 comments on commit 8381020

Please sign in to comment.