diff --git a/include/fluent-bit/flb_metrics.h b/include/fluent-bit/flb_metrics.h index d40c223bf6d..ccd4694ebb1 100644 --- a/include/fluent-bit/flb_metrics.h +++ b/include/fluent-bit/flb_metrics.h @@ -38,6 +38,7 @@ #include #include #include +#include /* Metrics IDs for general purpose (used by core and Plugins */ #define FLB_METRIC_N_RECORDS 0 diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.c b/plugins/out_cloudwatch_logs/cloudwatch_api.c index 41919f4eb3b..82039f9a6d6 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.c @@ -37,6 +37,7 @@ #include #include #include +#include #include #include @@ -783,13 +784,9 @@ int pack_emf_payload(struct flb_cloudwatch *ctx, return 0; } -/* - * Main routine- processes msgpack and sends in batches which ignore the empty ones - * return value is the number of events processed and send. - */ -int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, - struct cw_flush *buf, flb_sds_t tag, - const char *data, size_t bytes) +static int process_log_events(struct flb_cloudwatch *ctx, const char *input_plugin, + struct cw_flush *buf, flb_sds_t tag, + const char *data, size_t bytes) { int i = 0; size_t map_size; @@ -834,7 +831,7 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, if (strncmp(input_plugin, "cpu", 3) == 0) { intermediate_metric_type = GAUGE; intermediate_metric_unit = PERCENT; - } + } else if (strncmp(input_plugin, "mem", 3) == 0) { intermediate_metric_type = GAUGE; intermediate_metric_unit = BYTES; @@ -901,7 +898,7 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, continue; } - if (strncmp(input_plugin, "cpu", 3) == 0 + if (strncmp(input_plugin, "cpu", 3) == 0 || strncmp(input_plugin, "mem", 3) == 0) { /* Added for EMF support: Construct a list */ struct mk_list flb_intermediate_metrics; @@ -909,8 +906,8 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, kv = map.via.map.ptr; - /* - * Iterate through the record map, extract intermediate metric data, + /* + * Iterate through the record map, extract intermediate metric data, * and add to the list. */ for (i = 0; i < map_size; i++) { @@ -926,25 +923,25 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, metric->timestamp = log_event.timestamp; mk_list_add(&metric->_head, &flb_intermediate_metrics); - - } + + } /* The msgpack object is only valid during the lifetime of the * sbuffer & the unpacked result. - */ + */ msgpack_sbuffer_init(&mp_sbuf); msgpack_unpacked_init(&mp_emf_result); ret = pack_emf_payload(ctx, - &flb_intermediate_metrics, - input_plugin, - log_event.timestamp, - &mp_sbuf, - &mp_emf_result, - &emf_payload); - + &flb_intermediate_metrics, + input_plugin, + log_event.timestamp, + &mp_sbuf, + &mp_emf_result, + &emf_payload); + /* free the intermediate metric list */ - + mk_list_foreach_safe(head, tmp, &flb_intermediate_metrics) { an_item = mk_list_entry(head, struct flb_intermediate_metric, _head); mk_list_del(&an_item->_head); @@ -978,6 +975,100 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, } flb_log_event_decoder_destroy(&log_decoder); + return i; + +error: + flb_log_event_decoder_destroy(&log_decoder); + + return -1; +} + + +static int process_metric_events(struct flb_cloudwatch *ctx, const char *input_plugin, + struct cw_flush *buf, flb_sds_t tag, + const char *data, size_t bytes) +{ + int i = 0; + int ret; + msgpack_object map; + msgpack_unpacked mp_emf_result; + + struct log_stream *stream; + + size_t off = 0; + struct cmt *cmt; + char *mp_buf = NULL; + size_t mp_size = 0; + size_t mp_off = 0; + struct flb_time tm; + + while ((ret = cmt_decode_msgpack_create(&cmt, + data, + bytes, &off)) == CMT_DECODE_MSGPACK_SUCCESS) { + ret = cmt_encode_cloudwatch_emf_create(cmt, &mp_buf, &mp_size, CMT_FALSE); + if (ret < 0) { + goto cmt_error; + } + + msgpack_unpacked_init(&mp_emf_result); + while (msgpack_unpack_next(&mp_emf_result, mp_buf, mp_size, &mp_off) == MSGPACK_UNPACK_SUCCESS) { + map = mp_emf_result.data; + if (map.type != MSGPACK_OBJECT_MAP) { + continue; + } + + stream = get_log_stream(ctx, tag, map); + if (!stream) { + flb_plg_debug(ctx->ins, "Couldn't determine log group & stream for record with tag %s", tag); + goto cmt_error; + } + + flb_time_get(&tm); + ret = add_event(ctx, buf, stream, &map, + &tm); + + if (ret < 0 ) { + goto cmt_error; + } + + if (ret == 0) { + i++; + } + } + cmt_encode_cloudwatch_emf_destroy(mp_buf); + msgpack_unpacked_destroy(&mp_emf_result); + cmt_destroy(cmt); + } + + return i; + +cmt_error: + cmt_destroy(cmt); + + return -1; +} + +/* + * Main routine- processes msgpack and sends in batches which ignore the empty ones + * return value is the number of events processed and send. + */ +int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, + struct cw_flush *buf, flb_sds_t tag, + const char *data, size_t bytes, int event_type) +{ + int ret; + int i = 0; + + if (event_type == FLB_EVENT_TYPE_LOGS) { + i = process_log_events(ctx, input_plugin, + buf, tag, + data, bytes); + } + else if (event_type == FLB_EVENT_TYPE_METRICS) { + i = process_metric_events(ctx, input_plugin, + buf, tag, + data, bytes); + } /* send any remaining events */ ret = send_log_events(ctx, buf); reset_flush_buf(ctx, buf); @@ -987,11 +1078,6 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, /* return number of events */ return i; - -error: - flb_log_event_decoder_destroy(&log_decoder); - - return -1; } struct log_stream *get_or_create_log_stream(struct flb_cloudwatch *ctx, diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.h b/plugins/out_cloudwatch_logs/cloudwatch_api.h index 75a579e764f..05abfff30a1 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.h +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.h @@ -42,9 +42,9 @@ void cw_flush_destroy(struct cw_flush *buf); -int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, +int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, struct cw_flush *buf, flb_sds_t tag, - const char *data, size_t bytes); + const char *data, size_t bytes, int event_type); int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream, int can_retry); struct log_stream *get_log_stream(struct flb_cloudwatch *ctx, flb_sds_t tag, const msgpack_object map); diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.c b/plugins/out_cloudwatch_logs/cloudwatch_logs.c index d2b285e768e..4e850800c35 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_logs.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.c @@ -414,7 +414,8 @@ static void cb_cloudwatch_flush(struct flb_event_chunk *event_chunk, FLB_OUTPUT_RETURN(FLB_RETRY); } - event_count = process_and_send(ctx, i_ins->p->name, buf, event_chunk->tag, event_chunk->data, event_chunk->size); + event_count = process_and_send(ctx, i_ins->p->name, buf, event_chunk->tag, event_chunk->data, event_chunk->size, + event_chunk->type); if (event_count < 0) { flb_plg_error(ctx->ins, "Failed to send events"); cw_flush_destroy(buf); @@ -477,6 +478,10 @@ void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx) flb_sds_destroy(ctx->stream_name); } + if (ctx->metric_namespace) { + flb_sds_destroy(ctx->metric_namespace); + } + mk_list_foreach_safe(head, tmp, &ctx->streams) { stream = mk_list_entry(head, struct log_stream, _head); mk_list_del(&stream->_head); @@ -660,6 +665,7 @@ struct flb_output_plugin out_cloudwatch_logs_plugin = { .cb_exit = cb_cloudwatch_exit, .flags = 0, .workers = 1, + .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS, /* Configuration */ .config_map = config_map, diff --git a/tests/runtime/out_cloudwatch.c b/tests/runtime/out_cloudwatch.c index fc54e4c380b..35d785f454c 100644 --- a/tests/runtime/out_cloudwatch.c +++ b/tests/runtime/out_cloudwatch.c @@ -46,6 +46,52 @@ void flb_test_cloudwatch_success(void) flb_destroy(ctx); } +/* It writes a json/emf formatted metrics */ +void flb_test_cloudwatch_success_with_metrics(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_CLOUDWATCH_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + flb_service_set(ctx, + "Flush", "0.200000000", + "Grace", "1", + NULL); + + /* Input */ + in_ffd = flb_input(ctx, (char *) "fluentbit_metrics", NULL); + TEST_CHECK(in_ffd >= 0); + ret = flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_on_start", "true", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_interval", "1", NULL); + TEST_CHECK(ret == 0); + + out_ffd = flb_output(ctx, (char *) "cloudwatch_logs", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd,"match", "test", NULL); + flb_output_set(ctx, out_ffd,"region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd,"log_format", "json_emf", NULL); + flb_output_set(ctx, out_ffd,"log_group_name", "fluent-health", NULL); + flb_output_set(ctx, out_ffd,"log_stream_prefix", "from-cmetrics-", NULL); + flb_output_set(ctx, out_ffd,"auto_create_group", "On", NULL); + flb_output_set(ctx, out_ffd,"net.keepalive", "Off", NULL); + flb_output_set(ctx, out_ffd,"Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + void flb_test_cloudwatch_already_exists_create_group(void) { int ret; @@ -350,6 +396,7 @@ void flb_test_cloudwatch_error_put_retention_policy(void) /* Test list */ TEST_LIST = { {"success", flb_test_cloudwatch_success }, + {"success_with_metrics", flb_test_cloudwatch_success_with_metrics}, {"group_already_exists", flb_test_cloudwatch_already_exists_create_group }, {"stream_already_exists", flb_test_cloudwatch_already_exists_create_stream }, {"create_group_error", flb_test_cloudwatch_error_create_group },