Skip to content

Commit

Permalink
feat(s3stream): support stream epoch when commit stream object (#926)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Feb 8, 2024
1 parent 2932dc3 commit 90d61c9
Show file tree
Hide file tree
Showing 10 changed files with 38 additions and 9 deletions.
5 changes: 5 additions & 0 deletions s3stream/src/main/java/com/automq/stream/api/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ public interface Stream {
*/
long streamId();

/**
* Get stream epoch.
*/
long streamEpoch();

/**
* Get stream start offset.
*/
Expand Down
5 changes: 5 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public long streamId() {
return this.streamId;
}

@Override
public long streamEpoch() {
return this.epoch;
}

@Override
public long startOffset() {
return this.startOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ void compact0() throws ExecutionException, InterruptedException {
continue;
}
long objectId = objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(60)).get();
Optional<CompactStreamObjectRequest> requestOpt = new StreamObjectGroupCompactor(streamId, startOffset,
objectGroup, objectId, dataBlockGroupSizeThreshold, s3Operator).compact();
Optional<CompactStreamObjectRequest> requestOpt = new StreamObjectGroupCompactor(streamId, stream.streamEpoch(),
startOffset, objectGroup, objectId, dataBlockGroupSizeThreshold, s3Operator).compact();
if (requestOpt.isPresent()) {
CompactStreamObjectRequest request = requestOpt.get();
objectManager.compactStreamObject(request).get();
Expand All @@ -97,15 +97,17 @@ List<List<S3ObjectMetadata>> group() throws ExecutionException, InterruptedExcep
static class StreamObjectGroupCompactor {
private final List<S3ObjectMetadata> objectGroup;
private final long streamId;
private final long streamEpoch;
private final long startOffset;
// compact object group to the new object
private final long objectId;
private final S3Operator s3Operator;
private final int dataBlockGroupSizeThreshold;

public StreamObjectGroupCompactor(long streamId, long startOffset, List<S3ObjectMetadata> objectGroup,
public StreamObjectGroupCompactor(long streamId, long streamEpoch, long startOffset, List<S3ObjectMetadata> objectGroup,
long objectId, int dataBlockGroupSizeThreshold, S3Operator s3Operator) {
this.streamId = streamId;
this.streamEpoch = streamEpoch;
this.startOffset = startOffset;
this.objectGroup = objectGroup;
this.objectId = objectId;
Expand Down Expand Up @@ -177,7 +179,8 @@ public Optional<CompactStreamObjectRequest> compact() throws ExecutionException,
objectSize += indexBlockAndFooter.readableBytes();
writer.write(indexBlockAndFooter.duplicate());
writer.close().get();
return Optional.of(new CompactStreamObjectRequest(objectId, objectSize, streamId, compactedStartOffset, compactedEndOffset, compactedObjectIds));
return Optional.of(new CompactStreamObjectRequest(objectId, objectSize, streamId, streamEpoch,
compactedStartOffset, compactedEndOffset, compactedObjectIds));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ public synchronized CompletableFuture<Void> compactStreamObject(CompactStreamObj
long streamId = request.getStreamId();
StreamMetadata stream = streams.get(streamId);
assert stream != null;
if (stream.epoch() != request.getStreamEpoch()) {
throw new IllegalArgumentException("stream " + streamId + " epoch " + stream.epoch() + " is not equal to request " + request.getStreamEpoch());
}
if (stream.endOffset() < request.getEndOffset()) {
throw new IllegalArgumentException("stream " + streamId + " end offset " + stream.endOffset() + " is lesser than request " + request.getEndOffset());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@ public class CompactStreamObjectRequest {
private long streamId;
private long startOffset;
private long endOffset;
private final long streamEpoch;
/**
* The source objects' id of the stream object.
*/
private List<Long> sourceObjectIds;

public CompactStreamObjectRequest(long objectId, long objectSize, long streamId, long startOffset, long endOffset,
List<Long> sourceObjectIds) {
public CompactStreamObjectRequest(long objectId, long objectSize, long streamId, long streamEpoch, long startOffset,
long endOffset, List<Long> sourceObjectIds) {
this.objectId = objectId;
this.objectSize = objectSize;
this.streamId = streamId;
this.startOffset = startOffset;
this.endOffset = endOffset;
this.streamEpoch = streamEpoch;
this.sourceObjectIds = sourceObjectIds;
}

Expand Down Expand Up @@ -74,6 +76,10 @@ public void setEndOffset(long endOffset) {
this.endOffset = endOffset;
}

public long getStreamEpoch() {
return streamEpoch;
}

public List<Long> getSourceObjectIds() {
return sourceObjectIds;
}
Expand All @@ -90,6 +96,7 @@ public String toString() {
", streamId=" + streamId +
", startOffset=" + startOffset +
", endOffset=" + endOffset +
", streamEpoch=" + streamEpoch +
", sourceObjectIds=" + sourceObjectIds +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void testCompact() throws ExecutionException, InterruptedException {
public void testCompact_groupBlocks() throws ExecutionException, InterruptedException {
List<S3ObjectMetadata> objects = prepareData();

CompactStreamObjectRequest req = new StreamObjectCompactor.StreamObjectGroupCompactor(streamId, 14L,
CompactStreamObjectRequest req = new StreamObjectCompactor.StreamObjectGroupCompactor(streamId, 0L, 14L,
objects.subList(0, 2), 5, 5000, s3Operator).compact().get();
// verify compact request
assertEquals(5, req.getObjectId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void testCommitAndCompact() {
assertEquals(1, streamObjectMetadataList.size());

// Compact stream object.
objectManager.compactStreamObject(new CompactStreamObjectRequest(5, 2000, 2, 0, 20, List.of(1L, 4L))).join();
objectManager.compactStreamObject(new CompactStreamObjectRequest(5, 2000, 2, 0L, 0, 20, List.of(1L, 4L))).join();
streamObjectMetadataList = objectManager.getStreamObjects(2, 0, 10, 100).join();
assertEquals(1, streamObjectMetadataList.size());
ranges = streamObjectMetadataList.get(0).getOffsetRanges();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public CompletableFuture<CommitStreamSetObjectResponse> commitStreamSetObject(Co
@Override
public CompletableFuture<Void> compactStreamObject(CompactStreamObjectRequest request) {
// Build S3StreamObject
//TODO: implement stream epoch verification
S3StreamObject.Builder builder = S3StreamObject.newBuilder();
builder.setStreamId(request.getStreamId());
builder.setStartOffset(request.getStartOffset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ void commitStreamSetObject() {

@Test
void commitStreamObject() {
CompactStreamObjectRequest request = new CompactStreamObjectRequest(1L, 1000, 2000, 100, 1000, List.of(10L));
CompactStreamObjectRequest request = new CompactStreamObjectRequest(1L, 1000, 2000, 0L, 100, 1000, List.of(10L));
when(metadataService.compactStreamObject(any(), any())).thenReturn(CompletableFuture.completedFuture(null));

objectManager.compactStreamObject(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ public long streamId() {
return streamId;
}

@Override
public long streamEpoch() {
return 0;
}

@Override
public long startOffset() {
return startOffset.get();
Expand Down

0 comments on commit 90d61c9

Please sign in to comment.