Skip to content

Commit

Permalink
out_cloudwatch_logs: Support EMF format for metrics type of events (#…
Browse files Browse the repository at this point in the history
…8448)


---------

Signed-off-by: Hiroshi Hatake <[email protected]>
Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 authored Feb 25, 2024
1 parent 1f43c5d commit 6ffc3da
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 30 deletions.
1 change: 1 addition & 0 deletions include/fluent-bit/flb_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <cmetrics/cmt_encode_prometheus_remote_write.h>
#include <cmetrics/cmt_encode_msgpack.h>
#include <cmetrics/cmt_encode_splunk_hec.h>
#include <cmetrics/cmt_encode_cloudwatch_emf.h>

/* Metrics IDs for general purpose (used by core and Plugins */
#define FLB_METRIC_N_RECORDS 0
Expand Down
140 changes: 113 additions & 27 deletions plugins/out_cloudwatch_logs/cloudwatch_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <fluent-bit/flb_http_client.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_intermediate_metric.h>
#include <fluent-bit/flb_metrics.h>

#include <monkey/mk_core.h>
#include <msgpack.h>
Expand Down Expand Up @@ -783,13 +784,9 @@ int pack_emf_payload(struct flb_cloudwatch *ctx,
return 0;
}

/*
* Main routine- processes msgpack and sends in batches which ignore the empty ones
* return value is the number of events processed and send.
*/
int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
struct cw_flush *buf, flb_sds_t tag,
const char *data, size_t bytes)
static int process_log_events(struct flb_cloudwatch *ctx, const char *input_plugin,
struct cw_flush *buf, flb_sds_t tag,
const char *data, size_t bytes)
{
int i = 0;
size_t map_size;
Expand Down Expand Up @@ -834,7 +831,7 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
if (strncmp(input_plugin, "cpu", 3) == 0) {
intermediate_metric_type = GAUGE;
intermediate_metric_unit = PERCENT;
}
}
else if (strncmp(input_plugin, "mem", 3) == 0) {
intermediate_metric_type = GAUGE;
intermediate_metric_unit = BYTES;
Expand Down Expand Up @@ -901,16 +898,16 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
continue;
}

if (strncmp(input_plugin, "cpu", 3) == 0
if (strncmp(input_plugin, "cpu", 3) == 0
|| strncmp(input_plugin, "mem", 3) == 0) {
/* Added for EMF support: Construct a list */
struct mk_list flb_intermediate_metrics;
mk_list_init(&flb_intermediate_metrics);

kv = map.via.map.ptr;

/*
* Iterate through the record map, extract intermediate metric data,
/*
* Iterate through the record map, extract intermediate metric data,
* and add to the list.
*/
for (i = 0; i < map_size; i++) {
Expand All @@ -926,25 +923,25 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
metric->timestamp = log_event.timestamp;

mk_list_add(&metric->_head, &flb_intermediate_metrics);
}

}

/* The msgpack object is only valid during the lifetime of the
* sbuffer & the unpacked result.
*/
*/
msgpack_sbuffer_init(&mp_sbuf);
msgpack_unpacked_init(&mp_emf_result);

ret = pack_emf_payload(ctx,
&flb_intermediate_metrics,
input_plugin,
log_event.timestamp,
&mp_sbuf,
&mp_emf_result,
&emf_payload);
&flb_intermediate_metrics,
input_plugin,
log_event.timestamp,
&mp_sbuf,
&mp_emf_result,
&emf_payload);

/* free the intermediate metric list */

mk_list_foreach_safe(head, tmp, &flb_intermediate_metrics) {
an_item = mk_list_entry(head, struct flb_intermediate_metric, _head);
mk_list_del(&an_item->_head);
Expand Down Expand Up @@ -978,6 +975,100 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
}
flb_log_event_decoder_destroy(&log_decoder);

return i;

error:
flb_log_event_decoder_destroy(&log_decoder);

return -1;
}


static int process_metric_events(struct flb_cloudwatch *ctx, const char *input_plugin,
struct cw_flush *buf, flb_sds_t tag,
const char *data, size_t bytes)
{
int i = 0;
int ret;
msgpack_object map;
msgpack_unpacked mp_emf_result;

struct log_stream *stream;

size_t off = 0;
struct cmt *cmt;
char *mp_buf = NULL;
size_t mp_size = 0;
size_t mp_off = 0;
struct flb_time tm;

while ((ret = cmt_decode_msgpack_create(&cmt,
data,
bytes, &off)) == CMT_DECODE_MSGPACK_SUCCESS) {
ret = cmt_encode_cloudwatch_emf_create(cmt, &mp_buf, &mp_size, CMT_FALSE);
if (ret < 0) {
goto cmt_error;
}

msgpack_unpacked_init(&mp_emf_result);
while (msgpack_unpack_next(&mp_emf_result, mp_buf, mp_size, &mp_off) == MSGPACK_UNPACK_SUCCESS) {
map = mp_emf_result.data;
if (map.type != MSGPACK_OBJECT_MAP) {
continue;
}

stream = get_log_stream(ctx, tag, map);
if (!stream) {
flb_plg_debug(ctx->ins, "Couldn't determine log group & stream for record with tag %s", tag);
goto cmt_error;
}

flb_time_get(&tm);
ret = add_event(ctx, buf, stream, &map,
&tm);

if (ret < 0 ) {
goto cmt_error;
}

if (ret == 0) {
i++;
}
}
cmt_encode_cloudwatch_emf_destroy(mp_buf);
msgpack_unpacked_destroy(&mp_emf_result);
cmt_destroy(cmt);
}

return i;

cmt_error:
cmt_destroy(cmt);

return -1;
}

/*
* Main routine- processes msgpack and sends in batches which ignore the empty ones
* return value is the number of events processed and send.
*/
int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
struct cw_flush *buf, flb_sds_t tag,
const char *data, size_t bytes, int event_type)
{
int ret;
int i = 0;

if (event_type == FLB_EVENT_TYPE_LOGS) {
i = process_log_events(ctx, input_plugin,
buf, tag,
data, bytes);
}
else if (event_type == FLB_EVENT_TYPE_METRICS) {
i = process_metric_events(ctx, input_plugin,
buf, tag,
data, bytes);
}
/* send any remaining events */
ret = send_log_events(ctx, buf);
reset_flush_buf(ctx, buf);
Expand All @@ -987,11 +1078,6 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,

/* return number of events */
return i;

error:
flb_log_event_decoder_destroy(&log_decoder);

return -1;
}

struct log_stream *get_or_create_log_stream(struct flb_cloudwatch *ctx,
Expand Down
4 changes: 2 additions & 2 deletions plugins/out_cloudwatch_logs/cloudwatch_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@

void cw_flush_destroy(struct cw_flush *buf);

int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
struct cw_flush *buf, flb_sds_t tag,
const char *data, size_t bytes);
const char *data, size_t bytes, int event_type);
int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream, int can_retry);
struct log_stream *get_log_stream(struct flb_cloudwatch *ctx, flb_sds_t tag,
const msgpack_object map);
Expand Down
8 changes: 7 additions & 1 deletion plugins/out_cloudwatch_logs/cloudwatch_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,8 @@ static void cb_cloudwatch_flush(struct flb_event_chunk *event_chunk,
FLB_OUTPUT_RETURN(FLB_RETRY);
}

event_count = process_and_send(ctx, i_ins->p->name, buf, event_chunk->tag, event_chunk->data, event_chunk->size);
event_count = process_and_send(ctx, i_ins->p->name, buf, event_chunk->tag, event_chunk->data, event_chunk->size,
event_chunk->type);
if (event_count < 0) {
flb_plg_error(ctx->ins, "Failed to send events");
cw_flush_destroy(buf);
Expand Down Expand Up @@ -477,6 +478,10 @@ void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx)
flb_sds_destroy(ctx->stream_name);
}

if (ctx->metric_namespace) {
flb_sds_destroy(ctx->metric_namespace);
}

mk_list_foreach_safe(head, tmp, &ctx->streams) {
stream = mk_list_entry(head, struct log_stream, _head);
mk_list_del(&stream->_head);
Expand Down Expand Up @@ -660,6 +665,7 @@ struct flb_output_plugin out_cloudwatch_logs_plugin = {
.cb_exit = cb_cloudwatch_exit,
.flags = 0,
.workers = 1,
.event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS,

/* Configuration */
.config_map = config_map,
Expand Down
47 changes: 47 additions & 0 deletions tests/runtime/out_cloudwatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,52 @@ void flb_test_cloudwatch_success(void)
flb_destroy(ctx);
}

/* It writes a json/emf formatted metrics */
void flb_test_cloudwatch_success_with_metrics(void)
{
int ret;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

/* mocks calls- signals that we are in test mode */
setenv("FLB_CLOUDWATCH_PLUGIN_UNDER_TEST", "true", 1);

ctx = flb_create();
flb_service_set(ctx,
"Flush", "0.200000000",
"Grace", "1",
NULL);

/* Input */
in_ffd = flb_input(ctx, (char *) "fluentbit_metrics", NULL);
TEST_CHECK(in_ffd >= 0);
ret = flb_input_set(ctx, in_ffd, "tag", "test", NULL);
TEST_CHECK(ret == 0);
ret = flb_input_set(ctx, in_ffd, "scrape_on_start", "true", NULL);
TEST_CHECK(ret == 0);
ret = flb_input_set(ctx, in_ffd, "scrape_interval", "1", NULL);
TEST_CHECK(ret == 0);

out_ffd = flb_output(ctx, (char *) "cloudwatch_logs", NULL);
TEST_CHECK(out_ffd >= 0);
flb_output_set(ctx, out_ffd,"match", "test", NULL);
flb_output_set(ctx, out_ffd,"region", "us-west-2", NULL);
flb_output_set(ctx, out_ffd,"log_format", "json_emf", NULL);
flb_output_set(ctx, out_ffd,"log_group_name", "fluent-health", NULL);
flb_output_set(ctx, out_ffd,"log_stream_prefix", "from-cmetrics-", NULL);
flb_output_set(ctx, out_ffd,"auto_create_group", "On", NULL);
flb_output_set(ctx, out_ffd,"net.keepalive", "Off", NULL);
flb_output_set(ctx, out_ffd,"Retry_Limit", "1", NULL);

ret = flb_start(ctx);
TEST_CHECK(ret == 0);

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

void flb_test_cloudwatch_already_exists_create_group(void)
{
int ret;
Expand Down Expand Up @@ -350,6 +396,7 @@ void flb_test_cloudwatch_error_put_retention_policy(void)
/* Test list */
TEST_LIST = {
{"success", flb_test_cloudwatch_success },
{"success_with_metrics", flb_test_cloudwatch_success_with_metrics},
{"group_already_exists", flb_test_cloudwatch_already_exists_create_group },
{"stream_already_exists", flb_test_cloudwatch_already_exists_create_stream },
{"create_group_error", flb_test_cloudwatch_error_create_group },
Expand Down

0 comments on commit 6ffc3da

Please sign in to comment.