Skip to content

Commit

Permalink
feat(s3stream): isolate s3 readwrite (#499)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Oct 29, 2023
1 parent 61db39b commit 748daa6
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 32 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
<guava.version>32.0.1-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.1.17-SNAPSHOT</s3stream.version>
<s3stream.version>0.1.18-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
Expand Down
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.1.17-SNAPSHOT</version>
<version>0.1.18-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@
public class DefaultS3Operator implements S3Operator {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultS3Operator.class);
private final String bucket;
private final S3AsyncClient s3;
private final S3AsyncClient writeS3Client;
private final S3AsyncClient readS3Client;
private final List<ReadTask> waitingReadTasks = new LinkedList<>();
private final AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter;
private final AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter;
Expand All @@ -90,42 +91,35 @@ public class DefaultS3Operator implements S3Operator {
ThreadUtils.createThreadFactory("s3-write-cb-executor", true));

public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, String accessKey, String secretKey) {
this(endpoint, region, bucket, forcePathStyle, accessKey, secretKey, null, null);
this(endpoint, region, bucket, forcePathStyle, accessKey, secretKey, null, null, false);
}

public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, String accessKey, String secretKey,
AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter, AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter) {
AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter, AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter, boolean readWriteIsolate) {
this.networkInboundBandwidthLimiter = networkInboundBandwidthLimiter;
this.networkOutboundBandwidthLimiter = networkOutboundBandwidthLimiter;
S3AsyncClientBuilder builder = S3AsyncClient.builder().region(Region.of(region));
if (StringUtils.isNotBlank(endpoint)) {
builder.endpointOverride(URI.create(endpoint));
}
builder.serviceConfiguration(c -> c.pathStyleAccessEnabled(forcePathStyle));
builder.credentialsProvider(AwsCredentialsProviderChain.builder()
.reuseLastProviderEnabled(true)
.credentialsProviders(
() -> AwsBasicCredentials.create(accessKey, secretKey),
InstanceProfileCredentialsProvider.create(),
AnonymousCredentialsProvider.create()
).build()
);
this.s3 = builder.build();
this.writeS3Client = newS3Client(endpoint, region, forcePathStyle, accessKey, secretKey);
this.readS3Client = readWriteIsolate ? newS3Client(endpoint, region, forcePathStyle, accessKey, secretKey) : writeS3Client;
this.bucket = bucket;
scheduler.scheduleWithFixedDelay(this::tryMergeRead, 1, 1, TimeUnit.MILLISECONDS);
checkConfig();
checkAvailable();
LOGGER.info("S3Operator init with endpoint={} region={} bucket={}", endpoint, region, bucket);
}

public static Builder builder() {
return new Builder();
}

// used for test only.
DefaultS3Operator(S3AsyncClient s3, String bucket) {
this(s3, bucket, false);
DefaultS3Operator(S3AsyncClient s3Client, String bucket) {
this(s3Client, bucket, false);
}

// used for test only.
DefaultS3Operator(S3AsyncClient s3, String bucket, boolean manualMergeRead) {
this.s3 = s3;
DefaultS3Operator(S3AsyncClient s3Client, String bucket, boolean manualMergeRead) {
this.writeS3Client = s3Client;
this.readS3Client = s3Client;
this.bucket = bucket;
this.networkInboundBandwidthLimiter = null;
this.networkOutboundBandwidthLimiter = null;
Expand All @@ -137,7 +131,10 @@ public DefaultS3Operator(String endpoint, String region, String bucket, boolean
@Override
public void close() {
// TODO: complete in-flight CompletableFuture with ClosedException.
s3.close();
writeS3Client.close();
if (readS3Client != writeS3Client) {
readS3Client.close();
}
scheduler.shutdown();
}

Expand Down Expand Up @@ -232,7 +229,7 @@ CompletableFuture<ByteBuf> mergedRangeRead(String path, long start, long end) {
void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteBuf> cf) {
TimerUtil timerUtil = new TimerUtil();
GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(path).range(range(start, end)).build();
s3.getObject(request, AsyncResponseTransformer.toPublisher())
readS3Client.getObject(request, AsyncResponseTransformer.toPublisher())
.thenAccept(responsePublisher -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.GET_OBJECT).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.GET_OBJECT).operationTime.update(timerUtil.elapsed());
Expand Down Expand Up @@ -275,7 +272,7 @@ private void write0(String path, ByteBuf data, CompletableFuture<Void> cf) {
int objectSize = data.readableBytes();
PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).build();
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data.nioBuffers());
s3.putObject(request, body).thenAccept(putObjectResponse -> {
writeS3Client.putObject(request, body).thenAccept(putObjectResponse -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.PUT_OBJECT).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.PUT_OBJECT).operationTime.update(timerUtil.elapsed());
LOGGER.debug("put object {} with size {}, cost {}ms", path, objectSize, timerUtil.elapsed());
Expand Down Expand Up @@ -305,7 +302,7 @@ public Writer writer(String path, ThrottleStrategy throttleStrategy) {
public CompletableFuture<Void> delete(String path) {
TimerUtil timerUtil = new TimerUtil();
DeleteObjectRequest request = DeleteObjectRequest.builder().bucket(bucket).key(path).build();
return s3.deleteObject(request).thenAccept(deleteObjectResponse -> {
return writeS3Client.deleteObject(request).thenAccept(deleteObjectResponse -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.DELETE_OBJECT).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.DELETE_OBJECT).operationTime.update(timerUtil.elapsed());
LOGGER.info("[ControllerS3Operator]: Delete object finished, path: {}, cost: {}", path, timerUtil.elapsed());
Expand All @@ -330,7 +327,7 @@ public CompletableFuture<List<String>> delete(List<String> objectKeys) {
.delete(Delete.builder().objects(toDeleteKeys).build())
.build();
// TODO: handle not exist object, should we regard it as deleted or ignore it.
return this.s3.deleteObjects(request).thenApply(resp -> {
return this.writeS3Client.deleteObjects(request).thenApply(resp -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.DELETE_OBJECTS).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.DELETE_OBJECTS).operationTime.update(timerUtil.elapsed());
LOGGER.info("[ControllerS3Operator]: Delete objects finished, count: {}, cost: {}", resp.deleted().size(), timerUtil.elapsed());
Expand All @@ -353,7 +350,7 @@ public CompletableFuture<String> createMultipartUpload(String path) {
void createMultipartUpload0(String path, CompletableFuture<String> cf) {
TimerUtil timerUtil = new TimerUtil();
CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().bucket(bucket).key(path).build();
s3.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> {
writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.CREATE_MULTI_PART_UPLOAD).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.CREATE_MULTI_PART_UPLOAD).operationTime.update(timerUtil.elapsed());
cf.complete(createMultipartUploadResponse.uploadId());
Expand Down Expand Up @@ -394,7 +391,7 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(part.nioBuffers());
UploadPartRequest request = UploadPartRequest.builder().bucket(bucket).key(path).uploadId(uploadId)
.partNumber(partNumber).build();
CompletableFuture<UploadPartResponse> uploadPartCf = s3.uploadPart(request, body);
CompletableFuture<UploadPartResponse> uploadPartCf = writeS3Client.uploadPart(request, body);
uploadPartCf.thenAccept(uploadPartResponse -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART).operationTime.update(timerUtil.elapsed());
Expand Down Expand Up @@ -426,7 +423,7 @@ private void uploadPartCopy0(String sourcePath, String path, long start, long en
long inclusiveEnd = end - 1;
UploadPartCopyRequest request = UploadPartCopyRequest.builder().sourceBucket(bucket).sourceKey(sourcePath)
.destinationBucket(bucket).destinationKey(path).copySourceRange(range(start, inclusiveEnd)).uploadId(uploadId).partNumber(partNumber).build();
s3.uploadPartCopy(request).thenAccept(uploadPartCopyResponse -> {
writeS3Client.uploadPartCopy(request).thenAccept(uploadPartCopyResponse -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART_COPY).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART_COPY).operationTime.update(timerUtil.elapsed());
CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber)
Expand Down Expand Up @@ -458,7 +455,7 @@ public void completeMultipartUpload0(String path, String uploadId, List<Complete
CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder().parts(parts).build();
CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder().bucket(bucket).key(path).uploadId(uploadId).multipartUpload(multipartUpload).build();

s3.completeMultipartUpload(request).thenAccept(completeMultipartUploadResponse -> {
writeS3Client.completeMultipartUpload(request).thenAccept(completeMultipartUploadResponse -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.COMPLETE_MULTI_PART_UPLOAD).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.COMPLETE_MULTI_PART_UPLOAD).operationTime.update(timerUtil.elapsed());
cf.complete(null);
Expand Down Expand Up @@ -538,6 +535,22 @@ private void checkAvailable() {
}
}

private static S3AsyncClient newS3Client(String endpoint, String region, boolean forcePathStyle, String accessKey, String secretKey) {
S3AsyncClientBuilder builder = S3AsyncClient.builder().region(Region.of(region));
if (StringUtils.isNotBlank(endpoint)) {
builder.endpointOverride(URI.create(endpoint));
}
builder.serviceConfiguration(c -> c.pathStyleAccessEnabled(forcePathStyle));
builder.credentialsProvider(AwsCredentialsProviderChain.builder()
.reuseLastProviderEnabled(true)
.credentialsProviders(
() -> AwsBasicCredentials.create(accessKey, secretKey),
InstanceProfileCredentialsProvider.create(),
AnonymousCredentialsProvider.create()
).build()
);
return builder.build();
}

static class MergedReadTask {
static final int MAX_MERGE_READ_SIZE = 16 * 1024 * 1024;
Expand Down Expand Up @@ -593,4 +606,66 @@ public ReadTask(String path, long start, long end, CompletableFuture<ByteBuf> cf
this.cf = cf;
}
}

public static class Builder {
private String endpoint;
private String region;
private String bucket;
private boolean forcePathStyle;
private String accessKey;
private String secretKey;
private AsyncNetworkBandwidthLimiter inboundLimiter;
private AsyncNetworkBandwidthLimiter outboundLimiter;
private boolean readWriteIsolate;

public Builder endpoint(String endpoint) {
this.endpoint = endpoint;
return this;
}

public Builder region(String region) {
this.region = region;
return this;
}

public Builder bucket(String bucket) {
this.bucket = bucket;
return this;
}

public Builder forcePathStyle(boolean forcePathStyle) {
this.forcePathStyle = forcePathStyle;
return this;
}

public Builder accessKey(String accessKey) {
this.accessKey = accessKey;
return this;
}

public Builder secretKey(String secretKey) {
this.secretKey = secretKey;
return this;
}

public Builder inboundLimiter(AsyncNetworkBandwidthLimiter inboundLimiter) {
this.inboundLimiter = inboundLimiter;
return this;
}

public Builder outboundLimiter(AsyncNetworkBandwidthLimiter outboundLimiter) {
this.outboundLimiter = outboundLimiter;
return this;
}

public Builder readWriteIsolate(boolean readWriteIsolate) {
this.readWriteIsolate = readWriteIsolate;
return this;
}

public DefaultS3Operator build() {
return new DefaultS3Operator(endpoint, region, bucket, forcePathStyle, accessKey, secretKey,
inboundLimiter, outboundLimiter, readWriteIsolate);
}
}
}

0 comments on commit 748daa6

Please sign in to comment.