diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java index 60477bb82..7621d7536 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java @@ -88,6 +88,7 @@ public void copyWrite(List dataBlock) { public CompletableFuture forceUpload() { uploadWaitingList(); + writer.copyOnWrite(); return CompletableFuture.allOf(waitingUploadBlockCfs.values().toArray(new CompletableFuture[0])); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/MemoryS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/MemoryS3Operator.java index db514773e..0a42d1ef8 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/MemoryS3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/MemoryS3Operator.java @@ -66,6 +66,11 @@ public CompletableFuture write(ByteBuf part) { return CompletableFuture.completedFuture(null); } + @Override + public void copyOnWrite() { + + } + @Override public boolean hasBatchingPart() { return false; diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java index eda8db035..0091973ce 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java @@ -88,6 +88,13 @@ public CompletableFuture write(ByteBuf data) { return objectPart.getFuture(); } + @Override + public void copyOnWrite() { + if (objectPart != null) { + objectPart.copyOnWrite(); + } + } + @Override public boolean hasBatchingPart() { return objectPart != null; @@ -157,7 +164,7 @@ private List genCompleteParts() { class ObjectPart { private final int partNumber = nextPartNumber.getAndIncrement(); - private final CompositeByteBuf partBuf = DirectByteBufAlloc.compositeByteBuffer(); + private CompositeByteBuf partBuf = DirectByteBufAlloc.compositeByteBuffer(); private CompletableFuture lastRangeReadCf = CompletableFuture.completedFuture(null); private final CompletableFuture partCf = new CompletableFuture<>(); private long size; @@ -174,6 +181,17 @@ public void write(ByteBuf data) { this.lastRangeReadCf = lastRangeReadCf.thenAccept(nil -> partBuf.addComponent(true, data)); } + public void copyOnWrite() { + int size = partBuf.readableBytes(); + if (size > 0) { + ByteBuf buf = DirectByteBufAlloc.byteBuffer(size); + buf.writeBytes(partBuf.duplicate()); + CompositeByteBuf copy = DirectByteBufAlloc.compositeByteBuffer().addComponent(true, buf); + this.partBuf.release(); + this.partBuf = copy; + } + } + public void readAndWrite(String sourcePath, long start, long end) { size += end - start; // TODO: parallel read and sequence add. diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java index b58413ab1..89ca78d3d 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java @@ -62,6 +62,15 @@ public CompletableFuture write(ByteBuf part) { } } + @Override + public void copyOnWrite() { + if (multiPartWriter != null) { + multiPartWriter.copyOnWrite(); + } else { + objectWriter.copyOnWrite(); + } + } + @Override public void copyWrite(String sourcePath, long start, long end) { if (multiPartWriter == null) { @@ -110,6 +119,18 @@ public CompletableFuture write(ByteBuf part) { return cf; } + @Override + public void copyOnWrite() { + int size = data.readableBytes(); + if (size > 0) { + ByteBuf buf = DirectByteBufAlloc.byteBuffer(size); + buf.writeBytes(data.duplicate()); + CompositeByteBuf copy = DirectByteBufAlloc.compositeByteBuffer().addComponent(true, buf); + this.data.release(); + this.data = copy; + } + } + @Override public void copyWrite(String sourcePath, long start, long end) { throw new UnsupportedOperationException(); diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/Writer.java b/s3stream/src/main/java/com/automq/stream/s3/operator/Writer.java index 3a89bf87c..70b09de2d 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/Writer.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/Writer.java @@ -53,6 +53,12 @@ public interface Writer { */ CompletableFuture write(ByteBuf part); + /** + * Make a copy of all cached buffer and release old one to prevent outside modification to underlying data and + * avoid holding buffer reference for too long. + */ + void copyOnWrite(); + /** * Copy a part of the object. *