Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport: fixes from git master to v2.1 #8064

Merged
merged 2 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion plugins/in_forward/fw_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,8 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
}
else if (event_type == FLB_EVENT_TYPE_METRICS) {
ret = cmt_decode_msgpack_create(&cmt, (char *) data, len, &off);
if (ret == -1) {
if (ret != CMT_DECODE_MSGPACK_SUCCESS) {
flb_error("cmt_decode_msgpack_create failed. ret=%d", ret);
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
Expand Down
39 changes: 22 additions & 17 deletions src/stream_processor/flb_sp.c
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,8 @@ static int sp_cmd_aggregated_keys(struct flb_sp_cmd *cmd)
}

/*
* if some aggregated function is required, not aggregated keys are
* not allowed so we return an error (-1).
* If aggregated functions are included in the query, non-aggregated keys are
* not allowed (except for the ones inside GROUP BY statement).
*/
if (aggr > 0 && not_aggr == 0) {
return aggr;
Expand Down Expand Up @@ -490,7 +490,7 @@ struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name,
/* Check and validate aggregated keys */
ret = sp_cmd_aggregated_keys(task->cmd);
if (ret == -1) {
flb_error("[sp] aggregated query cannot mix not aggregated keys: %s",
flb_error("[sp] aggregated query cannot include the aggregated keys: %s",
query);
flb_sp_task_destroy(task);
return NULL;
Expand All @@ -506,10 +506,10 @@ struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name,
event = &task->window.event;
MK_EVENT_ZERO(event);

/* Run every 'size' seconds */
/* Run every 'window size' seconds */
fd = mk_event_timeout_create(sp->config->evl,
cmd->window.size, (long) 0,
&task->window.event);
event);
if (fd == -1) {
flb_error("[sp] registration for task %s failed", task->name);
flb_free(task);
Expand All @@ -525,7 +525,7 @@ struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name,
/* Run every 'size' seconds */
fd = mk_event_timeout_create(sp->config->evl,
cmd->window.advance_by, (long) 0,
&task->window.event_hop);
event);
if (fd == -1) {
flb_error("[sp] registration for task %s failed", task->name);
flb_free(task);
Expand Down Expand Up @@ -624,8 +624,7 @@ void flb_sp_aggregate_node_destroy(struct flb_sp_cmd *cmd,
flb_free(aggr_node);
}

void flb_sp_window_destroy(struct flb_sp_cmd *cmd,
struct flb_sp_task_window *window)
void flb_sp_window_destroy(struct flb_sp_task *task)
{
struct flb_sp_window_data *data;
struct aggregate_node *aggr_node;
Expand All @@ -635,39 +634,45 @@ void flb_sp_window_destroy(struct flb_sp_cmd *cmd,
struct mk_list *head_hs;
struct mk_list *tmp_hs;

mk_list_foreach_safe(head, tmp, &window->data) {
mk_list_foreach_safe(head, tmp, &task->window.data) {
data = mk_list_entry(head, struct flb_sp_window_data, _head);
flb_free(data->buf_data);
mk_list_del(&data->_head);
flb_free(data);
}

mk_list_foreach_safe(head, tmp, &window->aggregate_list) {
mk_list_foreach_safe(head, tmp, &task->window.aggregate_list) {
aggr_node = mk_list_entry(head, struct aggregate_node, _head);
mk_list_del(&aggr_node->_head);
flb_sp_aggregate_node_destroy(cmd, aggr_node);
flb_sp_aggregate_node_destroy(task->cmd, aggr_node);
}

mk_list_foreach_safe(head, tmp, &window->hopping_slot) {
mk_list_foreach_safe(head, tmp, &task->window.hopping_slot) {
hs = mk_list_entry(head, struct flb_sp_hopping_slot, _head);
mk_list_foreach_safe(head_hs, tmp_hs, &hs->aggregate_list) {
aggr_node = mk_list_entry(head_hs, struct aggregate_node, _head);
mk_list_del(&aggr_node->_head);
flb_sp_aggregate_node_destroy(cmd, aggr_node);
flb_sp_aggregate_node_destroy(task->cmd, aggr_node);
}
rb_tree_destroy(&hs->aggregate_tree);
flb_free(hs);
}

rb_tree_destroy(&window->aggregate_tree);
if (task->window.fd > 0) {
mk_event_timeout_destroy(task->sp->config->evl, &task->window.event);
mk_event_closesocket(task->window.fd);
}

rb_tree_destroy(&task->window.aggregate_tree);
}

void flb_sp_task_destroy(struct flb_sp_task *task)
{
flb_sds_destroy(task->name);
flb_sds_destroy(task->query);
flb_sp_window_destroy(task->cmd, &task->window);
flb_sp_window_destroy(task);
flb_sp_snapshot_destroy(task->snapshot);

mk_list_del(&task->_head);

if (task->stream) {
Expand Down Expand Up @@ -1114,6 +1119,7 @@ void package_results(const char *tag, int tag_len,
char **out_buf, size_t *out_size,
struct flb_sp_task *task)
{
char *c_name;
int i;
int len;
int map_entries;
Expand Down Expand Up @@ -1165,14 +1171,13 @@ void package_results(const char *tag, int tag_len,
flb_sds_len(ckey->alias));
}
else {
len = 0;
char *c_name;
if (!ckey->name) {
c_name = "*";
}
else {
c_name = ckey->name;
}
len = strlen(c_name);

msgpack_pack_str(&mp_pck, len);
msgpack_pack_str_body(&mp_pck, c_name, len);
Expand Down
2 changes: 1 addition & 1 deletion src/stream_processor/parser/flb_sp_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ struct flb_sp_cmd_key *flb_sp_key_create(struct flb_sp_cmd *cmd, int func,
struct flb_sp_cmd_key *key;
struct flb_slist_entry *entry;

/* aggregation function ? */
if (func >= FLB_SP_AVG && func <= FLB_SP_FORECAST) {
/* Aggregation function */
aggr_func = func;
}
else if (func >= FLB_SP_NOW && func <= FLB_SP_UNIX_TIMESTAMP) {
Expand Down
16 changes: 15 additions & 1 deletion tests/internal/include/sp_cb_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,20 @@ static void cb_select_groupby(int id, struct task_check *check,
ret = mp_count_rows(buf, size);
TEST_CHECK(ret == 2);

/* bool is 1 for record 0 (bool=true) */
ret = mp_record_key_cmp(buf, size,
0, "bool",
MSGPACK_OBJECT_POSITIVE_INTEGER,
NULL, 1, 0);
TEST_CHECK(ret == FLB_TRUE);

/* bool is 0 for record 1 (bool=false) */
ret = mp_record_key_cmp(buf, size,
1, "bool",
MSGPACK_OBJECT_POSITIVE_INTEGER,
NULL, 0, 0);
TEST_CHECK(ret == FLB_TRUE);

/* MIN(id) is 0 for record 0 (bool=true) */
ret = mp_record_key_cmp(buf, size,
0, "MIN(id)",
Expand All @@ -556,7 +570,7 @@ static void cb_select_groupby(int id, struct task_check *check,
NULL, 8, 0);
TEST_CHECK(ret == FLB_TRUE);

/* MAX(id) is i9 for record 1 (bool=false) */
/* MAX(id) is 9 for record 1 (bool=false) */
ret = mp_record_key_cmp(buf, size,
1, "MAX(id)",
MSGPACK_OBJECT_POSITIVE_INTEGER,
Expand Down
Loading