Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

uploads to s3 using multipart uploads. #41

Merged
merged 1 commit into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions src/S3Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,52 @@ bool AmazonS3Upload::SendRequest(const std::string &payload, off_t offset,

// ---------------------------------------------------------------------------

AmazonS3CompleteMultipartUpload::~AmazonS3CompleteMultipartUpload() {}

bool AmazonS3CompleteMultipartUpload::SendRequest(
const std::vector<std::string> &eTags, int partNumber,
const std::string &uploadId) {
query_parameters["uploadId"] = uploadId;

httpVerb = "POST";
std::string payload;
payload += "<CompleteMultipartUpload "
"xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">";
for (int i = 1; i < partNumber; i++) {
payload += "<Part>";
payload += "<ETag>" + eTags[i - 1] + "</ETag>";
payload += "<PartNumber>" + std::to_string(i) + "</PartNumber>";
payload += "</Part>";
}
payload += "</CompleteMultipartUpload>";

return SendS3Request(payload);
}
// ---------------------------------------------------------------------------

AmazonS3CreateMultipartUpload::~AmazonS3CreateMultipartUpload() {}
AmazonS3SendMultipartPart::~AmazonS3SendMultipartPart() {}

bool AmazonS3CreateMultipartUpload::SendRequest() {
query_parameters["uploads"] = "";
query_parameters["x-id"] = "CreateMultipartUpload";

httpVerb = "POST";
return SendS3Request("");
}

bool AmazonS3SendMultipartPart::SendRequest(const std::string &payload,
const std::string &partNumber,
const std::string &uploadId) {
query_parameters["partNumber"] = partNumber;
query_parameters["uploadId"] = uploadId;
includeResponseHeader = true;
httpVerb = "PUT";
return SendS3Request(payload);
}

// ---------------------------------------------------------------------------

AmazonS3Download::~AmazonS3Download() {}

bool AmazonS3Download::SendRequest(off_t offset, size_t size) {
Expand Down Expand Up @@ -505,6 +551,32 @@ bool AmazonS3List::SendRequest(const std::string &continuationToken) {
return SendS3Request("");
}

bool AmazonS3CreateMultipartUpload::Results(std::string &uploadId,
std::string &errMsg) {
tinyxml2::XMLDocument doc;
auto err = doc.Parse(resultString.c_str());
if (err != tinyxml2::XML_SUCCESS) {
errMsg = doc.ErrorStr();
return false;
}

auto elem = doc.RootElement();
if (strcmp(elem->Name(), "InitiateMultipartUploadResult")) {
errMsg = "S3 Uploads response is not rooted with "
"InitiateMultipartUploadResult "
"element";
return false;
}

for (auto child = elem->FirstChildElement(); child != nullptr;
child = child->NextSiblingElement()) {
if (!strcmp(child->Name(), "UploadId")) {
uploadId = child->GetText();
}
}
return true;
}

// Parse the results of the AWS directory listing
//
// S3 returns an XML structure for directory listings so we must pick it apart
Expand Down
74 changes: 74 additions & 0 deletions src/S3Commands.hh
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,80 @@ class AmazonS3Upload : public AmazonRequest {
std::string path;
};

class AmazonS3CreateMultipartUpload : public AmazonRequest {
using AmazonRequest::SendRequest;

public:
AmazonS3CreateMultipartUpload(const S3AccessInfo &ai,
const std::string &objectName,
XrdSysError &log)
: AmazonRequest(ai, objectName, log) {}

AmazonS3CreateMultipartUpload(const std::string &s, const std::string &akf,
const std::string &skf, const std::string &b,
const std::string &o,
const std::string &style, XrdSysError &log)
: AmazonRequest(s, akf, skf, b, o, style, 4, log) {}

bool Results(std::string &uploadId, std::string &errMsg);

virtual ~AmazonS3CreateMultipartUpload();

virtual bool SendRequest();

protected:
// std::string path;
};

class AmazonS3CompleteMultipartUpload : public AmazonRequest {
using AmazonRequest::SendRequest;

public:
AmazonS3CompleteMultipartUpload(const S3AccessInfo &ai,
const std::string &objectName,
XrdSysError &log)
: AmazonRequest(ai, objectName, log) {}

AmazonS3CompleteMultipartUpload(const std::string &s,
const std::string &akf,
const std::string &skf,
const std::string &b, const std::string &o,
const std::string &style, XrdSysError &log)
: AmazonRequest(s, akf, skf, b, o, style, 4, log) {}

virtual ~AmazonS3CompleteMultipartUpload();

virtual bool SendRequest(const std::vector<std::string> &eTags,
int partNumber, const std::string &uploadId);

protected:
};

class AmazonS3SendMultipartPart : public AmazonRequest {
using AmazonRequest::SendRequest;

public:
AmazonS3SendMultipartPart(const S3AccessInfo &ai,
const std::string &objectName, XrdSysError &log)
: AmazonRequest(ai, objectName, log) {}

AmazonS3SendMultipartPart(const std::string &s, const std::string &akf,
const std::string &skf, const std::string &b,
const std::string &o, const std::string &style,
XrdSysError &log)
: AmazonRequest(s, akf, skf, b, o, style, 4, log) {}

bool Results(std::string &uploadId, std::string &errMsg);

virtual ~AmazonS3SendMultipartPart();

virtual bool SendRequest(const std::string &payload,
const std::string &partNumber,
const std::string &uploadId);

protected:
};

class AmazonS3Download : public AmazonRequest {
using AmazonRequest::SendRequest;

Expand Down
92 changes: 84 additions & 8 deletions src/S3File.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include <memory>
#include <mutex>
#include <sstream>
#include <stdlib.h>
#include <string>
#include <vector>

Expand All @@ -46,9 +47,17 @@ S3FileSystem *g_s3_oss = nullptr;
XrdVERSIONINFO(XrdOssGetFileSystem, S3);

S3File::S3File(XrdSysError &log, S3FileSystem *oss)
: m_log(log), m_oss(oss), content_length(0), last_modified(0) {}
: m_log(log), m_oss(oss), content_length(0), last_modified(0),
write_buffer(""), partNumber(1) {}

int S3File::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) {
if (Oflag && O_CREAT) {
m_log.Log(LogMask::Info, "File opened for creation: ", path);
}
if (Oflag && O_APPEND) {
m_log.Log(LogMask::Info, "File opened for append: ", path);
}

if (m_log.getMsgMask() & XrdHTTPServer::Debug) {
m_log.Log(LogMask::Warning, "S3File::Open", "Opening file", path);
}
Expand Down Expand Up @@ -79,6 +88,14 @@ int S3File::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) {
}
}

AmazonS3CreateMultipartUpload startUpload(m_ai, m_object, m_log);
if (!startUpload.SendRequest()) {
m_log.Emsg("Open", "S3 multipart request failed");
return -ENOENT;
}
std::string errMsg;
startUpload.Results(uploadId, errMsg);

return 0;
}

Expand Down Expand Up @@ -177,21 +194,80 @@ int S3File::Fstat(struct stat *buff) {
}

ssize_t S3File::Write(const void *buffer, off_t offset, size_t size) {
AmazonS3Upload upload(m_ai, m_object, m_log);

std::string payload((char *)buffer, size);
if (!upload.SendRequest(payload, offset, size)) {
m_log.Emsg("Open", "upload.SendRequest() failed");
size_t payload_size = payload.length();
if (payload_size != size) {
return -ENOENT;
}
write_buffer += payload;

// XXX should this be configurable? 100mb gives us a TB of file size. It
// doesn't seem terribly useful to be much smaller and it's not clear the S3
// API will work if it's much larger.
if (write_buffer.length() > 100000000) {
if (SendPart() == -ENOENT) {
return -ENOENT;
}
}
return size;
}

int S3File::SendPart() {
int length = write_buffer.length();
AmazonS3SendMultipartPart upload_part_request =
AmazonS3SendMultipartPart(m_ai, m_object, m_log);
if (!upload_part_request.SendRequest(
write_buffer, std::to_string(partNumber), uploadId)) {
m_log.Emsg("SendPart", "upload.SendRequest() failed");
return -ENOENT;
} else {
m_log.Emsg("Open", "upload.SendRequest() succeeded");
return 0;
m_log.Emsg("SendPart", "upload.SendRequest() succeeded");
std::string resultString = upload_part_request.getResultString();
std::size_t startPos = resultString.find("ETag:");
std::size_t endPos = resultString.find("\"", startPos + 7);
eTags.push_back(
resultString.substr(startPos + 7, endPos - startPos - 7));

partNumber++;
write_buffer = "";
}

return length;
}

int S3File::Close(long long *retsz) {
m_log.Emsg("Close", "Closed our S3 file");
// this is only true if a buffer exists that needs to be drained
if (write_buffer.length() > 0) {
if (SendPart() == -ENOENT) {
return -ENOENT;
} else {
m_log.Emsg("Close", "Closed our S3 file");
jhiemstrawisc marked this conversation as resolved.
Show resolved Hide resolved
}
}
// this is only true if some parts have been written and need to be
// finalized
if (partNumber > 1) {
AmazonS3CompleteMultipartUpload complete_upload_request =
AmazonS3CompleteMultipartUpload(m_ai, m_object, m_log);
if (!complete_upload_request.SendRequest(eTags, partNumber, uploadId)) {
m_log.Emsg("SendPart", "close.SendRequest() failed");
return -ENOENT;
} else {
m_log.Emsg("SendPart", "close.SendRequest() succeeded");
}
}

return 0;

/* Original write code
std::string payload((char *)buffer, size);
if (!upload.SendRequest(payload, offset, size)) {
m_log.Emsg("Open", "upload.SendRequest() failed");
return -ENOENT;
} else {
m_log.Emsg("Open", "upload.SendRequest() succeeded");
return 0;
} */
}

extern "C" {
Expand Down
6 changes: 6 additions & 0 deletions src/S3File.hh
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class S3File : public XrdOssDF {
time_t getLastModified() { return last_modified; }

private:
int SendPart();
XrdSysError &m_log;
S3FileSystem *m_oss;

Expand All @@ -103,4 +104,9 @@ class S3File : public XrdOssDF {

size_t content_length;
time_t last_modified;

std::string write_buffer;
std::string uploadId;
int partNumber;
std::vector<std::string> eTags;
};
Loading