Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s3stream): support stream epoch when commit stream object #926

Merged
merged 1 commit into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions s3stream/src/main/java/com/automq/stream/api/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ public interface Stream {
*/
long streamId();

/**
* Get stream epoch.
*/
long streamEpoch();

/**
* Get stream start offset.
*/
Expand Down
5 changes: 5 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public long streamId() {
return this.streamId;
}

@Override
public long streamEpoch() {
return this.epoch;
}

@Override
public long startOffset() {
return this.startOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ void compact0() throws ExecutionException, InterruptedException {
continue;
}
long objectId = objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(60)).get();
Optional<CompactStreamObjectRequest> requestOpt = new StreamObjectGroupCompactor(streamId, startOffset,
objectGroup, objectId, dataBlockGroupSizeThreshold, s3Operator).compact();
Optional<CompactStreamObjectRequest> requestOpt = new StreamObjectGroupCompactor(streamId, stream.streamEpoch(),
startOffset, objectGroup, objectId, dataBlockGroupSizeThreshold, s3Operator).compact();
if (requestOpt.isPresent()) {
CompactStreamObjectRequest request = requestOpt.get();
objectManager.compactStreamObject(request).get();
Expand All @@ -97,15 +97,17 @@ List<List<S3ObjectMetadata>> group() throws ExecutionException, InterruptedExcep
static class StreamObjectGroupCompactor {
private final List<S3ObjectMetadata> objectGroup;
private final long streamId;
private final long streamEpoch;
private final long startOffset;
// compact object group to the new object
private final long objectId;
private final S3Operator s3Operator;
private final int dataBlockGroupSizeThreshold;

public StreamObjectGroupCompactor(long streamId, long startOffset, List<S3ObjectMetadata> objectGroup,
public StreamObjectGroupCompactor(long streamId, long streamEpoch, long startOffset, List<S3ObjectMetadata> objectGroup,
long objectId, int dataBlockGroupSizeThreshold, S3Operator s3Operator) {
this.streamId = streamId;
this.streamEpoch = streamEpoch;
this.startOffset = startOffset;
this.objectGroup = objectGroup;
this.objectId = objectId;
Expand Down Expand Up @@ -177,7 +179,8 @@ public Optional<CompactStreamObjectRequest> compact() throws ExecutionException,
objectSize += indexBlockAndFooter.readableBytes();
writer.write(indexBlockAndFooter.duplicate());
writer.close().get();
return Optional.of(new CompactStreamObjectRequest(objectId, objectSize, streamId, compactedStartOffset, compactedEndOffset, compactedObjectIds));
return Optional.of(new CompactStreamObjectRequest(objectId, objectSize, streamId, streamEpoch,
compactedStartOffset, compactedEndOffset, compactedObjectIds));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ public synchronized CompletableFuture<Void> compactStreamObject(CompactStreamObj
long streamId = request.getStreamId();
StreamMetadata stream = streams.get(streamId);
assert stream != null;
if (stream.epoch() != request.getStreamEpoch()) {
throw new IllegalArgumentException("stream " + streamId + " epoch " + stream.epoch() + " is not equal to request " + request.getStreamEpoch());
}
if (stream.endOffset() < request.getEndOffset()) {
throw new IllegalArgumentException("stream " + streamId + " end offset " + stream.endOffset() + " is lesser than request " + request.getEndOffset());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@ public class CompactStreamObjectRequest {
private long streamId;
private long startOffset;
private long endOffset;
private final long streamEpoch;
/**
* The source objects' id of the stream object.
*/
private List<Long> sourceObjectIds;

public CompactStreamObjectRequest(long objectId, long objectSize, long streamId, long startOffset, long endOffset,
List<Long> sourceObjectIds) {
public CompactStreamObjectRequest(long objectId, long objectSize, long streamId, long streamEpoch, long startOffset,
long endOffset, List<Long> sourceObjectIds) {
this.objectId = objectId;
this.objectSize = objectSize;
this.streamId = streamId;
this.startOffset = startOffset;
this.endOffset = endOffset;
this.streamEpoch = streamEpoch;
this.sourceObjectIds = sourceObjectIds;
}

Expand Down Expand Up @@ -74,6 +76,10 @@ public void setEndOffset(long endOffset) {
this.endOffset = endOffset;
}

public long getStreamEpoch() {
return streamEpoch;
}

public List<Long> getSourceObjectIds() {
return sourceObjectIds;
}
Expand All @@ -90,6 +96,7 @@ public String toString() {
", streamId=" + streamId +
", startOffset=" + startOffset +
", endOffset=" + endOffset +
", streamEpoch=" + streamEpoch +
", sourceObjectIds=" + sourceObjectIds +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void testCompact() throws ExecutionException, InterruptedException {
public void testCompact_groupBlocks() throws ExecutionException, InterruptedException {
List<S3ObjectMetadata> objects = prepareData();

CompactStreamObjectRequest req = new StreamObjectCompactor.StreamObjectGroupCompactor(streamId, 14L,
CompactStreamObjectRequest req = new StreamObjectCompactor.StreamObjectGroupCompactor(streamId, 0L, 14L,
objects.subList(0, 2), 5, 5000, s3Operator).compact().get();
// verify compact request
assertEquals(5, req.getObjectId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void testCommitAndCompact() {
assertEquals(1, streamObjectMetadataList.size());

// Compact stream object.
objectManager.compactStreamObject(new CompactStreamObjectRequest(5, 2000, 2, 0, 20, List.of(1L, 4L))).join();
objectManager.compactStreamObject(new CompactStreamObjectRequest(5, 2000, 2, 0L, 0, 20, List.of(1L, 4L))).join();
streamObjectMetadataList = objectManager.getStreamObjects(2, 0, 10, 100).join();
assertEquals(1, streamObjectMetadataList.size());
ranges = streamObjectMetadataList.get(0).getOffsetRanges();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public CompletableFuture<CommitStreamSetObjectResponse> commitStreamSetObject(Co
@Override
public CompletableFuture<Void> compactStreamObject(CompactStreamObjectRequest request) {
// Build S3StreamObject
//TODO: implement stream epoch verification
S3StreamObject.Builder builder = S3StreamObject.newBuilder();
builder.setStreamId(request.getStreamId());
builder.setStartOffset(request.getStartOffset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ void commitStreamSetObject() {

@Test
void commitStreamObject() {
CompactStreamObjectRequest request = new CompactStreamObjectRequest(1L, 1000, 2000, 100, 1000, List.of(10L));
CompactStreamObjectRequest request = new CompactStreamObjectRequest(1L, 1000, 2000, 0L, 100, 1000, List.of(10L));
when(metadataService.compactStreamObject(any(), any())).thenReturn(CompletableFuture.completedFuture(null));

objectManager.compactStreamObject(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ public long streamId() {
return streamId;
}

@Override
public long streamEpoch() {
return 0;
}

@Override
public long startOffset() {
return startOffset.get();
Expand Down
Loading