From e1278ca6a623272d0c96c36535ac6676c05a7d8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Warzy=C5=84ski?= Date: Tue, 26 Mar 2024 02:06:43 +0100 Subject: [PATCH 01/24] filter: aws use '== FLB_TRUE' for if bool conditions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mateusz Warzyński Signed-off-by: Markus Bergholz --- plugins/filter_aws/aws.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/filter_aws/aws.c b/plugins/filter_aws/aws.c index acb5b7d7af2..afe94a8a466 100644 --- a/plugins/filter_aws/aws.c +++ b/plugins/filter_aws/aws.c @@ -921,7 +921,7 @@ static int get_ec2_metadata(struct flb_filter_aws *ctx) metadata_fetched = FLB_FALSE; } - if (metadata_fetched) { + if (metadata_fetched == FLB_TRUE) { ctx->metadata_retrieved = FLB_TRUE; } From b9869a7465f466e8198aed6a4490033a7347807e Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 10 May 2024 13:43:56 +0300 Subject: [PATCH 02/24] workflows: bump ossf/scorecard-action from 2.3.1 to 2.3.3 (#8811) Bumps [ossf/scorecard-action](https://github.com/ossf/scorecard-action) from 2.3.1 to 2.3.3. - [Release notes](https://github.com/ossf/scorecard-action/releases) - [Changelog](https://github.com/ossf/scorecard-action/blob/main/RELEASE.md) - [Commits](https://github.com/ossf/scorecard-action/compare/0864cf19026789058feabb7e87baa5f140aac736...dc50aa9510b46c811795eb24b2f1ba02a914e534) --- updated-dependencies: - dependency-name: ossf/scorecard-action dependency-type: direct:production update-type: version-update:semver-patch ... 
Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Signed-off-by: Markus Bergholz --- .github/workflows/cron-scorecards-analysis.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/cron-scorecards-analysis.yaml b/.github/workflows/cron-scorecards-analysis.yaml index 5e234f2dcfa..20b31d8b112 100644 --- a/.github/workflows/cron-scorecards-analysis.yaml +++ b/.github/workflows/cron-scorecards-analysis.yaml @@ -29,7 +29,7 @@ jobs: persist-credentials: false - name: "Run analysis" - uses: ossf/scorecard-action@0864cf19026789058feabb7e87baa5f140aac736 + uses: ossf/scorecard-action@dc50aa9510b46c811795eb24b2f1ba02a914e534 with: results_file: scorecard-results.sarif results_format: sarif From 33cd3a8101eee5e68d90a7f27e8f0a63653dbb6e Mon Sep 17 00:00:00 2001 From: Braydon Kains <93549768+braydonk@users.noreply.github.com> Date: Tue, 14 May 2024 09:21:01 -0400 Subject: [PATCH 03/24] config: fix error message grammar (#8818) Signed-off-by: braydonk Signed-off-by: Markus Bergholz --- src/flb_config.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flb_config.c b/src/flb_config.c index 54249fa8d33..4afaf71c946 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -777,7 +777,7 @@ static int configure_plugins_type(struct flb_config *config, struct flb_cf *cf, /* validate the instance creation */ if (!ins) { flb_error("[config] section '%s' tried to instance a plugin name " - "that don't exists", name); + "that doesn't exist", name); flb_sds_destroy(name); return -1; } From 6f3ba2807cab736602751fd9eaa269abb31909fd Mon Sep 17 00:00:00 2001 From: Michael Meunier Date: Fri, 3 May 2024 11:20:44 +0200 Subject: [PATCH 04/24] out_kafka: increase max kafka dynamic topic length to 249 characters This commit increases the max length of kafka dynamic topics. 
As the limit previously set (64) seemed arbitrary, and kafka allows for a maximum of 249 characters per topic, I think fluentbit should reflect that limit. Signed-off-by: Michael Meunier Signed-off-by: Markus Bergholz --- plugins/out_kafka/kafka.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/out_kafka/kafka.c b/plugins/out_kafka/kafka.c index 9379da84724..8dc8cb86ea8 100644 --- a/plugins/out_kafka/kafka.c +++ b/plugins/out_kafka/kafka.c @@ -227,9 +227,9 @@ int produce_message(struct flb_time *tm, msgpack_object *map, flb_warn("',' not allowed in dynamic_kafka topic names"); continue; } - if (val.via.str.size > 64) { - /* Don't allow length of dynamic kafka topics > 64 */ - flb_warn(" dynamic kafka topic length > 64 not allowed"); + if (val.via.str.size > 249) { + /* Don't allow length of dynamic kafka topics > 249 */ + flb_warn(" dynamic kafka topic length > 249 not allowed"); continue; } dynamic_topic = flb_malloc(val.via.str.size + 1); From 50ec01214e0bebe0ae0ee61131213bb73d9ba69e Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 14 May 2024 22:10:00 +0900 Subject: [PATCH 05/24] processor_labels: Remove a needless existence check for insert operation Insert implies that adding new key-value pairs of labels. However, we did put the existence check for the keys of labels. This shouldn't be effective for such operation. 
Signed-off-by: Hiroshi Hatake Signed-off-by: Markus Bergholz --- plugins/processor_labels/labels.c | 33 ++++++++++++++++--------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/plugins/processor_labels/labels.c b/plugins/processor_labels/labels.c index 62e5323fd43..ee34e639f25 100644 --- a/plugins/processor_labels/labels.c +++ b/plugins/processor_labels/labels.c @@ -1524,26 +1524,27 @@ static int insert_labels(struct cmt *metrics_context, pair->key); if (result == FLB_TRUE) { - result = metrics_context_insert_dynamic_label(metrics_context, - pair->key, - pair->val); + continue; + } - if (result == FLB_FALSE) { - return FLB_FALSE; - } + result = metrics_context_insert_dynamic_label(metrics_context, + pair->key, + pair->val); + + if (result == FLB_FALSE) { + return FLB_FALSE; } - else { - result = metrics_context_contains_static_label(metrics_context, - pair->key); - if (result == FLB_FALSE) { - result = metrics_context_insert_static_label(metrics_context, - pair->key, - pair->val); + result = metrics_context_contains_static_label(metrics_context, + pair->key); - if (result == FLB_FALSE) { - return FLB_FALSE; - } + if (result == FLB_TRUE) { + result = metrics_context_insert_static_label(metrics_context, + pair->key, + pair->val); + + if (result == FLB_FALSE) { + return FLB_FALSE; } } } From bf955cff4041b0591a5de08e85bd606aa238a25a Mon Sep 17 00:00:00 2001 From: xiaobaowen <1187873955@qq.com> Date: Wed, 15 May 2024 22:39:36 +0800 Subject: [PATCH 06/24] kernel: fix incorrect assignment of kernel version between major and minor (#8797) Signed-off-by: xiaobaowen Co-authored-by: xiaobaowen Signed-off-by: Markus Bergholz --- src/flb_kernel.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/flb_kernel.c b/src/flb_kernel.c index 5a6fe03c9c7..e80d25fdd3e 100644 --- a/src/flb_kernel.c +++ b/src/flb_kernel.c @@ -122,8 +122,8 @@ struct flb_kernel *flb_kernel_info() flb_errno(); return NULL; } - kernel->minor = a; - kernel->major = 
b; + kernel->major = a; + kernel->minor = b; kernel->patch = c; kernel->s_version.data = flb_malloc(16); From aa995a178ea60a05c21a0018bbc1bb52aee4a2d1 Mon Sep 17 00:00:00 2001 From: Maneesh Singh Date: Wed, 15 May 2024 07:44:58 -0700 Subject: [PATCH 07/24] out_splunk: record accessor key for hec_token (#8793) Signed-off-by: Maneesh Singh Signed-off-by: Markus Bergholz --- plugins/out_splunk/splunk.c | 6 +++--- plugins/out_splunk/splunk_conf.c | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/out_splunk/splunk.c b/plugins/out_splunk/splunk.c index d7493f9d24f..6c25b5b4ac6 100644 --- a/plugins/out_splunk/splunk.c +++ b/plugins/out_splunk/splunk.c @@ -353,9 +353,9 @@ static flb_sds_t extract_hec_token(struct flb_splunk *ctx, msgpack_object map, flb_sds_t hec_token; /* Extract HEC token (map which is from metadata lookup) */ - if (ctx->event_sourcetype_key) { - hec_token = flb_ra_translate(ctx->ra_metadata_auth_key, tag, tag_len, - map, NULL); + if (ctx->metadata_auth_key) { + hec_token = flb_ra_translate_check(ctx->ra_metadata_auth_key, tag, tag_len, + map, NULL, FLB_TRUE); if (hec_token) { return hec_token; } diff --git a/plugins/out_splunk/splunk_conf.c b/plugins/out_splunk/splunk_conf.c index 06902ef227f..2cc78280883 100644 --- a/plugins/out_splunk/splunk_conf.c +++ b/plugins/out_splunk/splunk_conf.c @@ -263,7 +263,7 @@ struct flb_splunk *flb_splunk_conf_create(struct flb_output_instance *ins, } /* Currently, Splunk HEC token is stored in a fixed key, hec_token. 
*/ - ctx->metadata_auth_key = "hec_token"; + ctx->metadata_auth_key = "$hec_token"; if (ctx->metadata_auth_key) { ctx->ra_metadata_auth_key = flb_ra_create(ctx->metadata_auth_key, FLB_TRUE); if (!ctx->ra_metadata_auth_key) { From a17b95319db0c4109d9afd937828553058803eac Mon Sep 17 00:00:00 2001 From: Jesse Szwedko Date: Wed, 15 May 2024 10:47:49 -0400 Subject: [PATCH 08/24] out_datadog: Update the descriptions of special field options #8751 These options are useful when there is no attribute in the incoming log that indicates the field to use for the `source` or `service` attributes, but otherwise the backend will automatically look for both in the incoming log attributes. Ref: #8687 Signed-off-by: Jesse Szwedko Signed-off-by: Markus Bergholz --- plugins/out_datadog/datadog.c | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/plugins/out_datadog/datadog.c b/plugins/out_datadog/datadog.c index d0b1cf3d433..c92992f534e 100644 --- a/plugins/out_datadog/datadog.c +++ b/plugins/out_datadog/datadog.c @@ -495,19 +495,24 @@ static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_STR, "dd_service", NULL, 0, FLB_TRUE, offsetof(struct flb_out_datadog, dd_service), - "The human readable name for your service generating the logs " - "- the name of your application or database." + "The human readable name for your service generating the logs " + "(e.g. the name of your application or database). If unset, Datadog " + "will look for the service using Service Remapper in Log Management " + "(by default it will look at the `service` and `syslog.appname` attributes)." + "" }, { FLB_CONFIG_MAP_STR, "dd_source", NULL, 0, FLB_TRUE, offsetof(struct flb_out_datadog, dd_source), - "A human readable name for the underlying technology of your service. " - "For example, 'postgres' or 'nginx'." + "A human readable name for the underlying technology of your service " + "(e.g. 'postgres' or 'nginx'). 
If unset, Datadog will expect the source " + "to be set as the `ddsource` attribute." }, { FLB_CONFIG_MAP_STR, "dd_tags", NULL, 0, FLB_TRUE, offsetof(struct flb_out_datadog, dd_tags), - "The tags you want to assign to your logs in Datadog." + "The tags you want to assign to your logs in Datadog. If unset, Datadog " + "will expect the tags in the `ddtags` attribute." }, { From 16fc700100477c1a0accaeb7b2a17edebbd56a3e Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Wed, 15 May 2024 10:51:02 -0400 Subject: [PATCH 09/24] Merge pull request from GHSA-5rjf-prwh-pp7q * api/v1/traces: validate inputs when enabling traces. validate the array of inputs when enabling multiple traces that they are strings. this patch also refactors out the allocation of said input name. Signed-off-by: Phillip Adair Stewart Whelan * api/v1/traces: disable traces api when tracing is disabled. Signed-off-by: Phillip Adair Stewart Whelan * api/v1/trace: use macros for strings and lengths in responses. avoid strlen when creating http response, especially in loops, by predefining them via macros. Signed-off-by: Phillip Whelan * api/v1/trace: use sizeof for string length macros. Signed-off-by: Phillip Whelan * api/v1/trace: use signed lenghts for strings. this avoid potential integer overflows when using them as specifiers for format strings. Signed-off-by: Phillip Whelan * api/v1/traces: use macro for inputs string. Signed-off-by: Phillip Whelan * api/v1/traces: use sizeof when comparing against base path. Signed-off-by: Phillip Whelan * api/v1/traces: replace strlen with flb_sds_len when using flb_sds_t. 
Signed-off-by: Phillip Whelan --------- Signed-off-by: Phillip Adair Stewart Whelan Signed-off-by: Phillip Whelan Signed-off-by: Markus Bergholz --- src/http_server/api/v1/trace.c | 219 ++++++++++++++++++++------------- 1 file changed, 135 insertions(+), 84 deletions(-) diff --git a/src/http_server/api/v1/trace.c b/src/http_server/api/v1/trace.c index 0c57f51f89e..1eb5ab70fbf 100644 --- a/src/http_server/api/v1/trace.c +++ b/src/http_server/api/v1/trace.c @@ -30,8 +30,28 @@ #include #include - -static struct flb_input_instance *find_input(struct flb_hs *hs, const char *name) +#define STR_INPUTS "inputs" +#define STR_INPUTS_LEN (sizeof(STR_INPUTS)-1) + +#define HTTP_FIELD_MESSAGE "message" +#define HTTP_FIELD_MESSAGE_LEN (sizeof(HTTP_FIELD_MESSAGE)-1) +#define HTTP_FIELD_STATUS "status" +#define HTTP_FIELD_STATUS_LEN (sizeof(HTTP_FIELD_STATUS)-1) +#define HTTP_FIELD_RETURNCODE "returncode" +#define HTTP_FIELD_RETURNCODE_LEN (sizeof(HTTP_FIELD_RETURNCODE)-1) + +#define HTTP_RESULT_OK "ok" +#define HTTP_RESULT_OK_LEN (sizeof(HTTP_RESULT_OK)-1) +#define HTTP_RESULT_ERROR "error" +#define HTTP_RESULT_ERROR_LEN (sizeof(HTTP_RESULT_ERROR)-1) +#define HTTP_RESULT_NOTFOUND "not found" +#define HTTP_RESULT_NOTFOUND_LEN (sizeof(HTTP_RESULT_NOTFOUND)-1) +#define HTTP_RESULT_METHODNOTALLOWED "method not allowed" +#define HTTP_RESULT_METHODNOTALLOWED_LEN (sizeof(HTTP_RESULT_METHODNOTALLOWED)-1) +#define HTTP_RESULT_UNKNOWNERROR "unknown error" +#define HTTP_RESULT_UNKNOWNERROR_LEN (sizeof(HTTP_RESULT_UNKNOWNERROR)-1) + +static struct flb_input_instance *find_input(struct flb_hs *hs, const char *name, size_t nlen) { struct mk_list *head; struct flb_input_instance *in; @@ -39,7 +59,10 @@ static struct flb_input_instance *find_input(struct flb_hs *hs, const char *name mk_list_foreach(head, &hs->config->inputs) { in = mk_list_entry(head, struct flb_input_instance, _head); - if (strcmp(name, in->name) == 0) { + if (strlen(in->name) != nlen) { + continue; + } + if (strncmp(name, 
in->name, nlen) == 0) { return in; } if (in->alias) { @@ -51,26 +74,33 @@ static struct flb_input_instance *find_input(struct flb_hs *hs, const char *name return NULL; } -static int enable_trace_input(struct flb_hs *hs, const char *name, const char *prefix, const char *output_name, struct mk_list *props) +static int enable_trace_input(struct flb_hs *hs, const char *name, ssize_t nlen, const char *prefix, + const char *output_name, struct mk_list *props) { struct flb_input_instance *in; - - in = find_input(hs, name); + in = find_input(hs, name, nlen); if (in == NULL) { + flb_error("unable to find input: [%d]%.*s", (int)nlen, (int)nlen, name); return 404; } flb_chunk_trace_context_new(in, output_name, prefix, NULL, props); - return (in->chunk_trace_ctxt == NULL ? 503 : 0); + + if (in->chunk_trace_ctxt == NULL) { + flb_error("unable to start tracing"); + return 503; + } + + return 0; } -static int disable_trace_input(struct flb_hs *hs, const char *name) +static int disable_trace_input(struct flb_hs *hs, const char *name, size_t nlen) { struct flb_input_instance *in; - - in = find_input(hs, name); + + in = find_input(hs, name, nlen); if (in == NULL) { return 404; } @@ -89,32 +119,35 @@ static flb_sds_t get_input_name(mk_request_t *request) if (request->real_path.data == NULL) { return NULL; } - if (request->real_path.len < strlen(base)) { + if (request->real_path.len < sizeof(base)-1) { return NULL; } - return flb_sds_create_len(&request->real_path.data[strlen(base)], - request->real_path.len - strlen(base)); + return flb_sds_create_len(&request->real_path.data[sizeof(base)-1], + request->real_path.len - sizeof(base)-1); } -static int http_disable_trace(mk_request_t *request, void *data, const char *input_name, msgpack_packer *mp_pck) +static int http_disable_trace(mk_request_t *request, void *data, + const char *input_name, size_t input_nlen, + msgpack_packer *mp_pck) { struct flb_hs *hs = data; int toggled_on = 503; - toggled_on = disable_trace_input(hs, input_name); 
+ toggled_on = disable_trace_input(hs, input_name, input_nlen); if (toggled_on < 300) { msgpack_pack_map(mp_pck, 1); - msgpack_pack_str_with_body(mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(mp_pck, "ok", strlen("ok")); + msgpack_pack_str_with_body(mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(mp_pck, HTTP_RESULT_OK, HTTP_RESULT_OK_LEN); return 201; } return toggled_on; } -static int msgpack_params_enable_trace(struct flb_hs *hs, msgpack_unpacked *result, const char *input_name) +static int msgpack_params_enable_trace(struct flb_hs *hs, msgpack_unpacked *result, + const char *input_name, ssize_t input_nlen) { int ret = -1; int i; @@ -130,11 +163,11 @@ static int msgpack_params_enable_trace(struct flb_hs *hs, msgpack_unpacked *resu msgpack_object_str *param_val; - if (result->data.type == MSGPACK_OBJECT_MAP) { + if (result->data.type == MSGPACK_OBJECT_MAP) { for (i = 0; i < result->data.via.map.size; i++) { key = &result->data.via.map.ptr[i].key; val = &result->data.via.map.ptr[i].val; - + if (key->type != MSGPACK_OBJECT_STR) { ret = -1; goto parse_error; @@ -193,7 +226,7 @@ static int msgpack_params_enable_trace(struct flb_hs *hs, msgpack_unpacked *resu output_name = flb_sds_create("stdout"); } - toggled_on = enable_trace_input(hs, input_name, prefix, output_name, props); + toggled_on = enable_trace_input(hs, input_name, input_nlen, prefix, output_name, props); if (!toggled_on) { ret = -1; goto parse_error; @@ -210,7 +243,9 @@ static int msgpack_params_enable_trace(struct flb_hs *hs, msgpack_unpacked *resu return ret; } -static int http_enable_trace(mk_request_t *request, void *data, const char *input_name, msgpack_packer *mp_pck) +static int http_enable_trace(mk_request_t *request, void *data, + const char *input_name, ssize_t input_nlen, + msgpack_packer *mp_pck) { char *buf = NULL; size_t buf_size; @@ -229,18 +264,18 @@ static int http_enable_trace(mk_request_t *request, void *data, const char *inpu struct 
mk_list *props = NULL; struct flb_chunk_trace_limit limit = { 0 }; struct flb_input_instance *input_instance; - + if (request->method == MK_METHOD_GET) { - ret = enable_trace_input(hs, input_name, "trace.", "stdout", NULL); + ret = enable_trace_input(hs, input_name, input_nlen, "trace.", "stdout", NULL); if (ret == 0) { msgpack_pack_map(mp_pck, 1); - msgpack_pack_str_with_body(mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(mp_pck, "ok", strlen("ok")); + msgpack_pack_str_with_body(mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(mp_pck, HTTP_RESULT_OK, HTTP_RESULT_OK_LEN); return 200; } else { - flb_error("unable to enable tracing for %s", input_name); + flb_error("unable to enable tracing for %.*s", (int)input_nlen, input_name); goto input_error; } } @@ -257,7 +292,7 @@ static int http_enable_trace(mk_request_t *request, void *data, const char *inpu rc = msgpack_unpack_next(&result, buf, buf_size, &off); if (rc != MSGPACK_UNPACK_SUCCESS) { ret = 503; - flb_error("unable to unpack msgpack parameters for %s", input_name); + flb_error("unable to unpack msgpack parameters for %.*s", (int)input_nlen, input_name); goto unpack_error; } @@ -265,7 +300,7 @@ static int http_enable_trace(mk_request_t *request, void *data, const char *inpu for (i = 0; i < result.data.via.map.size; i++) { key = &result.data.via.map.ptr[i].key; val = &result.data.via.map.ptr[i].val; - + if (key->type != MSGPACK_OBJECT_STR) { ret = 503; flb_error("non string key in parameters"); @@ -359,14 +394,14 @@ static int http_enable_trace(mk_request_t *request, void *data, const char *inpu output_name = flb_sds_create("stdout"); } - ret = enable_trace_input(hs, input_name, prefix, output_name, props); + ret = enable_trace_input(hs, input_name, input_nlen, prefix, output_name, props); if (ret != 0) { flb_error("error when enabling tracing"); goto parse_error; } if (limit.type != 0) { - input_instance = find_input(hs, input_name); + input_instance = 
find_input(hs, input_name, input_nlen); if (limit.type == FLB_CHUNK_TRACE_LIMIT_TIME) { flb_chunk_trace_context_set_limit(input_instance->chunk_trace_ctxt, limit.type, limit.seconds); } @@ -377,8 +412,8 @@ static int http_enable_trace(mk_request_t *request, void *data, const char *inpu } msgpack_pack_map(mp_pck, 1); - msgpack_pack_str_with_body(mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(mp_pck, "ok", strlen("ok")); + msgpack_pack_str_with_body(mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(mp_pck, HTTP_RESULT_OK, HTTP_RESULT_OK_LEN); ret = 200; parse_error: @@ -417,21 +452,21 @@ static void cb_trace(mk_request_t *request, void *data) } if (request->method == MK_METHOD_POST || request->method == MK_METHOD_GET) { - response = http_enable_trace(request, data, input_name, &mp_pck); + response = http_enable_trace(request, data, input_name, flb_sds_len(input_name), &mp_pck); } else if (request->method == MK_METHOD_DELETE) { - response = http_disable_trace(request, data, input_name, &mp_pck); + response = http_disable_trace(request, data, input_name, flb_sds_len(input_name), &mp_pck); } error: if (response == 404) { msgpack_pack_map(&mp_pck, 1); - msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(&mp_pck, "not found", strlen("not found")); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_NOTFOUND, HTTP_RESULT_NOTFOUND_LEN); } else if (response == 503) { msgpack_pack_map(&mp_pck, 1); - msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(&mp_pck, "error", strlen("error")); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_ERROR, HTTP_RESULT_ERROR_LEN); } if (input_name != NULL) { @@ -466,11 +501,11 @@ static void cb_traces(mk_request_t *request, void *data) msgpack_unpacked 
result; flb_sds_t error_msg = NULL; int response = 200; - flb_sds_t input_name; + const char *input_name; + ssize_t input_nlen; msgpack_object_array *inputs = NULL; size_t off = 0; int i; - /* initialize buffers */ msgpack_sbuffer_init(&mp_sbuf); @@ -503,10 +538,10 @@ static void cb_traces(mk_request_t *request, void *data) if (result.data.via.map.ptr[i].key.type != MSGPACK_OBJECT_STR) { continue; } - if (result.data.via.map.ptr[i].key.via.str.size < strlen("inputs")) { + if (result.data.via.map.ptr[i].key.via.str.size < STR_INPUTS_LEN) { continue; } - if (strncmp(result.data.via.map.ptr[i].key.via.str.ptr, "inputs", strlen("inputs"))) { + if (strncmp(result.data.via.map.ptr[i].key.via.str.ptr, STR_INPUTS, STR_INPUTS_LEN)) { continue; } inputs = &result.data.via.map.ptr[i].val.via.array; @@ -517,48 +552,61 @@ static void cb_traces(mk_request_t *request, void *data) error_msg = flb_sds_create("inputs not found"); goto unpack_error; } - + msgpack_pack_map(&mp_pck, 2); - msgpack_pack_str_with_body(&mp_pck, "inputs", strlen("inputs")); + msgpack_pack_str_with_body(&mp_pck, STR_INPUTS, STR_INPUTS_LEN); msgpack_pack_map(&mp_pck, inputs->size); for (i = 0; i < inputs->size; i++) { - input_name = flb_sds_create_len(inputs->ptr[i].via.str.ptr, inputs->ptr[i].via.str.size); - msgpack_pack_str_with_body(&mp_pck, input_name, flb_sds_len(input_name)); - if (inputs->ptr[i].type != MSGPACK_OBJECT_STR) { - msgpack_pack_map(&mp_pck, 1); - msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(&mp_pck, "error", strlen("error")); + if (inputs->ptr[i].type != MSGPACK_OBJECT_STR || inputs->ptr[i].via.str.ptr == NULL) { + response = 503; + error_msg = flb_sds_create("invalid input"); + msgpack_sbuffer_clear(&mp_sbuf); + goto unpack_error; } - else { - if (request->method == MK_METHOD_POST || request->method == MK_METHOD_GET) { - ret = msgpack_params_enable_trace((struct flb_hs *)data, &result, input_name); - if (ret != 0) { - 
msgpack_pack_map(&mp_pck, 2); - msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(&mp_pck, "error", strlen("error")); - msgpack_pack_str_with_body(&mp_pck, "returncode", strlen("returncode")); - msgpack_pack_int64(&mp_pck, ret); - } - else { - msgpack_pack_map(&mp_pck, 1); - msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(&mp_pck, "ok", strlen("ok")); - } - } - else if (request->method == MK_METHOD_DELETE) { - disable_trace_input((struct flb_hs *)data, input_name); + } + + for (i = 0; i < inputs->size; i++) { + + input_name = inputs->ptr[i].via.str.ptr; + input_nlen = inputs->ptr[i].via.str.size; + + msgpack_pack_str_with_body(&mp_pck, input_name, input_nlen); + + if (request->method == MK_METHOD_POST) { + + ret = msgpack_params_enable_trace((struct flb_hs *)data, &result, + input_name, input_nlen); + + if (ret != 0) { + msgpack_pack_map(&mp_pck, 2); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_ERROR, HTTP_RESULT_ERROR_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_RETURNCODE, + HTTP_FIELD_RETURNCODE_LEN); + msgpack_pack_int64(&mp_pck, ret); } else { - msgpack_pack_map(&mp_pck, 2); - msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(&mp_pck, "error", strlen("error")); - msgpack_pack_str_with_body(&mp_pck, "message", strlen("message")); - msgpack_pack_str_with_body(&mp_pck, "method not allowed", strlen("method not allowed")); + msgpack_pack_map(&mp_pck, 1); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_OK, HTTP_RESULT_OK_LEN); } } + else if (request->method == MK_METHOD_DELETE) { + disable_trace_input((struct flb_hs *)data, input_name, input_nlen); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + 
msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_OK, HTTP_RESULT_OK_LEN); + } + else { + msgpack_pack_map(&mp_pck, 2); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_ERROR, HTTP_RESULT_ERROR_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_MESSAGE, HTTP_FIELD_MESSAGE_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_METHODNOTALLOWED, + HTTP_RESULT_METHODNOTALLOWED_LEN); + } } msgpack_pack_str_with_body(&mp_pck, "result", strlen("result")); @@ -569,26 +617,27 @@ static void cb_traces(mk_request_t *request, void *data) msgpack_unpacked_destroy(&result); if (response == 404) { msgpack_pack_map(&mp_pck, 1); - msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(&mp_pck, "not found", strlen("not found")); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_NOTFOUND, HTTP_RESULT_NOTFOUND_LEN); } else if (response == 503) { msgpack_pack_map(&mp_pck, 2); - msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(&mp_pck, "error", strlen("error")); - msgpack_pack_str_with_body(&mp_pck, "message", strlen("message")); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_OK, HTTP_RESULT_OK_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_MESSAGE, HTTP_FIELD_MESSAGE_LEN); if (error_msg) { msgpack_pack_str_with_body(&mp_pck, error_msg, flb_sds_len(error_msg)); flb_sds_destroy(error_msg); } else { - msgpack_pack_str_with_body(&mp_pck, "unknown error", strlen("unknown error")); + msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_UNKNOWNERROR, + HTTP_RESULT_UNKNOWNERROR_LEN); } } else { msgpack_pack_map(&mp_pck, 1); - msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(&mp_pck, "ok", strlen("ok")); + 
msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_OK, HTTP_RESULT_OK_LEN); } /* Export to JSON */ @@ -609,7 +658,9 @@ static void cb_traces(mk_request_t *request, void *data) /* Perform registration */ int api_v1_trace(struct flb_hs *hs) { - mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/traces/", cb_traces, hs); - mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/trace/*", cb_trace, hs); + if (hs->config->enable_chunk_trace == FLB_TRUE) { + mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/traces/", cb_traces, hs); + mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/trace/*", cb_trace, hs); + } return 0; } From 65fc34d6761d87d587a9d848bb551901d716cbf9 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 16 May 2024 11:57:49 +0900 Subject: [PATCH 10/24] in_premetheus_remote_write: Implement handler of payloads of prometheus remote write protocol (#8725) in_prometheus_remote_write: Implement prometheus remote write input plugin. This plugin is able to handle the following types currently: - Counter - Gauge - Untyped - Histogram Summary type of metrics shouldn't be handled and decoded correctly for now. 
--------- Signed-off-by: Hiroshi Hatake Signed-off-by: Markus Bergholz --- CMakeLists.txt | 1 + cmake/windows-setup.cmake | 1 + plugins/CMakeLists.txt | 1 + .../in_prometheus_remote_write/CMakeLists.txt | 12 + plugins/in_prometheus_remote_write/prom_rw.c | 250 +++++++++ plugins/in_prometheus_remote_write/prom_rw.h | 60 +++ .../prom_rw_config.c | 102 ++++ .../prom_rw_config.h | 29 ++ .../in_prometheus_remote_write/prom_rw_conn.c | 300 +++++++++++ .../in_prometheus_remote_write/prom_rw_conn.h | 57 ++ .../in_prometheus_remote_write/prom_rw_prot.c | 489 ++++++++++++++++++ .../in_prometheus_remote_write/prom_rw_prot.h | 39 ++ 12 files changed, 1341 insertions(+) create mode 100644 plugins/in_prometheus_remote_write/CMakeLists.txt create mode 100644 plugins/in_prometheus_remote_write/prom_rw.c create mode 100644 plugins/in_prometheus_remote_write/prom_rw.h create mode 100644 plugins/in_prometheus_remote_write/prom_rw_config.c create mode 100644 plugins/in_prometheus_remote_write/prom_rw_config.h create mode 100644 plugins/in_prometheus_remote_write/prom_rw_conn.c create mode 100644 plugins/in_prometheus_remote_write/prom_rw_conn.h create mode 100644 plugins/in_prometheus_remote_write/prom_rw_prot.c create mode 100644 plugins/in_prometheus_remote_write/prom_rw_prot.h diff --git a/CMakeLists.txt b/CMakeLists.txt index a901fbf36ed..4960b533aa8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -209,6 +209,7 @@ option(FLB_IN_ELASTICSEARCH "Enable Elasticsearch (Bulk API) input pl option(FLB_IN_CALYPTIA_FLEET "Enable Calyptia Fleet input plugin" Yes) option(FLB_IN_SPLUNK "Enable Splunk HTTP HEC input plugin" Yes) option(FLB_IN_PROCESS_EXPORTER_METRICS "Enable process exporter metrics input plugin" Yes) +option(FLB_IN_PROMETHEUS_REMOTE_WRITE "Enable prometheus remote write input plugin" Yes) option(FLB_OUT_AZURE "Enable Azure output plugin" Yes) option(FLB_OUT_AZURE_BLOB "Enable Azure output plugin" Yes) option(FLB_OUT_AZURE_LOGS_INGESTION "Enable Azure Logs Ingestion output 
plugin" Yes) diff --git a/cmake/windows-setup.cmake b/cmake/windows-setup.cmake index 53a6775c2de..60b67bc1e99 100644 --- a/cmake/windows-setup.cmake +++ b/cmake/windows-setup.cmake @@ -61,6 +61,7 @@ if(FLB_WINDOWS_DEFAULTS) set(FLB_IN_PODMAN_METRICS No) set(FLB_IN_ELASTICSEARCH Yes) set(FLB_IN_SPLUNK Yes) + set(FLB_IN_PROMETHEUS_REMOTE_WRITE Yes) # OUTPUT plugins # ============== diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 1bd63924b71..9006ef6d823 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -229,6 +229,7 @@ REGISTER_IN_PLUGIN("in_opentelemetry") REGISTER_IN_PLUGIN("in_elasticsearch") REGISTER_IN_PLUGIN("in_calyptia_fleet") REGISTER_IN_PLUGIN("in_splunk") +REGISTER_IN_PLUGIN("in_prometheus_remote_write") # Test the event loop messaging when used in threaded mode REGISTER_IN_PLUGIN("in_event_test") diff --git a/plugins/in_prometheus_remote_write/CMakeLists.txt b/plugins/in_prometheus_remote_write/CMakeLists.txt new file mode 100644 index 00000000000..ee8ce703ae2 --- /dev/null +++ b/plugins/in_prometheus_remote_write/CMakeLists.txt @@ -0,0 +1,12 @@ +if(NOT FLB_METRICS) + message(FATAL_ERROR "Prometheus remote write input plugin requires FLB_HTTP_SERVER=On.") +endif() + +set(src + prom_rw.c + prom_rw_prot.c + prom_rw_conn.c + prom_rw_config.c + ) + +FLB_PLUGIN(in_prometheus_remote_write "${src}" "monkey-core-static") diff --git a/plugins/in_prometheus_remote_write/prom_rw.c b/plugins/in_prometheus_remote_write/prom_rw.c new file mode 100644 index 00000000000..e2cb852815c --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw.c @@ -0,0 +1,250 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.in_in (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include +#include +#include +#include + +#include "prom_rw.h" +#include "prom_rw_conn.h" +#include "prom_rw_prot.h" +#include "prom_rw_config.h" + +/* + * For a server event, the collection event means a new client has arrived; we + * accept the connection and create a new TCP connection context which will + * wait for incoming HTTP requests. + */ +static int prom_rw_collect(struct flb_input_instance *ins, + struct flb_config *config, void *in_context) +{ + struct flb_connection *connection; + struct prom_remote_write_conn *conn; + struct flb_prom_remote_write *ctx; + + ctx = in_context; + + connection = flb_downstream_conn_get(ctx->downstream); + + if (connection == NULL) { + flb_plg_error(ctx->ins, "could not accept new connection"); + + return -1; + } + + flb_plg_trace(ctx->ins, "new TCP connection arrived FD=%i", connection->fd); + + conn = prom_rw_conn_add(connection, ctx); + + if (conn == NULL) { + return -1; + } + + return 0; +} + +static int prom_rw_init(struct flb_input_instance *ins, + struct flb_config *config, void *data) +{ + unsigned short int port; + int ret; + struct flb_prom_remote_write *ctx; + + (void) data; + + /* Create context and basic conf */ + ctx = prom_rw_config_create(ins); + if (!ctx) { + return -1; + } + ctx->collector_id = -1; + + /* Populate context with config map defaults and incoming properties */ + ret = flb_input_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_plg_error(ctx->ins, "configuration error"); + prom_rw_config_destroy(ctx); + return -1; + } + + /* Set the context */ + 
flb_input_set_context(ins, ctx); + + port = (unsigned short int) strtoul(ctx->tcp_port, NULL, 10); + + if (ctx->enable_http2) { + ret = flb_http_server_init(&ctx->http_server, + HTTP_PROTOCOL_AUTODETECT, + (FLB_HTTP_SERVER_FLAG_KEEPALIVE | FLB_HTTP_SERVER_FLAG_AUTO_INFLATE), + NULL, + ins->host.listen, + ins->host.port, + ins->tls, + ins->flags, + &ins->net_setup, + flb_input_event_loop_get(ins), + ins->config, + (void *) ctx); + + if (ret != 0) { + flb_plg_error(ctx->ins, + "could not initialize http server on %s:%u. Aborting", + ins->host.listen, ins->host.port); + + prom_rw_config_destroy(ctx); + + return -1; + } + + ret = flb_http_server_start(&ctx->http_server); + + if (ret != 0) { + flb_plg_error(ctx->ins, + "could not start http server on %s:%u. Aborting", + ins->host.listen, ins->host.port); + + prom_rw_config_destroy(ctx); + + return -1; + } + + ctx->http_server.request_callback = prom_rw_prot_handle_ng; + + flb_input_downstream_set(ctx->http_server.downstream, ctx->ins); + } + else { + ctx->downstream = flb_downstream_create(FLB_TRANSPORT_TCP, + ins->flags, + ctx->listen, + port, + ins->tls, + config, + &ins->net_setup); + + if (ctx->downstream == NULL) { + flb_plg_error(ctx->ins, + "could not initialize downstream on %s:%s. 
Aborting", + ctx->listen, ctx->tcp_port); + + prom_rw_config_destroy(ctx); + + return -1; + } + + flb_input_downstream_set(ctx->downstream, ctx->ins); + + /* Collect upon data available on the standard input */ + ret = flb_input_set_collector_socket(ins, + prom_rw_collect, + ctx->downstream->server_fd, + config); + if (ret == -1) { + flb_plg_error(ctx->ins, "Could not set collector for IN_TCP input plugin"); + prom_rw_config_destroy(ctx); + return -1; + } + + ctx->collector_id = ret; + } + + flb_plg_info(ctx->ins, "listening on %s:%s", ctx->listen, ctx->tcp_port); + + if (ctx->successful_response_code != 200 && + ctx->successful_response_code != 201 && + ctx->successful_response_code != 204) { + flb_plg_error(ctx->ins, "%d is not supported response code. Use default 201", + ctx->successful_response_code); + ctx->successful_response_code = 201; + } + + return 0; +} + +static int prom_rw_exit(void *data, struct flb_config *config) +{ + struct flb_prom_remote_write *ctx; + + (void) config; + + ctx = data; + + if (ctx != NULL) { + prom_rw_config_destroy(ctx); + } + + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_BOOL, "http2", "true", + 0, FLB_TRUE, offsetof(struct flb_prom_remote_write, enable_http2), + NULL + }, + + { + FLB_CONFIG_MAP_SIZE, "buffer_max_size", HTTP_BUFFER_MAX_SIZE, + 0, FLB_TRUE, offsetof(struct flb_prom_remote_write, buffer_max_size), + "" + }, + + { + FLB_CONFIG_MAP_SIZE, "buffer_chunk_size", HTTP_BUFFER_CHUNK_SIZE, + 0, FLB_TRUE, offsetof(struct flb_prom_remote_write, buffer_chunk_size), + "" + }, + + { + FLB_CONFIG_MAP_STR, "uri", NULL, + 0, FLB_TRUE, offsetof(struct flb_prom_remote_write, uri), + "Specify an optional HTTP URI for the target web server, e.g: /something" + }, + + { + FLB_CONFIG_MAP_BOOL, "tag_from_uri", "true", + 0, FLB_TRUE, offsetof(struct flb_prom_remote_write, tag_from_uri), + "If true, tag will be created from uri. e.g. v1_metrics from /v1/metrics ." 
+ }, + { + FLB_CONFIG_MAP_INT, "successful_response_code", "201", + 0, FLB_TRUE, offsetof(struct flb_prom_remote_write, successful_response_code), + "Set successful response code. 200, 201 and 204 are supported." + }, + + /* EOF */ + {0} +}; + +/* Plugin reference */ +struct flb_input_plugin in_prometheus_remote_write_plugin = { + .name = "prometheus_remote_write", + .description = "Prometheus Remote Write input", + .cb_init = prom_rw_init, + .cb_pre_run = NULL, + .cb_collect = prom_rw_collect, + .cb_flush_buf = NULL, + .cb_pause = NULL, + .cb_resume = NULL, + .cb_exit = prom_rw_exit, + .config_map = config_map, + .flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS +}; diff --git a/plugins/in_prometheus_remote_write/prom_rw.h b/plugins/in_prometheus_remote_write/prom_rw.h new file mode 100644 index 00000000000..698e5c89dd5 --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw.h @@ -0,0 +1,60 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef FLB_IN_PROM_RW_H +#define FLB_IN_PROM_RW_H + +#include +#include +#include + +#include +#include + +#define HTTP_BUFFER_MAX_SIZE "4M" +#define HTTP_BUFFER_CHUNK_SIZE "512K" + +struct flb_prom_remote_write { + int successful_response_code; + flb_sds_t listen; + flb_sds_t tcp_port; + int tag_from_uri; + + struct flb_input_instance *ins; + + /* HTTP URI */ + char *uri; + + /* New gen HTTP server */ + int enable_http2; + struct flb_http_server http_server; + + /* Legacy HTTP server */ + size_t buffer_max_size; /* Maximum buffer size */ + size_t buffer_chunk_size; /* Chunk allocation size */ + + int collector_id; /* Listener collector id */ + struct flb_downstream *downstream; /* Client manager */ + struct mk_list connections; /* linked list of connections */ + + struct mk_server *server; +}; + + +#endif diff --git a/plugins/in_prometheus_remote_write/prom_rw_config.c b/plugins/in_prometheus_remote_write/prom_rw_config.c new file mode 100644 index 00000000000..3df2ba125f6 --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw_config.c @@ -0,0 +1,102 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include + +#include "prom_rw.h" +#include "prom_rw_config.h" +#include "prom_rw_conn.h" + +/* default HTTP port for prometheus remote write */ +#define PROMETHEUS_REMOTE_WRITE_HTTP_PORT 8080 + +struct flb_prom_remote_write *prom_rw_config_create(struct flb_input_instance *ins) +{ + int ret; + char port[8]; + struct flb_prom_remote_write *ctx; + + ctx = flb_calloc(1, sizeof(struct flb_prom_remote_write)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = ins; + mk_list_init(&ctx->connections); + + /* Load the config map */ + ret = flb_input_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_free(ctx); + return NULL; + } + + /* Listen interface (if not set, defaults to 0.0.0.0:80) */ + flb_input_net_default_listener("0.0.0.0", PROMETHEUS_REMOTE_WRITE_HTTP_PORT, ins); + + ctx->listen = flb_strdup(ins->host.listen); + snprintf(port, sizeof(port) - 1, "%d", ins->host.port); + ctx->tcp_port = flb_strdup(port); + + /* HTTP Server specifics */ + ctx->server = flb_calloc(1, sizeof(struct mk_server)); + if (ctx->server == NULL) { + flb_plg_error(ctx->ins, "error on mk_server allocation"); + prom_rw_config_destroy(ctx); + return NULL; + } + ctx->server->keep_alive = MK_TRUE; + + /* monkey detects server->workers == 0 as the server not being initialized at the + * moment so we want to make sure that it stays that way! 
+ */ + + return ctx; +} + +int prom_rw_config_destroy(struct flb_prom_remote_write *ctx) +{ + /* release all connections */ + prom_rw_conn_release_all(ctx); + + if (ctx->collector_id != -1) { + flb_input_collector_delete(ctx->collector_id, ctx->ins); + + ctx->collector_id = -1; + } + + if (ctx->downstream != NULL) { + flb_downstream_destroy(ctx->downstream); + } + + if (ctx->enable_http2) { + flb_http_server_destroy(&ctx->http_server); + } + + if (ctx->server) { + flb_free(ctx->server); + } + + flb_free(ctx->listen); + flb_free(ctx->tcp_port); + flb_free(ctx); + + return 0; +} diff --git a/plugins/in_prometheus_remote_write/prom_rw_config.h b/plugins/in_prometheus_remote_write/prom_rw_config.h new file mode 100644 index 00000000000..e1b624bf004 --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw_config.h @@ -0,0 +1,29 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef FLB_IN_PROM_RW_CONFIG_H +#define FLB_IN_PROM_RW_CONFIG_H + +#include +#include "prom_rw.h" + +struct flb_prom_remote_write *prom_rw_config_create(struct flb_input_instance *ins); +int prom_rw_config_destroy(struct flb_prom_remote_write *ctx); + +#endif diff --git a/plugins/in_prometheus_remote_write/prom_rw_conn.c b/plugins/in_prometheus_remote_write/prom_rw_conn.c new file mode 100644 index 00000000000..7f730fdbda6 --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw_conn.c @@ -0,0 +1,300 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include + +#include "prom_rw.h" +#include "prom_rw_conn.h" +#include "prom_rw_prot.h" + +static void prom_rw_conn_request_init(struct mk_http_session *session, + struct mk_http_request *request); + +static int prom_rw_conn_event(void *data) +{ + int status; + size_t size; + ssize_t available; + ssize_t bytes; + char *tmp; + char *request_end; + size_t request_len; + struct prom_remote_write_conn *conn; + struct mk_event *event; + struct flb_prom_remote_write *ctx; + struct flb_connection *connection; + + connection = (struct flb_connection *) data; + + conn = connection->user_data; + + ctx = conn->ctx; + + event = &connection->event; + + if (event->mask & MK_EVENT_READ) { + available = (conn->buf_size - conn->buf_len) - 1; + if (available < 1) { + if (conn->buf_size + ctx->buffer_chunk_size > ctx->buffer_max_size) { + flb_plg_trace(ctx->ins, + "fd=%i incoming data exceed limit (%zu KB)", + event->fd, (ctx->buffer_max_size / 1024)); + prom_rw_conn_del(conn); + return -1; + } + + size = conn->buf_size + ctx->buffer_chunk_size; + tmp = flb_realloc(conn->buf_data, size); + if (!tmp) { + flb_errno(); + return -1; + } + flb_plg_trace(ctx->ins, "fd=%i buffer realloc %i -> %zu", + event->fd, conn->buf_size, size); + + conn->buf_data = tmp; + conn->buf_size = size; + available = (conn->buf_size - conn->buf_len) - 1; + } + + /* Read data */ + bytes = flb_io_net_read(connection, + (void *) &conn->buf_data[conn->buf_len], + available); + + if (bytes <= 0) { + flb_plg_trace(ctx->ins, "fd=%i closed connection", event->fd); + prom_rw_conn_del(conn); + return -1; + } + + flb_plg_trace(ctx->ins, "read()=%zi pre_len=%i now_len=%zi", + bytes, conn->buf_len, conn->buf_len + bytes); + conn->buf_len += bytes; + conn->buf_data[conn->buf_len] = '\0'; + + status = mk_http_parser(&conn->request, &conn->session.parser, + conn->buf_data, conn->buf_len, conn->session.server); + + if (status == MK_HTTP_PARSER_OK) { + /* Do more logic parsing and checks for this 
request */ + prom_rw_prot_handle(ctx, conn, &conn->session, &conn->request); + + /* Evict the processed request from the connection buffer and reinitialize + * the HTTP parser. + */ + + request_end = NULL; + + if (NULL != conn->request.data.data) { + request_end = &conn->request.data.data[conn->request.data.len]; + } + else { + request_end = strstr(conn->buf_data, "\r\n\r\n"); + + if(NULL != request_end) { + request_end = &request_end[4]; + } + } + + if (NULL != request_end) { + request_len = (size_t)(request_end - conn->buf_data); + + if (0 < (conn->buf_len - request_len)) { + memmove(conn->buf_data, &conn->buf_data[request_len], + conn->buf_len - request_len); + + conn->buf_data[conn->buf_len - request_len] = '\0'; + conn->buf_len -= request_len; + } + else { + memset(conn->buf_data, 0, request_len); + + conn->buf_len = 0; + } + + /* Reinitialize the parser so the next request is properly + * handled, the additional memset intends to wipe any left over data + * from the headers parsed in the previous request. + */ + memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); + mk_http_parser_init(&conn->session.parser); + prom_rw_conn_request_init(&conn->session, &conn->request); + } + } + else if (status == MK_HTTP_PARSER_ERROR) { + prom_rw_prot_handle_error(ctx, conn, &conn->session, &conn->request); + + /* Reinitialize the parser so the next request is properly + * handled, the additional memset intends to wipe any left over data + * from the headers parsed in the previous request. 
+ */ + memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); + mk_http_parser_init(&conn->session.parser); + prom_rw_conn_request_init(&conn->session, &conn->request); + } + + return bytes; + } + + if (event->mask & MK_EVENT_CLOSE) { + flb_plg_trace(ctx->ins, "fd=%i hangup", event->fd); + prom_rw_conn_del(conn); + return -1; + } + + return 0; + +} + +static void prom_rw_conn_session_init(struct mk_http_session *session, + struct mk_server *server, + int client_fd) +{ + /* Alloc memory for node */ + session->_sched_init = MK_TRUE; + session->pipelined = MK_FALSE; + session->counter_connections = 0; + session->close_now = MK_FALSE; + session->status = MK_REQUEST_STATUS_INCOMPLETE; + session->server = server; + session->socket = client_fd; + + /* creation time in unix time */ + session->init_time = time(NULL); + + session->channel = mk_channel_new(MK_CHANNEL_SOCKET, session->socket); + session->channel->io = session->server->network; + + /* Init session request list */ + mk_list_init(&session->request_list); + + /* Initialize the parser */ + mk_http_parser_init(&session->parser); +} + +static void prom_rw_conn_request_init(struct mk_http_session *session, + struct mk_http_request *request) +{ + memset(request, 0, sizeof(struct mk_http_request)); + + mk_http_request_init(session, request, session->server); + + request->in_headers.type = MK_STREAM_IOV; + request->in_headers.dynamic = MK_FALSE; + request->in_headers.cb_consumed = NULL; + request->in_headers.cb_finished = NULL; + request->in_headers.stream = &request->stream; + + mk_list_add(&request->in_headers._head, &request->stream.inputs); + + request->session = session; +} + +struct prom_remote_write_conn *prom_rw_conn_add(struct flb_connection *connection, + struct flb_prom_remote_write *ctx) +{ + struct prom_remote_write_conn *conn; + int ret; + + conn = flb_calloc(1, sizeof(struct prom_remote_write_conn)); + if (!conn) { + flb_errno(); + return NULL; + } + conn->connection = connection; + + /* Set data 
for the event-loop */ + MK_EVENT_NEW(&connection->event); + + connection->user_data = conn; + connection->event.type = FLB_ENGINE_EV_CUSTOM; + connection->event.handler = prom_rw_conn_event; + + /* Connection info */ + conn->ctx = ctx; + conn->buf_len = 0; + + conn->buf_data = flb_malloc(ctx->buffer_chunk_size); + if (!conn->buf_data) { + flb_errno(); + flb_plg_error(ctx->ins, "could not allocate new connection"); + flb_free(conn); + return NULL; + } + conn->buf_size = ctx->buffer_chunk_size; + + /* Register instance into the event loop */ + ret = mk_event_add(flb_engine_evl_get(), + connection->fd, + FLB_ENGINE_EV_CUSTOM, + MK_EVENT_READ, + &connection->event); + if (ret == -1) { + flb_plg_error(ctx->ins, "could not register new connection"); + flb_free(conn->buf_data); + flb_free(conn); + return NULL; + } + + /* Initialize HTTP Session: this is a custom context for Monkey HTTP */ + prom_rw_conn_session_init(&conn->session, ctx->server, connection->fd); + + /* Initialize HTTP Request: this is the initial request and it will be reinitialized + * automatically after the request is handled so it can be used for the next one. 
+ */ + prom_rw_conn_request_init(&conn->session, &conn->request); + + /* Link connection node to parent context list */ + mk_list_add(&conn->_head, &ctx->connections); + return conn; +} + +int prom_rw_conn_del(struct prom_remote_write_conn *conn) +{ + if (conn->session.channel != NULL) { + mk_channel_release(conn->session.channel); + } + + /* The downstream unregisters the file descriptor from the event-loop + * so there's nothing to be done by the plugin + */ + flb_downstream_conn_release(conn->connection); + + mk_list_del(&conn->_head); + + flb_free(conn->buf_data); + flb_free(conn); + + return 0; +} + +void prom_rw_conn_release_all(struct flb_prom_remote_write *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct prom_remote_write_conn *conn; + + mk_list_foreach_safe(head, tmp, &ctx->connections) { + conn = mk_list_entry(head, struct prom_remote_write_conn, _head); + prom_rw_conn_del(conn); + } +} diff --git a/plugins/in_prometheus_remote_write/prom_rw_conn.h b/plugins/in_prometheus_remote_write/prom_rw_conn.h new file mode 100644 index 00000000000..1716c32b87a --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw_conn.h @@ -0,0 +1,57 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef FLB_IN_PROM_RW_CONN_H +#define FLB_IN_PROM_RW_CONN_H + +#include +#include +#include +#include + +#include "prom_rw.h" + +struct prom_remote_write_conn { + struct mk_event event; /* Built-in event data for mk_events */ + + /* Buffer */ + char *buf_data; /* Buffer data */ + int buf_len; /* Data length */ + int buf_size; /* Buffer size */ + + /* + * Parser context: we only hold one parser per connection, + * which is re-used every time we have a new request. + */ + struct mk_http_parser parser; + struct mk_http_request request; + struct mk_http_session session; + struct flb_connection *connection; + + void *ctx; /* Plugin parent context */ + struct mk_list _head; /* link to flb_prom_remote_write->connections */ +}; + +struct prom_remote_write_conn *prom_rw_conn_add(struct flb_connection *connection, + struct flb_prom_remote_write *ctx); +int prom_rw_conn_del(struct prom_remote_write_conn *conn); +void prom_rw_conn_release_all(struct flb_prom_remote_write *ctx); + + +#endif diff --git a/plugins/in_prometheus_remote_write/prom_rw_prot.c b/plugins/in_prometheus_remote_write/prom_rw_prot.c new file mode 100644 index 00000000000..1ffc23fdc44 --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw_prot.c @@ -0,0 +1,489 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +#include "prom_rw.h" +#include "prom_rw_conn.h" + +static int send_response(struct flb_input_instance *in, + struct prom_remote_write_conn *conn, + int http_status, char *message) +{ + int len; + flb_sds_t out; + size_t sent; + ssize_t bytes; + int result; + + out = flb_sds_create_size(256); + if (!out) { + return -1; + } + + if (message) { + len = strlen(message); + } + else { + len = 0; + } + + if (http_status == 201) { + flb_sds_printf(&out, + "HTTP/1.1 201 Created\r\n" + "Server: Fluent Bit v%s\r\n" + "Content-Length: 0\r\n\r\n", + FLB_VERSION_STR); + } + else if (http_status == 200) { + flb_sds_printf(&out, + "HTTP/1.1 200 OK\r\n" + "Server: Fluent Bit v%s\r\n" + "Content-Length: 0\r\n\r\n", + FLB_VERSION_STR); + } + else if (http_status == 204) { + flb_sds_printf(&out, + "HTTP/1.1 204 No Content\r\n" + "Server: Fluent Bit v%s\r\n" + "\r\n", + FLB_VERSION_STR); + } + else if (http_status == 400) { + flb_sds_printf(&out, + "HTTP/1.1 400 Bad Request\r\n" + "Server: Fluent Bit v%s\r\n" + "Content-Length: %i\r\n\r\n%s", + FLB_VERSION_STR, + len, message); + } + + /* We should check the outcome of this operation */ + bytes = flb_io_net_write(conn->connection, + (void *) out, + flb_sds_len(out), + &sent); + + if (bytes == -1) { + flb_plg_error(in, "cannot send response"); + + result = -1; + } + else { + result = 0; + } + + flb_sds_destroy(out); + + return result; +} + +static int process_payload_metrics(struct flb_prom_remote_write *ctx, + struct prom_remote_write_conn *conn, + flb_sds_t tag, + struct mk_http_session *session, + struct mk_http_request *request) +{ + struct cmt *context; + int result; + + result = cmt_decode_prometheus_remote_write_create(&context, + request->data.data, + request->data.len); + + if (result == CMT_DECODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) { + result = flb_input_metrics_append(ctx->ins, NULL, 0, context); + + if 
(result != 0) { + flb_plg_debug(ctx->ins, "could not ingest metrics: %d", result); + } + + cmt_decode_prometheus_remote_write_destroy(context); + } + + return 0; +} + +static inline int mk_http_point_header(mk_ptr_t *h, + struct mk_http_parser *parser, int key) +{ + struct mk_http_header *header; + + header = &parser->headers[key]; + if (header->type == key) { + h->data = header->val.data; + h->len = header->val.len; + return 0; + } + else { + h->data = NULL; + h->len = -1; + } + + return -1; +} + +static int uncompress_snappy(char **output_buffer, + size_t *output_size, + char *input_buffer, + size_t input_size) +{ + int ret; + + ret = flb_snappy_uncompress_framed_data(input_buffer, + input_size, + output_buffer, + output_size); + + if (ret != 0) { + flb_error("[prometheus remote write] snappy decompression failed"); + + return -1; + } + + return 1; +} + +static int uncompress_gzip(char **output_buffer, + size_t *output_size, + char *input_buffer, + size_t input_size) +{ + int ret; + + ret = flb_gzip_uncompress(input_buffer, + input_size, + (void *) output_buffer, + output_size); + + if (ret == -1) { + flb_error("[prometheus remote write] gzip decompression failed"); + + return -1; + } + + return 1; +} + +int prom_rw_prot_uncompress(struct mk_http_session *session, + struct mk_http_request *request, + char **output_buffer, + size_t *output_size) +{ + struct mk_http_header *header; + size_t index; + + *output_buffer = NULL; + *output_size = 0; + + for (index = 0; + index < session->parser.headers_extra_count; + index++) { + header = &session->parser.headers_extra[index]; + + if (strncasecmp(header->key.data, "Content-Encoding", 16) == 0) { + if (strncasecmp(header->val.data, "gzip", 4) == 0) { + return uncompress_gzip(output_buffer, + output_size, + request->data.data, + request->data.len); + } + else if (strncasecmp(header->val.data, "snappy", 6) == 0) { + return uncompress_snappy(output_buffer, + output_size, + request->data.data, + request->data.len); + } + else { + return -2; + }
+ } + } + + return 0; +} + + +/* + * Handle an incoming request. It perform extra checks over the request, if + * everything is OK, it enqueue the incoming payload. + */ +int prom_rw_prot_handle(struct flb_prom_remote_write *ctx, + struct prom_remote_write_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request) +{ + int i; + int ret = -1; + int len; + char *uri; + char *qs; + off_t diff; + flb_sds_t tag; + struct mk_http_header *header; + char *original_data; + size_t original_data_size; + char *uncompressed_data; + size_t uncompressed_data_size; + + if (request->uri.data[0] != '/') { + send_response(ctx->ins, conn, 400, "error: invalid request\n"); + return -1; + } + + /* Decode URI */ + uri = mk_utils_url_decode(request->uri); + if (!uri) { + uri = mk_mem_alloc_z(request->uri.len + 1); + if (!uri) { + return -1; + } + memcpy(uri, request->uri.data, request->uri.len); + uri[request->uri.len] = '\0'; + } + + if (ctx->uri != NULL && strcmp(uri, ctx->uri) != 0) { + send_response(ctx->ins, conn, 400, "error: invalid endpoint\n"); + mk_mem_free(uri); + + return -1; + } + + /* Try to match a query string so we can remove it */ + qs = strchr(uri, '?'); + if (qs) { + /* remove the query string part */ + diff = qs - uri; + uri[diff] = '\0'; + } + + /* Compose the query string using the URI */ + len = strlen(uri); + + if (ctx->tag_from_uri != FLB_TRUE) { + tag = flb_sds_create(ctx->ins->tag); + } + else { + tag = flb_sds_create_size(len); + if (!tag) { + mk_mem_free(uri); + return -1; + } + + /* New tag skipping the URI '/' */ + flb_sds_cat(tag, uri + 1, len - 1); + + /* Sanitize, only allow alphanum chars */ + for (i = 0; i < flb_sds_len(tag); i++) { + if (!isalnum(tag[i]) && tag[i] != '_' && tag[i] != '.') { + tag[i] = '_'; + } + } + } + + /* Check if we have a Host header: Hostname ; port */ + mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST); + + /* Header: Connection */ + mk_http_point_header(&request->connection, 
&session->parser, + MK_HEADER_CONNECTION); + + /* HTTP/1.1 needs Host header */ + if (!request->host.data && request->protocol == MK_HTTP_PROTOCOL_11) { + flb_sds_destroy(tag); + mk_mem_free(uri); + return -1; + } + + /* Should we close the session after this request ? */ + mk_http_keepalive_check(session, request, ctx->server); + + /* Content Length */ + header = &session->parser.headers[MK_HEADER_CONTENT_LENGTH]; + if (header->type == MK_HEADER_CONTENT_LENGTH) { + request->_content_length.data = header->val.data; + request->_content_length.len = header->val.len; + } + else { + request->_content_length.data = NULL; + } + + mk_http_point_header(&request->content_type, &session->parser, MK_HEADER_CONTENT_TYPE); + + if (request->method != MK_METHOD_POST) { + flb_sds_destroy(tag); + mk_mem_free(uri); + send_response(ctx->ins, conn, 400, "error: invalid HTTP method\n"); + return -1; + } + + original_data = request->data.data; + original_data_size = request->data.len; + + ret = prom_rw_prot_uncompress(session, request, + &uncompressed_data, + &uncompressed_data_size); + + if (ret > 0) { + request->data.data = uncompressed_data; + request->data.len = uncompressed_data_size; + } + + if (ctx->uri != NULL && strcmp(uri, ctx->uri) == 0) { + ret = process_payload_metrics(ctx, conn, tag, session, request); + } + else { + ret = process_payload_metrics(ctx, conn, tag, session, request); + } + + if (uncompressed_data != NULL) { + flb_free(uncompressed_data); + } + + request->data.data = original_data; + request->data.len = original_data_size; + + mk_mem_free(uri); + flb_sds_destroy(tag); + + send_response(ctx->ins, conn, ctx->successful_response_code, NULL); + + return ret; +} + +/* + * Handle an incoming request which has resulted in an http parser error. 
+ */ +int prom_rw_prot_handle_error( + struct flb_prom_remote_write *ctx, + struct prom_remote_write_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request) +{ + send_response(ctx->ins, conn, 400, "error: invalid request\n"); + return -1; +} + + +/* New gen HTTP server */ +static int send_response_ng(struct flb_http_response *response, + int http_status, + char *message) +{ + flb_http_response_set_status(response, http_status); + + if (http_status == 201) { + flb_http_response_set_message(response, "Created"); + } + else if (http_status == 200) { + flb_http_response_set_message(response, "OK"); + } + else if (http_status == 204) { + flb_http_response_set_message(response, "No Content"); + } + else if (http_status == 400) { + flb_http_response_set_message(response, "Forbidden"); + } + + if (message != NULL) { + flb_http_response_set_body(response, + (unsigned char *) message, + strlen(message)); + } + + flb_http_response_commit(response); + + return 0; +} + +static int process_payload_metrics_ng(struct flb_prom_remote_write *ctx, + flb_sds_t tag, + struct flb_http_request *request, + struct flb_http_response *response) +{ + struct cmt *context; + int result; + + result = cmt_decode_prometheus_remote_write_create(&context, + request->body, + cfl_sds_len(request->body)); + + if (result == CMT_DECODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) { + result = flb_input_metrics_append(ctx->ins, NULL, 0, context); + + if (result != 0) { + flb_plg_debug(ctx->ins, "could not ingest metrics : %d", result); + } + + cmt_decode_prometheus_remote_write_destroy(context); + } + + return 0; +} + +int prom_rw_prot_handle_ng(struct flb_http_request *request, + struct flb_http_response *response) +{ + struct flb_prom_remote_write *context; + int result; + + context = (struct flb_prom_remote_write *) response->stream->user_data; + + if (request->path[0] != '/') { + send_response_ng(response, 400, "error: invalid request\n"); + return -1; + } + + /* ToDo: Fix me */ + /* 
HTTP/1.1 needs Host header */ + if (request->protocol_version == HTTP_PROTOCOL_HTTP1 && + request->host == NULL) { + + return -1; + } + + if (request->method != HTTP_METHOD_POST) { + send_response_ng(response, 400, "error: invalid HTTP method\n"); + + return -1; + } + + if (context->uri != NULL && strcmp(request->path, context->uri) == 0) { + result = process_payload_metrics_ng(context, context->ins->tag, request, response); + } + else { + result = process_payload_metrics_ng(context, context->ins->tag, request, response); + } + + send_response_ng(response, context->successful_response_code, NULL); + + return result; +} diff --git a/plugins/in_prometheus_remote_write/prom_rw_prot.h b/plugins/in_prometheus_remote_write/prom_rw_prot.h new file mode 100644 index 00000000000..35f376884d5 --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw_prot.h @@ -0,0 +1,39 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef FLB_IN_PROM_RW_PROT +#define FLB_IN_PROM_RW_PROT + +#include + +int prom_rw_prot_handle(struct flb_prom_remote_write *ctx, + struct prom_remote_write_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request); + +int prom_rw_prot_handle_error(struct flb_prom_remote_write *ctx, + struct prom_remote_write_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request); + + +int prom_rw_prot_handle_ng(struct flb_http_request *request, + struct flb_http_response *response); + +#endif From 2bae6dc18cfc7c688e1403d4eb6dafa89aabb518 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 21 May 2024 17:00:22 +0300 Subject: [PATCH 11/24] release: update to 3.0.5 (#8839) Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: edsiper <369718+edsiper@users.noreply.github.com> Signed-off-by: Markus Bergholz --- CMakeLists.txt | 2 +- dockerfiles/Dockerfile | 2 +- fluent-bit-3.0.4.bb => fluent-bit-3.0.5.bb | 2 +- snap/snapcraft.yaml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) rename fluent-bit-3.0.4.bb => fluent-bit-3.0.5.bb (99%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4960b533aa8..41b213e71f2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,7 +8,7 @@ set(CMAKE_POLICY_DEFAULT_CMP0069 NEW) # Fluent Bit Version set(FLB_VERSION_MAJOR 3) set(FLB_VERSION_MINOR 0) -set(FLB_VERSION_PATCH 4) +set(FLB_VERSION_PATCH 5) set(FLB_VERSION_STR "${FLB_VERSION_MAJOR}.${FLB_VERSION_MINOR}.${FLB_VERSION_PATCH}") set(CMAKE_POSITION_INDEPENDENT_CODE ON) diff --git a/dockerfiles/Dockerfile b/dockerfiles/Dockerfile index b1d505a1574..67a871ff8fa 100644 --- a/dockerfiles/Dockerfile +++ b/dockerfiles/Dockerfile @@ -11,7 +11,7 @@ # docker buildx build --platform "linux/amd64,linux/arm64,linux/arm/v7,linux/s390x" -f ./dockerfiles/Dockerfile.multiarch --build-arg 
FLB_TARBALL=https://github.com/fluent/fluent-bit/archive/v1.8.11.tar.gz ./dockerfiles/ # Set this to the current release version: it gets done so as part of the release. -ARG RELEASE_VERSION=3.0.4 +ARG RELEASE_VERSION=3.0.5 # For multi-arch builds - assumption is running on an AMD64 host FROM multiarch/qemu-user-static:x86_64-arm as qemu-arm32 diff --git a/fluent-bit-3.0.4.bb b/fluent-bit-3.0.5.bb similarity index 99% rename from fluent-bit-3.0.4.bb rename to fluent-bit-3.0.5.bb index 84e9042e853..13607a10cf1 100644 --- a/fluent-bit-3.0.4.bb +++ b/fluent-bit-3.0.5.bb @@ -16,7 +16,7 @@ LIC_FILES_CHKSUM = "file://LICENSE;md5=2ee41112a44fe7014dce33e26468ba93" SECTION = "net" PR = "r0" -PV = "3.0.4" +PV = "3.0.5" SRCREV = "v${PV}" SRC_URI = "git://github.com/fluent/fluent-bit.git;nobranch=1" diff --git a/snap/snapcraft.yaml b/snap/snapcraft.yaml index a7fbff95c2f..19c59f6d669 100644 --- a/snap/snapcraft.yaml +++ b/snap/snapcraft.yaml @@ -1,6 +1,6 @@ name: fluent-bit base: core18 -version: '3.0.4' +version: '3.0.5' summary: High performance logs and stream processor description: | Fluent Bit is a high performance log processor and stream processor for Linux. 
From 89d92ba38035b1bc615b850f881469c8e1ade0a6 Mon Sep 17 00:00:00 2001 From: Pat Date: Wed, 22 May 2024 05:02:50 -0400 Subject: [PATCH 12/24] workflows: ensure only master releases are marked latest (#8856) Signed-off-by: Patrick Stephens Signed-off-by: Markus Bergholz --- .github/workflows/staging-release.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/staging-release.yaml b/.github/workflows/staging-release.yaml index 35bacc6952c..417887fa309 100644 --- a/.github/workflows/staging-release.yaml +++ b/.github/workflows/staging-release.yaml @@ -793,6 +793,7 @@ jobs: name: "Fluent Bit ${{ inputs.version }}" tag_name: v${{ inputs.version }} target_commitish: '2.0' + make_latest: false - name: Release 2.1 - not latest uses: softprops/action-gh-release@v2 @@ -804,6 +805,7 @@ jobs: name: "Fluent Bit ${{ inputs.version }}" tag_name: v${{ inputs.version }} target_commitish: '2.1' + make_latest: false - name: Release 2.2 - not latest uses: softprops/action-gh-release@v2 @@ -815,6 +817,7 @@ jobs: name: "Fluent Bit ${{ inputs.version }}" tag_name: v${{ inputs.version }} target_commitish: '2.2' + make_latest: false - name: Release 3.0 and latest uses: softprops/action-gh-release@v2 @@ -825,6 +828,7 @@ jobs: generate_release_notes: true name: "Fluent Bit ${{ inputs.version }}" tag_name: v${{ inputs.version }} + make_latest: true staging-release-windows-checksums: name: Get Windows checksums for new release From eae6dfc178816f5ba9753cffed628e0ae6b8499d Mon Sep 17 00:00:00 2001 From: Hiroyuki Hasebe <31443554+hiroyha1@users.noreply.github.com> Date: Thu, 15 Feb 2024 17:21:56 +0900 Subject: [PATCH 13/24] engine: free after log worker thread has been stopped Signed-off-by: Hiroyuki Hasebe <31443554+hiroyha1@users.noreply.github.com> Signed-off-by: Markus Bergholz --- src/flb_config.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/flb_config.c b/src/flb_config.c index 4afaf71c946..747d855cf08 100644 --- a/src/flb_config.c 
+++ b/src/flb_config.c
@@ -390,14 +390,14 @@ void flb_config_exit(struct flb_config *config)
     struct mk_list *head;
     struct flb_cf *cf;
 
-    if (config->log_file) {
-        flb_free(config->log_file);
-    }
-
     if (config->log) {
         flb_log_destroy(config->log, config);
     }
 
+    if (config->log_file) {
+        flb_free(config->log_file);
+    }
+
     if (config->parsers_file) {
         flb_free(config->parsers_file);
     }

From be88d252192d57da0d50298ce490bdd831ef5648 Mon Sep 17 00:00:00 2001
From: Eduardo Silva
Date: Thu, 23 May 2024 12:18:25 -0600
Subject: [PATCH 14/24] out_splunk: reduce noise and fix hec_token handling
 (fix #8859)

The following patch performs two changes that help fix the problems found
with Splunk HEC token handling:

1. In the recent PR #8793, using the record accessor API
   flb_ra_translate_check() to validate whether the hec_token field exists
   leads to noisy log messages, since that function warns when the field is
   not found. Most users do not have hec_token set by the Splunk input
   plugin, so their logging gets noisy. This patch replaces that call with
   flb_ra_translate(), which fixes the problem.

2. If hec_token was set in the record metadata, it was being stored in the
   main context of the plugin; however, the flush callbacks that format and
   deliver the data run in separate/parallel threads, which could lead to a
   race condition if more than one thread tries to manipulate the value.
   This patch adds protection to the context value so it becomes thread
   safe.

Signed-off-by: Eduardo Silva
Signed-off-by: Markus Bergholz
---
 plugins/out_splunk/splunk.c      | 68 +++++++++++++++++++++++++-------
 plugins/out_splunk/splunk.h      |  2 +
 plugins/out_splunk/splunk_conf.c |  6 +++
 3 files changed, 61 insertions(+), 15 deletions(-)

diff --git a/plugins/out_splunk/splunk.c b/plugins/out_splunk/splunk.c
index 6c25b5b4ac6..c392b83687a 100644
--- a/plugins/out_splunk/splunk.c
+++ b/plugins/out_splunk/splunk.c
@@ -354,12 +354,27 @@ static flb_sds_t extract_hec_token(struct flb_splunk *ctx, msgpack_object map,
 
     /* Extract HEC token (map which is from metadata lookup) */
     if (ctx->metadata_auth_key) {
-        hec_token = flb_ra_translate_check(ctx->ra_metadata_auth_key, tag, tag_len,
-                                           map, NULL, FLB_TRUE);
-        if (hec_token) {
+        hec_token = flb_ra_translate(ctx->ra_metadata_auth_key, tag, tag_len,
+                                     map, NULL);
+        /*
+         * record accessor translation can return an empty string buffer if the
+         * translation was not successful or the value was not found. We consider
+         * a valid token any string whose length is greater than 0.
+ * + * note: flb_ra_translate_check() is not used here because it will print + * an error message if the translation fails: + * + * ref: https://github.com/fluent/fluent-bit/issues/8859 + */ + if (hec_token && flb_sds_len(hec_token) > 0) { return hec_token; } + /* destroy empty string */ + if (hec_token) { + flb_sds_destroy(hec_token); + } + flb_plg_debug(ctx->ins, "Could not find hec_token in metadata"); return NULL; } @@ -368,21 +383,45 @@ static flb_sds_t extract_hec_token(struct flb_splunk *ctx, msgpack_object map, return NULL; } +static void set_metadata_auth_header(struct flb_splunk *ctx, flb_sds_t hec_token) +{ + pthread_mutex_lock(&ctx->mutex_hec_token); + + if (ctx->metadata_auth_header != NULL) { + flb_sds_destroy(ctx->metadata_auth_header); + } + ctx->metadata_auth_header = hec_token; + + pthread_mutex_unlock(&ctx->mutex_hec_token); +} + +static flb_sds_t get_metadata_auth_header(struct flb_splunk *ctx) +{ + flb_sds_t auth_header = NULL; + + pthread_mutex_lock(&ctx->mutex_hec_token); + auth_header = flb_sds_create(ctx->metadata_auth_header); + pthread_mutex_unlock(&ctx->mutex_hec_token); + + return auth_header; +} + static inline int splunk_format(const void *in_buf, size_t in_bytes, char *tag, int tag_len, char **out_buf, size_t *out_size, struct flb_splunk *ctx) { int ret; + char *err; msgpack_object map; msgpack_object metadata; msgpack_sbuffer mp_sbuf; msgpack_packer mp_pck; - char *err; flb_sds_t tmp; flb_sds_t record; flb_sds_t json_out; flb_sds_t metadata_hec_token = NULL; + struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; @@ -403,8 +442,6 @@ static inline int splunk_format(const void *in_buf, size_t in_bytes, return -1; } - ctx->metadata_auth_header = NULL; - while ((ret = flb_log_event_decoder_next( &log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { @@ -422,10 +459,7 @@ static inline int splunk_format(const void *in_buf, size_t in_bytes, * specify only one splunk token per one instance. 
* So, it should be valid if storing only last value of * splunk token per one chunk. */ - if (ctx->metadata_auth_header != NULL) { - cfl_sds_destroy(ctx->metadata_auth_header); - } - ctx->metadata_auth_header = metadata_hec_token; + set_metadata_auth_header(ctx, metadata_hec_token); } if (ctx->event_key) { @@ -598,6 +632,7 @@ static void cb_splunk_flush(struct flb_event_chunk *event_chunk, size_t payload_size; (void) i_ins; (void) config; + flb_sds_t metadata_auth_header = NULL; /* Get upstream connection */ u_conn = flb_upstream_conn_get(ctx->u); @@ -677,6 +712,8 @@ static void cb_splunk_flush(struct flb_event_chunk *event_chunk, flb_http_buffer_size(c, resp_size); } + metadata_auth_header = get_metadata_auth_header(ctx); + /* HTTP Client */ flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); @@ -684,9 +721,10 @@ static void cb_splunk_flush(struct flb_event_chunk *event_chunk, if (ctx->http_user && ctx->http_passwd) { flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd); } - else if (ctx->metadata_auth_header) { + else if (metadata_auth_header) { flb_http_add_header(c, "Authorization", 13, - ctx->metadata_auth_header, flb_sds_len(ctx->metadata_auth_header)); + metadata_auth_header, + flb_sds_len(metadata_auth_header)); } else if (ctx->auth_header) { flb_http_add_header(c, "Authorization", 13, @@ -754,10 +792,10 @@ static void cb_splunk_flush(struct flb_event_chunk *event_chunk, flb_sds_destroy(buf_data); } - /* Cleanup */ - if (ctx->metadata_auth_header != NULL) { - cfl_sds_destroy(ctx->metadata_auth_header); + if (metadata_auth_header) { + flb_sds_destroy(metadata_auth_header); } + flb_http_client_destroy(c); flb_upstream_conn_release(u_conn); FLB_OUTPUT_RETURN(ret); diff --git a/plugins/out_splunk/splunk.h b/plugins/out_splunk/splunk.h index eb64d2d57e2..4c71a0ec356 100644 --- a/plugins/out_splunk/splunk.h +++ b/plugins/out_splunk/splunk.h @@ -120,6 +120,8 @@ struct flb_splunk { /* Plugin instance */ struct flb_output_instance *ins; + + 
pthread_mutex_t mutex_hec_token;
 };
 
 #endif
diff --git a/plugins/out_splunk/splunk_conf.c b/plugins/out_splunk/splunk_conf.c
index 2cc78280883..d15d2edc0ec 100644
--- a/plugins/out_splunk/splunk_conf.c
+++ b/plugins/out_splunk/splunk_conf.c
@@ -262,6 +262,8 @@ struct flb_splunk *flb_splunk_conf_create(struct flb_output_instance *ins,
         }
     }
 
+    pthread_mutex_init(&ctx->mutex_hec_token, NULL);
+
     /* Currently, Splunk HEC token is stored in a fixed key, hec_token. */
     ctx->metadata_auth_key = "$hec_token";
     if (ctx->metadata_auth_key) {
@@ -325,6 +327,10 @@ int flb_splunk_conf_destroy(struct flb_splunk *ctx)
         flb_ra_destroy(ctx->ra_metadata_auth_key);
     }
 
+    if (ctx->metadata_auth_header) {
+        flb_sds_destroy(ctx->metadata_auth_header);
+    }
+
     event_fields_destroy(ctx);
 
     flb_free(ctx);

From e3fed00e82b1c046ce101e6cdaa0deb9f0ae8f80 Mon Sep 17 00:00:00 2001
From: Braydon Kains <93549768+braydonk@users.noreply.github.com>
Date: Thu, 23 May 2024 17:06:53 -0400
Subject: [PATCH 15/24] in_winevtlog: change total_size_threshold to size_t
 (#8853)

The `total_size_threshold` field was originally an unsigned int. The field
in the config map uses `FLB_CONFIG_MAP_SIZE`, which reads the value as a
`size_t` and then writes it at the offset of the field in the struct. On a
64-bit machine, this is 8 bytes, so the 8-byte value written at this offset
overwrites the value of the next struct field. This would cause
`string_inserts` with default values to end up being `false` when it should
be `true`.

This PR changes `total_size_threshold` to a `size_t`. This seems consistent
with other usages of `FLB_CONFIG_MAP_SIZE` that I was able to find.
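The overwrite described above can be demonstrated in a few lines. The struct and names below are illustrative stand-ins, not the plugin's actual definitions, and the behavior assumes a common 64-bit little-endian ABI where the two 4-byte fields are laid out back to back.

```c
#include <string.h>
#include <stddef.h>

/* Illustrative mirror of the bug's layout: a 4-byte field followed by
 * the flag that ends up being clobbered. */
struct demo_config {
    unsigned int total_size_threshold;   /* 4 bytes on common ABIs */
    int string_inserts;                  /* sits immediately after it */
};

/* Simulates FLB_CONFIG_MAP_SIZE writing a size_t at the field's offset
 * and returns what string_inserts holds afterwards. */
static int clobbered_string_inserts(void)
{
    struct demo_config cfg;
    size_t parsed = 512;                 /* value the config map parsed */

    cfg.string_inserts = 1;              /* default: true */

    /* Writes sizeof(size_t) bytes (8 on 64-bit) starting at a 4-byte
     * field, spilling into string_inserts. */
    memcpy((char *) &cfg + offsetof(struct demo_config, total_size_threshold),
           &parsed, sizeof(parsed));

    return cfg.string_inserts;
}
```

On a typical 64-bit little-endian build the spilled high bytes are zero, so `clobbered_string_inserts()` yields 0 even though the field defaulted to 1 — exactly the `string_inserts` symptom the commit message describes. Widening the field to `size_t` makes the write stay within its own storage.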
Signed-off-by: braydonk
Signed-off-by: Markus Bergholz
---
 plugins/in_winevtlog/winevtlog.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/plugins/in_winevtlog/winevtlog.h b/plugins/in_winevtlog/winevtlog.h
index 10ef3e457e7..20b5749d6a3 100644
--- a/plugins/in_winevtlog/winevtlog.h
+++ b/plugins/in_winevtlog/winevtlog.h
@@ -27,7 +27,7 @@ struct winevtlog_config {
     unsigned int interval_sec;
     unsigned int interval_nsec;
-    unsigned int total_size_threshold;
+    size_t total_size_threshold;
     int string_inserts;
     int read_existing_events;
     int render_event_as_xml;

From 4c45c018a71ee2a7cfbaf6700b22fafb2ddbab7b Mon Sep 17 00:00:00 2001
From: Thiago Padilha
Date: Tue, 21 May 2024 14:32:53 -0300
Subject: [PATCH 16/24] output: Fix handling of metrics in output processor

This patch passes a cmt_out_context pointer so that output processors can
modify metrics.

Signed-off-by: Thiago Padilha
Signed-off-by: Markus Bergholz
---
 include/fluent-bit/flb_output.h | 21 +++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h
index 3fab2576d62..eab0c983701 100644
--- a/include/fluent-bit/flb_output.h
+++ b/include/fluent-bit/flb_output.h
@@ -627,6 +627,7 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
     struct cmt *metrics_context;
     struct ctrace *trace_context;
     size_t chunk_offset;
+    struct cmt *cmt_out_context = NULL;
 
     /* Custom output coroutine info */
     out_flush = (struct flb_output_flush *) flb_calloc(1, sizeof(struct flb_output_flush));
@@ -715,13 +716,25 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
                                       flb_sds_len(evc->tag),
                                       (char *) metrics_context, 0,
-                                      NULL,
+                                      (void **)&cmt_out_context,
                                       NULL);
 
         if (ret == 0) {
+            if (cmt_out_context != NULL) {
+                ret = cmt_encode_msgpack_create(cmt_out_context,
+                                                
&serialized_context_buffer, + &serialized_context_size); + + if (cmt_out_context != metrics_context) { + cmt_destroy(cmt_out_context); + } + + } + else { + ret = cmt_encode_msgpack_create(metrics_context, + &serialized_context_buffer, + &serialized_context_size); + } cmt_destroy(metrics_context); From 222bc1de0fd274d734bbbbb69399053fb0f19359 Mon Sep 17 00:00:00 2001 From: Thiago Padilha Date: Wed, 22 May 2024 07:55:00 -0300 Subject: [PATCH 17/24] tests: runtime: processor_metrics_selector: check if works as an output processor Signed-off-by: Thiago Padilha Signed-off-by: Markus Bergholz --- tests/runtime/processor_metrics_selector.c | 65 ++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/tests/runtime/processor_metrics_selector.c b/tests/runtime/processor_metrics_selector.c index f25e9751a58..78f64e83b14 100644 --- a/tests/runtime/processor_metrics_selector.c +++ b/tests/runtime/processor_metrics_selector.c @@ -549,6 +549,70 @@ void flb_test_selector_substring_exclude(void) flb_destroy(ctx); } +void flb_test_selector_can_modify_output(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct flb_processor *proc; + struct flb_processor_unit *pu; + struct cfl_variant var = { + .type = CFL_VARIANT_STRING, + .data.as_string = "/kubernetes/", + }; + struct cfl_variant action = { + .type = CFL_VARIANT_STRING, + .data.as_string = "include", + }; + + ctx = flb_create(); + flb_service_set(ctx, + "Flush", "0.200000000", + "Grace", "2", + NULL); + + proc = flb_processor_create(ctx->config, "unit_test", NULL, 0); + TEST_CHECK(proc != NULL); + + pu = flb_processor_unit_create(proc, FLB_PROCESSOR_METRICS, "metrics_selector"); + TEST_CHECK(pu != NULL); + ret = flb_processor_unit_set_property(pu, "metric_name", &var); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "action", &action); + TEST_CHECK(ret == 0); + + /* Input */ + in_ffd = flb_input(ctx, (char *) "event_type", NULL); + TEST_CHECK(in_ffd >= 0); + ret = 
flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "type", "metrics", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "interval_sec", "1", NULL); + TEST_CHECK(ret == 0); + + out_ffd = flb_output(ctx, (char *) "stdout", NULL); + TEST_CHECK(out_ffd >= 0); + ret = flb_output_set(ctx, out_ffd, "match", "test", NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set(ctx, out_ffd, "format", "msgpack", NULL); + TEST_CHECK(ret == 0); + + /* set up processor */ + ret = flb_output_set_processor(ctx, out_ffd, proc); + TEST_CHECK(ret == 0); + + clear_output_num(); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(1500); /* waiting flush */ + + flb_stop(ctx); + flb_destroy(ctx); +} #endif /* Test list */ @@ -560,6 +624,7 @@ TEST_LIST = { {"prefix_exclude", flb_test_selector_prefix_exclude}, {"substring_include", flb_test_selector_substring_include}, {"substring_exclude", flb_test_selector_substring_exclude}, + {"can_modify_output", flb_test_selector_can_modify_output}, #endif {NULL, NULL} }; From daf912cefcffd50e7b217c5862671781517b28e5 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 24 May 2024 11:52:39 +0300 Subject: [PATCH 18/24] release: update to 3.0.6 (#8865) Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: edsiper <369718+edsiper@users.noreply.github.com> Signed-off-by: Markus Bergholz --- CMakeLists.txt | 2 +- dockerfiles/Dockerfile | 2 +- fluent-bit-3.0.5.bb => fluent-bit-3.0.6.bb | 2 +- snap/snapcraft.yaml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) rename fluent-bit-3.0.5.bb => fluent-bit-3.0.6.bb (99%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 41b213e71f2..1d827275454 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,7 +8,7 @@ set(CMAKE_POLICY_DEFAULT_CMP0069 NEW) # Fluent Bit Version set(FLB_VERSION_MAJOR 3) 
set(FLB_VERSION_MINOR 0) -set(FLB_VERSION_PATCH 5) +set(FLB_VERSION_PATCH 6) set(FLB_VERSION_STR "${FLB_VERSION_MAJOR}.${FLB_VERSION_MINOR}.${FLB_VERSION_PATCH}") set(CMAKE_POSITION_INDEPENDENT_CODE ON) diff --git a/dockerfiles/Dockerfile b/dockerfiles/Dockerfile index 67a871ff8fa..7499f845f28 100644 --- a/dockerfiles/Dockerfile +++ b/dockerfiles/Dockerfile @@ -11,7 +11,7 @@ # docker buildx build --platform "linux/amd64,linux/arm64,linux/arm/v7,linux/s390x" -f ./dockerfiles/Dockerfile.multiarch --build-arg FLB_TARBALL=https://github.com/fluent/fluent-bit/archive/v1.8.11.tar.gz ./dockerfiles/ # Set this to the current release version: it gets done so as part of the release. -ARG RELEASE_VERSION=3.0.5 +ARG RELEASE_VERSION=3.0.6 # For multi-arch builds - assumption is running on an AMD64 host FROM multiarch/qemu-user-static:x86_64-arm as qemu-arm32 diff --git a/fluent-bit-3.0.5.bb b/fluent-bit-3.0.6.bb similarity index 99% rename from fluent-bit-3.0.5.bb rename to fluent-bit-3.0.6.bb index 13607a10cf1..99c998978c8 100644 --- a/fluent-bit-3.0.5.bb +++ b/fluent-bit-3.0.6.bb @@ -16,7 +16,7 @@ LIC_FILES_CHKSUM = "file://LICENSE;md5=2ee41112a44fe7014dce33e26468ba93" SECTION = "net" PR = "r0" -PV = "3.0.5" +PV = "3.0.6" SRCREV = "v${PV}" SRC_URI = "git://github.com/fluent/fluent-bit.git;nobranch=1" diff --git a/snap/snapcraft.yaml b/snap/snapcraft.yaml index 19c59f6d669..fb19f13bbaa 100644 --- a/snap/snapcraft.yaml +++ b/snap/snapcraft.yaml @@ -1,6 +1,6 @@ name: fluent-bit base: core18 -version: '3.0.5' +version: '3.0.6' summary: High performance logs and stream processor description: | Fluent Bit is a high performance log processor and stream processor for Linux. 
From ba5c28c1ba28b29a6b8f34093208fec9f38bf819 Mon Sep 17 00:00:00 2001 From: Markus Bergholz Date: Fri, 24 May 2024 11:31:22 +0200 Subject: [PATCH 19/24] build: add build support for ubuntu 24.04 noble numbat (#8796) Signed-off-by: Markus Bergholz --- packaging/build-config.json | 8 ++++++++ packaging/distros/ubuntu/Dockerfile | 26 ++++++++++++++++++++++++++ packaging/update-repos.sh | 1 + 3 files changed, 35 insertions(+) diff --git a/packaging/build-config.json b/packaging/build-config.json index e276f34858f..27b5d2e8671 100644 --- a/packaging/build-config.json +++ b/packaging/build-config.json @@ -92,6 +92,14 @@ "target": "ubuntu/22.04.arm64v8", "type": "deb" }, + { + "target": "ubuntu/24.04", + "type": "deb" + }, + { + "target": "ubuntu/24.04.arm64v8", + "type": "deb" + }, { "target": "raspbian/buster", "type": "deb" diff --git a/packaging/distros/ubuntu/Dockerfile b/packaging/distros/ubuntu/Dockerfile index 40272f8fcd7..2ca0704580e 100644 --- a/packaging/distros/ubuntu/Dockerfile +++ b/packaging/distros/ubuntu/Dockerfile @@ -114,6 +114,32 @@ ENV DEBIAN_FRONTEND noninteractive COPY --from=multiarch-aarch64 /usr/bin/qemu-aarch64-static /usr/bin/qemu-aarch64-static +# hadolint ignore=DL3008,DL3015 +RUN apt-get update && \ + apt-get install -y curl ca-certificates build-essential libsystemd-dev \ + cmake make bash wget unzip nano vim valgrind dh-make flex bison \ + libpq-dev postgresql-server-dev-all libpq5 \ + libsasl2-2 libsasl2-dev openssl libssl-dev libssl3 libyaml-dev pkg-config zlib1g-dev && \ + apt-get install -y --reinstall lsb-base lsb-release + + # ubuntu/24.04 base image +FROM ubuntu:24.04 as ubuntu-24.04-base +ENV DEBIAN_FRONTEND noninteractive + +# hadolint ignore=DL3008,DL3015 +RUN apt-get update && \ + apt-get install -y curl ca-certificates build-essential libsystemd-dev \ + cmake make bash wget unzip nano vim valgrind dh-make flex bison \ + libpq-dev postgresql-server-dev-all libpq5 \ + libsasl2-2 libsasl2-dev openssl libssl-dev libssl3 
libyaml-dev pkg-config zlib1g-dev && \ + apt-get install -y --reinstall lsb-base lsb-release + +# ubuntu/24.04.arm64v8 base image +FROM arm64v8/ubuntu:24.04 as ubuntu-24.04.arm64v8-base +ENV DEBIAN_FRONTEND noninteractive + +COPY --from=multiarch-aarch64 /usr/bin/qemu-aarch64-static /usr/bin/qemu-aarch64-static + # hadolint ignore=DL3008,DL3015 RUN apt-get update && \ apt-get install -y curl ca-certificates build-essential libsystemd-dev \ diff --git a/packaging/update-repos.sh b/packaging/update-repos.sh index a4d711e950c..ea7acacff7c 100755 --- a/packaging/update-repos.sh +++ b/packaging/update-repos.sh @@ -54,6 +54,7 @@ DEB_REPO_PATHS=( "debian/bookworm" "ubuntu/bionic" "ubuntu/focal" "ubuntu/jammy" + "ubuntu/noble" "raspbian/buster" "raspbian/bullseye" ) From 2f9aa33345a2c1711fe896e63ca7f5f596e668ce Mon Sep 17 00:00:00 2001 From: shuaichen Date: Fri, 24 May 2024 06:28:47 -0700 Subject: [PATCH 20/24] stackdriver: Support writing to textPayload field of Cloud Logging LogEntry. (#8850) Write payload to textPayload field of LogEntry if the text_payload_key is string format and the only field after stripping special fields. Signed-off-by: shuaichen Signed-off-by: Markus Bergholz --- plugins/out_stackdriver/stackdriver.c | 102 ++++--- plugins/out_stackdriver/stackdriver.h | 3 + .../stackdriver/stackdriver_test_payload.h | 26 ++ tests/runtime/out_stackdriver.c | 249 +++++++++++++++++- 4 files changed, 345 insertions(+), 35 deletions(-) create mode 100644 tests/runtime/data/stackdriver/stackdriver_test_payload.h diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index 503fc9e68bd..b21f5b9476c 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -372,7 +372,7 @@ static flb_sds_t get_google_token(struct flb_stackdriver *ctx) if (time(NULL) >= cached_expiration) { return output; } else { - /* + /* * Cached token is expired. 
Wait on lock to use up-to-date token * by either waiting for it to be refreshed or refresh it ourselves. */ @@ -1068,7 +1068,7 @@ static int pack_resource_labels(struct flb_stackdriver *ctx, if (rval != NULL && rval->o.type == MSGPACK_OBJECT_STR) { flb_mp_map_header_append(mh); msgpack_pack_str(mp_pck, flb_sds_len(label_kv->key)); - msgpack_pack_str_body(mp_pck, label_kv->key, + msgpack_pack_str_body(mp_pck, label_kv->key, flb_sds_len(label_kv->key)); msgpack_pack_str(mp_pck, flb_sds_len(rval->val.string)); msgpack_pack_str_body(mp_pck, rval->val.string, @@ -1082,7 +1082,7 @@ static int pack_resource_labels(struct flb_stackdriver *ctx, } else { flb_mp_map_header_append(mh); msgpack_pack_str(mp_pck, flb_sds_len(label_kv->key)); - msgpack_pack_str_body(mp_pck, label_kv->key, + msgpack_pack_str_body(mp_pck, label_kv->key, flb_sds_len(label_kv->key)); msgpack_pack_str(mp_pck, flb_sds_len(label_kv->val)); msgpack_pack_str_body(mp_pck, label_kv->val, @@ -1284,7 +1284,7 @@ static int cb_stackdriver_init(struct flb_output_instance *ins, return -1; } - if (ctx->resource_type != RESOURCE_TYPE_GENERIC_NODE + if (ctx->resource_type != RESOURCE_TYPE_GENERIC_NODE && ctx->resource_type != RESOURCE_TYPE_GENERIC_TASK) { ret = gce_metadata_read_zone(ctx); if (ret == -1) { @@ -1434,13 +1434,13 @@ static int get_trace_sampled(int * trace_sampled_value, const msgpack_object * s { msgpack_object tmp; int ret = get_msgpack_obj(&tmp, src_obj, key, flb_sds_len(key), MSGPACK_OBJECT_BOOLEAN); - + if (ret == 0 && tmp.via.boolean == true) { *trace_sampled_value = FLB_TRUE; return 0; } else if (ret == 0 && tmp.via.boolean == false) { *trace_sampled_value = FLB_FALSE; - return 0; + return 0; } return -1; @@ -1476,15 +1476,16 @@ static insert_id_status validate_insert_id(msgpack_object * insert_id_value, return ret; } -static int pack_json_payload(int insert_id_extracted, - int operation_extracted, int operation_extra_size, - int source_location_extracted, - int source_location_extra_size, - int 
http_request_extracted, - int http_request_extra_size, - timestamp_status tms_status, - msgpack_packer *mp_pck, msgpack_object *obj, - struct flb_stackdriver *ctx) +static int pack_payload(int insert_id_extracted, + int operation_extracted, + int operation_extra_size, + int source_location_extracted, + int source_location_extra_size, + int http_request_extracted, + int http_request_extra_size, + timestamp_status tms_status, + msgpack_packer *mp_pck, msgpack_object *obj, + struct flb_stackdriver *ctx) { /* Specified fields include local_resource_id, operation, sourceLocation ... */ int i, j; @@ -1495,10 +1496,14 @@ static int pack_json_payload(int insert_id_extracted, int len; int len_to_be_removed; int key_not_found; + int text_payload_len = 0; + int is_string_text_payload = FLB_FALSE; + int write_to_textpayload_field = FLB_FALSE; flb_sds_t removed; flb_sds_t monitored_resource_key; flb_sds_t local_resource_id_key; flb_sds_t stream; + flb_sds_t text_payload = NULL; msgpack_object_kv *kv = obj->via.map.ptr; msgpack_object_kv *const kvend = obj->via.map.ptr + obj->via.map.size; @@ -1565,14 +1570,36 @@ static int pack_json_payload(int insert_id_extracted, new_map_size = map_size - to_remove; - ret = msgpack_pack_map(mp_pck, new_map_size); - if (ret < 0) { - goto error; + if (ctx->text_payload_key && get_string(&text_payload, obj, ctx->text_payload_key) == 0) { + is_string_text_payload = FLB_TRUE; + } + + /* write to textPayload if text_payload_key is the only residual string field*/ + if ((new_map_size == 1) && is_string_text_payload) { + write_to_textpayload_field = FLB_TRUE; + } + + if (write_to_textpayload_field) { + msgpack_pack_str(mp_pck, 11); + msgpack_pack_str_body(mp_pck, "textPayload", 11); + + text_payload_len = flb_sds_len(text_payload); + msgpack_pack_str(mp_pck, text_payload_len); + msgpack_pack_str_body(mp_pck, text_payload, text_payload_len); + } else { + /* jsonPayload */ + msgpack_pack_str(mp_pck, 11); + msgpack_pack_str_body(mp_pck, "jsonPayload", 
11); + + ret = msgpack_pack_map(mp_pck, new_map_size); + if (ret < 0) { + goto error; + } } /* points back to the beginning of map */ kv = obj->via.map.ptr; - for(; kv != kvend; ++kv ) { + for(; kv != kvend; ++kv) { key_not_found = 1; /* processing logging.googleapis.com/insertId */ @@ -1639,7 +1666,8 @@ static int pack_json_payload(int insert_id_extracted, } } - if (key_not_found) { + /* write residual log fields to jsonPayload */ + if (key_not_found && !write_to_textpayload_field) { ret = msgpack_pack_object(mp_pck, kv->key); if (ret < 0) { goto error; @@ -1654,12 +1682,14 @@ static int pack_json_payload(int insert_id_extracted, flb_sds_destroy(monitored_resource_key); flb_sds_destroy(local_resource_id_key); flb_sds_destroy(stream); + flb_sds_destroy(text_payload); return 0; error: flb_sds_destroy(monitored_resource_key); flb_sds_destroy(local_resource_id_key); flb_sds_destroy(stream); + flb_sds_destroy(text_payload); return ret; } @@ -1821,7 +1851,7 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx, msgpack_pack_str_body(&mp_pck, "labels", 6); ret = pack_resource_labels(ctx, &mh, &mp_pck, data, bytes); - if (ret != 0) { + if (ret != 0) { if (ctx->resource_type == RESOURCE_TYPE_K8S) { ret = extract_local_resource_id(data, bytes, ctx, tag); if (ret != 0) { @@ -2314,7 +2344,7 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx, /* Extract httpRequest */ init_http_request(&http_request); http_request_extra_size = 0; - http_request_extracted = extract_http_request(&http_request, + http_request_extracted = extract_http_request(&http_request, ctx->http_request_key, ctx->http_request_key_size, obj, &http_request_extra_size); @@ -2432,17 +2462,16 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx, flb_sds_destroy(source_location_function); destroy_http_request(&http_request); - /* jsonPayload */ - msgpack_pack_str(&mp_pck, 11); - msgpack_pack_str_body(&mp_pck, "jsonPayload", 11); - pack_json_payload(insert_id_extracted, - 
operation_extracted, operation_extra_size, - source_location_extracted, - source_location_extra_size, - http_request_extracted, - http_request_extra_size, - tms_status, - &mp_pck, obj, ctx); + /* both textPayload and jsonPayload are supported */ + pack_payload(insert_id_extracted, + operation_extracted, + operation_extra_size, + source_location_extracted, + source_location_extra_size, + http_request_extracted, + http_request_extra_size, + tms_status, + &mp_pck, obj, ctx); /* avoid modifying the original tag */ newtag = tag; @@ -2594,7 +2623,7 @@ static void update_retry_metric(struct flb_stackdriver *ctx, uint64_t ts, int http_status) { - char tmp[32]; + char tmp[32]; char *name = (char *) flb_output_name(ctx->ins); /* convert status to string format */ @@ -3154,6 +3183,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_stackdriver, resource_labels), "Set the resource labels" }, + { + FLB_CONFIG_MAP_STR, "text_payload_key", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_stackdriver, text_payload_key), + "Set key for extracting text payload" + }, { FLB_CONFIG_MAP_BOOL, "test_log_entry_format", "false", 0, FLB_TRUE, offsetof(struct flb_stackdriver, test_log_entry_format), diff --git a/plugins/out_stackdriver/stackdriver.h b/plugins/out_stackdriver/stackdriver.h index 2a645c16402..76f5a7598ea 100644 --- a/plugins/out_stackdriver/stackdriver.h +++ b/plugins/out_stackdriver/stackdriver.h @@ -208,6 +208,9 @@ struct flb_stackdriver { /* upstream context for metadata end-point */ struct flb_upstream *metadata_u; + /* the key to extract unstructured text payload from */ + flb_sds_t text_payload_key; + #ifdef FLB_HAVE_METRICS /* metrics */ struct cmt_counter *cmt_successful_requests; diff --git a/tests/runtime/data/stackdriver/stackdriver_test_payload.h b/tests/runtime/data/stackdriver/stackdriver_test_payload.h new file mode 100644 index 00000000000..75b771f449e --- /dev/null +++ b/tests/runtime/data/stackdriver/stackdriver_test_payload.h 
@@ -0,0 +1,26 @@
+#define STRING_TEXT_PAYLOAD "[" \
+    "1595349600," \
+    "{" \
+    "\"message\": \"The application errored out\"," \
+    "\"logging.googleapis.com/severity\": \"ERROR\"" \
+    "}]"
+
+#define STRING_TEXT_PAYLOAD_WITH_RESIDUAL_FIELDS "[" \
+    "1595349600," \
+    "{" \
+    "\"message\": \"The application errored out\"," \
+    "\"logging.googleapis.com/severity\": \"ERROR\"," \
+    "\"errorCode\": \"400\"" \
+    "}]"
+
+#define NON_SCALAR_PAYLOAD_WITH_RESIDUAL_FIELDS "[" \
+    "1595349600," \
+    "{" \
+    "\"message\": " \
+    "{" \
+    "\"application_name\": \"my_application\"," \
+    "\"error_message\": \"The application errored out\"" \
+    "}," \
+    "\"logging.googleapis.com/severity\": \"ERROR\"," \
+    "\"errorCode\": \"400\"" \
+    "}]"
diff --git a/tests/runtime/out_stackdriver.c b/tests/runtime/out_stackdriver.c
index 585e734478d..2bd79f31aaf 100644
--- a/tests/runtime/out_stackdriver.c
+++ b/tests/runtime/out_stackdriver.c
@@ -44,7 +44,7 @@
 #include "data/stackdriver/stackdriver_test_http_request.h"
 #include "data/stackdriver/stackdriver_test_timestamp.h"
 #include "data/stackdriver/stackdriver_test_monitored_resource.h"
-
+#include "data/stackdriver/stackdriver_test_payload.h"
 
 /*
  * Fluent Bit Stackdriver plugin, always set as payload a JSON strings contained in a
@@ -2292,6 +2292,85 @@ static void cb_check_timestamp_format_duo_fields_incorrect_type(void *ctx, int f
     flb_sds_destroy(res_data);
 }
 
+static void cb_check_string_text_payload_with_matched_text_payload_key(void *ctx, int ffd,
+                                                 int res_ret, void *res_data, size_t res_size,
+                                                 void *data)
+{
+    int ret;
+
+    ret = mp_kv_cmp(res_data, res_size, "$entries[0]['timestamp']", "2020-07-21T16:40:00.000000000Z");
+    TEST_CHECK(ret == FLB_TRUE);
+
+    ret = mp_kv_cmp_integer(res_data, res_size, "$entries[0]['severity']", 500);
+    TEST_CHECK(ret == FLB_TRUE);
+
+    /* check payload is written to textPayload field */
+    ret = mp_kv_cmp(res_data, res_size, "$entries[0]['textPayload']", "The application errored out");
+    TEST_CHECK(ret ==
FLB_TRUE);
+
+    flb_sds_destroy(res_data);
+}
+
+static void cb_check_string_text_payload_with_mismatched_text_payload_key(void *ctx, int ffd,
+                                                 int res_ret, void *res_data, size_t res_size,
+                                                 void *data)
+{
+    int ret;
+
+    ret = mp_kv_cmp(res_data, res_size, "$entries[0]['timestamp']", "2020-07-21T16:40:00.000000000Z");
+    TEST_CHECK(ret == FLB_TRUE);
+
+    ret = mp_kv_cmp_integer(res_data, res_size, "$entries[0]['severity']", 500);
+    TEST_CHECK(ret == FLB_TRUE);
+
+    /* check payload is written to jsonPayload field */
+    ret = mp_kv_cmp(res_data, res_size, "$entries[0]['jsonPayload']['message']", "The application errored out");
+    TEST_CHECK(ret == FLB_TRUE);
+
+    flb_sds_destroy(res_data);
+}
+
+static void cb_check_string_text_payload_with_residual_fields(void *ctx, int ffd,
+                                                 int res_ret, void *res_data, size_t res_size,
+                                                 void *data)
+{
+    int ret;
+
+    ret = mp_kv_cmp(res_data, res_size, "$entries[0]['timestamp']", "2020-07-21T16:40:00.000000000Z");
+    TEST_CHECK(ret == FLB_TRUE);
+
+    ret = mp_kv_cmp_integer(res_data, res_size, "$entries[0]['severity']", 500);
+    TEST_CHECK(ret == FLB_TRUE);
+
+    /* check payload is written to jsonPayload field */
+    TEST_CHECK(mp_kv_cmp(res_data, res_size, "$entries[0]['jsonPayload']['message']", "The application errored out") == FLB_TRUE);
+    ret = mp_kv_cmp(res_data, res_size, "$entries[0]['jsonPayload']['errorCode']", "400");
+    TEST_CHECK(ret == FLB_TRUE);
+
+    flb_sds_destroy(res_data);
+}
+
+static void cb_check_non_scalar_payload_with_residual_fields(void *ctx, int ffd,
+                                                 int res_ret, void *res_data, size_t res_size,
+                                                 void *data)
+{
+    int ret;
+
+    ret = mp_kv_cmp(res_data, res_size, "$entries[0]['timestamp']", "2020-07-21T16:40:00.000000000Z");
+    TEST_CHECK(ret == FLB_TRUE);
+
+    ret = mp_kv_cmp_integer(res_data, res_size, "$entries[0]['severity']", 500);
+    TEST_CHECK(ret == FLB_TRUE);
+
+    /* check payload is written to jsonPayload field */
+    TEST_CHECK(mp_kv_cmp(res_data, res_size, "$entries[0]['jsonPayload']['application_name']", "my_application") == FLB_TRUE);
+    ret
= mp_kv_cmp(res_data, res_size, "$entries[0]['jsonPayload']['message']", "The application errored out");
+    TEST_CHECK(mp_kv_cmp(res_data, res_size, "$entries[0]['jsonPayload']['errorCode']", "400") == FLB_TRUE);
+    TEST_CHECK(ret == FLB_TRUE);
+
+    flb_sds_destroy(res_data);
+}
+
 void flb_test_monitored_resource_common()
 {
     int ret;
@@ -6294,6 +6373,170 @@ void flb_test_timestamp_format_duo_fields_incorrect_type()
     flb_destroy(ctx);
 }
 
+void flb_test_string_text_payload_with_matched_text_payload_key()
+{
+    int ret;
+    int size = sizeof(STRING_TEXT_PAYLOAD) - 1;
+    flb_ctx_t *ctx;
+    int in_ffd;
+    int out_ffd;
+
+    /* Create context, flush every second (some checks omitted here) */
+    ctx = flb_create();
+    flb_service_set(ctx, "flush", "1", "grace", "1", NULL);
+
+    /* Lib input mode */
+    in_ffd = flb_input(ctx, (char *) "lib", NULL);
+    flb_input_set(ctx, in_ffd, "tag", "test", NULL);
+
+    /* Stackdriver output */
+    out_ffd = flb_output(ctx, (char *) "stackdriver", NULL);
+    flb_output_set(ctx, out_ffd,
+                   "match", "test",
+                   "resource", "gce_instance",
+                   "text_payload_key", "message",
+                   NULL);
+
+    /* Enable test mode */
+    ret = flb_output_set_test(ctx, out_ffd, "formatter",
+                              cb_check_string_text_payload_with_matched_text_payload_key,
+                              NULL, NULL);
+
+    /* Start */
+    ret = flb_start(ctx);
+    TEST_CHECK(ret == 0);
+
+    /* Ingest data sample */
+    flb_lib_push(ctx, in_ffd, (char *) STRING_TEXT_PAYLOAD, size);
+
+    sleep(2);
+    flb_stop(ctx);
+    flb_destroy(ctx);
+}
+
+void flb_test_string_text_payload_with_mismatched_text_payload_key()
+{
+    int ret;
+    int size = sizeof(STRING_TEXT_PAYLOAD) - 1;
+    flb_ctx_t *ctx;
+    int in_ffd;
+    int out_ffd;
+
+    /* Create context, flush every second (some checks omitted here) */
+    ctx = flb_create();
+    flb_service_set(ctx, "flush", "1", "grace", "1", NULL);
+
+    /* Lib input mode */
+    in_ffd = flb_input(ctx, (char *) "lib", NULL);
+    flb_input_set(ctx, in_ffd, "tag", "test", NULL);
+
+    /* Stackdriver output */
+    out_ffd = flb_output(ctx, (char *) "stackdriver", NULL);
+
flb_output_set(ctx, out_ffd,
+                   "match", "test",
+                   "resource", "gce_instance",
+                   "text_payload_key", "msg",
+                   NULL);
+
+    /* Enable test mode */
+    ret = flb_output_set_test(ctx, out_ffd, "formatter",
+                              cb_check_string_text_payload_with_mismatched_text_payload_key,
+                              NULL, NULL);
+
+    /* Start */
+    ret = flb_start(ctx);
+    TEST_CHECK(ret == 0);
+
+    /* Ingest data sample */
+    flb_lib_push(ctx, in_ffd, (char *) STRING_TEXT_PAYLOAD, size);
+
+    sleep(2);
+    flb_stop(ctx);
+    flb_destroy(ctx);
+}
+
+void flb_test_string_text_payload_with_residual_fields()
+{
+    int ret;
+    int size = sizeof(STRING_TEXT_PAYLOAD_WITH_RESIDUAL_FIELDS) - 1;
+    flb_ctx_t *ctx;
+    int in_ffd;
+    int out_ffd;
+
+    /* Create context, flush every second (some checks omitted here) */
+    ctx = flb_create();
+    flb_service_set(ctx, "flush", "1", "grace", "1", NULL);
+
+    /* Lib input mode */
+    in_ffd = flb_input(ctx, (char *) "lib", NULL);
+    flb_input_set(ctx, in_ffd, "tag", "test", NULL);
+
+    /* Stackdriver output */
+    out_ffd = flb_output(ctx, (char *) "stackdriver", NULL);
+    flb_output_set(ctx, out_ffd,
+                   "match", "test",
+                   "resource", "gce_instance",
+                   "text_payload_key", "message",
+                   NULL);
+
+    /* Enable test mode */
+    ret = flb_output_set_test(ctx, out_ffd, "formatter",
+                              cb_check_string_text_payload_with_residual_fields,
+                              NULL, NULL);
+
+    /* Start */
+    ret = flb_start(ctx);
+    TEST_CHECK(ret == 0);
+
+    /* Ingest data sample */
+    flb_lib_push(ctx, in_ffd, (char *) STRING_TEXT_PAYLOAD_WITH_RESIDUAL_FIELDS, size);
+
+    sleep(2);
+    flb_stop(ctx);
+    flb_destroy(ctx);
+}
+
+void flb_test_non_scalar_payload_with_residual_fields()
+{
+    int ret;
+    int size = sizeof(NON_SCALAR_PAYLOAD_WITH_RESIDUAL_FIELDS) - 1;
+    flb_ctx_t *ctx;
+    int in_ffd;
+    int out_ffd;
+
+    /* Create context, flush every second (some checks omitted here) */
+    ctx = flb_create();
+    flb_service_set(ctx, "flush", "1", "grace", "1", NULL);
+
+    /* Lib input mode */
+    in_ffd = flb_input(ctx, (char *) "lib", NULL);
+    flb_input_set(ctx, in_ffd, "tag", "test", NULL);
+
+    /* Stackdriver output */
+    out_ffd = flb_output(ctx, (char *) "stackdriver", NULL);
+    flb_output_set(ctx, out_ffd,
+                   "match", "test",
+                   "resource", "gce_instance",
+                   "text_payload_key", "message",
+                   NULL);
+
+    /* Enable test mode */
+    ret = flb_output_set_test(ctx, out_ffd, "formatter",
+                              cb_check_non_scalar_payload_with_residual_fields,
+                              NULL, NULL);
+
+    /* Start */
+    ret = flb_start(ctx);
+    TEST_CHECK(ret == 0);
+
+    /* Ingest data sample */
+    flb_lib_push(ctx, in_ffd, (char *) NON_SCALAR_PAYLOAD_WITH_RESIDUAL_FIELDS, size);
+
+    sleep(2);
+    flb_stop(ctx);
+    flb_destroy(ctx);
+}
+
 /* Test list */
 TEST_LIST = {
     {"severity_multi_entries", flb_test_multi_entries_severity },
@@ -6424,5 +6667,9 @@ TEST_LIST = {
     {"timestamp_format_duo_fields_missing_nanos", flb_test_timestamp_format_duo_fields_missing_nanos},
     {"timestamp_format_duo_fields_incorrect_type", flb_test_timestamp_format_duo_fields_incorrect_type},
 
+    {"string_text_payload_with_matched_text_payload_key", flb_test_string_text_payload_with_matched_text_payload_key},
+    {"string_text_payload_with_mismatched_text_payload_key", flb_test_string_text_payload_with_mismatched_text_payload_key},
+    {"string_text_payload_with_residual_fields", flb_test_string_text_payload_with_residual_fields},
+    {"non_scalar_payload_with_residual_fields", flb_test_non_scalar_payload_with_residual_fields},
     {NULL, NULL}
 };

From ffcdd21bf634922c73926598717bab0b0c1b8a44 Mon Sep 17 00:00:00 2001
From: Eduardo Silva
Date: Sun, 26 May 2024 07:55:44 -0600
Subject: [PATCH 21/24] out_splunk: fix metadata hec_token bug that overrides default splunk_token behavior (fix #8867)

In the previous Splunk metadata fix, I introduced an issue where the metadata
auth header was always set even if the metadata was not there: the code was
generating an empty string, which caused the classic splunk_token auth
mechanism to be skipped.

This patch corrects that bug by validating first and returning a proper NULL
when the metadata auth header (hec_token) is not there.
Signed-off-by: Eduardo Silva
Signed-off-by: Markus Bergholz
---
 plugins/out_splunk/splunk.c      | 14 ++++++++++++--
 plugins/out_splunk/splunk_conf.c |  1 +
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/plugins/out_splunk/splunk.c b/plugins/out_splunk/splunk.c
index c392b83687a..4ec93aff477 100644
--- a/plugins/out_splunk/splunk.c
+++ b/plugins/out_splunk/splunk.c
@@ -400,7 +400,11 @@ static flb_sds_t get_metadata_auth_header(struct flb_splunk *ctx)
     flb_sds_t auth_header = NULL;
 
     pthread_mutex_lock(&ctx->mutex_hec_token);
-    auth_header = flb_sds_create(ctx->metadata_auth_header);
+
+    if (ctx->metadata_auth_header) {
+        auth_header = flb_sds_create(ctx->metadata_auth_header);
+    }
+
     pthread_mutex_unlock(&ctx->mutex_hec_token);
 
     return auth_header;
@@ -717,7 +721,13 @@ static void cb_splunk_flush(struct flb_event_chunk *event_chunk,
 
     /* HTTP Client */
     flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
 
-    /* Try to use http_user and http_passwd if not, fallback to auth_header */
+    /*
+     * Authentication mechanism & order:
+     *
+     * 1. use the configured `http_user` and `http_passwd`
+     * 2. use the metadata 'hec_token'; if the records were generated by the Splunk input plugin, this will be set.
+     * 3. use the configured `splunk_token` (if set).
+     */
     if (ctx->http_user && ctx->http_passwd) {
         flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd);
     }
diff --git a/plugins/out_splunk/splunk_conf.c b/plugins/out_splunk/splunk_conf.c
index d15d2edc0ec..43ccdc1879c 100644
--- a/plugins/out_splunk/splunk_conf.c
+++ b/plugins/out_splunk/splunk_conf.c
@@ -241,6 +241,7 @@ struct flb_splunk *flb_splunk_conf_create(struct flb_output_instance *ins,
     }
 
     ctx->metadata_auth_header = NULL;
+    /* No http_user is set, fallback to splunk_token, if splunk_token is unset, fail.
 */
+
    if (!ctx->http_user) {
        /* Splunk Auth Token */

From 58a70e21bf54e09fec7a856fba851d7756120b9e Mon Sep 17 00:00:00 2001
From: Holger Freyther
Date: Mon, 27 May 2024 04:17:34 +0800
Subject: [PATCH 22/24] core: fix crash when running dry test mode (#8872)

The TLS variable for out_flush_params is not initialized, as the
flb_start function is not called during the dry run. Call flb_init_env
directly and then shut down the engine.

configuration test is successful

=================================================================
==63633==ERROR: AddressSanitizer: attempting free on address which was not malloc()-ed: 0x0001f71b3ac0 in thread T0
    #0 0x103c9f260 in wrap_free+0x98 (libclang_rt.asan_osx_dynamic.dylib:arm64e+0x53260)
    #1 0x100179d9c in flb_free flb_mem.h:127
    #2 0x10017f4a0 in flb_output_exit flb_output.c:481
    #3 0x1001cb038 in flb_engine_shutdown flb_engine.c:1119
    #4 0x10010d45c in flb_destroy flb_lib.c:240
    #5 0x100008c40 in flb_main fluent-bit.c:1348
    #6 0x10000c644 in main fluent-bit.c:1456
    #7 0x18f11e0dc ()

frame #6: 0x000000010017f4a4 fluent-bit`flb_output_exit(config=0x0000000102b00200) at flb_output.c:481:9
   478
   479      params = FLB_TLS_GET(out_flush_params);
   480      if (params) {
-> 481          flb_free(params);
   482      }
   483  }

Signed-off-by: Holger Hans Peter Freyther
Signed-off-by: Markus Bergholz
---
 src/fluent-bit.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/fluent-bit.c b/src/fluent-bit.c
index 6e41b8619b3..d35e034ed20 100644
--- a/src/fluent-bit.c
+++ b/src/fluent-bit.c
@@ -1344,6 +1344,7 @@ int flb_main(int argc, char **argv)
 
     if (config->dry_run == FLB_TRUE) {
         fprintf(stderr, "configuration test is successful\n");
+        flb_init_env();
         flb_cf_destroy(cf_opts);
         flb_destroy(ctx);
         exit(EXIT_SUCCESS);

From dc18ba022db71850016690010c779745cda97df7 Mon Sep 17 00:00:00 2001
From: edsiper <369718+edsiper@users.noreply.github.com>
Date: Mon, 27 May 2024 01:47:57 +0000
Subject: [PATCH 23/24] release: update to 3.0.7

Signed-off-by: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com> Signed-off-by: Markus Bergholz --- CMakeLists.txt | 2 +- dockerfiles/Dockerfile | 2 +- fluent-bit-3.0.6.bb => fluent-bit-3.0.7.bb | 2 +- snap/snapcraft.yaml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) rename fluent-bit-3.0.6.bb => fluent-bit-3.0.7.bb (99%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 1d827275454..2879659cb62 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,7 +8,7 @@ set(CMAKE_POLICY_DEFAULT_CMP0069 NEW) # Fluent Bit Version set(FLB_VERSION_MAJOR 3) set(FLB_VERSION_MINOR 0) -set(FLB_VERSION_PATCH 6) +set(FLB_VERSION_PATCH 7) set(FLB_VERSION_STR "${FLB_VERSION_MAJOR}.${FLB_VERSION_MINOR}.${FLB_VERSION_PATCH}") set(CMAKE_POSITION_INDEPENDENT_CODE ON) diff --git a/dockerfiles/Dockerfile b/dockerfiles/Dockerfile index 7499f845f28..367b460fbf5 100644 --- a/dockerfiles/Dockerfile +++ b/dockerfiles/Dockerfile @@ -11,7 +11,7 @@ # docker buildx build --platform "linux/amd64,linux/arm64,linux/arm/v7,linux/s390x" -f ./dockerfiles/Dockerfile.multiarch --build-arg FLB_TARBALL=https://github.com/fluent/fluent-bit/archive/v1.8.11.tar.gz ./dockerfiles/ # Set this to the current release version: it gets done so as part of the release. 
-ARG RELEASE_VERSION=3.0.6 +ARG RELEASE_VERSION=3.0.7 # For multi-arch builds - assumption is running on an AMD64 host FROM multiarch/qemu-user-static:x86_64-arm as qemu-arm32 diff --git a/fluent-bit-3.0.6.bb b/fluent-bit-3.0.7.bb similarity index 99% rename from fluent-bit-3.0.6.bb rename to fluent-bit-3.0.7.bb index 99c998978c8..5f52bb725a5 100644 --- a/fluent-bit-3.0.6.bb +++ b/fluent-bit-3.0.7.bb @@ -16,7 +16,7 @@ LIC_FILES_CHKSUM = "file://LICENSE;md5=2ee41112a44fe7014dce33e26468ba93" SECTION = "net" PR = "r0" -PV = "3.0.6" +PV = "3.0.7" SRCREV = "v${PV}" SRC_URI = "git://github.com/fluent/fluent-bit.git;nobranch=1" diff --git a/snap/snapcraft.yaml b/snap/snapcraft.yaml index fb19f13bbaa..cd9aa78b38b 100644 --- a/snap/snapcraft.yaml +++ b/snap/snapcraft.yaml @@ -1,6 +1,6 @@ name: fluent-bit base: core18 -version: '3.0.6' +version: '3.0.7' summary: High performance logs and stream processor description: | Fluent Bit is a high performance log processor and stream processor for Linux. From 9049fc62517d5b30e6ef240a6f329b42848a6337 Mon Sep 17 00:00:00 2001 From: Markus Bergholz Date: Wed, 29 May 2024 09:04:17 +0200 Subject: [PATCH 24/24] docs: Add Ubuntu 24.04 Noble Numbat Signed-off-by: Markus Bergholz --- packaging/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packaging/README.md b/packaging/README.md index 70361050cf7..fe1ddea17bc 100644 --- a/packaging/README.md +++ b/packaging/README.md @@ -27,6 +27,8 @@ The [`distros`](./distros/) directory contains OCI container definitions used to | Debian | 11 | arm64v8 | debian/bullseye.arm64v8 | | Debian | 10 | x86_64 | debian/buster | | Debian | 10 | arm64v8 | debian/buster.arm64v8 | +| Ubuntu | 24.04 / Noble Numbat | x86_64 | ubuntu/24.04 | +| Ubuntu | 24.04 / Noble Numbat | arm64v8 | ubuntu/24.04.arm64v8 | | Ubuntu | 22.04 / Jammy Jellyfish | x86_64 | ubuntu/22.04 | | Ubuntu | 22.04 / Jammy Jellyfish | arm64v8 | ubuntu/22.04.arm64v8 | | Ubuntu | 20.04 / Focal Fossa | x86_64 | ubuntu/20.04 |