Skip to content

Commit

Permalink
feat(proxy): limit client consumption mode (#524)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Oct 31, 2023
1 parent ed826cb commit 088a897
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package com.automq.rocketmq.proxy.remoting;

import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
Expand Down Expand Up @@ -45,9 +47,10 @@ public static RemotingCommand clientNotSupportedResponse(RemotingCommand request

/**
* Build a response command with the given response code, the opaque of the request, and the given custom header.
* @param request The request command.
*
* @param request The request command.
* @param responseCode The response code.
* @param classHeader The class of the custom header.
* @param classHeader The class of the custom header.
* @return The response command.
*/
public static RemotingCommand buildResponseCommand(RemotingCommand request, int responseCode,
Expand All @@ -60,7 +63,7 @@ public static RemotingCommand buildResponseCommand(RemotingCommand request, int
/**
* Build a response command with the given response code, and the opaque of the request.
*
* @param request The request command.
* @param request The request command.
* @param responseCode The response code.
* @return The response command.
*/
Expand All @@ -69,4 +72,8 @@ public static RemotingCommand buildResponseCommand(RemotingCommand request, int
response.setOpaque(request.getOpaque());
return response;
}

public static boolean isRemotingProtocol(ProxyContext ctx) {
return ChannelProtocolType.REMOTING.getName().equals(ctx.getProtocolType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import apache.rocketmq.controller.v1.Code;
import apache.rocketmq.controller.v1.ConsumerGroup;
import apache.rocketmq.controller.v1.SubscriptionMode;
import apache.rocketmq.controller.v1.Topic;
import com.automq.rocketmq.common.config.ProxyConfig;
import com.automq.rocketmq.common.model.FlatMessageExt;
Expand Down Expand Up @@ -386,6 +387,10 @@ public CompletableFuture<PopResult> popMessage(ProxyContext ctx, AddressableMess
throw new CompletionException(new MQBrokerException(ResponseCode.TOPIC_NOT_EXIST, "Topic not exist"));
}

if (group.getSubMode() != SubscriptionMode.SUB_MODE_POP) {
throw new CompletionException(new MQBrokerException(ResponseCode.NO_PERMISSION, String.format("Group [%s] do not support pop mode.", group.getName())));
}

topicReference.set(topic);
consumerGroupReference.set(group);
return null;
Expand Down Expand Up @@ -489,7 +494,7 @@ public CompletableFuture<Long> queryConsumerOffset(ProxyContext ctx, Addressable
QueryConsumerOffsetRequestHeader requestHeader, long timeoutMillis) {
CompletableFuture<ConsumerGroup> consumeGroupFuture = metadataService.consumerGroupOf(requestHeader.getConsumerGroup());
CompletableFuture<Topic> topicFuture = metadataService.topicOf(requestHeader.getTopic());
// TODO: distinguish different offset management between pop pattern and push pattern, now only query offset in pop pattern.
VirtualQueue virtualQueue = new VirtualQueue(messageQueue);

// TODO:Implement the bellow logic in the next iteration.
// If the consumer doesn't have offset record on the specified queue:
Expand All @@ -499,7 +504,13 @@ public CompletableFuture<Long> queryConsumerOffset(ProxyContext ctx, Addressable
.thenCompose(pair -> {
ConsumerGroup consumerGroup = pair.getLeft();
Topic topic = pair.getRight();
return store.getConsumeOffset(consumerGroup.getGroupId(), topic.getTopicId(), requestHeader.getQueueId());
if (consumerGroup.getSubMode() == SubscriptionMode.SUB_MODE_PULL) {
return metadataService.consumerOffsetOf(
pair.getLeft().getGroupId(),
pair.getRight().getTopicId(),
virtualQueue.physicalQueueId());
}
return store.getConsumeOffset(consumerGroup.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId());
});
}

Expand All @@ -510,12 +521,18 @@ public CompletableFuture<Void> updateConsumerOffset(ProxyContext ctx, Addressabl
CompletableFuture<Topic> topicFuture = metadataService.topicOf(requestHeader.getTopic());
VirtualQueue virtualQueue = new VirtualQueue(messageQueue);

// TODO: distinguish different offset management between pop pattern and push pattern, now only reset offset in pop pattern.
// TODO: support retry topic.
return consumeGroupFuture.thenCombine(topicFuture, Pair::of)
.thenCompose(pair -> {
ConsumerGroup consumerGroup = pair.getLeft();
Topic topic = pair.getRight();
if (consumerGroup.getSubMode() == SubscriptionMode.SUB_MODE_PULL) {
return metadataService.updateConsumerOffset(
consumerGroup.getGroupId(),
topic.getTopicId(),
virtualQueue.physicalQueueId(),
requestHeader.getCommitOffset())
.thenApply(nil -> new ResetConsumeOffsetResult(ResetConsumeOffsetResult.Status.SUCCESS));
}
return store.resetConsumeOffset(consumerGroup.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId(), requestHeader.getCommitOffset());
}).thenAccept(resetConsumeOffsetResult -> {
if (resetConsumeOffsetResult.status() != ResetConsumeOffsetResult.Status.SUCCESS) {
Expand Down Expand Up @@ -559,10 +576,24 @@ public CompletableFuture<PullResult> pullMessage(ProxyContext ctx, AddressableMe
.thenCompose(pair -> {
Topic topic = pair.getLeft();
ConsumerGroup group = pair.getRight();
if (topic.getTopicId() != virtualQueue.topicId()) {
LOGGER.error("Topic id in request header {} does not match topic id in message queue {}, maybe the topic is recreated.",
topic.getTopicId(), virtualQueue.topicId());
throw new CompletionException(new MQBrokerException(ResponseCode.TOPIC_NOT_EXIST, "Topic not exist"));
}

if (group.getSubMode() != SubscriptionMode.SUB_MODE_PULL) {
throw new CompletionException(new MQBrokerException(ResponseCode.NO_PERMISSION, String.format("Group [%s] do not support pull mode.", group.getName())));
}

topicReference.set(topic);
consumerGroupReference.set(group);

// Update the consumer offset if the commit offset is specified.
if (requestHeader.getCommitOffset() != null && requestHeader.getCommitOffset() >= 0) {
metadataService.updateConsumerOffset(group.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId(), requestHeader.getCommitOffset());
}

return store.pull(group.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId(), filter, requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), false);
})
.thenCompose(result -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,8 @@ protected void cleanExpiredRequest() {

@Override
public void run() {
waitForRunning(100);

while (!stopped) {
waitForRunning(100);
try {
cleanExpiredRequest();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.metadata.api.ProxyMetadataService;
import com.automq.rocketmq.proxy.model.VirtualQueue;
import com.automq.rocketmq.proxy.remoting.RemotingUtil;
import com.google.common.collect.Lists;
import com.google.common.net.HostAndPort;
import java.time.Duration;
Expand All @@ -43,7 +44,6 @@
import org.apache.rocketmq.proxy.common.Address;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
import org.apache.rocketmq.proxy.service.route.MessageQueueView;
import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData;
Expand Down Expand Up @@ -92,7 +92,7 @@ public ProxyTopicRouteData getTopicRouteForProxy(ProxyContext ctx, List<Address>

ProxyTopicRouteData proxyTopicRouteData = new ProxyTopicRouteData();
proxyTopicRouteData.setQueueDatas(topicRouteData.getQueueDatas());
boolean isGrpc = isGrpcProtocol(ctx);
boolean isGrpc = !RemotingUtil.isRemotingProtocol(ctx);

for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
ProxyTopicRouteData.ProxyBrokerData proxyBrokerData = new ProxyTopicRouteData.ProxyBrokerData();
Expand Down Expand Up @@ -173,7 +173,7 @@ private List<MessageQueueAssignment> assignmentsOf(ProxyContext ctx, String topi
.exceptionallyCompose(ex -> {
if (ExceptionUtils.getRealException(ex) instanceof ControllerException controllerException) {
// If pull retry topic does not exist.
boolean isRemoting = !isGrpcProtocol(ctx);
boolean isRemoting = RemotingUtil.isRemotingProtocol(ctx);
if (controllerException.getErrorCode() == Code.NOT_FOUND.getNumber()
&& isRemoting && topicName.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
CreateTopicRequest request = CreateTopicRequest.newBuilder()
Expand Down Expand Up @@ -207,14 +207,6 @@ private List<MessageQueueAssignment> assignmentsOf(ProxyContext ctx, String topi
return assignmentList;
}

private boolean isGrpcProtocol(ProxyContext ctx) {
if (ChannelProtocolType.GRPC_V2.getName().equals(ctx.getProtocolType()) ||
ChannelProtocolType.GRPC_V1.getName().equals(ctx.getProtocolType())) {
return true;
}
return false;
}

// Define a filter for the MessageQueueAssignment class.
interface QueueFilter {
QueueFilter ALL = nodeId -> true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import apache.rocketmq.controller.v1.MessageQueue;
import apache.rocketmq.controller.v1.MessageQueueAssignment;
import apache.rocketmq.controller.v1.MessageType;
import apache.rocketmq.controller.v1.SubscriptionMode;
import apache.rocketmq.controller.v1.Topic;
import com.automq.rocketmq.metadata.api.ProxyMetadataService;
import java.util.HashMap;
Expand Down Expand Up @@ -77,7 +78,13 @@ public CompletableFuture<String> addressOf(int brokerId) {
@Override
public CompletableFuture<ConsumerGroup> consumerGroupOf(String groupName) {
long groupId = 8;
return CompletableFuture.completedFuture(ConsumerGroup.newBuilder().setName(groupName).setGroupId(groupId).build());
return CompletableFuture.completedFuture(
ConsumerGroup.newBuilder()
.setName(groupName)
.setGroupId(groupId)
.setMaxDeliveryAttempt(16)
.setSubMode(SubscriptionMode.SUB_MODE_POP)
.build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ void offset() {
queryConsumerOffsetRequestHeader.setConsumerGroup("group");
queryConsumerOffsetRequestHeader.setTopic("topic");
queryConsumerOffsetRequestHeader.setQueueId(0);
Long offset = messageService.queryConsumerOffset(ProxyContextExt.create(), null, queryConsumerOffsetRequestHeader, 0).join();
Long offset = messageService.queryConsumerOffset(ProxyContextExt.create(), messageQueue, queryConsumerOffsetRequestHeader, 0).join();
assertEquals(100L, offset);
}
}

0 comments on commit 088a897

Please sign in to comment.