From 6df00a37d6be9be523dd7e9a57b3427ceca055be Mon Sep 17 00:00:00 2001 From: hxy7yx <1595670487@qq.com> Date: Wed, 11 Dec 2024 16:00:30 +0800 Subject: [PATCH] feat(mqtt):filter tags with errors when uploadind --- plugins/mqtt/azure_iot_plugin.c | 2 +- plugins/mqtt/mqtt.json | 10 +++++++++ plugins/mqtt/mqtt_config.c | 13 +++++++++++ plugins/mqtt/mqtt_config.h | 1 + plugins/mqtt/mqtt_handle.c | 40 ++++++++++++++++++++++++++++++--- plugins/mqtt/mqtt_handle.h | 2 +- tests/ft/app/test_mqtt.py | 34 +++++++++++++++++++++++++++- 7 files changed, 96 insertions(+), 6 deletions(-) diff --git a/plugins/mqtt/azure_iot_plugin.c b/plugins/mqtt/azure_iot_plugin.c index a806ffe2a..bd470edc6 100644 --- a/plugins/mqtt/azure_iot_plugin.c +++ b/plugins/mqtt/azure_iot_plugin.c @@ -300,7 +300,7 @@ static int azure_handle_trans_data(neu_plugin_t * plugin, } char *json_str = - generate_upload_json(plugin, trans_data, plugin->config.format); + generate_upload_json(plugin, trans_data, plugin->config.format, NULL); if (NULL == json_str) { plog_error(plugin, "generate upload json fail"); return NEU_ERR_EINTERNAL; diff --git a/plugins/mqtt/mqtt.json b/plugins/mqtt/mqtt.json index 570e2f8a3..cd4a5eaec 100644 --- a/plugins/mqtt/mqtt.json +++ b/plugins/mqtt/mqtt.json @@ -74,6 +74,16 @@ ] } }, + "upload_err": { + "name": "Upload Tag Error Code ", + "name_zh": "上报点位错误码", + "description": "When data tag collection reports an error, report the tag error code.", + "description_zh": "点位采集报错时,上报点位错误码。", + "attribute": "optional", + "type": "bool", + "default": true, + "valid": {} + }, "write-req-topic": { "name": "Write Request Topic", "name_zh": "写请求主题", diff --git a/plugins/mqtt/mqtt_config.c b/plugins/mqtt/mqtt_config.c index 2654c3dce..32e22955b 100644 --- a/plugins/mqtt/mqtt_config.c +++ b/plugins/mqtt/mqtt_config.c @@ -315,6 +315,12 @@ int mqtt_config_parse(neu_plugin_t *plugin, const char *setting, neu_json_elem_t upload_drv_state_interval = { .name = "upload_drv_state_interval", .t = NEU_JSON_INT }; + neu_json_elem_t upload_err = { + .name = "upload_err", + .t = NEU_JSON_BOOL, + .v.val_bool = true, + .attribute = NEU_JSON_ATTRIBUTE_OPTIONAL, + }; if (NULL == setting || NULL == config) { plog_error(plugin, "invalid argument, null pointer"); @@ -435,6 +441,11 @@ int mqtt_config_parse(neu_plugin_t *plugin, const char *setting, goto error; } + ret = neu_parse_param(setting, NULL, 1, &upload_err); + if (0 != ret) { + plog_notice(plugin, "setting upload_err failed"); + } + config->version = version.v.val_int; config->client_id = client_id.v.val_str; config->qos = qos.v.val_int; @@ -459,6 +470,7 @@ int mqtt_config_parse(neu_plugin_t *plugin, const char *setting, config->upload_drv_state = upload_drv_state.v.val_bool; config->heartbeat_topic = upload_drv_state_topic.v.val_str; config->heartbeat_interval = upload_drv_state_interval.v.val_int; + config->upload_err = upload_err.v.val_bool; plog_notice(plugin, "config MQTT version : %d", config->version); plog_notice(plugin, "config client-id : %s", config->client_id); @@ -474,6 +486,7 @@ int mqtt_config_parse(neu_plugin_t *plugin, const char *setting, config->driver_action_resp_topic); plog_notice(plugin, "config upload-drv-state: %d", config->upload_drv_state); + plog_notice(plugin, "config upload-err: %d", config->upload_err); if (config->upload_drv_state) { if (config->heartbeat_topic) { plog_notice(plugin, "config upload-drv-state-topic: %s", diff --git a/plugins/mqtt/mqtt_config.h b/plugins/mqtt/mqtt_config.h index c28ae7cfd..1b79f5355 100644 --- a/plugins/mqtt/mqtt_config.h +++ b/plugins/mqtt/mqtt_config.h @@ -56,6 +56,7 @@ typedef struct { char * write_resp_topic; // write response topic char * driver_action_req_topic; // driver action request topic char * driver_action_resp_topic; // driver action response topic + bool upload_err; // Upload tag error code flag bool upload_drv_state; // upload driver state flag char * heartbeat_topic; // upload driver state topic uint16_t heartbeat_interval; // upload driver state interval diff --git a/plugins/mqtt/mqtt_handle.c b/plugins/mqtt/mqtt_handle.c index 77a9308b0..905dbd5f3 100644 --- a/plugins/mqtt/mqtt_handle.c +++ b/plugins/mqtt/mqtt_handle.c @@ -64,8 +64,29 @@ static int tag_values_to_json(UT_array *tags, neu_json_read_resp_t *json) return 0; } +void filter_error_tags(neu_reqresp_trans_data_t *data) +{ + if (!data || !data->tags) { + return; + } + + UT_array *filtered_tags; + utarray_new(filtered_tags, neu_resp_tag_value_meta_icd()); + + neu_resp_tag_value_meta_t *tag_ptr = NULL; + while ((tag_ptr = (neu_resp_tag_value_meta_t *) utarray_next(data->tags, + tag_ptr))) { + if (tag_ptr->value.type != NEU_TYPE_ERROR) { + utarray_push_back(filtered_tags, tag_ptr); + } + } + + utarray_free(data->tags); + data->tags = filtered_tags; +} + char *generate_upload_json(neu_plugin_t *plugin, neu_reqresp_trans_data_t *data, - mqtt_upload_format_e format) + mqtt_upload_format_e format, bool *skip) { char * json_str = NULL; neu_json_read_periodic_t header = { .group = (char *) data->group, @@ -73,6 +94,15 @@ char *generate_upload_json(neu_plugin_t *plugin, neu_reqresp_trans_data_t *data, .timestamp = global_timestamp }; neu_json_read_resp_t json = { 0 }; + if (!plugin->config.upload_err && skip != NULL) { + filter_error_tags(data); + + if (utarray_len(data->tags) == 0) { + *skip = true; + return NULL; + } + } + if (0 != tag_values_to_json(data->tags, &json)) { plog_error(plugin, "tag_values_to_json fail"); return NULL; @@ -802,8 +832,12 @@ int handle_trans_data(neu_plugin_t * plugin, break; } - char *json_str = - generate_upload_json(plugin, trans_data, plugin->config.format); + bool skip_none = false; + char *json_str = generate_upload_json( + plugin, trans_data, plugin->config.format, &skip_none); + if (skip_none) { + break; + } if (NULL == json_str) { plog_error(plugin, "generate upload json fail"); rv = NEU_ERR_EINTERNAL; diff --git a/plugins/mqtt/mqtt_handle.h b/plugins/mqtt/mqtt_handle.h index fd1dabde7..31df78812 100644 --- a/plugins/mqtt/mqtt_handle.h +++ b/plugins/mqtt/mqtt_handle.h @@ -58,7 +58,7 @@ int handle_driver_action_response(neu_plugin_t * plugin, neu_resp_driver_action_t *data); char *generate_upload_json(neu_plugin_t *plugin, neu_reqresp_trans_data_t *data, - mqtt_upload_format_e format); + mqtt_upload_format_e format, bool *skip); int handle_trans_data(neu_plugin_t * plugin, neu_reqresp_trans_data_t *trans_data); diff --git a/tests/ft/app/test_mqtt.py b/tests/ft/app/test_mqtt.py index 87e170fbf..842eba8ee 100644 --- a/tests/ft/app/test_mqtt.py +++ b/tests/ft/app/test_mqtt.py @@ -24,7 +24,13 @@ INTERVAL = 100 TAG0 = { "name": "tag0", - "address": "1!400001.", + "address": "1!400001", + "attribute": config.NEU_TAG_ATTRIBUTE_RW, + "type": config.NEU_TYPE_INT16, +} +TAG_ERR = { + "name": "tag_err", + "address": "100!400001", "attribute": config.NEU_TAG_ATTRIBUTE_RW, "type": config.NEU_TYPE_INT16, } @@ -203,6 +209,14 @@ def conf_upload_driver_state(conf_base): } +@pytest.fixture(scope="function") +def conf_upload_err(conf_base): + return { + **conf_base, + "upload_err": False, + } + + @pytest.fixture(scope="function") def conf_bad_host(conf_base): return { @@ -462,6 +476,24 @@ def test_mqtt_upload(self, mocker, request, conf): finally: api.del_tags(DRIVER, GROUP, [tag["name"] for tag in TAGS]) + @description( + given="MQTT node", + when="read an error tag and set upload_err false", + then="broker should not receive error data on upload topic", + ) + def test_mqtt_upload_err(self, mocker, conf_upload_err): + api.node_setting_check(NODE, conf_upload_err) + api.node_ctl(NODE, config.NEU_CTL_START) + + try: + api.add_tags_check(DRIVER, GROUP, tags=[TAG_ERR]) + msg = mocker.get(UPLOAD_TOPIC, timeout=3) + assert msg is not None + assert 0 == len(msg["errors"]) + + finally: + api.del_tags(DRIVER, GROUP, [TAG_ERR["name"]]) + @description( given="MQTT node", when="after renaming subscribed driver",