Skip to content

Commit

Permalink
uploads to s3 using multipart uploads. Testing seems to be working, b…
Browse files Browse the repository at this point in the history
…ut is incomplete.
  • Loading branch information
rw2 committed Aug 20, 2024
1 parent 6d3ca14 commit 5eed5d2
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 23 deletions.
92 changes: 83 additions & 9 deletions src/S3Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -446,18 +446,66 @@ bool AmazonRequest::SendS3Request(const std::string &payload) {
AmazonS3Upload::~AmazonS3Upload() {}

bool AmazonS3Upload::SendRequest(const std::string &payload, off_t offset,
size_t size) {
if (offset != 0 || size != 0) {
std::string range;
formatstr(range, "bytes=%lld-%lld", static_cast<long long int>(offset),
static_cast<long long int>(offset + size - 1));
headers["Range"] = range.c_str();
}
size_t size) {
if (offset != 0 || size != 0) {
std::string range;
formatstr(range, "bytes=%lld-%lld", static_cast<long long int>(offset),
static_cast<long long int>(offset + size - 1));
headers["Range"] = range.c_str();
}

httpVerb = "PUT";
return SendS3Request(payload);
}

// ---------------------------------------------------------------------------


AmazonS3CompleteMultipartUpload::~AmazonS3CompleteMultipartUpload() {}

bool AmazonS3CompleteMultipartUpload::SendRequest(const std::vector<std::string> &eTags,
int partNumber,
const std::string &uploadId) {
query_parameters["uploadId"] = uploadId;

httpVerb = "POST";
std::string payload;
payload += "<CompleteMultipartUpload xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">";
for (int i = 1; i < partNumber; i++) {
payload += "<Part>";
payload += "<ETag>" + eTags[i-1] + "</ETag>";
payload += "<PartNumber>" + std::to_string(i) + "</PartNumber>";
payload += "</Part>";
}
payload += "</CompleteMultipartUpload>";

httpVerb = "PUT";
return SendS3Request(payload);
return SendS3Request(payload);
}
// ---------------------------------------------------------------------------


AmazonS3CreateMultipartUpload::~AmazonS3CreateMultipartUpload() {}
AmazonS3SendMultipartPart::~AmazonS3SendMultipartPart() {}

bool AmazonS3CreateMultipartUpload::SendRequest() {
query_parameters["uploads"] = "";
query_parameters["x-id"] = "CreateMultipartUpload";

httpVerb = "POST";
return SendS3Request("");
}

bool AmazonS3SendMultipartPart::SendRequest(const std::string &payload,
const std::string &partNumber,
const std::string &uploadId) {
query_parameters["partNumber"] = partNumber;
query_parameters["uploadId"] = uploadId;
includeResponseHeader = true;
httpVerb = "PUT";
return SendS3Request(payload);
}


// ---------------------------------------------------------------------------

AmazonS3Download::~AmazonS3Download() {}
Expand Down Expand Up @@ -505,6 +553,32 @@ bool AmazonS3List::SendRequest(const std::string &continuationToken) {
return SendS3Request("");
}

bool AmazonS3CreateMultipartUpload::Results(std::string &uploadId,
std::string &errMsg) {
tinyxml2::XMLDocument doc;
auto err = doc.Parse(resultString.c_str());
if (err != tinyxml2::XML_SUCCESS) {
errMsg = doc.ErrorStr();
return false;
}

auto elem = doc.RootElement();
if (strcmp(elem->Name(), "InitiateMultipartUploadResult")) {
errMsg = "S3 Uploads response is not rooted with InitiateMultipartUploadResult "
"element";
return false;
}

for (auto child = elem->FirstChildElement(); child != nullptr;
child = child->NextSiblingElement()) {
if (!strcmp(child->Name(), "UploadId")) {
uploadId = child->GetText();
}
}
return true;
}


// Parse the results of the AWS directory listing
//
// S3 returns an XML structure for directory listings so we must pick it apart
Expand Down
73 changes: 73 additions & 0 deletions src/S3Commands.hh
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,79 @@ class AmazonS3Upload : public AmazonRequest {
std::string path;
};

class AmazonS3CreateMultipartUpload : public AmazonRequest {
using AmazonRequest::SendRequest;

public:
AmazonS3CreateMultipartUpload(const S3AccessInfo &ai, const std::string &objectName,
XrdSysError &log)
: AmazonRequest(ai, objectName, log) {}

AmazonS3CreateMultipartUpload(const std::string &s, const std::string &akf,
const std::string &skf, const std::string &b,
const std::string &o, const std::string &style,
XrdSysError &log)
: AmazonRequest(s, akf, skf, b, o, style, 4, log) {}

bool Results(std::string &uploadId,
std::string &errMsg);

virtual ~AmazonS3CreateMultipartUpload();

virtual bool SendRequest();

protected:
//std::string path;
};

class AmazonS3CompleteMultipartUpload : public AmazonRequest {
using AmazonRequest::SendRequest;

public:
AmazonS3CompleteMultipartUpload(const S3AccessInfo &ai, const std::string &objectName,
XrdSysError &log)
: AmazonRequest(ai, objectName, log) {}

AmazonS3CompleteMultipartUpload(const std::string &s, const std::string &akf,
const std::string &skf, const std::string &b,
const std::string &o, const std::string &style,
XrdSysError &log)
: AmazonRequest(s, akf, skf, b, o, style, 4, log) {}

virtual ~AmazonS3CompleteMultipartUpload();

virtual bool SendRequest(const std::vector<std::string> &eTags, int partNumber, const std::string &uploadId);

protected:
};

class AmazonS3SendMultipartPart : public AmazonRequest {
using AmazonRequest::SendRequest;

public:
AmazonS3SendMultipartPart(const S3AccessInfo &ai, const std::string &objectName,
XrdSysError &log)
: AmazonRequest(ai, objectName, log) {}

AmazonS3SendMultipartPart(const std::string &s, const std::string &akf,
const std::string &skf, const std::string &b,
const std::string &o, const std::string &style,
XrdSysError &log)
: AmazonRequest(s, akf, skf, b, o, style, 4, log) {}

bool Results(std::string &uploadId,
std::string &errMsg);

virtual ~AmazonS3SendMultipartPart();

virtual bool SendRequest(const std::string &payload,
const std::string &partNumber,
const std::string &uploadId);

protected:
};


class AmazonS3Download : public AmazonRequest {
using AmazonRequest::SendRequest;

Expand Down
91 changes: 77 additions & 14 deletions src/S3File.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <sstream>
#include <string>
#include <vector>
#include <stdlib.h>

using namespace XrdHTTPServer;

Expand All @@ -46,10 +47,17 @@ S3FileSystem *g_s3_oss = nullptr;
XrdVERSIONINFO(XrdOssGetFileSystem, S3);

S3File::S3File(XrdSysError &log, S3FileSystem *oss)
: m_log(log), m_oss(oss), content_length(0), last_modified(0) {}
: m_log(log), m_oss(oss), content_length(0), last_modified(0), write_buffer(""), partNumber(1) {}

int S3File::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) {
if (m_log.getMsgMask() & XrdHTTPServer::Debug) {
if(Oflag && O_CREAT) {
m_log.Log(LogMask::Info, "File opened for creation: ", path);
}
if(Oflag && O_APPEND) {
m_log.Log(LogMask::Info, "File opened for append: ", path);
}

if (m_log.getMsgMask() & XrdHTTPServer::Debug) {
m_log.Log(LogMask::Warning, "S3File::Open", "Opening file", path);
}

Expand Down Expand Up @@ -79,7 +87,15 @@ int S3File::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) {
}
}

return 0;
AmazonS3CreateMultipartUpload startUpload(m_ai, m_object, m_log);
if (!startUpload.SendRequest()) {
m_log.Emsg("Open", "S3 multipart request failed");
return -ENOENT;
}
std::string errMsg;
startUpload.Results(uploadId, errMsg);

return 0;
}

ssize_t S3File::Read(void *buffer, off_t offset, size_t size) {
Expand Down Expand Up @@ -177,21 +193,68 @@ int S3File::Fstat(struct stat *buff) {
}

ssize_t S3File::Write(const void *buffer, off_t offset, size_t size) {
AmazonS3Upload upload(m_ai, m_object, m_log);
std::string payload((char *)buffer, size);
write_buffer += payload;
write_buffer += (char *)buffer;

//XXX should this be configurable? 100mb gives us a TB of file size. It doesn't seem
//terribly useful to be much smaller and it's not clear the S3 API will work if
//it's much larger.
if (write_buffer.length() > 100000000) {
if (SendPart() == -ENOENT) {
return -ENOENT;
}
}
return size;
}

std::string payload((char *)buffer, size);
if (!upload.SendRequest(payload, offset, size)) {
m_log.Emsg("Open", "upload.SendRequest() failed");
return -ENOENT;
} else {
m_log.Emsg("Open", "upload.SendRequest() succeeded");
return 0;
}
int S3File::SendPart() {
int length = write_buffer.length();
AmazonS3SendMultipartPart upload_part_request = AmazonS3SendMultipartPart(m_ai, m_object, m_log);
if (!upload_part_request.SendRequest(write_buffer, std::to_string(partNumber), uploadId)) {
m_log.Emsg("SendPart", "upload.SendRequest() failed");
return -ENOENT;
} else {
m_log.Emsg("SendPart", "upload.SendRequest() succeeded");
std::string resultString = upload_part_request.getResultString();
std::size_t startPos = resultString.find("ETag:");
std::size_t endPos = resultString.find("\"", startPos + 7);
eTags.push_back(resultString.substr(startPos+7, endPos-startPos-7));

partNumber++;
write_buffer = "";
}

return length;
}

int S3File::Close(long long *retsz) {
m_log.Emsg("Close", "Closed our S3 file");
return 0;
if(SendPart() == -ENOENT) {
return -ENOENT;
} else {
m_log.Emsg("Close", "Closed our S3 file");
}

AmazonS3CompleteMultipartUpload complete_upload_request = AmazonS3CompleteMultipartUpload(m_ai, m_object, m_log);
if (!complete_upload_request.SendRequest(eTags, partNumber, uploadId)) {
m_log.Emsg("SendPart", "close.SendRequest() failed");
return -ENOENT;
} else {
m_log.Emsg("SendPart", "close.SendRequest() succeeded");
}

return 0;


/* Original write code
std::string payload((char *)buffer, size);
if (!upload.SendRequest(payload, offset, size)) {
m_log.Emsg("Open", "upload.SendRequest() failed");
return -ENOENT;
} else {
m_log.Emsg("Open", "upload.SendRequest() succeeded");
return 0;
} */
}

extern "C" {
Expand Down
8 changes: 8 additions & 0 deletions src/S3File.hh
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class S3File : public XrdOssDF {
time_t getLastModified() { return last_modified; }

private:
int SendPart();
XrdSysError &m_log;
S3FileSystem *m_oss;

Expand All @@ -103,4 +104,11 @@ class S3File : public XrdOssDF {

size_t content_length;
time_t last_modified;

std::string write_buffer;
std::string uploadId;
int partNumber;
std::vector<std::string> eTags;


};
1 change: 1 addition & 0 deletions src/S3FileSystem.hh
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,5 @@ class S3FileSystem : public XrdOss {
const std::string &source);
std::map<std::string, std::shared_ptr<S3AccessInfo>> m_s3_access_map;
std::string s3_url_style;

};

0 comments on commit 5eed5d2

Please sign in to comment.