Skip to content

Commit

Permalink
parser_json: Handle numeric timestamps
Browse files Browse the repository at this point in the history
The `time_unit_per_second` parser option enables parsing JSON numbers as
timestamps. The same `time_key` is used as for `time_format`.
  • Loading branch information
hvenev committed Aug 11, 2024
1 parent 13db2ee commit f8b31cf
Show file tree
Hide file tree
Showing 15 changed files with 206 additions and 95 deletions.
2 changes: 2 additions & 0 deletions include/fluent-bit/flb_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ struct flb_parser {
int time_system_timezone; /* use the system timezone as a fallback */
int time_keep; /* keep time field */
int time_strict; /* parse time field strictly */
double time_unit_per_second; /* divisor for numeric time */
int logfmt_no_bare_keys; /* in logfmt parsers, require all keys to have values */
char *time_frac_secs; /* time format have fractional seconds ? */
struct flb_parser_types *types; /* type casting */
Expand Down Expand Up @@ -99,6 +100,7 @@ struct flb_parser *flb_parser_create(const char *name, const char *format,
int time_keep,
int time_strict,
int time_system_timezone,
double time_unit_per_second,
int logfmt_no_bare_keys,
struct flb_parser_types *types,
int types_len,
Expand Down
13 changes: 12 additions & 1 deletion src/flb_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ struct flb_parser *flb_parser_create(const char *name, const char *format,
int time_keep,
int time_strict,
int time_system_timezone,
double time_unit_per_second,
int logfmt_no_bare_keys,
struct flb_parser_types *types,
int types_len,
Expand Down Expand Up @@ -341,6 +342,7 @@ struct flb_parser *flb_parser_create(const char *name, const char *format,

p->time_keep = time_keep;
p->time_strict = time_strict;
p->time_unit_per_second = time_unit_per_second;
p->logfmt_no_bare_keys = logfmt_no_bare_keys;
p->types = types;
p->types_len = types_len;
Expand Down Expand Up @@ -500,6 +502,7 @@ static int parser_conf_file(const char *cfg, struct flb_cf *cf,
int time_system_timezone;
int logfmt_no_bare_keys;
int types_len;
double time_unit_per_second;
struct mk_list *head;
struct mk_list *decoders = NULL;
struct flb_cf_section *s;
Expand Down Expand Up @@ -582,6 +585,13 @@ static int parser_conf_file(const char *cfg, struct flb_cf *cf,
/* time_offset (UTC offset) */
time_offset = get_parser_key(config, cf, s, "time_offset");

time_unit_per_second = 0;
tmp_str = get_parser_key(config, cf, s, "time_unit_per_second");
if (tmp_str) {
time_unit_per_second = atof(tmp_str);
flb_sds_destroy(tmp_str);
}

/* logfmt_no_bare_keys */
logfmt_no_bare_keys = FLB_FALSE;
tmp_str = get_parser_key(config, cf, s, "logfmt_no_bare_keys");
Expand All @@ -605,7 +615,8 @@ static int parser_conf_file(const char *cfg, struct flb_cf *cf,
/* Create the parser context */
if (!flb_parser_create(name, format, regex, skip_empty,
time_fmt, time_key, time_offset, time_keep, time_strict,
time_system_timezone, logfmt_no_bare_keys, types, types_len,
time_system_timezone, time_unit_per_second,
logfmt_no_bare_keys, types, types_len,
decoders, config)) {
goto fconf_error;
}
Expand Down
147 changes: 87 additions & 60 deletions src/flb_parser_json.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,89 @@
*/

#define _GNU_SOURCE
#include <math.h>
#include <stdbool.h>
#include <time.h>

#include <fluent-bit/flb_parser.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_parser_decoder.h>

static bool flb_parser_json_timestamp_str(struct flb_parser *parser,
const char *ptr, size_t len,
struct flb_time *out_time)
{
int ret;
double tmfrac = 0;
struct flb_tm tm = {0};
time_t tmint = 0;

if (!parser->time_fmt) {
return false;
}

/* Lookup time */
ret = flb_parser_time_lookup(ptr, len, 0, parser, &tm, &tmfrac);
if (ret == -1) {
flb_warn("[parser:%s] invalid time format %s for '%.*s'",
parser->name, parser->time_fmt_full, len > 254 ? 254 : (int)len, ptr);
return false;
}

tmint = flb_parser_tm2time(&tm, parser->time_system_timezone);

out_time->tm.tv_sec = tmint;
out_time->tm.tv_nsec = tmfrac * 1000000000;

return true;
}

static bool flb_parser_json_timestamp_f64(struct flb_parser *parser,
double val,
struct flb_time *out_time)
{
double tmfrac = 0;
double tmint = 0;

if (parser->time_unit_per_second <= 0) {
flb_warn("[parser:%s] invalid non-string time", parser->name);
return false;
}

tmfrac = modf(val / parser->time_unit_per_second, &tmint);

out_time->tm.tv_sec = tmint;
out_time->tm.tv_nsec = tmfrac * 1000000000;

return true;
}

int flb_parser_json_do(struct flb_parser *parser,
const char *in_buf, size_t in_size,
void **out_buf, size_t *out_size,
struct flb_time *out_time)
{
int i;
int skip;
int time_index;
int ret;
int slen;
int root_type;
int records;
double tmfrac = 0;
bool time_ok;
char *mp_buf = NULL;
char *time_key;
char *tmp_out_buf = NULL;
char tmp[255];
size_t tmp_out_size = 0;
size_t off = 0;
size_t map_size;
size_t mp_size;
size_t len;
msgpack_sbuffer mp_sbuf;
msgpack_packer mp_pck;
msgpack_unpacked result;
msgpack_object map;
msgpack_object *k = NULL;
msgpack_object *v = NULL;
time_t time_lookup;
struct flb_tm tm = {0};
struct flb_time *t;
size_t consumed;

consumed = 0;
Expand Down Expand Up @@ -121,7 +167,7 @@ int flb_parser_json_do(struct flb_parser *parser,
}

/* Do time resolution ? */
if (!parser->time_fmt) {
if (!parser->time_fmt && parser->time_unit_per_second <= 0) {
msgpack_unpacked_destroy(&result);

return (int) consumed;
Expand All @@ -137,7 +183,7 @@ int flb_parser_json_do(struct flb_parser *parser,

/* Lookup time field */
map_size = map.via.map.size;
skip = map_size;
time_index = map_size;
for (i = 0; i < map_size; i++) {
k = &map.via.map.ptr[i].key;
v = &map.via.map.ptr[i].val;
Expand All @@ -162,14 +208,8 @@ int flb_parser_json_do(struct flb_parser *parser,
}

if (strncmp(k->via.str.ptr, time_key, k->via.str.size) == 0) {
time_index = i;
/* We found the key, break the loop and keep the index */
if (parser->time_keep == FLB_FALSE) {
skip = i;
break;
}
else {
skip = -1;
}
break;
}

Expand All @@ -185,60 +225,47 @@ int flb_parser_json_do(struct flb_parser *parser,
}

/* Ensure we have an accurate type */
if (v->type != MSGPACK_OBJECT_STR) {
msgpack_unpacked_destroy(&result);
switch(v->type) {
case MSGPACK_OBJECT_STR:
time_ok = flb_parser_json_timestamp_str(parser, v->via.str.ptr, v->via.str.size, out_time);
break;

return (int) consumed;
}
case MSGPACK_OBJECT_FLOAT32:
case MSGPACK_OBJECT_FLOAT64:
time_ok = flb_parser_json_timestamp_f64(parser, v->via.f64, out_time);
break;

/* Lookup time */
ret = flb_parser_time_lookup(v->via.str.ptr, v->via.str.size,
0, parser, &tm, &tmfrac);
if (ret == -1) {
len = v->via.str.size;
if (len > sizeof(tmp) - 1) {
len = sizeof(tmp) - 1;
}
memcpy(tmp, v->via.str.ptr, len);
tmp[len] = '\0';
flb_warn("[parser:%s] invalid time format %s for '%s'",
parser->name, parser->time_fmt_full, tmp);
time_lookup = 0;
skip = map_size;
}
else {
time_lookup = flb_parser_tm2time(&tm, parser->time_system_timezone);
case MSGPACK_OBJECT_POSITIVE_INTEGER:
time_ok = flb_parser_json_timestamp_f64(parser, v->via.u64, out_time);
break;

default:
time_ok = false;
break;
}

/* Compose a new map without the time_key field */
msgpack_sbuffer_init(&mp_sbuf);
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
if (time_ok && parser->time_keep == FLB_FALSE) {
/* Compose a new map without the time_key field */
msgpack_sbuffer_init(&mp_sbuf);
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);

if (parser->time_keep == FLB_FALSE && skip < map_size) {
msgpack_pack_map(&mp_pck, map_size - 1);
}
else {
msgpack_pack_map(&mp_pck, map_size);
}

for (i = 0; i < map_size; i++) {
if (i == skip) {
continue;
}

msgpack_pack_object(&mp_pck, map.via.map.ptr[i].key);
msgpack_pack_object(&mp_pck, map.via.map.ptr[i].val);
}
for (i = 0; i < map_size; i++) {
if (i == time_index) {
continue;
}

/* Export the proper buffer */
flb_free(tmp_out_buf);
msgpack_pack_object(&mp_pck, map.via.map.ptr[i].key);
msgpack_pack_object(&mp_pck, map.via.map.ptr[i].val);
}

*out_buf = mp_sbuf.data;
*out_size = mp_sbuf.size;
/* Export the proper buffer */
flb_free(tmp_out_buf);

t = out_time;
t->tm.tv_sec = time_lookup;
t->tm.tv_nsec = (tmfrac * 1000000000);
*out_buf = mp_sbuf.data;
*out_size = mp_sbuf.size;
}

msgpack_unpacked_destroy(&result);

Expand Down
1 change: 1 addition & 0 deletions src/multiline/flb_ml_parser_cri.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ static struct flb_parser *cri_parser_create(struct flb_config *config)
FLB_TRUE, /* time keep */
FLB_FALSE, /* time strict */
FLB_FALSE, /* time system timezone */
0.0, /* numeric time units per second */
FLB_FALSE, /* no bare keys */
NULL, /* parser types */
0, /* types len */
Expand Down
1 change: 1 addition & 0 deletions src/multiline/flb_ml_parser_docker.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ static struct flb_parser *docker_parser_create(struct flb_config *config)
FLB_TRUE, /* time keep */
FLB_FALSE, /* time strict */
FLB_FALSE, /* time system timezone */
0.0, /* numeric time units per second */
FLB_FALSE, /* no bare keys */
NULL, /* parser types */
0, /* types len */
Expand Down
2 changes: 1 addition & 1 deletion tests/internal/fuzzers/engine_fuzzer.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ int LLVMFuzzerInitialize(int *argc, char ***argv) {
flb_input_set(ctx, in_ffd, (char *) "A", NULL);

parser = flb_parser_create("timestamp", "regex", "^(?<time>.*)$", FLB_TRUE,
"%s.%L", "time", NULL, MK_FALSE, 0, FLB_FALSE,
"%s.%L", "time", NULL, MK_FALSE, 0, FLB_FALSE, 0.0,
FLB_FALSE, NULL, 0, NULL, ctx->config);
filter_ffd = flb_filter(ctx, (char *) "parser", NULL);
int ret;
Expand Down
2 changes: 1 addition & 1 deletion tests/internal/fuzzers/parse_json_fuzzer.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size){
}

fuzz_parser = flb_parser_create("fuzzer", "json", NULL, FLB_TRUE, NULL,
NULL, NULL, MK_FALSE, MK_TRUE, FLB_FALSE, FLB_FALSE,
NULL, NULL, MK_FALSE, MK_TRUE, FLB_FALSE, 1.0, FLB_FALSE,
NULL, 0, NULL, fuzz_config);
if (fuzz_parser) {
flb_parser_do(fuzz_parser, (char*)data, size,
Expand Down
2 changes: 1 addition & 1 deletion tests/internal/fuzzers/parse_logfmt_fuzzer.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size){
}
fuzz_parser = flb_parser_create("fuzzer", "logfmt", NULL, FLB_TRUE,
NULL, NULL, NULL, MK_FALSE,
MK_TRUE, FLB_FALSE, FLB_FALSE, NULL, 0,
MK_TRUE, FLB_FALSE, 0.0, FLB_FALSE, NULL, 0,
NULL, fuzz_config);
if (fuzz_parser) {
flb_parser_do(fuzz_parser, (char*)data, size,
Expand Down
2 changes: 1 addition & 1 deletion tests/internal/fuzzers/parse_ltsv_fuzzer.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size){
fuzz_config = flb_config_init();
fuzz_parser = flb_parser_create("fuzzer", "ltsv", NULL, FLB_TRUE,
NULL, NULL, NULL, MK_FALSE,
MK_TRUE, FLB_FALSE, FLB_FALSE, NULL, 0,
MK_TRUE, FLB_FALSE, 0.0, FLB_FALSE, NULL, 0,
NULL, fuzz_config);
flb_parser_do(fuzz_parser, (char*)data, size,
&out_buf, &out_size, &out_time);
Expand Down
7 changes: 6 additions & 1 deletion tests/internal/fuzzers/parser_fuzzer.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size)
struct flb_parser *fuzz_parser = NULL;
int time_keep = 0;
int types_len = 0;
double time_unit_per_second;

/* Set fuzzer-malloc chance of failure */
flb_malloc_mod = 25000;
Expand Down Expand Up @@ -99,6 +100,10 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size)
time_keep = (GET_MOD_EQ(2,1)) ? MK_TRUE : MK_FALSE;
MOVE_INPUT(1);

/* time_unit_per_second */
time_unit_per_second = (GET_MOD_EQ(2,1)) ? 1.0 : 0.0;
MOVE_INPUT(1);

/* types_str */
if (GET_MOD_EQ(2,1)) {
types = flb_malloc(sizeof(struct flb_parser_types) * TYPES_LEN);
Expand Down Expand Up @@ -153,7 +158,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size)

/* now call into the parser */
fuzz_parser = flb_parser_create("fuzzer", format, pregex, FLB_TRUE,
time_fmt, time_key, time_offset, time_keep, 0, FLB_FALSE,
time_fmt, time_key, time_offset, time_keep, 0, FLB_FALSE, time_unit_per_second,
FLB_FALSE, types, types_len, list, fuzz_config);

/* Second step is to use the random parser to parse random input */
Expand Down
Loading

0 comments on commit f8b31cf

Please sign in to comment.