Skip to content

Commit

Permalink
Merge pull request #115 from nisuraaa/multipart-backport
Browse files Browse the repository at this point in the history
[2201.4.x] Backport Multipart Upload Features
  • Loading branch information
NipunaRanasinghe authored May 3, 2024
2 parents b8f21a0 + 94e73ef commit 3ad9d9d
Show file tree
Hide file tree
Showing 6 changed files with 325 additions and 0 deletions.
175 changes: 175 additions & 0 deletions s3/client.bal
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,181 @@ public isolated client class Client {
return string `${HTTPS}${self.amazonHost}/${bucketName}/${objectName}?${canonicalQueryString}&${X_AMZ_SIGNATURE
}=${signature}`;
}

# Initiates a multipart upload and returns an upload ID.
#
# + objectName - The name of the object
# + bucketName - The name of the bucket
# + cannedACL - The access control list of the new object
# + multipartUploadHeaders - Optional headers for multipart uploads
# + return - If success, the upload ID, else an error
remote isolated function createMultipartUpload(
@display {label: "Object Name"} string objectName,
@display {label: "Bucket Name"} string bucketName,
@display {label: "Grant"} CannedACL? cannedACL = (),
@display {label: "Multipart Upload Headers"} MultipartUploadHeaders? multipartUploadHeaders = ())
returns string|error {

if objectName == EMPTY_STRING {
return error(EMPTY_OBJECT_NAME_ERROR_MSG);
}
if bucketName == EMPTY_STRING {
return error(EMPTY_BUCKET_NAME_ERROR_MSG);
}

http:Request request = new;

string requestURI = string `/${bucketName}/${objectName}`;
string queryParamStr = string `?uploads`;
map<string> requestHeaders = setDefaultHeaders(self.amazonHost);
if cannedACL is CannedACL {
requestHeaders[X_AMZ_ACL] = cannedACL;
}
populateMultipartUploadHeaders(requestHeaders, multipartUploadHeaders);

check generateSignature(self.accessKeyId, self.secretAccessKey, self.region, POST, requestURI, UNSIGNED_PAYLOAD,
requestHeaders, request, queryParams = {"uploads": EMPTY_STRING});
requestURI = string `${requestURI}${queryParamStr}`;
http:Response httpResponse = check self.amazonS3->post(requestURI, request);

xml XMLPayload = check httpResponse.getXmlPayload();
if httpResponse.statusCode == http:STATUS_OK {
return getUploadId(XMLPayload);
} else {
return error(XMLPayload.toString());
}
}

# Completes a multipart upload by assembling previously uploaded parts.
#
# + objectName - The name of the object
# + bucketName - The name of the bucket
# + payload - The file content that needed to be added to the bucket
# + uploadId - The upload ID of the multipart upload
# + partNumber - The part number of the object
# + uploadPartHeaders - Optional headers for the upload
# + return - An error on failure or else `()`
remote isolated function uploadPart(
@display {label: "Object Name"} string objectName,
@display {label: "Bucket Name"} string bucketName,
@display {label: "File Content"} string|xml|json|byte[]|stream<io:Block, io:Error?> payload,
@display {label: "Upload ID"} string uploadId,
@display {label: "Part Number"} int partNumber,
@display {label: "UploadPart Headers"} UploadPartHeaders? uploadPartHeaders = ())
returns CompletedPart|error {

if objectName == EMPTY_STRING {
return error(EMPTY_OBJECT_NAME_ERROR_MSG);
}
if bucketName == EMPTY_STRING {
return error(EMPTY_BUCKET_NAME_ERROR_MSG);
}
http:Request request = new;

string requestURI = string `/${bucketName}/${objectName}`;
string queryParamStr = string `?partNumber=${partNumber}&uploadId=${uploadId}`;

map<string> requestHeaders = setDefaultHeaders(self.amazonHost);

populateUploadPartHeaders(requestHeaders, uploadPartHeaders);

if payload is byte[] {
request.setBinaryPayload(payload);
} else if payload is stream<io:Block, io:Error?> {
request.setByteStream(payload);
} else {
request.setPayload(payload);
}

check generateSignature(self.accessKeyId, self.secretAccessKey, self.region, PUT, requestURI, UNSIGNED_PAYLOAD,
requestHeaders, request, queryParams = {"partNumber": partNumber.toString(), "uploadId": uploadId});
requestURI = string `${requestURI}${queryParamStr}`;

http:Response httpResponse = check self.amazonS3->put(requestURI, request);
if httpResponse.statusCode == http:STATUS_OK {
string ETag = check httpResponse.getHeader("ETag");
return {partNumber, ETag};
} else {
xml XMLPayload = check httpResponse.getXmlPayload();
return error(XMLPayload.toString());
}
}

# Completes a multipart upload by assembling previously uploaded parts.
#
# + objectName - The name of the object
# + bucketName - The name of the bucket
# + uploadId - The upload ID of the multipart upload
# + completedParts - An array containing the part number and ETag of each uploaded part
# + return - An error on failure or else `()`
remote isolated function completeMultipartUpload(
@display {label: "Object Name"} string objectName,
@display {label: "Bucket Name"} string bucketName,
@display {label: "Upload ID"} string uploadId,
@display {label: "Completed Parts"} CompletedPart[] completedParts)
returns error? {

if objectName == EMPTY_STRING {
return error(EMPTY_OBJECT_NAME_ERROR_MSG);
}
if bucketName == EMPTY_STRING {
return error(EMPTY_BUCKET_NAME_ERROR_MSG);
}

http:Request request = new;

string requestURI = string `/${bucketName}/${objectName}`;
string queryParamStr = string `?uploadId=${uploadId}`;

map<string> requestHeaders = setDefaultHeaders(self.amazonHost);

check generateSignature(self.accessKeyId, self.secretAccessKey, self.region, POST, requestURI,
UNSIGNED_PAYLOAD, requestHeaders, request, queryParams = {"uploadId": uploadId});
requestURI = string `${requestURI}${queryParamStr}`;

string payload = string `<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">`;
foreach CompletedPart part in completedParts {
payload = string `${payload}<Part><PartNumber>${part.partNumber.toString()}</PartNumber><ETag>${part.ETag}</ETag></Part>`;
}
payload = string `${payload}</CompleteMultipartUpload>`;
request.setPayload(payload);

http:Response httpResponse = check self.amazonS3->post(requestURI, request);
return handleHttpResponse(httpResponse);
}

# Aborts a multipart upload.
#
# + objectName - The name of the object
# + bucketName - The name of the bucket
# + uploadId - The upload ID of the multipart upload
# + return - An error on failure or else `()`
remote isolated function abortMultipartUpload(
@display {label: "Object Name"} string objectName,
@display {label: "Bucket Name"} string bucketName,
@display {label: "Upload ID"} string uploadId)
returns error? {

if objectName == EMPTY_STRING {
return error(EMPTY_OBJECT_NAME_ERROR_MSG);
}
if bucketName == EMPTY_STRING {
return error(EMPTY_BUCKET_NAME_ERROR_MSG);
}

http:Request request = new;

string requestURI = string `/${bucketName}/${objectName}`;
map<string> requestHeaders = setDefaultHeaders(self.amazonHost);

check generateSignature(self.accessKeyId, self.secretAccessKey, self.region, DELETE, requestURI,
UNSIGNED_PAYLOAD, requestHeaders, request, queryParams = {"uploadId": uploadId});

requestURI = string `${requestURI}?uploadId=${uploadId}`;

http:Response httpResponse = check self.amazonS3->delete(requestURI, request);
return handleHttpResponse(httpResponse);
}
}

isolated function setDefaultHeaders(string amazonHost) returns map<string> {
Expand Down
1 change: 1 addition & 0 deletions s3/constants.bal
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const X_AMZ_SIGNATURE = "X-Amz-Signature";
// HTTP verbs.
const string GET = "GET";
const string PUT = "PUT";
const POST = "POST";
const string DELETE = "DELETE";
const string TRUE = "TRUE";
const string FALSE = "FALSE";
Expand Down
4 changes: 4 additions & 0 deletions s3/data_mappings.bal
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ isolated function getS3ObjectsList(xml response) returns S3Object[] {
}
return s3Objects;
}

isolated function getUploadId(xml response) returns string {
return (response/<UploadId>/*).toString();
}
46 changes: 46 additions & 0 deletions s3/records.bal
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,49 @@ public type GetHeaders record {
GET method = GET;
ObjectRetrievalHeaders headers;
};

# Represents the optional headers specific to `CreateMultipartUpload` function.
#
# + cacheControl - Can be used to specify caching behavior along the request/reply chain
# + contentDisposition - Specifies presentational information for the object
# + contentEncoding - Specifies what content encodings have been applied to the object and thus what decoding mechanisms
# must be applied to obtain the media-type referenced by the Content-Type header field
# + contentLanguage - The language the content is in
# + contentType - The MIME type of the content
# + expires - The date and time at which the object is no longer cacheable
public type MultipartUploadHeaders record {
@display {label: "Cache Control"}
string cacheControl?;
@display {label: "Content Disposition"}
string contentDisposition?;
@display {label: "Content Encoding"}
string contentEncoding?;
@display {label: "Content Language"}
string contentLanguage?;
@display {label: "Content Type"}
string contentType?;
@display {label: "Expires"}
string expires?;
};

# Represents the details of a part uploaded through the `UploadPart` function.
#
# + partNumber - The part number of the file part
# + ETag - Represents the hash value of the object, which reflects modifications made exclusively to the contents of the object
public type CompletedPart record {
@display {label: "Part Number"}
int partNumber;
@display {label: "ETag"}
string ETag;
};

# Represents the optional headers specific to `UploadPart` function.
#
# + contentLength - The size of the object, in bytes
# + contentMD5 - The base64-encoded 128-bit MD5 digest of the message (without the headers)
public type UploadPartHeaders record {
@display {label: "Content Length"}
string contentLength?;
@display {label: "Content MD5"}
string contentMD5?;
};
52 changes: 52 additions & 0 deletions s3/tests/test.bal
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ configurable string accessKeyId = os:getEnv("ACCESS_KEY_ID");
configurable string secretAccessKey = os:getEnv("SECRET_ACCESS_KEY");
configurable string region = os:getEnv("REGION");
string fileName = "test.txt";
string fileName2 = "test2.txt";
string content = "Sample content";
string uploadId = "";
CompletedPart[] parts = [];

ConnectionConfig amazonS3Config = {
accessKeyId: accessKeyId,
Expand Down Expand Up @@ -193,6 +196,55 @@ function testDeleteObject() {
}
}

@test:Config {
dependsOn: [testListObjects]
}
function testCreateMultipartUpload() returns error? {
log:printInfo("amazonS3Client->createMultipartUpload()");
Client amazonS3Client = check new (amazonS3Config);
uploadId = check amazonS3Client->createMultipartUpload(fileName2, testBucketName);
test:assertTrue(uploadId.length() > 0, "Failed to create multipart upload");
}

@test:Config {
dependsOn: [testCreateMultipartUpload]
}
function testUploadPart() returns error? {
log:printInfo("amazonS3Client->uploadPart()");
Client amazonS3Client = check new (amazonS3Config);
CompletedPart response = check amazonS3Client->uploadPart(fileName2, testBucketName, content, uploadId, 1);
parts.push(response);
test:assertTrue(response.ETag.length() > 0, msg = "Failed to upload part");
}

@test:Config {
dependsOn: [testUploadPart]
}
function testCompleteMultipartUpload() returns error? {
log:printInfo("amazonS3Client->completeMultipartUpload()");
Client amazonS3Client = check new (amazonS3Config);
_ = check amazonS3Client->completeMultipartUpload(fileName2, testBucketName, uploadId, parts);
}

@test:Config {
dependsOn: [testCompleteMultipartUpload]
}
function testDeleteMultipartUpload() returns error? {
log:printInfo("amazonS3Client->deleteObject() for multipart upload");
Client amazonS3Client = check new (amazonS3Config);
_ = check amazonS3Client->deleteObject(testBucketName, fileName2);
}

@test:Config {
dependsOn: [testListBuckets],
before: testCreateMultipartUpload
}
function testAbortFileUpload() returns error? {
log:printInfo("amazonS3Client->abortMultipartUpload()");
Client amazonS3Client = check new (amazonS3Config);
_ = check amazonS3Client->abortMultipartUpload(fileName2, testBucketName, uploadId);
}

@test:AfterSuite {}
function testDeleteBucket() {
log:printInfo("amazonS3Client->deleteBucket()");
Expand Down
47 changes: 47 additions & 0 deletions s3/utils.bal
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,53 @@ isolated function populateOptionalParameters(map<string> queryParamsMap, string?
return queryParamsStr;
}

isolated function populateMultipartUploadHeaders(
map<string> requestHeaders,
MultipartUploadHeaders? multipartUploadHeaders) {
if multipartUploadHeaders is () {
return;
}
string? cacheControl = multipartUploadHeaders?.cacheControl;
if cacheControl is string {
requestHeaders[IF_MODIFIED_SINCE] = cacheControl;
}
string? contentDisposition = multipartUploadHeaders?.contentDisposition;
if contentDisposition is string {
requestHeaders[IF_UNMODIFIED_SINCE] = contentDisposition;
}
string? contentEncoding = multipartUploadHeaders?.contentEncoding;
if contentEncoding is string {
requestHeaders[IF_MATCH] = contentEncoding;
}
string? contentLanguage = multipartUploadHeaders?.contentLanguage;
if contentLanguage is string {
requestHeaders[IF_NONE_MATCH] = contentLanguage;
}
string? contentType = multipartUploadHeaders?.contentType;
if contentType is string {
requestHeaders[RANGE] = contentType;
}
string? expires = multipartUploadHeaders?.expires;
if expires is string {
requestHeaders[RANGE] = expires;
}
}

isolated function populateUploadPartHeaders(map<string> requestHeaders, UploadPartHeaders? uploadPartHeaders) {

if uploadPartHeaders is () {
return;
}
string? contentMD5 = uploadPartHeaders?.contentMD5;
if contentMD5 is string {
requestHeaders[CONTENT_MD5] = contentMD5;
}
string? contentLength = uploadPartHeaders?.contentLength;
if contentLength is string {
requestHeaders[CONTENT_LENGTH] = contentLength;
}
}

isolated function handleHttpResponse(http:Response httpResponse) returns @tainted error? {
int statusCode = httpResponse.statusCode;
if (statusCode != http:STATUS_OK && statusCode != http:STATUS_NO_CONTENT) {
Expand Down

0 comments on commit 3ad9d9d

Please sign in to comment.