Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(s3stream): delete out-dated object directly during compaction #896

Merged
merged 1 commit into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -312,29 +312,30 @@ public CompletableFuture<Void> forceSplitAll() {
*
* @param streamMetadataList metadata of opened streams
* @param objectMetadata stream set object to split
* @return List of CompletableFuture of StreamObject
* @param cfs List of CompletableFuture of StreamObject
* @return true if split succeed, false otherwise
*/
private Collection<CompletableFuture<StreamObject>> splitStreamSetObject(List<StreamMetadata> streamMetadataList,
S3ObjectMetadata objectMetadata) {
private boolean splitStreamSetObject(List<StreamMetadata> streamMetadataList,
S3ObjectMetadata objectMetadata, Collection<CompletableFuture<StreamObject>> cfs) {
if (objectMetadata == null) {
return new ArrayList<>();
return false;
}

Map<Long, List<StreamDataBlock>> streamDataBlocksMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList,
Collections.singletonList(objectMetadata), s3Operator, logger);
if (streamDataBlocksMap.isEmpty()) {
// object not exist, metadata is out of date
logger.warn("Object {} not exist, metadata is out of date", objectMetadata.objectId());
return new ArrayList<>();
logger.warn("Read index for object {} failed", objectMetadata.objectId());
return false;
}
List<StreamDataBlock> streamDataBlocks = streamDataBlocksMap.get(objectMetadata.objectId());
if (streamDataBlocks.isEmpty()) {
// object is empty, metadata is out of date
logger.warn("Object {} is empty, metadata is out of date", objectMetadata.objectId());
return new ArrayList<>();
logger.info("Object {} is out of date, will be deleted after compaction", objectMetadata.objectId());
return true;
}

return groupAndSplitStreamDataBlocks(objectMetadata, streamDataBlocks);
cfs.addAll(groupAndSplitStreamDataBlocks(objectMetadata, streamDataBlocks));
return true;
}

Collection<CompletableFuture<StreamObject>> groupAndSplitStreamDataBlocks(S3ObjectMetadata objectMetadata,
Expand Down Expand Up @@ -414,8 +415,9 @@ Collection<CompletableFuture<StreamObject>> groupAndSplitStreamDataBlocks(S3Obje
CommitStreamSetObjectRequest buildSplitRequest(List<StreamMetadata> streamMetadataList,
S3ObjectMetadata objectToSplit)
throws CompletionException {
Collection<CompletableFuture<StreamObject>> cfs = splitStreamSetObject(streamMetadataList, objectToSplit);
if (cfs.isEmpty()) {
List<CompletableFuture<StreamObject>> cfs = new ArrayList<>();
boolean status = splitStreamSetObject(streamMetadataList, objectToSplit, cfs);
if (!status) {
logger.error("Force split object {} failed, no stream object generated", objectToSplit.objectId());
return null;
}
Expand Down Expand Up @@ -468,6 +470,12 @@ CommitStreamSetObjectRequest buildCompactRequest(List<StreamMetadata> streamMeta
executeCompactionPlans(request, compactionPlans, objectsToCompact);
compactionPlans.forEach(c -> c.streamDataBlocksMap().values().forEach(v -> v.forEach(b -> compactedObjectIds.add(b.getObjectId()))));

// compact out-dated objects directly
streamDataBlockMap.entrySet().stream().filter(e -> e.getValue().isEmpty()).forEach(e -> {
logger.info("Object {} is out of date, will be deleted after compaction", e.getKey());
compactedObjectIds.add(e.getKey());
});

request.setCompactedObjectIds(new ArrayList<>(compactedObjectIds));
List<S3ObjectMetadata> compactedObjectMetadata = objectsToCompact.stream()
.filter(e -> compactedObjectIds.contains(e.objectId())).toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,37 @@ public void testForceSplit() {
Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(2)), request));
}

@Test
public void testForceSplitWithOutDatedObject() {
when(streamManager.getStreams(Collections.emptyList())).thenReturn(CompletableFuture.completedFuture(
List.of(new StreamMetadata(STREAM_0, 0, 999, 9999, StreamState.OPENED),
new StreamMetadata(STREAM_1, 0, 999, 9999, StreamState.OPENED),
new StreamMetadata(STREAM_2, 0, 999, 9999, StreamState.OPENED))));

List<StreamMetadata> streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join();
List<S3ObjectMetadata> s3ObjectMetadata = this.objectManager.getServerObjects().join();
when(config.streamSetObjectCompactionForceSplitPeriod()).thenReturn(0);
compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator);

CommitStreamSetObjectRequest request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(0));
Assertions.assertEquals(-1, request.getObjectId());
Assertions.assertEquals(List.of(OBJECT_0), request.getCompactedObjectIds());
Assertions.assertTrue(request.getStreamObjects().isEmpty());
Assertions.assertTrue(request.getStreamRanges().isEmpty());

request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(1));
Assertions.assertEquals(-1, request.getObjectId());
Assertions.assertEquals(List.of(OBJECT_1), request.getCompactedObjectIds());
Assertions.assertTrue(request.getStreamObjects().isEmpty());
Assertions.assertTrue(request.getStreamRanges().isEmpty());

request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(2));
Assertions.assertEquals(-1, request.getObjectId());
Assertions.assertEquals(List.of(OBJECT_2), request.getCompactedObjectIds());
Assertions.assertTrue(request.getStreamObjects().isEmpty());
Assertions.assertTrue(request.getStreamRanges().isEmpty());
}

@Test
public void testForceSplitWithException() {
S3AsyncClient s3AsyncClient = Mockito.mock(S3AsyncClient.class);
Expand Down Expand Up @@ -320,6 +351,26 @@ public void testCompactionWithDataTrimmed4() {
Assertions.assertTrue(checkDataIntegrity(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, request));
}

@Test
public void testCompactWithOutdatedObject() {
when(streamManager.getStreams(Collections.emptyList())).thenReturn(CompletableFuture.completedFuture(
List.of(new StreamMetadata(STREAM_0, 0, 15, 20, StreamState.OPENED),
new StreamMetadata(STREAM_1, 0, 60, 500, StreamState.OPENED),
new StreamMetadata(STREAM_2, 0, 60, 270, StreamState.OPENED))));
compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator);
List<StreamMetadata> streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join();
CommitStreamSetObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST);

assertEquals(List.of(OBJECT_0, OBJECT_1, OBJECT_2), request.getCompactedObjectIds());
assertEquals(OBJECT_0, request.getOrderId());
assertTrue(request.getObjectId() > OBJECT_2);
request.getStreamObjects().forEach(s -> assertTrue(s.getObjectId() > OBJECT_2));
assertEquals(2, request.getStreamObjects().size());
assertEquals(2, request.getStreamRanges().size());

Assertions.assertTrue(checkDataIntegrity(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, request));
}

@Test
public void testCompactWithNonExistStream() {
when(streamManager.getStreams(Collections.emptyList())).thenReturn(CompletableFuture.completedFuture(
Expand Down