Skip to content

Commit

Permalink
feat(mqtt):support ECP format
Browse files Browse the repository at this point in the history
  • Loading branch information
hxy7yx committed Dec 16, 2024
1 parent 6df00a3 commit a087f53
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 6 deletions.
10 changes: 10 additions & 0 deletions include/neuron/json/json.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ extern "C" {
#include <stdint.h>
#include <stdlib.h>

typedef enum neu_json_type_ecp {
NEU_JSON_ECP_UNDEFINE = 0,
NEU_JSON_ECP_BOOL = 1,
NEU_JSON_ECP_INT = 2,
NEU_JSON_ECP_FLOAT = 3,
NEU_JSON_ECP_STRING = 4,
} neu_json_type_ecp_e;

typedef enum neu_json_type {
NEU_JSON_UNDEFINE = 0,
NEU_JSON_INT = 1,
Expand Down Expand Up @@ -157,6 +165,8 @@ void neu_json_elem_free(neu_json_elem_t *elem);
/* New a empty josn array */
void *neu_json_array();

int neu_json_type_transfer(neu_json_type_e type);

int neu_json_decode_by_json(void *json, int size, neu_json_elem_t *ele);
int neu_json_decode(char *buf, int size, neu_json_elem_t *ele);
int neu_json_decode_array_size_by_json(void *json, char *child);
Expand Down
5 changes: 3 additions & 2 deletions include/neuron/json/neu_json_rw.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ typedef struct {
} neu_json_read_resp_t;

int neu_json_encode_read_resp(void *json_object, void *param);
int neu_json_encode_read_resp1(void *json_object, void *param);
int neu_json_encode_read_resp2(void *json_object, void *param);
int neu_json_encode_read_resp1(void *json_object, void *param); // values
int neu_json_encode_read_resp2(void *json_object, void *param); // tags
int neu_json_encode_read_resp_ecp(void *json_object, void *param);
int neu_json_encode_read_paginate_resp(void *json_object, void *param);

typedef struct {
Expand Down
4 changes: 4 additions & 0 deletions plugins/mqtt/mqtt.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@
{
"key": "tags-format",
"value": 1
},
{
"key": "ECP-format",
"value": 2
}
]
}
Expand Down
3 changes: 2 additions & 1 deletion plugins/mqtt/mqtt_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ int mqtt_config_parse(neu_plugin_t *plugin, const char *setting,

// format, required
if (MQTT_UPLOAD_FORMAT_VALUES != format.v.val_int &&
MQTT_UPLOAD_FORMAT_TAGS != format.v.val_int) {
MQTT_UPLOAD_FORMAT_TAGS != format.v.val_int &&
MQTT_UPLOAD_FORMAT_ECP != format.v.val_int) {
plog_error(plugin, "setting invalid format: %" PRIi64,
format.v.val_int);
goto error;
Expand Down
3 changes: 3 additions & 0 deletions plugins/mqtt/mqtt_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ extern "C" {
typedef enum {
MQTT_UPLOAD_FORMAT_VALUES = 0,
MQTT_UPLOAD_FORMAT_TAGS = 1,
MQTT_UPLOAD_FORMAT_ECP = 2,
} mqtt_upload_format_e;

static inline const char *mqtt_upload_format_str(mqtt_upload_format_e f)
Expand All @@ -42,6 +43,8 @@ static inline const char *mqtt_upload_format_str(mqtt_upload_format_e f)
return "format-values";
case MQTT_UPLOAD_FORMAT_TAGS:
return "format-tags";
case MQTT_UPLOAD_FORMAT_ECP:
return "ECP-format";
default:
return NULL;
}
Expand Down
15 changes: 12 additions & 3 deletions plugins/mqtt/mqtt_handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,25 @@ char *generate_upload_json(neu_plugin_t *plugin, neu_reqresp_trans_data_t *data,
return NULL;
}

if (MQTT_UPLOAD_FORMAT_VALUES == format) { // values
switch (format) {
case MQTT_UPLOAD_FORMAT_VALUES:
neu_json_encode_with_mqtt(&json, neu_json_encode_read_resp1, &header,
neu_json_encode_read_periodic_resp,
&json_str);
} else if (MQTT_UPLOAD_FORMAT_TAGS == format) { // tags
break;
case MQTT_UPLOAD_FORMAT_TAGS:
neu_json_encode_with_mqtt(&json, neu_json_encode_read_resp2, &header,
neu_json_encode_read_periodic_resp,
&json_str);
} else {
break;
case MQTT_UPLOAD_FORMAT_ECP:
neu_json_encode_with_mqtt(&json, neu_json_encode_read_resp_ecp, &header,
neu_json_encode_read_periodic_resp,
&json_str);
break;
default:
plog_warn(plugin, "invalid upload format: %d", format);
break;
}

for (int i = 0; i < json.n_tag; i++) {
Expand Down
49 changes: 49 additions & 0 deletions src/parser/neu_json_rw.c
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,55 @@ int neu_json_encode_read_resp2(void *json_object, void *param)
return ret;
}

int neu_json_encode_read_resp_ecp(void *json_object, void *param)
{
int ret = 0;
neu_json_read_resp_t *resp = (neu_json_read_resp_t *) param;

void * tag_array = neu_json_array();
neu_json_read_resp_tag_t *p_tag = resp->tags;
for (int i = 0; i < resp->n_tag; i++) {
neu_json_elem_t tag_elems[2 + NEU_TAG_META_SIZE] = { 0 };

tag_elems[0].name = "name";
tag_elems[0].t = NEU_JSON_STR;
tag_elems[0].v.val_str = p_tag->name;

if (p_tag->error != 0) {
continue;
} else {
tag_elems[1].name = "value";
tag_elems[1].t = p_tag->t;
tag_elems[1].v = p_tag->value;
tag_elems[1].precision = p_tag->precision;

tag_elems[2].name = "type";
tag_elems[2].t = NEU_JSON_INT;
tag_elems[2].v.val_int = neu_json_type_transfer(p_tag->t);
}

for (int k = 0; k < p_tag->n_meta; k++) {
tag_elems[3 + k].name = p_tag->metas[k].name;
tag_elems[3 + k].t = p_tag->metas[k].t;
tag_elems[3 + k].v = p_tag->metas[k].value;
}

tag_array =
neu_json_encode_array(tag_array, tag_elems, 3 + p_tag->n_meta);
p_tag++;
}

neu_json_elem_t resp_elems[] = { {
.name = "tags",
.t = NEU_JSON_OBJECT,
.v.val_object = tag_array,
} };
ret = neu_json_encode_field(json_object, resp_elems,
NEU_JSON_ELEM_SIZE(resp_elems));

return ret;
}

int neu_json_decode_write_req(char *buf, neu_json_write_req_t **result)
{
void *json_obj = neu_json_decode_new(buf);
Expand Down
20 changes: 20 additions & 0 deletions src/utils/json.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,26 @@ static json_t *encode_object_value(neu_json_elem_t *ele)
return ob;
}

int neu_json_type_transfer(neu_json_type_e type)
{
switch (type) {
case NEU_JSON_BOOL:
return NEU_JSON_ECP_BOOL;
case NEU_JSON_INT:
case NEU_JSON_BIT:
return NEU_JSON_ECP_INT;
case NEU_JSON_FLOAT:
case NEU_JSON_DOUBLE:
return NEU_JSON_ECP_FLOAT;
case NEU_JSON_STR:
return NEU_JSON_ECP_STRING;
default:
break;
}

return NEU_JSON_ECP_STRING;
}

static json_t *encode_object(json_t *object, neu_json_elem_t ele)
{
json_t *ob = object;
Expand Down

0 comments on commit a087f53

Please sign in to comment.