From 748daa60e5f8f4ee68e40e882896119d7d2e2151 Mon Sep 17 00:00:00 2001 From: Robin Han Date: Sun, 29 Oct 2023 19:09:32 +0800 Subject: [PATCH] feat(s3stream): isolate s3 readwrite (#499) Signed-off-by: Robin Han --- pom.xml | 2 +- s3stream/pom.xml | 2 +- .../stream/s3/operator/DefaultS3Operator.java | 135 ++++++++++++++---- 3 files changed, 107 insertions(+), 32 deletions(-) diff --git a/pom.xml b/pom.xml index fe0695ac7..5b57cff23 100644 --- a/pom.xml +++ b/pom.xml @@ -47,7 +47,7 @@ 32.0.1-jre 2.0.9 2.2 - 0.1.17-SNAPSHOT + 0.1.18-SNAPSHOT 23.5.26 diff --git a/s3stream/pom.xml b/s3stream/pom.xml index bee8b5e5a..31935e7b1 100644 --- a/s3stream/pom.xml +++ b/s3stream/pom.xml @@ -22,7 +22,7 @@ 4.0.0 com.automq.elasticstream s3stream - 0.1.17-SNAPSHOT + 0.1.18-SNAPSHOT 5.5.0 5.10.0 diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java index f126d7977..b69dd9ebe 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java @@ -78,7 +78,8 @@ public class DefaultS3Operator implements S3Operator { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultS3Operator.class); private final String bucket; - private final S3AsyncClient s3; + private final S3AsyncClient writeS3Client; + private final S3AsyncClient readS3Client; private final List waitingReadTasks = new LinkedList<>(); private final AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter; private final AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter; @@ -90,27 +91,15 @@ public class DefaultS3Operator implements S3Operator { ThreadUtils.createThreadFactory("s3-write-cb-executor", true)); public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, String accessKey, String secretKey) { - this(endpoint, region, bucket, forcePathStyle, accessKey, secretKey, null, null); + this(endpoint, region, bucket, forcePathStyle, accessKey, secretKey, null, null, false); } public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, String accessKey, String secretKey, - AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter, AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter) { + AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter, AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter, boolean readWriteIsolate) { this.networkInboundBandwidthLimiter = networkInboundBandwidthLimiter; this.networkOutboundBandwidthLimiter = networkOutboundBandwidthLimiter; - S3AsyncClientBuilder builder = S3AsyncClient.builder().region(Region.of(region)); - if (StringUtils.isNotBlank(endpoint)) { - builder.endpointOverride(URI.create(endpoint)); - } - builder.serviceConfiguration(c -> c.pathStyleAccessEnabled(forcePathStyle)); - builder.credentialsProvider(AwsCredentialsProviderChain.builder() - .reuseLastProviderEnabled(true) - .credentialsProviders( - () -> AwsBasicCredentials.create(accessKey, secretKey), - InstanceProfileCredentialsProvider.create(), - AnonymousCredentialsProvider.create() - ).build() - ); - this.s3 = builder.build(); + this.writeS3Client = newS3Client(endpoint, region, forcePathStyle, accessKey, secretKey); + this.readS3Client = readWriteIsolate ? newS3Client(endpoint, region, forcePathStyle, accessKey, secretKey) : writeS3Client; this.bucket = bucket; scheduler.scheduleWithFixedDelay(this::tryMergeRead, 1, 1, TimeUnit.MILLISECONDS); checkConfig(); @@ -118,14 +107,19 @@ public DefaultS3Operator(String endpoint, String region, String bucket, boolean LOGGER.info("S3Operator init with endpoint={} region={} bucket={}", endpoint, region, bucket); } + public static Builder builder() { + return new Builder(); + } + // used for test only. - DefaultS3Operator(S3AsyncClient s3, String bucket) { - this(s3, bucket, false); + DefaultS3Operator(S3AsyncClient s3Client, String bucket) { + this(s3Client, bucket, false); } // used for test only. - DefaultS3Operator(S3AsyncClient s3, String bucket, boolean manualMergeRead) { - this.s3 = s3; + DefaultS3Operator(S3AsyncClient s3Client, String bucket, boolean manualMergeRead) { + this.writeS3Client = s3Client; + this.readS3Client = s3Client; this.bucket = bucket; this.networkInboundBandwidthLimiter = null; this.networkOutboundBandwidthLimiter = null; @@ -137,7 +131,10 @@ public DefaultS3Operator(String endpoint, String region, String bucket, boolean @Override public void close() { // TODO: complete in-flight CompletableFuture with ClosedException. - s3.close(); + writeS3Client.close(); + if (readS3Client != writeS3Client) { + readS3Client.close(); + } scheduler.shutdown(); } @@ -232,7 +229,7 @@ CompletableFuture mergedRangeRead(String path, long start, long end) { void mergedRangeRead0(String path, long start, long end, CompletableFuture cf) { TimerUtil timerUtil = new TimerUtil(); GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(path).range(range(start, end)).build(); - s3.getObject(request, AsyncResponseTransformer.toPublisher()) + readS3Client.getObject(request, AsyncResponseTransformer.toPublisher()) .thenAccept(responsePublisher -> { OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.GET_OBJECT).operationCount.inc(); OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.GET_OBJECT).operationTime.update(timerUtil.elapsed()); @@ -275,7 +272,7 @@ private void write0(String path, ByteBuf data, CompletableFuture cf) { int objectSize = data.readableBytes(); PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).build(); AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data.nioBuffers()); - s3.putObject(request, body).thenAccept(putObjectResponse -> { + writeS3Client.putObject(request, body).thenAccept(putObjectResponse -> { OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.PUT_OBJECT).operationCount.inc(); OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.PUT_OBJECT).operationTime.update(timerUtil.elapsed()); LOGGER.debug("put object {} with size {}, cost {}ms", path, objectSize, timerUtil.elapsed()); @@ -305,7 +302,7 @@ public Writer writer(String path, ThrottleStrategy throttleStrategy) { public CompletableFuture delete(String path) { TimerUtil timerUtil = new TimerUtil(); DeleteObjectRequest request = DeleteObjectRequest.builder().bucket(bucket).key(path).build(); - return s3.deleteObject(request).thenAccept(deleteObjectResponse -> { + return writeS3Client.deleteObject(request).thenAccept(deleteObjectResponse -> { OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.DELETE_OBJECT).operationCount.inc(); OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.DELETE_OBJECT).operationTime.update(timerUtil.elapsed()); LOGGER.info("[ControllerS3Operator]: Delete object finished, path: {}, cost: {}", path, timerUtil.elapsed()); @@ -330,7 +327,7 @@ public CompletableFuture> delete(List objectKeys) { .delete(Delete.builder().objects(toDeleteKeys).build()) .build(); // TODO: handle not exist object, should we regard it as deleted or ignore it. - return this.s3.deleteObjects(request).thenApply(resp -> { + return this.writeS3Client.deleteObjects(request).thenApply(resp -> { OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.DELETE_OBJECTS).operationCount.inc(); OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.DELETE_OBJECTS).operationTime.update(timerUtil.elapsed()); LOGGER.info("[ControllerS3Operator]: Delete objects finished, count: {}, cost: {}", resp.deleted().size(), timerUtil.elapsed()); @@ -353,7 +350,7 @@ public CompletableFuture createMultipartUpload(String path) { void createMultipartUpload0(String path, CompletableFuture cf) { TimerUtil timerUtil = new TimerUtil(); CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().bucket(bucket).key(path).build(); - s3.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> { + writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> { OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.CREATE_MULTI_PART_UPLOAD).operationCount.inc(); OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.CREATE_MULTI_PART_UPLOAD).operationTime.update(timerUtil.elapsed()); cf.complete(createMultipartUploadResponse.uploadId()); @@ -394,7 +391,7 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(part.nioBuffers()); UploadPartRequest request = UploadPartRequest.builder().bucket(bucket).key(path).uploadId(uploadId) .partNumber(partNumber).build(); - CompletableFuture uploadPartCf = s3.uploadPart(request, body); + CompletableFuture uploadPartCf = writeS3Client.uploadPart(request, body); uploadPartCf.thenAccept(uploadPartResponse -> { OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART).operationCount.inc(); OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART).operationTime.update(timerUtil.elapsed()); @@ -426,7 +423,7 @@ private void uploadPartCopy0(String sourcePath, String path, long start, long en long inclusiveEnd = end - 1; UploadPartCopyRequest request = UploadPartCopyRequest.builder().sourceBucket(bucket).sourceKey(sourcePath) .destinationBucket(bucket).destinationKey(path).copySourceRange(range(start, inclusiveEnd)).uploadId(uploadId).partNumber(partNumber).build(); - s3.uploadPartCopy(request).thenAccept(uploadPartCopyResponse -> { + writeS3Client.uploadPartCopy(request).thenAccept(uploadPartCopyResponse -> { OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART_COPY).operationCount.inc(); OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART_COPY).operationTime.update(timerUtil.elapsed()); CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber) @@ -458,7 +455,7 @@ public void completeMultipartUpload0(String path, String uploadId, List { + writeS3Client.completeMultipartUpload(request).thenAccept(completeMultipartUploadResponse -> { OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.COMPLETE_MULTI_PART_UPLOAD).operationCount.inc(); OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.COMPLETE_MULTI_PART_UPLOAD).operationTime.update(timerUtil.elapsed()); cf.complete(null); @@ -538,6 +535,22 @@ private void checkAvailable() { } } + private static S3AsyncClient newS3Client(String endpoint, String region, boolean forcePathStyle, String accessKey, String secretKey) { + S3AsyncClientBuilder builder = S3AsyncClient.builder().region(Region.of(region)); + if (StringUtils.isNotBlank(endpoint)) { + builder.endpointOverride(URI.create(endpoint)); + } + builder.serviceConfiguration(c -> c.pathStyleAccessEnabled(forcePathStyle)); + builder.credentialsProvider(AwsCredentialsProviderChain.builder() + .reuseLastProviderEnabled(true) + .credentialsProviders( + () -> AwsBasicCredentials.create(accessKey, secretKey), + InstanceProfileCredentialsProvider.create(), + AnonymousCredentialsProvider.create() + ).build() + ); + return builder.build(); + } static class MergedReadTask { static final int MAX_MERGE_READ_SIZE = 16 * 1024 * 1024; @@ -593,4 +606,66 @@ public ReadTask(String path, long start, long end, CompletableFuture cf this.cf = cf; } } + + public static class Builder { + private String endpoint; + private String region; + private String bucket; + private boolean forcePathStyle; + private String accessKey; + private String secretKey; + private AsyncNetworkBandwidthLimiter inboundLimiter; + private AsyncNetworkBandwidthLimiter outboundLimiter; + private boolean readWriteIsolate; + + public Builder endpoint(String endpoint) { + this.endpoint = endpoint; + return this; + } + + public Builder region(String region) { + this.region = region; + return this; + } + + public Builder bucket(String bucket) { + this.bucket = bucket; + return this; + } + + public Builder forcePathStyle(boolean forcePathStyle) { + this.forcePathStyle = forcePathStyle; + return this; + } + + public Builder accessKey(String accessKey) { + this.accessKey = accessKey; + return this; + } + + public Builder secretKey(String secretKey) { + this.secretKey = secretKey; + return this; + } + + public Builder inboundLimiter(AsyncNetworkBandwidthLimiter inboundLimiter) { + this.inboundLimiter = inboundLimiter; + return this; + } + + public Builder outboundLimiter(AsyncNetworkBandwidthLimiter outboundLimiter) { + this.outboundLimiter = outboundLimiter; + return this; + } + + public Builder readWriteIsolate(boolean readWriteIsolate) { + this.readWriteIsolate = readWriteIsolate; + return this; + } + + public DefaultS3Operator build() { + return new DefaultS3Operator(endpoint, region, bucket, forcePathStyle, accessKey, secretKey, + inboundLimiter, outboundLimiter, readWriteIsolate); + } + } }