Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add development doc for native plugin #1907

Merged
merged 2 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/plugin/flusher/sls/SLSResponse.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ bool SLSResponse::Parse(const HttpResponse& response) {
mStatusCode = response.GetStatusCode();

if (mStatusCode == 0) {
mErrorCode = sdk::LOG_REQUEST_TIMEOUT;
mErrorCode = sdk::LOGE_REQUEST_TIMEOUT;
mErrorMsg = "Request timeout";
} else if (mStatusCode != 200) {
try {
Expand Down
10 changes: 6 additions & 4 deletions docs/cn/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,12 @@
* [Checkpoint接口](developer-guide/plugin-development/checkpoint-api.md)
* [Logger接口](developer-guide/plugin-development/logger-api.md)
* [自监控指标接口](developer-guide/plugin-development/plugin-self-monitor-guide.md)
* [如何开发Input插件](developer-guide/plugin-development/how-to-write-input-plugins.md)
* [如何开发Processor插件](developer-guide/plugin-development/how-to-write-processor-plugins.md)
* [如何开发Aggregator插件](developer-guide/plugin-development/how-to-write-aggregator-plugins.md)
* [如何开发Flusher插件](developer-guide/plugin-development/how-to-write-flusher-plugins.md)
* [如何开发原生Input插件](developer-guide/plugin-development/how-to-write-native-input-plugins.md)
* [如何开发原生Flusher插件](developer-guide/plugin-development/how-to-write-native-flusher-plugins.md)
* [如何开发扩展Input插件](developer-guide/plugin-development/how-to-write-input-plugins.md)
* [如何开发扩展Processor插件](developer-guide/plugin-development/how-to-write-processor-plugins.md)
* [如何开发扩展Aggregator插件](developer-guide/plugin-development/how-to-write-aggregator-plugins.md)
* [如何开发扩展Flusher插件](developer-guide/plugin-development/how-to-write-flusher-plugins.md)
* [如何生成插件文档](developer-guide/plugin-development/how-to-genernate-plugin-docs.md)
* [插件文档规范](docs/cn/developer-guide/plugin-development/plugin-doc-templete.md)
* [纯插件模式启动](developer-guide/plugin-development/pure-plugin-start.md)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
# 如何开发原生Flusher插件

## 接口定义

```c++
class Flusher : public Plugin {
public:
// 用于初始化插件参数,同时根据参数初始化Flusher级的组件
virtual bool Init(const Json::Value& config, Json::Value& optionalGoPipeline) = 0;
virtual bool Start();
virtual bool Stop(bool isPipelineRemoving);
// 用于将处理插件的输出经过聚合、序列化和压缩处理后,放入发送队列
virtual void Send(PipelineEventGroup&& g) = 0;
// 用于将聚合组件内指定聚合队列内的数据进行强制发送
virtual void Flush(size_t key) = 0;
// 用于将聚合组件内的所有数据进行强制发送
virtual void FlushAll() = 0;
};
```

对于使用Http协议发送数据的Flusher,进一步定义了HttpFlusher,接口如下:

```c++
class HttpFlusher : public Flusher {
public:
// 用于将待发送数据打包成http请求
virtual bool BuildRequest(SenderQueueItem* item, std::unique_ptr<HttpSinkRequest>& req, bool* keepItem) const = 0;
// 用于发送完成后进行记录和处理
virtual void OnSendDone(const HttpResponse& response, SenderQueueItem* item) = 0;
};
```

## Flusher级组件

### 聚合(必选)

* 作用:将多个小的PipelineEventGroup根据tag异同合并成一个大的group,提升发送效率

* 参数:

可以在flusher的参数中配置Batch字段,该字段的类型为map,其中允许包含的字段如下:

| **名称** | **类型** | **默认值** | **说明** |
| --- | --- | --- | --- |
| MinCnt | uint | 每个Flusher自定义 | 每个聚合队列最少包含的event数量 |
| MinSizeBytes | uint | 每个Flusher自定义 | 每个聚合队列最小的尺寸 |
| TimeoutSecs | uint | 每个Flusher自定义 | 每个聚合队列在第一个event加入后,在被输出前最多等待的时间 |

* 类接口:

```c++
template <typename T = EventBatchStatus>
class Batcher {
public:
bool Init(const Json::Value& config,
Flusher* flusher,
const DefaultFlushStrategyOptions& strategy,
bool enableGroupBatch = false);
void Add(PipelineEventGroup&& g, std::vector<BatchedEventsList>& res);
void FlushQueue(size_t key, BatchedEventsList& res);
void FlushAll(std::vector<BatchedEventsList>& res);
}
```

### 序列化(必选)

* 作用:对聚合模块的输出进行序列化,分为2个层级:

* event级:对每一个event单独进行序列化

* event group级:对多个event进行批量的序列化

* 类接口:

```c++
// T: PipelineEventPtr, BatchedEvents, BatchedEventsList
template <typename T>
class Serializer {
private:
virtual bool Serialize(T&& p, std::string& res, std::string& errorMsg) = 0;
};

using EventSerializer = Serializer<PipelineEventPtr>;
using EventGroupSerializer = Serializer<BatchedEvents>;
```

### 压缩(可选)

* 作用:对序列化后的结果进行压缩

* 类接口:

```c++
class Compressor {
private:
virtual bool Compress(const std::string& input, std::string& output, std::string& errorMsg) = 0;
};
```

## 开发步骤

下面以开发一个HttpFlusher为例,说明整个开发步骤:

1. 在plugin/flusher目录下新建一个Flusherxxx.h和Flusherxxx.cpp文件,用于派生HttpFlusher接口生成具体的插件类;

2. 在Flusherxxx.h文件中定义新的输出插件类Flusherxxx,满足以下要求:

a. 所有的可配置参数的权限为public,其余参数的权限均为private

b. 新增一个聚合组件:`Batcher<> mBatcher;`

c. 新增一个序列化组件:`std::unique_ptr<T> mSerializer;`,其中T为`EventSerializer`和`EventGroupSerializer`中的一种

d. 如果需要压缩,则新增一个压缩组件:`std::unique_ptr<Compressor> mCompressor;`

3. 在pipeline/serializer目录下新建一个xxxSerializer.h和xxxSerializer.cpp文件,用于派生`Serializer` 接口生成具体类;

4. (可选)如果需要压缩组件,且现有压缩组件库中没有所需算法,则新增一个压缩组件:

a. 在common/compression/CompressType.h文件中,扩展CompressType类用以标识新的压缩算法;

b. 在common/compression目录下新建一个xxxCompressor.h和xxxCompressor.cpp文件,用于派生`Compressor`接口生成具体类;

c. 在common/compression/CompressorFactory.cpp文件的各个函数中注册该压缩组件;

5. 在Flusherxxx.cpp文件中实现插件类

a. `Init`函数:

i. 根据入参初始化插件,针对非法参数,根据非法程度和影响决定是跳过该参数、使用默认值或直接拒绝加载插件。

ii. 调用相关函数完成聚合、序列化和压缩组件的初始化

b. `SerializeAndPush(BatchedEventsList&&)`函数:

```c++
void Flusherxxx::SerializeAndPush(BatchedEventsList&& groupLists) {
// 1. 使用mSerializer->Serialize函数对入参序列化
// 2. 如果需要压缩,则使用mCompressor->Compress函数对序列化结果进行压缩
// 3. 构建发送队列元素,其中,
// a. data为待发送内容
// b. 如果没用压缩组件,则rawSize=data.size();否则,rawSize为压缩前(序列化后)数据的长度
// c. mLogstoreKey为发送队列的key
auto item = make_unique<SenderQueueItem>(std::move(data),
rawSize,
this,
mQueueKey);
Flusher::PushToQueue(std::move(item));
}
```

c. `SerializeAndPush(vector<BatchedEventsList>&&)`函数:

```c++
void Flusherxxx::SerializeAndPush(vector<BatchedEventsList>&& groupLists) {
for (auto& groupList : groupLists) {
SerializeAndPush(std::move(groupList));
}
}
```

d. `Send`函数:

```c++
void Flusherxxx::Send(PipelineEventGroup&& g) {
vector<BatchedEventsList> res;
mBatcher.Add(std::move(g), res);
SerializeAndPush(std::move(res));
}
```

e. `Flush`函数:

```c++
void Flusherxxx::Flush(size_t key) {
BatchedEventsList res;
mBatcher.FlushQueue(key, res);
SerializeAndPush(std::move(res));
}
```

f. `FlushAll`函数:

```c++
void Flusherxxx::FlushAll() {
vector<BatchedEventsList> res;
mBatcher.FlushAll(res);
SerializeAndPush(std::move(res));
}

```

g. `BuildRequest`函数:将待发送数据包装成一个Http请求,如果请求构建失败,使用`keepItem`参数记录是否要保留数据供以后重试。

h. `OnSendDone`函数:根据返回的http response进行相应的记录和操作。

6. 在`PluginRegistry`类中注册该插件:

7. 在pipeline/plugin/PluginRegistry.cpp文件的头文件包含区新增如下行:

```c++
#include "plugin/flusher/Flusherxxx.h"
```

8. 在`PluginRegistry`类的`LoadStaticPlugins()`函数中新增如下行:

```c++
RegisterFlusherCreator(new StaticFlusherCreator<Flusherxxx>());
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# 如何开发原生Input插件

## 工作模式

同一输入类型的所有插件实例共享同一个线程来获取数据,插件实例只负责保存插件配置。

## 接口定义

```c++
class Input : public Plugin {
public:
// 初始化插件,入参为插件参数
virtual bool Init(const Json::Value& config) = 0;
// 负责向管理类注册配置
virtual bool Start() = 0;
// 负责向管理类注销配置
virtual bool Stop(bool isPipelineRemoving) = 0;
};
```

## 开发步骤

1. 在plugin/input目录下新建一个Inputxxx.h和Inputxxx.cpp文件,用于派生Input接口生成具体的插件类;

2. 在Inputxxx.h文件中定义新的输入插件类Inputxxx,满足以下规范:

a. 所有的可配置参数的权限为public,其余参数的权限均为private。

3. 在Inputxxx.cpp文件中实现`Init`函数,即根据入参初始化插件,针对非法参数,根据非法程度和影响决定是跳过该参数、使用默认值或直接拒绝加载插件。

4. 在根目录下新增一个目录,用于创建当前输入插件的管理类及其他辅助类,该管理类需要继承InputRunner接口:

```c++
class InputRunner {
public:
// 调用点:由插件的Start函数调用
// 作用:初始化管理类,并至少启动一个线程用于采集数据
// 注意:该函数必须是可重入的,因此需要在函数开头判断是否已经启动线程,如果是则直接退出
virtual void Init() = 0;
// 调用点:进程退出时,或配置热加载结束后无注册插件时由框架调用
// 作用:停止管理类,并进行扫尾工作,如资源回收、checkpoint记录等
virtual void Stop() = 0;
// 调用点:每次配置热加载结束后由框架调用
// 作用:判断是否有插件注册,若无,则框架将调用Stop函数对线程资源进行回收
virtual bool HasRegisteredPlugin() const = 0;
}
```

管理类是输入插件线程资源的实际拥有者,其最基本的运行流程如下:

- 依次访问每个注册的配置,根据配置情况抓取数据;

- 根据数据类型将源数据转换为PipelineEvent子类中的一种,并将一批数据组装成PipelineEventGroup;

- 将PipelineEventGroup发送到相应配置的处理队列中:

```c++
ProcessorRunner::GetInstance()->PushQueue(queueKey, inputIdx, std::move(group));
```

其中,

- queueKey是队列的key,可以从相应流水线的PipelineContext类的`GetProcessQueueKey()`方法来获取。

- inputIdx是当前数据所属输入插件在该流水线所有输入插件的位置(即配置中第几个,从0开始计数)

- group是待发送的数据包

最后,为了支持插件向管理类注册,管理类还需要提供注册和注销函数供插件使用,从性能的角度考虑,**该注册和注销过程应当是独立的,即某个插件的注册和注销不应当影响整个线程的运转**。

5. 在Inputxxx.cpp文件中实现其余接口函数:

```c++
bool Inputxxx::Start() {
// 1. 调用管理类的Start函数
// 2. 将当前插件注册到管理类中
}

bool Inputxxx::Stop(bool isPipelineRemoving) {
// 将当前插件从管理类中注销
}
```

6. 在`PluginRegistry`类中注册该插件:

a. 在pipeline/plugin/PluginRegistry.cpp文件的头文件包含区新增如下行:

```c++
#include "plugin/input/Inputxxx.h"
```

b. 在`PluginRegistry`类的`LoadStaticPlugins()`函数中新增如下行:

```c++
RegisterInputCreator(new StaticInputCreator<Inputxxx>());
```

c. 在`PipelineManager`类的构造函数中注册该插件的管理类
Loading