Skip to content

Commit

Permalink
fix: commit wal implementation (#548)
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored Nov 2, 2023
1 parent b3f75ad commit 5c2da57
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public interface StreamMapper {

int create(Stream stream);

/**
* For test purpose only.
*/
int insert(Stream stream);

Stream getByStreamId(long id);

int increaseEpoch(long id);
Expand Down
18 changes: 18 additions & 0 deletions metadata-jdbc/src/main/resources/database/mapper/StreamMapper.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,24 @@
)
</insert>

<!--
  Test-only counterpart of the "create" statement (see the StreamMapper Javadoc:
  "For test purpose only"). Inserts a stream row with an explicit, caller-supplied
  id instead of a database-generated key (useGeneratedKeys="false"), so tests can
  seed streams with deterministic ids.
-->
<insert id="insert" parameterType="Stream" useGeneratedKeys="false">
INSERT INTO stream
(id, topic_id, queue_id, stream_role, group_id, src_node_id, dst_node_id, epoch, range_id, start_offset, state)
VALUES (
#{id},
#{topicId},
#{queueId},
#{streamRole},
#{groupId},
#{srcNodeId},
#{dstNodeId},
#{epoch},
#{rangeId},
#{startOffset},
#{state}
)
</insert>

<update id="increaseEpoch" parameterType="long">
UPDATE stream
SET epoch = epoch + 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -134,24 +135,40 @@ public CompletableFuture<Void> commitWalObject(S3WALObject walObject,
int brokerId = walObject.getBrokerId();
long objectId = walObject.getObjectId();

if (Objects.isNull(compactedObjects) || compactedObjects.isEmpty()) {
// verify stream continuity
List<long[]> offsets = java.util.stream.Stream.concat(
streamObjects.stream()
.map(s3StreamObject -> new long[] {s3StreamObject.getStreamId(), s3StreamObject.getStartOffset(), s3StreamObject.getEndOffset()}),
walObject.getSubStreams().getSubStreamsMap().entrySet()
.stream()
.map(obj -> new long[] {obj.getKey(), obj.getValue().getStartOffset(), obj.getValue().getEndOffset()})
).toList();

if (!checkStreamAdvance(session, offsets)) {
LOGGER.error("S3WALObject[object-id={}]'s stream advance check failed", walObject.getObjectId());
ControllerException e = new ControllerException(Code.NOT_FOUND_VALUE, String.format("S3WALObject[object-id=%d]'s stream advance check failed", walObject.getObjectId()));
future.completeExceptionally(e);
return future;
Map<Long, List<Pair<Long, Long>>> streamSegments = new HashMap<>();
for (S3StreamObject item : streamObjects) {
if (!streamSegments.containsKey(item.getStreamId())) {
streamSegments.put(item.getStreamId(), new ArrayList<>());
}
streamSegments.get(item.getStreamId()).add(new ImmutablePair<>(item.getStartOffset(), item.getEndOffset()));
}

walObject.getSubStreams().getSubStreamsMap()
.forEach((key, value) -> {
if (!streamSegments.containsKey(key)) {
streamSegments.put(key, new ArrayList<>());
}
assert key == value.getStreamId();
streamSegments.get(key).add(new ImmutablePair<>(value.getStartOffset(), value.getEndOffset()));
});

// reduce and verify segment continuity
Map<Long, Pair<Long, Long>> reduced = new HashMap<>();
streamSegments.forEach((streamId, list) -> {
list.sort(Comparator.comparingLong(Pair::getLeft));
long start = list.get(0).getLeft();
long current = start;
for (Pair<Long, Long> p : list) {
if (p.getLeft() != current) {
LOGGER.warn("Trying to commit an unexpected disjoint stream ranges: {}", list);
}
current = p.getRight();
}
reduced.put(streamId, new ImmutablePair<>(start, current));
});

extendRange(session, reduced);

// commit S3 object
if (objectId != S3Constants.NOOP_OBJECT_ID && !commitObject(objectId, StreamConstants.NOOP_STREAM_ID, walObject.getObjectSize(), session)) {
ControllerException e = new ControllerException(Code.ILLEGAL_STATE_VALUE,
Expand Down Expand Up @@ -523,39 +540,36 @@ private boolean commitObject(Long objectId, long streamId, long objectSize, SqlS
return true;
}

private boolean checkStreamAdvance(SqlSession session, List<long[]> offsets) {
if (offsets == null || offsets.isEmpty()) {
return true;
private void extendRange(SqlSession session, Map<Long, Pair<Long, Long>> segments) {
if (segments.isEmpty()) {
return;
}
StreamMapper streamMapper = session.getMapper(StreamMapper.class);
RangeMapper rangeMapper = session.getMapper(RangeMapper.class);
for (long[] offset : offsets) {
long streamId = offset[0], startOffset = offset[1], endOffset = offset[2];
// verify the stream exists and is open

for (Map.Entry<Long, Pair<Long, Long>> entry : segments.entrySet()) {
long streamId = entry.getKey();
Pair<Long, Long> segment = entry.getValue();
Stream stream = streamMapper.getByStreamId(streamId);
if (stream.getState() != StreamState.OPEN) {
LOGGER.warn("Stream[stream-id={}] not opened", streamId);
return false;
LOGGER.warn("Stream[stream-id={}] state is not OPEN", streamId);
}

Range range = rangeMapper.get(stream.getRangeId(), streamId, null);
if (Objects.isNull(range)) {
// should not happen
LOGGER.error("Stream[stream-id={}]'s current range[range-id={}] not exist when stream has been created",
streamId, stream.getRangeId());
return false;
continue;
}

if (range.getEndOffset() != startOffset) {
LOGGER.warn("Stream[stream-id={}]'s current range[range-id={}]'s end offset[{}] is not equal to request start offset[{}]",
streamId, range.getRangeId(), range.getEndOffset(), startOffset);
return false;
LOGGER.info("Extend stream range[stream-id={}, range-id={}] with segment [{}, {})",
streamId, range.getRangeId(), segment.getLeft(), segment.getRight());
if (segment.getRight() > range.getEndOffset()) {
range.setEndOffset(segment.getRight());
rangeMapper.update(range);
}

range.setEndOffset(endOffset);
rangeMapper.update(range);
}
return true;
}

public CompletableFuture<Pair<List<S3StreamObject>, List<S3WALObject>>> listObjects(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.automq.rocketmq.metadata.mapper.StreamMapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.LongStream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.ibatis.session.SqlSession;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -588,6 +589,33 @@ public void testCommitStreamObject_StreamNotExist() throws IOException {

}

/**
 * Seeds a stream row with the given id and state via the test-only
 * {@code StreamMapper#insert}, which preserves the caller-chosen id
 * rather than generating one. Topic, queue, node, role, range and
 * start-offset attributes are fixed test defaults.
 */
private void insertStream(SqlSession session, long streamId, StreamState state) {
    Stream record = new Stream();
    record.setId(streamId);
    record.setTopicId(1L);
    record.setQueueId(1);
    record.setStreamRole(StreamRole.STREAM_ROLE_DATA);
    record.setSrcNodeId(1);
    record.setDstNodeId(1);
    record.setRangeId(0);
    record.setStartOffset(0L);
    record.setState(state);
    session.getMapper(StreamMapper.class).insert(record);
}

/**
 * Seeds a range row [startOffset, endOffset) for the given stream so tests
 * have an existing current range to operate on. Node id and epoch are fixed
 * test defaults.
 */
private void createRange(SqlSession session, long streamId, int rangeId, long startOffset, long endOffset) {
    Range range = new Range();
    range.setStreamId(streamId);
    range.setRangeId(rangeId);
    range.setEpoch(1L);
    range.setNodeId(1);
    range.setStartOffset(startOffset);
    range.setEndOffset(endOffset);
    session.getMapper(RangeMapper.class).create(range);
}

@Test
public void testCommitWALObject() throws IOException, ExecutionException, InterruptedException {
long objectId;
Expand All @@ -608,6 +636,11 @@ public void testCommitWALObject() throws IOException, ExecutionException, Interr
try (SqlSession session = this.getSessionFactory().openSession()) {
S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class);

LongStream.range(1, 3).forEach(streamId -> {
insertStream(session, streamId, StreamState.OPEN);
createRange(session, streamId, 0, 0, 20);
});

buildS3WalObjs(objectId + 2, 1).stream().peek(s3WalObject -> {
Map<Long, SubStream> subStreams = buildWalSubStreams(1, 20L, 10L);
s3WalObject.setSubStreams(toJson(subStreams));
Expand Down

0 comments on commit 5c2da57

Please sign in to comment.