Skip to content

Commit

Permalink
fix: the lifetime error of StreamScraper object when deconstruct Scra…
Browse files Browse the repository at this point in the history
…peScheduler (#2023)
  • Loading branch information
catdogpandas authored Jan 9, 2025
1 parent 2c91170 commit 2cc1c9e
Show file tree
Hide file tree
Showing 15 changed files with 87 additions and 71 deletions.
10 changes: 5 additions & 5 deletions core/ebpf/handler/ObserveHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void OtelMeterHandler::handle(const std::vector<std::unique_ptr<ApplicationBatch
continue;
#endif
std::unique_ptr<ProcessQueueItem> item = std::make_unique<ProcessQueueItem>(std::move(eventGroup), mPluginIdx);
if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item))) {
if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item)) != QueueStatus::OK) {
LOG_WARNING(sLogger,
("configName", mCtx->GetConfigName())("pluginIdx",
mPluginIdx)("[Otel Metrics] push queue failed!", ""));
Expand Down Expand Up @@ -121,7 +121,7 @@ void OtelSpanHandler::handle(const std::vector<std::unique_ptr<ApplicationBatchS
continue;
#endif
std::unique_ptr<ProcessQueueItem> item = std::make_unique<ProcessQueueItem>(std::move(eventGroup), mPluginIdx);
if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item))) {
if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item)) != QueueStatus::OK) {
LOG_WARNING(
sLogger,
("configName", mCtx->GetConfigName())("pluginIdx", mPluginIdx)("[Span] push queue failed!", ""));
Expand Down Expand Up @@ -159,7 +159,7 @@ void EventHandler::handle(const std::vector<std::unique_ptr<ApplicationBatchEven
continue;
#endif
std::unique_ptr<ProcessQueueItem> item = std::make_unique<ProcessQueueItem>(std::move(eventGroup), mPluginIdx);
if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item))) {
if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item)) != QueueStatus::OK) {
LOG_WARNING(
sLogger,
("configName", mCtx->GetConfigName())("pluginIdx", mPluginIdx)("[Event] push queue failed!", ""));
Expand Down Expand Up @@ -276,7 +276,7 @@ void ArmsSpanHandler::handle(const std::vector<std::unique_ptr<ApplicationBatchS
continue;
#endif
std::unique_ptr<ProcessQueueItem> item = std::make_unique<ProcessQueueItem>(std::move(eventGroup), mPluginIdx);
if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item))) {
if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item)) != QueueStatus::OK) {
LOG_WARNING(
sLogger,
("configName", mCtx->GetConfigName())("pluginIdx", mPluginIdx)("[Span] push queue failed!", ""));
Expand Down Expand Up @@ -321,7 +321,7 @@ void ArmsMeterHandler::handle(const std::vector<std::unique_ptr<ApplicationBatch
continue;
#endif
std::unique_ptr<ProcessQueueItem> item = std::make_unique<ProcessQueueItem>(std::move(eventGroup), mPluginIdx);
if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item))) {
if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item)) != QueueStatus::OK) {
LOG_WARNING(
sLogger,
("configName", mCtx->GetConfigName())("pluginIdx", mPluginIdx)("[Metrics] push queue failed!", ""));
Expand Down
2 changes: 1 addition & 1 deletion core/ebpf/handler/SecurityHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void SecurityHandler::handle(std::vector<std::unique_ptr<AbstractSecurityEvent>>
std::unique_ptr<ProcessQueueItem> item
= std::unique_ptr<ProcessQueueItem>(new ProcessQueueItem(std::move(event_group), mPluginIdx));

if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item))) {
if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item)) != QueueStatus::OK) {
LOG_WARNING(
sLogger,
("configName", mCtx->GetConfigName())("pluginIdx", mPluginIdx)("Push queue failed!", events.size()));
Expand Down
8 changes: 4 additions & 4 deletions core/pipeline/queue/ExactlyOnceQueueManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,16 @@ bool ExactlyOnceQueueManager::IsValidToPushProcessQueue(QueueKey key) const {
return iter->second->IsValidToPush();
}

int ExactlyOnceQueueManager::PushProcessQueue(QueueKey key, unique_ptr<ProcessQueueItem>&& item) {
QueueStatus ExactlyOnceQueueManager::PushProcessQueue(QueueKey key, unique_ptr<ProcessQueueItem>&& item) {
lock_guard<mutex> lock(mProcessQueueMux);
auto iter = mProcessQueues.find(key);
if (iter == mProcessQueues.end()) {
return 2;
return QueueStatus::QUEUE_NOT_EXIST;
}
if (!iter->second->Push(std::move(item))) {
return 1;
return QueueStatus::QUEUE_FULL;
}
return 0;
return QueueStatus::OK;
}

bool ExactlyOnceQueueManager::IsAllProcessQueueEmpty() const {
Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/queue/ExactlyOnceQueueManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class ExactlyOnceQueueManager {

bool IsValidToPushProcessQueue(QueueKey key) const;
// 0: success, 1: queue is full, 2: queue not found
int PushProcessQueue(QueueKey key, std::unique_ptr<ProcessQueueItem>&& item);
QueueStatus PushProcessQueue(QueueKey key, std::unique_ptr<ProcessQueueItem>&& item);
bool IsAllProcessQueueEmpty() const;
void DisablePopProcessQueue(const std::string& configName, bool isPipelineRemoving);
void EnablePopProcessQueue(const std::string& configName);
Expand Down
10 changes: 5 additions & 5 deletions core/pipeline/queue/ProcessQueueManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,23 +109,23 @@ bool ProcessQueueManager::IsValidToPush(QueueKey key) const {
return ExactlyOnceQueueManager::GetInstance()->IsValidToPushProcessQueue(key);
}

int ProcessQueueManager::PushQueue(QueueKey key, unique_ptr<ProcessQueueItem>&& item) {
QueueStatus ProcessQueueManager::PushQueue(QueueKey key, unique_ptr<ProcessQueueItem>&& item) {
{
lock_guard<mutex> lock(mQueueMux);
auto iter = mQueues.find(key);
if (iter != mQueues.end()) {
if (!(*iter->second.first)->Push(std::move(item))) {
return 1;
return QueueStatus::QUEUE_FULL;
}
} else {
int res = ExactlyOnceQueueManager::GetInstance()->PushProcessQueue(key, std::move(item));
if (res != 0) {
auto res = ExactlyOnceQueueManager::GetInstance()->PushProcessQueue(key, std::move(item));
if (res != QueueStatus::OK) {
return res;
}
}
}
Trigger();
return 0;
return QueueStatus::OK;
}

bool ProcessQueueManager::PopItem(int64_t threadNo, unique_ptr<ProcessQueueItem>& item, string& configName) {
Expand Down
4 changes: 3 additions & 1 deletion core/pipeline/queue/ProcessQueueManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

namespace logtail {

enum class QueueStatus { OK, QUEUE_FULL, QUEUE_NOT_EXIST };

class ProcessQueueManager : public FeedbackInterface {
public:
using ProcessQueueIterator = std::list<std::unique_ptr<ProcessQueueInterface>>::iterator;
Expand All @@ -58,7 +60,7 @@ class ProcessQueueManager : public FeedbackInterface {
bool DeleteQueue(QueueKey key);
bool IsValidToPush(QueueKey key) const;
// 0: success, 1: queue is full, 2: queue not found
int PushQueue(QueueKey key, std::unique_ptr<ProcessQueueItem>&& item);
QueueStatus PushQueue(QueueKey key, std::unique_ptr<ProcessQueueItem>&& item);
bool PopItem(int64_t threadNo, std::unique_ptr<ProcessQueueItem>& item, std::string& configName);
bool IsAllQueueEmpty() const;
bool SetDownStreamQueues(QueueKey key, std::vector<BoundedSenderQueueInterface*>&& ques);
Expand Down
14 changes: 9 additions & 5 deletions core/prometheus/component/StreamScraper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@

#include "Flags.h"
#include "Labels.h"
#include "Logger.h"
#include "common/StringTools.h"
#include "models/PipelineEventGroup.h"
#include "pipeline/queue/ProcessQueueItem.h"
#include "pipeline/queue/ProcessQueueManager.h"
#include "prometheus/Utils.h"
#include "runner/ProcessorRunner.h"

DEFINE_FLAG_INT64(prom_stream_bytes_size, "stream bytes size", 1024 * 1024);

Expand Down Expand Up @@ -85,7 +87,12 @@ void StreamScraper::PushEventGroup(PipelineEventGroup&& eGroup) const {
return;
#endif
while (true) {
if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item)) == 0) {
auto res = ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item));
if (res == QueueStatus::OK) {
break;
}
if (res == QueueStatus::QUEUE_NOT_EXIST) {
LOG_DEBUG(sLogger, ("prometheus stream scraper", "queue not exist"));
break;
}
usleep(10 * 1000);
Expand Down Expand Up @@ -126,8 +133,5 @@ void StreamScraper::SetAutoMetricMeta(double scrapeDurationSeconds, bool upState
std::string StreamScraper::GetId() {
return mHash;
}
void StreamScraper::SetScrapeTime(std::chrono::system_clock::time_point scrapeTime) {
mScrapeTimestampMilliSec
= std::chrono::duration_cast<std::chrono::milliseconds>(scrapeTime.time_since_epoch()).count();
}

} // namespace logtail::prom
24 changes: 17 additions & 7 deletions core/prometheus/component/StreamScraper.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <memory>
#include <string>
#include <utility>

#include "Labels.h"
#include "models/PipelineEventGroup.h"
Expand All @@ -16,25 +17,30 @@
namespace logtail::prom {
class StreamScraper {
public:
StreamScraper(Labels labels, QueueKey queueKey, size_t inputIndex)
StreamScraper(Labels labels,
QueueKey queueKey,
size_t inputIndex,
std::string hash,
EventPool* eventPool,
std::chrono::system_clock::time_point scrapeTime)
: mEventGroup(PipelineEventGroup(std::make_shared<SourceBuffer>())),
mHash(std::move(hash)),
mEventPool(eventPool),
mQueueKey(queueKey),
mInputIndex(inputIndex),
mTargetLabels(std::move(labels)) {}
mTargetLabels(std::move(labels)) {
mScrapeTimestampMilliSec
= std::chrono::duration_cast<std::chrono::milliseconds>(scrapeTime.time_since_epoch()).count();
}

static size_t MetricWriteCallback(char* buffer, size_t size, size_t nmemb, void* data);
void FlushCache();
void SendMetrics();
void Reset();
void SetAutoMetricMeta(double scrapeDurationSeconds, bool upState, const std::string& scrapeState);

void SetScrapeTime(std::chrono::system_clock::time_point scrapeTime);

std::string mHash;
size_t mRawSize = 0;
uint64_t mStreamIndex = 0;
uint64_t mScrapeSamplesScraped = 0;
EventPool* mEventPool = nullptr;

private:
void AddEvent(const char* line, size_t len);
Expand All @@ -46,6 +52,10 @@ class StreamScraper {
std::string mCache;
PipelineEventGroup mEventGroup;

std::string mHash;
uint64_t mScrapeSamplesScraped = 0;
EventPool* mEventPool = nullptr;

// pipeline
QueueKey mQueueKey;
size_t mInputIndex;
Expand Down
30 changes: 15 additions & 15 deletions core/prometheus/schedulers/ScrapeScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,18 @@ ScrapeScheduler::ScrapeScheduler(std::shared_ptr<ScrapeConfig> scrapeConfigPtr,
Labels labels,
QueueKey queueKey,
size_t inputIndex)
: mPromStreamScraper(labels, queueKey, inputIndex),
mScrapeConfigPtr(std::move(scrapeConfigPtr)),
: mScrapeConfigPtr(std::move(scrapeConfigPtr)),
mHost(std::move(host)),
mPort(port),
mQueueKey(queueKey) {
mQueueKey(queueKey),
mInputIndex(inputIndex),
mTargetLabels(labels) {
string tmpTargetURL = mScrapeConfigPtr->mScheme + "://" + mHost + ":" + ToString(mPort)
+ mScrapeConfigPtr->mMetricsPath
+ (mScrapeConfigPtr->mQueryString.empty() ? "" : "?" + mScrapeConfigPtr->mQueryString);
mHash = mScrapeConfigPtr->mJobName + tmpTargetURL + ToString(labels.Hash());
mInstance = mHost + ":" + ToString(mPort);
mInterval = mScrapeConfigPtr->mScrapeIntervalSeconds;

mPromStreamScraper.mHash = mHash;
}

void ScrapeScheduler::OnMetricResult(HttpResponse& response, uint64_t) {
Expand All @@ -66,9 +65,10 @@ void ScrapeScheduler::OnMetricResult(HttpResponse& response, uint64_t) {
auto scrapeTimestampMilliSec
= chrono::duration_cast<chrono::milliseconds>(mLatestScrapeTime.time_since_epoch()).count();
auto scrapeDurationMilliSeconds = now - scrapeTimestampMilliSec;
auto* streamScraper = response.GetBody<prom::StreamScraper>();

mSelfMonitor->AddCounter(METRIC_PLUGIN_OUT_EVENTS_TOTAL, response.GetStatusCode());
mSelfMonitor->AddCounter(METRIC_PLUGIN_OUT_SIZE_BYTES, response.GetStatusCode(), mPromStreamScraper.mRawSize);
mSelfMonitor->AddCounter(METRIC_PLUGIN_OUT_SIZE_BYTES, response.GetStatusCode(), streamScraper->mRawSize);
mSelfMonitor->AddCounter(METRIC_PLUGIN_PROM_SCRAPE_TIME_MS, response.GetStatusCode(), scrapeDurationMilliSeconds);

const auto& networkStatus = response.GetNetworkStatus();
Expand All @@ -93,11 +93,11 @@ void ScrapeScheduler::OnMetricResult(HttpResponse& response, uint64_t) {

auto mScrapeDurationSeconds = scrapeDurationMilliSeconds * sRate;
auto mUpState = response.GetStatusCode() == 200;
mPromStreamScraper.mStreamIndex++;
mPromStreamScraper.FlushCache();
mPromStreamScraper.SetAutoMetricMeta(mScrapeDurationSeconds, mUpState, scrapeState);
mPromStreamScraper.SendMetrics();
mPromStreamScraper.Reset();
streamScraper->mStreamIndex++;
streamScraper->FlushCache();
streamScraper->SetAutoMetricMeta(mScrapeDurationSeconds, mUpState, scrapeState);
streamScraper->SendMetrics();
streamScraper->Reset();

mPluginTotalDelayMs->Add(scrapeDurationMilliSeconds);
}
Expand All @@ -110,7 +110,6 @@ string ScrapeScheduler::GetId() const {
void ScrapeScheduler::SetComponent(shared_ptr<Timer> timer, EventPool* eventPool) {
mTimer = std::move(timer);
mEventPool = eventPool;
mPromStreamScraper.mEventPool = mEventPool;
}

void ScrapeScheduler::ScheduleNext() {
Expand Down Expand Up @@ -144,7 +143,6 @@ void ScrapeScheduler::ScheduleNext() {
mIsContextValidFuture = isContextValidFuture;
}

mPromStreamScraper.SetScrapeTime(mLatestScrapeTime);
auto event = BuildScrapeTimerEvent(GetNextExecTime());
mTimer->PushEvent(std::move(event));
}
Expand All @@ -167,7 +165,7 @@ std::unique_ptr<TimerEvent> ScrapeScheduler::BuildScrapeTimerEvent(std::chrono::
if (retry > 0) {
retry -= 1;
}
mPromStreamScraper.SetScrapeTime(mLatestScrapeTime);

auto request = std::make_unique<PromHttpRequest>(
HTTP_GET,
mScrapeConfigPtr->mScheme == prometheus::HTTPS,
Expand All @@ -178,7 +176,9 @@ std::unique_ptr<TimerEvent> ScrapeScheduler::BuildScrapeTimerEvent(std::chrono::
mScrapeConfigPtr->mRequestHeaders,
"",
HttpResponse(
&mPromStreamScraper, [](void*) {}, prom::StreamScraper::MetricWriteCallback),
new prom::StreamScraper(mTargetLabels, mQueueKey, mInputIndex, mHash, mEventPool, mLatestScrapeTime),
[](void* p) { delete static_cast<prom::StreamScraper*>(p); },
prom::StreamScraper::MetricWriteCallback),
mScrapeConfigPtr->mScrapeTimeoutSeconds,
retry,
this->mFuture,
Expand Down
5 changes: 3 additions & 2 deletions core/prometheus/schedulers/ScrapeScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ class ScrapeScheduler : public BaseScheduler {
private:
std::unique_ptr<TimerEvent> BuildScrapeTimerEvent(std::chrono::steady_clock::time_point execTime);

prom::StreamScraper mPromStreamScraper;

std::shared_ptr<ScrapeConfig> mScrapeConfigPtr;
std::string mHash;
std::string mHost;
Expand All @@ -68,6 +66,9 @@ class ScrapeScheduler : public BaseScheduler {

// pipeline
QueueKey mQueueKey;
size_t mInputIndex;

Labels mTargetLabels;

// auto metrics
uint64_t mScrapeTimestampMilliSec = 0;
Expand Down
2 changes: 1 addition & 1 deletion core/runner/ProcessorRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void ProcessorRunner::Stop() {
bool ProcessorRunner::PushQueue(QueueKey key, size_t inputIndex, PipelineEventGroup&& group, uint32_t retryTimes) {
unique_ptr<ProcessQueueItem> item = make_unique<ProcessQueueItem>(std::move(group), inputIndex);
for (size_t i = 0; i < retryTimes; ++i) {
if (ProcessQueueManager::GetInstance()->PushQueue(key, std::move(item)) == 0) {
if (ProcessQueueManager::GetInstance()->PushQueue(key, std::move(item)) == QueueStatus::OK) {
return true;
}
if (i % 100 == 0) {
Expand Down
Loading

0 comments on commit 2cc1c9e

Please sign in to comment.