Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update development image #1855

Merged
merged 14 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM sls-opensource-registry.cn-shanghai.cr.aliyuncs.com/ilogtail-community-edition/ilogtail-build-linux:2.0.3
FROM sls-opensource-registry.cn-shanghai.cr.aliyuncs.com/ilogtail-community-edition/ilogtail-build-linux:2.0.5

ARG USERNAME=admin
USER root
Expand All @@ -28,18 +28,6 @@ RUN wget http://mirrors.ustc.edu.cn/gnu/libc/glibc-2.18.tar.gz && \
cd ../ && \
rm -fr glibc-2.18*

# install python3.8
RUN cd /opt && curl -O https://mirrors.aliyun.com/python-release/source/Python-3.8.12.tgz && \
tar -zxvf Python-3.8.12.tgz && cd Python-3.8.12 && \
mkdir /usr/local/python3 && \
./configure --prefix=/usr/local/python3 && \
make clean && make && make install && \
cp /usr/local/python3/bin/python3.8 /usr/bin/python3
# install gcovr
RUN python3 -m pip install --upgrade pip
RUN cp /usr/local/python3/bin/pip3 /usr/bin/pip3 && pip3 config set global.index-url https://mirrors.aliyun.com/pypi/simple/ && pip3 install gcovr==7.0
RUN cp /usr/local/python3/bin/gcovr /usr/bin/gcovr

# Create the user
COPY .env /tmp/.env
RUN source /tmp/.env && rm /tmp/.env; \
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/build-core-ut.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ jobs:
ENABLE_STATIC_LINK_CRT: ON
WITHOUTGDB: ON
MAKE_JOBS: 16
WITHSPL: ON
# BUILD_TYPE: Debug # TODO: Uncomment when memory management is refined
run: CURRENT_DIR=$(pwd) && sed -i "s|/src|$CURRENT_DIR|g" docker/Dockerfile_build && make core PATH_IN_DOCKER=$(pwd)

Expand Down
1 change: 1 addition & 0 deletions .github/workflows/build-core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ jobs:
ENABLE_STATIC_LINK_CRT: ON
WITHOUTGDB: ON
MAKE_JOBS: 16
WITHSPL: ON
run: make core

- name: Check compatibility
Expand Down
2 changes: 1 addition & 1 deletion config_server/service/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM sls-opensource-registry.cn-shanghai.cr.aliyuncs.com/ilogtail-community-edition/ilogtail-build-linux:2.0.3 as build
FROM sls-opensource-registry.cn-shanghai.cr.aliyuncs.com/ilogtail-community-edition/ilogtail-build-linux:2.0.5 as build

USER root
WORKDIR /src
Expand Down
18 changes: 5 additions & 13 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ endif ()
option(BUILD_LOGTAIL "Build Logtail executable and tools" ON)
# Used under the Android environment.
option(BUILD_LOGTAIL_SHARED_LIBRARY "Build Logtail shared library")
option(BUILD_LOGTAIL_SPL_LIBRARY "Build Logtail SPL library")
option(ENABLE_ENTERPRISE "enable enterprise feature")
cmake_dependent_option(ENABLE_COMPATIBLE_MODE "Build Logtail in compatible mode (for low version Linux)" OFF "LINUX" OFF)
cmake_dependent_option(ENABLE_STATIC_LINK_CRT "Build Logtail by linking CRT statically" OFF "LINUX" OFF)
Expand Down Expand Up @@ -139,12 +140,6 @@ endif()

# Module includes & set files.
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
include_directories("/opt/logtail_spl/include")
if (LINUX)
if (WITHSPL)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/spl)
endif()
endif()

foreach (DIR_NAME ${SUB_DIRECTORIES_LIST})
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/${DIR_NAME})
Expand Down Expand Up @@ -182,13 +177,6 @@ elseif(UNIX)
endif()
set(SRC_FILES ${SRC_FILES} ${FRAMEWORK_SOURCE_FILES} ${PLUGIN_SOURCE_FILES_CORE})

# Generate SPL library.
if (LINUX)
if (WITHSPL)
add_subdirectory(spl)
endif()
endif()

# Logtail executable or shared library.
if (BUILD_LOGTAIL)
if (ENABLE_ENTERPRISE)
Expand Down Expand Up @@ -222,6 +210,10 @@ if (BUILD_LOGTAIL_SHARED_LIBRARY)
endif()
endif ()

if (BUILD_LOGTAIL_SPL_LIBRARY)
add_subdirectory(spl)
endif ()

# Generate independent libraries.
add_subdirectory(go_pipeline)
add_subdirectory(common)
Expand Down
257 changes: 132 additions & 125 deletions core/dependencies.cmake

Large diffs are not rendered by default.

137 changes: 35 additions & 102 deletions core/plugin/processor/ProcessorSPL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,17 @@
#include "plugin/processor/ProcessorSPL.h"

#include <curl/curl.h>
#ifdef FMT_HEADER_ONLY
#undef FMT_HEADER_ONLY
#endif
#include <spl/logger/Logger.h>
#include <spl/pipeline/SplPipeline.h>

#include <iostream>

#include "common/Flags.h"
#include "common/ParamExtractor.h"
#include "logger/Logger.h"
#include "monitor/metric_constants/MetricConstants.h"
#include "spl/PipelineEventGroupInput.h"
#include "spl/PipelineEventGroupOutput.h"
#include "spl/SplConstants.h"

DEFINE_FLAG_INT32(logtail_spl_pipeline_quota, "", 16);
DEFINE_FLAG_INT32(logtail_spl_query_max_size, "", 65536);

using namespace apsara::sls::spl;

namespace logtail {

const std::string ProcessorSPL::sName = "processor_spl";
Expand All @@ -56,54 +46,44 @@ bool ProcessorSPL::Init(const Json::Value& config) {
}
if (!GetOptionalUIntParam(config, "TimeoutMilliSeconds", mTimeoutMills, errorMsg)) {
PARAM_WARNING_DEFAULT(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
mTimeoutMills,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
mContext->GetAlarm(),
errorMsg,
mTimeoutMills,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
if (!GetOptionalUIntParam(config, "MaxMemoryBytes", mMaxMemoryBytes, errorMsg)) {
PARAM_WARNING_DEFAULT(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
mMaxMemoryBytes,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
mContext->GetAlarm(),
errorMsg,
mMaxMemoryBytes,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}

PipelineOptions splOptions;
// different parse mode support different spl operators
splOptions.parserMode = parser::ParserMode::LOGTAIL;
// spl pipeline的长度,多少管道
splOptions.pipelineQuota = INT32_FLAG(logtail_spl_pipeline_quota);
// spl pipeline语句的最大长度
splOptions.queryMaxSize = INT32_FLAG(logtail_spl_query_max_size);
// sampling for error
splOptions.errorSampling = true;

// this function is void and has no return
initSPL(&splOptions);

LoggerPtr logger;
logger = sLogger;
Error error;
mSPLPipelinePtr = std::make_shared<apsara::sls::spl::SplPipeline>(
mSpl, error, (u_int64_t)mTimeoutMills, (int64_t)mMaxMemoryBytes, logger);
if (error.code_ != StatusCode::OK) {
mSPLPipelinePtr = std::make_shared<LoongCollectorSplPipeline>();
errorMsg.clear();
ResultCode success = mSPLPipelinePtr->InitLoongCollectorSPL(mSpl,
INT32_FLAG(logtail_spl_pipeline_quota),
INT32_FLAG(logtail_spl_query_max_size),
errorMsg,
mTimeoutMills,
mMaxMemoryBytes);
if (success != ResultCode::OK) {
PARAM_ERROR_RETURN(mContext->GetLogger(),
mContext->GetAlarm(),
"failed to parse spl: " + mSpl + " error: " + error.msg_,
"failed to parse spl: " + mSpl + " error: " + errorMsg,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
mContext->GetRegion());
}

mSplExcuteErrorCount = GetMetricsRecordRef().CreateCounter("proc_spl_excute_error_count");
Expand All @@ -124,10 +104,10 @@ bool ProcessorSPL::Init(const Json::Value& config) {


void ProcessorSPL::Process(PipelineEventGroup& logGroup) {
LOG_ERROR(
sLogger,
("ProcessorSPL error", "unexpected enter in ProcessorSPL::Process(PipelineEventGroup& logGroup)")("project", mContext->GetProjectName())("logstore", mContext->GetLogstoreName())(
"region", mContext->GetRegion())("configName", mContext->GetConfigName()));
LOG_ERROR(sLogger,
("ProcessorSPL error", "unexpected enter in ProcessorSPL::Process(PipelineEventGroup& logGroup)")(
"project", mContext->GetProjectName())("logstore", mContext->GetLogstoreName())(
"region", mContext->GetRegion())("configName", mContext->GetConfigName()));
}


Expand All @@ -139,55 +119,14 @@ void ProcessorSPL::Process(std::vector<PipelineEventGroup>& logGroupList) {
PipelineEventGroup logGroup = std::move(logGroupList[0]);
std::vector<PipelineEventGroup>().swap(logGroupList);

std::vector<std::string> colNames{FIELD_CONTENT};
// 根据spip->getInputSearches(),设置input数组
std::vector<Input*> inputs;
for (const auto& search : mSPLPipelinePtr->getInputSearches()) {
(void)search; //-Wunused-variable
PipelineEventGroupInput* input = new PipelineEventGroupInput(colNames, logGroup, *mContext);
if (!input) {
logGroupList.emplace_back(std::move(logGroup));
for (auto& input : inputs) {
delete input;
}
return;
}
inputs.push_back(input);
}
// 根据spip->getOutputLabels(),设置output数组
std::vector<Output*> outputs;
for (const auto& resultTaskLabel : mSPLPipelinePtr->getOutputLabels()) {
PipelineEventGroupOutput* output = new PipelineEventGroupOutput(logGroup, logGroupList, *mContext, resultTaskLabel);
if (!output) {
logGroupList.emplace_back(std::move(logGroup));
for (auto& input : inputs) {
delete input;
}
for (auto& output : outputs) {
delete output;
}
return;
}
outputs.emplace_back(output);
}

// 开始调用pipeline.execute
// 传入inputs, outputs
// 输出pipelineStats, error
PipelineStats pipelineStats;
auto errCode = mSPLPipelinePtr->execute(inputs, outputs, &errorMsg, &pipelineStats);
ResultCode result = mSPLPipelinePtr->Execute(std::move(logGroup), logGroupList, pipelineStats, mContext);

if (errCode != StatusCode::OK) {
LOG_ERROR(
sLogger,
("execute error", errorMsg)("project", mContext->GetProjectName())("logstore", mContext->GetLogstoreName())(
"region", mContext->GetRegion())("configName", mContext->GetConfigName()));
if (result != ResultCode::OK) {
mSplExcuteErrorCount->Add(1);
// 出现错误,把原数据放回来
logGroupList.emplace_back(std::move(logGroup));
if (errCode == StatusCode::TIMEOUT_ERROR) {
if (result == ResultCode::TIMEOUT_ERROR) {
mSplExcuteTimeoutErrorCount->Add(1);
} else if (errCode == StatusCode::MEM_EXCEEDED) {
} else if (result == ResultCode::MEM_EXCEEDED) {
mSplExcuteMemoryExceedErrorCount->Add(1);
}
} else {
Expand All @@ -200,12 +139,6 @@ void ProcessorSPL::Process(std::vector<PipelineEventGroup>& logGroupList) {
mFailTaskCount->Add(pipelineStats.failTaskCount_);
}

for (auto& input : inputs) {
delete input;
}
for (auto& output : outputs) {
delete output;
}
return;
}

Expand Down
9 changes: 3 additions & 6 deletions core/plugin/processor/ProcessorSPL.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@

#include <string>

#include "spl/LoongCollectorSplPipeline.h"
#include "monitor/MetricManager.h"
#include "pipeline/plugin/interface/Processor.h"

namespace apsara::sls::spl {
class SplPipeline;
}

namespace logtail {

class ProcessorSPL : public Processor {
Expand All @@ -33,7 +30,7 @@ class ProcessorSPL : public Processor {
// Source field name.
std::string mSpl;
uint32_t mTimeoutMills = 1000;
uint32_t mMaxMemoryBytes = 50*1024*1024;
uint32_t mMaxMemoryBytes = 50 * 1024 * 1024;

bool Init(const Json::Value& config) override;
void Process(std::vector<PipelineEventGroup>& logGroupList) override;
Expand All @@ -43,7 +40,7 @@ class ProcessorSPL : public Processor {
bool IsSupportedEvent(const PipelineEventPtr& e) const override;

private:
std::shared_ptr<apsara::sls::spl::SplPipeline> mSPLPipelinePtr;
std::shared_ptr<LoongCollectorSplPipeline> mSPLPipelinePtr;

CounterPtr mSplExcuteErrorCount;
CounterPtr mSplExcuteTimeoutErrorCount;
Expand Down
3 changes: 1 addition & 2 deletions core/plugin/processor/links.cmake
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
macro(processor_link target_name link_withspl)
link_re2(${target_name})
if (LINUX AND ${link_withspl})
link_spl(${target_name})
target_link_libraries(${target_name} spl)
link_spl(${target_name} OFF)
endif ()
link_ssl(${target_name}) # must after link_spl
link_crypto(${target_name}) # must after link_spl
Expand Down
23 changes: 0 additions & 23 deletions core/spl/CMakeLists.txt

This file was deleted.

Loading
Loading