Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Dec 30, 2024
1 parent 11b93da commit 891c75c
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 50 deletions.
13 changes: 13 additions & 0 deletions core/runner/sink/http/HttpSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ using namespace std;

namespace logtail {

HttpSink* HttpSink::GetInstance() {
#ifndef APSARA_UNIT_TEST_MAIN
static HttpSink instance;
return &instance;
#else
return HttpSinkMock::GetInstance();
#endif
}

bool HttpSink::Init() {
#ifndef APSARA_UNIT_TEST_MAIN
mClient = curl_multi_init();
Expand Down Expand Up @@ -68,6 +77,7 @@ bool HttpSink::Init() {
}

void HttpSink::Stop() {
#ifdef APSARA_UNIT_TEST_MAIN
mIsFlush = true;
if (!mThreadRes.valid()) {
return;
Expand All @@ -78,6 +88,9 @@ void HttpSink::Stop() {
} else {
LOG_WARNING(sLogger, ("http sink", "forced to stopped"));
}
#else
HttpSinkMock::GetInstance()->Stop();
#endif
}

void HttpSink::Run() {
Expand Down
7 changes: 2 additions & 5 deletions core/runner/sink/http/HttpSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,19 @@
#include "monitor/MetricManager.h"
#include "runner/sink/Sink.h"
#include "runner/sink/http/HttpSinkRequest.h"

namespace logtail {

class HttpSink : public Sink<HttpSinkRequest> {
public:
HttpSink(const HttpSink&) = delete;
HttpSink& operator=(const HttpSink&) = delete;

static HttpSink* GetInstance() {
static HttpSink instance;
return &instance;
}
static HttpSink* GetInstance();

bool Init() override;
void Stop() override;

// rewrite for unittest
bool AddRequest(std::unique_ptr<HttpSinkRequest>&& request);

private:
Expand Down
10 changes: 2 additions & 8 deletions core/unittest/pipeline/HttpSinkMock.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class HttpSinkMock : public HttpSink {
} else {
LOG_WARNING(sLogger, ("http sink mock", "forced to stopped"));
}
ClearRequests();
}

void Run() {
Expand All @@ -61,16 +62,10 @@ class HttpSinkMock : public HttpSink {
if (mQueue.WaitAndPop(request, 500)) {
{
std::lock_guard<std::mutex> lock(mMutex);
std::string logstore = "default";
if (static_cast<HttpFlusher*>(request->mItem->mFlusher)->Name().find("sls") != std::string::npos) {
auto flusher = static_cast<FlusherSLS*>(request->mItem->mFlusher);
logstore = flusher->mLogstore;
}
mRequests.push_back(*(request->mItem));
}
request->mResponse.SetStatusCode(200);
request->mResponse.mHeader[sdk::X_LOG_REQUEST_ID] = "request_id";
static_cast<SLSSenderQueueItem*>(request->mItem)->mExactlyOnceCheckpoint = nullptr;
static_cast<HttpFlusher*>(request->mItem->mFlusher)->OnSendDone(request->mResponse, request->mItem);
FlusherRunner::GetInstance()->DecreaseHttpSendingCnt();
request.reset();
Expand All @@ -83,8 +78,7 @@ class HttpSinkMock : public HttpSink {
}

bool AddRequest(std::unique_ptr<HttpSinkRequest>&& request) {
mQueue.Push(std::move(request));
return true;
return Sink<HttpSinkRequest>::AddRequest(std::move(request));
}

std::vector<SenderQueueItem>& GetRequests() {
Expand Down
Loading

0 comments on commit 891c75c

Please sign in to comment.