Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream): implement MemoryMetadataManager #854

Merged
merged 3 commits into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,58 +22,87 @@
import com.automq.stream.s3.metadata.S3ObjectType;
import com.automq.stream.s3.metadata.StreamMetadata;
import com.automq.stream.s3.metadata.StreamOffsetRange;
import com.automq.stream.s3.metadata.StreamState;
import com.automq.stream.s3.objects.CommitStreamSetObjectRequest;
import com.automq.stream.s3.objects.CommitStreamSetObjectResponse;
import com.automq.stream.s3.objects.CompactStreamObjectRequest;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.objects.ObjectStreamRange;
import com.automq.stream.s3.objects.StreamObject;
import com.automq.stream.s3.streams.StreamManager;
import com.automq.stream.utils.FutureUtil;

import java.util.Collections;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.Pair;

public class MemoryMetadataManager implements StreamManager, ObjectManager {
private final static AtomicLong NODE_ID_ALLOC = new AtomicLong();

// Data structure of stream metadata
private final AtomicLong streamIdAlloc = new AtomicLong();
private final ConcurrentMap<Long, StreamMetadata> streams = new ConcurrentHashMap<>();

// Data structure of object metadata
private final AtomicLong objectIdAlloc = new AtomicLong();
private final Map<Long, List<S3ObjectMetadata>> streamObjects = new HashMap<>();
private final Map<Long, S3ObjectMetadata> streamSetObjects = new HashMap<>();
private final ConcurrentMap<Long, List<S3ObjectMetadata>> streamObjects = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, Pair<Long, S3ObjectMetadata>> streamSetObjects = new ConcurrentHashMap<>();

public static void advanceNodeId() {
NODE_ID_ALLOC.getAndIncrement();
}

@Override
public synchronized CompletableFuture<Long> prepareObject(int count, long ttl) {
return CompletableFuture.completedFuture(objectIdAlloc.getAndIncrement());
return CompletableFuture.completedFuture(objectIdAlloc.getAndAdd(count));
}

private static StreamOffsetRange to(ObjectStreamRange s) {
return new StreamOffsetRange(s.getStreamId(), s.getStartOffset(), s.getEndOffset());
}

@Override
public synchronized CompletableFuture<CommitStreamSetObjectResponse> commitStreamSetObject(CommitStreamSetObjectRequest request) {
public synchronized CompletableFuture<CommitStreamSetObjectResponse> commitStreamSetObject(
CommitStreamSetObjectRequest request) {
long dataTimeInMs = System.currentTimeMillis();
if (!request.getCompactedObjectIds().isEmpty()) {
for (long id : request.getCompactedObjectIds()) {
dataTimeInMs = Math.min(streamSetObjects.get(id).dataTimeInMs(), dataTimeInMs);
dataTimeInMs = Math.min(streamSetObjects.get(id).getRight().dataTimeInMs(), dataTimeInMs);
streamSetObjects.remove(id);
}
}
long now = System.currentTimeMillis();
if (request.getObjectId() != ObjectUtils.NOOP_OBJECT_ID) {
for (ObjectStreamRange range : request.getStreamRanges()) {
StreamMetadata stream = streams.get(range.getStreamId());
assert stream != null;
stream.setEndOffset(range.getEndOffset());
}

S3ObjectMetadata object = new S3ObjectMetadata(
request.getObjectId(), S3ObjectType.STREAM_SET, request.getStreamRanges().stream().map(MemoryMetadataManager::to).collect(Collectors.toList()),
dataTimeInMs, now, request.getObjectSize(), request.getOrderId());
streamSetObjects.put(request.getObjectId(), object);
}
for (StreamObject r : request.getStreamObjects()) {
List<S3ObjectMetadata> objects = streamObjects.computeIfAbsent(r.getStreamId(), id -> new LinkedList<>());
objects.add(
new S3ObjectMetadata(
r.getObjectId(), S3ObjectType.STREAM, List.of(new StreamOffsetRange(r.getStreamId(), r.getStartOffset(), r.getEndOffset())),
dataTimeInMs, now, r.getObjectSize(), 0
)
request.getObjectId(), S3ObjectType.STREAM_SET, request.getStreamRanges().stream().map(MemoryMetadataManager::to).collect(Collectors.toList()),
dataTimeInMs, now, request.getObjectSize(), request.getOrderId());
streamSetObjects.put(request.getObjectId(), Pair.of(NODE_ID_ALLOC.get(), object));
}

for (StreamObject streamObject : request.getStreamObjects()) {
long streamId = streamObject.getStreamId();
StreamMetadata stream = streams.get(streamId);
assert stream != null;
stream.setEndOffset(streamObject.getEndOffset());

List<S3ObjectMetadata> metadataList = streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>());
metadataList.add(
new S3ObjectMetadata(
streamObject.getObjectId(), S3ObjectType.STREAM, List.of(new StreamOffsetRange(streamId, streamObject.getStartOffset(), streamObject.getEndOffset())),
dataTimeInMs, now, streamObject.getObjectSize(), 0
)
);
}
request.getCompactedObjectIds().forEach(streamSetObjects::remove);
Expand All @@ -82,60 +111,155 @@ public synchronized CompletableFuture<CommitStreamSetObjectResponse> commitStrea

@Override
public synchronized CompletableFuture<Void> compactStreamObject(CompactStreamObjectRequest request) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
long streamId = request.getStreamId();
StreamObject streamObject = new StreamObject();
streamObject.setStreamId(streamId);
streamObject.setStartOffset(request.getStartOffset());
streamObject.setEndOffset(request.getEndOffset());
streamObject.setObjectId(request.getObjectId());
streamObject.setObjectSize(request.getObjectSize());

streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>())
.add(new S3ObjectMetadata(
request.getObjectId(), S3ObjectType.STREAM, List.of(new StreamOffsetRange(streamId, request.getStartOffset(), request.getEndOffset())),
System.currentTimeMillis(), System.currentTimeMillis(), request.getObjectSize(), 0
));

HashSet<Long> idSet = new HashSet<>(request.getSourceObjectIds());
streamObjects.get(streamId).removeIf(metadata -> idSet.contains(metadata.objectId()));
return CompletableFuture.completedFuture(null);
}

@Override
public synchronized CompletableFuture<List<S3ObjectMetadata>> getObjects(long streamId, long startOffset, long endOffset, int limit) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
public synchronized CompletableFuture<List<S3ObjectMetadata>> getObjects(long streamId, long startOffset,
long endOffset, int limit) {
List<S3ObjectMetadata> streamSetObjectList = streamSetObjects.values()
.stream()
.map(Pair::getRight)
.filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.getStreamId() == streamId && r.getEndOffset() > startOffset && (r.getStartOffset() <= endOffset || endOffset == -1)))
.toList();
List<S3ObjectMetadata> streamObjectList = streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>())
.stream()
.filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.getStreamId() == streamId && r.getEndOffset() > startOffset && (r.getStartOffset() <= endOffset || endOffset == -1)))
.toList();

List<S3ObjectMetadata> result = new ArrayList<>();
result.addAll(streamSetObjectList);
result.addAll(streamObjectList);
result.sort((o1, o2) -> {
long startOffset1 = o1.getOffsetRanges().stream().filter(r -> r.getStreamId() == streamId).findFirst().get().getStartOffset();
long startOffset2 = o2.getOffsetRanges().stream().filter(r -> r.getStreamId() == streamId).findFirst().get().getStartOffset();
return Long.compare(startOffset1, startOffset2);
});

return CompletableFuture.completedFuture(result.stream().limit(limit).toList());
}

@Override
public synchronized CompletableFuture<List<S3ObjectMetadata>> getServerObjects() {
return CompletableFuture.completedFuture(new LinkedList<>(streamSetObjects.values()));
List<S3ObjectMetadata> result = streamSetObjects.values()
.stream()
.filter(pair -> pair.getLeft() == NODE_ID_ALLOC.get())
.map(Pair::getRight).toList();
return CompletableFuture.completedFuture(result);
}

@Override
public synchronized CompletableFuture<List<S3ObjectMetadata>> getStreamObjects(long streamId, long startOffset, long endOffset, int limit) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
public synchronized CompletableFuture<List<S3ObjectMetadata>> getStreamObjects(long streamId, long startOffset,
long endOffset, int limit) {
List<S3ObjectMetadata> streamObjectList = streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>())
.stream()
.filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.getStreamId() == streamId && r.getEndOffset() > startOffset && (r.getStartOffset() <= endOffset || endOffset == -1)))
.limit(limit)
.toList();
return CompletableFuture.completedFuture(streamObjectList);
}

@Override
public synchronized CompletableFuture<List<StreamMetadata>> getOpeningStreams() {
return CompletableFuture.completedFuture(Collections.emptyList());
return CompletableFuture.completedFuture(streams.values().stream().toList());
ShadowySpirits marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public CompletableFuture<List<StreamMetadata>> getStreams(List<Long> streamIds) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
return CompletableFuture.completedFuture(streamIds.stream().map(streams::get).filter(Objects::nonNull).toList());
}

@Override
public synchronized CompletableFuture<Long> createStream() {
return FutureUtil.failedFuture(new UnsupportedOperationException());
long streamId = streamIdAlloc.getAndIncrement();
streams.put(streamId, new StreamMetadata(streamId, -1, 0, 0, StreamState.CLOSED));
return CompletableFuture.completedFuture(streamId);
}

@Override
public synchronized CompletableFuture<StreamMetadata> openStream(long streamId, long epoch) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
StreamMetadata stream = streams.get(streamId);
if (stream == null) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " not found"));
}
if (stream.getState() == StreamState.OPENED) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " has been opened"));
}
if (stream.getEpoch() >= epoch) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " epoch " + epoch + " is not newer than current epoch " + stream.getEpoch()));
}
stream.setEpoch(epoch);
stream.setState(StreamState.OPENED);
return CompletableFuture.completedFuture(stream);
}

@Override
public synchronized CompletableFuture<Void> trimStream(long streamId, long epoch, long newStartOffset) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
StreamMetadata stream = streams.get(streamId);
if (stream == null) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " not found"));
}
if (stream.getState() != StreamState.OPENED) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " is not opened"));
}
if (stream.getEpoch() != epoch) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " epoch " + epoch + " is not equal to current epoch " + stream.getEpoch()));
}
if (newStartOffset < stream.getStartOffset()) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " new start offset " + newStartOffset + " is less than current start offset " + stream.getStartOffset()));
}
if (newStartOffset > stream.getEndOffset()) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " new start offset " + newStartOffset + " is greater than current end offset " + stream.getEndOffset()));
}
stream.setStartOffset(newStartOffset);
return CompletableFuture.completedFuture(null);
}

@Override
public synchronized CompletableFuture<Void> closeStream(long streamId, long epoch) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
StreamMetadata stream = streams.get(streamId);
if (stream == null) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " not found"));
}
if (stream.getState() != StreamState.OPENED) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " is not opened"));
}
if (stream.getEpoch() != epoch) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " epoch " + epoch + " is not equal to current epoch " + stream.getEpoch()));
}
stream.setState(StreamState.CLOSED);
return CompletableFuture.completedFuture(null);
}

@Override
public synchronized CompletableFuture<Void> deleteStream(long streamId, long epoch) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
}

private static StreamOffsetRange to(ObjectStreamRange s) {
return new StreamOffsetRange(s.getStreamId(), s.getStartOffset(), s.getEndOffset());
StreamMetadata stream = streams.get(streamId);
if (stream == null) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " not found"));
}
if (stream.getState() != StreamState.CLOSED) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " is not closed"));
}
if (stream.getEpoch() != epoch) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " epoch " + epoch + " is not equal to current epoch " + stream.getEpoch()));
}
streams.remove(streamId);
return CompletableFuture.completedFuture(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public int search(T target) {
int low = 0;
int high = size() - 1;
while (low <= high) {
int mid = (low + high) >>> 1;
int mid = low + ((high - low) >>> 1);
ComparableItem<T> midVal = get(mid);
if (midVal.isLessThan(target)) {
low = mid + 1;
Expand Down
Loading
Loading