diff --git a/.github/workflows/cron-scorecards-analysis.yaml b/.github/workflows/cron-scorecards-analysis.yaml index 5e234f2dcfa..20b31d8b112 100644 --- a/.github/workflows/cron-scorecards-analysis.yaml +++ b/.github/workflows/cron-scorecards-analysis.yaml @@ -29,7 +29,7 @@ jobs: persist-credentials: false - name: "Run analysis" - uses: ossf/scorecard-action@0864cf19026789058feabb7e87baa5f140aac736 + uses: ossf/scorecard-action@dc50aa9510b46c811795eb24b2f1ba02a914e534 with: results_file: scorecard-results.sarif results_format: sarif diff --git a/.github/workflows/staging-release.yaml b/.github/workflows/staging-release.yaml index 35bacc6952c..417887fa309 100644 --- a/.github/workflows/staging-release.yaml +++ b/.github/workflows/staging-release.yaml @@ -793,6 +793,7 @@ jobs: name: "Fluent Bit ${{ inputs.version }}" tag_name: v${{ inputs.version }} target_commitish: '2.0' + make_latest: false - name: Release 2.1 - not latest uses: softprops/action-gh-release@v2 @@ -804,6 +805,7 @@ jobs: name: "Fluent Bit ${{ inputs.version }}" tag_name: v${{ inputs.version }} target_commitish: '2.1' + make_latest: false - name: Release 2.2 - not latest uses: softprops/action-gh-release@v2 @@ -815,6 +817,7 @@ jobs: name: "Fluent Bit ${{ inputs.version }}" tag_name: v${{ inputs.version }} target_commitish: '2.2' + make_latest: false - name: Release 3.0 and latest uses: softprops/action-gh-release@v2 @@ -825,6 +828,7 @@ jobs: generate_release_notes: true name: "Fluent Bit ${{ inputs.version }}" tag_name: v${{ inputs.version }} + make_latest: true staging-release-windows-checksums: name: Get Windows checksums for new release diff --git a/CMakeLists.txt b/CMakeLists.txt index a901fbf36ed..2879659cb62 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,7 +8,7 @@ set(CMAKE_POLICY_DEFAULT_CMP0069 NEW) # Fluent Bit Version set(FLB_VERSION_MAJOR 3) set(FLB_VERSION_MINOR 0) -set(FLB_VERSION_PATCH 4) +set(FLB_VERSION_PATCH 7) set(FLB_VERSION_STR "${FLB_VERSION_MAJOR}.${FLB_VERSION_MINOR}.${FLB_VERSION_PATCH}") set(CMAKE_POSITION_INDEPENDENT_CODE ON) @@ -209,6 +209,7 @@ option(FLB_IN_ELASTICSEARCH "Enable Elasticsearch (Bulk API) input pl option(FLB_IN_CALYPTIA_FLEET "Enable Calyptia Fleet input plugin" Yes) option(FLB_IN_SPLUNK "Enable Splunk HTTP HEC input plugin" Yes) option(FLB_IN_PROCESS_EXPORTER_METRICS "Enable process exporter metrics input plugin" Yes) +option(FLB_IN_PROMETHEUS_REMOTE_WRITE "Enable prometheus remote write input plugin" Yes) option(FLB_OUT_AZURE "Enable Azure output plugin" Yes) option(FLB_OUT_AZURE_BLOB "Enable Azure output plugin" Yes) option(FLB_OUT_AZURE_LOGS_INGESTION "Enable Azure Logs Ingestion output plugin" Yes) diff --git a/cmake/windows-setup.cmake b/cmake/windows-setup.cmake index 53a6775c2de..60b67bc1e99 100644 --- a/cmake/windows-setup.cmake +++ b/cmake/windows-setup.cmake @@ -61,6 +61,7 @@ if(FLB_WINDOWS_DEFAULTS) set(FLB_IN_PODMAN_METRICS No) set(FLB_IN_ELASTICSEARCH Yes) set(FLB_IN_SPLUNK Yes) + set(FLB_IN_PROMETHEUS_REMOTE_WRITE Yes) # OUTPUT plugins # ============== diff --git a/dockerfiles/Dockerfile b/dockerfiles/Dockerfile index b1d505a1574..367b460fbf5 100644 --- a/dockerfiles/Dockerfile +++ b/dockerfiles/Dockerfile @@ -11,7 +11,7 @@ # docker buildx build --platform "linux/amd64,linux/arm64,linux/arm/v7,linux/s390x" -f ./dockerfiles/Dockerfile.multiarch --build-arg FLB_TARBALL=https://github.com/fluent/fluent-bit/archive/v1.8.11.tar.gz ./dockerfiles/ # Set this to the current release version: it gets done so as part of the release. -ARG RELEASE_VERSION=3.0.4 +ARG RELEASE_VERSION=3.0.7 # For multi-arch builds - assumption is running on an AMD64 host FROM multiarch/qemu-user-static:x86_64-arm as qemu-arm32 diff --git a/fluent-bit-3.0.4.bb b/fluent-bit-3.0.7.bb similarity index 99% rename from fluent-bit-3.0.4.bb rename to fluent-bit-3.0.7.bb index 84e9042e853..5f52bb725a5 100644 --- a/fluent-bit-3.0.4.bb +++ b/fluent-bit-3.0.7.bb @@ -16,7 +16,7 @@ LIC_FILES_CHKSUM = "file://LICENSE;md5=2ee41112a44fe7014dce33e26468ba93" SECTION = "net" PR = "r0" -PV = "3.0.4" +PV = "3.0.7" SRCREV = "v${PV}" SRC_URI = "git://github.com/fluent/fluent-bit.git;nobranch=1" diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index 3fab2576d62..eab0c983701 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -627,6 +627,7 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task, struct cmt *metrics_context; struct ctrace *trace_context; size_t chunk_offset; + struct cmt *cmt_out_context = NULL; /* Custom output coroutine info */ out_flush = (struct flb_output_flush *) flb_calloc(1, sizeof(struct flb_output_flush)); @@ -715,13 +716,25 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task, flb_sds_len(evc->tag), (char *) metrics_context, 0, - NULL, + (void **)&cmt_out_context, NULL); if (ret == 0) { - ret = cmt_encode_msgpack_create(metrics_context, - &serialized_context_buffer, - &serialized_context_size); + if (cmt_out_context != NULL) { + ret = cmt_encode_msgpack_create(cmt_out_context, + &serialized_context_buffer, + &serialized_context_size); + + if (cmt_out_context != metrics_context) { + cmt_destroy(cmt_out_context); + } + + } + else { + ret = cmt_encode_msgpack_create(metrics_context, + &serialized_context_buffer, + &serialized_context_size); + } cmt_destroy(metrics_context); diff --git a/packaging/README.md b/packaging/README.md index 70361050cf7..fe1ddea17bc 100644 --- a/packaging/README.md +++ b/packaging/README.md @@ -27,6 +27,8 @@ The [`distros`](./distros/) directory contains OCI container definitions used to | Debian | 11 | arm64v8 | debian/bullseye.arm64v8 | | Debian | 10 | x86_64 | debian/buster | | Debian | 10 | arm64v8 | debian/buster.arm64v8 | +| Ubuntu | 24.04 / Noble Numbat | x86_64 | ubuntu/24.04 | +| Ubuntu | 24.04 / Noble Numbat | arm64v8 | ubuntu/24.04.arm64v8 | | Ubuntu | 22.04 / Jammy Jellyfish | x86_64 | ubuntu/22.04 | | Ubuntu | 22.04 / Jammy Jellyfish | arm64v8 | ubuntu/22.04.arm64v8 | | Ubuntu | 20.04 / Focal Fossa | x86_64 | ubuntu/20.04 | diff --git a/packaging/build-config.json b/packaging/build-config.json index e276f34858f..27b5d2e8671 100644 --- a/packaging/build-config.json +++ b/packaging/build-config.json @@ -92,6 +92,14 @@ "target": "ubuntu/22.04.arm64v8", "type": "deb" }, + { + "target": "ubuntu/24.04", + "type": "deb" + }, + { + "target": "ubuntu/24.04.arm64v8", + "type": "deb" + }, { "target": "raspbian/buster", "type": "deb" diff --git a/packaging/distros/ubuntu/Dockerfile b/packaging/distros/ubuntu/Dockerfile index 40272f8fcd7..2ca0704580e 100644 --- a/packaging/distros/ubuntu/Dockerfile +++ b/packaging/distros/ubuntu/Dockerfile @@ -114,6 +114,32 @@ ENV DEBIAN_FRONTEND noninteractive COPY --from=multiarch-aarch64 /usr/bin/qemu-aarch64-static /usr/bin/qemu-aarch64-static +# hadolint ignore=DL3008,DL3015 +RUN apt-get update && \ + apt-get install -y curl ca-certificates build-essential libsystemd-dev \ + cmake make bash wget unzip nano vim valgrind dh-make flex bison \ + libpq-dev postgresql-server-dev-all libpq5 \ + libsasl2-2 libsasl2-dev openssl libssl-dev libssl3 libyaml-dev pkg-config zlib1g-dev && \ + apt-get install -y --reinstall lsb-base lsb-release + + # ubuntu/24.04 base image +FROM ubuntu:24.04 as ubuntu-24.04-base +ENV DEBIAN_FRONTEND noninteractive + +# hadolint ignore=DL3008,DL3015 +RUN apt-get update && \ + apt-get install -y curl ca-certificates build-essential libsystemd-dev \ + cmake make bash wget unzip nano vim valgrind dh-make flex bison \ + libpq-dev postgresql-server-dev-all libpq5 \ + libsasl2-2 libsasl2-dev openssl libssl-dev libssl3 libyaml-dev pkg-config zlib1g-dev && \ + apt-get install -y --reinstall lsb-base lsb-release + +# ubuntu/24.04.arm64v8 base image +FROM arm64v8/ubuntu:24.04 as ubuntu-24.04.arm64v8-base +ENV DEBIAN_FRONTEND noninteractive + +COPY --from=multiarch-aarch64 /usr/bin/qemu-aarch64-static /usr/bin/qemu-aarch64-static + # hadolint ignore=DL3008,DL3015 RUN apt-get update && \ apt-get install -y curl ca-certificates build-essential libsystemd-dev \ diff --git a/packaging/update-repos.sh b/packaging/update-repos.sh index a4d711e950c..ea7acacff7c 100755 --- a/packaging/update-repos.sh +++ b/packaging/update-repos.sh @@ -54,6 +54,7 @@ DEB_REPO_PATHS=( "debian/bookworm" "ubuntu/bionic" "ubuntu/focal" "ubuntu/jammy" + "ubuntu/noble" "raspbian/buster" "raspbian/bullseye" ) diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 1bd63924b71..9006ef6d823 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -229,6 +229,7 @@ REGISTER_IN_PLUGIN("in_opentelemetry") REGISTER_IN_PLUGIN("in_elasticsearch") REGISTER_IN_PLUGIN("in_calyptia_fleet") REGISTER_IN_PLUGIN("in_splunk") +REGISTER_IN_PLUGIN("in_prometheus_remote_write") # Test the event loop messaging when used in threaded mode REGISTER_IN_PLUGIN("in_event_test") diff --git a/plugins/filter_aws/aws.c b/plugins/filter_aws/aws.c index acb5b7d7af2..afe94a8a466 100644 --- a/plugins/filter_aws/aws.c +++ b/plugins/filter_aws/aws.c @@ -921,7 +921,7 @@ static int get_ec2_metadata(struct flb_filter_aws *ctx) metadata_fetched = FLB_FALSE; } - if (metadata_fetched) { + if (metadata_fetched == FLB_TRUE) { ctx->metadata_retrieved = FLB_TRUE; } diff --git a/plugins/in_prometheus_remote_write/CMakeLists.txt b/plugins/in_prometheus_remote_write/CMakeLists.txt new file mode 100644 index 00000000000..ee8ce703ae2 --- /dev/null +++ b/plugins/in_prometheus_remote_write/CMakeLists.txt @@ -0,0 +1,12 @@ +if(NOT FLB_METRICS) + message(FATAL_ERROR "Prometheus remote write input plugin requires FLB_HTTP_SERVER=On.") +endif() + +set(src + prom_rw.c + prom_rw_prot.c + prom_rw_conn.c + prom_rw_config.c + ) + +FLB_PLUGIN(in_prometheus_remote_write "${src}" "monkey-core-static") diff --git a/plugins/in_prometheus_remote_write/prom_rw.c b/plugins/in_prometheus_remote_write/prom_rw.c new file mode 100644 index 00000000000..e2cb852815c --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw.c @@ -0,0 +1,250 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.in_in (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.in_in + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include +#include +#include +#include + +#include "prom_rw.h" +#include "prom_rw_conn.h" +#include "prom_rw_prot.h" +#include "prom_rw_config.h" + +/* + * For a server event, the collection event means a new client have arrived, we + * accept the connection and create a new TCP instance which will wait for + * JSON map messages. + */ +static int prom_rw_collect(struct flb_input_instance *ins, + struct flb_config *config, void *in_context) +{ + struct flb_connection *connection; + struct prom_remote_write_conn *conn; + struct flb_prom_remote_write *ctx; + + ctx = in_context; + + connection = flb_downstream_conn_get(ctx->downstream); + + if (connection == NULL) { + flb_plg_error(ctx->ins, "could not accept new connection"); + + return -1; + } + + flb_plg_trace(ctx->ins, "new TCP connection arrived FD=%i", connection->fd); + + conn = prom_rw_conn_add(connection, ctx); + + if (conn == NULL) { + return -1; + } + + return 0; +} + +static int prom_rw_init(struct flb_input_instance *ins, + struct flb_config *config, void *data) +{ + unsigned short int port; + int ret; + struct flb_prom_remote_write *ctx; + + (void) data; + + /* Create context and basic conf */ + ctx = prom_rw_config_create(ins); + if (!ctx) { + return -1; + } + ctx->collector_id = -1; + + /* Populate context with config map defaults and incoming properties */ + ret = flb_input_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_plg_error(ctx->ins, "configuration error"); + prom_rw_config_destroy(ctx); + return -1; + } + + /* Set the context */ + flb_input_set_context(ins, ctx); + + port = (unsigned short int) strtoul(ctx->tcp_port, NULL, 10); + + if (ctx->enable_http2) { + ret = flb_http_server_init(&ctx->http_server, + HTTP_PROTOCOL_AUTODETECT, + (FLB_HTTP_SERVER_FLAG_KEEPALIVE | FLB_HTTP_SERVER_FLAG_AUTO_INFLATE), + NULL, + ins->host.listen, + ins->host.port, + ins->tls, + ins->flags, + &ins->net_setup, + flb_input_event_loop_get(ins), + ins->config, + (void *) ctx); + + if (ret != 0) { + flb_plg_error(ctx->ins, + "could not initialize http server on %s:%u. Aborting", + ins->host.listen, ins->host.port); + + prom_rw_config_destroy(ctx); + + return -1; + } + + ret = flb_http_server_start(&ctx->http_server); + + if (ret != 0) { + flb_plg_error(ctx->ins, + "could not start http server on %s:%u. Aborting", + ins->host.listen, ins->host.port); + + prom_rw_config_destroy(ctx); + + return -1; + } + + ctx->http_server.request_callback = prom_rw_prot_handle_ng; + + flb_input_downstream_set(ctx->http_server.downstream, ctx->ins); + } + else { + ctx->downstream = flb_downstream_create(FLB_TRANSPORT_TCP, + ins->flags, + ctx->listen, + port, + ins->tls, + config, + &ins->net_setup); + + if (ctx->downstream == NULL) { + flb_plg_error(ctx->ins, + "could not initialize downstream on %s:%s. Aborting", + ctx->listen, ctx->tcp_port); + + prom_rw_config_destroy(ctx); + + return -1; + } + + flb_input_downstream_set(ctx->downstream, ctx->ins); + + /* Collect upon data available on the standard input */ + ret = flb_input_set_collector_socket(ins, + prom_rw_collect, + ctx->downstream->server_fd, + config); + if (ret == -1) { + flb_plg_error(ctx->ins, "Could not set collector for IN_TCP input plugin"); + prom_rw_config_destroy(ctx); + return -1; + } + + ctx->collector_id = ret; + } + + flb_plg_info(ctx->ins, "listening on %s:%s", ctx->listen, ctx->tcp_port); + + if (ctx->successful_response_code != 200 && + ctx->successful_response_code != 201 && + ctx->successful_response_code != 204) { + flb_plg_error(ctx->ins, "%d is not supported response code. Use default 201", + ctx->successful_response_code); + ctx->successful_response_code = 201; + } + + return 0; +} + +static int prom_rw_exit(void *data, struct flb_config *config) +{ + struct flb_prom_remote_write *ctx; + + (void) config; + + ctx = data; + + if (ctx != NULL) { + prom_rw_config_destroy(ctx); + } + + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_BOOL, "http2", "true", + 0, FLB_TRUE, offsetof(struct flb_prom_remote_write, enable_http2), + NULL + }, + + { + FLB_CONFIG_MAP_SIZE, "buffer_max_size", HTTP_BUFFER_MAX_SIZE, + 0, FLB_TRUE, offsetof(struct flb_prom_remote_write, buffer_max_size), + "" + }, + + { + FLB_CONFIG_MAP_SIZE, "buffer_chunk_size", HTTP_BUFFER_CHUNK_SIZE, + 0, FLB_TRUE, offsetof(struct flb_prom_remote_write, buffer_chunk_size), + "" + }, + + { + FLB_CONFIG_MAP_STR, "uri", NULL, + 0, FLB_TRUE, offsetof(struct flb_prom_remote_write, uri), + "Specify an optional HTTP URI for the target web server, e.g: /something" + }, + + { + FLB_CONFIG_MAP_BOOL, "tag_from_uri", "true", + 0, FLB_TRUE, offsetof(struct flb_prom_remote_write, tag_from_uri), + "If true, tag will be created from uri. e.g. v1_metrics from /v1/metrics ." + }, + { + FLB_CONFIG_MAP_INT, "successful_response_code", "201", + 0, FLB_TRUE, offsetof(struct flb_prom_remote_write, successful_response_code), + "Set successful response code. 200, 201 and 204 are supported." + }, + + /* EOF */ + {0} +}; + +/* Plugin reference */ +struct flb_input_plugin in_prometheus_remote_write_plugin = { + .name = "prometheus_remote_write", + .description = "Prometheus Remote Write input", + .cb_init = prom_rw_init, + .cb_pre_run = NULL, + .cb_collect = prom_rw_collect, + .cb_flush_buf = NULL, + .cb_pause = NULL, + .cb_resume = NULL, + .cb_exit = prom_rw_exit, + .config_map = config_map, + .flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS +}; diff --git a/plugins/in_prometheus_remote_write/prom_rw.h b/plugins/in_prometheus_remote_write/prom_rw.h new file mode 100644 index 00000000000..698e5c89dd5 --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw.h @@ -0,0 +1,60 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_PROM_RW_H +#define FLB_IN_PROM_RW_H + +#include +#include +#include + +#include +#include + +#define HTTP_BUFFER_MAX_SIZE "4M" +#define HTTP_BUFFER_CHUNK_SIZE "512K" + +struct flb_prom_remote_write { + int successful_response_code; + flb_sds_t listen; + flb_sds_t tcp_port; + int tag_from_uri; + + struct flb_input_instance *ins; + + /* HTTP URI */ + char *uri; + + /* New gen HTTP server */ + int enable_http2; + struct flb_http_server http_server; + + /* Legacy HTTP server */ + size_t buffer_max_size; /* Maximum buffer size */ + size_t buffer_chunk_size; /* Chunk allocation size */ + + int collector_id; /* Listener collector id */ + struct flb_downstream *downstream; /* Client manager */ + struct mk_list connections; /* linked list of connections */ + + struct mk_server *server; +}; + + +#endif diff --git a/plugins/in_prometheus_remote_write/prom_rw_config.c b/plugins/in_prometheus_remote_write/prom_rw_config.c new file mode 100644 index 00000000000..3df2ba125f6 --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw_config.c @@ -0,0 +1,102 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "prom_rw.h" +#include "prom_rw_config.h" +#include "prom_rw_conn.h" + +/* default HTTP port for prometheus remote write */ +#define PROMETHEUS_REMOTE_WRITE_HTTP_PORT 8080 + +struct flb_prom_remote_write *prom_rw_config_create(struct flb_input_instance *ins) +{ + int ret; + char port[8]; + struct flb_prom_remote_write *ctx; + + ctx = flb_calloc(1, sizeof(struct flb_prom_remote_write)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = ins; + mk_list_init(&ctx->connections); + + /* Load the config map */ + ret = flb_input_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_free(ctx); + return NULL; + } + + /* Listen interface (if not set, defaults to 0.0.0.0:80) */ + flb_input_net_default_listener("0.0.0.0", PROMETHEUS_REMOTE_WRITE_HTTP_PORT, ins); + + ctx->listen = flb_strdup(ins->host.listen); + snprintf(port, sizeof(port) - 1, "%d", ins->host.port); + ctx->tcp_port = flb_strdup(port); + + /* HTTP Server specifics */ + ctx->server = flb_calloc(1, sizeof(struct mk_server)); + if (ctx->server == NULL) { + flb_plg_error(ctx->ins, "error on mk_server allocation"); + prom_rw_config_destroy(ctx); + return NULL; + } + ctx->server->keep_alive = MK_TRUE; + + /* monkey detects server->workers == 0 as the server not being initialized at the + * moment so we want to make sure that it stays that way! + */ + + return ctx; +} + +int prom_rw_config_destroy(struct flb_prom_remote_write *ctx) +{ + /* release all connections */ + prom_rw_conn_release_all(ctx); + + if (ctx->collector_id != -1) { + flb_input_collector_delete(ctx->collector_id, ctx->ins); + + ctx->collector_id = -1; + } + + if (ctx->downstream != NULL) { + flb_downstream_destroy(ctx->downstream); + } + + if (ctx->enable_http2) { + flb_http_server_destroy(&ctx->http_server); + } + + if (ctx->server) { + flb_free(ctx->server); + } + + flb_free(ctx->listen); + flb_free(ctx->tcp_port); + flb_free(ctx); + + return 0; +} diff --git a/plugins/in_prometheus_remote_write/prom_rw_config.h b/plugins/in_prometheus_remote_write/prom_rw_config.h new file mode 100644 index 00000000000..e1b624bf004 --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw_config.h @@ -0,0 +1,29 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_PROM_RW_CONFIG_H +#define FLB_IN_PROM_RW_CONFIG_H + +#include +#include "prom_rw.h" + +struct flb_prom_remote_write *prom_rw_config_create(struct flb_input_instance *ins); +int prom_rw_config_destroy(struct flb_prom_remote_write *ctx); + +#endif diff --git a/plugins/in_prometheus_remote_write/prom_rw_conn.c b/plugins/in_prometheus_remote_write/prom_rw_conn.c new file mode 100644 index 00000000000..7f730fdbda6 --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw_conn.c @@ -0,0 +1,300 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "prom_rw.h" +#include "prom_rw_conn.h" +#include "prom_rw_prot.h" + +static void prom_rw_conn_request_init(struct mk_http_session *session, + struct mk_http_request *request); + +static int prom_rw_conn_event(void *data) +{ + int status; + size_t size; + ssize_t available; + ssize_t bytes; + char *tmp; + char *request_end; + size_t request_len; + struct prom_remote_write_conn *conn; + struct mk_event *event; + struct flb_prom_remote_write *ctx; + struct flb_connection *connection; + + connection = (struct flb_connection *) data; + + conn = connection->user_data; + + ctx = conn->ctx; + + event = &connection->event; + + if (event->mask & MK_EVENT_READ) { + available = (conn->buf_size - conn->buf_len) - 1; + if (available < 1) { + if (conn->buf_size + ctx->buffer_chunk_size > ctx->buffer_max_size) { + flb_plg_trace(ctx->ins, + "fd=%i incoming data exceed limit (%zu KB)", + event->fd, (ctx->buffer_max_size / 1024)); + prom_rw_conn_del(conn); + return -1; + } + + size = conn->buf_size + ctx->buffer_chunk_size; + tmp = flb_realloc(conn->buf_data, size); + if (!tmp) { + flb_errno(); + return -1; + } + flb_plg_trace(ctx->ins, "fd=%i buffer realloc %i -> %zu", + event->fd, conn->buf_size, size); + + conn->buf_data = tmp; + conn->buf_size = size; + available = (conn->buf_size - conn->buf_len) - 1; + } + + /* Read data */ + bytes = flb_io_net_read(connection, + (void *) &conn->buf_data[conn->buf_len], + available); + + if (bytes <= 0) { + flb_plg_trace(ctx->ins, "fd=%i closed connection", event->fd); + prom_rw_conn_del(conn); + return -1; + } + + flb_plg_trace(ctx->ins, "read()=%zi pre_len=%i now_len=%zi", + bytes, conn->buf_len, conn->buf_len + bytes); + conn->buf_len += bytes; + conn->buf_data[conn->buf_len] = '\0'; + + status = mk_http_parser(&conn->request, &conn->session.parser, + conn->buf_data, conn->buf_len, conn->session.server); + + if (status == MK_HTTP_PARSER_OK) { + /* Do more logic parsing and checks for this request */ + prom_rw_prot_handle(ctx, conn, &conn->session, &conn->request); + + /* Evict the processed request from the connection buffer and reinitialize + * the HTTP parser. + */ + + request_end = NULL; + + if (NULL != conn->request.data.data) { + request_end = &conn->request.data.data[conn->request.data.len]; + } + else { + request_end = strstr(conn->buf_data, "\r\n\r\n"); + + if(NULL != request_end) { + request_end = &request_end[4]; + } + } + + if (NULL != request_end) { + request_len = (size_t)(request_end - conn->buf_data); + + if (0 < (conn->buf_len - request_len)) { + memmove(conn->buf_data, &conn->buf_data[request_len], + conn->buf_len - request_len); + + conn->buf_data[conn->buf_len - request_len] = '\0'; + conn->buf_len -= request_len; + } + else { + memset(conn->buf_data, 0, request_len); + + conn->buf_len = 0; + } + + /* Reinitialize the parser so the next request is properly + * handled, the additional memset intends to wipe any left over data + * from the headers parsed in the previous request. + */ + memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); + mk_http_parser_init(&conn->session.parser); + prom_rw_conn_request_init(&conn->session, &conn->request); + } + } + else if (status == MK_HTTP_PARSER_ERROR) { + prom_rw_prot_handle_error(ctx, conn, &conn->session, &conn->request); + + /* Reinitialize the parser so the next request is properly + * handled, the additional memset intends to wipe any left over data + * from the headers parsed in the previous request. + */ + memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); + mk_http_parser_init(&conn->session.parser); + prom_rw_conn_request_init(&conn->session, &conn->request); + } + + return bytes; + } + + if (event->mask & MK_EVENT_CLOSE) { + flb_plg_trace(ctx->ins, "fd=%i hangup", event->fd); + prom_rw_conn_del(conn); + return -1; + } + + return 0; + +} + +static void prom_rw_conn_session_init(struct mk_http_session *session, + struct mk_server *server, + int client_fd) +{ + /* Alloc memory for node */ + session->_sched_init = MK_TRUE; + session->pipelined = MK_FALSE; + session->counter_connections = 0; + session->close_now = MK_FALSE; + session->status = MK_REQUEST_STATUS_INCOMPLETE; + session->server = server; + session->socket = client_fd; + + /* creation time in unix time */ + session->init_time = time(NULL); + + session->channel = mk_channel_new(MK_CHANNEL_SOCKET, session->socket); + session->channel->io = session->server->network; + + /* Init session request list */ + mk_list_init(&session->request_list); + + /* Initialize the parser */ + mk_http_parser_init(&session->parser); +} + +static void prom_rw_conn_request_init(struct mk_http_session *session, + struct mk_http_request *request) +{ + memset(request, 0, sizeof(struct mk_http_request)); + + mk_http_request_init(session, request, session->server); + + request->in_headers.type = MK_STREAM_IOV; + request->in_headers.dynamic = MK_FALSE; + request->in_headers.cb_consumed = NULL; + request->in_headers.cb_finished = NULL; + request->in_headers.stream = &request->stream; + + mk_list_add(&request->in_headers._head, &request->stream.inputs); + + request->session = session; +} + +struct prom_remote_write_conn *prom_rw_conn_add(struct flb_connection *connection, + struct flb_prom_remote_write *ctx) +{ + struct prom_remote_write_conn *conn; + int ret; + + conn = flb_calloc(1, sizeof(struct prom_remote_write_conn)); + if (!conn) { + flb_errno(); + return NULL; + } + conn->connection = connection; + + /* Set data for the event-loop */ + MK_EVENT_NEW(&connection->event); + + connection->user_data = conn; + connection->event.type = FLB_ENGINE_EV_CUSTOM; + connection->event.handler = prom_rw_conn_event; + + /* Connection info */ + conn->ctx = ctx; + conn->buf_len = 0; + + conn->buf_data = flb_malloc(ctx->buffer_chunk_size); + if (!conn->buf_data) { + flb_errno(); + flb_plg_error(ctx->ins, "could not allocate new connection"); + flb_free(conn); + return NULL; + } + conn->buf_size = ctx->buffer_chunk_size; + + /* Register instance into the event loop */ + ret = mk_event_add(flb_engine_evl_get(), + connection->fd, + FLB_ENGINE_EV_CUSTOM, + MK_EVENT_READ, + &connection->event); + if (ret == -1) { + flb_plg_error(ctx->ins, "could not register new connection"); + flb_free(conn->buf_data); + flb_free(conn); + return NULL; + } + + /* Initialize HTTP Session: this is a custom context for Monkey HTTP */ + prom_rw_conn_session_init(&conn->session, ctx->server, connection->fd); + + /* Initialize HTTP Request: this is the initial request and it will be reinitialized + * automatically after the request is handled so it can be used for the next one. + */ + prom_rw_conn_request_init(&conn->session, &conn->request); + + /* Link connection node to parent context list */ + mk_list_add(&conn->_head, &ctx->connections); + return conn; +} + +int prom_rw_conn_del(struct prom_remote_write_conn *conn) +{ + if (conn->session.channel != NULL) { + mk_channel_release(conn->session.channel); + } + + /* The downstream unregisters the file descriptor from the event-loop + * so there's nothing to be done by the plugin + */ + flb_downstream_conn_release(conn->connection); + + mk_list_del(&conn->_head); + + flb_free(conn->buf_data); + flb_free(conn); + + return 0; +} + +void prom_rw_conn_release_all(struct flb_prom_remote_write *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct prom_remote_write_conn *conn; + + mk_list_foreach_safe(head, tmp, &ctx->connections) { + conn = mk_list_entry(head, struct prom_remote_write_conn, _head); + prom_rw_conn_del(conn); + } +} diff --git a/plugins/in_prometheus_remote_write/prom_rw_conn.h b/plugins/in_prometheus_remote_write/prom_rw_conn.h new file mode 100644 index 00000000000..1716c32b87a --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw_conn.h @@ -0,0 +1,57 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_PROM_RW_CONN_H +#define FLB_IN_PROM_RW_CONN_H + +#include +#include +#include +#include + +#include "prom_rw_conn.h" + +struct prom_remote_write_conn { + struct mk_event event; /* Built-in event data for mk_events */ + + /* Buffer */ + char *buf_data; /* Buffer data */ + int buf_len; /* Data length */ + int buf_size; /* Buffer size */ + + /* + * Parser context: we only held one parser per connection + * which is re-used everytime we have a new request. + */ + struct mk_http_parser parser; + struct mk_http_request request; + struct mk_http_session session; + struct flb_connection *connection; + + void *ctx; /* Plugin parent context */ + struct mk_list _head; /* link to flb_opentelemetry->connections */ +}; + +struct prom_remote_write_conn *prom_rw_conn_add(struct flb_connection *connection, + struct flb_prom_remote_write *ctx); +int prom_rw_conn_del(struct prom_remote_write_conn *conn); +void prom_rw_conn_release_all(struct flb_prom_remote_write *ctx); + + +#endif diff --git a/plugins/in_prometheus_remote_write/prom_rw_prot.c b/plugins/in_prometheus_remote_write/prom_rw_prot.c new file mode 100644 index 00000000000..1ffc23fdc44 --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw_prot.c @@ -0,0 +1,489 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +#include "prom_rw.h" +#include "prom_rw_conn.h" + +static int send_response(struct flb_input_instance *in, + struct prom_remote_write_conn *conn, + int http_status, char *message) +{ + int len; + flb_sds_t out; + size_t sent; + ssize_t bytes; + int result; + + out = flb_sds_create_size(256); + if (!out) { + return -1; + } + + if (message) { + len = strlen(message); + } + else { + len = 0; + } + + if (http_status == 201) { + flb_sds_printf(&out, + "HTTP/1.1 201 Created \r\n" + "Server: Fluent Bit v%s\r\n" + "Content-Length: 0\r\n\r\n", + FLB_VERSION_STR); + } + else if (http_status == 200) { + flb_sds_printf(&out, + "HTTP/1.1 200 OK\r\n" + "Server: Fluent Bit v%s\r\n" + "Content-Length: 0\r\n\r\n", + FLB_VERSION_STR); + } + else if (http_status == 204) { + flb_sds_printf(&out, + "HTTP/1.1 204 No Content\r\n" + "Server: Fluent Bit v%s\r\n" + "\r\n", + FLB_VERSION_STR); + } + else if (http_status == 400) { + flb_sds_printf(&out, + "HTTP/1.1 400 Forbidden\r\n" + "Server: Fluent Bit v%s\r\n" + "Content-Length: %i\r\n\r\n%s", + FLB_VERSION_STR, + len, message); + } + + /* We should check the outcome of this operation */ + bytes = flb_io_net_write(conn->connection, + (void *) out, + flb_sds_len(out), + &sent); + + if (bytes == -1) { + flb_plg_error(in, "cannot send response"); + + result = -1; + } + else { + result = 0; + } + + flb_sds_destroy(out); + + return result; +} + +static int process_payload_metrics(struct flb_prom_remote_write *ctx, + struct prom_remote_write_conn *conn, + flb_sds_t tag, + struct mk_http_session *session, + struct mk_http_request *request) +{ + struct cmt *context; + int result; + + result = cmt_decode_prometheus_remote_write_create(&context, + request->data.data, + request->data.len); + + if (result == CMT_DECODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) { + result = flb_input_metrics_append(ctx->ins, NULL, 0, context); + + if (result != 0) { + flb_plg_debug(ctx->ins, "could not ingest metrics : %d", result); + } + + cmt_decode_prometheus_remote_write_destroy(context); + } + + return 0; +} + +static inline int mk_http_point_header(mk_ptr_t *h, + struct mk_http_parser *parser, int key) +{ + struct mk_http_header *header; + + header = &parser->headers[key]; + if (header->type == key) { + h->data = header->val.data; + h->len = header->val.len; + return 0; + } + else { + h->data = NULL; + h->len = -1; + } + + return -1; +} + +static int uncompress_snappy(char **output_buffer, + size_t *output_size, + char *input_buffer, + size_t input_size) +{ + int ret; + + ret = flb_snappy_uncompress_framed_data(input_buffer, + input_size, + output_buffer, + output_size); + + if (ret != 0) { + flb_error("[opentelemetry] snappy decompression failed"); + + return -1; + } + + return 1; +} + +static int uncompress_gzip(char **output_buffer, + size_t *output_size, + char *input_buffer, + size_t input_size) +{ + int ret; + + ret = flb_gzip_uncompress(input_buffer, + input_size, + (void *) output_buffer, + output_size); + + if (ret == -1) { + flb_error("[opentelemetry] gzip decompression failed"); + + return -1; + } + + return 1; +} + +int prom_rw_prot_uncompress(struct mk_http_session *session, + struct mk_http_request *request, + char **output_buffer, + size_t *output_size) +{ + struct mk_http_header *header; + size_t index; + + *output_buffer = NULL; + *output_size = 0; + + for (index = 0; + index < session->parser.headers_extra_count; + index++) { + header = &session->parser.headers_extra[index]; + + if (strncasecmp(header->key.data, "Content-Encoding", 16) == 0) { + if (strncasecmp(header->val.data, "gzip", 4) == 0) { + return uncompress_gzip(output_buffer, + output_size, + request->data.data, + request->data.len); + } + else if (strncasecmp(header->val.data, "snappy", 6) == 0) { + return uncompress_snappy(output_buffer, + output_size, + request->data.data, + request->data.len); + } + else { + return -2; + } + } + } + + return 0; +} + + +/* + * Handle an incoming request. It perform extra checks over the request, if + * everything is OK, it enqueue the incoming payload. + */ +int prom_rw_prot_handle(struct flb_prom_remote_write *ctx, + struct prom_remote_write_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request) +{ + int i; + int ret = -1; + int len; + char *uri; + char *qs; + off_t diff; + flb_sds_t tag; + struct mk_http_header *header; + char *original_data; + size_t original_data_size; + char *uncompressed_data; + size_t uncompressed_data_size; + + if (request->uri.data[0] != '/') { + send_response(ctx->ins, conn, 400, "error: invalid request\n"); + return -1; + } + + /* Decode URI */ + uri = mk_utils_url_decode(request->uri); + if (!uri) { + uri = mk_mem_alloc_z(request->uri.len + 1); + if (!uri) { + return -1; + } + memcpy(uri, request->uri.data, request->uri.len); + uri[request->uri.len] = '\0'; + } + + if (ctx->uri != NULL && strcmp(uri, ctx->uri) != 0) { + send_response(ctx->ins, conn, 400, "error: invalid endpoint\n"); + mk_mem_free(uri); + + return -1; + } + + /* Try to match a query string so we can remove it */ + qs = strchr(uri, '?'); + if (qs) { + /* remove the query string part */ + diff = qs - uri; + uri[diff] = '\0'; + } + + /* Compose the query string using the URI */ + len = strlen(uri); + + if (ctx->tag_from_uri != FLB_TRUE) { + tag = flb_sds_create(ctx->ins->tag); + } + else { + tag = flb_sds_create_size(len); + if (!tag) { + mk_mem_free(uri); + return -1; + } + + /* New tag skipping the URI '/' */ + flb_sds_cat(tag, uri + 1, len - 1); + + /* Sanitize, only allow alphanum chars */ + for (i = 0; i < flb_sds_len(tag); i++) { + if (!isalnum(tag[i]) && tag[i] != '_' && tag[i] != '.') { + tag[i] = '_'; + } + } + } + + /* Check if we have a Host header: Hostname ; port */ + mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST); + + /* Header: Connection */ + mk_http_point_header(&request->connection, &session->parser, + MK_HEADER_CONNECTION); + + /* HTTP/1.1 needs Host header */ + if (!request->host.data && request->protocol == MK_HTTP_PROTOCOL_11) { + flb_sds_destroy(tag); + mk_mem_free(uri); + return -1; + } + + /* Should we close the session after this request ? */ + mk_http_keepalive_check(session, request, ctx->server); + + /* Content Length */ + header = &session->parser.headers[MK_HEADER_CONTENT_LENGTH]; + if (header->type == MK_HEADER_CONTENT_LENGTH) { + request->_content_length.data = header->val.data; + request->_content_length.len = header->val.len; + } + else { + request->_content_length.data = NULL; + } + + mk_http_point_header(&request->content_type, &session->parser, MK_HEADER_CONTENT_TYPE); + + if (request->method != MK_METHOD_POST) { + flb_sds_destroy(tag); + mk_mem_free(uri); + send_response(ctx->ins, conn, 400, "error: invalid HTTP method\n"); + return -1; + } + + original_data = request->data.data; + original_data_size = request->data.len; + + ret = prom_rw_prot_uncompress(session, request, + &uncompressed_data, + &uncompressed_data_size); + + if (ret > 0) { + request->data.data = uncompressed_data; + request->data.len = uncompressed_data_size; + } + + if (ctx->uri != NULL && strcmp(uri, ctx->uri) == 0) { + ret = process_payload_metrics(ctx, conn, tag, session, request); + } + else { + ret = process_payload_metrics(ctx, conn, tag, session, request); + } + + if (uncompressed_data != NULL) { + flb_free(uncompressed_data); + } + + request->data.data = original_data; + request->data.len = original_data_size; + + mk_mem_free(uri); + flb_sds_destroy(tag); + + send_response(ctx->ins, conn, ctx->successful_response_code, NULL); + + return ret; +} + +/* + * Handle an incoming request which has resulted in an http parser error. + */ +int prom_rw_prot_handle_error( + struct flb_prom_remote_write *ctx, + struct prom_remote_write_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request) +{ + send_response(ctx->ins, conn, 400, "error: invalid request\n"); + return -1; +} + + +/* New gen HTTP server */ +static int send_response_ng(struct flb_http_response *response, + int http_status, + char *message) +{ + flb_http_response_set_status(response, http_status); + + if (http_status == 201) { + flb_http_response_set_message(response, "Created"); + } + else if (http_status == 200) { + flb_http_response_set_message(response, "OK"); + } + else if (http_status == 204) { + flb_http_response_set_message(response, "No Content"); + } + else if (http_status == 400) { + flb_http_response_set_message(response, "Forbidden"); + } + + if (message != NULL) { + flb_http_response_set_body(response, + (unsigned char *) message, + strlen(message)); + } + + flb_http_response_commit(response); + + return 0; +} + +static int process_payload_metrics_ng(struct flb_prom_remote_write *ctx, + flb_sds_t tag, + struct flb_http_request *request, + struct flb_http_response *response) +{ + struct cmt *context; + int result; + + result = cmt_decode_prometheus_remote_write_create(&context, + request->body, + cfl_sds_len(request->body)); + + if (result == CMT_DECODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) { + result = flb_input_metrics_append(ctx->ins, NULL, 0, context); + + if (result != 0) { + flb_plg_debug(ctx->ins, "could not ingest metrics : %d", result); + } + + cmt_decode_prometheus_remote_write_destroy(context); + } + + return 0; +} + +int prom_rw_prot_handle_ng(struct flb_http_request *request, + struct flb_http_response *response) +{ + struct flb_prom_remote_write *context; + int result; + + context = (struct flb_prom_remote_write *) response->stream->user_data; + + if (request->path[0] != '/') { + send_response_ng(response, 400, "error: invalid request\n"); + return -1; + } + + /* ToDo: Fix me */ + /* HTTP/1.1 needs Host header */ + if (request->protocol_version == HTTP_PROTOCOL_HTTP1 && + request->host == NULL) { + + return -1; + } + + if (request->method != HTTP_METHOD_POST) { + send_response_ng(response, 400, "error: invalid HTTP method\n"); + + return -1; + } + + if (context->uri != NULL && strcmp(request->path, context->uri) == 0) { + result = process_payload_metrics_ng(context, context->ins->tag, request, response); + } + else { + result = process_payload_metrics_ng(context, context->ins->tag, request, response); + } + + send_response_ng(response, context->successful_response_code, NULL); + + return result; +} diff --git a/plugins/in_prometheus_remote_write/prom_rw_prot.h b/plugins/in_prometheus_remote_write/prom_rw_prot.h new file mode 100644 index 00000000000..35f376884d5 --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw_prot.h @@ -0,0 +1,39 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_PROM_RW_PROT +#define FLB_IN_PROM_RW_PROT + +#include + +int prom_rw_prot_handle(struct flb_prom_remote_write *ctx, + struct prom_remote_write_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request); + +int prom_rw_prot_handle_error(struct flb_prom_remote_write *ctx, + struct prom_remote_write_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request); + + +int prom_rw_prot_handle_ng(struct flb_http_request *request, + struct flb_http_response *response); + +#endif diff --git a/plugins/in_winevtlog/winevtlog.h b/plugins/in_winevtlog/winevtlog.h index 10ef3e457e7..20b5749d6a3 100644 --- a/plugins/in_winevtlog/winevtlog.h +++ b/plugins/in_winevtlog/winevtlog.h @@ -27,7 +27,7 @@ struct winevtlog_config { unsigned int interval_sec; unsigned int interval_nsec; - unsigned int total_size_threshold; + size_t total_size_threshold; int string_inserts; int read_existing_events; int render_event_as_xml; diff --git a/plugins/out_datadog/datadog.c b/plugins/out_datadog/datadog.c index d0b1cf3d433..c92992f534e 100644 --- a/plugins/out_datadog/datadog.c +++ b/plugins/out_datadog/datadog.c @@ -495,19 +495,24 @@ static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_STR, "dd_service", NULL, 0, FLB_TRUE, offsetof(struct flb_out_datadog, dd_service), - "The human readable name for your service generating the logs " - "- the name of your application or database." + "The human readable name for your service generating the logs " + "(e.g. the name of your application or database). If unset, Datadog " + "will look for the service using Service Remapper in Log Management " + "(by default it will look at the `service` and `syslog.appname` attributes)." + "" }, { FLB_CONFIG_MAP_STR, "dd_source", NULL, 0, FLB_TRUE, offsetof(struct flb_out_datadog, dd_source), - "A human readable name for the underlying technology of your service. " - "For example, 'postgres' or 'nginx'." + "A human readable name for the underlying technology of your service " + "(e.g. 'postgres' or 'nginx'). If unset, Datadog will expect the source " + "to be set as the `ddsource` attribute." }, { FLB_CONFIG_MAP_STR, "dd_tags", NULL, 0, FLB_TRUE, offsetof(struct flb_out_datadog, dd_tags), - "The tags you want to assign to your logs in Datadog." + "The tags you want to assign to your logs in Datadog. If unset, Datadog " + "will expect the tags in the `ddtags` attribute." }, { diff --git a/plugins/out_kafka/kafka.c b/plugins/out_kafka/kafka.c index 9379da84724..8dc8cb86ea8 100644 --- a/plugins/out_kafka/kafka.c +++ b/plugins/out_kafka/kafka.c @@ -227,9 +227,9 @@ int produce_message(struct flb_time *tm, msgpack_object *map, flb_warn("',' not allowed in dynamic_kafka topic names"); continue; } - if (val.via.str.size > 64) { - /* Don't allow length of dynamic kafka topics > 64 */ - flb_warn(" dynamic kafka topic length > 64 not allowed"); + if (val.via.str.size > 249) { + /* Don't allow length of dynamic kafka topics > 249 */ + flb_warn(" dynamic kafka topic length > 249 not allowed"); continue; } dynamic_topic = flb_malloc(val.via.str.size + 1); diff --git a/plugins/out_splunk/splunk.c b/plugins/out_splunk/splunk.c index d7493f9d24f..4ec93aff477 100644 --- a/plugins/out_splunk/splunk.c +++ b/plugins/out_splunk/splunk.c @@ -353,13 +353,28 @@ static flb_sds_t extract_hec_token(struct flb_splunk *ctx, msgpack_object map, flb_sds_t hec_token; /* Extract HEC token (map which is from metadata lookup) */ - if (ctx->event_sourcetype_key) { + if (ctx->metadata_auth_key) { hec_token = flb_ra_translate(ctx->ra_metadata_auth_key, tag, tag_len, map, NULL); - if (hec_token) { + /* + * record accessor translation can return an empty string buffer if the + * translation was not successfull or the value was not found. We consider + * a valid token any string which length is greater than 0. + * + * note: flb_ra_translate_check() is not used here because it will print + * an error message if the translation fails: + * + * ref: https://github.com/fluent/fluent-bit/issues/8859 + */ + if (hec_token && flb_sds_len(hec_token) > 0) { return hec_token; } + /* destroy empty string */ + if (hec_token) { + flb_sds_destroy(hec_token); + } + flb_plg_debug(ctx->ins, "Could not find hec_token in metadata"); return NULL; } @@ -368,21 +383,49 @@ static flb_sds_t extract_hec_token(struct flb_splunk *ctx, msgpack_object map, return NULL; } +static void set_metadata_auth_header(struct flb_splunk *ctx, flb_sds_t hec_token) +{ + pthread_mutex_lock(&ctx->mutex_hec_token); + + if (ctx->metadata_auth_header != NULL) { + flb_sds_destroy(ctx->metadata_auth_header); + } + ctx->metadata_auth_header = hec_token; + + pthread_mutex_unlock(&ctx->mutex_hec_token); +} + +static flb_sds_t get_metadata_auth_header(struct flb_splunk *ctx) +{ + flb_sds_t auth_header = NULL; + + pthread_mutex_lock(&ctx->mutex_hec_token); + + if (ctx->metadata_auth_header) { + auth_header = flb_sds_create(ctx->metadata_auth_header); + } + + pthread_mutex_unlock(&ctx->mutex_hec_token); + + return auth_header; +} + static inline int splunk_format(const void *in_buf, size_t in_bytes, char *tag, int tag_len, char **out_buf, size_t *out_size, struct flb_splunk *ctx) { int ret; + char *err; msgpack_object map; msgpack_object metadata; msgpack_sbuffer mp_sbuf; msgpack_packer mp_pck; - char *err; flb_sds_t tmp; flb_sds_t record; flb_sds_t json_out; flb_sds_t metadata_hec_token = NULL; + struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; @@ -403,8 +446,6 @@ static inline int splunk_format(const void *in_buf, size_t in_bytes, return -1; } - ctx->metadata_auth_header = NULL; - while ((ret = flb_log_event_decoder_next( &log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { @@ -422,10 +463,7 @@ static inline int splunk_format(const void *in_buf, size_t in_bytes, * specify only one splunk token per one instance. * So, it should be valid if storing only last value of * splunk token per one chunk. */ - if (ctx->metadata_auth_header != NULL) { - cfl_sds_destroy(ctx->metadata_auth_header); - } - ctx->metadata_auth_header = metadata_hec_token; + set_metadata_auth_header(ctx, metadata_hec_token); } if (ctx->event_key) { @@ -598,6 +636,7 @@ static void cb_splunk_flush(struct flb_event_chunk *event_chunk, size_t payload_size; (void) i_ins; (void) config; + flb_sds_t metadata_auth_header = NULL; /* Get upstream connection */ u_conn = flb_upstream_conn_get(ctx->u); @@ -677,16 +716,25 @@ static void cb_splunk_flush(struct flb_event_chunk *event_chunk, flb_http_buffer_size(c, resp_size); } + metadata_auth_header = get_metadata_auth_header(ctx); + /* HTTP Client */ flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); - /* Try to use http_user and http_passwd if not, fallback to auth_header */ + /* + * Authentication mechanism & order: + * + * 1. use the configure `http_user` and `http_passwd` + * 2. use metadata 'hec_token', if the records are generated by Splunk input plugin, this will be set. + * 3. use the configured `splunk_token` (if set). + */ if (ctx->http_user && ctx->http_passwd) { flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd); } - else if (ctx->metadata_auth_header) { + else if (metadata_auth_header) { flb_http_add_header(c, "Authorization", 13, - ctx->metadata_auth_header, flb_sds_len(ctx->metadata_auth_header)); + metadata_auth_header, + flb_sds_len(metadata_auth_header)); } else if (ctx->auth_header) { flb_http_add_header(c, "Authorization", 13, @@ -754,10 +802,10 @@ static void cb_splunk_flush(struct flb_event_chunk *event_chunk, flb_sds_destroy(buf_data); } - /* Cleanup */ - if (ctx->metadata_auth_header != NULL) { - cfl_sds_destroy(ctx->metadata_auth_header); + if (metadata_auth_header) { + flb_sds_destroy(metadata_auth_header); } + flb_http_client_destroy(c); flb_upstream_conn_release(u_conn); FLB_OUTPUT_RETURN(ret); diff --git a/plugins/out_splunk/splunk.h b/plugins/out_splunk/splunk.h index eb64d2d57e2..4c71a0ec356 100644 --- a/plugins/out_splunk/splunk.h +++ b/plugins/out_splunk/splunk.h @@ -120,6 +120,8 @@ struct flb_splunk { /* Plugin instance */ struct flb_output_instance *ins; + + pthread_mutex_t mutex_hec_token; }; #endif diff --git a/plugins/out_splunk/splunk_conf.c b/plugins/out_splunk/splunk_conf.c index 06902ef227f..43ccdc1879c 100644 --- a/plugins/out_splunk/splunk_conf.c +++ b/plugins/out_splunk/splunk_conf.c @@ -241,6 +241,7 @@ struct flb_splunk *flb_splunk_conf_create(struct flb_output_instance *ins, } ctx->metadata_auth_header = NULL; + /* No http_user is set, fallback to splunk_token, if splunk_token is unset, fail. */ if (!ctx->http_user) { /* Splunk Auth Token */ @@ -262,8 +263,10 @@ struct flb_splunk *flb_splunk_conf_create(struct flb_output_instance *ins, } } + pthread_mutex_init(&ctx->mutex_hec_token, NULL); + /* Currently, Splunk HEC token is stored in a fixed key, hec_token. */ - ctx->metadata_auth_key = "hec_token"; + ctx->metadata_auth_key = "$hec_token"; if (ctx->metadata_auth_key) { ctx->ra_metadata_auth_key = flb_ra_create(ctx->metadata_auth_key, FLB_TRUE); if (!ctx->ra_metadata_auth_key) { @@ -325,6 +328,10 @@ int flb_splunk_conf_destroy(struct flb_splunk *ctx) flb_ra_destroy(ctx->ra_metadata_auth_key); } + if (ctx->metadata_auth_header) { + flb_sds_destroy(ctx->metadata_auth_header); + } + event_fields_destroy(ctx); flb_free(ctx); diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index 503fc9e68bd..b21f5b9476c 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -372,7 +372,7 @@ static flb_sds_t get_google_token(struct flb_stackdriver *ctx) if (time(NULL) >= cached_expiration) { return output; } else { - /* + /* * Cached token is expired. Wait on lock to use up-to-date token * by either waiting for it to be refreshed or refresh it ourselves. */ @@ -1068,7 +1068,7 @@ static int pack_resource_labels(struct flb_stackdriver *ctx, if (rval != NULL && rval->o.type == MSGPACK_OBJECT_STR) { flb_mp_map_header_append(mh); msgpack_pack_str(mp_pck, flb_sds_len(label_kv->key)); - msgpack_pack_str_body(mp_pck, label_kv->key, + msgpack_pack_str_body(mp_pck, label_kv->key, flb_sds_len(label_kv->key)); msgpack_pack_str(mp_pck, flb_sds_len(rval->val.string)); msgpack_pack_str_body(mp_pck, rval->val.string, @@ -1082,7 +1082,7 @@ static int pack_resource_labels(struct flb_stackdriver *ctx, } else { flb_mp_map_header_append(mh); msgpack_pack_str(mp_pck, flb_sds_len(label_kv->key)); - msgpack_pack_str_body(mp_pck, label_kv->key, + msgpack_pack_str_body(mp_pck, label_kv->key, flb_sds_len(label_kv->key)); msgpack_pack_str(mp_pck, flb_sds_len(label_kv->val)); msgpack_pack_str_body(mp_pck, label_kv->val, @@ -1284,7 +1284,7 @@ static int cb_stackdriver_init(struct flb_output_instance *ins, return -1; } - if (ctx->resource_type != RESOURCE_TYPE_GENERIC_NODE + if (ctx->resource_type != RESOURCE_TYPE_GENERIC_NODE && ctx->resource_type != RESOURCE_TYPE_GENERIC_TASK) { ret = gce_metadata_read_zone(ctx); if (ret == -1) { @@ -1434,13 +1434,13 @@ static int get_trace_sampled(int * trace_sampled_value, const msgpack_object * s { msgpack_object tmp; int ret = get_msgpack_obj(&tmp, src_obj, key, flb_sds_len(key), MSGPACK_OBJECT_BOOLEAN); - + if (ret == 0 && tmp.via.boolean == true) { *trace_sampled_value = FLB_TRUE; return 0; } else if (ret == 0 && tmp.via.boolean == false) { *trace_sampled_value = FLB_FALSE; - return 0; + return 0; } return -1; @@ -1476,15 +1476,16 @@ static insert_id_status validate_insert_id(msgpack_object * insert_id_value, return ret; } -static int pack_json_payload(int insert_id_extracted, - int operation_extracted, int operation_extra_size, - int source_location_extracted, - int source_location_extra_size, - int http_request_extracted, - int http_request_extra_size, - timestamp_status tms_status, - msgpack_packer *mp_pck, msgpack_object *obj, - struct flb_stackdriver *ctx) +static int pack_payload(int insert_id_extracted, + int operation_extracted, + int operation_extra_size, + int source_location_extracted, + int source_location_extra_size, + int http_request_extracted, + int http_request_extra_size, + timestamp_status tms_status, + msgpack_packer *mp_pck, msgpack_object *obj, + struct flb_stackdriver *ctx) { /* Specified fields include local_resource_id, operation, sourceLocation ... */ int i, j; @@ -1495,10 +1496,14 @@ static int pack_json_payload(int insert_id_extracted, int len; int len_to_be_removed; int key_not_found; + int text_payload_len = 0; + int is_string_text_payload = FLB_FALSE; + int write_to_textpayload_field = FLB_FALSE; flb_sds_t removed; flb_sds_t monitored_resource_key; flb_sds_t local_resource_id_key; flb_sds_t stream; + flb_sds_t text_payload = NULL; msgpack_object_kv *kv = obj->via.map.ptr; msgpack_object_kv *const kvend = obj->via.map.ptr + obj->via.map.size; @@ -1565,14 +1570,36 @@ static int pack_json_payload(int insert_id_extracted, new_map_size = map_size - to_remove; - ret = msgpack_pack_map(mp_pck, new_map_size); - if (ret < 0) { - goto error; + if (ctx->text_payload_key && get_string(&text_payload, obj, ctx->text_payload_key) == 0) { + is_string_text_payload = FLB_TRUE; + } + + /* write to textPayload if text_payload_key is the only residual string field*/ + if ((new_map_size == 1) && is_string_text_payload) { + write_to_textpayload_field = FLB_TRUE; + } + + if (write_to_textpayload_field) { + msgpack_pack_str(mp_pck, 11); + msgpack_pack_str_body(mp_pck, "textPayload", 11); + + text_payload_len = flb_sds_len(text_payload); + msgpack_pack_str(mp_pck, text_payload_len); + msgpack_pack_str_body(mp_pck, text_payload, text_payload_len); + } else { + /* jsonPayload */ + msgpack_pack_str(mp_pck, 11); + msgpack_pack_str_body(mp_pck, "jsonPayload", 11); + + ret = msgpack_pack_map(mp_pck, new_map_size); + if (ret < 0) { + goto error; + } } /* points back to the beginning of map */ kv = obj->via.map.ptr; - for(; kv != kvend; ++kv ) { + for(; kv != kvend; ++kv) { key_not_found = 1; /* processing logging.googleapis.com/insertId */ @@ -1639,7 +1666,8 @@ static int pack_json_payload(int insert_id_extracted, } } - if (key_not_found) { + /* write residual log fields to jsonPayload */ + if (key_not_found && !write_to_textpayload_field) { ret = msgpack_pack_object(mp_pck, kv->key); if (ret < 0) { goto error; @@ -1654,12 +1682,14 @@ static int pack_json_payload(int insert_id_extracted, flb_sds_destroy(monitored_resource_key); flb_sds_destroy(local_resource_id_key); flb_sds_destroy(stream); + flb_sds_destroy(text_payload); return 0; error: flb_sds_destroy(monitored_resource_key); flb_sds_destroy(local_resource_id_key); flb_sds_destroy(stream); + flb_sds_destroy(text_payload); return ret; } @@ -1821,7 +1851,7 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx, msgpack_pack_str_body(&mp_pck, "labels", 6); ret = pack_resource_labels(ctx, &mh, &mp_pck, data, bytes); - if (ret != 0) { + if (ret != 0) { if (ctx->resource_type == RESOURCE_TYPE_K8S) { ret = extract_local_resource_id(data, bytes, ctx, tag); if (ret != 0) { @@ -2314,7 +2344,7 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx, /* Extract httpRequest */ init_http_request(&http_request); http_request_extra_size = 0; - http_request_extracted = extract_http_request(&http_request, + http_request_extracted = extract_http_request(&http_request, ctx->http_request_key, ctx->http_request_key_size, obj, &http_request_extra_size); @@ -2432,17 +2462,16 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx, flb_sds_destroy(source_location_function); destroy_http_request(&http_request); - /* jsonPayload */ - msgpack_pack_str(&mp_pck, 11); - msgpack_pack_str_body(&mp_pck, "jsonPayload", 11); - pack_json_payload(insert_id_extracted, - operation_extracted, operation_extra_size, - source_location_extracted, - source_location_extra_size, - http_request_extracted, - http_request_extra_size, - tms_status, - &mp_pck, obj, ctx); + /* both textPayload and jsonPayload are supported */ + pack_payload(insert_id_extracted, + operation_extracted, + operation_extra_size, + source_location_extracted, + source_location_extra_size, + http_request_extracted, + http_request_extra_size, + tms_status, + &mp_pck, obj, ctx); /* avoid modifying the original tag */ newtag = tag; @@ -2594,7 +2623,7 @@ static void update_retry_metric(struct flb_stackdriver *ctx, uint64_t ts, int http_status) { - char tmp[32]; + char tmp[32]; char *name = (char *) flb_output_name(ctx->ins); /* convert status to string format */ @@ -3154,6 +3183,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_stackdriver, resource_labels), "Set the resource labels" }, + { + FLB_CONFIG_MAP_STR, "text_payload_key", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_stackdriver, text_payload_key), + "Set key for extracting text payload" + }, { FLB_CONFIG_MAP_BOOL, "test_log_entry_format", "false", 0, FLB_TRUE, offsetof(struct flb_stackdriver, test_log_entry_format), diff --git a/plugins/out_stackdriver/stackdriver.h b/plugins/out_stackdriver/stackdriver.h index 2a645c16402..76f5a7598ea 100644 --- a/plugins/out_stackdriver/stackdriver.h +++ b/plugins/out_stackdriver/stackdriver.h @@ -208,6 +208,9 @@ struct flb_stackdriver { /* upstream context for metadata end-point */ struct flb_upstream *metadata_u; + /* the key to extract unstructured text payload from */ + flb_sds_t text_payload_key; + #ifdef FLB_HAVE_METRICS /* metrics */ struct cmt_counter *cmt_successful_requests; diff --git a/plugins/processor_labels/labels.c b/plugins/processor_labels/labels.c index 62e5323fd43..ee34e639f25 100644 --- a/plugins/processor_labels/labels.c +++ b/plugins/processor_labels/labels.c @@ -1524,26 +1524,27 @@ static int insert_labels(struct cmt *metrics_context, pair->key); if (result == FLB_TRUE) { - result = metrics_context_insert_dynamic_label(metrics_context, - pair->key, - pair->val); + continue; + } - if (result == FLB_FALSE) { - return FLB_FALSE; - } + result = metrics_context_insert_dynamic_label(metrics_context, + pair->key, + pair->val); + + if (result == FLB_FALSE) { + return FLB_FALSE; } - else { - result = metrics_context_contains_static_label(metrics_context, - pair->key); - if (result == FLB_FALSE) { - result = metrics_context_insert_static_label(metrics_context, - pair->key, - pair->val); + result = metrics_context_contains_static_label(metrics_context, + pair->key); - if (result == FLB_FALSE) { - return FLB_FALSE; - } + if (result == FLB_TRUE) { + result = metrics_context_insert_static_label(metrics_context, + pair->key, + pair->val); + + if (result == FLB_FALSE) { + return FLB_FALSE; } } } diff --git a/snap/snapcraft.yaml b/snap/snapcraft.yaml index a7fbff95c2f..cd9aa78b38b 100644 --- a/snap/snapcraft.yaml +++ b/snap/snapcraft.yaml @@ -1,6 +1,6 @@ name: fluent-bit base: core18 -version: '3.0.4' +version: '3.0.7' summary: High performance logs and stream processor description: | Fluent Bit is a high performance log processor and stream processor for Linux. diff --git a/src/flb_config.c b/src/flb_config.c index 54249fa8d33..747d855cf08 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -390,14 +390,14 @@ void flb_config_exit(struct flb_config *config) struct mk_list *head; struct flb_cf *cf; - if (config->log_file) { - flb_free(config->log_file); - } - if (config->log) { flb_log_destroy(config->log, config); } + if (config->log_file) { + flb_free(config->log_file); + } + if (config->parsers_file) { flb_free(config->parsers_file); } @@ -777,7 +777,7 @@ static int configure_plugins_type(struct flb_config *config, struct flb_cf *cf, /* validate the instance creation */ if (!ins) { flb_error("[config] section '%s' tried to instance a plugin name " - "that don't exists", name); + "that doesn't exist", name); flb_sds_destroy(name); return -1; } diff --git a/src/flb_kernel.c b/src/flb_kernel.c index 5a6fe03c9c7..e80d25fdd3e 100644 --- a/src/flb_kernel.c +++ b/src/flb_kernel.c @@ -122,8 +122,8 @@ struct flb_kernel *flb_kernel_info() flb_errno(); return NULL; } - kernel->minor = a; - kernel->major = b; + kernel->major = a; + kernel->minor = b; kernel->patch = c; kernel->s_version.data = flb_malloc(16); diff --git a/src/fluent-bit.c b/src/fluent-bit.c index 6e41b8619b3..d35e034ed20 100644 --- a/src/fluent-bit.c +++ b/src/fluent-bit.c @@ -1344,6 +1344,7 @@ int flb_main(int argc, char **argv) if (config->dry_run == FLB_TRUE) { fprintf(stderr, "configuration test is successful\n"); + flb_init_env(); flb_cf_destroy(cf_opts); flb_destroy(ctx); exit(EXIT_SUCCESS); diff --git a/src/http_server/api/v1/trace.c b/src/http_server/api/v1/trace.c index 0c57f51f89e..1eb5ab70fbf 100644 --- a/src/http_server/api/v1/trace.c +++ b/src/http_server/api/v1/trace.c @@ -30,8 +30,28 @@ #include #include - -static struct flb_input_instance *find_input(struct flb_hs *hs, const char *name) +#define STR_INPUTS "inputs" +#define STR_INPUTS_LEN (sizeof(STR_INPUTS)-1) + +#define HTTP_FIELD_MESSAGE "message" +#define HTTP_FIELD_MESSAGE_LEN (sizeof(HTTP_FIELD_MESSAGE)-1) +#define HTTP_FIELD_STATUS "status" +#define HTTP_FIELD_STATUS_LEN (sizeof(HTTP_FIELD_STATUS)-1) +#define HTTP_FIELD_RETURNCODE "returncode" +#define HTTP_FIELD_RETURNCODE_LEN (sizeof(HTTP_FIELD_RETURNCODE)-1) + +#define HTTP_RESULT_OK "ok" +#define HTTP_RESULT_OK_LEN (sizeof(HTTP_RESULT_OK)-1) +#define HTTP_RESULT_ERROR "error" +#define HTTP_RESULT_ERROR_LEN (sizeof(HTTP_RESULT_ERROR)-1) +#define HTTP_RESULT_NOTFOUND "not found" +#define HTTP_RESULT_NOTFOUND_LEN (sizeof(HTTP_RESULT_NOTFOUND)-1) +#define HTTP_RESULT_METHODNOTALLOWED "method not allowed" +#define HTTP_RESULT_METHODNOTALLOWED_LEN (sizeof(HTTP_RESULT_METHODNOTALLOWED)-1) +#define HTTP_RESULT_UNKNOWNERROR "unknown error" +#define HTTP_RESULT_UNKNOWNERROR_LEN (sizeof(HTTP_RESULT_UNKNOWNERROR)-1) + +static struct flb_input_instance *find_input(struct flb_hs *hs, const char *name, size_t nlen) { struct mk_list *head; struct flb_input_instance *in; @@ -39,7 +59,10 @@ static struct flb_input_instance *find_input(struct flb_hs *hs, const char *name mk_list_foreach(head, &hs->config->inputs) { in = mk_list_entry(head, struct flb_input_instance, _head); - if (strcmp(name, in->name) == 0) { + if (strlen(in->name) != nlen) { + continue; + } + if (strncmp(name, in->name, nlen) == 0) { return in; } if (in->alias) { @@ -51,26 +74,33 @@ static struct flb_input_instance *find_input(struct flb_hs *hs, const char *name return NULL; } -static int enable_trace_input(struct flb_hs *hs, const char *name, const char *prefix, const char *output_name, struct mk_list *props) +static int enable_trace_input(struct flb_hs *hs, const char *name, ssize_t nlen, const char *prefix, + const char *output_name, struct mk_list *props) { struct flb_input_instance *in; - - in = find_input(hs, name); + in = find_input(hs, name, nlen); if (in == NULL) { + flb_error("unable to find input: [%d]%.*s", (int)nlen, (int)nlen, name); return 404; } flb_chunk_trace_context_new(in, output_name, prefix, NULL, props); - return (in->chunk_trace_ctxt == NULL ? 503 : 0); + + if (in->chunk_trace_ctxt == NULL) { + flb_error("unable to start tracing"); + return 503; + } + + return 0; } -static int disable_trace_input(struct flb_hs *hs, const char *name) +static int disable_trace_input(struct flb_hs *hs, const char *name, size_t nlen) { struct flb_input_instance *in; - - in = find_input(hs, name); + + in = find_input(hs, name, nlen); if (in == NULL) { return 404; } @@ -89,32 +119,35 @@ static flb_sds_t get_input_name(mk_request_t *request) if (request->real_path.data == NULL) { return NULL; } - if (request->real_path.len < strlen(base)) { + if (request->real_path.len < sizeof(base)-1) { return NULL; } - return flb_sds_create_len(&request->real_path.data[strlen(base)], - request->real_path.len - strlen(base)); + return flb_sds_create_len(&request->real_path.data[sizeof(base)-1], + request->real_path.len - sizeof(base)-1); } -static int http_disable_trace(mk_request_t *request, void *data, const char *input_name, msgpack_packer *mp_pck) +static int http_disable_trace(mk_request_t *request, void *data, + const char *input_name, size_t input_nlen, + msgpack_packer *mp_pck) { struct flb_hs *hs = data; int toggled_on = 503; - toggled_on = disable_trace_input(hs, input_name); + toggled_on = disable_trace_input(hs, input_name, input_nlen); if (toggled_on < 300) { msgpack_pack_map(mp_pck, 1); - msgpack_pack_str_with_body(mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(mp_pck, "ok", strlen("ok")); + msgpack_pack_str_with_body(mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(mp_pck, HTTP_RESULT_OK, HTTP_RESULT_OK_LEN); return 201; } return toggled_on; } -static int msgpack_params_enable_trace(struct flb_hs *hs, msgpack_unpacked *result, const char *input_name) +static int msgpack_params_enable_trace(struct flb_hs *hs, msgpack_unpacked *result, + const char *input_name, ssize_t input_nlen) { int ret = -1; int i; @@ -130,11 +163,11 @@ static int msgpack_params_enable_trace(struct flb_hs *hs, msgpack_unpacked *resu msgpack_object_str *param_val; - if (result->data.type == MSGPACK_OBJECT_MAP) { + if (result->data.type == MSGPACK_OBJECT_MAP) { for (i = 0; i < result->data.via.map.size; i++) { key = &result->data.via.map.ptr[i].key; val = &result->data.via.map.ptr[i].val; - + if (key->type != MSGPACK_OBJECT_STR) { ret = -1; goto parse_error; @@ -193,7 +226,7 @@ static int msgpack_params_enable_trace(struct flb_hs *hs, msgpack_unpacked *resu output_name = flb_sds_create("stdout"); } - toggled_on = enable_trace_input(hs, input_name, prefix, output_name, props); + toggled_on = enable_trace_input(hs, input_name, input_nlen, prefix, output_name, props); if (!toggled_on) { ret = -1; goto parse_error; @@ -210,7 +243,9 @@ static int msgpack_params_enable_trace(struct flb_hs *hs, msgpack_unpacked *resu return ret; } -static int http_enable_trace(mk_request_t *request, void *data, const char *input_name, msgpack_packer *mp_pck) +static int http_enable_trace(mk_request_t *request, void *data, + const char *input_name, ssize_t input_nlen, + msgpack_packer *mp_pck) { char *buf = NULL; size_t buf_size; @@ -229,18 +264,18 @@ static int http_enable_trace(mk_request_t *request, void *data, const char *inpu struct mk_list *props = NULL; struct flb_chunk_trace_limit limit = { 0 }; struct flb_input_instance *input_instance; - + if (request->method == MK_METHOD_GET) { - ret = enable_trace_input(hs, input_name, "trace.", "stdout", NULL); + ret = enable_trace_input(hs, input_name, input_nlen, "trace.", "stdout", NULL); if (ret == 0) { msgpack_pack_map(mp_pck, 1); - msgpack_pack_str_with_body(mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(mp_pck, "ok", strlen("ok")); + msgpack_pack_str_with_body(mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(mp_pck, HTTP_RESULT_OK, HTTP_RESULT_OK_LEN); return 200; } else { - flb_error("unable to enable tracing for %s", input_name); + flb_error("unable to enable tracing for %.*s", (int)input_nlen, input_name); goto input_error; } } @@ -257,7 +292,7 @@ static int http_enable_trace(mk_request_t *request, void *data, const char *inpu rc = msgpack_unpack_next(&result, buf, buf_size, &off); if (rc != MSGPACK_UNPACK_SUCCESS) { ret = 503; - flb_error("unable to unpack msgpack parameters for %s", input_name); + flb_error("unable to unpack msgpack parameters for %.*s", (int)input_nlen, input_name); goto unpack_error; } @@ -265,7 +300,7 @@ static int http_enable_trace(mk_request_t *request, void *data, const char *inpu for (i = 0; i < result.data.via.map.size; i++) { key = &result.data.via.map.ptr[i].key; val = &result.data.via.map.ptr[i].val; - + if (key->type != MSGPACK_OBJECT_STR) { ret = 503; flb_error("non string key in parameters"); @@ -359,14 +394,14 @@ static int http_enable_trace(mk_request_t *request, void *data, const char *inpu output_name = flb_sds_create("stdout"); } - ret = enable_trace_input(hs, input_name, prefix, output_name, props); + ret = enable_trace_input(hs, input_name, input_nlen, prefix, output_name, props); if (ret != 0) { flb_error("error when enabling tracing"); goto parse_error; } if (limit.type != 0) { - input_instance = find_input(hs, input_name); + input_instance = find_input(hs, input_name, input_nlen); if (limit.type == FLB_CHUNK_TRACE_LIMIT_TIME) { flb_chunk_trace_context_set_limit(input_instance->chunk_trace_ctxt, limit.type, limit.seconds); } @@ -377,8 +412,8 @@ static int http_enable_trace(mk_request_t *request, void *data, const char *inpu } msgpack_pack_map(mp_pck, 1); - msgpack_pack_str_with_body(mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(mp_pck, "ok", strlen("ok")); + msgpack_pack_str_with_body(mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(mp_pck, HTTP_RESULT_OK, HTTP_RESULT_OK_LEN); ret = 200; parse_error: @@ -417,21 +452,21 @@ static void cb_trace(mk_request_t *request, void *data) } if (request->method == MK_METHOD_POST || request->method == MK_METHOD_GET) { - response = http_enable_trace(request, data, input_name, &mp_pck); + response = http_enable_trace(request, data, input_name, flb_sds_len(input_name), &mp_pck); } else if (request->method == MK_METHOD_DELETE) { - response = http_disable_trace(request, data, input_name, &mp_pck); + response = http_disable_trace(request, data, input_name, flb_sds_len(input_name), &mp_pck); } error: if (response == 404) { msgpack_pack_map(&mp_pck, 1); - msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(&mp_pck, "not found", strlen("not found")); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_NOTFOUND, HTTP_RESULT_NOTFOUND_LEN); } else if (response == 503) { msgpack_pack_map(&mp_pck, 1); - msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(&mp_pck, "error", strlen("error")); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_ERROR, HTTP_RESULT_ERROR_LEN); } if (input_name != NULL) { @@ -466,11 +501,11 @@ static void cb_traces(mk_request_t *request, void *data) msgpack_unpacked result; flb_sds_t error_msg = NULL; int response = 200; - flb_sds_t input_name; + const char *input_name; + ssize_t input_nlen; msgpack_object_array *inputs = NULL; size_t off = 0; int i; - /* initialize buffers */ msgpack_sbuffer_init(&mp_sbuf); @@ -503,10 +538,10 @@ static void cb_traces(mk_request_t *request, void *data) if (result.data.via.map.ptr[i].key.type != MSGPACK_OBJECT_STR) { continue; } - if (result.data.via.map.ptr[i].key.via.str.size < strlen("inputs")) { + if (result.data.via.map.ptr[i].key.via.str.size < STR_INPUTS_LEN) { continue; } - if (strncmp(result.data.via.map.ptr[i].key.via.str.ptr, "inputs", strlen("inputs"))) { + if (strncmp(result.data.via.map.ptr[i].key.via.str.ptr, STR_INPUTS, STR_INPUTS_LEN)) { continue; } inputs = &result.data.via.map.ptr[i].val.via.array; @@ -517,48 +552,61 @@ static void cb_traces(mk_request_t *request, void *data) error_msg = flb_sds_create("inputs not found"); goto unpack_error; } - + msgpack_pack_map(&mp_pck, 2); - msgpack_pack_str_with_body(&mp_pck, "inputs", strlen("inputs")); + msgpack_pack_str_with_body(&mp_pck, STR_INPUTS, STR_INPUTS_LEN); msgpack_pack_map(&mp_pck, inputs->size); for (i = 0; i < inputs->size; i++) { - input_name = flb_sds_create_len(inputs->ptr[i].via.str.ptr, inputs->ptr[i].via.str.size); - msgpack_pack_str_with_body(&mp_pck, input_name, flb_sds_len(input_name)); - if (inputs->ptr[i].type != MSGPACK_OBJECT_STR) { - msgpack_pack_map(&mp_pck, 1); - msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(&mp_pck, "error", strlen("error")); + if (inputs->ptr[i].type != MSGPACK_OBJECT_STR || inputs->ptr[i].via.str.ptr == NULL) { + response = 503; + error_msg = flb_sds_create("invalid input"); + msgpack_sbuffer_clear(&mp_sbuf); + goto unpack_error; } - else { - if (request->method == MK_METHOD_POST || request->method == MK_METHOD_GET) { - ret = msgpack_params_enable_trace((struct flb_hs *)data, &result, input_name); - if (ret != 0) { - msgpack_pack_map(&mp_pck, 2); - msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(&mp_pck, "error", strlen("error")); - msgpack_pack_str_with_body(&mp_pck, "returncode", strlen("returncode")); - msgpack_pack_int64(&mp_pck, ret); - } - else { - msgpack_pack_map(&mp_pck, 1); - msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(&mp_pck, "ok", strlen("ok")); - } - } - else if (request->method == MK_METHOD_DELETE) { - disable_trace_input((struct flb_hs *)data, input_name); + } + + for (i = 0; i < inputs->size; i++) { + + input_name = inputs->ptr[i].via.str.ptr; + input_nlen = inputs->ptr[i].via.str.size; + + msgpack_pack_str_with_body(&mp_pck, input_name, input_nlen); + + if (request->method == MK_METHOD_POST) { + + ret = msgpack_params_enable_trace((struct flb_hs *)data, &result, + input_name, input_nlen); + + if (ret != 0) { + msgpack_pack_map(&mp_pck, 2); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_ERROR, HTTP_RESULT_ERROR_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_RETURNCODE, + HTTP_FIELD_RETURNCODE_LEN); + msgpack_pack_int64(&mp_pck, ret); } else { - msgpack_pack_map(&mp_pck, 2); - msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(&mp_pck, "error", strlen("error")); - msgpack_pack_str_with_body(&mp_pck, "message", strlen("message")); - msgpack_pack_str_with_body(&mp_pck, "method not allowed", strlen("method not allowed")); + msgpack_pack_map(&mp_pck, 1); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_OK, HTTP_RESULT_OK_LEN); } } + else if (request->method == MK_METHOD_DELETE) { + disable_trace_input((struct flb_hs *)data, input_name, input_nlen); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_OK, HTTP_RESULT_OK_LEN); + } + else { + msgpack_pack_map(&mp_pck, 2); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_ERROR, HTTP_RESULT_ERROR_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_MESSAGE, HTTP_FIELD_MESSAGE_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_METHODNOTALLOWED, + HTTP_RESULT_METHODNOTALLOWED_LEN); + } } msgpack_pack_str_with_body(&mp_pck, "result", strlen("result")); @@ -569,26 +617,27 @@ static void cb_traces(mk_request_t *request, void *data) msgpack_unpacked_destroy(&result); if (response == 404) { msgpack_pack_map(&mp_pck, 1); - msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(&mp_pck, "not found", strlen("not found")); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_NOTFOUND, HTTP_RESULT_NOTFOUND_LEN); } else if (response == 503) { msgpack_pack_map(&mp_pck, 2); - msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(&mp_pck, "error", strlen("error")); - msgpack_pack_str_with_body(&mp_pck, "message", strlen("message")); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_OK, HTTP_RESULT_OK_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_MESSAGE, HTTP_FIELD_MESSAGE_LEN); if (error_msg) { msgpack_pack_str_with_body(&mp_pck, error_msg, flb_sds_len(error_msg)); flb_sds_destroy(error_msg); } else { - msgpack_pack_str_with_body(&mp_pck, "unknown error", strlen("unknown error")); + msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_UNKNOWNERROR, + HTTP_RESULT_UNKNOWNERROR_LEN); } } else { msgpack_pack_map(&mp_pck, 1); - msgpack_pack_str_with_body(&mp_pck, "status", strlen("status")); - msgpack_pack_str_with_body(&mp_pck, "ok", strlen("ok")); + msgpack_pack_str_with_body(&mp_pck, HTTP_FIELD_STATUS, HTTP_FIELD_STATUS_LEN); + msgpack_pack_str_with_body(&mp_pck, HTTP_RESULT_OK, HTTP_RESULT_OK_LEN); } /* Export to JSON */ @@ -609,7 +658,9 @@ static void cb_traces(mk_request_t *request, void *data) /* Perform registration */ int api_v1_trace(struct flb_hs *hs) { - mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/traces/", cb_traces, hs); - mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/trace/*", cb_trace, hs); + if (hs->config->enable_chunk_trace == FLB_TRUE) { + mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/traces/", cb_traces, hs); + mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/trace/*", cb_trace, hs); + } return 0; } diff --git a/tests/runtime/data/stackdriver/stackdriver_test_payload.h b/tests/runtime/data/stackdriver/stackdriver_test_payload.h new file mode 100644 index 00000000000..75b771f449e --- /dev/null +++ b/tests/runtime/data/stackdriver/stackdriver_test_payload.h @@ -0,0 +1,26 @@ +#define STRING_TEXT_PAYLOAD "[" \ + "1595349600," \ + "{" \ + "\"message\": \"The application errored out\"," \ + "\"logging.googleapis.com/severity\": \"ERROR\"" \ + "}]" + +#define STRING_TEXT_PAYLOAD_WITH_RESIDUAL_FIELDS "[" \ + "1595349600," \ + "{" \ + "\"message\": \"The application errored out\"," \ + "\"logging.googleapis.com/severity\": \"ERROR\"," \ + "\"errorCode\": \"400\"" \ + "}]" + +#define NON_SCALAR_PAYLOAD_WITH_RESIDUAL_FIELDS "[" \ + "1595349600," \ + "{" \ + "\"message\": " \ + "{" \ + "\"application_name\": \"my_application\"," \ + "\"error_message\": \"The application errored out\"," \ + "}," \ + "\"logging.googleapis.com/severity\": \"ERROR\"," \ + "\"errorCode\": \"400\"" \ + "}]" diff --git a/tests/runtime/out_stackdriver.c b/tests/runtime/out_stackdriver.c index 585e734478d..2bd79f31aaf 100644 --- a/tests/runtime/out_stackdriver.c +++ b/tests/runtime/out_stackdriver.c @@ -44,7 +44,7 @@ #include "data/stackdriver/stackdriver_test_http_request.h" #include "data/stackdriver/stackdriver_test_timestamp.h" #include "data/stackdriver/stackdriver_test_monitored_resource.h" - +#include "data/stackdriver/stackdriver_test_payload.h" /* * Fluent Bit Stackdriver plugin, always set as payload a JSON strings contained in a @@ -2292,6 +2292,85 @@ static void cb_check_timestamp_format_duo_fields_incorrect_type(void *ctx, int f flb_sds_destroy(res_data); } +static void cb_check_string_text_payload_with_matched_text_payload_key(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + int ret; + + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['timestamp']", "2020-07-21T16:40:00.000000000Z"); + TEST_CHECK(ret == FLB_TRUE); + + ret = mp_kv_cmp_integer(res_data, res_size, "$entries[0]['severity']", 500); + TEST_CHECK(ret == FLB_TRUE); + + /* check payload is written to textPayload field */ + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['textPayload']", "The application errored out"); + TEST_CHECK(ret == FLB_TRUE); + + flb_sds_destroy(res_data); +} + +static void cb_check_string_text_payload_with_mismatched_text_payload_key(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + int ret; + + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['timestamp']", "2020-07-21T16:40:00.000000000Z"); + TEST_CHECK(ret == FLB_TRUE); + + ret = mp_kv_cmp_integer(res_data, res_size, "$entries[0]['severity']", 500); + TEST_CHECK(ret == FLB_TRUE); + + /* check payload is written to jsonPayload field */ + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['jsonPayload']['message']", "The application errored out"); + TEST_CHECK(ret == FLB_TRUE); + + flb_sds_destroy(res_data); +} + +static void cb_check_string_text_payload_with_residual_fields(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + int ret; + + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['timestamp']", "2020-07-21T16:40:00.000000000Z"); + TEST_CHECK(ret == FLB_TRUE); + + ret = mp_kv_cmp_integer(res_data, res_size, "$entries[0]['severity']", 500); + TEST_CHECK(ret == FLB_TRUE); + + /* check payload is written to jsonPayload field */ + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['jsonPayload']['message']", "The application errored out"); + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['jsonPayload']['errorCode']", "400"); + TEST_CHECK(ret == FLB_TRUE); + + flb_sds_destroy(res_data); +} + +static void cb_check_non_scalar_payload_with_residual_fields(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + int ret; + + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['timestamp']", "2020-07-21T16:40:00.000000000Z"); + TEST_CHECK(ret == FLB_TRUE); + + ret = mp_kv_cmp_integer(res_data, res_size, "$entries[0]['severity']", 500); + TEST_CHECK(ret == FLB_TRUE); + + /* check payload is written to jsonPayload field */ + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['jsonPayload']['application_name']", "my_application"); + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['jsonPayload']['message']", "The application errored out"); + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['jsonPayload']['errorCode']", "400"); + TEST_CHECK(ret == FLB_TRUE); + + flb_sds_destroy(res_data); +} + void flb_test_monitored_resource_common() { int ret; @@ -6294,6 +6373,170 @@ void flb_test_timestamp_format_duo_fields_incorrect_type() flb_destroy(ctx); } +void flb_test_string_text_payload_with_matched_text_payload_key() +{ + int ret; + int size = sizeof(STRING_TEXT_PAYLOAD) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Stackdriver output */ + out_ffd = flb_output(ctx, (char *) "stackdriver", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "resource", "gce_instance", + "text_payload_key", "message", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_string_text_payload_with_matched_text_payload_key, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) STRING_TEXT_PAYLOAD, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_string_text_payload_with_mismatched_text_payload_key() +{ + int ret; + int size = sizeof(STRING_TEXT_PAYLOAD) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Stackdriver output */ + out_ffd = flb_output(ctx, (char *) "stackdriver", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "resource", "gce_instance", + "text_payload_key", "msg", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_string_text_payload_with_mismatched_text_payload_key, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) STRING_TEXT_PAYLOAD, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_string_text_payload_with_residual_fields() +{ + int ret; + int size = sizeof(STRING_TEXT_PAYLOAD) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Stackdriver output */ + out_ffd = flb_output(ctx, (char *) "stackdriver", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "resource", "gce_instance", + "text_payload_key", "message", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_string_text_payload_with_residual_fields, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) STRING_TEXT_PAYLOAD_WITH_RESIDUAL_FIELDS, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_non_scalar_payload_with_residual_fields() +{ + int ret; + int size = sizeof(STRING_TEXT_PAYLOAD) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Stackdriver output */ + out_ffd = flb_output(ctx, (char *) "stackdriver", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "resource", "gce_instance", + "text_payload_key", "message", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_non_scalar_payload_with_residual_fields, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) NON_SCALAR_PAYLOAD_WITH_RESIDUAL_FIELDS, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + /* Test list */ TEST_LIST = { {"severity_multi_entries", flb_test_multi_entries_severity }, @@ -6424,5 +6667,9 @@ TEST_LIST = { {"timestamp_format_duo_fields_missing_nanos", flb_test_timestamp_format_duo_fields_missing_nanos}, {"timestamp_format_duo_fields_incorrect_type", flb_test_timestamp_format_duo_fields_incorrect_type}, + {"string_text_payload_with_matched_text_payload_key", flb_test_string_text_payload_with_matched_text_payload_key}, + {"string_text_payload_with_mismatched_text_payload_key", flb_test_string_text_payload_with_mismatched_text_payload_key}, + {"string_text_payload_with_residual_fields", flb_test_string_text_payload_with_residual_fields}, + {"non_scalar_payload_with_residual_fields", flb_test_non_scalar_payload_with_residual_fields}, {NULL, NULL} }; diff --git a/tests/runtime/processor_metrics_selector.c b/tests/runtime/processor_metrics_selector.c index f25e9751a58..78f64e83b14 100644 --- a/tests/runtime/processor_metrics_selector.c +++ b/tests/runtime/processor_metrics_selector.c @@ -549,6 +549,70 @@ void flb_test_selector_substring_exclude(void) flb_destroy(ctx); } +void flb_test_selector_can_modify_output(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct flb_processor *proc; + struct flb_processor_unit *pu; + struct cfl_variant var = { + .type = CFL_VARIANT_STRING, + .data.as_string = "/kubernetes/", + }; + struct cfl_variant action = { + .type = CFL_VARIANT_STRING, + .data.as_string = "include", + }; + + ctx = flb_create(); + flb_service_set(ctx, + "Flush", "0.200000000", + "Grace", "2", + NULL); + + proc = flb_processor_create(ctx->config, "unit_test", NULL, 0); + TEST_CHECK(proc != NULL); + + pu = flb_processor_unit_create(proc, FLB_PROCESSOR_METRICS, "metrics_selector"); + TEST_CHECK(pu != NULL); + ret = flb_processor_unit_set_property(pu, "metric_name", &var); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "action", &action); + TEST_CHECK(ret == 0); + + /* Input */ + in_ffd = flb_input(ctx, (char *) "event_type", NULL); + TEST_CHECK(in_ffd >= 0); + ret = flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "type", "metrics", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "interval_sec", "1", NULL); + TEST_CHECK(ret == 0); + + out_ffd = flb_output(ctx, (char *) "stdout", NULL); + TEST_CHECK(out_ffd >= 0); + ret = flb_output_set(ctx, out_ffd, "match", "test", NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set(ctx, out_ffd, "format", "msgpack", NULL); + TEST_CHECK(ret == 0); + + /* set up processor */ + ret = flb_output_set_processor(ctx, out_ffd, proc); + TEST_CHECK(ret == 0); + + clear_output_num(); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(1500); /* waiting flush */ + + flb_stop(ctx); + flb_destroy(ctx); +} #endif /* Test list */ @@ -560,6 +624,7 @@ TEST_LIST = { {"prefix_exclude", flb_test_selector_prefix_exclude}, {"substring_include", flb_test_selector_substring_include}, {"substring_exclude", flb_test_selector_substring_exclude}, + {"can_modify_output", flb_test_selector_can_modify_output}, #endif {NULL, NULL} };