diff --git a/broker/pom.xml b/broker/pom.xml
index 4986b57c2..7f1bd42e0 100644
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@ -10,4 +10,134 @@
rocketmq-broker
+
+
+
+ io.grpc
+ grpc-netty-shaded
+ runtime
+
+
+ io.grpc
+ grpc-protobuf
+
+
+ io.grpc
+ grpc-services
+
+
+ io.grpc
+ grpc-stub
+
+
+ com.google.protobuf
+ protobuf-java-util
+ ${protobuf.version}
+
+
+ com.google.code.gson
+ gson
+ 2.10.1
+
+
+ com.google.guava
+ guava
+ 32.0.1-jre
+
+
+ com.google.j2objc
+ j2objc-annotations
+ 2.8
+
+
+ org.apache.tomcat
+ annotations-api
+ 6.0.53
+ provided
+
+
+ io.grpc
+ grpc-testing
+ test
+
+
+ junit
+ junit
+ 4.13.2
+ test
+
+
+ org.mockito
+ mockito-core
+ 3.4.0
+ test
+
+
+
+
+
+
+ kr.motd.maven
+ os-maven-plugin
+ 1.7.1
+
+
+
+
+ org.xolstice.maven.plugins
+ protobuf-maven-plugin
+ 0.6.1
+
+ com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
+ grpc-java
+ io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+
+
+
+
+ compile
+ compile-custom
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+ 1.4.1
+
+
+ enforce
+
+ enforce
+
+
+
+
+
+
+
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+ 3.2.0
+
+
+ generate-sources
+
+ add-source
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/broker/src/main/proto/controller.proto b/broker/src/main/proto/controller.proto
new file mode 100644
index 000000000..577fa322e
--- /dev/null
+++ b/broker/src/main/proto/controller.proto
@@ -0,0 +1,297 @@
+syntax = "proto3";
+
+import "google/protobuf/timestamp.proto";
+
+package apache.rocketmq.controller.v1;
+
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.controller.v1";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQService";
+
+message Uuid {
+ int64 most_significant_bits = 1;
+ int64 least_significant_bits = 2;
+}
+
+// Define all error codes here
+enum Code {
+ OK = 0;
+ NOT_FOUND = 404;
+}
+
+message Status {
+ Code code = 1;
+ string message = 2;
+}
+
+// Aggregates common request headers, authentication, etc.
+message ControllerRequestContext {}
+
+message BrokerRegistrationRequest {
+ ControllerRequestContext context = 1;
+
+ int32 broker_id = 2;
+ string cluster_id = 3;
+
+ // To uniquely identify each instance of a specific broker ID. This field is designed to change in case of
+ // broker restarts.
+ Uuid fingerprint = 4;
+}
+
+message BrokerRegistrationReply {
+ Status status = 1;
+ int64 epoch = 2;
+}
+
+message BrokerUnregistrationRequest {
+ ControllerRequestContext context = 1;
+ int32 broker_id = 2;
+ string cluster_id = 3;
+}
+
+message BrokerUnregistrationReply {
+ Status status = 1;
+}
+
+message BrokerHeartbeatRequest {
+ int32 broker_id = 1;
+ int64 broker_epoch = 2;
+
+ // Flag whether this broker is going to shutdown shortly.
+ bool going_away = 3;
+}
+
+message BrokerHeartbeatReply {
+ Status status = 1;
+}
+
+message CreateTopicRequest {
+ ControllerRequestContext context = 1;
+ string topic = 2;
+ int32 count = 3;
+}
+
+message CreateTopicReply {
+ Status status = 1;
+ int64 topic_id = 2;
+}
+
+message DescribeTopicRequest {
+ ControllerRequestContext context = 1;
+ int64 topic_id = 2;
+ string topic_name = 3;
+}
+
+message DescribeTopicReply {
+ Status status = 1;
+ Topic topic = 2;
+}
+
+message ListTopicsRequest {
+ ControllerRequestContext context = 1;
+}
+
+message Topic {
+ string name = 1;
+ int64 topic_id = 2;
+ int32 count = 3;
+ repeated MessageQueueAssignment assignments = 4;
+ // Message queues that are in the progress of reassignment
+ repeated OngoingMessageQueueReassignment reassignments = 5;
+}
+
+// Use server streaming in case there are a large number of topics available.
+message ListTopicsReply {
+ Status status = 1;
+ Topic topic = 2;
+}
+
+message UpdateTopicRequest {
+ ControllerRequestContext context = 1;
+ // Required field
+ int64 topic_id = 2;
+
+ // No op if this field is absent
+ string name = 3;
+
+ // Effective if non-zero
+ int32 count = 4;
+}
+
+message UpdateTopicReply {
+ Status status = 1;
+ Topic topic = 2;
+}
+
+message DeleteTopicRequest {
+ ControllerRequestContext context = 1;
+ int64 topic_id = 2;
+}
+
+message DeleteTopicReply {
+ Status status = 1;
+}
+
+message MessageQueueAssignment {
+ int64 topic_id = 1;
+ int64 queue_id = 2;
+ int32 broker_id = 3;
+}
+
+message ListMessageQueueReassignmentsRequest {
+ ControllerRequestContext context = 1;
+ repeated string topics = 2;
+}
+
+message OngoingTopicReassignment {
+ string name = 1;
+ repeated OngoingMessageQueueReassignment queues = 2;
+}
+
+message OngoingMessageQueueReassignment {
+ int64 topic_id = 1;
+ int64 queue_id = 2;
+ int32 src_broker_id = 3;
+ int32 dst_broker_id = 4;
+}
+
+message ListMessageQueueReassignmentsReply {
+ Status status = 1;
+ repeated OngoingTopicReassignment reassignments = 2;
+}
+
+message ReassignMessageQueueRequest {
+ ControllerRequestContext context = 1;
+ int64 topic_id = 2;
+ int64 queue_id = 3;
+ int32 src_broker_id = 4;
+ int32 dst_broker_id = 5;
+}
+
+message ReassignMessageQueueReply {
+ Status status = 1;
+}
+
+message ListTopicMessageQueueAssignmentsRequest {
+ ControllerRequestContext context = 1;
+
+ // Use 'topic name' instead of 'topic_id' for the convenience of building toolset
+ string name = 2;
+}
+
+message ListTopicMessageQueueAssignmentsReply {
+ Status status = 1;
+ repeated MessageQueueAssignment assignments = 2;
+}
+
+message Range {
+ // Begin offset, inclusive.
+ int64 begin = 1;
+
+ // End offset of this range, exclusive.
+ int64 end = 2;
+}
+
+message OssSegment {
+ int64 segment_id = 1;
+ int64 topic_id = 2;
+ int64 queue_id = 3;
+ Range range = 4;
+
+ // Objective Storage Service path.
+ string path = 5;
+}
+
+message AddOssSegmentRequest {
+ ControllerRequestContext context = 1;
+ int32 broker_id = 2;
+ int64 broker_epoch = 3;
+ OssSegment segment = 4;
+
+ // Store timestamp of the last message. It, as a result, would be
+ // safe to determine lifespan of the segment through this field.
+ google.protobuf.Timestamp store_time = 5;
+}
+
+message AddOssSegmentReply {
+ Status status = 1;
+}
+
+message ListOssSegmentsCriteria {
+ // Omit if negative
+ int64 topic_id = 1;
+
+ // Omit if negative
+ int64 queue_id = 2;
+
+ // Accept if OSS segment store_time is prior to this timestamp
+ google.protobuf.Timestamp timestamp = 3;
+}
+
+message ListOssSegmentsRequest {
+ ControllerRequestContext context = 1;
+ ListOssSegmentsCriteria criteria = 2;
+}
+
+message ListOssSegmentsResponse {
+ Status status = 1;
+ repeated OssSegment segments = 2;
+}
+
+message DeleteOssSegmentRequest {
+ ControllerRequestContext context = 1;
+ int64 segment_id = 2;
+}
+
+message DeleteOssSegmentReply {
+ Status status = 1;
+}
+
+message CommitOffsetRequest {
+ ControllerRequestContext context = 1;
+ int64 group_id = 2;
+ int64 topic_id = 3;
+ int64 queue_id = 4;
+ int64 offset = 5;
+}
+
+message CommitOffsetReply {
+ Status status = 1;
+}
+
+service ControllerService {
+ rpc registerBroker(BrokerRegistrationRequest) returns (BrokerRegistrationReply) {}
+
+ rpc unregisterBroker(BrokerUnregistrationRequest) returns (BrokerUnregistrationReply) {}
+
+ rpc processBrokerHeartbeat(BrokerHeartbeatRequest) returns (BrokerHeartbeatReply) {}
+
+ rpc createTopic(CreateTopicRequest) returns (CreateTopicReply) {}
+
+ rpc describeTopic(DescribeTopicRequest) returns (DescribeTopicReply) {}
+
+ rpc listAllTopics(ListTopicsRequest) returns (stream ListTopicsReply) {}
+
+ rpc updateTopic(UpdateTopicRequest) returns (Topic) {}
+
+ rpc deleteTopic(DeleteTopicRequest) returns (DeleteTopicReply) {}
+
+ rpc listTopicMessageQueues(ListTopicMessageQueueAssignmentsRequest) returns (ListTopicMessageQueueAssignmentsReply) {}
+
+ // Reassign message queue from one broker to another.
+ rpc reassignMessageQueue(ReassignMessageQueueRequest) returns (ReassignMessageQueueReply) {}
+
+ // List ongoing message queue reassignments.
+ rpc listMessageQueueReassignments(ListMessageQueueReassignmentsRequest) returns (ListMessageQueueReassignmentsReply) {}
+
+ rpc addOssSegment(AddOssSegmentRequest) returns (AddOssSegmentReply) {}
+
+ rpc listOssSegments(ListOssSegmentsRequest) returns (ListOssSegmentsResponse) {}
+
+ rpc deleteOssSegment(DeleteOssSegmentRequest) returns (DeleteOssSegmentReply) {}
+
+ rpc commitOffset(CommitOffsetRequest) returns (CommitOffsetReply) {}
+}
+
diff --git a/pom.xml b/pom.xml
index 0aebba49a..c03f62778 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,6 +23,20 @@
17
17
UTF-8
+ 1.58.0
+ 3.24.0
+ 3.24.0
+
+
+
+ io.grpc
+ grpc-bom
+ ${grpc.version}
+ pom
+ import
+
+
+
\ No newline at end of file