Skip to content

Commit

Permalink
update ebpf export file to avoid memory corruption (#1951)
Browse files Browse the repository at this point in the history
* update export file

Signed-off-by: qianlu.kk <[email protected]>

* fix: fix memory management issues when passing config to observer library

* set cid filter to false

---------

Signed-off-by: qianlu.kk <[email protected]>
Co-authored-by: xunfei <[email protected]>
Co-authored-by: Tom Yu <[email protected]>
  • Loading branch information
3 people authored Dec 10, 2024
1 parent a88cb59 commit c4a0666
Show file tree
Hide file tree
Showing 12 changed files with 160 additions and 98 deletions.
2 changes: 1 addition & 1 deletion core/ebpf/SelfMonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ void eBPFSelfMonitorMgr::Suspend(const nami::PluginType type) {
mInited[int(type)] = false;
}

void eBPFSelfMonitorMgr::HandleStatistic(std::vector<nami::eBPFStatistics>&& stats) {
void eBPFSelfMonitorMgr::HandleStatistic(std::vector<nami::eBPFStatistics>& stats) {
for (auto& stat : stats) {
if (!stat.updated_) {
continue;
Expand Down
2 changes: 1 addition & 1 deletion core/ebpf/SelfMonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class eBPFSelfMonitorMgr {
void Init(const nami::PluginType type, PluginMetricManagerPtr mgr, const std::string& name, const std::string& project);
void Release(const nami::PluginType type);
void Suspend(const nami::PluginType type);
void HandleStatistic(std::vector<nami::eBPFStatistics>&& stats);
void HandleStatistic(std::vector<nami::eBPFStatistics>& stats);
private:
// `mLock` is used to protect mSelfMonitors
ReadWriteLock mLock;
Expand Down
36 changes: 19 additions & 17 deletions core/ebpf/SourceManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ bool SourceManager::StartPlugin(nami::PluginType plugin_type, std::unique_ptr<na
return false;
}
auto init_f = (init_func)f;
int res = init_f(conf.release());
int res = init_f(conf.get());
if (!res) mRunning[int(plugin_type)] = true;
return !res;
}
Expand All @@ -199,6 +199,7 @@ bool SourceManager::UpdatePlugin(nami::PluginType plugin_type, std::unique_ptr<n
return false;
}

LOG_INFO(sLogger, ("begin to update plugin, type", int(plugin_type)));
conf->type = UpdataType::SECURE_UPDATE_TYPE_CONFIG_CHAGE;
FillCommonConf(conf);
#ifdef APSARA_UNIT_TEST_MAIN
Expand All @@ -212,20 +213,21 @@ bool SourceManager::UpdatePlugin(nami::PluginType plugin_type, std::unique_ptr<n
}

auto update_f = (update_func)f;
int res = update_f(conf.release());
if (!res) mRunning[int(plugin_type)] = true;
int res = update_f(conf.get());
return !res;
}

bool SourceManager::StopAll() {
if (!DynamicLibSuccess()) {
LOG_WARNING(sLogger, ("dynamic lib not load, just exit", "need check"));
return true;
LOG_WARNING(sLogger, ("dynamic lib not load, just exit", "need check"));
return true;
}

for (size_t i = 0; i < mRunning.size(); i ++) {
auto& x = mRunning[i];
if (!x) continue;
if (!x) {
continue;
}
// stop plugin
StopPlugin(static_cast<nami::PluginType>(i));
}
Expand All @@ -241,13 +243,13 @@ bool SourceManager::StopAll() {
}

bool SourceManager::SuspendPlugin(nami::PluginType plugin_type) {
if (!CheckPluginRunning(plugin_type)) {
LOG_WARNING(sLogger, ("plugin not started, cannot suspend. type", int(plugin_type)));
return false;
}
auto config = std::make_unique<nami::eBPFConfig>();
config->plugin_type_ = plugin_type;
config->type = UpdataType::SECURE_UPDATE_TYPE_SUSPEND_PROBE;
if (!CheckPluginRunning(plugin_type)) {
LOG_WARNING(sLogger, ("plugin not started, cannot suspend. type", int(plugin_type)));
return false;
}
auto config = std::make_unique<nami::eBPFConfig>();
config->plugin_type_ = plugin_type;
config->type = UpdataType::SECURE_UPDATE_TYPE_SUSPEND_PROBE;
#ifdef APSARA_UNIT_TEST_MAIN
mConfig = std::move(config);
return true;
Expand All @@ -260,15 +262,15 @@ bool SourceManager::SuspendPlugin(nami::PluginType plugin_type) {
}

auto suspend_f = (suspend_func)f;
int res = suspend_f(config.release());
int res = suspend_f(config.get());

return !res;
}

bool SourceManager::StopPlugin(nami::PluginType plugin_type) {
if (!CheckPluginRunning(plugin_type)) {
LOG_WARNING(sLogger, ("plugin not started, do nothing. type", int(plugin_type)));
return true;
LOG_WARNING(sLogger, ("plugin not started, do nothing. type", int(plugin_type)));
return true;
}

auto config = std::make_unique<nami::eBPFConfig>();
Expand All @@ -288,7 +290,7 @@ bool SourceManager::StopPlugin(nami::PluginType plugin_type) {
}

auto remove_f = (remove_func)f;
int res = remove_f(config.release());
int res = remove_f(config.get());
if (!res) mRunning[int(plugin_type)] = false;
return !res;
}
Expand Down
8 changes: 4 additions & 4 deletions core/ebpf/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,10 @@ bool SecurityOptions::Init(SecurityProbeType probeType,
}
nami::SecurityOption thisSecurityOption;
GetSecurityProbeDefaultCallName(probeType, thisSecurityOption.call_names_);
mOptionList.emplace_back(thisSecurityOption);
mOptionList.emplace_back(std::move(thisSecurityOption));
return true;
}
auto innerConfig = config["ProbeConfig"];
const auto& innerConfig = config["ProbeConfig"];
nami::SecurityOption thisSecurityOption;
// Genral Filter (Optional)
std::variant<std::monostate, nami::SecurityFileFilter, nami::SecurityNetworkFilter> thisFilter;
Expand Down Expand Up @@ -402,8 +402,8 @@ bool SecurityOptions::Init(SecurityProbeType probeType,
mContext->GetRegion());
}
thisSecurityOption.filter_ = thisFilter;
GetSecurityProbeDefaultCallName(probeType, thisSecurityOption.call_names_);
mOptionList.emplace_back(thisSecurityOption);
GetSecurityProbeDefaultCallName(probeType, thisSecurityOption.call_names_);
mOptionList.emplace_back(std::move(thisSecurityOption));
mProbeType = probeType;
return true;
}
Expand Down
41 changes: 19 additions & 22 deletions core/ebpf/eBPFServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ void eBPFServer::Stop() {
for (int i = 0; i < int(nami::PluginType::MAX); i ++) {
UpdatePipelineName(static_cast<nami::PluginType>(i), "", "");
}

// UpdateContext must after than StopPlugin
if (mEventCB) mEventCB->UpdateContext(nullptr, -1, -1);
if (mMeterCB) mMeterCB->UpdateContext(nullptr, -1, -1);
Expand All @@ -199,11 +199,12 @@ void eBPFServer::Stop() {
if (mFileSecureCB) mFileSecureCB->UpdateContext(nullptr, -1, -1);
}

bool eBPFServer::StartPluginInternal(const std::string& pipeline_name, uint32_t plugin_index,
nami::PluginType type,
const logtail::PipelineContext* ctx,
const std::variant<SecurityOptions*, nami::ObserverNetworkOption*> options, PluginMetricManagerPtr mgr) {

bool eBPFServer::StartPluginInternal(const std::string& pipeline_name,
uint32_t plugin_index,
nami::PluginType type,
const logtail::PipelineContext* ctx,
const std::variant<SecurityOptions*, nami::ObserverNetworkOption*> options,
PluginMetricManagerPtr mgr) {
std::string prev_pipeline_name = CheckLoadedPipelineName(type);
if (prev_pipeline_name.size() && prev_pipeline_name != pipeline_name) {
LOG_WARNING(sLogger, ("pipeline already loaded, plugin type", int(type))
Expand All @@ -217,62 +218,59 @@ bool eBPFServer::StartPluginInternal(const std::string& pipeline_name, uint32_t
mMonitorMgr->Init(type, mgr, pipeline_name, ctx->GetProjectName());

// step1: convert options to export type
std::variant<nami::NetworkObserveConfig, nami::ProcessConfig, nami::NetworkSecurityConfig, nami::FileSecurityConfig> config;
bool ret = false;
auto eBPFConfig = std::make_unique<nami::eBPFConfig>();
eBPFConfig->plugin_type_ = type;
eBPFConfig->stats_handler_ = [this](auto stats){ return mMonitorMgr->HandleStatistic(std::move(stats)); };
eBPFConfig->stats_handler_ = [this](auto& stats){ return mMonitorMgr->HandleStatistic(stats); };
// call update function
// step2: call init function
switch(type) {
case nami::PluginType::PROCESS_SECURITY: {
nami::ProcessConfig pconfig;
pconfig.process_security_cb_ = [this](auto events) { return mProcessSecureCB->handle(std::move(events)); };
pconfig.process_security_cb_ = [this](std::vector<std::unique_ptr<AbstractSecurityEvent>>& events) { return mProcessSecureCB->handle(events); };
SecurityOptions* opts = std::get<SecurityOptions*>(options);
pconfig.options_ = opts->mOptionList;
config = std::move(pconfig);
// UpdateContext must ahead of StartPlugin
mProcessSecureCB->UpdateContext(ctx, ctx->GetProcessQueueKey(), plugin_index);
eBPFConfig->config_ = config;
eBPFConfig->config_ = std::move(pconfig);
ret = mSourceManager->StartPlugin(type, std::move(eBPFConfig));
break;
}

case nami::PluginType::NETWORK_OBSERVE:{
nami::NetworkObserveConfig nconfig;
nconfig.enable_cid_filter = false;
nami::ObserverNetworkOption* opts = std::get<nami::ObserverNetworkOption*>(options);
if (opts->mEnableMetric) {
nconfig.enable_metric_ = true;
nconfig.measure_cb_ = [this](auto events, auto ts) { return mMeterCB->handle(std::move(events), ts); };
nconfig.measure_cb_ = [this](std::vector<std::unique_ptr<ApplicationBatchMeasure>>& events, auto ts) { return mMeterCB->handle(events, ts); };
nconfig.enable_metric_ = true;
mMeterCB->UpdateContext(ctx, ctx->GetProcessQueueKey(), plugin_index);
}
if (opts->mEnableSpan) {
nconfig.enable_span_ = true;
nconfig.span_cb_ = [this](auto events) { return mSpanCB->handle(std::move(events)); };
nconfig.span_cb_ = [this](std::vector<std::unique_ptr<ApplicationBatchSpan>>& events) { return mSpanCB->handle(events); };
nconfig.enable_span_ = true;
mSpanCB->UpdateContext(ctx, ctx->GetProcessQueueKey(), plugin_index);
}
if (opts->mEnableLog) {
nconfig.enable_event_ = true;
nconfig.event_cb_ = [this](auto events) { return mEventCB->handle(std::move(events)); };
nconfig.event_cb_ = [this](std::vector<std::unique_ptr<ApplicationBatchEvent>>& events) { return mEventCB->handle(events); };
nconfig.enable_event_ = true;
mEventCB->UpdateContext(ctx, ctx->GetProcessQueueKey(), plugin_index);
}

config = std::move(nconfig);
eBPFConfig->config_ = config;
eBPFConfig->config_ = std::move(nconfig);
ret = mSourceManager->StartPlugin(type, std::move(eBPFConfig));
break;
}

case nami::PluginType::NETWORK_SECURITY:{
nami::NetworkSecurityConfig nconfig;
nconfig.network_security_cb_ = [this](auto events) { return mNetworkSecureCB->handle(std::move(events)); };
nconfig.network_security_cb_ = [this](std::vector<std::unique_ptr<AbstractSecurityEvent>>& events) { return mNetworkSecureCB->handle(events); };
SecurityOptions* opts = std::get<SecurityOptions*>(options);
nconfig.options_ = opts->mOptionList;
config = std::move(nconfig);
eBPFConfig->config_ = config;
eBPFConfig->config_ = std::move(nconfig);
// UpdateContext must ahead of StartPlugin
mNetworkSecureCB->UpdateContext(ctx, ctx->GetProcessQueueKey(), plugin_index);
ret = mSourceManager->StartPlugin(type, std::move(eBPFConfig));
Expand All @@ -281,11 +279,10 @@ bool eBPFServer::StartPluginInternal(const std::string& pipeline_name, uint32_t

case nami::PluginType::FILE_SECURITY:{
nami::FileSecurityConfig fconfig;
fconfig.file_security_cb_ = [this](auto events) { return mFileSecureCB->handle(std::move(events)); };
fconfig.file_security_cb_ = [this](std::vector<std::unique_ptr<AbstractSecurityEvent>>& events) { return mFileSecureCB->handle(events); };
SecurityOptions* opts = std::get<SecurityOptions*>(options);
fconfig.options_ = opts->mOptionList;
config = std::move(fconfig);
eBPFConfig->config_ = config;
eBPFConfig->config_ = std::move(fconfig);
// UpdateContext must ahead of StartPlugin
mFileSecureCB->UpdateContext(ctx, ctx->GetProcessQueueKey(), plugin_index);
ret = mSourceManager->StartPlugin(type, std::move(eBPFConfig));
Expand Down
44 changes: 22 additions & 22 deletions core/ebpf/handler/ObserveHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace ebpf {
#define ADD_STATUS_METRICS(METRIC_NAME, FIELD_NAME, VALUE) \
{if (!inner->FIELD_NAME) return; \
auto event = group.AddMetricEvent(); \
for (auto& tag : measure->tags_) { \
for (const auto& tag : measure->tags_) { \
event->SetTag(tag.first, tag.second); \
} \
event->SetTag(std::string("status_code"), std::string(VALUE)); \
Expand All @@ -47,25 +47,25 @@ void FUNC_NAME(PipelineEventGroup& group, std::unique_ptr<Measure>& measure, uin
auto inner = static_cast<INNER_TYPE*>(measure->inner_measure_.get()); \
if (!inner->FIELD_NAME) return; \
auto event = group.AddMetricEvent(); \
for (auto& tag : measure->tags_) { \
for (const auto& tag : measure->tags_) { \
event->SetTag(tag.first, tag.second); \
} \
event->SetName(METRIC_NAME); \
event->SetTimestamp(ts); \
event->SetValue(UntypedSingleValue{(double)inner->FIELD_NAME}); \
}

void OtelMeterHandler::handle(std::vector<std::unique_ptr<ApplicationBatchMeasure>>&& measures, uint64_t timestamp) {
void OtelMeterHandler::handle(std::vector<std::unique_ptr<ApplicationBatchMeasure>>& measures, uint64_t timestamp) {
if (measures.empty()) return;

for (auto& appBatchMeasures : measures) {
for (const auto& appBatchMeasures : measures) {
PipelineEventGroup eventGroup(std::make_shared<SourceBuffer>());
for (auto& measure : appBatchMeasures->measures_) {
for (const auto& measure : appBatchMeasures->measures_) {
auto type = measure->type_;
if (type == MeasureType::MEASURE_TYPE_APP) {
auto inner = static_cast<AppSingleMeasure*>(measure->inner_measure_.get());
auto event = eventGroup.AddMetricEvent();
for (auto& tag : measure->tags_) {
for (const auto& tag : measure->tags_) {
event->SetTag(tag.first, tag.second);
}
event->SetName("service_requests_total");
Expand All @@ -86,15 +86,15 @@ void OtelMeterHandler::handle(std::vector<std::unique_ptr<ApplicationBatchMeasur
return;
}

void OtelSpanHandler::handle(std::vector<std::unique_ptr<ApplicationBatchSpan>>&& spans) {
void OtelSpanHandler::handle(std::vector<std::unique_ptr<ApplicationBatchSpan>>& spans) {
if (spans.empty()) return;

for (auto& span : spans) {
for (const auto& span : spans) {
std::shared_ptr<SourceBuffer> sourceBuffer = std::make_shared<SourceBuffer>();
PipelineEventGroup eventGroup(sourceBuffer);
for (auto& x : span->single_spans_) {
for (const auto& x : span->single_spans_) {
auto spanEvent = eventGroup.AddSpanEvent();
for (auto& tag : x->tags_) {
for (const auto& tag : x->tags_) {
spanEvent->SetTag(tag.first, tag.second);
}
spanEvent->SetName(x->span_name_);
Expand All @@ -118,24 +118,24 @@ void OtelSpanHandler::handle(std::vector<std::unique_ptr<ApplicationBatchSpan>>&
return;
}

void EventHandler::handle(std::vector<std::unique_ptr<ApplicationBatchEvent>>&& events) {
void EventHandler::handle(std::vector<std::unique_ptr<ApplicationBatchEvent>>& events) {
if (events.empty()) return;

for (auto& appEvents : events) {
for (const auto& appEvents : events) {
if (!appEvents || appEvents->events_.empty()) continue;
std::shared_ptr<SourceBuffer> sourceBuffer = std::make_shared<SourceBuffer>();
PipelineEventGroup eventGroup(sourceBuffer);
for (auto& event : appEvents->events_) {
for (const auto& event : appEvents->events_) {
if (!event || event->GetAllTags().empty()) continue;
auto logEvent = eventGroup.AddLogEvent();
for (auto& tag : event->GetAllTags()) {
for (const auto& tag : event->GetAllTags()) {
logEvent->SetContent(tag.first, tag.second);
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::nanoseconds(event->GetTimestamp()));
logEvent->SetTimestamp(seconds.count(), event->GetTimestamp() - seconds.count() * 1e9);
}
mProcessTotalCnt ++;
}
for (auto& tag : appEvents->tags_) {
for (const auto& tag : appEvents->tags_) {
eventGroup.SetTag(tag.first, tag.second);
}
#ifdef APSARA_UNIT_TEST_MAIN
Expand Down Expand Up @@ -195,16 +195,16 @@ GENERATE_METRICS(GenerateTcpRecvBytesTotalMetrics, MeasureType::MEASURE_TYPE_NET
GENERATE_METRICS(GenerateTcpSendPktsTotalMetrics, MeasureType::MEASURE_TYPE_NET, NetSingleMeasure, npm_send_pkt_total, send_pkt_total_)
GENERATE_METRICS(GenerateTcpSendBytesTotalMetrics, MeasureType::MEASURE_TYPE_NET, NetSingleMeasure, npm_send_byte_total, send_byte_total_)

void ArmsSpanHandler::handle(std::vector<std::unique_ptr<ApplicationBatchSpan>>&& spans) {
void ArmsSpanHandler::handle(std::vector<std::unique_ptr<ApplicationBatchSpan>>& spans) {
if (spans.empty()) return;

for (auto& span : spans) {
for (const auto& span : spans) {
std::shared_ptr<SourceBuffer> sourceBuffer = std::make_shared<SourceBuffer>();
PipelineEventGroup eventGroup(sourceBuffer);
eventGroup.SetTag(app_id_key, span->app_id_);
for (auto& x : span->single_spans_) {
for (const auto& x : span->single_spans_) {
auto spanEvent = eventGroup.AddSpanEvent();
for (auto& tag : x->tags_) {
for (const auto& tag : x->tags_) {
spanEvent->SetTag(tag.first, tag.second);
}
spanEvent->SetName(x->span_name_);
Expand All @@ -227,17 +227,17 @@ void ArmsSpanHandler::handle(std::vector<std::unique_ptr<ApplicationBatchSpan>>&
return;
}

void ArmsMeterHandler::handle(std::vector<std::unique_ptr<ApplicationBatchMeasure>>&& measures, uint64_t timestamp) {
void ArmsMeterHandler::handle(std::vector<std::unique_ptr<ApplicationBatchMeasure>>& measures, uint64_t timestamp) {
if (measures.empty()) return;

for (auto& appBatchMeasures : measures) {
for (const auto& appBatchMeasures : measures) {
std::shared_ptr<SourceBuffer> sourceBuffer = std::make_shared<SourceBuffer>();;
PipelineEventGroup eventGroup(sourceBuffer);

// source_ip
eventGroup.SetTag(std::string(app_id_key), appBatchMeasures->app_id_);
eventGroup.SetTag(std::string(ip_key), appBatchMeasures->ip_);
for (auto& measure : appBatchMeasures->measures_) {
for (const auto& measure : appBatchMeasures->measures_) {
auto type = measure->type_;
if (type == MeasureType::MEASURE_TYPE_APP) {
GenerateRequestsTotalMetrics(eventGroup, measure, timestamp);
Expand Down
Loading

0 comments on commit c4a0666

Please sign in to comment.