diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index 21a7637e172..e2db6f3deb8 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -994,6 +994,18 @@ static int get_msgpack_obj(msgpack_object * subobj, const msgpack_object * o, return -1; } +static int get_string(flb_sds_t * s, const msgpack_object * o, const flb_sds_t key) +{ + msgpack_object tmp; + if (get_msgpack_obj(&tmp, o, key, flb_sds_len(key), MSGPACK_OBJECT_STR) == 0) { + *s = flb_sds_create_len(tmp.via.str.ptr, tmp.via.str.size); + return 0; + } + + *s = 0; + return -1; +} + static int get_severity_level(severity_t * s, const msgpack_object * o, const flb_sds_t key) { @@ -1006,6 +1018,7 @@ static int get_severity_level(severity_t * s, const msgpack_object * o, return -1; } + static int get_stream(msgpack_object_map map) { int i; @@ -1104,8 +1117,13 @@ static int pack_json_payload(int insert_id_extracted, monitored_resource_key, local_resource_id_key, ctx->labels_key, + ctx->severity_key, + ctx->trace_key, + ctx->log_name_key, stream - /* more special fields are required to be added */ + /* more special fields are required to be added, but, if this grows with more + than a few records, it might need to be converted to flb_hash + */ }; if (insert_id_extracted == FLB_TRUE) { @@ -1138,7 +1156,7 @@ static int pack_json_payload(int insert_id_extracted, * check length of key to avoid partial matching * e.g. labels key = labels && kv->key = labelss */ - if (flb_sds_cmp(removed, kv->key.via.str.ptr, len) == 0) { + if (removed && flb_sds_cmp(removed, kv->key.via.str.ptr, len) == 0) { to_remove += 1; break; } @@ -1215,7 +1233,7 @@ static int pack_json_payload(int insert_id_extracted, len = kv->key.via.str.size; for (j = 0; j < len_to_be_removed; j++) { removed = to_be_removed[j]; - if (flb_sds_cmp(removed, kv->key.via.str.ptr, len) == 0) { + if (removed && flb_sds_cmp(removed, kv->key.via.str.ptr, len) == 0) { key_not_found = 0; break; } @@ -1264,6 +1282,7 @@ static int stackdriver_format(struct flb_config *config, char path[PATH_MAX]; char time_formatted[255]; const char *newtag; + const char *new_log_name; msgpack_object *obj; msgpack_object *labels_ptr; msgpack_unpacked result; @@ -1276,6 +1295,16 @@ static int stackdriver_format(struct flb_config *config, int severity_extracted = FLB_FALSE; severity_t severity; + /* Parameters for trace */ + int trace_extracted = FLB_FALSE; + flb_sds_t trace; + char stackdriver_trace[PATH_MAX]; + const char *new_trace; + + /* Parameters for log name */ + int log_name_extracted = FLB_FALSE; + flb_sds_t log_name; + /* Parameters for insertId */ msgpack_object insert_id_obj; insert_id_status in_status; @@ -1581,7 +1610,8 @@ static int stackdriver_format(struct flb_config *config, * "labels": "...", * "logName": "...", * "jsonPayload": {...}, - * "timestamp": "..." + * "timestamp": "...", + * "trace": "..." * } */ entry_size = 3; @@ -1594,6 +1624,21 @@ static int stackdriver_format(struct flb_config *config, entry_size += 1; } + /* Extract trace */ + trace_extracted = FLB_FALSE; + if (ctx->trace_key + && get_string(&trace, obj, ctx->trace_key) == 0) { + trace_extracted = FLB_TRUE; + entry_size += 1; + } + + /* Extract log name */ + log_name_extracted = FLB_FALSE; + if (ctx->log_name_key + && get_string(&log_name, obj, ctx->log_name_key) == 0) { + log_name_extracted = FLB_TRUE; + } + /* Extract insertId */ in_status = validate_insert_id(&insert_id_obj, obj); if (in_status == INSERTID_VALID) { @@ -1668,6 +1713,26 @@ static int stackdriver_format(struct flb_config *config, msgpack_pack_int(&mp_pck, severity); } + /* Add trace into the log entry */ + if (trace_extracted == FLB_TRUE) { + msgpack_pack_str(&mp_pck, 5); + msgpack_pack_str_body(&mp_pck, "trace", 5); + + if (ctx->autoformat_stackdriver_trace) { + len = snprintf(stackdriver_trace, sizeof(stackdriver_trace) - 1, + "projects/%s/traces/%s", ctx->project_id, trace); + new_trace = stackdriver_trace; + } + else { + len = flb_sds_len(trace); + new_trace = trace; + } + + msgpack_pack_str(&mp_pck, len); + msgpack_pack_str_body(&mp_pck, new_trace, len); + flb_sds_destroy(trace); + } + /* Add insertId field into the log entry */ if (insert_id_extracted == FLB_TRUE) { msgpack_pack_str(&mp_pck, 8); @@ -1729,9 +1794,21 @@ static int stackdriver_format(struct flb_config *config, newtag = "stderr"; } } + + if (log_name_extracted == FLB_FALSE) { + new_log_name = newtag; + } + else { + new_log_name = log_name; + } + /* logName */ len = snprintf(path, sizeof(path) - 1, - "projects/%s/logs/%s", ctx->project_id, newtag); + "projects/%s/logs/%s", ctx->project_id, new_log_name); + + if (log_name_extracted == FLB_TRUE) { + flb_sds_destroy(log_name); + } msgpack_pack_str(&mp_pck, 7); msgpack_pack_str_body(&mp_pck, "logName", 7); diff --git a/plugins/out_stackdriver/stackdriver.h b/plugins/out_stackdriver/stackdriver.h index 9a24828bdcf..32778117e01 100644 --- a/plugins/out_stackdriver/stackdriver.h +++ b/plugins/out_stackdriver/stackdriver.h @@ -50,6 +50,9 @@ #define MONITORED_RESOURCE_KEY "logging.googleapis.com/monitored_resource" #define LOCAL_RESOURCE_ID_KEY "logging.googleapis.com/local_resource_id" #define DEFAULT_LABELS_KEY "logging.googleapis.com/labels" +#define DEFAULT_SEVERITY_KEY "logging.googleapis.com/severity" +#define DEFAULT_TRACE_KEY "logging.googleapis.com/trace" +#define DEFAULT_LOG_NAME_KEY "logging.googleapis.com/logName" #define DEFAULT_INSERT_ID_KEY "logging.googleapis.com/insertId" #define SOURCELOCATION_FIELD_IN_JSON "logging.googleapis.com/sourceLocation" #define HTTPREQUEST_FIELD_IN_JSON "logging.googleapis.com/http_request" @@ -108,6 +111,9 @@ struct flb_stackdriver { /* other */ flb_sds_t resource; flb_sds_t severity_key; + flb_sds_t trace_key; + flb_sds_t log_name_key; + bool autoformat_stackdriver_trace; /* oauth2 context */ struct flb_oauth2 *o; diff --git a/plugins/out_stackdriver/stackdriver_conf.c b/plugins/out_stackdriver/stackdriver_conf.c index 0e50b09286a..57c9262b668 100644 --- a/plugins/out_stackdriver/stackdriver_conf.c +++ b/plugins/out_stackdriver/stackdriver_conf.c @@ -22,6 +22,7 @@ #include #include #include +#include #include @@ -268,8 +269,35 @@ struct flb_stackdriver *flb_stackdriver_conf_create(struct flb_output_instance * if (tmp) { ctx->severity_key = flb_sds_create(tmp); } + else { + ctx->severity_key = flb_sds_create(DEFAULT_SEVERITY_KEY); + } + + tmp = flb_output_get_property("autoformat_stackdriver_trace", ins); + if (tmp) { + ctx->autoformat_stackdriver_trace = flb_utils_bool(tmp); + } + else { + ctx->autoformat_stackdriver_trace = FLB_FALSE; + } + + tmp = flb_output_get_property("trace_key", ins); + if (tmp) { + ctx->trace_key = flb_sds_create(tmp); + } + else { + ctx->trace_key = flb_sds_create(DEFAULT_TRACE_KEY); + } + + tmp = flb_output_get_property("log_name_key", ins); + if (tmp) { + ctx->log_name_key = flb_sds_create(tmp); + } + else { + ctx->log_name_key = flb_sds_create(DEFAULT_LOG_NAME_KEY); + } - if (flb_sds_cmp(ctx->resource, "k8s_container", + if (flb_sds_cmp(ctx->resource, "k8s_container", flb_sds_len(ctx->resource)) == 0 || flb_sds_cmp(ctx->resource, "k8s_node", flb_sds_len(ctx->resource)) == 0 || @@ -344,6 +372,8 @@ int flb_stackdriver_conf_destroy(struct flb_stackdriver *ctx) flb_sds_destroy(ctx->token_uri); flb_sds_destroy(ctx->resource); flb_sds_destroy(ctx->severity_key); + flb_sds_destroy(ctx->trace_key); + flb_sds_destroy(ctx->log_name_key); flb_sds_destroy(ctx->labels_key); flb_sds_destroy(ctx->tag_prefix); diff --git a/tests/runtime/data/stackdriver/stackdriver_test_log_name.h b/tests/runtime/data/stackdriver/stackdriver_test_log_name.h new file mode 100644 index 00000000000..4d39e242e46 --- /dev/null +++ b/tests/runtime/data/stackdriver/stackdriver_test_log_name.h @@ -0,0 +1,10 @@ +#define LOG_NAME_OVERRIDE "[" \ + "1591111124," \ + "{" \ + "\"custom_log_name_key\": \"custom_log_name\"" \ + "}]" + +#define LOG_NAME_NO_OVERRIDE "[" \ + "1591111124," \ + "{" \ + "}]" diff --git a/tests/runtime/data/stackdriver/stackdriver_test_trace.h b/tests/runtime/data/stackdriver/stackdriver_test_trace.h new file mode 100644 index 00000000000..15c7bc911ba --- /dev/null +++ b/tests/runtime/data/stackdriver/stackdriver_test_trace.h @@ -0,0 +1,6 @@ +#define TRACE_COMMON_CASE "[" \ + "1591111124," \ + "{" \ + "\"trace\": \"test-trace-id-xyz\"" \ + "}]" + \ No newline at end of file diff --git a/tests/runtime/out_stackdriver.c b/tests/runtime/out_stackdriver.c index f0f4290717c..e3b822dd2cb 100644 --- a/tests/runtime/out_stackdriver.c +++ b/tests/runtime/out_stackdriver.c @@ -34,6 +34,8 @@ #include "data/stackdriver/stackdriver_test_operation.h" #include "data/stackdriver/stackdriver_test_k8s_resource.h" #include "data/stackdriver/stackdriver_test_labels.h" +#include "data/stackdriver/stackdriver_test_trace.h" +#include "data/stackdriver/stackdriver_test_log_name.h" #include "data/stackdriver/stackdriver_test_insert_id.h" #include "data/stackdriver/stackdriver_test_source_location.h" #include "data/stackdriver/stackdriver_test_http_request.h" @@ -570,6 +572,76 @@ static void cb_check_k8s_container_resource_default_regex(void *ctx, int ffd, flb_sds_destroy(res_data); } +static void cb_check_trace_no_autoformat(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + int ret; + + /* trace in the entries */ + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['trace']", "test-trace-id-xyz"); + TEST_CHECK(ret == FLB_TRUE); + + /* trace has been removed from jsonPayload */ + ret = mp_kv_exists(res_data, res_size, "$entries[0]['jsonPayload']['trace']"); + TEST_CHECK(ret == FLB_FALSE); + + flb_sds_destroy(res_data); +} + +static void cb_check_trace_stackdriver_autoformat(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + int ret; + + /* trace in the entries */ + ret = mp_kv_cmp( + res_data, + res_size, + "$entries[0]['trace']", + "projects/fluent-bit-test/traces/test-trace-id-xyz"); + TEST_CHECK(ret == FLB_TRUE); + + /* trace has been removed from jsonPayload */ + ret = mp_kv_exists(res_data, res_size, "$entries[0]['jsonPayload']['trace']"); + TEST_CHECK(ret == FLB_FALSE); + + flb_sds_destroy(res_data); +} + +static void cb_check_log_name_override(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + int ret; + + /* logName in the entries is created using the value under log_name_key */ + ret = mp_kv_cmp( + res_data, res_size, "$entries[0]['logName']", "projects/fluent-bit-test/logs/custom_log_name"); + TEST_CHECK(ret == FLB_TRUE); + + /* log_name_key has been removed from jsonPayload */ + ret = mp_kv_exists(res_data, res_size, "$entries[0]['jsonPayload']['custom_log_name_key']"); + TEST_CHECK(ret == FLB_FALSE); + + flb_sds_destroy(res_data); +} + +static void cb_check_log_name_no_override(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + int ret; + + /* logName in the entries is created using the tag */ + ret = mp_kv_cmp( + res_data, res_size, "$entries[0]['logName']", "projects/fluent-bit-test/logs/test"); + TEST_CHECK(ret == FLB_TRUE); + + flb_sds_destroy(res_data); +} + static void cb_check_k8s_node_resource(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, void *data) @@ -1040,12 +1112,20 @@ static void cb_check_multi_entries_severity(void *ctx, int ffd, ret = mp_kv_cmp(res_data, res_size, "$entries[0]['severity']", "INFO"); TEST_CHECK(ret == FLB_TRUE); + // verifies that severity is removed from jsonPayload + ret = mp_kv_exists(res_data, res_size, "$entries[0]['jsonPayload']['severity']"); + TEST_CHECK(ret == FLB_FALSE); + ret = mp_kv_exists(res_data, res_size, "$entries[1]['severity']"); TEST_CHECK(ret == FLB_FALSE); ret = mp_kv_cmp(res_data, res_size, "$entries[2]['severity']", "DEBUG"); TEST_CHECK(ret == FLB_TRUE); + // verifies that severity is removed from jsonPayload + ret = mp_kv_exists(res_data, res_size, "$entries[2]['jsonPayload']['severity']"); + TEST_CHECK(ret == FLB_FALSE); + ret = mp_kv_exists(res_data, res_size, "$entries[3]['severity']"); TEST_CHECK(ret == FLB_FALSE); @@ -1854,6 +1934,171 @@ void flb_test_resource_global() flb_destroy(ctx); } +void flb_test_trace_no_autoformat() +{ + int ret; + int size = sizeof(TRACE_COMMON_CASE) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Stackdriver output */ + out_ffd = flb_output(ctx, (char *) "stackdriver", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "resource", "gce_instance", + "trace_key", "trace", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_trace_no_autoformat, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) TRACE_COMMON_CASE, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_trace_stackdriver_autoformat() +{ + int ret; + int size = sizeof(TRACE_COMMON_CASE) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Stackdriver output */ + out_ffd = flb_output(ctx, (char *) "stackdriver", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "resource", "gce_instance", + "trace_key", "trace", + "autoformat_stackdriver_trace", "true", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_trace_stackdriver_autoformat, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) TRACE_COMMON_CASE, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_log_name_override() +{ + int ret; + int size = sizeof(LOG_NAME_OVERRIDE) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Stackdriver output */ + out_ffd = flb_output(ctx, (char *) "stackdriver", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "resource", "gce_instance", + "log_name_key", "custom_log_name_key", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_log_name_override, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) LOG_NAME_OVERRIDE, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_log_name_no_override() +{ + int ret; + int size = sizeof(LOG_NAME_NO_OVERRIDE) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Stackdriver output */ + out_ffd = flb_output(ctx, (char *) "stackdriver", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "resource", "gce_instance", + "log_name_key", "custom_log_name_key", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_log_name_no_override, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) LOG_NAME_NO_OVERRIDE, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + void flb_test_resource_global_custom_prefix() { /* configuring tag_prefix for non-k8s resource type should have no effect at all */ @@ -3925,6 +4170,14 @@ TEST_LIST = { {"resource_global_custom_prefix", flb_test_resource_global_custom_prefix }, {"resource_gce_instance", flb_test_resource_gce_instance }, + /* test trace */ + {"trace_no_autoformat", flb_test_trace_no_autoformat}, + {"trace_stackdriver_autoformat", flb_test_trace_stackdriver_autoformat}, + + /* test log name */ + {"log_name_override", flb_test_log_name_override}, + {"log_name_no_override", flb_test_log_name_no_override}, + /* test insertId */ {"insertId_common_case", flb_test_insert_id_common_case}, {"empty_insertId", flb_test_empty_insert_id},