Skip to content

Commit

Permalink
init (#1905)
Browse files Browse the repository at this point in the history
  • Loading branch information
Takuka0311 authored Nov 25, 2024
1 parent 0ff6f89 commit b11fd8f
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 16 deletions.
2 changes: 1 addition & 1 deletion core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ bool Pipeline::Init(PipelineConfig&& config) {
= PluginRegistry::GetInstance()->CreateFlusher(pluginType, GenNextPluginMeta(false));
if (flusher) {
Json::Value optionalGoPipeline;
if (!flusher->Init(detail, mContext, optionalGoPipeline)) {
if (!flusher->Init(detail, mContext, i, optionalGoPipeline)) {
return false;
}
mFlushers.emplace_back(std::move(flusher));
Expand Down
16 changes: 11 additions & 5 deletions core/pipeline/batch/Batcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class Batcher {
}
if (mGroupQueue->IsEmpty()) {
TimeoutFlushManager::GetInstance()->UpdateRecord(mFlusher->GetContext().GetConfigName(),
0,
mFlusher->GetFlusherIndex(),
0,
mGroupFlushStrategy->GetTimeoutSecs(),
mFlusher);
Expand All @@ -193,8 +193,11 @@ class Batcher {
g.GetSourceBuffer(),
g.GetExactlyOnceCheckpoint(),
g.GetMetadata(EventGroupMetaKey::SOURCE_ID));
TimeoutFlushManager::GetInstance()->UpdateRecord(
mFlusher->GetContext().GetConfigName(), 0, key, mEventFlushStrategy.GetTimeoutSecs(), mFlusher);
TimeoutFlushManager::GetInstance()->UpdateRecord(mFlusher->GetContext().GetConfigName(),
mFlusher->GetFlusherIndex(),
key,
mEventFlushStrategy.GetTimeoutSecs(),
mFlusher);
mBufferedGroupsTotal->Add(1);
mBufferedDataSizeByte->Add(item.DataSize());
} else if (i == 0) {
Expand Down Expand Up @@ -243,8 +246,11 @@ class Batcher {
mGroupQueue->Flush(res);
}
if (mGroupQueue->IsEmpty()) {
TimeoutFlushManager::GetInstance()->UpdateRecord(
mFlusher->GetContext().GetConfigName(), 0, 0, mGroupFlushStrategy->GetTimeoutSecs(), mFlusher);
TimeoutFlushManager::GetInstance()->UpdateRecord(mFlusher->GetContext().GetConfigName(),
mFlusher->GetFlusherIndex(),
0,
mGroupFlushStrategy->GetTimeoutSecs(),
mFlusher);
}
iter->second.Flush(mGroupQueue.value());
mEventQueueMap.erase(iter);
Expand Down
3 changes: 2 additions & 1 deletion core/pipeline/plugin/instance/FlusherInstance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ using namespace std;

namespace logtail {

bool FlusherInstance::Init(const Json::Value& config, PipelineContext& context, Json::Value& optionalGoPipeline) {
bool FlusherInstance::Init(const Json::Value& config, PipelineContext& context, size_t flusherIdx, Json::Value& optionalGoPipeline) {
mPlugin->SetContext(context);
mPlugin->SetPluginID(PluginID());
mPlugin->SetFlusherIndex(flusherIdx);
mPlugin->SetMetricsRecordRef(Name(), PluginID());
if (!mPlugin->Init(config, optionalGoPipeline)) {
return false;
Expand Down
5 changes: 3 additions & 2 deletions core/pipeline/plugin/instance/FlusherInstance.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ namespace logtail {

class FlusherInstance : public PluginInstance {
public:
FlusherInstance(Flusher* plugin, const PluginInstance::PluginMeta& pluginMeta) : PluginInstance(pluginMeta), mPlugin(plugin) {}
FlusherInstance(Flusher* plugin, const PluginInstance::PluginMeta& pluginMeta)
: PluginInstance(pluginMeta), mPlugin(plugin) {}

const std::string& Name() const override { return mPlugin->Name(); };
const Flusher* GetPlugin() const { return mPlugin.get(); }

bool Init(const Json::Value& config, PipelineContext& context, Json::Value& optionalGoPipeline);
bool Init(const Json::Value& config, PipelineContext& context, size_t flusherIdx, Json::Value& optionalGoPipeline);
bool Start() { return mPlugin->Start(); }
bool Stop(bool isPipelineRemoving) { return mPlugin->Stop(isPipelineRemoving); }
bool Send(PipelineEventGroup&& g);
Expand Down
3 changes: 3 additions & 0 deletions core/pipeline/plugin/interface/Flusher.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class Flusher : public Plugin {

QueueKey GetQueueKey() const { return mQueueKey; }
void SetPluginID(const std::string& pluginID) { mPluginID = pluginID; }
size_t GetFlusherIndex() { return mIndex; }
void SetFlusherIndex(size_t idx) { mIndex = idx; }
const std::string& GetPluginID() const { return mPluginID; }

protected:
Expand All @@ -54,6 +56,7 @@ class Flusher : public Plugin {

QueueKey mQueueKey;
std::string mPluginID;
size_t mIndex = 0;

#ifdef APSARA_UNIT_TEST_MAIN
friend class FlusherInstanceUnittest;
Expand Down
12 changes: 6 additions & 6 deletions core/unittest/pipeline/PipelineUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2736,13 +2736,13 @@ void PipelineUnittest::TestSend() const {
{
auto flusher
= PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false));
flusher->Init(Json::Value(), ctx, tmp);
flusher->Init(Json::Value(), ctx, 0, tmp);
pipeline.mFlushers.emplace_back(std::move(flusher));
}
{
auto flusher
= PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false));
flusher->Init(Json::Value(), ctx, tmp);
flusher->Init(Json::Value(), ctx, 0, tmp);
pipeline.mFlushers.emplace_back(std::move(flusher));
}
vector<pair<size_t, const Json::Value*>> configs;
Expand Down Expand Up @@ -2788,13 +2788,13 @@ void PipelineUnittest::TestSend() const {
{
auto flusher
= PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false));
flusher->Init(Json::Value(), ctx, tmp);
flusher->Init(Json::Value(), ctx, 0, tmp);
pipeline.mFlushers.emplace_back(std::move(flusher));
}
{
auto flusher
= PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false));
flusher->Init(Json::Value(), ctx, tmp);
flusher->Init(Json::Value(), ctx, 0, tmp);
pipeline.mFlushers.emplace_back(std::move(flusher));
}

Expand Down Expand Up @@ -2855,13 +2855,13 @@ void PipelineUnittest::TestFlushBatch() const {
{
auto flusher
= PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false));
flusher->Init(Json::Value(), ctx, tmp);
flusher->Init(Json::Value(), ctx, 0, tmp);
pipeline.mFlushers.emplace_back(std::move(flusher));
}
{
auto flusher
= PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false));
flusher->Init(Json::Value(), ctx, tmp);
flusher->Init(Json::Value(), ctx, 0, tmp);
pipeline.mFlushers.emplace_back(std::move(flusher));
}
{
Expand Down
2 changes: 1 addition & 1 deletion core/unittest/plugin/FlusherInstanceUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void FlusherInstanceUnittest::TestInit() const {
= make_unique<FlusherInstance>(new FlusherMock(), PluginInstance::PluginMeta("0"));
Json::Value config, opt;
PipelineContext context;
APSARA_TEST_TRUE(flusher->Init(config, context, opt));
APSARA_TEST_TRUE(flusher->Init(config, context, 0, opt));
APSARA_TEST_EQUAL(&context, &flusher->GetPlugin()->GetContext());
}

Expand Down

0 comments on commit b11fd8f

Please sign in to comment.