/*
 * Translate a symbolic numeric time format name into the divisor that
 * turns a value expressed in that unit into seconds.
 *
 * Matching is exact and case-sensitive. Returns 0.0 when time_fmt is NULL
 * or is not one of the recognised names, which callers treat as "numeric
 * time not enabled".
 */
static double flb_parser_time_numeric_unit(const char *time_fmt)
{
    static const struct {
        const char *name;
        double per_second;
    } units[] = {
        { "SECONDS",      1.0 },
        { "MILLISECONDS", 1000.0 },
        { "MICROSECONDS", 1000000.0 },
        { "NANOSECONDS",  1000000000.0 },
    };
    size_t idx;

    if (time_fmt == NULL) {
        return 0.0;
    }

    for (idx = 0; idx < sizeof(units) / sizeof(units[0]); idx++) {
        if (strcmp(time_fmt, units[idx].name) == 0) {
            return units[idx].per_second;
        }
    }

    /* Anything else is left to be handled as a regular time format */
    return 0.0;
}
as a format */ + time_fmt = NULL; + } + if (time_fmt) { p->time_fmt_full = flb_strdup(time_fmt); if (!p->time_fmt_full) { diff --git a/src/flb_parser_json.c b/src/flb_parser_json.c index 1693f766606..1bf1bb36137 100644 --- a/src/flb_parser_json.c +++ b/src/flb_parser_json.c @@ -18,6 +18,8 @@ */ #define _GNU_SOURCE +#include +#include #include #include @@ -25,36 +27,80 @@ #include #include +static bool flb_parser_json_timestamp_str(struct flb_parser *parser, + const char *ptr, size_t len, + struct flb_time *out_time) +{ + int ret; + double tmfrac = 0; + struct flb_tm tm = {0}; + time_t tmint = 0; + + if (!parser->time_fmt) { + return false; + } + + /* Lookup time */ + ret = flb_parser_time_lookup(ptr, len, 0, parser, &tm, &tmfrac); + if (ret == -1) { + flb_warn("[parser:%s] invalid time format %s for '%.*s'", + parser->name, parser->time_fmt_full, len > 254 ? 254 : (int)len, ptr); + return false; + } + + tmint = flb_parser_tm2time(&tm, parser->time_system_timezone); + + out_time->tm.tv_sec = tmint; + out_time->tm.tv_nsec = tmfrac * 1000000000; + + return true; +} + +static bool flb_parser_json_timestamp_f64(struct flb_parser *parser, + double val, + struct flb_time *out_time) +{ + double tmfrac = 0; + double tmint = 0; + + if (parser->time_numeric_unit <= 0) { + flb_warn("[parser:%s] invalid non-string time", parser->name); + return false; + } + + tmfrac = modf(val / parser->time_numeric_unit, &tmint); + + out_time->tm.tv_sec = tmint; + out_time->tm.tv_nsec = tmfrac * 1000000000; + + return true; +} + int flb_parser_json_do(struct flb_parser *parser, const char *in_buf, size_t in_size, void **out_buf, size_t *out_size, struct flb_time *out_time) { int i; - int skip; + int time_index; int ret; int slen; int root_type; int records; - double tmfrac = 0; + bool time_ok; char *mp_buf = NULL; char *time_key; char *tmp_out_buf = NULL; - char tmp[255]; size_t tmp_out_size = 0; size_t off = 0; size_t map_size; size_t mp_size; - size_t len; msgpack_sbuffer mp_sbuf; 
msgpack_packer mp_pck; msgpack_unpacked result; msgpack_object map; msgpack_object *k = NULL; msgpack_object *v = NULL; - time_t time_lookup; - struct flb_tm tm = {0}; - struct flb_time *t; size_t consumed; consumed = 0; @@ -121,7 +167,7 @@ int flb_parser_json_do(struct flb_parser *parser, } /* Do time resolution ? */ - if (!parser->time_fmt) { + if (!parser->time_fmt && parser->time_numeric_unit <= 0) { msgpack_unpacked_destroy(&result); return (int) consumed; @@ -137,7 +183,7 @@ int flb_parser_json_do(struct flb_parser *parser, /* Lookup time field */ map_size = map.via.map.size; - skip = map_size; + time_index = map_size; for (i = 0; i < map_size; i++) { k = &map.via.map.ptr[i].key; v = &map.via.map.ptr[i].val; @@ -162,14 +208,8 @@ int flb_parser_json_do(struct flb_parser *parser, } if (strncmp(k->via.str.ptr, time_key, k->via.str.size) == 0) { + time_index = i; /* We found the key, break the loop and keep the index */ - if (parser->time_keep == FLB_FALSE) { - skip = i; - break; - } - else { - skip = -1; - } break; } @@ -185,60 +225,47 @@ int flb_parser_json_do(struct flb_parser *parser, } /* Ensure we have an accurate type */ - if (v->type != MSGPACK_OBJECT_STR) { - msgpack_unpacked_destroy(&result); + switch(v->type) { + case MSGPACK_OBJECT_STR: + time_ok = flb_parser_json_timestamp_str(parser, v->via.str.ptr, v->via.str.size, out_time); + break; - return (int) consumed; - } + case MSGPACK_OBJECT_FLOAT32: + case MSGPACK_OBJECT_FLOAT64: + time_ok = flb_parser_json_timestamp_f64(parser, v->via.f64, out_time); + break; - /* Lookup time */ - ret = flb_parser_time_lookup(v->via.str.ptr, v->via.str.size, - 0, parser, &tm, &tmfrac); - if (ret == -1) { - len = v->via.str.size; - if (len > sizeof(tmp) - 1) { - len = sizeof(tmp) - 1; - } - memcpy(tmp, v->via.str.ptr, len); - tmp[len] = '\0'; - flb_warn("[parser:%s] invalid time format %s for '%s'", - parser->name, parser->time_fmt_full, tmp); - time_lookup = 0; - skip = map_size; - } - else { - time_lookup = 
flb_parser_tm2time(&tm, parser->time_system_timezone); + case MSGPACK_OBJECT_POSITIVE_INTEGER: + time_ok = flb_parser_json_timestamp_f64(parser, v->via.u64, out_time); + break; + + default: + time_ok = false; + break; } - /* Compose a new map without the time_key field */ - msgpack_sbuffer_init(&mp_sbuf); - msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + if (time_ok && parser->time_keep == FLB_FALSE) { + /* Compose a new map without the time_key field */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); - if (parser->time_keep == FLB_FALSE && skip < map_size) { msgpack_pack_map(&mp_pck, map_size - 1); - } - else { - msgpack_pack_map(&mp_pck, map_size); - } - for (i = 0; i < map_size; i++) { - if (i == skip) { - continue; - } - - msgpack_pack_object(&mp_pck, map.via.map.ptr[i].key); - msgpack_pack_object(&mp_pck, map.via.map.ptr[i].val); - } + for (i = 0; i < map_size; i++) { + if (i == time_index) { + continue; + } - /* Export the proper buffer */ - flb_free(tmp_out_buf); + msgpack_pack_object(&mp_pck, map.via.map.ptr[i].key); + msgpack_pack_object(&mp_pck, map.via.map.ptr[i].val); + } - *out_buf = mp_sbuf.data; - *out_size = mp_sbuf.size; + /* Export the proper buffer */ + flb_free(tmp_out_buf); - t = out_time; - t->tm.tv_sec = time_lookup; - t->tm.tv_nsec = (tmfrac * 1000000000); + *out_buf = mp_sbuf.data; + *out_size = mp_sbuf.size; + } msgpack_unpacked_destroy(&result); diff --git a/tests/internal/parser_json.c b/tests/internal/parser_json.c index f09f9675794..ce97d84dac7 100644 --- a/tests/internal/parser_json.c +++ b/tests/internal/parser_json.c @@ -319,6 +319,67 @@ void test_time_keep() flb_config_exit(config); } +void test_time_numeric() +{ + struct flb_parser *parser = NULL; + struct flb_config *config = NULL; + int ret = 0; + char *input = "{\"str\":\"text\", \"int\":100, \"double\":1.23, \"bool\":true, \"time\":422500}"; + void *out_buf = NULL; + size_t out_size = 0; + struct flb_time 
out_time; + char *expected_strs[] = {"str", "text", "int", "100", "double","1.23", "bool", "true"}; + struct str_list expected = { + .size = sizeof(expected_strs)/sizeof(char*), + .lists = &expected_strs[0], + }; + + out_time.tm.tv_sec = 0; + out_time.tm.tv_nsec = 0; + + + config = flb_config_init(); + if(!TEST_CHECK(config != NULL)) { + TEST_MSG("flb_config_init failed"); + exit(1); + } + + parser = flb_parser_create("json", "json", NULL, FLB_FALSE, "MILLISECONDS", "time", NULL, + FLB_FALSE /*time_keep */, FLB_FALSE, FLB_FALSE, FLB_FALSE, + NULL, 0, NULL, config); + if (!TEST_CHECK(parser != NULL)) { + TEST_MSG("flb_parser_create failed"); + flb_config_exit(config); + exit(1); + } + + ret = flb_parser_do(parser, input, strlen(input), &out_buf, &out_size, &out_time); + if (!TEST_CHECK(ret != -1)) { + TEST_MSG("flb_parser_do failed"); + flb_parser_destroy(parser); + flb_config_exit(config); + exit(1); + } + + ret = compare_msgpack(out_buf, out_size, &expected); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("compare failed"); + flb_free(out_buf); + flb_parser_destroy(parser); + flb_config_exit(config); + exit(1); + } + + if (!TEST_CHECK(out_time.tm.tv_sec == 422 && out_time.tm.tv_nsec == 500000000)) { + TEST_MSG("timestamp error. sec Got=%ld Expect=422", out_time.tm.tv_sec); + TEST_MSG("timestamp error. nsec Got=%ld Expect=500000000", out_time.tm.tv_nsec); + } + + flb_free(out_buf); + flb_parser_destroy(parser); + flb_config_exit(config); +} + /* * JSON parser doesn't support 'types' option. * This test is to check that 'types' doesn't affect output. @@ -540,6 +601,7 @@ TEST_LIST = { { "basic", test_basic}, { "time_key", test_time_key}, { "time_keep", test_time_keep}, + { "time_numeric", test_time_numeric}, { "types_is_not_supported", test_types_is_not_supported}, { "decode_field_json", test_decode_field_json}, { "time_key_kept_if_parse_fails", test_time_key_kept_if_parse_fails},