Skip to content

Commit

Permalink
support custom http response body and body data write callback
Browse files Browse the repository at this point in the history
  • Loading branch information
henryzhx8 committed Oct 28, 2024
1 parent 19c681f commit a438de2
Show file tree
Hide file tree
Showing 24 changed files with 395 additions and 260 deletions.
2 changes: 1 addition & 1 deletion core/common/common.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ endif ()
list(APPEND THIS_SOURCE_FILES_LIST ${XX_HASH_SOURCE_FILES})
# add memory in common
list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/memory/SourceBuffer.h)
list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/http/AsynCurlRunner.cpp ${CMAKE_SOURCE_DIR}/common/http/Curl.cpp ${CMAKE_SOURCE_DIR}/common/http/HttpResponse.cpp)
list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/http/AsynCurlRunner.cpp ${CMAKE_SOURCE_DIR}/common/http/Curl.cpp ${CMAKE_SOURCE_DIR}/common/http/HttpResponse.cpp ${CMAKE_SOURCE_DIR}/common/http/HttpRequest.cpp)
list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/timer/Timer.cpp ${CMAKE_SOURCE_DIR}/common/timer/HttpRequestTimerEvent.cpp)
list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/compression/Compressor.cpp ${CMAKE_SOURCE_DIR}/common/compression/CompressorFactory.cpp ${CMAKE_SOURCE_DIR}/common/compression/LZ4Compressor.cpp ${CMAKE_SOURCE_DIR}/common/compression/ZstdCompressor.cpp)
# remove several files in common
Expand Down
2 changes: 1 addition & 1 deletion core/common/http/AsynCurlRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ void AsynCurlRunner::HandleCompletedRequests(int& runningHandlers) {
case CURLE_OK: {
long statusCode = 0;
curl_easy_getinfo(handler, CURLINFO_RESPONSE_CODE, &statusCode);
request->mResponse.mStatusCode = (int32_t)statusCode;
request->mResponse.SetStatusCode(statusCode);
request->OnSendDone(request->mResponse);
LOG_DEBUG(sLogger,
("send http request succeeded, request address",
Expand Down
19 changes: 4 additions & 15 deletions core/common/http/Curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,6 @@ using namespace std;

namespace logtail {

static size_t data_write_callback(char* buffer, size_t size, size_t nmemb, string* write_buf) {
unsigned long sizes = size * nmemb;

if (buffer == NULL) {
return 0;
}

write_buf->append(buffer, sizes);
return sizes;
}

static size_t header_write_callback(char* buffer,
size_t size,
size_t nmemb,
Expand Down Expand Up @@ -123,9 +112,9 @@ CURL* CreateCurlHandler(const std::string& method,
curl_easy_setopt(curl, CURLOPT_INTERFACE, intf.c_str());
}

curl_easy_setopt(curl, CURLOPT_WRITEDATA, &(response.mBody));
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, data_write_callback);
curl_easy_setopt(curl, CURLOPT_HEADERDATA, &(response.mHeader));
curl_easy_setopt(curl, CURLOPT_WRITEDATA, response.mBody.get());
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, response.mWriteCallback);
curl_easy_setopt(curl, CURLOPT_HEADERDATA, &(response.GetHeader()));
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_write_callback);

curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
Expand Down Expand Up @@ -162,7 +151,7 @@ bool SendHttpRequest(std::unique_ptr<HttpRequest>&& request, HttpResponse& respo
if (res == CURLE_OK) {
long http_code = 0;
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code);
response.mStatusCode = (int32_t)http_code;
response.SetStatusCode(http_code);
success = true;
break;
} else if (request->mTryCnt < request->mMaxTryCnt) {
Expand Down
22 changes: 22 additions & 0 deletions core/common/http/HttpRequest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "common/http/HttpRequest.h"

DEFINE_FLAG_INT32(default_http_request_timeout_secs, "", 15);
DEFINE_FLAG_INT32(default_http_request_max_try_cnt, "", 3);

using namespace std;

namespace logtail {} // namespace logtail
25 changes: 14 additions & 11 deletions core/common/http/HttpRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
#include <map>
#include <string>

#include "common/Flags.h"
#include "common/http/HttpResponse.h"

namespace logtail {
DECLARE_FLAG_INT32(default_http_request_timeout_secs);
DECLARE_FLAG_INT32(default_http_request_max_try_cnt);

static constexpr uint32_t sDefaultTimeoutSec = 15;
static constexpr uint32_t sDefaultMaxTryCnt = 3;
namespace logtail {

struct HttpRequest {
std::string mMethod;
Expand All @@ -40,8 +41,8 @@ struct HttpRequest {
std::string mBody;
std::string mHost;
int32_t mPort;
uint32_t mTimeout = sDefaultTimeoutSec;
uint32_t mMaxTryCnt = sDefaultMaxTryCnt;
uint32_t mTimeout = static_cast<uint32_t>(INT32_FLAG(default_http_request_timeout_secs));
uint32_t mMaxTryCnt = static_cast<uint32_t>(INT32_FLAG(default_http_request_max_try_cnt));

uint32_t mTryCnt = 1;
std::chrono::system_clock::time_point mLastSendTime;
Expand All @@ -54,8 +55,8 @@ struct HttpRequest {
const std::string& query,
const std::map<std::string, std::string>& header,
const std::string& body,
uint32_t timeout = sDefaultTimeoutSec,
uint32_t maxTryCnt = sDefaultMaxTryCnt)
uint32_t timeout = static_cast<uint32_t>(INT32_FLAG(default_http_request_timeout_secs)),
uint32_t maxTryCnt = static_cast<uint32_t>(INT32_FLAG(default_http_request_max_try_cnt)))
: mMethod(method),
mHTTPSFlag(httpsFlag),
mUrl(url),
Expand All @@ -82,12 +83,14 @@ struct AsynHttpRequest : public HttpRequest {
const std::string& query,
const std::map<std::string, std::string>& header,
const std::string& body,
uint32_t timeout = sDefaultTimeoutSec,
uint32_t maxTryCnt = sDefaultMaxTryCnt)
: HttpRequest(method, httpsFlag, host, port, url, query, header, body, timeout, maxTryCnt) {}
HttpResponse&& response = HttpResponse(),
uint32_t timeout = static_cast<uint32_t>(INT32_FLAG(default_http_request_timeout_secs)),
uint32_t maxTryCnt = static_cast<uint32_t>(INT32_FLAG(default_http_request_max_try_cnt)))
: HttpRequest(method, httpsFlag, host, port, url, query, header, body, timeout, maxTryCnt),
mResponse(std::move(response)) {}

virtual bool IsContextValid() const = 0;
virtual void OnSendDone(const HttpResponse& response) = 0;
virtual void OnSendDone(HttpResponse& response) = 0;
};


Expand Down
11 changes: 11 additions & 0 deletions core/common/http/HttpResponse.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,15 @@ bool compareHeader(const std::string& lhs, const std::string& rhs) {
return lexicographical_compare(lhs.begin(), lhs.end(), rhs.begin(), rhs.end(), caseInsensitiveComp);
}

size_t DefaultWriteCallback(char* buffer, size_t size, size_t nmemb, void* data) {
unsigned long sizes = size * nmemb;

if (buffer == NULL) {
return 0;
}

static_cast<string*>(data)->append(buffer, sizes);
return sizes;
}

} // namespace logtail
53 changes: 49 additions & 4 deletions core/common/http/HttpResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,66 @@
#pragma once

#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <string>

class curl_slist;

namespace logtail {

bool caseInsensitiveComp(const char lhs, const char rhs);

bool compareHeader(const std::string& lhs, const std::string& rhs);

struct HttpResponse {
size_t DefaultWriteCallback(char* buffer, size_t size, size_t nmemb, void* data);

class HttpResponse {
friend void* CreateCurlHandler(const std::string& method,
bool httpsFlag,
const std::string& host,
int32_t port,
const std::string& url,
const std::string& queryString,
const std::map<std::string, std::string>& header,
const std::string& body,
HttpResponse& response,
curl_slist*& headers,
uint32_t timeout,
bool replaceHostWithIp,
const std::string& intf);

public:
HttpResponse()
: mHeader(compareHeader),
mBody(new std::string(), [](void* p) { delete static_cast<std::string*>(p); }),
mWriteCallback(DefaultWriteCallback) {};
HttpResponse(void* body,
const std::function<void(void*)>& bodyDeleter,
size_t (*callback)(char*, size_t, size_t, void*))
: mHeader(compareHeader), mBody(body, bodyDeleter), mWriteCallback(callback) {}

int32_t GetStatusCode() const { return mStatusCode; }
const std::map<std::string, std::string, decltype(compareHeader)*>& GetHeader() const { return mHeader; }

template <class T>
const T* GetBody() const {
return static_cast<const T*>(mBody.get());
}

template <class T>
T* GetBody() {
return static_cast<T*>(mBody.get());
}

void SetStatusCode(int32_t code) { mStatusCode = code; }

private:
int32_t mStatusCode = 0; // 0 means no response from server
std::map<std::string, std::string, decltype(compareHeader)*> mHeader;
std::string mBody;

HttpResponse(): mHeader(compareHeader) {}
std::unique_ptr<void, std::function<void(void*)>> mBody;
size_t (*mWriteCallback)(char*, size_t, size_t, void*) = nullptr;
};

} // namespace logtail
Loading

0 comments on commit a438de2

Please sign in to comment.