Skip to content

Commit

Permalink
fix: simplify stream mapper (#491)
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored Oct 27, 2023
1 parent 3000b7a commit c54a538
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,13 @@ public CompletableFuture<StreamMetadata> getStream(long topicId, int queueId, Lo
return CompletableFuture.supplyAsync(() -> {
try (SqlSession session = openSession()) {
StreamMapper streamMapper = session.getMapper(StreamMapper.class);
List<Stream> streams = streamMapper.list(topicId, queueId, groupId).stream()
StreamCriteria criteria = StreamCriteria.newBuilder()
.withTopicId(topicId)
.withQueueId(queueId)
.withGroupId(groupId)
.build();
List<Stream> streams = streamMapper.byCriteria(criteria)
.stream()
.filter(stream -> stream.getStreamRole() == streamRole).toList();
if (streams.isEmpty()) {
if (streamRole == StreamRole.STREAM_ROLE_RETRY) {
Expand Down Expand Up @@ -1022,7 +1028,11 @@ public CompletableFuture<List<StreamMetadata>> listOpenStreams(int nodeId) {
continue;
}
StreamMapper streamMapper = session.getMapper(StreamMapper.class);
List<StreamMetadata> streams = buildStreamMetadata(streamMapper.listByNode(nodeId, StreamState.OPEN), session);
StreamCriteria criteria = StreamCriteria.newBuilder()
.withDstNodeId(nodeId)
.withState(StreamState.OPEN)
.build();
List<StreamMetadata> streams = buildStreamMetadata(streamMapper.byCriteria(criteria), session);
future.complete(streams);
break;
}
Expand Down Expand Up @@ -1069,7 +1079,8 @@ public CompletableFuture<List<StreamMetadata>> getStreams(List<Long> streamIds)
return CompletableFuture.supplyAsync(() -> {
try (SqlSession session = openSession()) {
StreamMapper streamMapper = session.getMapper(StreamMapper.class);
List<Stream> streams = streamMapper.listByStreamIds(streamIds);
StreamCriteria criteria = StreamCriteria.newBuilder().addBatchStreamIds(streamIds).build();
List<Stream> streams = streamMapper.byCriteria(criteria);
return buildStreamMetadata(streams, session);
}
}, asyncExecutorService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,24 @@
package com.automq.rocketmq.controller.metadata.database.dao;

import apache.rocketmq.controller.v1.StreamState;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;

public class StreamCriteria {
Long id;
List<Long> ids;

Integer dstNodeId;

Integer srcNodeId;

Long groupId;

Long topicId;

Integer queueId;

StreamState state;

Date updateTime;
Expand All @@ -34,11 +46,38 @@ public static class StreamCriteriaBuilder {
StreamCriteriaBuilder() {
}

public StreamCriteriaBuilder withStreamId(long streamId) {
criteria.id = streamId;
public StreamCriteriaBuilder addStreamId(long streamId) {
if (null == criteria.ids) {
criteria.ids = new ArrayList<>();
}
criteria.ids.add(streamId);
return this;
}

public StreamCriteriaBuilder addBatchStreamIds(Collection<Long> ids) {
if (null == criteria.ids) {
criteria.ids = new ArrayList<>();
}
criteria.ids.addAll(ids);
return this;
}

public StreamCriteriaBuilder withDstNodeId(int dstNodeId) {
criteria.dstNodeId = dstNodeId;
return this;
}

public StreamCriteriaBuilder withSrcNodeId(int srcNodeId) {
criteria.srcNodeId = srcNodeId;
return this;
}

public StreamCriteriaBuilder withGroupId(Long groupId) {
criteria.groupId = groupId;
return this;
}


public StreamCriteriaBuilder withTopicId(long topicId) {
criteria.topicId = topicId;
return this;
Expand Down Expand Up @@ -68,8 +107,20 @@ public static StreamCriteriaBuilder newBuilder() {
return new StreamCriteriaBuilder();
}

public Long getId() {
return id;
public List<Long> getIds() {
return ids;
}

public Integer getDstNodeId() {
return dstNodeId;
}

public Integer getSrcNodeId() {
return srcNodeId;
}

public Long getGroupId() {
return groupId;
}

public Long getTopicId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,8 @@ public interface StreamMapper {

int delete(@Param("id") Long id);

List<Stream> list(@Param("topicId") Long topicId, @Param("queueId") Integer queueId, @Param("groupId") Long groupId);

List<Stream> byCriteria(@Param("criteria") StreamCriteria criteria);

List<Stream> listByNode(@Param("nodeId") int nodeId, @Param("state") StreamState state);

void update(Stream stream);

long queueEpoch(@Param("topicId") long topicId,
Expand All @@ -55,6 +51,4 @@ int planMove(@Param("criteria") StreamCriteria criteria,
@Param("srcNodeId") int src,
@Param("dstNodeId") int dst,
@Param("state") StreamState state);

List<Stream> listByStreamIds(@Param("streamIds") List<Long> streamIds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.controller.metadata.MetadataStore;
import com.automq.rocketmq.controller.metadata.database.dao.Stream;
import com.automq.rocketmq.controller.metadata.database.dao.StreamCriteria;
import com.automq.rocketmq.controller.metadata.database.dao.Topic;
import com.automq.rocketmq.controller.metadata.database.mapper.S3ObjectMapper;
import com.automq.rocketmq.controller.metadata.database.mapper.S3StreamObjectMapper;
Expand Down Expand Up @@ -57,7 +58,10 @@ public void process() throws ControllerException {
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.HOUR, -topic.getRetentionHours());
Date threshold = calendar.getTime();
List<Long> streamIds = streamMapper.list(topic.getId(), null, null)
StreamCriteria criteria = StreamCriteria.newBuilder()
.withTopicId(topic.getId())
.build();
List<Long> streamIds = streamMapper.byCriteria(criteria)
.stream()
.map(Stream::getId)
.toList();
Expand Down
53 changes: 18 additions & 35 deletions controller/src/main/resources/database/mapper/StreamMapper.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@
UPDATE stream
SET state = #{state}
<where>
<if test="null != criteria.id">id = #{criteria.id}</if>
<if test="null != criteria.ids">
<foreach item="item" collection="criteria.ids" open="id IN (" separator="," close=")">
#{item}
</foreach>
</if>
<if test="null != criteria.topicId">AND topic_id = #{criteria.topicId}</if>
<if test="null != criteria.queueId">AND queue_id = #{criteria.queueId}</if>
<if test="null != criteria.state">AND state = #{criteria.state}</if>
Expand All @@ -86,7 +90,11 @@
dst_node_id = #{dstNodeId},
state = #{state}
<where>
<if test="null != criteria.id">id = #{criteria.id}</if>
<if test="null != criteria.ids">
<foreach item="item" collection="criteria.ids" open="id IN (" separator="," close=")">
#{item}
</foreach>
</if>
<if test="null != criteria.topicId">AND topic_id = #{criteria.topicId}</if>
<if test="null != criteria.queueId">AND queue_id = #{criteria.queueId}</if>
<if test="null != criteria.state">AND state = #{criteria.state}</if>
Expand All @@ -101,54 +109,29 @@
</where>
</delete>

<select id="list" resultType="Stream">
SELECT id, topic_id, queue_id, stream_role, group_id, src_node_id, dst_node_id, epoch, range_id, start_offset,
state
FROM stream
<where>
<if test="null != topicId">topic_id = #{topicId}</if>
<if test="null != queueId">AND queue_id = #{queueId}</if>
<if test="null != groupId">AND group_id = #{groupId}</if>
</where>
</select>

<select id="listByNode" resultType="Stream">
SELECT id, topic_id, queue_id, stream_role, group_id, src_node_id, dst_node_id, epoch, range_id, start_offset,
state
FROM stream
<where>
<if test="null != nodeId">dst_node_id = #{nodeId}</if>
<if test="null != state">AND state = #{state}</if>
</where>
</select>

<select id="queueEpoch" resultType="long">
SELECT MAX(epoch)
FROM stream
WHERE topic_id = #{topicId} AND queue_id = #{queueId}
</select>

<select id="listByStreamIds" resultType="Stream">
SELECT id, topic_id, queue_id, stream_role, group_id, src_node_id, dst_node_id, epoch, range_id, start_offset,
state
FROM stream
<where>
<foreach item="item" index="index" collection="streamIds" open="id IN (" separator="," close=")">
#{item}
</foreach>
</where>
</select>

<select id="byCriteria" resultType="Stream">
SELECT id, topic_id, queue_id, stream_role, group_id, src_node_id, dst_node_id, epoch, range_id, start_offset,
state
FROM stream
<where>
<if test="null != criteria.id">id = #{criteria.id}</if>
<if test="null != criteria.ids">
<foreach item="item" collection="criteria.ids" open="id IN (" separator="," close=")">
#{item}
</foreach>
</if>
<if test="null != criteria.topicId">AND topic_id = #{criteria.topicId}</if>
<if test="null != criteria.queueId">AND queue_id = #{criteria.queueId}</if>
<if test="null != criteria.state">AND state = #{criteria.state}</if>
<if test="null != criteria.updateTime">AND update_time >= #{criteria.updateTime}</if>
<if test="null != criteria.srcNodeId">AND src_node_id = #{criteria.srcNodeId}</if>
<if test="null != criteria.dstNodeId">AND dst_node_id = #{criteria.dstNodeId}</if>
<if test="null != criteria.groupId">AND group_id = #{criteria.groupId}</if>
</where>
</select>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject;
import com.automq.rocketmq.controller.metadata.database.dao.S3WalObject;
import com.automq.rocketmq.controller.metadata.database.dao.Stream;
import com.automq.rocketmq.controller.metadata.database.dao.StreamCriteria;
import com.automq.rocketmq.controller.metadata.database.dao.Topic;
import com.automq.rocketmq.controller.metadata.database.mapper.GroupMapper;
import com.automq.rocketmq.controller.metadata.database.mapper.GroupProgressMapper;
Expand Down Expand Up @@ -607,7 +608,11 @@ public void testOpenStream() throws IOException, ExecutionException, Interrupted

try (SqlSession session = getSessionFactory().openSession()) {
StreamMapper streamMapper = session.getMapper(StreamMapper.class);
List<Stream> streams = streamMapper.list(topicId, queueId, null);
StreamCriteria criteria = StreamCriteria.newBuilder()
.withTopicId(topicId)
.withQueueId(queueId)
.build();
List<Stream> streams = streamMapper.byCriteria(criteria);
Assertions.assertEquals(1, streams.size());
Assertions.assertEquals(StreamState.OPEN, streams.get(0).getState());
}
Expand Down Expand Up @@ -794,7 +799,11 @@ public void testCloseStream() throws IOException, ExecutionException, Interrupte

try (SqlSession session = getSessionFactory().openSession()) {
StreamMapper streamMapper = session.getMapper(StreamMapper.class);
List<Stream> streams = streamMapper.list(topicId, queueId, null);
StreamCriteria criteria = StreamCriteria.newBuilder()
.withTopicId(topicId)
.withQueueId(queueId)
.build();
List<Stream> streams = streamMapper.byCriteria(criteria);
Assertions.assertEquals(1, streams.size());
Assertions.assertEquals(StreamState.CLOSED, streams.get(0).getState());
}
Expand Down Expand Up @@ -923,7 +932,11 @@ public void testTrimStream() throws IOException, ExecutionException, Interrupted

try (SqlSession session = getSessionFactory().openSession()) {
StreamMapper streamMapper = session.getMapper(StreamMapper.class);
List<Stream> streams = streamMapper.list(topicId, queueId, null);
StreamCriteria criteria = StreamCriteria.newBuilder()
.withTopicId(topicId)
.withQueueId(queueId)
.build();
List<Stream> streams = streamMapper.byCriteria(criteria);
Assertions.assertEquals(1, streams.size());
Assertions.assertEquals(newStartOffset, streams.get(0).getStartOffset());

Expand Down Expand Up @@ -1020,7 +1033,11 @@ public void testTrimStream_WithS3Stream() throws IOException, ExecutionException

try (SqlSession session = getSessionFactory().openSession()) {
StreamMapper streamMapper = session.getMapper(StreamMapper.class);
List<Stream> streams = streamMapper.list(topicId, queueId, null);
StreamCriteria criteria = StreamCriteria.newBuilder()
.withTopicId(topicId)
.withQueueId(queueId)
.build();
List<Stream> streams = streamMapper.byCriteria(criteria);
Assertions.assertEquals(1, streams.size());
Assertions.assertEquals(newStartOffset, streams.get(0).getStartOffset());

Expand Down Expand Up @@ -1087,10 +1104,9 @@ public void testTrimStream_WithS3WAL() throws IOException, ExecutionException, I

S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class);
buildS3WalObjs(objectId, 1).stream()
.map(s3WalObject -> {
.peek(s3WalObject -> {
Map<Long, SubStream> subStreams = buildWalSubStreams(1, 0, 10);
s3WalObject.setSubStreams(gson.toJson(subStreams));
return s3WalObject;
}).forEach(s3WALObjectMapper::create);
session.commit();
}
Expand All @@ -1117,7 +1133,11 @@ public void testTrimStream_WithS3WAL() throws IOException, ExecutionException, I

try (SqlSession session = getSessionFactory().openSession()) {
StreamMapper streamMapper = session.getMapper(StreamMapper.class);
List<Stream> streams = streamMapper.list(topicId, queueId, null);
StreamCriteria criteria = StreamCriteria.newBuilder()
.withTopicId(topicId)
.withQueueId(queueId)
.build();
List<Stream> streams = streamMapper.byCriteria(criteria);
Assertions.assertEquals(1, streams.size());
Assertions.assertEquals(newStartOffset, streams.get(0).getStartOffset());

Expand Down Expand Up @@ -1219,7 +1239,11 @@ public void testTrimStream_WithRange() throws IOException, ExecutionException, I

try (SqlSession session = getSessionFactory().openSession()) {
StreamMapper streamMapper = session.getMapper(StreamMapper.class);
List<Stream> streams = streamMapper.list(topicId, queueId, null);
StreamCriteria criteria = StreamCriteria.newBuilder()
.withTopicId(topicId)
.withQueueId(queueId)
.build();
List<Stream> streams = streamMapper.byCriteria(criteria);
Assertions.assertEquals(1, streams.size());
Assertions.assertEquals(newStartOffset, streams.get(0).getStartOffset());

Expand Down Expand Up @@ -1304,7 +1328,7 @@ public void test3StreamObjects_2PC() throws IOException, ExecutionException, Int

try (SqlSession session = this.getSessionFactory().openSession()) {
S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class);
buildS3StreamObjs(objectId,2 ,1234, 1234).forEach(s3StreamObjectMapper::create);
buildS3StreamObjs(objectId, 2, 1234, 1234).forEach(s3StreamObjectMapper::create);
session.commit();
}

Expand Down Expand Up @@ -1384,7 +1408,7 @@ public void test3StreamObjects_2PC_Expired() throws IOException, ExecutionExcept
s3ObjectMapper.prepare(s3Object1);

S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class);
buildS3StreamObjs(objectId,1 ,1234, 1234).forEach(s3StreamObjectMapper::create);
buildS3StreamObjs(objectId, 1, 1234, 1234).forEach(s3StreamObjectMapper::create);
session.commit();
}

Expand Down Expand Up @@ -1749,7 +1773,6 @@ public void test3WALObjects_2PC_Expired() throws IOException, ExecutionException
.build())
.toList();


apache.rocketmq.controller.v1.S3WALObject walObject = apache.rocketmq.controller.v1.S3WALObject.newBuilder()
.setObjectId(objectId + 4)
.setSequenceId(11)
Expand Down Expand Up @@ -1806,7 +1829,6 @@ public void test3WALObjects_2PC() throws IOException, ExecutionException, Interr
objectId = reply.getFirstObjectId();
}


try (SqlSession session = this.getSessionFactory().openSession()) {
S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class);
buildS3WalObjs(objectId + 2, 1).stream()
Expand Down Expand Up @@ -1925,7 +1947,6 @@ public void test3WALObjects_2PC_duplicate() throws IOException, ExecutionExcepti
objectId = reply.getFirstObjectId();
}


try (SqlSession session = this.getSessionFactory().openSession()) {
S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class);
buildS3WalObjs(objectId + 2, 1).stream()
Expand Down
Loading

0 comments on commit c54a538

Please sign in to comment.