Skip to content

Commit

Permalink
out_loki: allow sending unquoted strings
Browse files Browse the repository at this point in the history
This patch adds a third value to `drop_single_key` - `raw`, which allows
sending unquoted strings to Loki when using JSON as the `line_format`.

While yes, for the output to be valid JSON, quotes would be expected,
Loki does not support reading a plain quoted string with its JSON
parser, complaining that it cannot find a `}` character.
Instead, you need to use a combination of regexp and line_format
expressions to unquote the log before running any other parsers over it.

By adding a third value of `raw`, this ensures backwards compatibility
for anyone that is already relying on the existing behaviour.

Signed-off-by: Andrew Titmuss <[email protected]>
  • Loading branch information
iandrewt authored and edsiper committed Jun 24, 2024
1 parent b20df67 commit 1009138
Show file tree
Hide file tree
Showing 3 changed files with 242 additions and 15 deletions.
54 changes: 48 additions & 6 deletions plugins/out_loki/loki.c
Original file line number Diff line number Diff line change
Expand Up @@ -903,10 +903,12 @@ static struct flb_loki *loki_config_create(struct flb_output_instance *ins,
struct flb_config *config)
{
int ret;
int tmp;
int io_flags = 0;
struct flb_loki *ctx;
struct flb_upstream *upstream;
char *compress;
char *drop_single_key;

/* Create context */
ctx = flb_calloc(1, sizeof(struct flb_loki));
Expand Down Expand Up @@ -962,6 +964,29 @@ static struct flb_loki *loki_config_create(struct flb_output_instance *ins,
}
}

/* Drop Single Key */
drop_single_key = flb_output_get_property("drop_single_key", ins);
ctx->out_drop_single_key = FLB_LOKI_DROP_SINGLE_KEY_OFF;
if (drop_single_key) {
if (strcasecmp(drop_single_key, "raw") == 0) {
ctx->out_drop_single_key = FLB_LOKI_DROP_SINGLE_KEY_ON | FLB_LOKI_DROP_SINGLE_KEY_RAW;
}
else {
tmp = flb_utils_bool(drop_single_key);
if (tmp == FLB_TRUE) {
ctx->out_drop_single_key = FLB_LOKI_DROP_SINGLE_KEY_ON;
}
else if (tmp == FLB_FALSE) {
ctx->out_drop_single_key = FLB_LOKI_DROP_SINGLE_KEY_OFF;
}
else {
flb_plg_error(ctx->ins, "invalid 'drop_single_key' value: %s",
ctx->drop_single_key);
return NULL;
}
}
}

/* Line Format */
if (strcasecmp(ctx->line_format, "json") == 0) {
ctx->out_line_format = FLB_LOKI_FMT_JSON;
Expand Down Expand Up @@ -1206,12 +1231,28 @@ static int pack_record(struct flb_loki *ctx,
}

/* Drop single key */
if (ctx->drop_single_key == FLB_TRUE && rec->type == MSGPACK_OBJECT_MAP && rec->via.map.size == 1) {
if (ctx->out_drop_single_key & FLB_LOKI_DROP_SINGLE_KEY_ON &&
rec->type == MSGPACK_OBJECT_MAP && rec->via.map.size == 1) {
val = rec->via.map.ptr[0].val;

if (ctx->out_line_format == FLB_LOKI_FMT_JSON) {
rec = &rec->via.map.ptr[0].val;
} else if (ctx->out_line_format == FLB_LOKI_FMT_KV) {
val = rec->via.map.ptr[0].val;
if (val.type == MSGPACK_OBJECT_STR &&
ctx->out_drop_single_key & FLB_LOKI_DROP_SINGLE_KEY_RAW) {
msgpack_pack_str(mp_pck, val.via.str.size);
msgpack_pack_str_body(mp_pck, val.via.str.ptr, val.via.str.size);

msgpack_unpacked_destroy(&mp_buffer);
if (tmp_sbuf_data) {
flb_free(tmp_sbuf_data);
}

return 0;
}
else {
rec = &val;
}
}
else if (ctx->out_line_format == FLB_LOKI_FMT_KV) {
if (val.type == MSGPACK_OBJECT_STR) {
msgpack_pack_str(mp_pck, val.via.str.size);
msgpack_pack_str_body(mp_pck, val.via.str.ptr, val.via.str.size);
Expand Down Expand Up @@ -1774,10 +1815,11 @@ static struct flb_config_map config_map[] = {
},

{
FLB_CONFIG_MAP_BOOL, "drop_single_key", "false",
FLB_CONFIG_MAP_STR, "drop_single_key", NULL,
0, FLB_TRUE, offsetof(struct flb_loki, drop_single_key),
"If set to true and only a single key remains, the log line sent to Loki "
"will be the value of that key.",
"will be the value of that key. If set to 'raw' and the log line is "
"a string, the log line will be sent unquoted.",
},

{
Expand Down
8 changes: 7 additions & 1 deletion plugins/out_loki/loki.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
#define FLB_LOKI_FMT_JSON 0
#define FLB_LOKI_FMT_KV 1

/* Drop single key */
#define FLB_LOKI_DROP_SINGLE_KEY_OFF (((uint64_t) 1) << 0)
#define FLB_LOKI_DROP_SINGLE_KEY_ON (((uint64_t) 1) << 1)
#define FLB_LOKI_DROP_SINGLE_KEY_RAW (((uint64_t) 1) << 2)

struct flb_loki_kv {
int val_type; /* FLB_LOKI_KV_STR or FLB_LOKI_KV_RA */
flb_sds_t key; /* string key */
Expand All @@ -54,8 +59,8 @@ struct flb_loki_kv {
struct flb_loki {
/* Public configuration properties */
int auto_kubernetes_labels;
int drop_single_key;

flb_sds_t drop_single_key;
flb_sds_t uri;
flb_sds_t line_format;
flb_sds_t tenant_id;
Expand All @@ -80,6 +85,7 @@ struct flb_loki {
int tcp_port;
char *tcp_host;
int out_line_format;
int out_drop_single_key;
int ra_used; /* number of record accessor label keys */
struct flb_record_accessor *ra_k8s; /* kubernetes record accessor */
struct mk_list labels_list; /* list of flb_loki_kv nodes */
Expand Down
195 changes: 187 additions & 8 deletions tests/runtime/out_loki.c
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,182 @@ void flb_test_line_format()
flb_destroy(ctx);
}

static void cb_check_drop_single_key_off(void *ctx, int ffd,
int res_ret, void *res_data, size_t res_size,
void *data)
{
char *p;
flb_sds_t out_js = res_data;
char *index_line = "{\\\"key\\\":\\\"value\\\"}";

p = strstr(out_js, index_line);
if (!TEST_CHECK(p != NULL)) {
TEST_MSG("Given:%s", out_js);
}

flb_sds_destroy(out_js);
}

void flb_test_drop_single_key_off()
{
int ret;
int size = sizeof(JSON_BASIC) - 1;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

/* Create context, flush every second (some checks omitted here) */
ctx = flb_create();
flb_service_set(ctx, "flush", "1", "grace", "1",
"log_level", "error",
NULL);

/* Lib input mode */
in_ffd = flb_input(ctx, (char *) "lib", NULL);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

/* Loki output */
out_ffd = flb_output(ctx, (char *) "loki", NULL);
flb_output_set(ctx, out_ffd,
"match", "test",
"drop_single_key", "off",
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_drop_single_key_off,
NULL, NULL);

/* Start */
ret = flb_start(ctx);
TEST_CHECK(ret == 0);

/* Ingest data sample */
ret = flb_lib_push(ctx, in_ffd, (char *) JSON_BASIC, size);
TEST_CHECK(ret >= 0);

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

static void cb_check_drop_single_key_on(void *ctx, int ffd,
int res_ret, void *res_data, size_t res_size,
void *data)
{
char *p;
flb_sds_t out_js = res_data;
char *index_line = "\\\"value\\\"";

p = strstr(out_js, index_line);
if (!TEST_CHECK(p != NULL)) {
TEST_MSG("Given:%s", out_js);
}

flb_sds_destroy(out_js);
}

void flb_test_drop_single_key_on()
{
int ret;
int size = sizeof(JSON_BASIC) - 1;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

/* Create context, flush every second (some checks omitted here) */
ctx = flb_create();
flb_service_set(ctx, "flush", "1", "grace", "1",
"log_level", "error",
NULL);

/* Lib input mode */
in_ffd = flb_input(ctx, (char *) "lib", NULL);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

/* Loki output */
out_ffd = flb_output(ctx, (char *) "loki", NULL);
flb_output_set(ctx, out_ffd,
"match", "test",
"drop_single_key", "on",
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_drop_single_key_on,
NULL, NULL);

/* Start */
ret = flb_start(ctx);
TEST_CHECK(ret == 0);

/* Ingest data sample */
ret = flb_lib_push(ctx, in_ffd, (char *) JSON_BASIC, size);
TEST_CHECK(ret >= 0);

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

static void cb_check_drop_single_key_raw(void *ctx, int ffd,
int res_ret, void *res_data, size_t res_size,
void *data)
{
char *p;
flb_sds_t out_js = res_data;
char *index_line = "\"value\"";

p = strstr(out_js, index_line);
if (!TEST_CHECK(p != NULL)) {
TEST_MSG("Given:%s", out_js);
}

flb_sds_destroy(out_js);
}

void flb_test_drop_single_key_raw()
{
int ret;
int size = sizeof(JSON_BASIC) - 1;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

/* Create context, flush every second (some checks omitted here) */
ctx = flb_create();
flb_service_set(ctx, "flush", "1", "grace", "1",
"log_level", "error",
NULL);

/* Lib input mode */
in_ffd = flb_input(ctx, (char *) "lib", NULL);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

/* Loki output */
out_ffd = flb_output(ctx, (char *) "loki", NULL);
flb_output_set(ctx, out_ffd,
"match", "test",
"drop_single_key", "raw",
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_drop_single_key_raw,
NULL, NULL);

/* Start */
ret = flb_start(ctx);
TEST_CHECK(ret == 0);

/* Ingest data sample */
ret = flb_lib_push(ctx, in_ffd, (char *) JSON_BASIC, size);
TEST_CHECK(ret >= 0);

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

static void cb_check_line_format_remove_keys(void *ctx, int ffd,
int res_ret, void *res_data,
Expand Down Expand Up @@ -611,13 +787,16 @@ void flb_test_float_value()
/* Test list */
TEST_LIST = {
{"remove_keys_remove_map" , flb_test_remove_map},
{"labels_ra" , flb_test_labels_ra },
{"remove_keys" , flb_test_remove_keys },
{"basic" , flb_test_basic },
{"labels" , flb_test_labels },
{"label_keys" , flb_test_label_keys },
{"line_format" , flb_test_line_format },
{"label_map_path" , flb_test_label_map_path},
{"float_value" , flb_test_float_value},
{"labels_ra" , flb_test_labels_ra },
{"remove_keys" , flb_test_remove_keys },
{"basic" , flb_test_basic },
{"labels" , flb_test_labels },
{"label_keys" , flb_test_label_keys },
{"line_format" , flb_test_line_format },
{"drop_single_key_off" , flb_test_drop_single_key_off },
{"drop_single_key_on" , flb_test_drop_single_key_on },
{"drop_single_key_raw" , flb_test_drop_single_key_raw },
{"label_map_path" , flb_test_label_map_path},
{"float_value" , flb_test_float_value},
{NULL, NULL}
};

0 comments on commit 1009138

Please sign in to comment.