Skip to content

Commit

Permalink
chore: update development image (#1855)
Browse files Browse the repository at this point in the history
* chore: refactor spl to loongcollector spl pipeline

* fix

* fix ut

* enable spl in CI

* fix

* fix

* fix

* fix

* fix

* fix

* rename ilogtail to loongcollector

* fix
  • Loading branch information
Abingcbc authored Nov 26, 2024
1 parent aab3058 commit f539907
Show file tree
Hide file tree
Showing 32 changed files with 202 additions and 939 deletions.
14 changes: 1 addition & 13 deletions .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM sls-opensource-registry.cn-shanghai.cr.aliyuncs.com/ilogtail-community-edition/ilogtail-build-linux:2.0.3
FROM sls-opensource-registry.cn-shanghai.cr.aliyuncs.com/loongcollector-community-edition/loongcollector-build-linux:2.0.5

ARG USERNAME=admin
USER root
Expand All @@ -28,18 +28,6 @@ RUN wget http://mirrors.ustc.edu.cn/gnu/libc/glibc-2.18.tar.gz && \
cd ../ && \
rm -fr glibc-2.18*

# install python3.8
RUN cd /opt && curl -O https://mirrors.aliyun.com/python-release/source/Python-3.8.12.tgz && \
tar -zxvf Python-3.8.12.tgz && cd Python-3.8.12 && \
mkdir /usr/local/python3 && \
./configure --prefix=/usr/local/python3 && \
make clean && make && make install && \
cp /usr/local/python3/bin/python3.8 /usr/bin/python3
# install gcovr
RUN python3 -m pip install --upgrade pip
RUN cp /usr/local/python3/bin/pip3 /usr/bin/pip3 && pip3 config set global.index-url https://mirrors.aliyun.com/pypi/simple/ && pip3 install gcovr==7.0
RUN cp /usr/local/python3/bin/gcovr /usr/bin/gcovr

# Create the user
COPY .env /tmp/.env
RUN source /tmp/.env && rm /tmp/.env; \
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/build-core-ut.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ jobs:
ENABLE_STATIC_LINK_CRT: ON
WITHOUTGDB: ON
MAKE_JOBS: 16
WITHSPL: ON
# BUILD_TYPE: Debug # TODO: Uncomment when memory management is refined
run: CURRENT_DIR=$(pwd) && sed -i "s|/src|$CURRENT_DIR|g" docker/Dockerfile_build && make core PATH_IN_DOCKER=$(pwd)

Expand Down
1 change: 1 addition & 0 deletions .github/workflows/build-core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ jobs:
ENABLE_STATIC_LINK_CRT: ON
WITHOUTGDB: ON
MAKE_JOBS: 16
WITHSPL: ON
run: make core

- name: Check compatibility
Expand Down
2 changes: 1 addition & 1 deletion config_server/service/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM sls-opensource-registry.cn-shanghai.cr.aliyuncs.com/ilogtail-community-edition/ilogtail-build-linux:2.0.3 as build
FROM sls-opensource-registry.cn-shanghai.cr.aliyuncs.com/loongcollector-community-edition/loongcollector-build-linux:2.0.5 as build

USER root
WORKDIR /src
Expand Down
13 changes: 0 additions & 13 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,6 @@ endif()

# Module includes & set files.
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
include_directories("/opt/logtail_spl/include")
if (LINUX)
if (WITHSPL)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/spl)
endif()
endif()

foreach (DIR_NAME ${SUB_DIRECTORIES_LIST})
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/${DIR_NAME})
Expand Down Expand Up @@ -183,13 +177,6 @@ elseif(UNIX)
endif()
set(SRC_FILES ${SRC_FILES} ${FRAMEWORK_SOURCE_FILES} ${PLUGIN_SOURCE_FILES_CORE})

# Generate SPL library.
if (LINUX)
if (WITHSPL)
add_subdirectory(spl)
endif()
endif()

# Logtail executable or shared library.
if (BUILD_LOGTAIL)
if (ENABLE_ENTERPRISE)
Expand Down
1 change: 1 addition & 0 deletions core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ bool PipelineConfig::Parse() {
mLogstore,
mRegion);
}
mHasNativeProcessor = true;
} else {
mHasNativeProcessor = true;
}
Expand Down
255 changes: 131 additions & 124 deletions core/dependencies.cmake

Large diffs are not rendered by default.

137 changes: 35 additions & 102 deletions core/plugin/processor/ProcessorSPL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,17 @@
#include "plugin/processor/ProcessorSPL.h"

#include <curl/curl.h>
#ifdef FMT_HEADER_ONLY
#undef FMT_HEADER_ONLY
#endif
#include <spl/logger/Logger.h>
#include <spl/pipeline/SplPipeline.h>

#include <iostream>

#include "common/Flags.h"
#include "common/ParamExtractor.h"
#include "logger/Logger.h"
#include "monitor/metric_constants/MetricConstants.h"
#include "spl/PipelineEventGroupInput.h"
#include "spl/PipelineEventGroupOutput.h"
#include "spl/SplConstants.h"

DEFINE_FLAG_INT32(logtail_spl_pipeline_quota, "", 16);
DEFINE_FLAG_INT32(logtail_spl_query_max_size, "", 65536);

using namespace apsara::sls::spl;

namespace logtail {

const std::string ProcessorSPL::sName = "processor_spl";
Expand All @@ -56,54 +46,44 @@ bool ProcessorSPL::Init(const Json::Value& config) {
}
if (!GetOptionalUIntParam(config, "TimeoutMilliSeconds", mTimeoutMills, errorMsg)) {
PARAM_WARNING_DEFAULT(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
mTimeoutMills,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
mContext->GetAlarm(),
errorMsg,
mTimeoutMills,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
if (!GetOptionalUIntParam(config, "MaxMemoryBytes", mMaxMemoryBytes, errorMsg)) {
PARAM_WARNING_DEFAULT(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
mMaxMemoryBytes,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
mContext->GetAlarm(),
errorMsg,
mMaxMemoryBytes,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}

PipelineOptions splOptions;
// different parse mode support different spl operators
splOptions.parserMode = parser::ParserMode::LOGTAIL;
// spl pipeline的长度,多少管道
splOptions.pipelineQuota = INT32_FLAG(logtail_spl_pipeline_quota);
// spl pipeline语句的最大长度
splOptions.queryMaxSize = INT32_FLAG(logtail_spl_query_max_size);
// sampling for error
splOptions.errorSampling = true;

// this function is void and has no return
initSPL(&splOptions);

LoggerPtr logger;
logger = sLogger;
Error error;
mSPLPipelinePtr = std::make_shared<apsara::sls::spl::SplPipeline>(
mSpl, error, (u_int64_t)mTimeoutMills, (int64_t)mMaxMemoryBytes, logger);
if (error.code_ != StatusCode::OK) {
mSPLPipelinePtr = std::make_shared<LoongCollectorSplPipeline>();
errorMsg.clear();
ResultCode success = mSPLPipelinePtr->InitLoongCollectorSPL(mSpl,
INT32_FLAG(logtail_spl_pipeline_quota),
INT32_FLAG(logtail_spl_query_max_size),
errorMsg,
mTimeoutMills,
mMaxMemoryBytes);
if (success != ResultCode::OK) {
PARAM_ERROR_RETURN(mContext->GetLogger(),
mContext->GetAlarm(),
"failed to parse spl: " + mSpl + " error: " + error.msg_,
"failed to parse spl: " + mSpl + " error: " + errorMsg,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
mContext->GetRegion());
}

mSplExcuteErrorCount = GetMetricsRecordRef().CreateCounter("proc_spl_excute_error_count");
Expand All @@ -124,10 +104,10 @@ bool ProcessorSPL::Init(const Json::Value& config) {


void ProcessorSPL::Process(PipelineEventGroup& logGroup) {
LOG_ERROR(
sLogger,
("ProcessorSPL error", "unexpected enter in ProcessorSPL::Process(PipelineEventGroup& logGroup)")("project", mContext->GetProjectName())("logstore", mContext->GetLogstoreName())(
"region", mContext->GetRegion())("configName", mContext->GetConfigName()));
LOG_ERROR(sLogger,
("ProcessorSPL error", "unexpected enter in ProcessorSPL::Process(PipelineEventGroup& logGroup)")(
"project", mContext->GetProjectName())("logstore", mContext->GetLogstoreName())(
"region", mContext->GetRegion())("configName", mContext->GetConfigName()));
}


Expand All @@ -139,55 +119,14 @@ void ProcessorSPL::Process(std::vector<PipelineEventGroup>& logGroupList) {
PipelineEventGroup logGroup = std::move(logGroupList[0]);
std::vector<PipelineEventGroup>().swap(logGroupList);

std::vector<std::string> colNames{FIELD_CONTENT};
// 根据spip->getInputSearches(),设置input数组
std::vector<Input*> inputs;
for (const auto& search : mSPLPipelinePtr->getInputSearches()) {
(void)search; //-Wunused-variable
PipelineEventGroupInput* input = new PipelineEventGroupInput(colNames, logGroup, *mContext);
if (!input) {
logGroupList.emplace_back(std::move(logGroup));
for (auto& input : inputs) {
delete input;
}
return;
}
inputs.push_back(input);
}
// 根据spip->getOutputLabels(),设置output数组
std::vector<Output*> outputs;
for (const auto& resultTaskLabel : mSPLPipelinePtr->getOutputLabels()) {
PipelineEventGroupOutput* output = new PipelineEventGroupOutput(logGroup, logGroupList, *mContext, resultTaskLabel);
if (!output) {
logGroupList.emplace_back(std::move(logGroup));
for (auto& input : inputs) {
delete input;
}
for (auto& output : outputs) {
delete output;
}
return;
}
outputs.emplace_back(output);
}

// 开始调用pipeline.execute
// 传入inputs, outputs
// 输出pipelineStats, error
PipelineStats pipelineStats;
auto errCode = mSPLPipelinePtr->execute(inputs, outputs, &errorMsg, &pipelineStats);
ResultCode result = mSPLPipelinePtr->Execute(std::move(logGroup), logGroupList, pipelineStats, mContext);

if (errCode != StatusCode::OK) {
LOG_ERROR(
sLogger,
("execute error", errorMsg)("project", mContext->GetProjectName())("logstore", mContext->GetLogstoreName())(
"region", mContext->GetRegion())("configName", mContext->GetConfigName()));
if (result != ResultCode::OK) {
mSplExcuteErrorCount->Add(1);
// 出现错误,把原数据放回来
logGroupList.emplace_back(std::move(logGroup));
if (errCode == StatusCode::TIMEOUT_ERROR) {
if (result == ResultCode::TIMEOUT_ERROR) {
mSplExcuteTimeoutErrorCount->Add(1);
} else if (errCode == StatusCode::MEM_EXCEEDED) {
} else if (result == ResultCode::MEM_EXCEEDED) {
mSplExcuteMemoryExceedErrorCount->Add(1);
}
} else {
Expand All @@ -200,12 +139,6 @@ void ProcessorSPL::Process(std::vector<PipelineEventGroup>& logGroupList) {
mFailTaskCount->Add(pipelineStats.failTaskCount_);
}

for (auto& input : inputs) {
delete input;
}
for (auto& output : outputs) {
delete output;
}
return;
}

Expand Down
9 changes: 3 additions & 6 deletions core/plugin/processor/ProcessorSPL.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@

#include <string>

#include "spl/LoongCollectorSplPipeline.h"
#include "monitor/MetricManager.h"
#include "pipeline/plugin/interface/Processor.h"

namespace apsara::sls::spl {
class SplPipeline;
}

namespace logtail {

class ProcessorSPL : public Processor {
Expand All @@ -33,7 +30,7 @@ class ProcessorSPL : public Processor {
// Source field name.
std::string mSpl;
uint32_t mTimeoutMills = 1000;
uint32_t mMaxMemoryBytes = 50*1024*1024;
uint32_t mMaxMemoryBytes = 50 * 1024 * 1024;

bool Init(const Json::Value& config) override;
void Process(std::vector<PipelineEventGroup>& logGroupList) override;
Expand All @@ -43,7 +40,7 @@ class ProcessorSPL : public Processor {
bool IsSupportedEvent(const PipelineEventPtr& e) const override;

private:
std::shared_ptr<apsara::sls::spl::SplPipeline> mSPLPipelinePtr;
std::shared_ptr<LoongCollectorSplPipeline> mSPLPipelinePtr;

CounterPtr mSplExcuteErrorCount;
CounterPtr mSplExcuteTimeoutErrorCount;
Expand Down
1 change: 0 additions & 1 deletion core/plugin/processor/links.cmake
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ macro(processor_link target_name link_withspl)
link_re2(${target_name})
if (LINUX AND ${link_withspl})
link_spl(${target_name})
target_link_libraries(${target_name} spl)
endif ()
link_ssl(${target_name}) # must after link_spl
link_crypto(${target_name}) # must after link_spl
Expand Down
23 changes: 0 additions & 23 deletions core/spl/CMakeLists.txt

This file was deleted.

Loading

0 comments on commit f539907

Please sign in to comment.