Skip to content

Commit

Permalink
fix(s3stream): acquire deadlock (#504)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Oct 30, 2023
1 parent 8ebaf69 commit e3c0c87
Showing 1 changed file with 56 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,12 @@ private int availableReadPermit() {
CompletableFuture<ByteBuf> mergedRangeRead(String path, long start, long end) {
end = end - 1;
CompletableFuture<ByteBuf> cf = new CompletableFuture<>();
if (!acquireReadPermit(cf)) {
return cf;
CompletableFuture<ByteBuf> retCf = acquireReadPermit(cf);
if (retCf.isDone()) {
return retCf;
}
mergedRangeRead0(path, start, end, cf);
return cf.whenComplete((rst, ex) -> inflightReadLimiter.release());
return retCf;
}

void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteBuf> cf) {
Expand Down Expand Up @@ -264,8 +265,9 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteB
@Override
public CompletableFuture<Void> write(String path, ByteBuf data, ThrottleStrategy throttleStrategy) {
CompletableFuture<Void> cf = new CompletableFuture<>();
if (!acquireWritePermit(cf)) {
return cf;
CompletableFuture<Void> retCf = acquireWritePermit(cf);
if (retCf.isDone()) {
return retCf;
}
if (networkOutboundBandwidthLimiter != null) {
networkOutboundBandwidthLimiter.consume(throttleStrategy, data.readableBytes()).whenCompleteAsync((v, ex) -> {
Expand All @@ -278,7 +280,7 @@ public CompletableFuture<Void> write(String path, ByteBuf data, ThrottleStrategy
} else {
write0(path, data, cf);
}
return cf.whenComplete((rst, ex) -> inflightWriteLimiter.release());
return retCf;
}

private void write0(String path, ByteBuf data, CompletableFuture<Void> cf) {
Expand Down Expand Up @@ -357,11 +359,12 @@ public CompletableFuture<List<String>> delete(List<String> objectKeys) {
@Override
public CompletableFuture<String> createMultipartUpload(String path) {
CompletableFuture<String> cf = new CompletableFuture<>();
if (!acquireWritePermit(cf)) {
return cf;
CompletableFuture<String> retCf = acquireWritePermit(cf);
if (retCf.isDone()) {
return retCf;
}
createMultipartUpload0(path, cf);
return cf.whenComplete((rst, ex) -> inflightWriteLimiter.release());
return retCf;
}

void createMultipartUpload0(String path, CompletableFuture<String> cf) {
Expand All @@ -388,8 +391,9 @@ void createMultipartUpload0(String path, CompletableFuture<String> cf) {
@Override
public CompletableFuture<CompletedPart> uploadPart(String path, String uploadId, int partNumber, ByteBuf data, ThrottleStrategy throttleStrategy) {
CompletableFuture<CompletedPart> cf = new CompletableFuture<>();
if (!acquireWritePermit(cf)) {
return cf;
CompletableFuture<CompletedPart> refCf = acquireWritePermit(cf);
if (refCf.isDone()) {
return refCf;
}
if (networkOutboundBandwidthLimiter != null) {
networkOutboundBandwidthLimiter.consume(throttleStrategy, data.readableBytes()).whenCompleteAsync((v, ex) -> {
Expand All @@ -403,7 +407,7 @@ public CompletableFuture<CompletedPart> uploadPart(String path, String uploadId,
uploadPart0(path, uploadId, partNumber, data, cf);
}
cf.whenComplete((rst, ex) -> data.release());
return cf.whenComplete((rst, ex) -> inflightWriteLimiter.release());
return refCf;
}

private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf part, CompletableFuture<CompletedPart> cf) {
Expand Down Expand Up @@ -434,11 +438,12 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p
@Override
public CompletableFuture<CompletedPart> uploadPartCopy(String sourcePath, String path, long start, long end, String uploadId, int partNumber) {
CompletableFuture<CompletedPart> cf = new CompletableFuture<>();
if (!acquireWritePermit(cf)) {
return cf;
CompletableFuture<CompletedPart> retCf = acquireWritePermit(cf);
if (retCf.isDone()) {
return retCf;
}
uploadPartCopy0(sourcePath, path, start, end, uploadId, partNumber, cf);
return cf.whenComplete((rst, ex) -> inflightWriteLimiter.release());
return retCf;
}

private void uploadPartCopy0(String sourcePath, String path, long start, long end, String uploadId, int partNumber, CompletableFuture<CompletedPart> cf) {
Expand Down Expand Up @@ -469,11 +474,12 @@ private void uploadPartCopy0(String sourcePath, String path, long start, long en
@Override
public CompletableFuture<Void> completeMultipartUpload(String path, String uploadId, List<CompletedPart> parts) {
CompletableFuture<Void> cf = new CompletableFuture<>();
if (!acquireWritePermit(cf)) {
return cf;
CompletableFuture<Void> retCf = acquireWritePermit(cf);
if (retCf.isDone()) {
return retCf;
}
completeMultipartUpload0(path, uploadId, parts, cf);
return cf.whenComplete((rst, ex) -> inflightWriteLimiter.release());
return retCf;
}

public void completeMultipartUpload0(String path, String uploadId, List<CompletedPart> parts, CompletableFuture<Void> cf) {
Expand Down Expand Up @@ -578,24 +584,50 @@ private static S3AsyncClient newS3Client(String endpoint, String region, boolean
return builder.build();
}

boolean acquireReadPermit(CompletableFuture<?> cf) {
/**
* Acquire read permit, permit will auto release when cf complete.
* @return retCf the retCf should be used as method return value to ensure release before following operations.
*/
<T> CompletableFuture<T> acquireReadPermit(CompletableFuture<T> cf) {
// TODO: async acquire?
try {

Check notice on line 593 in s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java

View workflow job for this annotation

GitHub Actions / qodana

Duplicated code fragment

Duplicated code
inflightReadLimiter.acquire();
return true;
CompletableFuture<T> newCf = new CompletableFuture<>();
cf.whenComplete((rst, ex) -> {
inflightReadLimiter.release();
if (ex != null) {
newCf.completeExceptionally(ex);
} else {
newCf.complete(rst);
}
});
return newCf;
} catch (InterruptedException e) {
cf.completeExceptionally(e);
return false;
return cf;
}
}

boolean acquireWritePermit(CompletableFuture<?> cf) {
/**
* Acquire write permit, permit will auto release when cf complete.
* @return retCf the retCf should be used as method return value to ensure release before following operations.
*/
<T> CompletableFuture<T> acquireWritePermit(CompletableFuture<T> cf) {
try {
inflightWriteLimiter.acquire();
return true;
CompletableFuture<T> newCf = new CompletableFuture<>();
cf.whenComplete((rst, ex) -> {
inflightWriteLimiter.release();
if (ex != null) {
newCf.completeExceptionally(ex);
} else {
newCf.complete(rst);
}
});
return newCf;
} catch (InterruptedException e) {
cf.completeExceptionally(e);
return false;
return cf;
}
}

Expand Down

0 comments on commit e3c0c87

Please sign in to comment.