Skip to content

Commit

Permalink
fix(s3stream): deadlock (#508)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Oct 30, 2023
1 parent f06035b commit 60501f2
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 21 deletions.
2 changes: 1 addition & 1 deletion s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class S3Storage implements Storage {
private final ScheduledExecutorService backgroundExecutor = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("s3-storage-background", true), LOGGER);
private final ExecutorService uploadWALExecutor = Threads.newFixedThreadPool(
4, ThreadUtils.createThreadFactory("s3-storage-upload-wal", true), LOGGER);
4, ThreadUtils.createThreadFactory("s3-storage-upload-wal-%d", true), LOGGER);

private final Queue<WalWriteRequest> backoffRecords = new LinkedBlockingQueue<>();
private final ScheduledFuture<?> drainBackoffTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,19 @@ public class DefaultS3Operator implements S3Operator {
private final S3AsyncClient readS3Client;
private final Semaphore inflightWriteLimiter;
private final Semaphore inflightReadLimiter;

private final List<ReadTask> waitingReadTasks = new LinkedList<>();
private final AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter;
private final AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter;
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("s3operator", true));
private final ExecutorService s3ReadCallbackExecutor = Executors.newSingleThreadExecutor(
ThreadUtils.createThreadFactory("s3-read-cb-executor", true));
private final ExecutorService s3WriteCallbackExecutor = Executors.newSingleThreadExecutor(
ThreadUtils.createThreadFactory("s3-write-cb-executor", true));
private final ExecutorService readLimiterCallbackExecutor = Executors.newSingleThreadExecutor(
ThreadUtils.createThreadFactory("s3-read-limiter-cb-executor", true));
private final ExecutorService writeLimiterCallbackExecutor = Executors.newSingleThreadExecutor(
ThreadUtils.createThreadFactory("s3-write-limiter-cb-executor", true));
private final ExecutorService readCallbackExecutor = Executors.newSingleThreadExecutor(
ThreadUtils.createThreadFactory("s3-read-cb-executor-%d", true));
private final ExecutorService writeCallbackExecutor = Executors.newSingleThreadExecutor(
ThreadUtils.createThreadFactory("s3-write-cb-executor-%d", true));

public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, String accessKey, String secretKey) {
this(endpoint, region, bucket, forcePathStyle, accessKey, secretKey, null, null, false);
Expand Down Expand Up @@ -162,7 +165,7 @@ public CompletableFuture<ByteBuf> rangeRead(String path, long start, long end, T
} else {
rangeRead0(path, start, end, cf);
}
}, s3ReadCallbackExecutor);
}, readLimiterCallbackExecutor);
} else {
rangeRead0(path, start, end, cf);
}
Expand Down Expand Up @@ -276,7 +279,7 @@ public CompletableFuture<Void> write(String path, ByteBuf data, ThrottleStrategy
} else {
write0(path, data, cf);
}
}, s3WriteCallbackExecutor);
}, writeLimiterCallbackExecutor);
} else {
write0(path, data, cf);
}
Expand Down Expand Up @@ -402,7 +405,7 @@ public CompletableFuture<CompletedPart> uploadPart(String path, String uploadId,
} else {
uploadPart0(path, uploadId, partNumber, data, cf);
}
}, s3WriteCallbackExecutor);
}, writeLimiterCallbackExecutor);
} else {
uploadPart0(path, uploadId, partNumber, data, cf);
}
Expand Down Expand Up @@ -586,6 +589,7 @@ private static S3AsyncClient newS3Client(String endpoint, String region, boolean

/**
* Acquire read permit, permit will auto release when cf complete.
*
* @return retCf the retCf should be used as method return value to ensure release before following operations.
*/
<T> CompletableFuture<T> acquireReadPermit(CompletableFuture<T> cf) {
Expand All @@ -595,11 +599,13 @@ <T> CompletableFuture<T> acquireReadPermit(CompletableFuture<T> cf) {
CompletableFuture<T> newCf = new CompletableFuture<>();
cf.whenComplete((rst, ex) -> {
inflightReadLimiter.release();
if (ex != null) {
newCf.completeExceptionally(ex);
} else {
newCf.complete(rst);
}
readCallbackExecutor.execute(() -> {
if (ex != null) {
newCf.completeExceptionally(ex);
} else {
newCf.complete(rst);
}
});
});
return newCf;
} catch (InterruptedException e) {
Expand All @@ -610,6 +616,7 @@ <T> CompletableFuture<T> acquireReadPermit(CompletableFuture<T> cf) {

/**
* Acquire write permit, permit will auto release when cf complete.
*
* @return retCf the retCf should be used as method return value to ensure release before following operations.
*/
<T> CompletableFuture<T> acquireWritePermit(CompletableFuture<T> cf) {
Expand All @@ -618,11 +625,13 @@ <T> CompletableFuture<T> acquireWritePermit(CompletableFuture<T> cf) {
CompletableFuture<T> newCf = new CompletableFuture<>();
cf.whenComplete((rst, ex) -> {
inflightWriteLimiter.release();
if (ex != null) {
newCf.completeExceptionally(ex);
} else {
newCf.complete(rst);
}
writeCallbackExecutor.execute(() -> {
if (ex != null) {
newCf.completeExceptionally(ex);
} else {
newCf.complete(rst);
}
});
});
return newCf;
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class MultiPartWriter implements Writer {
private static final long MAX_MERGE_WRITE_SIZE = 16L * 1024 * 1024;
private final S3Operator operator;
private final String path;
private final CompletableFuture<String> uploadIdCf = new CompletableFuture<>();
final CompletableFuture<String> uploadIdCf = new CompletableFuture<>();
private String uploadId;
private final List<CompletableFuture<CompletedPart>> parts = new LinkedList<>();
private final AtomicInteger nextPartNumber = new AtomicInteger(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
class MultiPartWriterTest {
private S3AsyncClient s3;
private DefaultS3Operator operator;
private Writer writer;
private MultiPartWriter writer;

@BeforeEach
void setUp() {
Expand Down Expand Up @@ -84,6 +84,7 @@ void testWrite() throws NoSuchMethodException, InvocationTargetException, Illega
return CompletableFuture.completedFuture(builder.build());
});
when(s3.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))).thenReturn(CompletableFuture.completedFuture(null));
writer.uploadIdCf.get();

List<ByteBuf> payloads = List.of(
// case 2
Expand Down

0 comments on commit 60501f2

Please sign in to comment.