Commit 05f702d

feat(s3stream): force split one wal at a time, limit memory consumpti… (#501)

Signed-off-by: Shichao Nie <[email protected]>
SCNieh authored Oct 29, 2023
1 parent 54225fe · commit 05f702d

Showing 2 changed files with 51 additions and 5 deletions.
@@ -31,6 +31,7 @@
import com.automq.stream.s3.objects.ObjectStreamRange;
import com.automq.stream.s3.objects.StreamObject;
import com.automq.stream.s3.operator.S3Operator;
+import com.automq.stream.s3.operator.Writer;
import com.automq.stream.s3.streams.StreamManager;
import com.automq.stream.utils.LogContext;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -48,6 +49,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@@ -75,6 +77,8 @@ public class CompactionManager {
private final long networkInboundBandwidth;
private final boolean s3ObjectLogEnable;

+private final Semaphore forceSplitLimit = new Semaphore(500 * 1024 * 1024);
+
public CompactionManager(Config config, ObjectManager objectManager, StreamManager streamManager, S3Operator s3Operator) {
String logPrefix = String.format("[CompactionManager id=%d] ", config.brokerId());
this.logger = new LogContext(logPrefix).logger(CompactionManager.class);
@@ -284,6 +288,11 @@ Collection<CompletableFuture<StreamObject>> splitWALObjects(List<StreamMetadata>
return new ArrayList<>();
}

+//TODO: temp solution, optimize later
+// take first object
+Set<Long> objectIds = objectMetadataList.stream().map(S3ObjectMetadata::objectId).collect(Collectors.toSet());
+objectMetadataList = Collections.singletonList(objectMetadataList.get(0));
+
Map<Long, List<StreamDataBlock>> streamDataBlocksMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList, objectMetadataList, s3Operator);
List<Pair<List<StreamDataBlock>, CompletableFuture<StreamObject>>> groupedDataBlocks = new ArrayList<>();
int totalStreamObjectNum = 0;
@@ -312,7 +321,7 @@ Collection<CompletableFuture<StreamObject>> splitWALObjects(List<StreamMetadata>
totalStreamObjectNum += groupedStreamDataBlocks.size();
}
// add objects that are excluded from split
-excludedObjects.addAll(streamDataBlocksMap.keySet().stream().filter(e -> !includedObjects.contains(e)).collect(Collectors.toSet()));
+excludedObjects.addAll(objectIds.stream().filter(e -> !includedObjects.contains(e)).collect(Collectors.toSet()));
logger.info("Force split {} WAL objects, expect to generate {} stream objects, max stream objects {}, objects excluded: {}",
objectMetadataList.size(), groupedDataBlocks.size(), maxStreamObjectNumPerCommit, excludedObjects);
if (groupedDataBlocks.isEmpty()) {
@@ -324,6 +333,18 @@ Collection<CompletableFuture<StreamObject>> splitWALObjects(List<StreamMetadata>
for (Pair<List<StreamDataBlock>, CompletableFuture<StreamObject>> pair : groupedDataBlocks) {
List<StreamDataBlock> streamDataBlocks = pair.getKey();
DataBlockWriter writer = new DataBlockWriter(objectId, s3Operator, kafkaConfig.s3ObjectPartSize());
+
+StreamDataBlock start = streamDataBlocks.get(0);
+StreamDataBlock end = streamDataBlocks.get(streamDataBlocks.size() - 1);
+final int dataSize = (int) (end.getBlockEndPosition() + end.getBlockSize() - start.getBlockStartPosition());
+if (dataSize < Writer.MIN_PART_SIZE) {
+try {
+forceSplitLimit.acquire(dataSize);
+} catch (InterruptedException ignored) {
+
+}
+}
+
writer.copyWrite(streamDataBlocks);
final long objectIdFinal = objectId;
writer.close().thenAccept(v -> {
@@ -334,6 +355,9 @@ Collection<CompletableFuture<StreamObject>> splitWALObjects(List<StreamMetadata>
streamObject.setEndOffset(streamDataBlocks.get(streamDataBlocks.size() - 1).getEndOffset());
streamObject.setObjectSize(writer.size());
pair.getValue().complete(streamObject);
+if (dataSize < Writer.MIN_PART_SIZE) {
+forceSplitLimit.release(dataSize);
+}
});
objectId++;
}
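The new forceSplitLimit field acts as a byte budget: when a group of data blocks is smaller than Writer.MIN_PART_SIZE, its bytes have to be buffered in memory before the copy-write, so the loop first acquires that many permits from the semaphore and releases them once the resulting stream object has been completed. Below is a minimal sketch of the same pattern, assuming an illustrative budget and write callback (withByteBudget, MEMORY_BUDGET and ByteBudgetSketch are not names from the commit):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

public class ByteBudgetSketch {
    // Illustrative cap; the commit hard-codes new Semaphore(500 * 1024 * 1024).
    private static final Semaphore MEMORY_BUDGET = new Semaphore(64 * 1024 * 1024);

    // Reserve `bytes` permits before starting an in-memory copy, and hand them
    // back once the asynchronous write completes (successfully or not).
    static CompletableFuture<Void> withByteBudget(int bytes, Supplier<CompletableFuture<Void>> write)
            throws InterruptedException {
        MEMORY_BUDGET.acquire(bytes); // blocks until enough budget is free
        return write.get().whenComplete((v, e) -> MEMORY_BUDGET.release(bytes));
    }

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical 1 MiB block, too small to be streamed as a multipart part.
        withByteBudget(1024 * 1024, () -> CompletableFuture.completedFuture(null)).join();
        System.out.println("budget restored: " + MEMORY_BUDGET.availablePermits());
    }
}

Note that in the commit itself the release happens inside the thenAccept callback, i.e. only when the write completes normally; the sketch releases in whenComplete so the budget is also returned on failure.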
@@ -95,15 +95,37 @@ public void testForceSplit() {
List<S3ObjectMetadata> s3ObjectMetadata = this.objectManager.getServerObjects().join();
when(config.s3ObjectCompactionForceSplitPeriod()).thenReturn(0);
compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator);
-CommitWALObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, s3ObjectMetadata, new HashSet<>());
+
+Set<Long> excludedIds = new HashSet<>();
+CommitWALObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, s3ObjectMetadata, excludedIds);
Assertions.assertEquals(-1, request.getObjectId());
-Assertions.assertEquals(List.of(OBJECT_0, OBJECT_1, OBJECT_2), request.getCompactedObjectIds());
-Assertions.assertEquals(7, request.getStreamObjects().size());
-Assertions.assertTrue(checkDataIntegrity(streamMetadataList, s3ObjectMetadata, request));
+Assertions.assertEquals(Set.of(OBJECT_1, OBJECT_2), excludedIds);
+Assertions.assertEquals(List.of(OBJECT_0), request.getCompactedObjectIds());
+Assertions.assertEquals(3, request.getStreamObjects().size());
+Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(0)), request));
+
+
+s3ObjectMetadata = s3ObjectMetadata.stream().filter(e -> excludedIds.contains(e.objectId())).collect(Collectors.toList());
+excludedIds.clear();
+request = compactionManager.buildCompactRequest(streamMetadataList, s3ObjectMetadata, excludedIds);
+Assertions.assertEquals(-1, request.getObjectId());
+Assertions.assertEquals(Set.of(OBJECT_2), excludedIds);
+Assertions.assertEquals(List.of(OBJECT_1), request.getCompactedObjectIds());
+Assertions.assertEquals(2, request.getStreamObjects().size());
+Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(0)), request));
+
+s3ObjectMetadata = s3ObjectMetadata.stream().filter(e -> excludedIds.contains(e.objectId())).collect(Collectors.toList());
+excludedIds.clear();
+request = compactionManager.buildCompactRequest(streamMetadataList, s3ObjectMetadata, excludedIds);
+Assertions.assertEquals(-1, request.getObjectId());
+Assertions.assertTrue(excludedIds.isEmpty());
+Assertions.assertEquals(List.of(OBJECT_2), request.getCompactedObjectIds());
+Assertions.assertEquals(2, request.getStreamObjects().size());
+Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(0)), request));
}

@Test
@Disabled
public void testForceSplitWithLimit() {
when(config.s3ObjectMaxStreamObjectNumPerCommit()).thenReturn(3);
List<S3ObjectMetadata> s3ObjectMetadata = this.objectManager.getServerObjects().join();
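The reworked testForceSplit shows the intended calling pattern for the one-object-at-a-time behaviour: each buildCompactRequest call splits only the first WAL object and reports the untouched object ids through the excluded-id set, so the caller filters its metadata list against that set and calls again until nothing is left. A sketch of such a driver loop, assuming the s3stream types from the diff are on the classpath (forceSplitAll itself is illustrative, not part of the commit):

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative driver: drain every WAL object by repeatedly force-splitting
// the head of the list and feeding the excluded ids back into the next round.
static void forceSplitAll(CompactionManager manager,
                          List<StreamMetadata> streams,
                          List<S3ObjectMetadata> objects) {
    Set<Long> excludedIds = new HashSet<>();
    while (!objects.isEmpty()) {
        excludedIds.clear();
        CommitWALObjectRequest request = manager.buildCompactRequest(streams, objects, excludedIds);
        // ... commit `request` to the object manager here ...
        // keep only the objects this round excluded and go again
        objects = objects.stream()
                .filter(o -> excludedIds.contains(o.objectId()))
                .collect(Collectors.toList());
    }
}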