Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add direct_upload option #5

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ bool StatCache::GetStat(string& key, struct stat* pst, headers_t* meta, bool ove
S3FS_PRN_DBG("stat cache not hit by ETag[path=%s][time=%jd][hit count=%lu][ETag(%s)!=(%s)]",
strpath.c_str(), (intmax_t)(ent->cache_date), ent->hit_count, petag ? petag : "null", ent->meta["ETag"].c_str());
}else{
// hit
// hit
S3FS_PRN_DBG("stat cache hit [path=%s][time=%jd][hit count=%lu]", strpath.c_str(), (intmax_t)(ent->cache_date), ent->hit_count);

if(pst!= NULL){
Expand Down Expand Up @@ -307,7 +307,7 @@ bool StatCache::IncSize(const std::string& key, ssize_t sz)
stat_cache_entry* entry = iter->second;
entry->stbuf.st_size += sz;
S3FS_PRN_INFO3(
"Update file size in stat cache. [path=%s][size=%ld][delta=%ld]",
"Update file size in stat cache. [path=%s][size=%ld][delta=%ld]",
key.c_str(), entry->stbuf.st_size, sz);
}

Expand Down
158 changes: 123 additions & 35 deletions src/curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ string S3fsCurl::LookupMimeType(string name)
}

// neither the last extension nor the second-to-last extension
// matched a mimeType, return the default mime type
// matched a mimeType, return the default mime type
return result;
}

Expand All @@ -617,7 +617,7 @@ bool S3fsCurl::LocateBundle(void)
// See if environment variable CURL_CA_BUNDLE is set
// if so, check it, if it is a good path, then set the
// curl_ca_bundle variable to it
char *CURL_CA_BUNDLE;
char *CURL_CA_BUNDLE;

if(0 == S3fsCurl::curl_ca_bundle.size()){
CURL_CA_BUNDLE = getenv("CURL_CA_BUNDLE");
Expand All @@ -629,7 +629,7 @@ bool S3fsCurl::LocateBundle(void)
return false;
}
BF.close();
S3fsCurl::curl_ca_bundle.assign(CURL_CA_BUNDLE);
S3fsCurl::curl_ca_bundle.assign(CURL_CA_BUNDLE);
return true;
}
}
Expand All @@ -650,10 +650,10 @@ bool S3fsCurl::LocateBundle(void)
// dnl /usr/local/share/certs/ca-root.crt FreeBSD
// dnl /etc/ssl/cert.pem OpenBSD
// dnl /etc/ssl/certs/ (ca path) SUSE
ifstream BF("/etc/pki/tls/certs/ca-bundle.crt");
ifstream BF("/etc/pki/tls/certs/ca-bundle.crt");
if(BF.good()){
BF.close();
S3fsCurl::curl_ca_bundle.assign("/etc/pki/tls/certs/ca-bundle.crt");
S3fsCurl::curl_ca_bundle.assign("/etc/pki/tls/certs/ca-bundle.crt");
}else{
S3FS_PRN_ERR("%s: /etc/pki/tls/certs/ca-bundle.crt is not readable", program_name.c_str());
return false;
Expand Down Expand Up @@ -926,7 +926,7 @@ bool S3fsCurl::SetSseKmsid(const char* kmsid)
}

// [NOTE]
// Because SSE is set by some options and environment,
// Because SSE is set by some options and environment,
// this function check the integrity of the SSE data finally.
bool S3fsCurl::FinalCheckSse(void)
{
Expand Down Expand Up @@ -955,7 +955,7 @@ bool S3fsCurl::FinalCheckSse(void)
}
return true;
}

bool S3fsCurl::LoadEnvSseCKeys(void)
{
char* envkeys = getenv("OSSSSECKEYS");
Expand Down Expand Up @@ -1093,7 +1093,7 @@ bool S3fsCurl::UploadMultipartPostCallback(S3fsCurl* s3fscurl)
// if(NULL == strstr(s3fscurl->headdata->str(), upper(s3fscurl->partdata.etag).c_str())){
// return false;
// }
S3FS_PRN_ERR("headdata is : %s", s3fscurl->headdata->str());
S3FS_PRN_INFO("headdata is : %s", s3fscurl->headdata->str());
string header_str(s3fscurl->headdata->str(), s3fscurl->headdata->size());
int pos = header_str.find("ETag: \"");
if (pos != std::string::npos) {
Expand All @@ -1103,14 +1103,105 @@ bool S3fsCurl::UploadMultipartPostCallback(S3fsCurl* s3fscurl)
} else {
s3fscurl->partdata.etag = header_str.substr(pos + 7, 32); // ETag get md5 value
}
S3FS_PRN_ERR("partdata.etag : %s", s3fscurl->partdata.etag.c_str());
S3FS_PRN_INFO("partdata.etag : %s", s3fscurl->partdata.etag.c_str());
}
s3fscurl->partdata.etaglist->at(s3fscurl->partdata.etagpos).assign(s3fscurl->partdata.etag);
s3fscurl->partdata.uploaded = true;

return true;
}

int S3fsCurl::ParallelMultipartUploadWithoutPreRequest(const char* tpath, headers_t& meta, int fd,
off_t offset, size_t size, string upload_id,
etaglist_t* list)
{
S3FS_PRN_DBG("tpath=%s, fd=%d, offset=%lu, size=%lu, upload_id=%s", tpath, fd, offset, size, upload_id.c_str());
int result;
struct stat st;
int fd2;
off_t remaining_bytes;
off_t cur_pos = offset;
S3fsCurl s3fscurl(true);

S3FS_PRN_INFO3("[tpath=%s][fd=%d]", SAFESTRPTR(tpath), fd);

if (NULL == list) {
S3FS_PRN_ERR("Invalid parameter, list could not be null.");
return -1;
}

// duplicate fd
if(-1 == (fd2 = dup(fd)) || 0 != lseek(fd2, 0, SEEK_SET)){
S3FS_PRN_ERR("Could not duplicate file descriptor(errno=%d)", errno);
if(-1 != fd2){
close(fd2);
}
return -errno;
}
if(-1 == fstat(fd2, &st)){
S3FS_PRN_ERR("Invalid file descriptor(errno=%d)", errno);
close(fd2);
return -errno;
}

// cycle through open fd, pulling off 10MB chunks at a time
for(remaining_bytes = size; 0 < remaining_bytes; ){
S3fsMultiCurl curlmulti;
int para_cnt;
off_t chunk;

// Initialize S3fsMultiCurl
curlmulti.SetSuccessCallback(S3fsCurl::UploadMultipartPostCallback);
curlmulti.SetRetryCallback(S3fsCurl::UploadMultipartPostRetryCallback);

// Loop for setup parallel upload(multipart) request.
for(para_cnt = 0; para_cnt < S3fsCurl::max_parallel_cnt && 0 < remaining_bytes; para_cnt++, remaining_bytes -= chunk){
// chunk size
// if remaining_bytes less than 2*multipart_size, upload all remaining bytes,
// in order to avoid sending part that less than S3fsCurl::multipart_size
chunk = remaining_bytes > 2 * S3fsCurl::multipart_size ? S3fsCurl::multipart_size : remaining_bytes;

// s3fscurl sub object
S3fsCurl* s3fscurl_para = new S3fsCurl(true);
s3fscurl_para->partdata.fd = fd2;
s3fscurl_para->partdata.startpos = cur_pos;
cur_pos += chunk;
s3fscurl_para->partdata.size = chunk;
s3fscurl_para->b_partdata_startpos = s3fscurl_para->partdata.startpos;
s3fscurl_para->b_partdata_size = s3fscurl_para->partdata.size;
s3fscurl_para->partdata.add_etag_list(list);

// initiate upload part for parallel
if(0 != (result = s3fscurl_para->UploadMultipartPostSetup(tpath, list->size(), upload_id))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
close(fd2);
delete s3fscurl_para;
return result;
}

// set into parallel object
if(!curlmulti.SetS3fsCurlObject(s3fscurl_para)){
S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath);
close(fd2);
delete s3fscurl_para;
return -1;
}
}

// Multi request
if(0 != (result = curlmulti.Request())){
S3FS_PRN_ERR("error occuered in multi request(errno=%d).", result);
break;
}

// reinit for loop.
curlmulti.Clear();
}
close(fd2);

return 0;
}

S3fsCurl* S3fsCurl::UploadMultipartPostRetryCallback(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
Expand Down Expand Up @@ -1436,7 +1527,7 @@ int S3fsCurl::CurlDebugFunc(CURL* hcurl, curl_infotype type, char* data, size_t
//-------------------------------------------------------------------
// Methods for S3fsCurl
//-------------------------------------------------------------------
S3fsCurl::S3fsCurl(bool ahbe) :
S3fsCurl::S3fsCurl(bool ahbe) :
hCurl(NULL), path(""), base_path(""), saved_path(""), url(""), requestHeaders(NULL),
bodydata(NULL), headdata(NULL), LastResponseCode(-1), postdata(NULL), postdata_remaining(0), is_use_ahbe(ahbe),
retry_count(0), b_infile(NULL), b_postdata(NULL), b_postdata_remaining(0), b_partdata_startpos(0), b_partdata_size(0),
Expand Down Expand Up @@ -1679,7 +1770,7 @@ bool S3fsCurl::RemakeHandle(void)

case REQTYPE_CHKBUCKET:
curl_easy_setopt(hCurl, CURLOPT_URL, url.c_str());
// XXX
// XXX
//curl_easy_setopt(hCurl, CURLOPT_FAILONERROR, true);
curl_easy_setopt(hCurl, CURLOPT_WRITEDATA, (void*)bodydata);
curl_easy_setopt(hCurl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
Expand Down Expand Up @@ -1780,8 +1871,8 @@ int S3fsCurl::RequestPerform(void)
for(int retrycnt = S3fsCurl::retries; 0 < retrycnt; retrycnt--){
// Requests
// XXX
//curl_easy_setopt(hCurl, CURLOPT_HEADERDATA, (void*)&responseHeaders);
//curl_easy_setopt(hCurl, CURLOPT_HEADERFUNCTION, HeaderCallback);
//curl_easy_setopt(hCurl, CURLOPT_HEADERDATA, (void*)&responseHeaders);
//curl_easy_setopt(hCurl, CURLOPT_HEADERFUNCTION, HeaderCallback);
CURLcode curlCode = curl_easy_perform(hCurl);

// Check result
Expand All @@ -1799,7 +1890,7 @@ int S3fsCurl::RequestPerform(void)
if(500 <= LastResponseCode){
S3FS_PRN_INFO3("HTTP response code %ld", LastResponseCode);
sleep(4);
break;
break;
}

// Service response codes which are >= 400 && < 500
Expand Down Expand Up @@ -1829,38 +1920,38 @@ int S3fsCurl::RequestPerform(void)
case CURLE_WRITE_ERROR:
S3FS_PRN_ERR("### CURLE_WRITE_ERROR");
sleep(2);
break;
break;

case CURLE_OPERATION_TIMEDOUT:
S3FS_PRN_ERR("### CURLE_OPERATION_TIMEDOUT");
sleep(2);
break;
break;

case CURLE_COULDNT_RESOLVE_HOST:
S3FS_PRN_ERR("### CURLE_COULDNT_RESOLVE_HOST");
sleep(2);
break;
break;

case CURLE_COULDNT_CONNECT:
S3FS_PRN_ERR("### CURLE_COULDNT_CONNECT");
sleep(4);
break;
break;

case CURLE_GOT_NOTHING:
S3FS_PRN_ERR("### CURLE_GOT_NOTHING");
sleep(4);
break;
break;

case CURLE_ABORTED_BY_CALLBACK:
S3FS_PRN_ERR("### CURLE_ABORTED_BY_CALLBACK");
sleep(4);
S3fsCurl::curl_times[hCurl] = time(0);
break;
break;

case CURLE_PARTIAL_FILE:
S3FS_PRN_ERR("### CURLE_PARTIAL_FILE");
sleep(4);
break;
break;

case CURLE_SEND_ERROR:
S3FS_PRN_ERR("### CURLE_SEND_ERROR");
Expand Down Expand Up @@ -1920,7 +2011,7 @@ int S3fsCurl::RequestPerform(void)
}
S3FS_PRN_INFO3("HTTP response code =%ld", LastResponseCode);

// Let's try to retrieve the
// Let's try to retrieve the
if(404 == LastResponseCode){
return -ENOENT;
}
Expand Down Expand Up @@ -1981,14 +2072,14 @@ string S3fsCurl::CalcSignature(string method, string strMD5, string content_type
FormatString += get_canonical_headers(requestHeaders); // \n has been append

S3FS_PRN_INFO("Format string is : %s", FormatString.c_str());

const unsigned char* sdata = reinterpret_cast<const unsigned char*>(FormatString.data());
int sdata_len = FormatString.size();
unsigned char* md = NULL;
unsigned int md_len = 0;

string format_string_sha1 = s3fs_sha1_hex(sdata, sdata_len, &md, &md_len);
S3FS_PRN_ERR("format string sha1 : %s", format_string_sha1.c_str());
S3FS_PRN_INFO("format string sha1 : %s", format_string_sha1.c_str());
string StringToSign;
StringToSign += string("sha1\n");
StringToSign += q_key_time + "\n";
Expand Down Expand Up @@ -2296,7 +2387,7 @@ int S3fsCurl::PutHeadRequest(const char* tpath, headers_t& meta, bool is_copy)
} else if(STANDARD_IA == GetStorageClass()){
requestHeaders = curl_slist_sort_insert(requestHeaders, "x-cos-storage-class", "STANDARD_IA");
}

string date = get_date_rfc850();
requestHeaders = curl_slist_sort_insert(requestHeaders, "Date", date.c_str());

Expand Down Expand Up @@ -2614,14 +2705,12 @@ int S3fsCurl::PreMultipartPostRequest(const char* tpath, headers_t& meta, string
{
S3FS_PRN_INFO("[tpath=%s]", SAFESTRPTR(tpath));

S3FS_PRN_ERR("PreMultipartPostRequest");
if(!tpath){
return -1;
}
if(!CreateCurlHandle(true)){
return -1;
}
S3FS_PRN_ERR("PreMultipartPostRequest1");
string resource;
string turl;
MakeUrlResource(get_realpath(tpath).c_str(), resource, turl);
Expand Down Expand Up @@ -2686,7 +2775,6 @@ int S3fsCurl::PreMultipartPostRequest(const char* tpath, headers_t& meta, string
return result;
}

S3FS_PRN_ERR("PreMultipartPostRequest3");
// Parse XML body for UploadId
if(!S3fsCurl::GetUploadId(upload_id)){
delete bodydata;
Expand Down Expand Up @@ -2719,7 +2807,7 @@ int S3fsCurl::CompleteMultipartPostRequest(const char* tpath, string& upload_id,
postContent += " <PartNumber>" + str(cnt + 1) + "</PartNumber>\n";
postContent += " <ETag>\"" + parts[cnt] + "\"</ETag>\n";
postContent += "</Part>\n";
}
}
postContent += "</CompleteMultipartUpload>\n";

// set postdata
Expand Down Expand Up @@ -2962,7 +3050,7 @@ int S3fsCurl::UploadMultipartPostRequest(const char* tpath, int part_num, string
// request
if(0 == (result = RequestPerform())){
// check etag
// cos's no check etag
// cos's no check etag
// if(NULL != strstr(headdata->str(), upper(partdata.etag).c_str())){
// get etag from response header
S3FS_PRN_ERR("headdata is : %s", headdata->str());
Expand Down Expand Up @@ -3268,12 +3356,12 @@ int S3fsCurl::MultipartRenameRequest(const char* from, const char* to, headers_t
}

//-------------------------------------------------------------------
// Class S3fsMultiCurl
// Class S3fsMultiCurl
//-------------------------------------------------------------------
#define MAX_MULTI_HEADREQ 20 // default: max request count in readdir curl_multi.

//-------------------------------------------------------------------
// Class method for S3fsMultiCurl
// Class method for S3fsMultiCurl
//-------------------------------------------------------------------
int S3fsMultiCurl::max_multireq = MAX_MULTI_HEADREQ;

Expand All @@ -3285,7 +3373,7 @@ int S3fsMultiCurl::SetMaxMultiRequest(int max)
}

//-------------------------------------------------------------------
// method for S3fsMultiCurl
// method for S3fsMultiCurl
//-------------------------------------------------------------------
S3fsMultiCurl::S3fsMultiCurl() : hMulti(NULL), SuccessCallback(NULL), RetryCallback(NULL)
{
Expand Down Expand Up @@ -3334,14 +3422,14 @@ S3fsMultiSuccessCallback S3fsMultiCurl::SetSuccessCallback(S3fsMultiSuccessCallb
SuccessCallback = function;
return old;
}

S3fsMultiRetryCallback S3fsMultiCurl::SetRetryCallback(S3fsMultiRetryCallback function)
{
S3fsMultiRetryCallback old = RetryCallback;
RetryCallback = function;
return old;
}

bool S3fsMultiCurl::SetS3fsCurlObject(S3fsCurl* s3fscurl)
{
if(hMulti){
Expand Down
Loading