diff --git a/s3/client.bal b/s3/client.bal index c03a856..03c90fd 100644 --- a/s3/client.bal +++ b/s3/client.bal @@ -341,6 +341,181 @@ public isolated client class Client { return string `${HTTPS}${self.amazonHost}/${bucketName}/${objectName}?${canonicalQueryString}&${X_AMZ_SIGNATURE }=${signature}`; } + + # Initiates a multipart upload and returns an upload ID. + # + # + objectName - The name of the object + # + bucketName - The name of the bucket + # + cannedACL - The access control list of the new object + # + multipartUploadHeaders - Optional headers for multipart uploads + # + return - If success, the upload ID, else an error + remote isolated function createMultipartUpload( + @display {label: "Object Name"} string objectName, + @display {label: "Bucket Name"} string bucketName, + @display {label: "Grant"} CannedACL? cannedACL = (), + @display {label: "Multipart Upload Headers"} MultipartUploadHeaders? multipartUploadHeaders = ()) + returns string|error { + + if objectName == EMPTY_STRING { + return error(EMPTY_OBJECT_NAME_ERROR_MSG); + } + if bucketName == EMPTY_STRING { + return error(EMPTY_BUCKET_NAME_ERROR_MSG); + } + + http:Request request = new; + + string requestURI = string `/${bucketName}/${objectName}`; + string queryParamStr = string `?uploads`; + map requestHeaders = setDefaultHeaders(self.amazonHost); + if cannedACL is CannedACL { + requestHeaders[X_AMZ_ACL] = cannedACL; + } + populateMultipartUploadHeaders(requestHeaders, multipartUploadHeaders); + + check generateSignature(self.accessKeyId, self.secretAccessKey, self.region, POST, requestURI, UNSIGNED_PAYLOAD, + requestHeaders, request, queryParams = {"uploads": EMPTY_STRING}); + requestURI = string `${requestURI}${queryParamStr}`; + http:Response httpResponse = check self.amazonS3->post(requestURI, request); + + xml XMLPayload = check httpResponse.getXmlPayload(); + if httpResponse.statusCode == http:STATUS_OK { + return getUploadId(XMLPayload); + } else { + return error(XMLPayload.toString()); + } + } + + # Completes a multipart upload by assembling previously uploaded parts. + # + # + objectName - The name of the object + # + bucketName - The name of the bucket + # + payload - The file content that needed to be added to the bucket + # + uploadId - The upload ID of the multipart upload + # + partNumber - The part number of the object + # + uploadPartHeaders - Optional headers for the upload + # + return - An error on failure or else `()` + remote isolated function uploadPart( + @display {label: "Object Name"} string objectName, + @display {label: "Bucket Name"} string bucketName, + @display {label: "File Content"} string|xml|json|byte[]|stream payload, + @display {label: "Upload ID"} string uploadId, + @display {label: "Part Number"} int partNumber, + @display {label: "UploadPart Headers"} UploadPartHeaders? uploadPartHeaders = ()) + returns CompletedPart|error { + + if objectName == EMPTY_STRING { + return error(EMPTY_OBJECT_NAME_ERROR_MSG); + } + if bucketName == EMPTY_STRING { + return error(EMPTY_BUCKET_NAME_ERROR_MSG); + } + http:Request request = new; + + string requestURI = string `/${bucketName}/${objectName}`; + string queryParamStr = string `?partNumber=${partNumber}&uploadId=${uploadId}`; + + map requestHeaders = setDefaultHeaders(self.amazonHost); + + populateUploadPartHeaders(requestHeaders, uploadPartHeaders); + + if payload is byte[] { + request.setBinaryPayload(payload); + } else if payload is stream { + request.setByteStream(payload); + } else { + request.setPayload(payload); + } + + check generateSignature(self.accessKeyId, self.secretAccessKey, self.region, PUT, requestURI, UNSIGNED_PAYLOAD, + requestHeaders, request, queryParams = {"partNumber": partNumber.toString(), "uploadId": uploadId}); + requestURI = string `${requestURI}${queryParamStr}`; + + http:Response httpResponse = check self.amazonS3->put(requestURI, request); + if httpResponse.statusCode == http:STATUS_OK { + string ETag = check httpResponse.getHeader("ETag"); + return {partNumber, ETag}; + } else { + xml XMLPayload = check httpResponse.getXmlPayload(); + return error(XMLPayload.toString()); + } + } + + # Completes a multipart upload by assembling previously uploaded parts. + # + # + objectName - The name of the object + # + bucketName - The name of the bucket + # + uploadId - The upload ID of the multipart upload + # + completedParts - An array containing the part number and ETag of each uploaded part + # + return - An error on failure or else `()` + remote isolated function completeMultipartUpload( + @display {label: "Object Name"} string objectName, + @display {label: "Bucket Name"} string bucketName, + @display {label: "Upload ID"} string uploadId, + @display {label: "Completed Parts"} CompletedPart[] completedParts) + returns error? { + + if objectName == EMPTY_STRING { + return error(EMPTY_OBJECT_NAME_ERROR_MSG); + } + if bucketName == EMPTY_STRING { + return error(EMPTY_BUCKET_NAME_ERROR_MSG); + } + + http:Request request = new; + + string requestURI = string `/${bucketName}/${objectName}`; + string queryParamStr = string `?uploadId=${uploadId}`; + + map requestHeaders = setDefaultHeaders(self.amazonHost); + + check generateSignature(self.accessKeyId, self.secretAccessKey, self.region, POST, requestURI, + UNSIGNED_PAYLOAD, requestHeaders, request, queryParams = {"uploadId": uploadId}); + requestURI = string `${requestURI}${queryParamStr}`; + + string payload = string ``; + foreach CompletedPart part in completedParts { + payload = string `${payload}${part.partNumber.toString()}${part.ETag}`; + } + payload = string `${payload}`; + request.setPayload(payload); + + http:Response httpResponse = check self.amazonS3->post(requestURI, request); + return handleHttpResponse(httpResponse); + } + + # Aborts a multipart upload. + # + # + objectName - The name of the object + # + bucketName - The name of the bucket + # + uploadId - The upload ID of the multipart upload + # + return - An error on failure or else `()` + remote isolated function abortMultipartUpload( + @display {label: "Object Name"} string objectName, + @display {label: "Bucket Name"} string bucketName, + @display {label: "Upload ID"} string uploadId) + returns error? { + + if objectName == EMPTY_STRING { + return error(EMPTY_OBJECT_NAME_ERROR_MSG); + } + if bucketName == EMPTY_STRING { + return error(EMPTY_BUCKET_NAME_ERROR_MSG); + } + + http:Request request = new; + + string requestURI = string `/${bucketName}/${objectName}`; + map requestHeaders = setDefaultHeaders(self.amazonHost); + + check generateSignature(self.accessKeyId, self.secretAccessKey, self.region, DELETE, requestURI, + UNSIGNED_PAYLOAD, requestHeaders, request, queryParams = {"uploadId": uploadId}); + + requestURI = string `${requestURI}?uploadId=${uploadId}`; + + http:Response httpResponse = check self.amazonS3->delete(requestURI, request); + return handleHttpResponse(httpResponse); + } } isolated function setDefaultHeaders(string amazonHost) returns map { diff --git a/s3/constants.bal b/s3/constants.bal index 1887a6a..ea0234f 100644 --- a/s3/constants.bal +++ b/s3/constants.bal @@ -60,6 +60,7 @@ const X_AMZ_SIGNATURE = "X-Amz-Signature"; // HTTP verbs. const string GET = "GET"; const string PUT = "PUT"; +const POST = "POST"; const string DELETE = "DELETE"; const string TRUE = "TRUE"; const string FALSE = "FALSE"; diff --git a/s3/data_mappings.bal b/s3/data_mappings.bal index be08660..a19db8e 100644 --- a/s3/data_mappings.bal +++ b/s3/data_mappings.bal @@ -51,3 +51,7 @@ isolated function getS3ObjectsList(xml response) returns S3Object[] { } return s3Objects; } + +isolated function getUploadId(xml response) returns string { + return (response//*).toString(); +} diff --git a/s3/records.bal b/s3/records.bal index bb9cb6a..834f677 100644 --- a/s3/records.bal +++ b/s3/records.bal @@ -145,3 +145,49 @@ public type GetHeaders record { GET method = GET; ObjectRetrievalHeaders headers; }; + +# Represents the optional headers specific to `CreateMultipartUpload` function. +# +# + cacheControl - Can be used to specify caching behavior along the request/reply chain +# + contentDisposition - Specifies presentational information for the object +# + contentEncoding - Specifies what content encodings have been applied to the object and thus what decoding mechanisms +# must be applied to obtain the media-type referenced by the Content-Type header field +# + contentLanguage - The language the content is in +# + contentType - The MIME type of the content +# + expires - The date and time at which the object is no longer cacheable +public type MultipartUploadHeaders record { + @display {label: "Cache Control"} + string cacheControl?; + @display {label: "Content Disposition"} + string contentDisposition?; + @display {label: "Content Encoding"} + string contentEncoding?; + @display {label: "Content Language"} + string contentLanguage?; + @display {label: "Content Type"} + string contentType?; + @display {label: "Expires"} + string expires?; +}; + +# Represents the details of a part uploaded through the `UploadPart` function. +# +# + partNumber - The part number of the file part +# + ETag - Represents the hash value of the object, which reflects modifications made exclusively to the contents of the object +public type CompletedPart record { + @display {label: "Part Number"} + int partNumber; + @display {label: "ETag"} + string ETag; +}; + +# Represents the optional headers specific to `UploadPart` function. +# +# + contentLength - The size of the object, in bytes +# + contentMD5 - The base64-encoded 128-bit MD5 digest of the message (without the headers) +public type UploadPartHeaders record { + @display {label: "Content Length"} + string contentLength?; + @display {label: "Content MD5"} + string contentMD5?; +}; diff --git a/s3/tests/test.bal b/s3/tests/test.bal index 53e1028..ca84530 100644 --- a/s3/tests/test.bal +++ b/s3/tests/test.bal @@ -27,7 +27,10 @@ configurable string accessKeyId = os:getEnv("ACCESS_KEY_ID"); configurable string secretAccessKey = os:getEnv("SECRET_ACCESS_KEY"); configurable string region = os:getEnv("REGION"); string fileName = "test.txt"; +string fileName2 = "test2.txt"; string content = "Sample content"; +string uploadId = ""; +CompletedPart[] parts = []; ConnectionConfig amazonS3Config = { accessKeyId: accessKeyId, @@ -193,6 +196,55 @@ function testDeleteObject() { } } +@test:Config { + dependsOn: [testListObjects] +} +function testCreateMultipartUpload() returns error? { + log:printInfo("amazonS3Client->createMultipartUpload()"); + Client amazonS3Client = check new (amazonS3Config); + uploadId = check amazonS3Client->createMultipartUpload(fileName2, testBucketName); + test:assertTrue(uploadId.length() > 0, "Failed to create multipart upload"); +} + +@test:Config { + dependsOn: [testCreateMultipartUpload] +} +function testUploadPart() returns error? { + log:printInfo("amazonS3Client->uploadPart()"); + Client amazonS3Client = check new (amazonS3Config); + CompletedPart response = check amazonS3Client->uploadPart(fileName2, testBucketName, content, uploadId, 1); + parts.push(response); + test:assertTrue(response.ETag.length() > 0, msg = "Failed to upload part"); +} + +@test:Config { + dependsOn: [testUploadPart] +} +function testCompleteMultipartUpload() returns error? { + log:printInfo("amazonS3Client->completeMultipartUpload()"); + Client amazonS3Client = check new (amazonS3Config); + _ = check amazonS3Client->completeMultipartUpload(fileName2, testBucketName, uploadId, parts); +} + +@test:Config { + dependsOn: [testCompleteMultipartUpload] +} +function testDeleteMultipartUpload() returns error? { + log:printInfo("amazonS3Client->deleteObject() for multipart upload"); + Client amazonS3Client = check new (amazonS3Config); + _ = check amazonS3Client->deleteObject(testBucketName, fileName2); +} + +@test:Config { + dependsOn: [testListBuckets], + before: testCreateMultipartUpload +} +function testAbortFileUpload() returns error? { + log:printInfo("amazonS3Client->abortMultipartUpload()"); + Client amazonS3Client = check new (amazonS3Config); + _ = check amazonS3Client->abortMultipartUpload(fileName2, testBucketName, uploadId); +} + @test:AfterSuite {} function testDeleteBucket() { log:printInfo("amazonS3Client->deleteBucket()"); diff --git a/s3/utils.bal b/s3/utils.bal index 9768c7a..85c00c3 100644 --- a/s3/utils.bal +++ b/s3/utils.bal @@ -353,6 +353,53 @@ isolated function populateOptionalParameters(map queryParamsMap, string? return queryParamsStr; } +isolated function populateMultipartUploadHeaders( + map requestHeaders, + MultipartUploadHeaders? multipartUploadHeaders) { + if multipartUploadHeaders is () { + return; + } + string? cacheControl = multipartUploadHeaders?.cacheControl; + if cacheControl is string { + requestHeaders[IF_MODIFIED_SINCE] = cacheControl; + } + string? contentDisposition = multipartUploadHeaders?.contentDisposition; + if contentDisposition is string { + requestHeaders[IF_UNMODIFIED_SINCE] = contentDisposition; + } + string? contentEncoding = multipartUploadHeaders?.contentEncoding; + if contentEncoding is string { + requestHeaders[IF_MATCH] = contentEncoding; + } + string? contentLanguage = multipartUploadHeaders?.contentLanguage; + if contentLanguage is string { + requestHeaders[IF_NONE_MATCH] = contentLanguage; + } + string? contentType = multipartUploadHeaders?.contentType; + if contentType is string { + requestHeaders[RANGE] = contentType; + } + string? expires = multipartUploadHeaders?.expires; + if expires is string { + requestHeaders[RANGE] = expires; + } +} + +isolated function populateUploadPartHeaders(map requestHeaders, UploadPartHeaders? uploadPartHeaders) { + + if uploadPartHeaders is () { + return; + } + string? contentMD5 = uploadPartHeaders?.contentMD5; + if contentMD5 is string { + requestHeaders[CONTENT_MD5] = contentMD5; + } + string? contentLength = uploadPartHeaders?.contentLength; + if contentLength is string { + requestHeaders[CONTENT_LENGTH] = contentLength; + } +} + isolated function handleHttpResponse(http:Response httpResponse) returns @tainted error? { int statusCode = httpResponse.statusCode; if (statusCode != http:STATUS_OK && statusCode != http:STATUS_NO_CONTENT) {