From bd89d09b1784acbc8adc773d8849bf8ac4a481c9 Mon Sep 17 00:00:00 2001 From: SSpirits Date: Mon, 25 Dec 2023 19:22:49 +0800 Subject: [PATCH 1/3] feat(stream): implement MemoryMetadataManager Signed-off-by: SSpirits --- .../s3/memory/MemoryMetadataManager.java | 202 +++++++++++++--- .../AbstractOrderedCollection.java | 2 +- .../stream/s3/objects/ObjectManagerTest.java | 228 ++++++++++++++++++ .../stream/s3/streams/StreamManagerTest.java | 147 +++++++++++ 4 files changed, 539 insertions(+), 40 deletions(-) create mode 100644 s3stream/src/test/java/com/automq/stream/s3/objects/ObjectManagerTest.java create mode 100644 s3stream/src/test/java/com/automq/stream/s3/streams/StreamManagerTest.java diff --git a/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java b/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java index 48082bb2f..eacf50a2e 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java @@ -22,6 +22,7 @@ import com.automq.stream.s3.metadata.S3ObjectType; import com.automq.stream.s3.metadata.StreamMetadata; import com.automq.stream.s3.metadata.StreamOffsetRange; +import com.automq.stream.s3.metadata.StreamState; import com.automq.stream.s3.objects.CommitStreamSetObjectRequest; import com.automq.stream.s3.objects.CommitStreamSetObjectResponse; import com.automq.stream.s3.objects.CompactStreamObjectRequest; @@ -29,51 +30,79 @@ import com.automq.stream.s3.objects.ObjectStreamRange; import com.automq.stream.s3.objects.StreamObject; import com.automq.stream.s3.streams.StreamManager; -import com.automq.stream.utils.FutureUtil; - -import java.util.Collections; -import java.util.HashMap; +import java.util.ArrayList; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; -import java.util.Map; +import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; - +import org.apache.commons.lang3.tuple.Pair; public class MemoryMetadataManager implements StreamManager, ObjectManager { + private final static AtomicLong nodeIdAlloc = new AtomicLong(); + + // Data structure of stream metadata + private final AtomicLong streamIdAlloc = new AtomicLong(); + private final ConcurrentMap streams = new ConcurrentHashMap<>(); + + // Data structure of object metadata private final AtomicLong objectIdAlloc = new AtomicLong(); - private final Map> streamObjects = new HashMap<>(); - private final Map streamSetObjects = new HashMap<>(); + private final ConcurrentMap> streamObjects = new ConcurrentHashMap<>(); + private final ConcurrentMap> streamSetObjects = new ConcurrentHashMap<>(); + + public static void advanceNodeId() { + nodeIdAlloc.getAndIncrement(); + } @Override public synchronized CompletableFuture prepareObject(int count, long ttl) { - return CompletableFuture.completedFuture(objectIdAlloc.getAndIncrement()); + return CompletableFuture.completedFuture(objectIdAlloc.getAndAdd(count)); + } + + private static StreamOffsetRange to(ObjectStreamRange s) { + return new StreamOffsetRange(s.getStreamId(), s.getStartOffset(), s.getEndOffset()); } @Override - public synchronized CompletableFuture commitStreamSetObject(CommitStreamSetObjectRequest request) { + public synchronized CompletableFuture commitStreamSetObject( + CommitStreamSetObjectRequest request) { long dataTimeInMs = System.currentTimeMillis(); if (!request.getCompactedObjectIds().isEmpty()) { for (long id : request.getCompactedObjectIds()) { - dataTimeInMs = Math.min(streamSetObjects.get(id).dataTimeInMs(), dataTimeInMs); + dataTimeInMs = Math.min(streamSetObjects.get(id).getRight().dataTimeInMs(), dataTimeInMs); streamSetObjects.remove(id); } } long now = System.currentTimeMillis(); if (request.getObjectId() != ObjectUtils.NOOP_OBJECT_ID) { + for (ObjectStreamRange range : request.getStreamRanges()) { + StreamMetadata stream = streams.get(range.getStreamId()); + assert stream != null; + stream.setEndOffset(range.getEndOffset()); + } + S3ObjectMetadata object = new S3ObjectMetadata( - request.getObjectId(), S3ObjectType.STREAM_SET, request.getStreamRanges().stream().map(MemoryMetadataManager::to).collect(Collectors.toList()), - dataTimeInMs, now, request.getObjectSize(), request.getOrderId()); - streamSetObjects.put(request.getObjectId(), object); - } - for (StreamObject r : request.getStreamObjects()) { - List objects = streamObjects.computeIfAbsent(r.getStreamId(), id -> new LinkedList<>()); - objects.add( - new S3ObjectMetadata( - r.getObjectId(), S3ObjectType.STREAM, List.of(new StreamOffsetRange(r.getStreamId(), r.getStartOffset(), r.getEndOffset())), - dataTimeInMs, now, r.getObjectSize(), 0 - ) + request.getObjectId(), S3ObjectType.STREAM_SET, request.getStreamRanges().stream().map(MemoryMetadataManager::to).collect(Collectors.toList()), + dataTimeInMs, now, request.getObjectSize(), request.getOrderId()); + streamSetObjects.put(request.getObjectId(), Pair.of(nodeIdAlloc.get(), object)); + } + + for (StreamObject streamObject : request.getStreamObjects()) { + long streamId = streamObject.getStreamId(); + StreamMetadata stream = streams.get(streamId); + assert stream != null; + stream.setEndOffset(streamObject.getEndOffset()); + + List metadataList = streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>()); + metadataList.add( + new S3ObjectMetadata( + streamObject.getObjectId(), S3ObjectType.STREAM, List.of(new StreamOffsetRange(streamId, streamObject.getStartOffset(), streamObject.getEndOffset())), + dataTimeInMs, now, streamObject.getObjectSize(), 0 + ) ); } request.getCompactedObjectIds().forEach(streamSetObjects::remove); @@ -82,60 +111,155 @@ public synchronized CompletableFuture commitStrea @Override public synchronized CompletableFuture compactStreamObject(CompactStreamObjectRequest request) { - return FutureUtil.failedFuture(new UnsupportedOperationException()); + long streamId = request.getStreamId(); + StreamObject streamObject = new StreamObject(); + streamObject.setStreamId(streamId); + streamObject.setStartOffset(request.getStartOffset()); + streamObject.setEndOffset(request.getEndOffset()); + streamObject.setObjectId(request.getObjectId()); + streamObject.setObjectSize(request.getObjectSize()); + + streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>()) + .add(new S3ObjectMetadata( + request.getObjectId(), S3ObjectType.STREAM, List.of(new StreamOffsetRange(streamId, request.getStartOffset(), request.getEndOffset())), + System.currentTimeMillis(), System.currentTimeMillis(), request.getObjectSize(), 0 + )); + + HashSet idSet = new HashSet<>(request.getSourceObjectIds()); + streamObjects.get(streamId).removeIf(metadata -> idSet.contains(metadata.objectId())); + return CompletableFuture.completedFuture(null); } @Override - public synchronized CompletableFuture> getObjects(long streamId, long startOffset, long endOffset, int limit) { - return FutureUtil.failedFuture(new UnsupportedOperationException()); + public synchronized CompletableFuture> getObjects(long streamId, long startOffset, + long endOffset, int limit) { + List streamSetObjectList = streamSetObjects.values() + .stream() + .map(Pair::getRight) + .filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.getStreamId() == streamId && r.getEndOffset() > startOffset && (r.getStartOffset() <= endOffset || endOffset == -1))) + .toList(); + List streamObjectList = streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>()) + .stream() + .filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.getStreamId() == streamId && r.getEndOffset() > startOffset && (r.getStartOffset() <= endOffset || endOffset == -1))) + .toList(); + + List result = new ArrayList<>(); + result.addAll(streamSetObjectList); + result.addAll(streamObjectList); + result.sort((o1, o2) -> { + long startOffset1 = o1.getOffsetRanges().stream().filter(r -> r.getStreamId() == streamId).findFirst().get().getStartOffset(); + long startOffset2 = o2.getOffsetRanges().stream().filter(r -> r.getStreamId() == streamId).findFirst().get().getStartOffset(); + return Long.compare(startOffset1, startOffset2); + }); + + return CompletableFuture.completedFuture(result.stream().limit(limit).toList()); } @Override public synchronized CompletableFuture> getServerObjects() { - return CompletableFuture.completedFuture(new LinkedList<>(streamSetObjects.values())); + List result = streamSetObjects.values() + .stream() + .filter(pair -> pair.getLeft() == nodeIdAlloc.get()) + .map(Pair::getRight).toList(); + return CompletableFuture.completedFuture(result); } @Override - public synchronized CompletableFuture> getStreamObjects(long streamId, long startOffset, long endOffset, int limit) { - return FutureUtil.failedFuture(new UnsupportedOperationException()); + public synchronized CompletableFuture> getStreamObjects(long streamId, long startOffset, + long endOffset, int limit) { + List streamObjectList = streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>()) + .stream() + .filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.getStreamId() == streamId && r.getEndOffset() > startOffset && (r.getStartOffset() <= endOffset || endOffset == -1))) + .limit(limit) + .toList(); + return CompletableFuture.completedFuture(streamObjectList); } @Override public synchronized CompletableFuture> getOpeningStreams() { - return CompletableFuture.completedFuture(Collections.emptyList()); + return CompletableFuture.completedFuture(streams.values().stream().toList()); } @Override public CompletableFuture> getStreams(List streamIds) { - return FutureUtil.failedFuture(new UnsupportedOperationException()); + return CompletableFuture.completedFuture(streamIds.stream().map(streams::get).filter(Objects::nonNull).toList()); } @Override public synchronized CompletableFuture createStream() { - return FutureUtil.failedFuture(new UnsupportedOperationException()); + long streamId = streamIdAlloc.getAndIncrement(); + streams.put(streamId, new StreamMetadata(streamId, -1, 0, 0, StreamState.CLOSED)); + return CompletableFuture.completedFuture(streamId); } @Override public synchronized CompletableFuture openStream(long streamId, long epoch) { - return FutureUtil.failedFuture(new UnsupportedOperationException()); + StreamMetadata stream = streams.get(streamId); + if (stream == null) { + return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " not found")); + } + if (stream.getState() == StreamState.OPENED) { + return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " has been opened")); + } + if (stream.getEpoch() >= epoch) { + return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " epoch " + epoch + " is not newer than current epoch " + stream.getEpoch())); + } + stream.setEpoch(epoch); + stream.setState(StreamState.OPENED); + return CompletableFuture.completedFuture(stream); } @Override public synchronized CompletableFuture trimStream(long streamId, long epoch, long newStartOffset) { - return FutureUtil.failedFuture(new UnsupportedOperationException()); + StreamMetadata stream = streams.get(streamId); + if (stream == null) { + return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " not found")); + } + if (stream.getState() != StreamState.OPENED) { + return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " is not opened")); + } + if (stream.getEpoch() != epoch) { + return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " epoch " + epoch + " is not equal to current epoch " + stream.getEpoch())); + } + if (newStartOffset < stream.getStartOffset()) { + return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " new start offset " + newStartOffset + " is less than current start offset " + stream.getStartOffset())); + } + if (newStartOffset > stream.getEndOffset()) { + return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " new start offset " + newStartOffset + " is greater than current end offset " + stream.getEndOffset())); + } + stream.setStartOffset(newStartOffset); + return CompletableFuture.completedFuture(null); } @Override public synchronized CompletableFuture closeStream(long streamId, long epoch) { - return FutureUtil.failedFuture(new UnsupportedOperationException()); + StreamMetadata stream = streams.get(streamId); + if (stream == null) { + return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " not found")); + } + if (stream.getState() != StreamState.OPENED) { + return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " is not opened")); + } + if (stream.getEpoch() != epoch) { + return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " epoch " + epoch + " is not equal to current epoch " + stream.getEpoch())); + } + stream.setState(StreamState.CLOSED); + return CompletableFuture.completedFuture(null); } @Override public synchronized CompletableFuture deleteStream(long streamId, long epoch) { - return FutureUtil.failedFuture(new UnsupportedOperationException()); - } - - private static StreamOffsetRange to(ObjectStreamRange s) { - return new StreamOffsetRange(s.getStreamId(), s.getStartOffset(), s.getEndOffset()); + StreamMetadata stream = streams.get(streamId); + if (stream == null) { + return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " not found")); + } + if (stream.getState() != StreamState.CLOSED) { + return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " is not closed")); + } + if (stream.getEpoch() != epoch) { + return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " epoch " + epoch + " is not equal to current epoch " + stream.getEpoch())); + } + streams.remove(streamId); + return CompletableFuture.completedFuture(null); } } diff --git a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/AbstractOrderedCollection.java b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/AbstractOrderedCollection.java index e8657f2c8..c04544d4f 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/AbstractOrderedCollection.java +++ b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/AbstractOrderedCollection.java @@ -27,7 +27,7 @@ public int search(T target) { int low = 0; int high = size() - 1; while (low <= high) { - int mid = (low + high) >>> 1; + int mid = low + (high - low) >>> 1; ComparableItem midVal = get(mid); if (midVal.isLessThan(target)) { low = mid + 1; diff --git a/s3stream/src/test/java/com/automq/stream/s3/objects/ObjectManagerTest.java b/s3stream/src/test/java/com/automq/stream/s3/objects/ObjectManagerTest.java new file mode 100644 index 000000000..098bd4043 --- /dev/null +++ b/s3stream/src/test/java/com/automq/stream/s3/objects/ObjectManagerTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.stream.s3.objects; + +import com.automq.stream.s3.memory.MemoryMetadataManager; +import com.automq.stream.s3.metadata.ObjectUtils; +import com.automq.stream.s3.metadata.S3ObjectMetadata; +import com.automq.stream.s3.metadata.StreamMetadata; +import com.automq.stream.s3.metadata.StreamOffsetRange; +import com.automq.stream.s3.streams.StreamManager; +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class ObjectManagerTest { + private StreamManager streamManager; + private ObjectManager objectManager; + + @BeforeEach + void setUp() { + MemoryMetadataManager metadataManager = new MemoryMetadataManager(); + streamManager = metadataManager; + objectManager = metadataManager; + } + + @Test + void prepareObject() { + assertEquals(0, objectManager.prepareObject(1, 1000).join()); + assertEquals(1, objectManager.prepareObject(1, 1000).join()); + assertEquals(2, objectManager.prepareObject(8, 1000).join()); + assertEquals(10, objectManager.prepareObject(1, 1000).join()); + } + + @Test + void testCommitAndCompact() { + Long streamId = streamManager.createStream().join(); + streamManager.openStream(streamId, 0).join(); + streamId = streamManager.createStream().join(); + streamManager.openStream(streamId, 0).join(); + streamId = streamManager.createStream().join(); + streamManager.openStream(streamId, 0).join(); + + CommitStreamSetObjectRequest request = new CommitStreamSetObjectRequest(); + // Commit stream set object with stream 0 offset [0, 3) and stream 1 offset [0, 5). + ArrayList streamRangeList = new ArrayList<>(); + streamRangeList.add(new ObjectStreamRange(0, 0, 0, 3, 300)); + streamRangeList.add(new ObjectStreamRange(1, 0, 0, 5, 500)); + request.setStreamRanges(streamRangeList); + request.setOrderId(0); + + // Commit stream object with stream 2 offset [0, 10). + List streamObjectList = new ArrayList<>(); + StreamObject streamObject = new StreamObject(); + streamObject.setObjectId(1); + streamObject.setStreamId(2); + streamObject.setStartOffset(0); + streamObject.setEndOffset(10); + streamObjectList.add(streamObject); + request.setStreamObjects(streamObjectList); + objectManager.commitStreamSetObject(request).join(); + + List streamMetadataList = streamManager.getStreams(List.of(0L, 1L, 2L)).join(); + assertEquals(3, streamMetadataList.size()); + assertEquals(3, streamMetadataList.get(0).getEndOffset()); + assertEquals(5, streamMetadataList.get(1).getEndOffset()); + assertEquals(10, streamMetadataList.get(2).getEndOffset()); + + List streamSetObjectMetadataList = objectManager.getServerObjects().join(); + assertEquals(1, streamSetObjectMetadataList.size()); + S3ObjectMetadata streamSetMetadata = streamSetObjectMetadataList.get(0); + List ranges = streamSetMetadata.getOffsetRanges(); + assertEquals(2, ranges.size()); + + assertEquals(0, ranges.get(0).getStreamId()); + assertEquals(0, ranges.get(0).getStartOffset()); + assertEquals(3, ranges.get(0).getEndOffset()); + + assertEquals(1, ranges.get(1).getStreamId()); + assertEquals(0, ranges.get(1).getStartOffset()); + assertEquals(5, ranges.get(1).getEndOffset()); + + List streamObjectMetadataList = objectManager.getStreamObjects(2, 0, 10, 100).join(); + assertEquals(1, streamObjectMetadataList.size()); + ranges = streamObjectMetadataList.get(0).getOffsetRanges(); + assertEquals(1, ranges.size()); + assertEquals(2, ranges.get(0).getStreamId()); + assertEquals(0, ranges.get(0).getStartOffset()); + assertEquals(10, ranges.get(0).getEndOffset()); + + // Compact stream set object and commit stream object. + request = new CommitStreamSetObjectRequest(); + request.setObjectId(ObjectUtils.NOOP_OBJECT_ID); + request.setCompactedObjectIds(List.of(0L)); + + streamObjectList = new ArrayList<>(); + streamObject = new StreamObject(); + streamObject.setObjectId(2); + streamObject.setStreamId(0); + streamObject.setStartOffset(0); + streamObject.setEndOffset(3); + streamObjectList.add(streamObject); + + streamObject = new StreamObject(); + streamObject.setObjectId(3); + streamObject.setStreamId(1); + streamObject.setStartOffset(0); + streamObject.setEndOffset(5); + streamObjectList.add(streamObject); + + streamObject = new StreamObject(); + streamObject.setObjectId(4); + streamObject.setStreamId(2); + streamObject.setStartOffset(10); + streamObject.setEndOffset(20); + streamObjectList.add(streamObject); + + request.setStreamObjects(streamObjectList); + objectManager.commitStreamSetObject(request).join(); + + streamSetObjectMetadataList = objectManager.getServerObjects().join(); + assertEquals(0, streamSetObjectMetadataList.size()); + + streamObjectMetadataList = objectManager.getStreamObjects(0, 0, 10, 100).join(); + assertEquals(1, streamObjectMetadataList.size()); + streamObjectMetadataList = objectManager.getStreamObjects(1, 0, 10, 100).join(); + assertEquals(1, streamObjectMetadataList.size()); + streamObjectMetadataList = objectManager.getStreamObjects(2, 0, 10, 100).join(); + assertEquals(2, streamObjectMetadataList.size()); + + // Compact stream object. + objectManager.compactStreamObject(new CompactStreamObjectRequest(5, 2000, 2, 0, 20, List.of(1L, 4L))).join(); + streamObjectMetadataList = objectManager.getStreamObjects(2, 0, 10, 100).join(); + assertEquals(1, streamObjectMetadataList.size()); + ranges = streamObjectMetadataList.get(0).getOffsetRanges(); + assertEquals(1, ranges.size()); + assertEquals(2, ranges.get(0).getStreamId()); + assertEquals(0, ranges.get(0).getStartOffset()); + assertEquals(20, ranges.get(0).getEndOffset()); + } + + @Test + void testGetObject() { + // Create and open stream 0 and 1. + Long streamId = streamManager.createStream().join(); + streamManager.openStream(streamId, 0).join(); + streamId = streamManager.createStream().join(); + streamManager.openStream(streamId, 0).join(); + + CommitStreamSetObjectRequest request = new CommitStreamSetObjectRequest(); + // Commit stream set object with stream 0 offset [0, 3) and stream 1 offset [0, 5). + ArrayList streamRangeList = new ArrayList<>(); + streamRangeList.add(new ObjectStreamRange(0, 0, 0, 3, 300)); + streamRangeList.add(new ObjectStreamRange(1, 0, 0, 5, 500)); + request.setStreamRanges(streamRangeList); + request.setOrderId(0); + + List streamObjectList = new ArrayList<>(); + StreamObject streamObject = new StreamObject(); + streamObject.setObjectId(1); + streamObject.setStreamId(0); + streamObject.setStartOffset(3); + streamObject.setEndOffset(5); + streamObjectList.add(streamObject); + + streamObject = new StreamObject(); + streamObject.setObjectId(2); + streamObject.setStreamId(0); + streamObject.setStartOffset(5); + streamObject.setEndOffset(10); + streamObjectList.add(streamObject); + + request.setStreamObjects(streamObjectList); + objectManager.commitStreamSetObject(request).join(); + + // Get object with stream 0 offset [0, 10). + List streamObjectMetadataList = objectManager.getObjects(0, 0, 10, 100).join(); + assertEquals(3, streamObjectMetadataList.size()); + // Get object with stream 0 offset [1, 9). + streamObjectMetadataList = objectManager.getObjects(0, 1, 9, 100).join(); + assertEquals(3, streamObjectMetadataList.size()); + // Get object with stream 0 offset [3, 4). + streamObjectMetadataList = objectManager.getObjects(0, 0, 1, 100).join(); + assertEquals(1, streamObjectMetadataList.size()); + assertEquals(0, streamObjectMetadataList.get(0).objectId()); + // Get object with limit 1. + streamObjectMetadataList = objectManager.getObjects(0, 0, 10, 1).join(); + assertEquals(1, streamObjectMetadataList.size()); + assertEquals(0, streamObjectMetadataList.get(0).objectId()); + + // Get stream object with stream 0 offset [3, 10). + streamObjectMetadataList = objectManager.getStreamObjects(0, 3, 10, 100).join(); + assertEquals(2, streamObjectMetadataList.size()); + // Get stream object with stream 0 offset [5, 10). + streamObjectMetadataList = objectManager.getStreamObjects(0, 5, 10, 100).join(); + assertEquals(1, streamObjectMetadataList.size()); + assertEquals(2, streamObjectMetadataList.get(0).objectId()); + // Get object with limit 1. + streamObjectMetadataList = objectManager.getStreamObjects(0, 0, 10, 1).join(); + assertEquals(1, streamObjectMetadataList.size()); + assertEquals(1, streamObjectMetadataList.get(0).objectId()); + + // Get all stream set objects belonging current node. + streamObjectMetadataList = objectManager.getServerObjects().join(); + assertEquals(1, streamObjectMetadataList.size()); + // Change node id. + MemoryMetadataManager.advanceNodeId(); + streamObjectMetadataList = objectManager.getServerObjects().join(); + assertEquals(0, streamObjectMetadataList.size()); + } +} \ No newline at end of file diff --git a/s3stream/src/test/java/com/automq/stream/s3/streams/StreamManagerTest.java b/s3stream/src/test/java/com/automq/stream/s3/streams/StreamManagerTest.java new file mode 100644 index 000000000..e8b58161f --- /dev/null +++ b/s3stream/src/test/java/com/automq/stream/s3/streams/StreamManagerTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.stream.s3.streams; + +import com.automq.stream.s3.memory.MemoryMetadataManager; +import com.automq.stream.s3.metadata.StreamMetadata; +import com.automq.stream.s3.metadata.StreamState; +import com.automq.stream.s3.objects.CommitStreamSetObjectRequest; +import com.automq.stream.s3.objects.ObjectManager; +import com.automq.stream.s3.objects.StreamObject; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class StreamManagerTest { + StreamManager streamManager; + + @BeforeEach + public void setUp() throws Exception { + streamManager = new MemoryMetadataManager(); + } + + @Test + public void testCreateAndOpenStream() { + // Create and open stream with epoch 0. + Long streamId = streamManager.createStream().join(); + StreamMetadata streamMetadata = streamManager.openStream(streamId, 0).join(); + assertEquals(streamId, streamMetadata.getStreamId()); + assertEquals(0, streamMetadata.getEpoch()); + assertEquals(0, streamMetadata.getStartOffset()); + assertEquals(0, streamMetadata.getEndOffset()); + assertEquals(StreamState.OPENED, streamMetadata.getState()); + } + + @Test + public void testOpenAndCloseStream() { + // Create and open stream with epoch 0. + Long streamId = streamManager.createStream().join(); + StreamMetadata streamMetadata = streamManager.openStream(streamId, 0).join(); + + // Close stream with epoch 1. + CompletableFuture future = streamManager.closeStream(streamId, 1); + assertEquals(StreamState.OPENED, streamMetadata.getState()); + assertTrue(future.isCompletedExceptionally()); + + // Close stream with epoch 0. + streamManager.closeStream(streamId, 0).join(); + assertEquals(StreamState.CLOSED, streamMetadata.getState()); + + // Open stream with epoch 0. + CompletableFuture future1 = streamManager.openStream(streamId, 0); + assertTrue(future1.isCompletedExceptionally()); + + // Open stream with epoch 1. + streamMetadata = streamManager.openStream(streamId, 1).join(); + assertEquals(streamId, streamMetadata.getStreamId()); + assertEquals(1, streamMetadata.getEpoch()); + assertEquals(0, streamMetadata.getStartOffset()); + assertEquals(0, streamMetadata.getEndOffset()); + assertEquals(StreamState.OPENED, streamMetadata.getState()); + + // Close stream with epoch 1. + streamManager.closeStream(streamId, 1).join(); + assertEquals(StreamState.CLOSED, streamMetadata.getState()); + streamManager.deleteStream(streamId, 1).join(); + List streamMetadataList = streamManager.getOpeningStreams().join(); + assertEquals(0, streamMetadataList.size()); + } + + @Test + public void testTrimStream() { + // Create and open stream with epoch 0. + Long streamId = streamManager.createStream().join(); + StreamMetadata streamMetadata = streamManager.openStream(streamId, 0).join(); + + // Trim stream with epoch 1. + CompletableFuture future = streamManager.trimStream(streamId, 1, 1); + assertTrue(future.isCompletedExceptionally()); + + // Trim stream to invalid offset. + CompletableFuture future1 = streamManager.trimStream(streamId, 0, -1); + assertTrue(future1.isCompletedExceptionally()); + future1 = streamManager.trimStream(streamId, 0, 1); + assertTrue(future1.isCompletedExceptionally()); + + // Advance offset and trim stream. + CommitStreamSetObjectRequest request = new CommitStreamSetObjectRequest(); + ArrayList streamObjectList = new ArrayList<>(); + StreamObject streamObject = new StreamObject(); + streamObject.setStreamId(streamId); + streamObject.setStartOffset(0); + streamObject.setEndOffset(10); + streamObjectList.add(streamObject); + request.setStreamObjects(streamObjectList); + ((ObjectManager) streamManager).commitStreamSetObject(request).join(); + assertEquals(10, streamMetadata.getEndOffset()); + + streamManager.trimStream(streamId, 0, 5).join(); + assertEquals(5, streamMetadata.getStartOffset()); + } + + @Test + public void testGetStreams() { + ArrayList streamIds = new ArrayList<>(); + // Create and open stream with epoch 0. + Long streamId = streamManager.createStream().join(); + streamManager.openStream(streamId, 0).join(); + streamIds.add(streamId); + + streamId = streamManager.createStream().join(); + streamManager.openStream(streamId, 0).join(); + streamIds.add(streamId); + + // Get streams. + List streamMetadataList = streamManager.getStreams(streamIds).join(); + assertEquals(2, streamMetadataList.size()); + assertEquals(streamId, streamMetadataList.get(1).getStreamId()); + assertEquals(0, streamMetadataList.get(1).getEpoch()); + assertEquals(0, streamMetadataList.get(1).getStartOffset()); + assertEquals(0, streamMetadataList.get(1).getEndOffset()); + assertEquals(StreamState.OPENED, streamMetadataList.get(1).getState()); + + streamIds.add(Long.MAX_VALUE); + streamMetadataList = streamManager.getStreams(streamIds).join(); + assertEquals(2, streamMetadataList.size()); + } +} \ No newline at end of file From de3cad36eb5ce1dc6ebe16f70e898cfdcd011f93 Mon Sep 17 00:00:00 2001 From: SSpirits Date: Mon, 25 Dec 2023 19:27:09 +0800 Subject: [PATCH 2/3] fix(stream): rename a static variable Signed-off-by: SSpirits --- .../automq/stream/s3/memory/MemoryMetadataManager.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java b/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java index eacf50a2e..f64ccc195 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java @@ -43,7 +43,7 @@ import org.apache.commons.lang3.tuple.Pair; public class MemoryMetadataManager implements StreamManager, ObjectManager { - private final static AtomicLong nodeIdAlloc = new AtomicLong(); + private final static AtomicLong NODE_ID_ALLOC = new AtomicLong(); // Data structure of stream metadata private final AtomicLong streamIdAlloc = new AtomicLong(); @@ -55,7 +55,7 @@ public class MemoryMetadataManager implements StreamManager, ObjectManager { private final ConcurrentMap> streamSetObjects = new ConcurrentHashMap<>(); public static void advanceNodeId() { - nodeIdAlloc.getAndIncrement(); + NODE_ID_ALLOC.getAndIncrement(); } @Override @@ -88,7 +88,7 @@ public synchronized CompletableFuture commitStrea S3ObjectMetadata object = new S3ObjectMetadata( request.getObjectId(), S3ObjectType.STREAM_SET, request.getStreamRanges().stream().map(MemoryMetadataManager::to).collect(Collectors.toList()), dataTimeInMs, now, request.getObjectSize(), request.getOrderId()); - streamSetObjects.put(request.getObjectId(), Pair.of(nodeIdAlloc.get(), object)); + streamSetObjects.put(request.getObjectId(), Pair.of(NODE_ID_ALLOC.get(), object)); } for (StreamObject streamObject : request.getStreamObjects()) { @@ -159,7 +159,7 @@ public synchronized CompletableFuture> getObjects(long st public synchronized CompletableFuture> getServerObjects() { List result = streamSetObjects.values() .stream() - .filter(pair -> pair.getLeft() == nodeIdAlloc.get()) + .filter(pair -> pair.getLeft() == NODE_ID_ALLOC.get()) .map(Pair::getRight).toList(); return CompletableFuture.completedFuture(result); } From f653b0dfca6c77f97b92382b9b1e8397bda34afd Mon Sep 17 00:00:00 2001 From: SSpirits Date: Mon, 25 Dec 2023 19:50:25 +0800 Subject: [PATCH 3/3] fix(stream): fix binary search Signed-off-by: SSpirits --- .../stream/utils/biniarysearch/AbstractOrderedCollection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/AbstractOrderedCollection.java b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/AbstractOrderedCollection.java index c04544d4f..f004abd31 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/AbstractOrderedCollection.java +++ b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/AbstractOrderedCollection.java @@ -27,7 +27,7 @@ public int search(T target) { int low = 0; int high = size() - 1; while (low <= high) { - int mid = low + (high - low) >>> 1; + int mid = low + ((high - low) >>> 1); ComparableItem midVal = get(mid); if (midVal.isLessThan(target)) { low = mid + 1;