diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index f654df6ee..652b6b593 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -82,7 +82,7 @@ public class S3Storage implements Storage { private final ScheduledExecutorService backgroundExecutor = Threads.newSingleThreadScheduledExecutor( ThreadUtils.createThreadFactory("s3-storage-background", true), LOGGER); private final ExecutorService uploadWALExecutor = Threads.newFixedThreadPool( - 4, ThreadUtils.createThreadFactory("s3-storage-upload-wal", true), LOGGER); + 4, ThreadUtils.createThreadFactory("s3-storage-upload-wal-%d", true), LOGGER); private final Queue backoffRecords = new LinkedBlockingQueue<>(); private final ScheduledFuture drainBackoffTask; diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java index 5ebbb738c..42ffb7f87 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java @@ -83,16 +83,19 @@ public class DefaultS3Operator implements S3Operator { private final S3AsyncClient readS3Client; private final Semaphore inflightWriteLimiter; private final Semaphore inflightReadLimiter; - private final List waitingReadTasks = new LinkedList<>(); private final AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter; private final AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter; private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor( ThreadUtils.createThreadFactory("s3operator", true)); - private final ExecutorService s3ReadCallbackExecutor = Executors.newSingleThreadExecutor( - ThreadUtils.createThreadFactory("s3-read-cb-executor", true)); - private final ExecutorService s3WriteCallbackExecutor = Executors.newSingleThreadExecutor( - ThreadUtils.createThreadFactory("s3-write-cb-executor", true)); + private final ExecutorService readLimiterCallbackExecutor = Executors.newSingleThreadExecutor( + ThreadUtils.createThreadFactory("s3-read-limiter-cb-executor", true)); + private final ExecutorService writeLimiterCallbackExecutor = Executors.newSingleThreadExecutor( + ThreadUtils.createThreadFactory("s3-write-limiter-cb-executor", true)); + private final ExecutorService readCallbackExecutor = Executors.newSingleThreadExecutor( + ThreadUtils.createThreadFactory("s3-read-cb-executor-%d", true)); + private final ExecutorService writeCallbackExecutor = Executors.newSingleThreadExecutor( + ThreadUtils.createThreadFactory("s3-write-cb-executor-%d", true)); public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, String accessKey, String secretKey) { this(endpoint, region, bucket, forcePathStyle, accessKey, secretKey, null, null, false); @@ -162,7 +165,7 @@ public CompletableFuture rangeRead(String path, long start, long end, T } else { rangeRead0(path, start, end, cf); } - }, s3ReadCallbackExecutor); + }, readLimiterCallbackExecutor); } else { rangeRead0(path, start, end, cf); } @@ -276,7 +279,7 @@ public CompletableFuture write(String path, ByteBuf data, ThrottleStrategy } else { write0(path, data, cf); } - }, s3WriteCallbackExecutor); + }, writeLimiterCallbackExecutor); } else { write0(path, data, cf); } @@ -402,7 +405,7 @@ public CompletableFuture uploadPart(String path, String uploadId, } else { uploadPart0(path, uploadId, partNumber, data, cf); } - }, s3WriteCallbackExecutor); + }, writeLimiterCallbackExecutor); } else { uploadPart0(path, uploadId, partNumber, data, cf); } @@ -586,6 +589,7 @@ private static S3AsyncClient newS3Client(String endpoint, String region, boolean /** * Acquire read permit, permit will auto release when cf complete. + * * @return retCf the retCf should be used as method return value to ensure release before following operations. */ CompletableFuture acquireReadPermit(CompletableFuture cf) { @@ -595,11 +599,13 @@ CompletableFuture acquireReadPermit(CompletableFuture cf) { CompletableFuture newCf = new CompletableFuture<>(); cf.whenComplete((rst, ex) -> { inflightReadLimiter.release(); - if (ex != null) { - newCf.completeExceptionally(ex); - } else { - newCf.complete(rst); - } + readCallbackExecutor.execute(() -> { + if (ex != null) { + newCf.completeExceptionally(ex); + } else { + newCf.complete(rst); + } + }); }); return newCf; } catch (InterruptedException e) { @@ -610,6 +616,7 @@ CompletableFuture acquireReadPermit(CompletableFuture cf) { /** * Acquire write permit, permit will auto release when cf complete. + * * @return retCf the retCf should be used as method return value to ensure release before following operations. */ CompletableFuture acquireWritePermit(CompletableFuture cf) { @@ -618,11 +625,13 @@ CompletableFuture acquireWritePermit(CompletableFuture cf) { CompletableFuture newCf = new CompletableFuture<>(); cf.whenComplete((rst, ex) -> { inflightWriteLimiter.release(); - if (ex != null) { - newCf.completeExceptionally(ex); - } else { - newCf.complete(rst); - } + writeCallbackExecutor.execute(() -> { + if (ex != null) { + newCf.completeExceptionally(ex); + } else { + newCf.complete(rst); + } + }); }); return newCf; } catch (InterruptedException e) { diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java index 0091973ce..8c3e662d5 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java @@ -38,7 +38,7 @@ public class MultiPartWriter implements Writer { private static final long MAX_MERGE_WRITE_SIZE = 16L * 1024 * 1024; private final S3Operator operator; private final String path; - private final CompletableFuture uploadIdCf = new CompletableFuture<>(); + final CompletableFuture uploadIdCf = new CompletableFuture<>(); private String uploadId; private final List> parts = new LinkedList<>(); private final AtomicInteger nextPartNumber = new AtomicInteger(1); diff --git a/s3stream/src/test/java/com/automq/stream/s3/operator/MultiPartWriterTest.java b/s3stream/src/test/java/com/automq/stream/s3/operator/MultiPartWriterTest.java index 1b3e70e1f..2800221d6 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/operator/MultiPartWriterTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/operator/MultiPartWriterTest.java @@ -54,7 +54,7 @@ class MultiPartWriterTest { private S3AsyncClient s3; private DefaultS3Operator operator; - private Writer writer; + private MultiPartWriter writer; @BeforeEach void setUp() { @@ -84,6 +84,7 @@ void testWrite() throws NoSuchMethodException, InvocationTargetException, Illega return CompletableFuture.completedFuture(builder.build()); }); when(s3.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))).thenReturn(CompletableFuture.completedFuture(null)); + writer.uploadIdCf.get(); List payloads = List.of( // case 2