Skip to content

Commit

Permalink
fix conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
Assassin718 committed Sep 1, 2024
1 parent 7e98ac4 commit f7dd3bd
Show file tree
Hide file tree
Showing 80 changed files with 5,304 additions and 155 deletions.
152 changes: 64 additions & 88 deletions core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,101 +332,77 @@ bool PipelineConfig::Parse() {
mRegion);
}
const string pluginType = it->asString();
if (mHasGoInput) {
if (PluginRegistry::GetInstance()->IsValidNativeProcessorPlugin(pluginType)) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"native processor plugins coexist with extended input plugins",
noModule,
mName,
mProject,
mLogstore,
mRegion);
} else if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
if (isCurrentPluginNative) {
if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
// TODO: remove these special restrictions
if (mHasNativeInput && !hasObserverInput && !hasFileInput) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"extended processor plugins coexist with native input plugins other "
"than input_file or input_container_stdio",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
isCurrentPluginNative = false;
mHasGoProcessor = true;
} else {
} else if (!PluginRegistry::GetInstance()->IsValidNativeProcessorPlugin(pluginType)) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"unsupported processor plugin",
pluginType,
mName,
mProject,
mLogstore,
mRegion);
}
} else {
if (isCurrentPluginNative) {
if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
// TODO: remove these special restrictions
if (!hasObserverInput && !hasFileInput) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"extended processor plugins coexist with native input plugins other "
"than input_file or input_container_stdio",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
isCurrentPluginNative = false;
mHasGoProcessor = true;
} else if (!PluginRegistry::GetInstance()->IsValidNativeProcessorPlugin(pluginType)) {
alarm,
"unsupported processor plugin",
pluginType,
mName,
mProject,
mLogstore,
mRegion);
} else if (pluginType == "processor_spl") {
if (i != 0 || itr->size() != 1) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"unsupported processor plugin",
pluginType,
mName,
mProject,
mLogstore,
mRegion);
} else if (pluginType == "processor_spl") {
if (i != 0 || itr->size() != 1) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"native processor plugins coexist with spl processor",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
} else {
// TODO: remove these special restrictions
if (hasObserverInput) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"native processor plugins coexist with input_observer_network",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
mHasNativeProcessor = true;
alarm,
"native processor plugins coexist with spl processor",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
} else {
if (PluginRegistry::GetInstance()->IsValidNativeProcessorPlugin(pluginType)) {
// TODO: remove these special restrictions
if (hasObserverInput) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"native processor plugin comes after extended processor plugin",
pluginType,
mName,
mProject,
mLogstore,
mRegion);
} else if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
mHasGoProcessor = true;
} else {
PARAM_ERROR_RETURN(sLogger,
alarm,
"unsupported processor plugin",
pluginType,
mName,
mProject,
mLogstore,
mRegion);
alarm,
"native processor plugins coexist with input_observer_network",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
mHasNativeProcessor = true;
}
} else {
if (PluginRegistry::GetInstance()->IsValidNativeProcessorPlugin(pluginType)) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"native processor plugin comes after extended processor plugin",
pluginType,
mName,
mProject,
mLogstore,
mRegion);
} else if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
mHasGoProcessor = true;
} else {
PARAM_ERROR_RETURN(sLogger,
alarm,
"unsupported processor plugin",
pluginType,
mName,
mProject,
mLogstore,
mRegion);
}
}
mProcessors.push_back(&plugin);
Expand Down
2 changes: 1 addition & 1 deletion core/dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ macro(link_protobuf target_name)
endmacro()
logtail_define(protobuf_BIN "Absolute path to protoc" "${DEPS_BINARY_ROOT}/protoc")
set(PROTO_FILE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/log_pb")
set(PROTO_FILES ${PROTO_FILE_PATH}/sls_logs.proto ${PROTO_FILE_PATH}/logtail_buffer_meta.proto ${PROTO_FILE_PATH}/metric.proto ${PROTO_FILE_PATH}/checkpoint.proto)
set(PROTO_FILES ${PROTO_FILE_PATH}/sls_logs.proto ${PROTO_FILE_PATH}/logtail_buffer_meta.proto ${PROTO_FILE_PATH}/metric.proto ${PROTO_FILE_PATH}/checkpoint.proto ${PROTO_FILE_PATH}/pipeline_event.proto)
execute_process(COMMAND ${protobuf_BIN} --proto_path=${PROTO_FILE_PATH} --cpp_out=${PROTO_FILE_PATH} ${PROTO_FILES})

# re2
Expand Down
109 changes: 106 additions & 3 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "pipeline/PipelineManager.h"
#include "profile_sender/ProfileSender.h"
#include "queue/SenderQueueManager.h"
#include "queue/ProcessQueueManager.h"

DEFINE_FLAG_BOOL(enable_sls_metrics_format, "if enable format metrics in SLS metricstore log pattern", false);
DEFINE_FLAG_BOOL(enable_containerd_upper_dir_detect,
Expand Down Expand Up @@ -76,7 +77,7 @@ LogtailPlugin::~LogtailPlugin() {
DynamicLibLoader::CloseLib(mPluginAdapterPtr);
}

bool LogtailPlugin::LoadPipeline(const std::string& pipelineName,
LoadGoPipelineResp LogtailPlugin::LoadPipeline(const std::string& pipelineName,
const std::string& pipeline,
const std::string& project,
const std::string& logstore,
Expand All @@ -102,10 +103,10 @@ bool LogtailPlugin::LoadPipeline(const std::string& pipelineName,
goLogstore.p = logstore.c_str();
long long goLogStoreKey = static_cast<long long>(logstoreKey);

return mLoadConfigFun(goProject, goLogstore, goConfigName, goLogStoreKey, goPluginConfig) == 0;
return *mLoadConfigFun(goProject, goLogstore, goConfigName, goLogStoreKey, goPluginConfig);
}

return false;
return LoadGoPipelineResp{false, LoadGoPipelineResp::InputModeType::UNKNOWN};
}

void LogtailPlugin::HoldOn(bool exitFlag) {
Expand Down Expand Up @@ -253,6 +254,100 @@ int LogtailPlugin::ExecPluginCmd(
return 0;
}

int LogtailPlugin::IsValidToProcess(const char* configName, int configNameSize) {
string configNameStr(configName, configNameSize);
auto pipeline = PipelineManager::GetInstance()->FindConfigByName(configNameStr);
if (!pipeline) {
LOG_ERROR(sLogger,
("pipeline not found during IsValidToProcess, perhaps due to config deletion",
"return invalid")("config", configName));
return -1;
}
auto processQueueKey = pipeline->GetContext().GetProcessQueueKey();
return ProcessQueueManager::GetInstance()->IsValidToPush(processQueueKey) ? 0 : -1;
}

int LogtailPlugin::PushQueue(const char* configName, int configNameSize, const char* pbBuffer, int pbSize) {
string configNameStr(configName, configNameSize);
auto pipeline = PipelineManager::GetInstance()->FindConfigByName(configNameStr);
if (!pipeline) {
LOG_ERROR(sLogger,
("pipeline not found during PushQueue, perhaps due to config deletion",
"return invalid")("config", configName));
return -1;
}

string pbStr(pbBuffer, pbSize);
sls_logs::PipelineEventGroup eventGroupSrc;
if (!eventGroupSrc.ParseFromString(pbStr)) {
LOG_ERROR(sLogger, ("parse pb failed in PushQueue", "invalid pb"));
return -1;
}
logtail::PipelineEventGroup eventGroupDst(std::make_shared<SourceBuffer>());
for (auto& tag : eventGroupSrc.tags()) {
eventGroupDst.SetTag(tag.first, tag.second);
}
for (auto& metaData : eventGroupSrc.metadata()) {
if (metaData.first == "source") {
eventGroupDst.SetMetadata(logtail::EventGroupMetaKey::SOURCE_ID, metaData.second);
}
}
switch (eventGroupSrc.type())
{
case sls_logs::PipelineEventGroup::EventType::PipelineEventGroup_EventType_LOG:
if (eventGroupSrc.logs_size() == 0) {
LOG_ERROR(sLogger, ("invalid event group, no logs", ""));
return -1;
}
for (auto& logSrc : eventGroupSrc.logs()) {
auto logDst = eventGroupDst.AddLogEvent();
std::optional<uint32_t> ns;
time_t t = time_t(logSrc.time());
if (logSrc.has_time_ns()) {
ns = logSrc.time_ns();
}
logDst->SetTimestamp(t, ns);
for (auto& content_pair : logSrc.contents()) {
logDst->SetContent(content_pair.key(), content_pair.value());
}
}
break;
case sls_logs::PipelineEventGroup::EventType::PipelineEventGroup_EventType_METRIC:
if (eventGroupSrc.metrics_size() == 0) {
LOG_ERROR(sLogger, ("invalid event group, no metrics", ""));
return -1;
}
for (auto& metricSrc : eventGroupSrc.metrics()) {
auto metricDst = eventGroupDst.AddMetricEvent();
uint32_t t = metricSrc.time();
std::optional<uint32_t> ns;
if (metricSrc.has_time_ns()) {
ns = metricSrc.time_ns();
}
metricDst->SetTimestamp(t, ns);
metricDst->SetName(metricSrc.name());
switch (metricSrc.type()) {
case sls_logs::MetricEvent::MetricValueType::MetricEvent_MetricValueType_SINGLE:
metricDst->SetValue(UntypedSingleValue{metricSrc.singlevalue()});
break;
case sls_logs::MetricEvent::MetricValueType::MetricEvent_MetricValueType_MULTI:
LOG_ERROR(sLogger, ("metric value type mutivalue unsported", ""));
}
for (auto& tag_pair : metricSrc.tags()) {
metricDst->SetTag(tag_pair.first, tag_pair.second);
}
}
break;
case sls_logs::PipelineEventGroup::EventType::PipelineEventGroup_EventType_SPAN:
break;
default:
LOG_ERROR(sLogger, ("invalid event type", eventGroupSrc.type()));
}

auto processQueueKey = pipeline->GetContext().GetProcessQueueKey();
return ProcessQueueManager::GetInstance()->PushQueue(processQueueKey, std::make_unique<ProcessQueueItem>(std::move(eventGroupDst), 0xFFFFFFFF));
}


bool LogtailPlugin::LoadPluginBase() {
if (mPluginValid) {
Expand Down Expand Up @@ -298,6 +393,14 @@ bool LogtailPlugin::LoadPluginBase() {
registerFun(LogtailPlugin::IsValidToSend, LogtailPlugin::SendPb, LogtailPlugin::ExecPluginCmd);
}

auto registerProcessFun = (RegisterLogtailProcessCallBack)loader.LoadMethod("RegisterLogtailProcessCallBack", error);
if (error.empty()) {
registerProcessFun(LogtailPlugin::IsValidToProcess, LogtailPlugin::PushQueue);
} else {
LOG_WARNING(sLogger, ("load RegisterLogtailProcessCallBack failed", error));
return mPluginValid;
}

mPluginAdapterPtr = loader.Release();
}

Expand Down
24 changes: 22 additions & 2 deletions core/go_pipeline/LogtailPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "flusher/sls/FlusherSLS.h"
#include "log_pb/sls_logs.pb.h"
#include "log_pb/pipeline_event.pb.h"

extern "C" {
// The definition of Golang type is copied from PluginAdaptor.h that
Expand Down Expand Up @@ -130,9 +131,19 @@ struct K8sContainerMeta {
}
};

struct LoadGoPipelineResp {
int Code; // 0: success, 1: fail
enum InputModeType {
UNKNOWN,
PUSH,
PULL,
};
InputModeType InputMode;
};

// Methods export by plugin.
typedef GoInt (*LoadGlobalConfigFun)(GoString);
typedef GoInt (*LoadConfigFun)(GoString p, GoString l, GoString c, GoInt64 k, GoString p2);
typedef struct LoadGoPipelineResp* (*LoadConfigFun)(GoString p, GoString l, GoString c, GoInt64 k, GoString p2);
typedef GoInt (*UnloadConfigFun)(GoString p, GoString l, GoString c);
typedef GoInt (*ProcessRawLogFun)(GoString c, GoSlice l, GoString p, GoString t);
typedef GoInt (*ProcessRawLogV2Fun)(GoString c, GoSlice l, GoString p, GoString t, GoSlice tags);
Expand Down Expand Up @@ -175,6 +186,11 @@ typedef void (*RegisterLogtailCallBackV2)(IsValidToSendFun checkFun,
PluginCtlCmdFun cmdFun);

typedef int (*PluginAdapterVersion)();

typedef int (*IsValidToProcessFun)(const char *configName, int configNameSize);
typedef int (*PushQueueFun)(const char *configName, int configNameSize, const char *pbBuffer, int pbSize);
typedef void (*RegisterLogtailProcessCallBack)(IsValidToProcessFun checkFun, PushQueueFun pushFun);

}

// Create by david zhang. 2017/09/02 22:22:12
Expand Down Expand Up @@ -208,7 +224,7 @@ class LogtailPlugin {

bool LoadPluginBase();
// void LoadConfig();
bool LoadPipeline(const std::string& pipelineName,
LoadGoPipelineResp LoadPipeline(const std::string& pipelineName,
const std::string& pipeline,
const std::string& project = "",
const std::string& logstore = "",
Expand Down Expand Up @@ -266,6 +282,10 @@ class LogtailPlugin {

void GetPipelineMetrics(std::vector<std::map<std::string, std::string>>& metircsList);

static int IsValidToProcess(const char* configName, int configNameSize);

static int PushQueue(const char* configName, int configNameSize, const char* pbBuffer, int pbSize);

private:
void* mPluginBasePtr;
void* mPluginAdapterPtr;
Expand Down
Loading

0 comments on commit f7dd3bd

Please sign in to comment.