-
Notifications
You must be signed in to change notification settings - Fork 417
Feature 936 Standalone connection pool
- Status: Design
- Author: Jianxiang Ran
- Co-author: Rocky Jin
- Discussion: https://github.com/lf-edge/ekuiper/issues/936
As data process middleware, eKuiper must exchange data with other components. In eKuiper, we use Source
to represent the data input and Sink
for the data output. In some cases, Source
and Sink
need share the same connection:
- Mqtt source and mqtt sink across all rules may want to use the same mqtt connection
- There is no way for EdgeX Sink to connect to secure redis. This could be a way to reuse the connection with Source
At existing implement, Source and Sink set up and hold the client instances by themselvs. We can introduce a client-manager
entity to maintain different client instances. Since the client instance are thread-safe, it can be used by different Sources and Sinks at the same time.
client-manager
is a key-value storage for client instances: connection type (mqtt or edgeX) and connection selector's combination as key, client instances as values.
- connection type: different Source/Sink types, like EdgeX, MQTT
- connection selector: user can configure Source or Sink address by choosing different endpointURL.
The client-manager
must have these capabilities:
- builder function to build different client types according to the connection type and configure key
- get the unique connection by the connection type and connection key
- when all Sink and Source have no reference to a specific connection, should close it
client-manager
should provide the following APIs:
// called anytime when need the client
GetConnection(connectionType string, configKey strgin) (interface{}, error)
// called when this source/sink exit
ReleaseConnection(connectionType string, configKey strgin) error
These two APIs will be provided to users by ctx API, Sink/Source can call in this way
// called anytime when need the client
ctx.GetConnection(connectionType string, configKey strgin) (interface{}, error)
// called when this source/sink exit
ctx.ReleaseConnection(connectionType string, configKey strgin) error
The implement is DefaultContext will add these two new APIs and client-manager
will be injected into Source/Sink Context, the ctx API will delegate to client-manager
APIs.
In this example, Source and Sink use MQTT. we need following steps:
-
in connection.yaml file, write the mqtt connection related configurations from which Source and Sink can choose.
-
in Source mqtt_source.yaml, create custom configuration items
demo-config
which select the connection configurationmqtt.conn2
pre-defined in step1 then create stream and specifyCONF_KEY=demo-config
. This will let Source usemqtt.conn2
to connect
demo (
USERID BIGINT,
FIRST_NAME STRING,
LAST_NAME STRING,
NICKNAMES ARRAY(STRING),
Gender BOOLEAN
) WITH (DATASOURCE="test/", FORMAT="JSON", TYPE="mqtt", CONF_KEY="demo-config");
- create sink actions and specify
connection-selector=mqtt.conn2
. This will let Sink usemqtt.conn2
to connect
"mqtt": {
"connection-selector": "mqtt.conn2",
"server": "ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883",
"topic": "devices/result",
"qos": 1,
"clientId": "demo_001",
"certificationPath": "keys/d3807d9fa5-certificate.pem",
"privateKeyPath": "keys/d3807d9fa5-private.pem.key",
"insecureSkipVerify": false,
"retained": false
}
- write MQTT Source/Sink code
When Source or Sink have this configconnection-selector=mqtt.conn2
, Source/Sink need get connection fromclient-manager
; otherwise, just init the connection by itself.
Open(config) {
if config.connection-selector != "" {
mqtt_client := GetConnection("mqtt", "conn2")
} else {
mqtt_client := GetConnByMyself()
}
}
Close(config) {
if config.connection-selector != "" {
ReleaseConnection("mqtt", "conn2")
} else {
mqtt_client := DropConnByMyself()
}
}
- get metadata/connections
[
{
"name": "edgex",
"about": {
"trial": false,
"installed": true,
"author": {
"name": "EMQ",
"email": "[email protected]",
"company": "EMQ Technologies Co., Ltd",
"website": "https://www.emqx.io"
},
"helpUrl": {
"en": "https://github.com/lf-edge/ekuiper/blob/master/docs/en_US/rules/sources/edgex.md",
"zh": "https://github.com/lf-edge/ekuiper/blob/master/docs/zh_CN/rules/sources/edgex.md"
},
"description": {
"en": "Kuiper provides built-in support for EdgeX source stream, which can subscribe the message from EdgeX message bus and feed into the Kuiper streaming process pipeline.",
"zh": "Kuiper 提供了内置的 EdgeX 源支持,它可以被用来订阅来自于EdgeX 消息总线的数据,并且将数据放入 Kuiper 数据处理流水线中。"
}
}
},
{
"name": "mqtt",
"about": {
"trial": false,
"installed": true,
"author": {
"name": "EMQ",
"email": "[email protected]",
"company": "EMQ Technologies Co., Ltd",
"website": "https://www.emqx.io"
},
"helpUrl": {
"en": "https://github.com/lf-edge/ekuiper/blob/master/docs/en_US/rules/sources/mqtt.md",
"zh": "https://github.com/lf-edge/ekuiper/blob/master/docs/zh_CN/rules/sources/mqtt.md"
},
"description": {
"en": "Kuiper provides built-in support for MQTT source stream, which can subscribe the message from MQTT broker and feed into the Kuiper processing pipeline.",
"zh": "Kuiper 为 MQTT 源流提供了内置支持,流可以订阅来自 MQTT 代理的消息并输入Kuiper 处理管道。"
}
}
}
]
- get metadata/connections/mqtt/connectionSelectors
[
"default",
"application_conf"
]
- get metadata/connections/mqtt_template
[
{
"exist": false,
"name": "server",
"default": null,
"type": "string",
"control": "text",
"optional": false,
"values": null,
"hint": {
"en": "The broker address of the MQTT server, such as tcp://127.0.0.1:1883.",
"zh": "MQTT 服务器地址,例如 tcp://127.0.0.1:1883"
},
"label": {
"en": "MQTT broker address",
"zh": "MQTT 服务器地址"
}
},
{
"exist": false,
"name": "clientId",
"default": null,
"type": "string",
"control": "text",
"optional": true,
"values": null,
"hint": {
"en": "The client id for MQTT connection. If not specified, an uuid will be used",
"zh": "MQTT 连接的客户端 ID。 如果未指定,将使用一个 uuid"
},
"label": {
"en": "MQTT ClientID",
"zh": "MQTT 客户端标识符(ClinetID)"
}
},
{
"exist": false,
"name": "protocolVersion",
"default": null,
"type": "string",
"control": "select",
"optional": true,
"values": [
"3.1",
"3.1.1"
],
"hint": {
"en": "MQTT protocol version. 3.1 (also refer as MQTT 3) or 3.1.1 (also refer as MQTT 4). If not specified, the default value is 3.1.",
"zh": "MQTT 协议版本。3.1 (也被称为 MQTT 3) 或者 3.1.1 (也被称为 MQTT 4)。 如果未指定,缺省值为 3.1。"
},
"label": {
"en": "MQTT protocol version",
"zh": "MQTT 协议版本"
}
},
{
"exist": false,
"name": "qos",
"default": null,
"type": "list_int",
"control": "select",
"optional": true,
"values": [
0,
1,
2
],
"hint": {
"en": "The QoS for message delivery.",
"zh": "消息转发的服务质量"
},
"label": {
"en": "QoS",
"zh": "QoS"
}
},
{
"exist": false,
"name": "username",
"default": null,
"type": "string",
"control": "text",
"optional": true,
"values": null,
"hint": {
"en": "The username for the connection.",
"zh": "连接用户名"
},
"label": {
"en": "Username",
"zh": "用户名"
}
},
{
"exist": false,
"name": "password",
"default": null,
"type": "string",
"control": "text",
"optional": true,
"values": null,
"hint": {
"en": "The password for the connection.",
"zh": "连接密码。"
},
"label": {
"en": "Password",
"zh": "密码"
}
},
{
"exist": false,
"name": "certificationPath",
"default": null,
"type": "string",
"control": "text",
"optional": true,
"values": null,
"hint": {
"en": "The certification path. It can be an absolute path, or a relative path. If it is an relative path, then the base path is where you excuting the kuiperd command. For example, if you run bin/kuiperd from /var/kuiper, then the base path is /var/kuiper; If you run ./kuiperd from /var/kuiper/bin, then the base path is /var/kuiper/bin.",
"zh": "证书路径。可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 kuiperd 命令的路径。比如,如果你在 /var/kuiper 中运行 bin/kuiperd ,那么父目录为 /var/kuiper; 如果运行从 /var/kuiper/bin 中运行./kuiperd,那么父目录为 /var/kuiper/bin"
},
"label": {
"en": "Certification path",
"zh": "证书路径"
}
},
{
"exist": false,
"name": "privateKeyPath",
"default": null,
"type": "string",
"control": "text",
"optional": true,
"values": null,
"hint": {
"en": "The private key path. It can be either absolute path, or relative path, which is similar to use of certificationPath.",
"zh": "私钥路径。可以为绝对路径,也可以为相对路径,相对路径的用法与 certificationPath 类似"
},
"label": {
"en": "Private key path",
"zh": "私钥路径"
}
},
{
"exist": false,
"name": "insecureSkipVerify",
"default": null,
"type": "bool",
"control": "radio",
"optional": true,
"values": null,
"hint": {
"en": "If InsecureSkipVerify is true, TLS accepts any certificate presented by the server and any host name in that certificate. In this mode, TLS is susceptible to man-in-the-middle attacks. The default value is false. The configuration item can only be used with TLS connections.",
"zh": "如果 InsecureSkipVerify 设置为 true, TLS接受服务器提供的任何证书以及该证书中的任何主机名。 在这种模式下,TLS容易受到中间人攻击。默认值为false。配置项只能用于TLS连接。"
},
"label": {
"en": "Insecure skip verify",
"zh": "非安全跳过验证"
}
}
]
- get metadata/connections/mqtt/_name
{
"demo_conf":{"qos":0,"servers":["tcp://10.211.55.6:1883","tcp://127.0.0.1"]}
}
- post/put metadata/connections/mqtt/_name
{
"demo_conf":{"qos":0,"servers":["tcp://10.211.55.6:1883","tcp://127.0.0.1"]}
}
- delete metadata/connections/mqtt/_name