Skip to content

Commit

Permalink
feat(s3stream): add copyOnWrite interface to Writer (#497)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
Signed-off-by: Robin Han <[email protected]>
Co-authored-by: Robin Han <[email protected]>
  • Loading branch information
SCNieh and superhx authored Oct 28, 2023
1 parent 12d5d50 commit 0a8a4a6
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public void copyWrite(List<StreamDataBlock> dataBlock) {

public CompletableFuture<Void> forceUpload() {
uploadWaitingList();
writer.copyOnWrite();
return CompletableFuture.allOf(waitingUploadBlockCfs.values().toArray(new CompletableFuture[0]));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public CompletableFuture<Void> write(ByteBuf part) {
return CompletableFuture.completedFuture(null);
}

@Override
public void copyOnWrite() {

}

@Override
public boolean hasBatchingPart() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ public CompletableFuture<Void> write(ByteBuf data) {
return objectPart.getFuture();
}

@Override
public void copyOnWrite() {
if (objectPart != null) {
objectPart.copyOnWrite();
}
}

@Override
public boolean hasBatchingPart() {
return objectPart != null;
Expand Down Expand Up @@ -157,7 +164,7 @@ private List<CompletedPart> genCompleteParts() {

class ObjectPart {
private final int partNumber = nextPartNumber.getAndIncrement();
private final CompositeByteBuf partBuf = DirectByteBufAlloc.compositeByteBuffer();
private CompositeByteBuf partBuf = DirectByteBufAlloc.compositeByteBuffer();
private CompletableFuture<Void> lastRangeReadCf = CompletableFuture.completedFuture(null);
private final CompletableFuture<CompletedPart> partCf = new CompletableFuture<>();
private long size;
Expand All @@ -174,6 +181,17 @@ public void write(ByteBuf data) {
this.lastRangeReadCf = lastRangeReadCf.thenAccept(nil -> partBuf.addComponent(true, data));
}

public void copyOnWrite() {
int size = partBuf.readableBytes();
if (size > 0) {
ByteBuf buf = DirectByteBufAlloc.byteBuffer(size);
buf.writeBytes(partBuf.duplicate());
CompositeByteBuf copy = DirectByteBufAlloc.compositeByteBuffer().addComponent(true, buf);
this.partBuf.release();
this.partBuf = copy;
}
}

public void readAndWrite(String sourcePath, long start, long end) {
size += end - start;
// TODO: parallel read and sequence add.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ public CompletableFuture<Void> write(ByteBuf part) {
}
}

@Override
public void copyOnWrite() {
if (multiPartWriter != null) {
multiPartWriter.copyOnWrite();
} else {
objectWriter.copyOnWrite();
}
}

@Override
public void copyWrite(String sourcePath, long start, long end) {
if (multiPartWriter == null) {
Expand Down Expand Up @@ -110,6 +119,18 @@ public CompletableFuture<Void> write(ByteBuf part) {
return cf;
}

@Override
public void copyOnWrite() {
int size = data.readableBytes();
if (size > 0) {
ByteBuf buf = DirectByteBufAlloc.byteBuffer(size);
buf.writeBytes(data.duplicate());
CompositeByteBuf copy = DirectByteBufAlloc.compositeByteBuffer().addComponent(true, buf);
this.data.release();
this.data = copy;
}
}

@Override
public void copyWrite(String sourcePath, long start, long end) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ public interface Writer {
*/
CompletableFuture<Void> write(ByteBuf part);

/**
* Make a copy of all cached buffer and release old one to prevent outside modification to underlying data and
* avoid holding buffer reference for too long.
*/
void copyOnWrite();

/**
* Copy a part of the object.
*
Expand Down

0 comments on commit 0a8a4a6

Please sign in to comment.