diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStore.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStore.java index 2da0f01e7..899731cf8 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStore.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStore.java @@ -619,7 +619,13 @@ public CompletableFuture getStream(long topicId, int queueId, Lo return CompletableFuture.supplyAsync(() -> { try (SqlSession session = openSession()) { StreamMapper streamMapper = session.getMapper(StreamMapper.class); - List streams = streamMapper.list(topicId, queueId, groupId).stream() + StreamCriteria criteria = StreamCriteria.newBuilder() + .withTopicId(topicId) + .withQueueId(queueId) + .withGroupId(groupId) + .build(); + List streams = streamMapper.byCriteria(criteria) + .stream() .filter(stream -> stream.getStreamRole() == streamRole).toList(); if (streams.isEmpty()) { if (streamRole == StreamRole.STREAM_ROLE_RETRY) { @@ -1022,7 +1028,11 @@ public CompletableFuture> listOpenStreams(int nodeId) { continue; } StreamMapper streamMapper = session.getMapper(StreamMapper.class); - List streams = buildStreamMetadata(streamMapper.listByNode(nodeId, StreamState.OPEN), session); + StreamCriteria criteria = StreamCriteria.newBuilder() + .withDstNodeId(nodeId) + .withState(StreamState.OPEN) + .build(); + List streams = buildStreamMetadata(streamMapper.byCriteria(criteria), session); future.complete(streams); break; } @@ -1069,7 +1079,8 @@ public CompletableFuture> getStreams(List streamIds) return CompletableFuture.supplyAsync(() -> { try (SqlSession session = openSession()) { StreamMapper streamMapper = session.getMapper(StreamMapper.class); - List streams = streamMapper.listByStreamIds(streamIds); + StreamCriteria criteria = StreamCriteria.newBuilder().addBatchStreamIds(streamIds).build(); + List streams = streamMapper.byCriteria(criteria); return buildStreamMetadata(streams, session); } }, asyncExecutorService); diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/dao/StreamCriteria.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/dao/StreamCriteria.java index b960dedd0..0cda1f9f8 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/dao/StreamCriteria.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/dao/StreamCriteria.java @@ -18,12 +18,24 @@ package com.automq.rocketmq.controller.metadata.database.dao; import apache.rocketmq.controller.v1.StreamState; +import java.util.ArrayList; +import java.util.Collection; import java.util.Date; +import java.util.List; public class StreamCriteria { - Long id; + List ids; + + Integer dstNodeId; + + Integer srcNodeId; + + Long groupId; + Long topicId; + Integer queueId; + StreamState state; Date updateTime; @@ -34,11 +46,38 @@ public static class StreamCriteriaBuilder { StreamCriteriaBuilder() { } - public StreamCriteriaBuilder withStreamId(long streamId) { - criteria.id = streamId; + public StreamCriteriaBuilder addStreamId(long streamId) { + if (null == criteria.ids) { + criteria.ids = new ArrayList<>(); + } + criteria.ids.add(streamId); return this; } + public StreamCriteriaBuilder addBatchStreamIds(Collection ids) { + if (null == criteria.ids) { + criteria.ids = new ArrayList<>(); + } + criteria.ids.addAll(ids); + return this; + } + + public StreamCriteriaBuilder withDstNodeId(int dstNodeId) { + criteria.dstNodeId = dstNodeId; + return this; + } + + public StreamCriteriaBuilder withSrcNodeId(int srcNodeId) { + criteria.srcNodeId = srcNodeId; + return this; + } + + public StreamCriteriaBuilder withGroupId(Long groupId) { + criteria.groupId = groupId; + return this; + } + + public StreamCriteriaBuilder withTopicId(long topicId) { criteria.topicId = topicId; return this; @@ -68,8 +107,20 @@ public static StreamCriteriaBuilder newBuilder() { return new StreamCriteriaBuilder(); } - public Long getId() { - return id; + public List getIds() { + return ids; + } + + public Integer getDstNodeId() { + return dstNodeId; + } + + public Integer getSrcNodeId() { + return srcNodeId; + } + + public Long getGroupId() { + return groupId; } public Long getTopicId() { diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/mapper/StreamMapper.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/mapper/StreamMapper.java index fdb4abb52..34c8aa7d5 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/mapper/StreamMapper.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/mapper/StreamMapper.java @@ -39,12 +39,8 @@ public interface StreamMapper { int delete(@Param("id") Long id); - List list(@Param("topicId") Long topicId, @Param("queueId") Integer queueId, @Param("groupId") Long groupId); - List byCriteria(@Param("criteria") StreamCriteria criteria); - List listByNode(@Param("nodeId") int nodeId, @Param("state") StreamState state); - void update(Stream stream); long queueEpoch(@Param("topicId") long topicId, @@ -55,6 +51,4 @@ int planMove(@Param("criteria") StreamCriteria criteria, @Param("srcNodeId") int src, @Param("dstNodeId") int dst, @Param("state") StreamState state); - - List listByStreamIds(@Param("streamIds") List streamIds); } diff --git a/controller/src/main/java/com/automq/rocketmq/controller/tasks/RecycleS3Task.java b/controller/src/main/java/com/automq/rocketmq/controller/tasks/RecycleS3Task.java index 7ffc5c5d3..d7c3bea08 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/tasks/RecycleS3Task.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/tasks/RecycleS3Task.java @@ -21,6 +21,7 @@ import com.automq.rocketmq.controller.exception.ControllerException; import com.automq.rocketmq.controller.metadata.MetadataStore; import com.automq.rocketmq.controller.metadata.database.dao.Stream; +import com.automq.rocketmq.controller.metadata.database.dao.StreamCriteria; import com.automq.rocketmq.controller.metadata.database.dao.Topic; import com.automq.rocketmq.controller.metadata.database.mapper.S3ObjectMapper; import com.automq.rocketmq.controller.metadata.database.mapper.S3StreamObjectMapper; @@ -57,7 +58,10 @@ public void process() throws ControllerException { Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.HOUR, -topic.getRetentionHours()); Date threshold = calendar.getTime(); - List streamIds = streamMapper.list(topic.getId(), null, null) + StreamCriteria criteria = StreamCriteria.newBuilder() + .withTopicId(topic.getId()) + .build(); + List streamIds = streamMapper.byCriteria(criteria) .stream() .map(Stream::getId) .toList(); diff --git a/controller/src/main/resources/database/mapper/StreamMapper.xml b/controller/src/main/resources/database/mapper/StreamMapper.xml index 2ecca02ee..c1e5ed0cd 100644 --- a/controller/src/main/resources/database/mapper/StreamMapper.xml +++ b/controller/src/main/resources/database/mapper/StreamMapper.xml @@ -73,7 +73,11 @@ UPDATE stream SET state = #{state} - id = #{criteria.id} + + + #{item} + + AND topic_id = #{criteria.topicId} AND queue_id = #{criteria.queueId} AND state = #{criteria.state} @@ -86,7 +90,11 @@ dst_node_id = #{dstNodeId}, state = #{state} - id = #{criteria.id} + + + #{item} + + AND topic_id = #{criteria.topicId} AND queue_id = #{criteria.queueId} AND state = #{criteria.state} @@ -101,54 +109,29 @@ - - - - - - diff --git a/controller/src/test/java/com/automq/rocketmq/controller/ControllerServiceImplTest.java b/controller/src/test/java/com/automq/rocketmq/controller/ControllerServiceImplTest.java index 5469c9051..d158858a5 100644 --- a/controller/src/test/java/com/automq/rocketmq/controller/ControllerServiceImplTest.java +++ b/controller/src/test/java/com/automq/rocketmq/controller/ControllerServiceImplTest.java @@ -73,6 +73,7 @@ import com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject; import com.automq.rocketmq.controller.metadata.database.dao.S3WalObject; import com.automq.rocketmq.controller.metadata.database.dao.Stream; +import com.automq.rocketmq.controller.metadata.database.dao.StreamCriteria; import com.automq.rocketmq.controller.metadata.database.dao.Topic; import com.automq.rocketmq.controller.metadata.database.mapper.GroupMapper; import com.automq.rocketmq.controller.metadata.database.mapper.GroupProgressMapper; @@ -607,7 +608,11 @@ public void testOpenStream() throws IOException, ExecutionException, Interrupted try (SqlSession session = getSessionFactory().openSession()) { StreamMapper streamMapper = session.getMapper(StreamMapper.class); - List streams = streamMapper.list(topicId, queueId, null); + StreamCriteria criteria = StreamCriteria.newBuilder() + .withTopicId(topicId) + .withQueueId(queueId) + .build(); + List streams = streamMapper.byCriteria(criteria); Assertions.assertEquals(1, streams.size()); Assertions.assertEquals(StreamState.OPEN, streams.get(0).getState()); } @@ -794,7 +799,11 @@ public void testCloseStream() throws IOException, ExecutionException, Interrupte try (SqlSession session = getSessionFactory().openSession()) { StreamMapper streamMapper = session.getMapper(StreamMapper.class); - List streams = streamMapper.list(topicId, queueId, null); + StreamCriteria criteria = StreamCriteria.newBuilder() + .withTopicId(topicId) + .withQueueId(queueId) + .build(); + List streams = streamMapper.byCriteria(criteria); Assertions.assertEquals(1, streams.size()); Assertions.assertEquals(StreamState.CLOSED, streams.get(0).getState()); } @@ -923,7 +932,11 @@ public void testTrimStream() throws IOException, ExecutionException, Interrupted try (SqlSession session = getSessionFactory().openSession()) { StreamMapper streamMapper = session.getMapper(StreamMapper.class); - List streams = streamMapper.list(topicId, queueId, null); + StreamCriteria criteria = StreamCriteria.newBuilder() + .withTopicId(topicId) + .withQueueId(queueId) + .build(); + List streams = streamMapper.byCriteria(criteria); Assertions.assertEquals(1, streams.size()); Assertions.assertEquals(newStartOffset, streams.get(0).getStartOffset()); @@ -1020,7 +1033,11 @@ public void testTrimStream_WithS3Stream() throws IOException, ExecutionException try (SqlSession session = getSessionFactory().openSession()) { StreamMapper streamMapper = session.getMapper(StreamMapper.class); - List streams = streamMapper.list(topicId, queueId, null); + StreamCriteria criteria = StreamCriteria.newBuilder() + .withTopicId(topicId) + .withQueueId(queueId) + .build(); + List streams = streamMapper.byCriteria(criteria); Assertions.assertEquals(1, streams.size()); Assertions.assertEquals(newStartOffset, streams.get(0).getStartOffset()); @@ -1087,10 +1104,9 @@ public void testTrimStream_WithS3WAL() throws IOException, ExecutionException, I S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); buildS3WalObjs(objectId, 1).stream() - .map(s3WalObject -> { + .peek(s3WalObject -> { Map subStreams = buildWalSubStreams(1, 0, 10); s3WalObject.setSubStreams(gson.toJson(subStreams)); - return s3WalObject; }).forEach(s3WALObjectMapper::create); session.commit(); } @@ -1117,7 +1133,11 @@ public void testTrimStream_WithS3WAL() throws IOException, ExecutionException, I try (SqlSession session = getSessionFactory().openSession()) { StreamMapper streamMapper = session.getMapper(StreamMapper.class); - List streams = streamMapper.list(topicId, queueId, null); + StreamCriteria criteria = StreamCriteria.newBuilder() + .withTopicId(topicId) + .withQueueId(queueId) + .build(); + List streams = streamMapper.byCriteria(criteria); Assertions.assertEquals(1, streams.size()); Assertions.assertEquals(newStartOffset, streams.get(0).getStartOffset()); @@ -1219,7 +1239,11 @@ public void testTrimStream_WithRange() throws IOException, ExecutionException, I try (SqlSession session = getSessionFactory().openSession()) { StreamMapper streamMapper = session.getMapper(StreamMapper.class); - List streams = streamMapper.list(topicId, queueId, null); + StreamCriteria criteria = StreamCriteria.newBuilder() + .withTopicId(topicId) + .withQueueId(queueId) + .build(); + List streams = streamMapper.byCriteria(criteria); Assertions.assertEquals(1, streams.size()); Assertions.assertEquals(newStartOffset, streams.get(0).getStartOffset()); @@ -1304,7 +1328,7 @@ public void test3StreamObjects_2PC() throws IOException, ExecutionException, Int try (SqlSession session = this.getSessionFactory().openSession()) { S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); - buildS3StreamObjs(objectId,2 ,1234, 1234).forEach(s3StreamObjectMapper::create); + buildS3StreamObjs(objectId, 2, 1234, 1234).forEach(s3StreamObjectMapper::create); session.commit(); } @@ -1384,7 +1408,7 @@ public void test3StreamObjects_2PC_Expired() throws IOException, ExecutionExcept s3ObjectMapper.prepare(s3Object1); S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); - buildS3StreamObjs(objectId,1 ,1234, 1234).forEach(s3StreamObjectMapper::create); + buildS3StreamObjs(objectId, 1, 1234, 1234).forEach(s3StreamObjectMapper::create); session.commit(); } @@ -1749,7 +1773,6 @@ public void test3WALObjects_2PC_Expired() throws IOException, ExecutionException .build()) .toList(); - apache.rocketmq.controller.v1.S3WALObject walObject = apache.rocketmq.controller.v1.S3WALObject.newBuilder() .setObjectId(objectId + 4) .setSequenceId(11) @@ -1806,7 +1829,6 @@ public void test3WALObjects_2PC() throws IOException, ExecutionException, Interr objectId = reply.getFirstObjectId(); } - try (SqlSession session = this.getSessionFactory().openSession()) { S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); buildS3WalObjs(objectId + 2, 1).stream() @@ -1925,7 +1947,6 @@ public void test3WALObjects_2PC_duplicate() throws IOException, ExecutionExcepti objectId = reply.getFirstObjectId(); } - try (SqlSession session = this.getSessionFactory().openSession()) { S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); buildS3WalObjs(objectId + 2, 1).stream() diff --git a/controller/src/test/java/com/automq/rocketmq/controller/metadata/GrpcControllerClientTest.java b/controller/src/test/java/com/automq/rocketmq/controller/metadata/GrpcControllerClientTest.java index e97da5f83..b0d914d11 100644 --- a/controller/src/test/java/com/automq/rocketmq/controller/metadata/GrpcControllerClientTest.java +++ b/controller/src/test/java/com/automq/rocketmq/controller/metadata/GrpcControllerClientTest.java @@ -154,6 +154,9 @@ public void testCreateTopic_duplicate() throws IOException { int queueNum = 4; MetadataStore metadataStore = Mockito.mock(MetadataStore.class); ControllerServiceImpl svc = new ControllerServiceImpl(metadataStore); + ControllerException e = new ControllerException(Code.DUPLICATED_VALUE, "Topic is not available"); + Mockito.when(metadataStore.createTopic(ArgumentMatchers.any())) + .thenReturn(CompletableFuture.failedFuture(e)); try (ControllerTestServer testServer = new ControllerTestServer(0, svc); ControllerClient client = new GrpcControllerClient(config) ) { diff --git a/controller/src/test/java/com/automq/rocketmq/controller/metadata/StreamTest.java b/controller/src/test/java/com/automq/rocketmq/controller/metadata/StreamTest.java index 9a2be3c28..5a2a877d0 100644 --- a/controller/src/test/java/com/automq/rocketmq/controller/metadata/StreamTest.java +++ b/controller/src/test/java/com/automq/rocketmq/controller/metadata/StreamTest.java @@ -64,7 +64,7 @@ public void testCreateStream() throws IOException { streamMapper.delete(createdStream.getId()); - List streams = streamMapper.list(null, null, null); + List streams = streamMapper.byCriteria(StreamCriteria.newBuilder().build()); Assertions.assertTrue(streams.isEmpty()); } } @@ -147,7 +147,7 @@ public void testUpdateStreamState() throws IOException { Assertions.assertEquals(StreamState.OPEN, stream1.getState()); StreamCriteria criteria = StreamCriteria.newBuilder() - .withStreamId(stream1.getId()) + .addStreamId(stream1.getId()) .withState(StreamState.OPEN) .build(); affectedRows = streamMapper.updateStreamState(criteria, StreamState.CLOSED); diff --git a/controller/src/test/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStoreTest.java b/controller/src/test/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStoreTest.java index f3e92aa5b..b25a5f703 100644 --- a/controller/src/test/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStoreTest.java +++ b/controller/src/test/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStoreTest.java @@ -47,6 +47,7 @@ import com.automq.rocketmq.controller.metadata.database.dao.S3WalObject; import com.automq.rocketmq.controller.metadata.database.dao.Stream; import com.automq.rocketmq.controller.metadata.database.dao.Range; +import com.automq.rocketmq.controller.metadata.database.dao.StreamCriteria; import com.automq.rocketmq.controller.metadata.database.dao.Topic; import com.automq.rocketmq.controller.metadata.database.mapper.GroupMapper; import com.automq.rocketmq.controller.metadata.database.mapper.NodeMapper; @@ -250,7 +251,7 @@ void testCreateTopic() throws IOException, ExecutionException, InterruptedExcept Assertions.assertEquals(4, assignments.size()); StreamMapper streamMapper = session.getMapper(StreamMapper.class); - List streams = streamMapper.list(topicId, null, null); + List streams = streamMapper.byCriteria(StreamCriteria.newBuilder().withTopicId(topicId).build()); // By default, we create 3 streams for each message queue: data, ops, snapshot Assertions.assertEquals(queueNum * 3, streams.size()); } @@ -324,7 +325,7 @@ void testUpdateTopic() throws IOException, ExecutionException, InterruptedExcept Assertions.assertEquals(4, assignments.size()); StreamMapper streamMapper = session.getMapper(StreamMapper.class); - List streams = streamMapper.list(topicId, null, null); + List streams = streamMapper.byCriteria(StreamCriteria.newBuilder().withTopicId(topicId).build()); // By default, we create 3 streams for each message queue: data, ops, snapshot Assertions.assertEquals(queueNum * 3, streams.size()); }