From 8d9888d317a7cee416a0ea80fec97ccafd809cdc Mon Sep 17 00:00:00 2001 From: abingcbc Date: Wed, 18 Dec 2024 10:58:23 +0800 Subject: [PATCH 1/2] fix: use released pipeline pointer --- core/runner/ProcessorRunner.cpp | 7 ++++++- core/runner/sink/http/HttpSink.cpp | 1 + 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/core/runner/ProcessorRunner.cpp b/core/runner/ProcessorRunner.cpp index b8f18ad1a5..4bbef88f57 100644 --- a/core/runner/ProcessorRunner.cpp +++ b/core/runner/ProcessorRunner.cpp @@ -124,7 +124,8 @@ void ProcessorRunner::Run(uint32_t threadNo) { sInGroupDataSizeBytes->Add(item->mEventGroup.DataSize()); shared_ptr& pipeline = item->mPipeline; - if (!pipeline) { + bool hasOldPipeline = pipeline != nullptr; + if (!hasOldPipeline) { pipeline = PipelineManager::GetInstance()->FindConfigByName(configName); } if (!pipeline) { @@ -139,6 +140,10 @@ void ProcessorRunner::Run(uint32_t threadNo) { vector eventGroupList; eventGroupList.emplace_back(std::move(item->mEventGroup)); pipeline->Process(eventGroupList, item->mInputIndex); + // if the pipeline is updated, the pointer will be released, so we need to update it to the new pipeline + if (hasOldPipeline) { + pipeline = PipelineManager::GetInstance()->FindConfigByName(configName); + } if (pipeline->IsFlushingThroughGoPipeline()) { // TODO: diff --git a/core/runner/sink/http/HttpSink.cpp b/core/runner/sink/http/HttpSink.cpp index 2bd2e77cb9..6777e3d3b0 100644 --- a/core/runner/sink/http/HttpSink.cpp +++ b/core/runner/sink/http/HttpSink.cpp @@ -238,6 +238,7 @@ void HttpSink::HandleCompletedRequests(int& runningHandlers) { CURL* handler = msg->easy_handle; HttpSinkRequest* request = nullptr; curl_easy_getinfo(handler, CURLINFO_PRIVATE, &request); + auto pipelinePlaceHolder = request->mItem->mPipeline; // keep pipeline alive auto responseTime = chrono::system_clock::now() - request->mLastSendTime; auto responseTimeMs = chrono::duration_cast(responseTime).count(); switch (msg->data.result) { From e85abc1aedfab958be525b82ff34caa1b97fdd83 Mon Sep 17 00:00:00 2001 From: abingcbc Date: Wed, 18 Dec 2024 11:49:07 +0800 Subject: [PATCH 2/2] fix --- core/runner/ProcessorRunner.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/runner/ProcessorRunner.cpp b/core/runner/ProcessorRunner.cpp index 4bbef88f57..76c34afe28 100644 --- a/core/runner/ProcessorRunner.cpp +++ b/core/runner/ProcessorRunner.cpp @@ -143,6 +143,12 @@ void ProcessorRunner::Run(uint32_t threadNo) { // if the pipeline is updated, the pointer will be released, so we need to update it to the new pipeline if (hasOldPipeline) { pipeline = PipelineManager::GetInstance()->FindConfigByName(configName); + if (!pipeline) { + LOG_INFO(sLogger, + ("pipeline not found during processing, perhaps due to config deletion", + "discard data")("config", configName)); + continue; + } } if (pipeline->IsFlushingThroughGoPipeline()) {