Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(stream): make CompactionManager#compact public #877

Merged
merged 1 commit into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<guava.version>32.1.3-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.12.0-SNAPSHOT</s3stream.version>
<s3stream.version>0.13.0-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
Expand Down
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.12.0-SNAPSHOT</version>
<version>0.13.0-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,10 @@ public Builder withItem(StreamObjectsCompactionTask.CompactionSummary compaction
return this;
}
this.involvedStreamCount++;
this.sourceObjectsTotalSize += compactionSummary.getTotalObjectSize();
this.sourceObjectsCount += compactionSummary.getSourceObjectsCount();
this.targetObjectsCount += compactionSummary.getTargetObjectCount();
this.smallSizeCopyWriteCount += compactionSummary.getSmallSizeCopyWriteCount();
this.sourceObjectsTotalSize += compactionSummary.totalObjectSize();
this.sourceObjectsCount += compactionSummary.sourceObjectsCount();
this.targetObjectsCount += compactionSummary.targetObjectCount();
this.smallSizeCopyWriteCount += compactionSummary.smallSizeCopyWriteCount();
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,8 @@ private Queue<List<S3StreamObjectMetadataSplitWrapper>> groupEligibleObjects(Lis
}
}
if (groups.isEmpty()) {
long startOffset = streamObjects.get(0).getOffsetRanges().get(0).getStartOffset();
long endOffset = streamObjects.get(streamObjects.size() - 1).getOffsetRanges().get(0).getEndOffset();
long startOffset = streamObjects.get(0).getOffsetRanges().get(0).startOffset();
long endOffset = streamObjects.get(streamObjects.size() - 1).getOffsetRanges().get(0).endOffset();
LOGGER.trace("{} no eligible stream objects found for range [{}, {})", logIdent, startOffset, endOffset);
}
return groups;
Expand Down Expand Up @@ -464,35 +464,35 @@ public CompactionSummary(long streamId, long startOffset, long endOffset, long t
this.smallSizeCopyWriteCount = smallSizeCopyWriteCount;
}

public long getStreamId() {
public long streamId() {
return streamId;
}

public long getStartOffset() {
public long startOffset() {
return startOffset;
}

public long getEndOffset() {
public long endOffset() {
return endOffset;
}

public long getTimeCostInMs() {
public long timeCostInMs() {
return timeCostInMs;
}

public long getTotalObjectSize() {
public long totalObjectSize() {
return totalObjectSize;
}

public long getSourceObjectsCount() {
public long sourceObjectsCount() {
return sourceObjectsCount;
}

public long getTargetObjectCount() {
public long targetObjectCount() {
return targetObjectCount;
}

public long getSmallSizeCopyWriteCount() {
public long smallSizeCopyWriteCount() {
return smallSizeCopyWriteCount;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ public void shutdown() {
this.uploader.stop();
}

private CompletableFuture<Void> compact() {
public CompletableFuture<Void> compact() {
return this.objectManager.getServerObjects().thenComposeAsync(objectMetadataList -> {
List<Long> streamIds = objectMetadataList.stream().flatMap(e -> e.getOffsetRanges().stream())
.map(StreamOffsetRange::getStreamId).distinct().toList();
.map(StreamOffsetRange::streamId).distinct().toList();
return this.streamManager.getStreams(streamIds).thenAcceptAsync(streamMetadataList ->
this.compact(streamMetadataList, objectMetadataList), compactThreadPool);
}, compactThreadPool);
Expand Down Expand Up @@ -287,7 +287,7 @@ public CompletableFuture<Void> forceSplitAll() {
//TODO: deal with metadata delay
this.compactScheduledExecutor.execute(() -> this.objectManager.getServerObjects().thenAcceptAsync(objectMetadataList -> {
List<Long> streamIds = objectMetadataList.stream().flatMap(e -> e.getOffsetRanges().stream())
.map(StreamOffsetRange::getStreamId).distinct().toList();
.map(StreamOffsetRange::streamId).distinct().toList();
this.streamManager.getStreams(streamIds).thenAcceptAsync(streamMetadataList -> {
if (objectMetadataList.isEmpty()) {
logger.info("No stream set objects to force split");
Expand Down Expand Up @@ -488,32 +488,32 @@ boolean isSanityCheckFailed(List<StreamMetadata> streamMetadataList, List<S3Obje
request.getStreamRanges().forEach(o -> compactedStreamOffsetRanges.add(new StreamOffsetRange(o.getStreamId(), o.getStartOffset(), o.getEndOffset())));
request.getStreamObjects().forEach(o -> compactedStreamOffsetRanges.add(new StreamOffsetRange(o.getStreamId(), o.getStartOffset(), o.getEndOffset())));
Map<Long, List<StreamOffsetRange>> sortedStreamOffsetRanges = compactedStreamOffsetRanges.stream()
.collect(Collectors.groupingBy(StreamOffsetRange::getStreamId));
.collect(Collectors.groupingBy(StreamOffsetRange::streamId));
sortedStreamOffsetRanges.replaceAll((k, v) -> sortAndMerge(v));
for (long objectId : request.getCompactedObjectIds()) {
S3ObjectMetadata metadata = objectMetadataMap.get(objectId);
for (StreamOffsetRange streamOffsetRange : metadata.getOffsetRanges()) {
if (!streamMetadataMap.containsKey(streamOffsetRange.getStreamId())) {
if (!streamMetadataMap.containsKey(streamOffsetRange.streamId())) {
// skip non-exist stream
continue;
}
long streamStartOffset = streamMetadataMap.get(streamOffsetRange.getStreamId()).startOffset();
if (streamOffsetRange.getEndOffset() <= streamStartOffset) {
long streamStartOffset = streamMetadataMap.get(streamOffsetRange.streamId()).startOffset();
if (streamOffsetRange.endOffset() <= streamStartOffset) {
// skip stream offset range that has been trimmed
continue;
}
if (streamOffsetRange.getStartOffset() < streamStartOffset) {
if (streamOffsetRange.startOffset() < streamStartOffset) {
// trim stream offset range
streamOffsetRange = new StreamOffsetRange(streamOffsetRange.getStreamId(), streamStartOffset, streamOffsetRange.getEndOffset());
streamOffsetRange = new StreamOffsetRange(streamOffsetRange.streamId(), streamStartOffset, streamOffsetRange.endOffset());
}
if (!sortedStreamOffsetRanges.containsKey(streamOffsetRange.getStreamId())) {
logger.error("Sanity check failed, stream {} is missing after compact", streamOffsetRange.getStreamId());
if (!sortedStreamOffsetRanges.containsKey(streamOffsetRange.streamId())) {
logger.error("Sanity check failed, stream {} is missing after compact", streamOffsetRange.streamId());
return true;
}
boolean contained = false;
for (StreamOffsetRange compactedStreamOffsetRange : sortedStreamOffsetRanges.get(streamOffsetRange.getStreamId())) {
if (streamOffsetRange.getStartOffset() >= compactedStreamOffsetRange.getStartOffset()
&& streamOffsetRange.getEndOffset() <= compactedStreamOffsetRange.getEndOffset()) {
for (StreamOffsetRange compactedStreamOffsetRange : sortedStreamOffsetRanges.get(streamOffsetRange.streamId())) {
if (streamOffsetRange.startOffset() >= compactedStreamOffsetRange.startOffset()
&& streamOffsetRange.endOffset() <= compactedStreamOffsetRange.endOffset()) {
contained = true;
break;
}
Expand All @@ -532,7 +532,7 @@ private List<StreamOffsetRange> sortAndMerge(List<StreamOffsetRange> streamOffse
if (streamOffsetRangeList.size() < 2) {
return streamOffsetRangeList;
}
long streamId = streamOffsetRangeList.get(0).getStreamId();
long streamId = streamOffsetRangeList.get(0).streamId();
Collections.sort(streamOffsetRangeList);
List<StreamOffsetRange> mergedList = new ArrayList<>();
long start = -1L;
Expand All @@ -541,14 +541,14 @@ private List<StreamOffsetRange> sortAndMerge(List<StreamOffsetRange> streamOffse
StreamOffsetRange curr = streamOffsetRangeList.get(i);
StreamOffsetRange next = streamOffsetRangeList.get(i + 1);
if (start == -1) {
start = curr.getStartOffset();
end = curr.getEndOffset();
start = curr.startOffset();
end = curr.endOffset();
}
if (curr.getEndOffset() < next.getStartOffset()) {
mergedList.add(new StreamOffsetRange(curr.getStreamId(), start, end));
start = next.getStartOffset();
if (curr.endOffset() < next.startOffset()) {
mergedList.add(new StreamOffsetRange(curr.streamId(), start, end));
start = next.startOffset();
}
end = next.getEndOffset();
end = next.endOffset();
}
mergedList.add(new StreamOffsetRange(streamId, start, end));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,19 +166,19 @@ public synchronized CompletableFuture<List<S3ObjectMetadata>> getObjects(long st
List<S3ObjectMetadata> streamSetObjectList = streamSetObjects.values()
.stream()
.map(Pair::getRight)
.filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.getStreamId() == streamId && r.getEndOffset() > startOffset && (r.getStartOffset() < endOffset || endOffset == -1)))
.filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.streamId() == streamId && r.endOffset() > startOffset && (r.startOffset() < endOffset || endOffset == -1)))
.toList();
List<S3ObjectMetadata> streamObjectList = streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>())
.stream()
.filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.getStreamId() == streamId && r.getEndOffset() > startOffset && (r.getStartOffset() < endOffset || endOffset == -1)))
.filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.streamId() == streamId && r.endOffset() > startOffset && (r.startOffset() < endOffset || endOffset == -1)))
.toList();

List<S3ObjectMetadata> result = new ArrayList<>();
result.addAll(streamSetObjectList);
result.addAll(streamObjectList);
result.sort((o1, o2) -> {
long startOffset1 = o1.getOffsetRanges().stream().filter(r -> r.getStreamId() == streamId).findFirst().get().getStartOffset();
long startOffset2 = o2.getOffsetRanges().stream().filter(r -> r.getStreamId() == streamId).findFirst().get().getStartOffset();
long startOffset1 = o1.getOffsetRanges().stream().filter(r -> r.streamId() == streamId).findFirst().get().startOffset();
long startOffset2 = o2.getOffsetRanges().stream().filter(r -> r.streamId() == streamId).findFirst().get().startOffset();
return Long.compare(startOffset1, startOffset2);
});

Expand All @@ -199,7 +199,7 @@ public synchronized CompletableFuture<List<S3ObjectMetadata>> getStreamObjects(l
long endOffset, int limit) {
List<S3ObjectMetadata> streamObjectList = streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>())
.stream()
.filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.getStreamId() == streamId && r.getEndOffset() > startOffset && (r.getStartOffset() < endOffset || endOffset == -1)))
.filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.streamId() == streamId && r.endOffset() > startOffset && (r.startOffset() < endOffset || endOffset == -1)))
.limit(limit)
.toList();
return CompletableFuture.completedFuture(streamObjectList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,22 +125,22 @@ public long startOffset() {
if (offsetRanges == null || offsetRanges.isEmpty()) {
return S3StreamConstant.INVALID_OFFSET;
}
return offsetRanges.get(0).getStartOffset();
return offsetRanges.get(0).startOffset();
}

public long endOffset() {
if (offsetRanges == null || offsetRanges.isEmpty()) {
return S3StreamConstant.INVALID_OFFSET;
}
return offsetRanges.get(offsetRanges.size() - 1).getEndOffset();
return offsetRanges.get(offsetRanges.size() - 1).endOffset();
}

public boolean intersect(long streamId, long startOffset, long endOffset) {
if (offsetRanges == null || offsetRanges.isEmpty()) {
return false;
}
for (StreamOffsetRange offsetRange : offsetRanges) {
if (offsetRange.getStreamId() == streamId && offsetRange.intersect(startOffset, endOffset)) {
if (offsetRange.streamId() == streamId && offsetRange.intersect(startOffset, endOffset)) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ public StreamOffsetRange(long streamId, long startOffset, long endOffset) {
this.endOffset = endOffset;
}

public long getStreamId() {
public long streamId() {
return streamId;
}

public long getStartOffset() {
public long startOffset() {
return startOffset;
}

public long getEndOffset() {
public long endOffset() {
return endOffset;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,21 @@ void testCommitAndCompact() {
List<StreamOffsetRange> ranges = streamSetMetadata.getOffsetRanges();
assertEquals(2, ranges.size());

assertEquals(0, ranges.get(0).getStreamId());
assertEquals(0, ranges.get(0).getStartOffset());
assertEquals(3, ranges.get(0).getEndOffset());
assertEquals(0, ranges.get(0).streamId());
assertEquals(0, ranges.get(0).startOffset());
assertEquals(3, ranges.get(0).endOffset());

assertEquals(1, ranges.get(1).getStreamId());
assertEquals(0, ranges.get(1).getStartOffset());
assertEquals(5, ranges.get(1).getEndOffset());
assertEquals(1, ranges.get(1).streamId());
assertEquals(0, ranges.get(1).startOffset());
assertEquals(5, ranges.get(1).endOffset());

List<S3ObjectMetadata> streamObjectMetadataList = objectManager.getStreamObjects(2, 0, 10, 100).join();
assertEquals(1, streamObjectMetadataList.size());
ranges = streamObjectMetadataList.get(0).getOffsetRanges();
assertEquals(1, ranges.size());
assertEquals(2, ranges.get(0).getStreamId());
assertEquals(0, ranges.get(0).getStartOffset());
assertEquals(10, ranges.get(0).getEndOffset());
assertEquals(2, ranges.get(0).streamId());
assertEquals(0, ranges.get(0).startOffset());
assertEquals(10, ranges.get(0).endOffset());

streamObjectMetadataList = objectManager.getStreamObjects(2, 0, 20, 100).join();
assertEquals(2, streamObjectMetadataList.size());
Expand Down Expand Up @@ -153,9 +153,9 @@ void testCommitAndCompact() {
assertEquals(1, streamObjectMetadataList.size());
ranges = streamObjectMetadataList.get(0).getOffsetRanges();
assertEquals(1, ranges.size());
assertEquals(2, ranges.get(0).getStreamId());
assertEquals(0, ranges.get(0).getStartOffset());
assertEquals(20, ranges.get(0).getEndOffset());
assertEquals(2, ranges.get(0).streamId());
assertEquals(0, ranges.get(0).startOffset());
assertEquals(20, ranges.get(0).endOffset());
}

@Test
Expand Down
Loading