Skip to content

Commit

Permalink
sp: fix missing key names when query contains GROUP BY (#8028)
Browse files Browse the repository at this point in the history
Signed-off-by: Masoud Koleini <[email protected]>
  • Loading branch information
koleini authored and leonardo-albertovich committed Nov 3, 2023
1 parent cd7def9 commit 5f21aa8
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 19 deletions.
39 changes: 22 additions & 17 deletions src/stream_processor/flb_sp.c
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,8 @@ static int sp_cmd_aggregated_keys(struct flb_sp_cmd *cmd)
}

/*
* if some aggregated function is required, not aggregated keys are
* not allowed so we return an error (-1).
* If aggregated functions are included in the query, non-aggregated keys are
* not allowed (except for the ones inside GROUP BY statement).
*/
if (aggr > 0 && not_aggr == 0) {
return aggr;
Expand Down Expand Up @@ -490,7 +490,7 @@ struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name,
/* Check and validate aggregated keys */
ret = sp_cmd_aggregated_keys(task->cmd);
if (ret == -1) {
flb_error("[sp] aggregated query cannot mix not aggregated keys: %s",
flb_error("[sp] aggregated query cannot include the aggregated keys: %s",
query);
flb_sp_task_destroy(task);
return NULL;
Expand All @@ -506,10 +506,10 @@ struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name,
event = &task->window.event;
MK_EVENT_ZERO(event);

/* Run every 'size' seconds */
/* Run every 'window size' seconds */
fd = mk_event_timeout_create(sp->config->evl,
cmd->window.size, (long) 0,
&task->window.event);
event);
if (fd == -1) {
flb_error("[sp] registration for task %s failed", task->name);
flb_free(task);
Expand All @@ -525,7 +525,7 @@ struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name,
/* Run every 'size' seconds */
fd = mk_event_timeout_create(sp->config->evl,
cmd->window.advance_by, (long) 0,
&task->window.event_hop);
event);
if (fd == -1) {
flb_error("[sp] registration for task %s failed", task->name);
flb_free(task);
Expand Down Expand Up @@ -624,8 +624,7 @@ void flb_sp_aggregate_node_destroy(struct flb_sp_cmd *cmd,
flb_free(aggr_node);
}

void flb_sp_window_destroy(struct flb_sp_cmd *cmd,
struct flb_sp_task_window *window)
void flb_sp_window_destroy(struct flb_sp_task *task)
{
struct flb_sp_window_data *data;
struct aggregate_node *aggr_node;
Expand All @@ -635,39 +634,45 @@ void flb_sp_window_destroy(struct flb_sp_cmd *cmd,
struct mk_list *head_hs;
struct mk_list *tmp_hs;

mk_list_foreach_safe(head, tmp, &window->data) {
mk_list_foreach_safe(head, tmp, &task->window.data) {
data = mk_list_entry(head, struct flb_sp_window_data, _head);
flb_free(data->buf_data);
mk_list_del(&data->_head);
flb_free(data);
}

mk_list_foreach_safe(head, tmp, &window->aggregate_list) {
mk_list_foreach_safe(head, tmp, &task->window.aggregate_list) {
aggr_node = mk_list_entry(head, struct aggregate_node, _head);
mk_list_del(&aggr_node->_head);
flb_sp_aggregate_node_destroy(cmd, aggr_node);
flb_sp_aggregate_node_destroy(task->cmd, aggr_node);
}

mk_list_foreach_safe(head, tmp, &window->hopping_slot) {
mk_list_foreach_safe(head, tmp, &task->window.hopping_slot) {
hs = mk_list_entry(head, struct flb_sp_hopping_slot, _head);
mk_list_foreach_safe(head_hs, tmp_hs, &hs->aggregate_list) {
aggr_node = mk_list_entry(head_hs, struct aggregate_node, _head);
mk_list_del(&aggr_node->_head);
flb_sp_aggregate_node_destroy(cmd, aggr_node);
flb_sp_aggregate_node_destroy(task->cmd, aggr_node);
}
rb_tree_destroy(&hs->aggregate_tree);
flb_free(hs);
}

rb_tree_destroy(&window->aggregate_tree);
if (task->window.fd > 0) {
mk_event_timeout_destroy(task->sp->config->evl, &task->window.event);
mk_event_closesocket(task->window.fd);
}

rb_tree_destroy(&task->window.aggregate_tree);
}

void flb_sp_task_destroy(struct flb_sp_task *task)
{
flb_sds_destroy(task->name);
flb_sds_destroy(task->query);
flb_sp_window_destroy(task->cmd, &task->window);
flb_sp_window_destroy(task);
flb_sp_snapshot_destroy(task->snapshot);

mk_list_del(&task->_head);

if (task->stream) {
Expand Down Expand Up @@ -1114,6 +1119,7 @@ void package_results(const char *tag, int tag_len,
char **out_buf, size_t *out_size,
struct flb_sp_task *task)
{
char *c_name;
int i;
int len;
int map_entries;
Expand Down Expand Up @@ -1165,14 +1171,13 @@ void package_results(const char *tag, int tag_len,
flb_sds_len(ckey->alias));
}
else {
len = 0;
char *c_name;
if (!ckey->name) {
c_name = "*";
}
else {
c_name = ckey->name;
}
len = strlen(c_name);

msgpack_pack_str(&mp_pck, len);
msgpack_pack_str_body(&mp_pck, c_name, len);
Expand Down
2 changes: 1 addition & 1 deletion src/stream_processor/parser/flb_sp_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ struct flb_sp_cmd_key *flb_sp_key_create(struct flb_sp_cmd *cmd, int func,
struct flb_sp_cmd_key *key;
struct flb_slist_entry *entry;

/* aggregation function ? */
if (func >= FLB_SP_AVG && func <= FLB_SP_FORECAST) {
/* Aggregation function */
aggr_func = func;
}
else if (func >= FLB_SP_NOW && func <= FLB_SP_UNIX_TIMESTAMP) {
Expand Down
16 changes: 15 additions & 1 deletion tests/internal/include/sp_cb_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,20 @@ static void cb_select_groupby(int id, struct task_check *check,
ret = mp_count_rows(buf, size);
TEST_CHECK(ret == 2);

/* bool is 1 for record 0 (bool=true) */
ret = mp_record_key_cmp(buf, size,
0, "bool",
MSGPACK_OBJECT_POSITIVE_INTEGER,
NULL, 1, 0);
TEST_CHECK(ret == FLB_TRUE);

/* bool is 0 for record 1 (bool=false) */
ret = mp_record_key_cmp(buf, size,
1, "bool",
MSGPACK_OBJECT_POSITIVE_INTEGER,
NULL, 0, 0);
TEST_CHECK(ret == FLB_TRUE);

/* MIN(id) is 0 for record 0 (bool=true) */
ret = mp_record_key_cmp(buf, size,
0, "MIN(id)",
Expand All @@ -556,7 +570,7 @@ static void cb_select_groupby(int id, struct task_check *check,
NULL, 8, 0);
TEST_CHECK(ret == FLB_TRUE);

/* MAX(id) is i9 for record 1 (bool=false) */
/* MAX(id) is 9 for record 1 (bool=false) */
ret = mp_record_key_cmp(buf, size,
1, "MAX(id)",
MSGPACK_OBJECT_POSITIVE_INTEGER,
Expand Down

0 comments on commit 5f21aa8

Please sign in to comment.