diff --git a/src/S3Commands.cc b/src/S3Commands.cc
index bb28928..f150b9f 100644
--- a/src/S3Commands.cc
+++ b/src/S3Commands.cc
@@ -446,18 +446,66 @@ bool AmazonRequest::SendS3Request(const std::string &payload) {
 AmazonS3Upload::~AmazonS3Upload() {}
 
 bool AmazonS3Upload::SendRequest(const std::string &payload, off_t offset,
-								 size_t size) {
-	if (offset != 0 || size != 0) {
-		std::string range;
-		formatstr(range, "bytes=%lld-%lld", static_cast<long long>(offset),
-				  static_cast<long long>(offset + size - 1));
-		headers["Range"] = range.c_str();
-	}
+								 size_t size) {
+	if (offset != 0 || size != 0) {
+		std::string range;
+		formatstr(range, "bytes=%lld-%lld", static_cast<long long>(offset),
+				  static_cast<long long>(offset + size - 1));
+		headers["Range"] = range.c_str();
+	}
+
+	httpVerb = "PUT";
+	return SendS3Request(payload);
+}
+
+// ---------------------------------------------------------------------------
+
+AmazonS3CompleteMultipartUpload::~AmazonS3CompleteMultipartUpload() {}
+
+bool AmazonS3CompleteMultipartUpload::SendRequest(
+	const std::vector<std::string> &eTags, int partNumber,
+	const std::string &uploadId) {
+	query_parameters["uploadId"] = uploadId;
+
+	httpVerb = "POST";
+	std::string payload;
+	payload += "<CompleteMultipartUpload>";
+	for (int i = 1; i < partNumber; i++) {
+		payload += "<Part>";
+		payload += "<ETag>" + eTags[i - 1] + "</ETag>";
+		payload += "<PartNumber>" + std::to_string(i) + "</PartNumber>";
+		payload += "</Part>";
+	}
+	payload += "</CompleteMultipartUpload>";
 
-	httpVerb = "PUT";
-	return SendS3Request(payload);
+	return SendS3Request(payload);
+}
+
+// ---------------------------------------------------------------------------
+
+AmazonS3CreateMultipartUpload::~AmazonS3CreateMultipartUpload() {}
+AmazonS3SendMultipartPart::~AmazonS3SendMultipartPart() {}
+
+bool AmazonS3CreateMultipartUpload::SendRequest() {
+	query_parameters["uploads"] = "";
+	query_parameters["x-id"] = "CreateMultipartUpload";
+
+	httpVerb = "POST";
+	return SendS3Request("");
 }
 
+bool AmazonS3SendMultipartPart::SendRequest(const std::string &payload,
+											const std::string &partNumber,
+											const std::string &uploadId) {
+	query_parameters["partNumber"] = partNumber;
+	query_parameters["uploadId"] = uploadId;
+	includeResponseHeader = true;
+	httpVerb = "PUT";
+	return SendS3Request(payload);
+}
+
 // ---------------------------------------------------------------------------
 
 AmazonS3Download::~AmazonS3Download() {}
@@ -505,6 +553,32 @@ bool AmazonS3List::SendRequest(const std::string &continuationToken) {
 	return SendS3Request("");
 }
 
+bool AmazonS3CreateMultipartUpload::Results(std::string &uploadId,
+											std::string &errMsg) {
+	tinyxml2::XMLDocument doc;
+	auto err = doc.Parse(resultString.c_str());
+	if (err != tinyxml2::XML_SUCCESS) {
+		errMsg = doc.ErrorStr();
+		return false;
+	}
+
+	auto elem = doc.RootElement();
+	if (strcmp(elem->Name(), "InitiateMultipartUploadResult")) {
+		errMsg = "S3 Uploads response is not rooted with "
+				 "InitiateMultipartUploadResult element";
+		return false;
+	}
+
+	for (auto child = elem->FirstChildElement(); child != nullptr;
+		 child = child->NextSiblingElement()) {
+		if (!strcmp(child->Name(), "UploadId")) {
+			uploadId = child->GetText();
+		}
+	}
+	return true;
+}
+
 // Parse the results of the AWS directory listing
 //
 // S3 returns an XML structure for directory listings so we must pick it apart
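Reviewer note: Results() above walks the InitiateMultipartUploadResult response
with tinyxml2, and AmazonS3CompleteMultipartUpload::SendRequest() builds the
standard S3 CompleteMultipartUpload XML body by hand. For reference, here is a
minimal standalone sketch of the parse side; ParseUploadId and the sample
response body are illustrative, not part of the patch:

#include <cstring>
#include <iostream>
#include <string>

#include <tinyxml2.h>

// Pull the UploadId out of an InitiateMultipartUploadResult document,
// mirroring the logic of AmazonS3CreateMultipartUpload::Results().
bool ParseUploadId(const std::string &body, std::string &uploadId,
				   std::string &errMsg) {
	tinyxml2::XMLDocument doc;
	if (doc.Parse(body.c_str()) != tinyxml2::XML_SUCCESS) {
		errMsg = doc.ErrorStr();
		return false;
	}
	auto *root = doc.RootElement();
	if (!root || strcmp(root->Name(), "InitiateMultipartUploadResult")) {
		errMsg = "unexpected root element";
		return false;
	}
	auto *id = root->FirstChildElement("UploadId");
	if (!id || !id->GetText()) {
		errMsg = "no UploadId in response";
		return false;
	}
	uploadId = id->GetText();
	return true;
}

int main() {
	// Illustrative response body; real responses also carry an xmlns
	// attribute, which this parse does not depend on.
	const std::string body =
		"<InitiateMultipartUploadResult>"
		"<Bucket>example-bucket</Bucket>"
		"<Key>example-object</Key>"
		"<UploadId>VXBsb2FkSWQtZXhhbXBsZQ</UploadId>"
		"</InitiateMultipartUploadResult>";
	std::string uploadId, errMsg;
	if (ParseUploadId(body, uploadId, errMsg))
		std::cout << uploadId << "\n";
}

One difference worth noting: Results() calls child->GetText() without a null
check, so an empty <UploadId/> element would assign a null pointer to the
string; the sketch guards against that.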
diff --git a/src/S3Commands.hh b/src/S3Commands.hh
index 8b2e016..c927ca3 100644
--- a/src/S3Commands.hh
+++ b/src/S3Commands.hh
@@ -137,6 +137,79 @@ class AmazonS3Upload : public AmazonRequest {
 	std::string path;
 };
 
+class AmazonS3CreateMultipartUpload : public AmazonRequest {
+	using AmazonRequest::SendRequest;
+
+  public:
+	AmazonS3CreateMultipartUpload(const S3AccessInfo &ai,
+								  const std::string &objectName,
+								  XrdSysError &log)
+		: AmazonRequest(ai, objectName, log) {}
+
+	AmazonS3CreateMultipartUpload(const std::string &s, const std::string &akf,
+								  const std::string &skf, const std::string &b,
+								  const std::string &o,
+								  const std::string &style, XrdSysError &log)
+		: AmazonRequest(s, akf, skf, b, o, style, 4, log) {}
+
+	bool Results(std::string &uploadId, std::string &errMsg);
+
+	virtual ~AmazonS3CreateMultipartUpload();
+
+	virtual bool SendRequest();
+
+  protected:
+	// std::string path;
+};
+
+class AmazonS3CompleteMultipartUpload : public AmazonRequest {
+	using AmazonRequest::SendRequest;
+
+  public:
+	AmazonS3CompleteMultipartUpload(const S3AccessInfo &ai,
+									const std::string &objectName,
+									XrdSysError &log)
+		: AmazonRequest(ai, objectName, log) {}
+
+	AmazonS3CompleteMultipartUpload(const std::string &s,
+									const std::string &akf,
+									const std::string &skf,
+									const std::string &b, const std::string &o,
+									const std::string &style, XrdSysError &log)
+		: AmazonRequest(s, akf, skf, b, o, style, 4, log) {}
+
+	virtual ~AmazonS3CompleteMultipartUpload();
+
+	virtual bool SendRequest(const std::vector<std::string> &eTags,
+							 int partNumber, const std::string &uploadId);
+
+  protected:
+};
+
+class AmazonS3SendMultipartPart : public AmazonRequest {
+	using AmazonRequest::SendRequest;
+
+  public:
+	AmazonS3SendMultipartPart(const S3AccessInfo &ai,
+							  const std::string &objectName, XrdSysError &log)
+		: AmazonRequest(ai, objectName, log) {}
+
+	AmazonS3SendMultipartPart(const std::string &s, const std::string &akf,
+							  const std::string &skf, const std::string &b,
+							  const std::string &o, const std::string &style,
+							  XrdSysError &log)
+		: AmazonRequest(s, akf, skf, b, o, style, 4, log) {}
+
+	bool Results(std::string &uploadId, std::string &errMsg);
+
+	virtual ~AmazonS3SendMultipartPart();
+
+	virtual bool SendRequest(const std::string &payload,
+							 const std::string &partNumber,
+							 const std::string &uploadId);
+
+  protected:
+};
+
 class AmazonS3Download : public AmazonRequest {
 	using AmazonRequest::SendRequest;
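Reviewer note: the three new classes map one-to-one onto the S3 multipart
sequence (CreateMultipartUpload, UploadPart, CompleteMultipartUpload). A
hypothetical driver showing the intended call order and the ETag bookkeeping;
UploadInTwoParts and its parameters are invented for illustration, and the
patch's headers are assumed to be included:

// Sketch only: upload two in-memory parts through the new request classes.
bool UploadInTwoParts(const S3AccessInfo &ai, const std::string &object,
					  XrdSysError &log, const std::string &partA,
					  const std::string &partB) {
	// 1. POST ?uploads to obtain an upload ID.
	AmazonS3CreateMultipartUpload create(ai, object, log);
	if (!create.SendRequest())
		return false;
	std::string uploadId, errMsg;
	if (!create.Results(uploadId, errMsg))
		return false;

	// 2. PUT each part with a 1-based part number, collecting the ETag that
	//    S3 returns in the response headers of each part upload.
	std::vector<std::string> eTags;
	int partNumber = 1;
	for (const std::string *payload : {&partA, &partB}) {
		AmazonS3SendMultipartPart part(ai, object, log);
		if (!part.SendRequest(*payload, std::to_string(partNumber), uploadId))
			return false;
		eTags.push_back(/* ETag scraped from part.getResultString() */ "");
		++partNumber;
	}

	// 3. POST the part manifest. Note the convention: SendRequest() takes the
	//    first *unused* part number, since it loops i = 1 .. partNumber-1.
	AmazonS3CompleteMultipartUpload complete(ai, object, log);
	return complete.SendRequest(eTags, partNumber, uploadId);
}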
diff --git a/src/S3File.cc b/src/S3File.cc
index daf7ffb..49a7cf3 100644
--- a/src/S3File.cc
+++ b/src/S3File.cc
@@ -38,6 +38,7 @@
 #include
 #include
 #include
+#include <vector>
 
 using namespace XrdHTTPServer;
 
@@ -46,10 +47,17 @@ S3FileSystem *g_s3_oss = nullptr;
 XrdVERSIONINFO(XrdOssGetFileSystem, S3);
 
 S3File::S3File(XrdSysError &log, S3FileSystem *oss)
-	: m_log(log), m_oss(oss), content_length(0), last_modified(0) {}
+	: m_log(log), m_oss(oss), content_length(0), last_modified(0),
+	  write_buffer(""), partNumber(1) {}
 
 int S3File::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) {
-	if (m_log.getMsgMask() & XrdHTTPServer::Debug) {
+	if (Oflag & O_CREAT) {
+		m_log.Log(LogMask::Info, "File opened for creation: ", path);
+	}
+	if (Oflag & O_APPEND) {
+		m_log.Log(LogMask::Info, "File opened for append: ", path);
+	}
+
+	if (m_log.getMsgMask() & XrdHTTPServer::Debug) {
 		m_log.Log(LogMask::Warning, "S3File::Open", "Opening file", path);
 	}
 
@@ -79,7 +87,15 @@ int S3File::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) {
 		}
 	}
 
-	return 0;
+	AmazonS3CreateMultipartUpload startUpload(m_ai, m_object, m_log);
+	if (!startUpload.SendRequest()) {
+		m_log.Emsg("Open", "S3 multipart request failed");
+		return -ENOENT;
+	}
+	std::string errMsg;
+	startUpload.Results(uploadId, errMsg);
+
+	return 0;
 }
 
 ssize_t S3File::Read(void *buffer, off_t offset, size_t size) {
@@ -177,21 +193,68 @@ int S3File::Fstat(struct stat *buff) {
 }
 
 ssize_t S3File::Write(const void *buffer, off_t offset, size_t size) {
-	AmazonS3Upload upload(m_ai, m_object, m_log);
+	std::string payload((char *)buffer, size);
+	write_buffer += payload;
+
+	// XXX should this be configurable? S3 allows at most 10,000 parts, so
+	// 100 MB parts cap a file at roughly 1 TB. It doesn't seem terribly
+	// useful to be much smaller, and it's not clear the S3 API will work if
+	// it's much larger.
+	if (write_buffer.length() > 100000000) {
+		if (SendPart() == -ENOENT) {
+			return -ENOENT;
+		}
+	}
+	return size;
+}
 
-	std::string payload((char *)buffer, size);
-	if (!upload.SendRequest(payload, offset, size)) {
-		m_log.Emsg("Open", "upload.SendRequest() failed");
-		return -ENOENT;
-	} else {
-		m_log.Emsg("Open", "upload.SendRequest() succeeded");
-		return 0;
-	}
+int S3File::SendPart() {
+	int length = write_buffer.length();
+	AmazonS3SendMultipartPart upload_part_request(m_ai, m_object, m_log);
+	if (!upload_part_request.SendRequest(
+			write_buffer, std::to_string(partNumber), uploadId)) {
+		m_log.Emsg("SendPart", "upload.SendRequest() failed");
+		return -ENOENT;
+	} else {
+		m_log.Emsg("SendPart", "upload.SendRequest() succeeded");
+		// The response headers contain a line like `ETag: "<digest>"`;
+		// extract the quoted value.
+		std::string resultString = upload_part_request.getResultString();
+		std::size_t startPos = resultString.find("ETag:");
+		std::size_t endPos = resultString.find("\"", startPos + 7);
+		eTags.push_back(
+			resultString.substr(startPos + 7, endPos - startPos - 7));
+
+		partNumber++;
+		write_buffer = "";
+	}
+
+	return length;
 }
 
 int S3File::Close(long long *retsz) {
-	m_log.Emsg("Close", "Closed our S3 file");
-	return 0;
+	// Flush whatever remains in the buffer as the final part.
+	if (SendPart() == -ENOENT) {
+		return -ENOENT;
+	} else {
+		m_log.Emsg("Close", "Closed our S3 file");
+	}
+
+	AmazonS3CompleteMultipartUpload complete_upload_request(m_ai, m_object,
+															m_log);
+	if (!complete_upload_request.SendRequest(eTags, partNumber, uploadId)) {
+		m_log.Emsg("Close", "close.SendRequest() failed");
+		return -ENOENT;
+	} else {
+		m_log.Emsg("Close", "close.SendRequest() succeeded");
+	}
+
+	return 0;
 }
 
 extern "C" {
diff --git a/src/S3File.hh b/src/S3File.hh
index e42fcb1..7cea40e 100644
--- a/src/S3File.hh
+++ b/src/S3File.hh
@@ -95,6 +95,7 @@ class S3File : public XrdOssDF {
 	time_t getLastModified() { return last_modified; }
 
   private:
+	int SendPart();
 	XrdSysError &m_log;
 	S3FileSystem *m_oss;
 
@@ -103,4 +104,11 @@ class S3File : public XrdOssDF {
 	size_t content_length;
 	time_t last_modified;
+
+	std::string write_buffer;
+	std::string uploadId;
+	int partNumber;
+	std::vector<std::string> eTags;
 };
diff --git a/src/S3FileSystem.hh b/src/S3FileSystem.hh
index 95c639e..bafd901 100644
--- a/src/S3FileSystem.hh
+++ b/src/S3FileSystem.hh
@@ -145,4 +145,5 @@ class S3FileSystem : public XrdOss {
 							  const std::string &source);
 	std::map> m_s3_access_map;
 	std::string s3_url_style;
+
 };
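Reviewer note: the ETag extraction in S3File::SendPart() assumes the header
reads exactly `ETag: "` (startPos + 7 skips past that prefix) and never checks
either find() against npos, so a response missing the header would hand
std::string::substr out-of-range offsets (which throws). A more defensive
version of just that extraction, as a sketch; ExtractETag is an invented name,
and the header format is as the patch assumes:

#include <string>

// Pull the quoted value out of a raw header blob such as
//   ETag: "b54357faf0632cce46e942fa68356b38"
// Returns false, rather than garbage, when the header or quotes are missing.
bool ExtractETag(const std::string &headers, std::string &eTag) {
	std::size_t startPos = headers.find("ETag:");
	if (startPos == std::string::npos)
		return false;
	std::size_t openQuote = headers.find('"', startPos);
	if (openQuote == std::string::npos)
		return false;
	std::size_t closeQuote = headers.find('"', openQuote + 1);
	if (closeQuote == std::string::npos)
		return false;
	eTag = headers.substr(openQuote + 1, closeQuote - openQuote - 1);
	return true;
}

A related caution in Close(): it always flushes a final part, even when
write_buffer is empty, and it surfaces every failure as -ENOENT regardless of
the underlying cause.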