From 088a897b62015c718da9e538d40e87a060a420c2 Mon Sep 17 00:00:00 2001 From: SSpirits Date: Tue, 31 Oct 2023 16:09:14 +0800 Subject: [PATCH] feat(proxy): limit client consumption mode (#524) Signed-off-by: SSpirits --- .../rocketmq/proxy/remoting/RemotingUtil.java | 13 +++++-- .../proxy/service/MessageServiceImpl.java | 39 +++++++++++++++++-- .../proxy/service/SuspendRequestService.java | 3 +- .../proxy/service/TopicRouteServiceImpl.java | 14 ++----- .../proxy/mock/MockProxyMetadataService.java | 9 ++++- .../proxy/service/MessageServiceImplTest.java | 2 +- 6 files changed, 58 insertions(+), 22 deletions(-) diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/RemotingUtil.java b/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/RemotingUtil.java index bb8ddc2d2..0e08c5333 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/RemotingUtil.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/remoting/RemotingUtil.java @@ -18,6 +18,8 @@ package com.automq.rocketmq.proxy.remoting; import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode; @@ -45,9 +47,10 @@ public static RemotingCommand clientNotSupportedResponse(RemotingCommand request /** * Build a response command with the given response code, the opaque of the request, and the given custom header. - * @param request The request command. + * + * @param request The request command. * @param responseCode The response code. - * @param classHeader The class of the custom header. + * @param classHeader The class of the custom header. * @return The response command. */ public static RemotingCommand buildResponseCommand(RemotingCommand request, int responseCode, @@ -60,7 +63,7 @@ public static RemotingCommand buildResponseCommand(RemotingCommand request, int /** * Build a response command with the given response code, and the opaque of the request. * - * @param request The request command. + * @param request The request command. * @param responseCode The response code. * @return The response command. */ @@ -69,4 +72,8 @@ public static RemotingCommand buildResponseCommand(RemotingCommand request, int response.setOpaque(request.getOpaque()); return response; } + + public static boolean isRemotingProtocol(ProxyContext ctx) { + return ChannelProtocolType.REMOTING.getName().equals(ctx.getProtocolType()); + } } diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java b/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java index b1bf56fa1..5cfae5a6a 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java @@ -19,6 +19,7 @@ import apache.rocketmq.controller.v1.Code; import apache.rocketmq.controller.v1.ConsumerGroup; +import apache.rocketmq.controller.v1.SubscriptionMode; import apache.rocketmq.controller.v1.Topic; import com.automq.rocketmq.common.config.ProxyConfig; import com.automq.rocketmq.common.model.FlatMessageExt; @@ -386,6 +387,10 @@ public CompletableFuture popMessage(ProxyContext ctx, AddressableMess throw new CompletionException(new MQBrokerException(ResponseCode.TOPIC_NOT_EXIST, "Topic not exist")); } + if (group.getSubMode() != SubscriptionMode.SUB_MODE_POP) { + throw new CompletionException(new MQBrokerException(ResponseCode.NO_PERMISSION, String.format("Group [%s] do not support pop mode.", group.getName()))); + } + topicReference.set(topic); consumerGroupReference.set(group); return null; @@ -489,7 +494,7 @@ public CompletableFuture queryConsumerOffset(ProxyContext ctx, Addressable QueryConsumerOffsetRequestHeader requestHeader, long timeoutMillis) { CompletableFuture consumeGroupFuture = metadataService.consumerGroupOf(requestHeader.getConsumerGroup()); CompletableFuture topicFuture = metadataService.topicOf(requestHeader.getTopic()); - // TODO: distinguish different offset management between pop pattern and push pattern, now only query offset in pop pattern. + VirtualQueue virtualQueue = new VirtualQueue(messageQueue); // TODOļ¼šImplement the bellow logic in the next iteration. // If the consumer doesn't have offset record on the specified queue: @@ -499,7 +504,13 @@ public CompletableFuture queryConsumerOffset(ProxyContext ctx, Addressable .thenCompose(pair -> { ConsumerGroup consumerGroup = pair.getLeft(); Topic topic = pair.getRight(); - return store.getConsumeOffset(consumerGroup.getGroupId(), topic.getTopicId(), requestHeader.getQueueId()); + if (consumerGroup.getSubMode() == SubscriptionMode.SUB_MODE_PULL) { + return metadataService.consumerOffsetOf( + pair.getLeft().getGroupId(), + pair.getRight().getTopicId(), + virtualQueue.physicalQueueId()); + } + return store.getConsumeOffset(consumerGroup.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId()); }); } @@ -510,12 +521,18 @@ public CompletableFuture updateConsumerOffset(ProxyContext ctx, Addressabl CompletableFuture topicFuture = metadataService.topicOf(requestHeader.getTopic()); VirtualQueue virtualQueue = new VirtualQueue(messageQueue); - // TODO: distinguish different offset management between pop pattern and push pattern, now only reset offset in pop pattern. - // TODO: support retry topic. return consumeGroupFuture.thenCombine(topicFuture, Pair::of) .thenCompose(pair -> { ConsumerGroup consumerGroup = pair.getLeft(); Topic topic = pair.getRight(); + if (consumerGroup.getSubMode() == SubscriptionMode.SUB_MODE_PULL) { + return metadataService.updateConsumerOffset( + consumerGroup.getGroupId(), + topic.getTopicId(), + virtualQueue.physicalQueueId(), + requestHeader.getCommitOffset()) + .thenApply(nil -> new ResetConsumeOffsetResult(ResetConsumeOffsetResult.Status.SUCCESS)); + } return store.resetConsumeOffset(consumerGroup.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId(), requestHeader.getCommitOffset()); }).thenAccept(resetConsumeOffsetResult -> { if (resetConsumeOffsetResult.status() != ResetConsumeOffsetResult.Status.SUCCESS) { @@ -559,10 +576,24 @@ public CompletableFuture pullMessage(ProxyContext ctx, AddressableMe .thenCompose(pair -> { Topic topic = pair.getLeft(); ConsumerGroup group = pair.getRight(); + if (topic.getTopicId() != virtualQueue.topicId()) { + LOGGER.error("Topic id in request header {} does not match topic id in message queue {}, maybe the topic is recreated.", + topic.getTopicId(), virtualQueue.topicId()); + throw new CompletionException(new MQBrokerException(ResponseCode.TOPIC_NOT_EXIST, "Topic not exist")); + } + + if (group.getSubMode() != SubscriptionMode.SUB_MODE_PULL) { + throw new CompletionException(new MQBrokerException(ResponseCode.NO_PERMISSION, String.format("Group [%s] do not support pull mode.", group.getName()))); + } topicReference.set(topic); consumerGroupReference.set(group); + // Update the consumer offset if the commit offset is specified. + if (requestHeader.getCommitOffset() != null && requestHeader.getCommitOffset() >= 0) { + metadataService.updateConsumerOffset(group.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId(), requestHeader.getCommitOffset()); + } + return store.pull(group.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId(), filter, requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), false); }) .thenCompose(result -> { diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/service/SuspendRequestService.java b/proxy/src/main/java/com/automq/rocketmq/proxy/service/SuspendRequestService.java index b5768703b..68be89776 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/service/SuspendRequestService.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/service/SuspendRequestService.java @@ -220,9 +220,8 @@ protected void cleanExpiredRequest() { @Override public void run() { - waitForRunning(100); - while (!stopped) { + waitForRunning(100); try { cleanExpiredRequest(); } catch (Exception e) { diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/service/TopicRouteServiceImpl.java b/proxy/src/main/java/com/automq/rocketmq/proxy/service/TopicRouteServiceImpl.java index 0ce0ef3fa..99b18cb6a 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/service/TopicRouteServiceImpl.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/service/TopicRouteServiceImpl.java @@ -27,6 +27,7 @@ import com.automq.rocketmq.controller.exception.ControllerException; import com.automq.rocketmq.metadata.api.ProxyMetadataService; import com.automq.rocketmq.proxy.model.VirtualQueue; +import com.automq.rocketmq.proxy.remoting.RemotingUtil; import com.google.common.collect.Lists; import com.google.common.net.HostAndPort; import java.time.Duration; @@ -43,7 +44,6 @@ import org.apache.rocketmq.proxy.common.Address; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.common.utils.ExceptionUtils; -import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType; import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue; import org.apache.rocketmq.proxy.service.route.MessageQueueView; import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData; @@ -92,7 +92,7 @@ public ProxyTopicRouteData getTopicRouteForProxy(ProxyContext ctx, List
ProxyTopicRouteData proxyTopicRouteData = new ProxyTopicRouteData(); proxyTopicRouteData.setQueueDatas(topicRouteData.getQueueDatas()); - boolean isGrpc = isGrpcProtocol(ctx); + boolean isGrpc = !RemotingUtil.isRemotingProtocol(ctx); for (BrokerData brokerData : topicRouteData.getBrokerDatas()) { ProxyTopicRouteData.ProxyBrokerData proxyBrokerData = new ProxyTopicRouteData.ProxyBrokerData(); @@ -173,7 +173,7 @@ private List assignmentsOf(ProxyContext ctx, String topi .exceptionallyCompose(ex -> { if (ExceptionUtils.getRealException(ex) instanceof ControllerException controllerException) { // If pull retry topic does not exist. - boolean isRemoting = !isGrpcProtocol(ctx); + boolean isRemoting = RemotingUtil.isRemotingProtocol(ctx); if (controllerException.getErrorCode() == Code.NOT_FOUND.getNumber() && isRemoting && topicName.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { CreateTopicRequest request = CreateTopicRequest.newBuilder() @@ -207,14 +207,6 @@ private List assignmentsOf(ProxyContext ctx, String topi return assignmentList; } - private boolean isGrpcProtocol(ProxyContext ctx) { - if (ChannelProtocolType.GRPC_V2.getName().equals(ctx.getProtocolType()) || - ChannelProtocolType.GRPC_V1.getName().equals(ctx.getProtocolType())) { - return true; - } - return false; - } - // Define a filter for the MessageQueueAssignment class. interface QueueFilter { QueueFilter ALL = nodeId -> true; diff --git a/proxy/src/test/java/com/automq/rocketmq/proxy/mock/MockProxyMetadataService.java b/proxy/src/test/java/com/automq/rocketmq/proxy/mock/MockProxyMetadataService.java index 366dfe960..8036f2ad0 100644 --- a/proxy/src/test/java/com/automq/rocketmq/proxy/mock/MockProxyMetadataService.java +++ b/proxy/src/test/java/com/automq/rocketmq/proxy/mock/MockProxyMetadataService.java @@ -23,6 +23,7 @@ import apache.rocketmq.controller.v1.MessageQueue; import apache.rocketmq.controller.v1.MessageQueueAssignment; import apache.rocketmq.controller.v1.MessageType; +import apache.rocketmq.controller.v1.SubscriptionMode; import apache.rocketmq.controller.v1.Topic; import com.automq.rocketmq.metadata.api.ProxyMetadataService; import java.util.HashMap; @@ -77,7 +78,13 @@ public CompletableFuture addressOf(int brokerId) { @Override public CompletableFuture consumerGroupOf(String groupName) { long groupId = 8; - return CompletableFuture.completedFuture(ConsumerGroup.newBuilder().setName(groupName).setGroupId(groupId).build()); + return CompletableFuture.completedFuture( + ConsumerGroup.newBuilder() + .setName(groupName) + .setGroupId(groupId) + .setMaxDeliveryAttempt(16) + .setSubMode(SubscriptionMode.SUB_MODE_POP) + .build()); } @Override diff --git a/proxy/src/test/java/com/automq/rocketmq/proxy/service/MessageServiceImplTest.java b/proxy/src/test/java/com/automq/rocketmq/proxy/service/MessageServiceImplTest.java index 6990c4196..3bfd9e4bf 100644 --- a/proxy/src/test/java/com/automq/rocketmq/proxy/service/MessageServiceImplTest.java +++ b/proxy/src/test/java/com/automq/rocketmq/proxy/service/MessageServiceImplTest.java @@ -243,7 +243,7 @@ void offset() { queryConsumerOffsetRequestHeader.setConsumerGroup("group"); queryConsumerOffsetRequestHeader.setTopic("topic"); queryConsumerOffsetRequestHeader.setQueueId(0); - Long offset = messageService.queryConsumerOffset(ProxyContextExt.create(), null, queryConsumerOffsetRequestHeader, 0).join(); + Long offset = messageService.queryConsumerOffset(ProxyContextExt.create(), messageQueue, queryConsumerOffsetRequestHeader, 0).join(); assertEquals(100L, offset); } } \ No newline at end of file