From a087f53d04118483a36d6beaea21bf1236d1be99 Mon Sep 17 00:00:00 2001 From: hxy7yx <1595670487@qq.com> Date: Mon, 16 Dec 2024 14:20:31 +0800 Subject: [PATCH] feat(mqtt):support ECP format --- include/neuron/json/json.h | 10 +++++++ include/neuron/json/neu_json_rw.h | 5 ++-- plugins/mqtt/mqtt.json | 4 +++ plugins/mqtt/mqtt_config.c | 3 +- plugins/mqtt/mqtt_config.h | 3 ++ plugins/mqtt/mqtt_handle.c | 15 ++++++++-- src/parser/neu_json_rw.c | 49 +++++++++++++++++++++++++++++++ src/utils/json.c | 20 +++++++++++++ 8 files changed, 103 insertions(+), 6 deletions(-) diff --git a/include/neuron/json/json.h b/include/neuron/json/json.h index 96f090911..58004be02 100644 --- a/include/neuron/json/json.h +++ b/include/neuron/json/json.h @@ -28,6 +28,14 @@ extern "C" { #include #include +typedef enum neu_json_type_ecp { + NEU_JSON_ECP_UNDEFINE = 0, + NEU_JSON_ECP_BOOL = 1, + NEU_JSON_ECP_INT = 2, + NEU_JSON_ECP_FLOAT = 3, + NEU_JSON_ECP_STRING = 4, +} neu_json_type_ecp_e; + typedef enum neu_json_type { NEU_JSON_UNDEFINE = 0, NEU_JSON_INT = 1, @@ -157,6 +165,8 @@ void neu_json_elem_free(neu_json_elem_t *elem); /* New a empty josn array */ void *neu_json_array(); +int neu_json_type_transfer(neu_json_type_e type); + int neu_json_decode_by_json(void *json, int size, neu_json_elem_t *ele); int neu_json_decode(char *buf, int size, neu_json_elem_t *ele); int neu_json_decode_array_size_by_json(void *json, char *child); diff --git a/include/neuron/json/neu_json_rw.h b/include/neuron/json/neu_json_rw.h index 187d106ba..97d02438a 100644 --- a/include/neuron/json/neu_json_rw.h +++ b/include/neuron/json/neu_json_rw.h @@ -62,8 +62,9 @@ typedef struct { } neu_json_read_resp_t; int neu_json_encode_read_resp(void *json_object, void *param); -int neu_json_encode_read_resp1(void *json_object, void *param); -int neu_json_encode_read_resp2(void *json_object, void *param); +int neu_json_encode_read_resp1(void *json_object, void *param); // values +int neu_json_encode_read_resp2(void *json_object, void *param); // tags +int neu_json_encode_read_resp_ecp(void *json_object, void *param); int neu_json_encode_read_paginate_resp(void *json_object, void *param); typedef struct { diff --git a/plugins/mqtt/mqtt.json b/plugins/mqtt/mqtt.json index cd4a5eaec..5bd59bf0c 100644 --- a/plugins/mqtt/mqtt.json +++ b/plugins/mqtt/mqtt.json @@ -70,6 +70,10 @@ { "key": "tags-format", "value": 1 + }, + { + "key": "ECP-format", + "value": 2 } ] } diff --git a/plugins/mqtt/mqtt_config.c b/plugins/mqtt/mqtt_config.c index 32e22955b..2505724a4 100644 --- a/plugins/mqtt/mqtt_config.c +++ b/plugins/mqtt/mqtt_config.c @@ -354,7 +354,8 @@ int mqtt_config_parse(neu_plugin_t *plugin, const char *setting, // format, required if (MQTT_UPLOAD_FORMAT_VALUES != format.v.val_int && - MQTT_UPLOAD_FORMAT_TAGS != format.v.val_int) { + MQTT_UPLOAD_FORMAT_TAGS != format.v.val_int && + MQTT_UPLOAD_FORMAT_ECP != format.v.val_int) { plog_error(plugin, "setting invalid format: %" PRIi64, format.v.val_int); goto error; diff --git a/plugins/mqtt/mqtt_config.h b/plugins/mqtt/mqtt_config.h index 1b79f5355..1de354b7b 100644 --- a/plugins/mqtt/mqtt_config.h +++ b/plugins/mqtt/mqtt_config.h @@ -33,6 +33,7 @@ extern "C" { typedef enum { MQTT_UPLOAD_FORMAT_VALUES = 0, MQTT_UPLOAD_FORMAT_TAGS = 1, + MQTT_UPLOAD_FORMAT_ECP = 2, } mqtt_upload_format_e; static inline const char *mqtt_upload_format_str(mqtt_upload_format_e f) @@ -42,6 +43,8 @@ static inline const char *mqtt_upload_format_str(mqtt_upload_format_e f) return "format-values"; case MQTT_UPLOAD_FORMAT_TAGS: return "format-tags"; + case MQTT_UPLOAD_FORMAT_ECP: + return "ECP-format"; default: return NULL; } diff --git a/plugins/mqtt/mqtt_handle.c b/plugins/mqtt/mqtt_handle.c index 905dbd5f3..40c0f10c7 100644 --- a/plugins/mqtt/mqtt_handle.c +++ b/plugins/mqtt/mqtt_handle.c @@ -108,16 +108,25 @@ char *generate_upload_json(neu_plugin_t *plugin, neu_reqresp_trans_data_t *data, return NULL; } - if (MQTT_UPLOAD_FORMAT_VALUES == format) { // values + switch (format) { + case MQTT_UPLOAD_FORMAT_VALUES: neu_json_encode_with_mqtt(&json, neu_json_encode_read_resp1, &header, neu_json_encode_read_periodic_resp, &json_str); - } else if (MQTT_UPLOAD_FORMAT_TAGS == format) { // tags + break; + case MQTT_UPLOAD_FORMAT_TAGS: neu_json_encode_with_mqtt(&json, neu_json_encode_read_resp2, &header, neu_json_encode_read_periodic_resp, &json_str); - } else { + break; + case MQTT_UPLOAD_FORMAT_ECP: + neu_json_encode_with_mqtt(&json, neu_json_encode_read_resp_ecp, &header, + neu_json_encode_read_periodic_resp, + &json_str); + break; + default: plog_warn(plugin, "invalid upload format: %d", format); + break; } for (int i = 0; i < json.n_tag; i++) { diff --git a/src/parser/neu_json_rw.c b/src/parser/neu_json_rw.c index 5cddaf10d..4cb8eb20a 100644 --- a/src/parser/neu_json_rw.c +++ b/src/parser/neu_json_rw.c @@ -345,6 +345,55 @@ int neu_json_encode_read_resp2(void *json_object, void *param) return ret; } +int neu_json_encode_read_resp_ecp(void *json_object, void *param) +{ + int ret = 0; + neu_json_read_resp_t *resp = (neu_json_read_resp_t *) param; + + void * tag_array = neu_json_array(); + neu_json_read_resp_tag_t *p_tag = resp->tags; + for (int i = 0; i < resp->n_tag; i++) { + neu_json_elem_t tag_elems[2 + NEU_TAG_META_SIZE] = { 0 }; + + tag_elems[0].name = "name"; + tag_elems[0].t = NEU_JSON_STR; + tag_elems[0].v.val_str = p_tag->name; + + if (p_tag->error != 0) { + continue; + } else { + tag_elems[1].name = "value"; + tag_elems[1].t = p_tag->t; + tag_elems[1].v = p_tag->value; + tag_elems[1].precision = p_tag->precision; + + tag_elems[2].name = "type"; + tag_elems[2].t = NEU_JSON_INT; + tag_elems[2].v.val_int = neu_json_type_transfer(p_tag->t); + } + + for (int k = 0; k < p_tag->n_meta; k++) { + tag_elems[3 + k].name = p_tag->metas[k].name; + tag_elems[3 + k].t = p_tag->metas[k].t; + tag_elems[3 + k].v = p_tag->metas[k].value; + } + + tag_array = + neu_json_encode_array(tag_array, tag_elems, 3 + p_tag->n_meta); + p_tag++; + } + + neu_json_elem_t resp_elems[] = { { + .name = "tags", + .t = NEU_JSON_OBJECT, + .v.val_object = tag_array, + } }; + ret = neu_json_encode_field(json_object, resp_elems, + NEU_JSON_ELEM_SIZE(resp_elems)); + + return ret; +} + int neu_json_decode_write_req(char *buf, neu_json_write_req_t **result) { void *json_obj = neu_json_decode_new(buf); diff --git a/src/utils/json.c b/src/utils/json.c index b402ba8b6..6f9209839 100644 --- a/src/utils/json.c +++ b/src/utils/json.c @@ -155,6 +155,26 @@ static json_t *encode_object_value(neu_json_elem_t *ele) return ob; } +int neu_json_type_transfer(neu_json_type_e type) +{ + switch (type) { + case NEU_JSON_BOOL: + return NEU_JSON_ECP_BOOL; + case NEU_JSON_INT: + case NEU_JSON_BIT: + return NEU_JSON_ECP_INT; + case NEU_JSON_FLOAT: + case NEU_JSON_DOUBLE: + return NEU_JSON_ECP_FLOAT; + case NEU_JSON_STR: + return NEU_JSON_ECP_STRING; + default: + break; + } + + return NEU_JSON_ECP_STRING; +} + static json_t *encode_object(json_t *object, neu_json_elem_t ele) { json_t *ob = object;