Skip to content

Commit

Permalink
feat(mqtt):filter tags with errors when uploadind
Browse files Browse the repository at this point in the history
  • Loading branch information
hxy7yx committed Dec 13, 2024
1 parent 3d6f0d0 commit 6df00a3
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 6 deletions.
2 changes: 1 addition & 1 deletion plugins/mqtt/azure_iot_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ static int azure_handle_trans_data(neu_plugin_t * plugin,
}

char *json_str =
generate_upload_json(plugin, trans_data, plugin->config.format);
generate_upload_json(plugin, trans_data, plugin->config.format, NULL);
if (NULL == json_str) {
plog_error(plugin, "generate upload json fail");
return NEU_ERR_EINTERNAL;
Expand Down
10 changes: 10 additions & 0 deletions plugins/mqtt/mqtt.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@
]
}
},
"upload_err": {
"name": "Upload Tag Error Code ",
"name_zh": "上报点位错误码",
"description": "When data tag collection reports an error, report the tag error code.",
"description_zh": "点位采集报错时,上报点位错误码。",
"attribute": "optional",
"type": "bool",
"default": true,
"valid": {}
},
"write-req-topic": {
"name": "Write Request Topic",
"name_zh": "写请求主题",
Expand Down
13 changes: 13 additions & 0 deletions plugins/mqtt/mqtt_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,12 @@ int mqtt_config_parse(neu_plugin_t *plugin, const char *setting,
neu_json_elem_t upload_drv_state_interval = {
.name = "upload_drv_state_interval", .t = NEU_JSON_INT
};
neu_json_elem_t upload_err = {
.name = "upload_err",
.t = NEU_JSON_BOOL,
.v.val_bool = true,
.attribute = NEU_JSON_ATTRIBUTE_OPTIONAL,
};

if (NULL == setting || NULL == config) {
plog_error(plugin, "invalid argument, null pointer");
Expand Down Expand Up @@ -435,6 +441,11 @@ int mqtt_config_parse(neu_plugin_t *plugin, const char *setting,
goto error;
}

ret = neu_parse_param(setting, NULL, 1, &upload_err);
if (0 != ret) {
plog_notice(plugin, "setting upload_err failed");
}

config->version = version.v.val_int;
config->client_id = client_id.v.val_str;
config->qos = qos.v.val_int;
Expand All @@ -459,6 +470,7 @@ int mqtt_config_parse(neu_plugin_t *plugin, const char *setting,
config->upload_drv_state = upload_drv_state.v.val_bool;
config->heartbeat_topic = upload_drv_state_topic.v.val_str;
config->heartbeat_interval = upload_drv_state_interval.v.val_int;
config->upload_err = upload_err.v.val_bool;

plog_notice(plugin, "config MQTT version : %d", config->version);
plog_notice(plugin, "config client-id : %s", config->client_id);
Expand All @@ -474,6 +486,7 @@ int mqtt_config_parse(neu_plugin_t *plugin, const char *setting,
config->driver_action_resp_topic);
plog_notice(plugin, "config upload-drv-state: %d",
config->upload_drv_state);
plog_notice(plugin, "config upload-err: %d", config->upload_err);
if (config->upload_drv_state) {
if (config->heartbeat_topic) {
plog_notice(plugin, "config upload-drv-state-topic: %s",
Expand Down
1 change: 1 addition & 0 deletions plugins/mqtt/mqtt_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ typedef struct {
char * write_resp_topic; // write response topic
char * driver_action_req_topic; // driver action request topic
char * driver_action_resp_topic; // driver action response topic
bool upload_err; // Upload tag error code flag
bool upload_drv_state; // upload driver state flag
char * heartbeat_topic; // upload driver state topic
uint16_t heartbeat_interval; // upload driver state interval
Expand Down
40 changes: 37 additions & 3 deletions plugins/mqtt/mqtt_handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,45 @@ static int tag_values_to_json(UT_array *tags, neu_json_read_resp_t *json)
return 0;
}

void filter_error_tags(neu_reqresp_trans_data_t *data)
{
if (!data || !data->tags) {
return;
}

UT_array *filtered_tags;
utarray_new(filtered_tags, neu_resp_tag_value_meta_icd());

neu_resp_tag_value_meta_t *tag_ptr = NULL;
while ((tag_ptr = (neu_resp_tag_value_meta_t *) utarray_next(data->tags,
tag_ptr))) {
if (tag_ptr->value.type != NEU_TYPE_ERROR) {
utarray_push_back(filtered_tags, tag_ptr);
}
}

utarray_free(data->tags);
data->tags = filtered_tags;
}

char *generate_upload_json(neu_plugin_t *plugin, neu_reqresp_trans_data_t *data,
mqtt_upload_format_e format)
mqtt_upload_format_e format, bool *skip)
{
char * json_str = NULL;
neu_json_read_periodic_t header = { .group = (char *) data->group,
.node = (char *) data->driver,
.timestamp = global_timestamp };
neu_json_read_resp_t json = { 0 };

if (!plugin->config.upload_err && skip != NULL) {
filter_error_tags(data);

if (utarray_len(data->tags) == 0) {
*skip = true;
return NULL;
}
}

if (0 != tag_values_to_json(data->tags, &json)) {
plog_error(plugin, "tag_values_to_json fail");
return NULL;
Expand Down Expand Up @@ -802,8 +832,12 @@ int handle_trans_data(neu_plugin_t * plugin,
break;
}

char *json_str =
generate_upload_json(plugin, trans_data, plugin->config.format);
bool skip_none = false;
char *json_str = generate_upload_json(
plugin, trans_data, plugin->config.format, &skip_none);
if (skip_none) {
break;
}
if (NULL == json_str) {
plog_error(plugin, "generate upload json fail");
rv = NEU_ERR_EINTERNAL;
Expand Down
2 changes: 1 addition & 1 deletion plugins/mqtt/mqtt_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ int handle_driver_action_response(neu_plugin_t * plugin,
neu_resp_driver_action_t *data);

char *generate_upload_json(neu_plugin_t *plugin, neu_reqresp_trans_data_t *data,
mqtt_upload_format_e format);
mqtt_upload_format_e format, bool *skip);
int handle_trans_data(neu_plugin_t * plugin,
neu_reqresp_trans_data_t *trans_data);

Expand Down
34 changes: 33 additions & 1 deletion tests/ft/app/test_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@
INTERVAL = 100
TAG0 = {
"name": "tag0",
"address": "1!400001.",
"address": "1!400001",
"attribute": config.NEU_TAG_ATTRIBUTE_RW,
"type": config.NEU_TYPE_INT16,
}
TAG_ERR = {
"name": "tag_err",
"address": "100!400001",
"attribute": config.NEU_TAG_ATTRIBUTE_RW,
"type": config.NEU_TYPE_INT16,
}
Expand Down Expand Up @@ -203,6 +209,14 @@ def conf_upload_driver_state(conf_base):
}


@pytest.fixture(scope="function")
def conf_upload_err(conf_base):
return {
**conf_base,
"upload_err": False,
}


@pytest.fixture(scope="function")
def conf_bad_host(conf_base):
return {
Expand Down Expand Up @@ -462,6 +476,24 @@ def test_mqtt_upload(self, mocker, request, conf):
finally:
api.del_tags(DRIVER, GROUP, [tag["name"] for tag in TAGS])

@description(
given="MQTT node",
when="read an error tag and set upload_err false",
then="broker should not receive error data on upload topic",
)
def test_mqtt_upload_err(self, mocker, conf_upload_err):
api.node_setting_check(NODE, conf_upload_err)
api.node_ctl(NODE, config.NEU_CTL_START)

try:
api.add_tags_check(DRIVER, GROUP, tags=[TAG_ERR])
msg = mocker.get(UPLOAD_TOPIC, timeout=3)
assert msg is not None
assert 0 == len(msg["errors"])

finally:
api.del_tags(DRIVER, GROUP, [TAG_ERR["name"]])

@description(
given="MQTT node",
when="after renaming subscribed driver",
Expand Down

0 comments on commit 6df00a3

Please sign in to comment.