diff --git a/CMakeLists.txt b/CMakeLists.txt index 51bd1c8d729..e52e3e55832 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -256,6 +256,8 @@ if(FLB_ALL) endif() if(FLB_DEV) + FLB_DEFINITION(FLB_HAVE_DEV) + set(FLB_DEBUG On) set(FLB_TRACE On) set(FLB_CHUNK_TRACE On) diff --git a/include/fluent-bit/flb_http_client.h b/include/fluent-bit/flb_http_client.h index 243ed253da8..534538dd484 100644 --- a/include/fluent-bit/flb_http_client.h +++ b/include/fluent-bit/flb_http_client.h @@ -28,6 +28,8 @@ #include #include +#define HTTP_CLIENT_TEMPORARY_BUFFER_SIZE (1024 * 64) + #define HTTP_CLIENT_SUCCESS 0 #define HTTP_CLIENT_PROVIDER_ERROR -1 @@ -188,6 +190,7 @@ struct flb_http_client_ng { uint16_t port; uint64_t flags; int protocol_version; + cfl_sds_t temporary_buffer; int releasable; void *user_data; diff --git a/include/fluent-bit/flb_http_client_http2.h b/include/fluent-bit/flb_http_client_http2.h index dbf3b85a22b..eb6932aa569 100644 --- a/include/fluent-bit/flb_http_client_http2.h +++ b/include/fluent-bit/flb_http_client_http2.h @@ -22,6 +22,7 @@ #include #include +#include struct flb_http_client_session; diff --git a/include/fluent-bit/flb_http_common.h b/include/fluent-bit/flb_http_common.h index 9d4c593dcdd..f7f0d7df90d 100644 --- a/include/fluent-bit/flb_http_common.h +++ b/include/fluent-bit/flb_http_common.h @@ -88,6 +88,7 @@ struct flb_http_server_session; struct flb_http_request { int protocol_version; + cfl_sds_t authority; int method; cfl_sds_t path; cfl_sds_t host; diff --git a/plugins/out_opentelemetry/opentelemetry.c b/plugins/out_opentelemetry/opentelemetry.c index ff36995bc56..ae614a9c575 100644 --- a/plugins/out_opentelemetry/opentelemetry.c +++ b/plugins/out_opentelemetry/opentelemetry.c @@ -258,7 +258,8 @@ int opentelemetry_post(struct opentelemetry_context *ctx, return FLB_RETRY; } - if (request->protocol_version == HTTP_PROTOCOL_VERSION_20) { + if (request->protocol_version == HTTP_PROTOCOL_VERSION_20 && + ctx->enable_grpc_flag) { grpc_body = cfl_sds_create_size(body_len + 5); if (grpc_body == NULL) { @@ -269,7 +270,7 @@ int opentelemetry_post(struct opentelemetry_context *ctx, wire_message_length = (uint32_t) body_len; - cfl_sds_cat(grpc_body, "\x00----", 5); + sds_result = cfl_sds_cat(grpc_body, "\x00----", 5); if (sds_result == NULL) { flb_http_client_request_destroy(request, FLB_TRUE); @@ -376,17 +377,24 @@ int opentelemetry_post(struct opentelemetry_context *ctx, * - 205: Reset content * */ + if (response->status < 200 || response->status > 205) { if (ctx->log_response_payload && response->body != NULL && cfl_sds_len(response->body) > 0) { - flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i\n%s", - ctx->host, ctx->port, - response->status, response->body); + flb_plg_error(ctx->ins, + "%s:%i, HTTP status=%i\n%s", + ctx->host, + ctx->port, + response->status, + response->body); } else { - flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i", - ctx->host, ctx->port, response->status); + flb_plg_error(ctx->ins, + "%s:%i, HTTP status=%i", + ctx->host, + ctx->port, + response->status); } out_ret = FLB_RETRY; @@ -699,6 +707,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct opentelemetry_context, enable_http2), "Enable, disable or force HTTP/2 usage. Accepted values : on, off, force" }, + { + FLB_CONFIG_MAP_BOOL, "grpc", "off", + 0, FLB_TRUE, offsetof(struct opentelemetry_context, enable_grpc_flag), + "Enable, disable or force gRPC usage. Accepted values : on, off, auto" + }, { FLB_CONFIG_MAP_STR, "proxy", NULL, 0, FLB_FALSE, 0, diff --git a/plugins/out_opentelemetry/opentelemetry.h b/plugins/out_opentelemetry/opentelemetry.h index cb25e6da055..1041409e0ca 100644 --- a/plugins/out_opentelemetry/opentelemetry.h +++ b/plugins/out_opentelemetry/opentelemetry.h @@ -46,6 +46,7 @@ struct opentelemetry_body_key { struct opentelemetry_context { int enable_http2_flag; char *enable_http2; + int enable_grpc_flag; /* HTTP Auth */ char *http_user; diff --git a/src/flb_http_client.c b/src/flb_http_client.c index 1dad0cba159..b22926b7be2 100644 --- a/src/flb_http_client.c +++ b/src/flb_http_client.c @@ -1510,6 +1510,12 @@ int flb_http_client_ng_init(struct flb_http_client_ng *client, { memset(client, 0, sizeof(struct flb_http_client_ng)); + client->temporary_buffer = cfl_sds_create_size(HTTP_CLIENT_TEMPORARY_BUFFER_SIZE); + + if (client->temporary_buffer == NULL) { + return -1; + } + client->protocol_version = protocol_version; client->upstream_ha = upstream_ha; client->upstream = upstream; @@ -1583,6 +1589,12 @@ void flb_http_client_ng_destroy(struct flb_http_client_ng *client) FLB_LOCK_INFINITE_RETRY_LIMIT, FLB_LOCK_DEFAULT_RETRY_DELAY); + if (client->temporary_buffer != NULL) { + cfl_sds_destroy(client->temporary_buffer); + + client->temporary_buffer = NULL; + } + cfl_list_foreach_safe(iterator, iterator_backup, &client->sessions) { @@ -1701,6 +1713,7 @@ struct flb_http_client_session *flb_http_client_session_begin(struct flb_http_cl int protocol_version; struct flb_upstream_node *upstream_node; struct flb_connection *connection; + struct flb_upstream *upstream; struct flb_http_client_session *session; const char *alpn; @@ -1711,11 +1724,15 @@ struct flb_http_client_session *flb_http_client_session_begin(struct flb_http_cl return NULL; } + upstream = upstream_node->u; + connection = flb_upstream_conn_get(upstream_node->u); } else { upstream_node = NULL; + upstream = client->upstream; + connection = flb_upstream_conn_get(client->upstream); } @@ -1747,6 +1764,10 @@ struct flb_http_client_session *flb_http_client_session_begin(struct flb_http_cl protocol_version = HTTP_PROTOCOL_VERSION_11; } + if (protocol_version == HTTP_PROTOCOL_VERSION_20) { + flb_stream_disable_keepalive(&upstream->base); + } + session = flb_http_client_session_create(client, protocol_version, connection); if (session == NULL) { @@ -1932,20 +1953,20 @@ struct flb_http_response *flb_http_client_request_execute(struct flb_http_reques static int flb_http_client_session_read(struct flb_http_client_session *session) { - unsigned char input_buffer[1024 * 65]; ssize_t result; result = flb_io_net_read(session->connection, - (void *) &input_buffer, - sizeof(input_buffer)); + (void *) session->parent->temporary_buffer, + cfl_sds_avail(session->parent->temporary_buffer)); if (result <= 0) { return -1; } - result = (ssize_t) flb_http_client_session_ingest(session, - input_buffer, - result); + result = (ssize_t) flb_http_client_session_ingest( + session, + (unsigned char *) session->parent->temporary_buffer, + result); if (result < 0) { return -2; diff --git a/src/flb_http_client_http2.c b/src/flb_http_client_http2.c index d2d074cc23e..de22b62d151 100644 --- a/src/flb_http_client_http2.c +++ b/src/flb_http_client_http2.c @@ -19,6 +19,7 @@ #define _GNU_SOURCE #include +#include #include #include @@ -285,27 +286,24 @@ static int http2_data_chunk_recv_callback(nghttp2_session *inner_session, return -1; } - memcpy(stream->response.body, data, len); + cfl_sds_set_len(stream->response.body, 0); - cfl_sds_set_len(stream->response.body, len); - - stream->response.body_read_offset = len; + stream->response.body_read_offset = 0; } - else { - resized_buffer = cfl_sds_cat(stream->response.body, - (const char *) data, - len); - if (resized_buffer == NULL) { - stream->status = HTTP_STREAM_STATUS_ERROR; + resized_buffer = cfl_sds_cat(stream->response.body, + (const char *) data, + len); - return -1; - } + if (resized_buffer == NULL) { + stream->status = HTTP_STREAM_STATUS_ERROR; - stream->response.body = resized_buffer; - stream->response.body_read_offset += len; + return -1; } + stream->response.body = resized_buffer; + stream->response.body_read_offset += len; + if (stream->status == HTTP_STREAM_STATUS_RECEIVING_DATA) { if (stream->response.content_length >= stream->response.body_read_offset) { @@ -387,7 +385,7 @@ static ssize_t http2_data_source_read_callback(nghttp2_session *session, int flb_http2_client_session_init(struct flb_http2_client_session *session) { - nghttp2_settings_entry session_settings[1]; + nghttp2_settings_entry session_settings[3]; nghttp2_session_callbacks *callbacks; int result; @@ -422,10 +420,17 @@ int flb_http2_client_session_init(struct flb_http2_client_session *session) session_settings[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS; session_settings[0].value = 1; + session_settings[1].settings_id = NGHTTP2_SETTINGS_MAX_FRAME_SIZE; + session_settings[1].value = cfl_sds_alloc(session->parent->parent->temporary_buffer); + + session_settings[2].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH; + session_settings[2].value = 0; + + result = nghttp2_submit_settings(session->inner_session, NGHTTP2_FLAG_NONE, session_settings, - 1); + 3); if (result != 0) { return -3; @@ -480,6 +485,7 @@ int flb_http2_request_begin(struct flb_http_request *request) int flb_http2_request_commit(struct flb_http_request *request) { struct flb_http_client_session *parent_session; + cfl_sds_t sds_result; struct flb_http2_client_session *session; struct flb_http_stream *stream; int result; @@ -517,10 +523,10 @@ int flb_http2_request_commit(struct flb_http_request *request) } if (parent_session->connection->tls_session != NULL) { - scheme_as_text = "HTTPS"; + scheme_as_text = "https"; } else { - scheme_as_text = "HTTP"; + scheme_as_text = "http"; } switch (request->method) { @@ -554,6 +560,22 @@ int flb_http2_request_commit(struct flb_http_request *request) return -1; } + if (request->authority == NULL) { + request->authority = cfl_sds_create(request->host); + + if (request->authority == NULL) { + return -1; + } + + sds_result = cfl_sds_printf(&request->authority, + ":%u", + request->port); + + if (sds_result == NULL) { + return -1; + } + } + header_count = request->headers->total_count + 7; headers = flb_calloc(header_count, sizeof(nghttp2_nv)); @@ -580,8 +602,8 @@ int flb_http2_request_commit(struct flb_http_request *request) headers[header_index].name = (uint8_t *) ":authority"; headers[header_index].namelen = strlen(":authority"); - headers[header_index].value = (uint8_t *) request->host; - headers[header_index].valuelen = strlen(request->host); + headers[header_index].value = (uint8_t *) request->authority; + headers[header_index].valuelen = strlen(request->authority); header_index++; diff --git a/src/flb_http_common.c b/src/flb_http_common.c index b747a3a4895..a5c65b50ffd 100644 --- a/src/flb_http_common.c +++ b/src/flb_http_common.c @@ -150,6 +150,10 @@ struct flb_http_request *flb_http_request_create() void flb_http_request_destroy(struct flb_http_request *request) { + if (request->authority != NULL) { + cfl_sds_destroy(request->authority); + } + if (request->path != NULL) { cfl_sds_destroy(request->path); } @@ -654,6 +658,12 @@ int flb_http_request_set_url(struct flb_http_request *request, int flb_http_request_set_uri(struct flb_http_request *request, char *uri) { + if (request->path != NULL) { + cfl_sds_destroy(request->path); + + request->path = NULL; + } + request->path = cfl_sds_create(uri); if (request->path == NULL) { @@ -666,6 +676,12 @@ int flb_http_request_set_uri(struct flb_http_request *request, int flb_http_request_set_query_string(struct flb_http_request *request, char *query_string) { + if (request->query_string != NULL) { + cfl_sds_destroy(request->query_string); + + request->query_string = NULL; + } + request->query_string = cfl_sds_create(query_string); if (request->query_string == NULL) { @@ -678,6 +694,12 @@ int flb_http_request_set_query_string(struct flb_http_request *request, int flb_http_request_set_content_type(struct flb_http_request *request, char *content_type) { + if (request->content_type != NULL) { + cfl_sds_destroy(request->content_type); + + request->content_type = NULL; + } + request->content_type = cfl_sds_create(content_type); if (request->content_type == NULL) { @@ -690,6 +712,12 @@ int flb_http_request_set_content_type(struct flb_http_request *request, int flb_http_request_set_user_agent(struct flb_http_request *request, char *user_agent) { + if (request->user_agent != NULL) { + cfl_sds_destroy(request->user_agent); + + request->user_agent = NULL; + } + request->user_agent = cfl_sds_create(user_agent); if (request->user_agent == NULL) { diff --git a/src/tls/openssl.c b/src/tls/openssl.c index f6d54460f36..fca99c3542b 100644 --- a/src/tls/openssl.c +++ b/src/tls/openssl.c @@ -17,7 +17,11 @@ * limitations under the License. */ +#include +#include + #include +#include #include #include #include @@ -76,6 +80,7 @@ static int tls_init(void) SSL_load_error_strings(); SSL_library_init(); #endif + return 0; } @@ -134,10 +139,15 @@ static void tls_context_destroy(void *ctx_backend) struct tls_context *ctx = ctx_backend; pthread_mutex_lock(&ctx->mutex); + SSL_CTX_free(ctx->ctx); + if (ctx->alpn != NULL) { flb_free(ctx->alpn); + + ctx->alpn = NULL; } + pthread_mutex_unlock(&ctx->mutex); flb_free(ctx); @@ -438,7 +448,7 @@ static int macos_load_system_certificates(struct tls_context *ctx) } CFRelease(certs); - flb_debug("[tls] finished loading keychain certificates, total loaded: %d", loaded_cert_count); + flb_debug("[tls] finished loading keychain certificates, total loaded: %lu", loaded_cert_count); return 0; } #endif @@ -448,6 +458,9 @@ static int load_system_certificates(struct tls_context *ctx) int ret; const char *ca_file = FLB_DEFAULT_SEARCH_CA_BUNDLE; + (void) ret; + (void) ca_file; + /* For Windows use specific API to read the certs store */ #ifdef _MSC_VER return windows_load_system_certificates(ctx); @@ -467,6 +480,33 @@ static int load_system_certificates(struct tls_context *ctx) #endif } +#ifdef FLB_HAVE_DEV +/* This is not thread safe */ +static void ssl_key_logger(const SSL *ssl, const char *line) +{ + char *key_log_filename; + FILE *key_log_file; + + key_log_filename = getenv("SSLKEYLOGFILE"); + + if (key_log_filename == NULL) { + return; + } + + key_log_file = fopen(key_log_filename, "a"); + + if (key_log_file == NULL) { + return; + } + + setvbuf(key_log_file, NULL, 0, _IOLBF); + + fprintf(key_log_file, "%s\n", line); + + fclose(key_log_file); +} +#endif + static void *tls_context_create(int verify, int debug, int mode, @@ -481,6 +521,7 @@ static void *tls_context_create(int verify, SSL_CTX *ssl_ctx; struct tls_context *ctx; char err_buf[256]; + char *key_log_filename; /* * Init library ? based in the documentation on OpenSSL >= 1.1.0 is not longer @@ -523,6 +564,16 @@ static void *tls_context_create(int verify, flb_errno(); return NULL; } + +#ifdef FLB_HAVE_DEV + key_log_filename = getenv("SSLKEYLOGFILE"); + + if (key_log_filename != NULL) { + SSL_CTX_set_keylog_callback(ssl_ctx, ssl_key_logger); + } +#endif + + ctx->ctx = ssl_ctx; ctx->mode = mode; ctx->alpn = NULL;