diff --git a/pom.xml b/pom.xml
index fe0695ac7..5b57cff23 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,7 +47,7 @@
32.0.1-jre
2.0.9
2.2
- 0.1.17-SNAPSHOT
+ 0.1.18-SNAPSHOT
23.5.26
diff --git a/s3stream/pom.xml b/s3stream/pom.xml
index bee8b5e5a..31935e7b1 100644
--- a/s3stream/pom.xml
+++ b/s3stream/pom.xml
@@ -22,7 +22,7 @@
4.0.0
com.automq.elasticstream
s3stream
- 0.1.17-SNAPSHOT
+ 0.1.18-SNAPSHOT
5.5.0
5.10.0
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
index f126d7977..b69dd9ebe 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
@@ -78,7 +78,8 @@
public class DefaultS3Operator implements S3Operator {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultS3Operator.class);
private final String bucket;
- private final S3AsyncClient s3;
+ private final S3AsyncClient writeS3Client;
+ private final S3AsyncClient readS3Client;
private final List waitingReadTasks = new LinkedList<>();
private final AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter;
private final AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter;
@@ -90,27 +91,15 @@ public class DefaultS3Operator implements S3Operator {
ThreadUtils.createThreadFactory("s3-write-cb-executor", true));
public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, String accessKey, String secretKey) {
- this(endpoint, region, bucket, forcePathStyle, accessKey, secretKey, null, null);
+ this(endpoint, region, bucket, forcePathStyle, accessKey, secretKey, null, null, false);
}
public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, String accessKey, String secretKey,
- AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter, AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter) {
+ AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter, AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter, boolean readWriteIsolate) {
this.networkInboundBandwidthLimiter = networkInboundBandwidthLimiter;
this.networkOutboundBandwidthLimiter = networkOutboundBandwidthLimiter;
- S3AsyncClientBuilder builder = S3AsyncClient.builder().region(Region.of(region));
- if (StringUtils.isNotBlank(endpoint)) {
- builder.endpointOverride(URI.create(endpoint));
- }
- builder.serviceConfiguration(c -> c.pathStyleAccessEnabled(forcePathStyle));
- builder.credentialsProvider(AwsCredentialsProviderChain.builder()
- .reuseLastProviderEnabled(true)
- .credentialsProviders(
- () -> AwsBasicCredentials.create(accessKey, secretKey),
- InstanceProfileCredentialsProvider.create(),
- AnonymousCredentialsProvider.create()
- ).build()
- );
- this.s3 = builder.build();
+ this.writeS3Client = newS3Client(endpoint, region, forcePathStyle, accessKey, secretKey);
+ this.readS3Client = readWriteIsolate ? newS3Client(endpoint, region, forcePathStyle, accessKey, secretKey) : writeS3Client;
this.bucket = bucket;
scheduler.scheduleWithFixedDelay(this::tryMergeRead, 1, 1, TimeUnit.MILLISECONDS);
checkConfig();
@@ -118,14 +107,19 @@ public DefaultS3Operator(String endpoint, String region, String bucket, boolean
LOGGER.info("S3Operator init with endpoint={} region={} bucket={}", endpoint, region, bucket);
}
+ public static Builder builder() {
+ return new Builder();
+ }
+
// used for test only.
- DefaultS3Operator(S3AsyncClient s3, String bucket) {
- this(s3, bucket, false);
+ DefaultS3Operator(S3AsyncClient s3Client, String bucket) {
+ this(s3Client, bucket, false);
}
// used for test only.
- DefaultS3Operator(S3AsyncClient s3, String bucket, boolean manualMergeRead) {
- this.s3 = s3;
+ DefaultS3Operator(S3AsyncClient s3Client, String bucket, boolean manualMergeRead) {
+ this.writeS3Client = s3Client;
+ this.readS3Client = s3Client;
this.bucket = bucket;
this.networkInboundBandwidthLimiter = null;
this.networkOutboundBandwidthLimiter = null;
@@ -137,7 +131,10 @@ public DefaultS3Operator(String endpoint, String region, String bucket, boolean
@Override
public void close() {
// TODO: complete in-flight CompletableFuture with ClosedException.
- s3.close();
+ writeS3Client.close();
+ if (readS3Client != writeS3Client) {
+ readS3Client.close();
+ }
scheduler.shutdown();
}
@@ -232,7 +229,7 @@ CompletableFuture mergedRangeRead(String path, long start, long end) {
void mergedRangeRead0(String path, long start, long end, CompletableFuture cf) {
TimerUtil timerUtil = new TimerUtil();
GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(path).range(range(start, end)).build();
- s3.getObject(request, AsyncResponseTransformer.toPublisher())
+ readS3Client.getObject(request, AsyncResponseTransformer.toPublisher())
.thenAccept(responsePublisher -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.GET_OBJECT).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.GET_OBJECT).operationTime.update(timerUtil.elapsed());
@@ -275,7 +272,7 @@ private void write0(String path, ByteBuf data, CompletableFuture cf) {
int objectSize = data.readableBytes();
PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).build();
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data.nioBuffers());
- s3.putObject(request, body).thenAccept(putObjectResponse -> {
+ writeS3Client.putObject(request, body).thenAccept(putObjectResponse -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.PUT_OBJECT).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.PUT_OBJECT).operationTime.update(timerUtil.elapsed());
LOGGER.debug("put object {} with size {}, cost {}ms", path, objectSize, timerUtil.elapsed());
@@ -305,7 +302,7 @@ public Writer writer(String path, ThrottleStrategy throttleStrategy) {
public CompletableFuture delete(String path) {
TimerUtil timerUtil = new TimerUtil();
DeleteObjectRequest request = DeleteObjectRequest.builder().bucket(bucket).key(path).build();
- return s3.deleteObject(request).thenAccept(deleteObjectResponse -> {
+ return writeS3Client.deleteObject(request).thenAccept(deleteObjectResponse -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.DELETE_OBJECT).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.DELETE_OBJECT).operationTime.update(timerUtil.elapsed());
LOGGER.info("[ControllerS3Operator]: Delete object finished, path: {}, cost: {}", path, timerUtil.elapsed());
@@ -330,7 +327,7 @@ public CompletableFuture> delete(List objectKeys) {
.delete(Delete.builder().objects(toDeleteKeys).build())
.build();
// TODO: handle not exist object, should we regard it as deleted or ignore it.
- return this.s3.deleteObjects(request).thenApply(resp -> {
+ return this.writeS3Client.deleteObjects(request).thenApply(resp -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.DELETE_OBJECTS).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.DELETE_OBJECTS).operationTime.update(timerUtil.elapsed());
LOGGER.info("[ControllerS3Operator]: Delete objects finished, count: {}, cost: {}", resp.deleted().size(), timerUtil.elapsed());
@@ -353,7 +350,7 @@ public CompletableFuture createMultipartUpload(String path) {
void createMultipartUpload0(String path, CompletableFuture cf) {
TimerUtil timerUtil = new TimerUtil();
CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().bucket(bucket).key(path).build();
- s3.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> {
+ writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.CREATE_MULTI_PART_UPLOAD).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.CREATE_MULTI_PART_UPLOAD).operationTime.update(timerUtil.elapsed());
cf.complete(createMultipartUploadResponse.uploadId());
@@ -394,7 +391,7 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(part.nioBuffers());
UploadPartRequest request = UploadPartRequest.builder().bucket(bucket).key(path).uploadId(uploadId)
.partNumber(partNumber).build();
- CompletableFuture uploadPartCf = s3.uploadPart(request, body);
+ CompletableFuture uploadPartCf = writeS3Client.uploadPart(request, body);
uploadPartCf.thenAccept(uploadPartResponse -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART).operationTime.update(timerUtil.elapsed());
@@ -426,7 +423,7 @@ private void uploadPartCopy0(String sourcePath, String path, long start, long en
long inclusiveEnd = end - 1;
UploadPartCopyRequest request = UploadPartCopyRequest.builder().sourceBucket(bucket).sourceKey(sourcePath)
.destinationBucket(bucket).destinationKey(path).copySourceRange(range(start, inclusiveEnd)).uploadId(uploadId).partNumber(partNumber).build();
- s3.uploadPartCopy(request).thenAccept(uploadPartCopyResponse -> {
+ writeS3Client.uploadPartCopy(request).thenAccept(uploadPartCopyResponse -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART_COPY).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART_COPY).operationTime.update(timerUtil.elapsed());
CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber)
@@ -458,7 +455,7 @@ public void completeMultipartUpload0(String path, String uploadId, List {
+ writeS3Client.completeMultipartUpload(request).thenAccept(completeMultipartUploadResponse -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.COMPLETE_MULTI_PART_UPLOAD).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.COMPLETE_MULTI_PART_UPLOAD).operationTime.update(timerUtil.elapsed());
cf.complete(null);
@@ -538,6 +535,22 @@ private void checkAvailable() {
}
}
+ private static S3AsyncClient newS3Client(String endpoint, String region, boolean forcePathStyle, String accessKey, String secretKey) {
+ S3AsyncClientBuilder builder = S3AsyncClient.builder().region(Region.of(region));
+ if (StringUtils.isNotBlank(endpoint)) {
+ builder.endpointOverride(URI.create(endpoint));
+ }
+ builder.serviceConfiguration(c -> c.pathStyleAccessEnabled(forcePathStyle));
+ builder.credentialsProvider(AwsCredentialsProviderChain.builder()
+ .reuseLastProviderEnabled(true)
+ .credentialsProviders(
+ () -> AwsBasicCredentials.create(accessKey, secretKey),
+ InstanceProfileCredentialsProvider.create(),
+ AnonymousCredentialsProvider.create()
+ ).build()
+ );
+ return builder.build();
+ }
static class MergedReadTask {
static final int MAX_MERGE_READ_SIZE = 16 * 1024 * 1024;
@@ -593,4 +606,66 @@ public ReadTask(String path, long start, long end, CompletableFuture cf
this.cf = cf;
}
}
+
+ public static class Builder {
+ private String endpoint;
+ private String region;
+ private String bucket;
+ private boolean forcePathStyle;
+ private String accessKey;
+ private String secretKey;
+ private AsyncNetworkBandwidthLimiter inboundLimiter;
+ private AsyncNetworkBandwidthLimiter outboundLimiter;
+ private boolean readWriteIsolate;
+
+ public Builder endpoint(String endpoint) {
+ this.endpoint = endpoint;
+ return this;
+ }
+
+ public Builder region(String region) {
+ this.region = region;
+ return this;
+ }
+
+ public Builder bucket(String bucket) {
+ this.bucket = bucket;
+ return this;
+ }
+
+ public Builder forcePathStyle(boolean forcePathStyle) {
+ this.forcePathStyle = forcePathStyle;
+ return this;
+ }
+
+ public Builder accessKey(String accessKey) {
+ this.accessKey = accessKey;
+ return this;
+ }
+
+ public Builder secretKey(String secretKey) {
+ this.secretKey = secretKey;
+ return this;
+ }
+
+ public Builder inboundLimiter(AsyncNetworkBandwidthLimiter inboundLimiter) {
+ this.inboundLimiter = inboundLimiter;
+ return this;
+ }
+
+ public Builder outboundLimiter(AsyncNetworkBandwidthLimiter outboundLimiter) {
+ this.outboundLimiter = outboundLimiter;
+ return this;
+ }
+
+ public Builder readWriteIsolate(boolean readWriteIsolate) {
+ this.readWriteIsolate = readWriteIsolate;
+ return this;
+ }
+
+ public DefaultS3Operator build() {
+ return new DefaultS3Operator(endpoint, region, bucket, forcePathStyle, accessKey, secretKey,
+ inboundLimiter, outboundLimiter, readWriteIsolate);
+ }
+ }
}