Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http_client: assorted HTTP/2 related fixes #9645

Merged
merged 9 commits into from
Nov 25, 2024
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ if(FLB_ALL)
endif()

if(FLB_DEV)
FLB_DEFINITION(FLB_HAVE_DEV)

set(FLB_DEBUG On)
set(FLB_TRACE On)
set(FLB_CHUNK_TRACE On)
Expand Down
3 changes: 3 additions & 0 deletions include/fluent-bit/flb_http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include <fluent-bit/flb_http_client_http1.h>
#include <fluent-bit/flb_http_client_http2.h>

#define HTTP_CLIENT_TEMPORARY_BUFFER_SIZE (1024 * 64)

#define HTTP_CLIENT_SUCCESS 0
#define HTTP_CLIENT_PROVIDER_ERROR -1

Expand Down Expand Up @@ -188,6 +190,7 @@ struct flb_http_client_ng {
uint16_t port;
uint64_t flags;
int protocol_version;
cfl_sds_t temporary_buffer;

int releasable;
void *user_data;
Expand Down
1 change: 1 addition & 0 deletions include/fluent-bit/flb_http_client_http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <fluent-bit/flb_http_common.h>
#include <nghttp2/nghttp2.h>
#include <nghttp2/nghttp2.h>

struct flb_http_client_session;

Expand Down
1 change: 1 addition & 0 deletions include/fluent-bit/flb_http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ struct flb_http_server_session;

struct flb_http_request {
int protocol_version;
cfl_sds_t authority;
int method;
cfl_sds_t path;
cfl_sds_t host;
Expand Down
27 changes: 20 additions & 7 deletions plugins/out_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ int opentelemetry_post(struct opentelemetry_context *ctx,
return FLB_RETRY;
}

if (request->protocol_version == HTTP_PROTOCOL_VERSION_20) {
if (request->protocol_version == HTTP_PROTOCOL_VERSION_20 &&
ctx->enable_grpc_flag) {
grpc_body = cfl_sds_create_size(body_len + 5);

if (grpc_body == NULL) {
Expand All @@ -269,7 +270,7 @@ int opentelemetry_post(struct opentelemetry_context *ctx,

wire_message_length = (uint32_t) body_len;

cfl_sds_cat(grpc_body, "\x00----", 5);
sds_result = cfl_sds_cat(grpc_body, "\x00----", 5);

if (sds_result == NULL) {
flb_http_client_request_destroy(request, FLB_TRUE);
Expand Down Expand Up @@ -376,17 +377,24 @@ int opentelemetry_post(struct opentelemetry_context *ctx,
* - 205: Reset content
*
*/

if (response->status < 200 || response->status > 205) {
if (ctx->log_response_payload &&
response->body != NULL &&
cfl_sds_len(response->body) > 0) {
flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i\n%s",
ctx->host, ctx->port,
response->status, response->body);
flb_plg_error(ctx->ins,
"%s:%i, HTTP status=%i\n%s",
ctx->host,
ctx->port,
response->status,
response->body);
}
else {
flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i",
ctx->host, ctx->port, response->status);
flb_plg_error(ctx->ins,
"%s:%i, HTTP status=%i",
ctx->host,
ctx->port,
response->status);
}

out_ret = FLB_RETRY;
Expand Down Expand Up @@ -699,6 +707,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct opentelemetry_context, enable_http2),
"Enable, disable or force HTTP/2 usage. Accepted values : on, off, force"
},
{
FLB_CONFIG_MAP_BOOL, "grpc", "off",
0, FLB_TRUE, offsetof(struct opentelemetry_context, enable_grpc_flag),
"Enable, disable or force gRPC usage. Accepted values : on, off, auto"
},
{
FLB_CONFIG_MAP_STR, "proxy", NULL,
0, FLB_FALSE, 0,
Expand Down
1 change: 1 addition & 0 deletions plugins/out_opentelemetry/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ struct opentelemetry_body_key {
struct opentelemetry_context {
int enable_http2_flag;
char *enable_http2;
int enable_grpc_flag;

/* HTTP Auth */
char *http_user;
Expand Down
33 changes: 27 additions & 6 deletions src/flb_http_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1510,6 +1510,12 @@ int flb_http_client_ng_init(struct flb_http_client_ng *client,
{
memset(client, 0, sizeof(struct flb_http_client_ng));

client->temporary_buffer = cfl_sds_create_size(HTTP_CLIENT_TEMPORARY_BUFFER_SIZE);

if (client->temporary_buffer == NULL) {
return -1;
}

client->protocol_version = protocol_version;
client->upstream_ha = upstream_ha;
client->upstream = upstream;
Expand Down Expand Up @@ -1583,6 +1589,12 @@ void flb_http_client_ng_destroy(struct flb_http_client_ng *client)
FLB_LOCK_INFINITE_RETRY_LIMIT,
FLB_LOCK_DEFAULT_RETRY_DELAY);

if (client->temporary_buffer != NULL) {
cfl_sds_destroy(client->temporary_buffer);

client->temporary_buffer = NULL;
}

cfl_list_foreach_safe(iterator,
iterator_backup,
&client->sessions) {
Expand Down Expand Up @@ -1701,6 +1713,7 @@ struct flb_http_client_session *flb_http_client_session_begin(struct flb_http_cl
int protocol_version;
struct flb_upstream_node *upstream_node;
struct flb_connection *connection;
struct flb_upstream *upstream;
struct flb_http_client_session *session;
const char *alpn;

Expand All @@ -1711,11 +1724,15 @@ struct flb_http_client_session *flb_http_client_session_begin(struct flb_http_cl
return NULL;
}

upstream = upstream_node->u;

connection = flb_upstream_conn_get(upstream_node->u);
}
else {
upstream_node = NULL;

upstream = client->upstream;

connection = flb_upstream_conn_get(client->upstream);
}

Expand Down Expand Up @@ -1747,6 +1764,10 @@ struct flb_http_client_session *flb_http_client_session_begin(struct flb_http_cl
protocol_version = HTTP_PROTOCOL_VERSION_11;
}

if (protocol_version == HTTP_PROTOCOL_VERSION_20) {
flb_stream_disable_keepalive(&upstream->base);
}

session = flb_http_client_session_create(client, protocol_version, connection);

if (session == NULL) {
Expand Down Expand Up @@ -1932,20 +1953,20 @@ struct flb_http_response *flb_http_client_request_execute(struct flb_http_reques

static int flb_http_client_session_read(struct flb_http_client_session *session)
{
unsigned char input_buffer[1024 * 65];
ssize_t result;

result = flb_io_net_read(session->connection,
(void *) &input_buffer,
sizeof(input_buffer));
(void *) session->parent->temporary_buffer,
cfl_sds_avail(session->parent->temporary_buffer));

if (result <= 0) {
return -1;
}

result = (ssize_t) flb_http_client_session_ingest(session,
input_buffer,
result);
result = (ssize_t) flb_http_client_session_ingest(
session,
(unsigned char *) session->parent->temporary_buffer,
result);

if (result < 0) {
return -2;
Expand Down
62 changes: 42 additions & 20 deletions src/flb_http_client_http2.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#define _GNU_SOURCE
#include <string.h>
#include <stdio.h>

#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_kv.h>
Expand Down Expand Up @@ -285,27 +286,24 @@ static int http2_data_chunk_recv_callback(nghttp2_session *inner_session,
return -1;
}

memcpy(stream->response.body, data, len);
cfl_sds_set_len(stream->response.body, 0);

cfl_sds_set_len(stream->response.body, len);

stream->response.body_read_offset = len;
stream->response.body_read_offset = 0;
}
else {
resized_buffer = cfl_sds_cat(stream->response.body,
(const char *) data,
len);

if (resized_buffer == NULL) {
stream->status = HTTP_STREAM_STATUS_ERROR;
resized_buffer = cfl_sds_cat(stream->response.body,
(const char *) data,
len);

return -1;
}
if (resized_buffer == NULL) {
stream->status = HTTP_STREAM_STATUS_ERROR;

stream->response.body = resized_buffer;
stream->response.body_read_offset += len;
return -1;
}

stream->response.body = resized_buffer;
stream->response.body_read_offset += len;

if (stream->status == HTTP_STREAM_STATUS_RECEIVING_DATA) {
if (stream->response.content_length >=
stream->response.body_read_offset) {
Expand Down Expand Up @@ -387,7 +385,7 @@ static ssize_t http2_data_source_read_callback(nghttp2_session *session,

int flb_http2_client_session_init(struct flb_http2_client_session *session)
{
nghttp2_settings_entry session_settings[1];
nghttp2_settings_entry session_settings[3];
nghttp2_session_callbacks *callbacks;
int result;

Expand Down Expand Up @@ -422,10 +420,17 @@ int flb_http2_client_session_init(struct flb_http2_client_session *session)
session_settings[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
session_settings[0].value = 1;

session_settings[1].settings_id = NGHTTP2_SETTINGS_MAX_FRAME_SIZE;
session_settings[1].value = cfl_sds_alloc(session->parent->parent->temporary_buffer);

session_settings[2].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
session_settings[2].value = 0;


result = nghttp2_submit_settings(session->inner_session,
NGHTTP2_FLAG_NONE,
session_settings,
1);
3);

if (result != 0) {
return -3;
Expand Down Expand Up @@ -480,6 +485,7 @@ int flb_http2_request_begin(struct flb_http_request *request)
int flb_http2_request_commit(struct flb_http_request *request)
{
struct flb_http_client_session *parent_session;
cfl_sds_t sds_result;
struct flb_http2_client_session *session;
struct flb_http_stream *stream;
int result;
Expand Down Expand Up @@ -517,10 +523,10 @@ int flb_http2_request_commit(struct flb_http_request *request)
}

if (parent_session->connection->tls_session != NULL) {
scheme_as_text = "HTTPS";
scheme_as_text = "https";
}
else {
scheme_as_text = "HTTP";
scheme_as_text = "http";
}

switch (request->method) {
Expand Down Expand Up @@ -554,6 +560,22 @@ int flb_http2_request_commit(struct flb_http_request *request)
return -1;
}

if (request->authority == NULL) {
request->authority = cfl_sds_create(request->host);

if (request->authority == NULL) {
return -1;
}

sds_result = cfl_sds_printf(&request->authority,
":%u",
request->port);

if (sds_result == NULL) {
return -1;
}
}

header_count = request->headers->total_count + 7;

headers = flb_calloc(header_count, sizeof(nghttp2_nv));
Expand All @@ -580,8 +602,8 @@ int flb_http2_request_commit(struct flb_http_request *request)

headers[header_index].name = (uint8_t *) ":authority";
headers[header_index].namelen = strlen(":authority");
headers[header_index].value = (uint8_t *) request->host;
headers[header_index].valuelen = strlen(request->host);
headers[header_index].value = (uint8_t *) request->authority;
headers[header_index].valuelen = strlen(request->authority);

header_index++;

Expand Down
28 changes: 28 additions & 0 deletions src/flb_http_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ struct flb_http_request *flb_http_request_create()

void flb_http_request_destroy(struct flb_http_request *request)
{
if (request->authority != NULL) {
cfl_sds_destroy(request->authority);
}

if (request->path != NULL) {
cfl_sds_destroy(request->path);
}
Expand Down Expand Up @@ -654,6 +658,12 @@ int flb_http_request_set_url(struct flb_http_request *request,
int flb_http_request_set_uri(struct flb_http_request *request,
char *uri)
{
if (request->path != NULL) {
cfl_sds_destroy(request->path);

request->path = NULL;
}

request->path = cfl_sds_create(uri);

if (request->path == NULL) {
Expand All @@ -666,6 +676,12 @@ int flb_http_request_set_uri(struct flb_http_request *request,
int flb_http_request_set_query_string(struct flb_http_request *request,
char *query_string)
{
if (request->query_string != NULL) {
cfl_sds_destroy(request->query_string);

request->query_string = NULL;
}

request->query_string = cfl_sds_create(query_string);

if (request->query_string == NULL) {
Expand All @@ -678,6 +694,12 @@ int flb_http_request_set_query_string(struct flb_http_request *request,
int flb_http_request_set_content_type(struct flb_http_request *request,
char *content_type)
{
if (request->content_type != NULL) {
cfl_sds_destroy(request->content_type);

request->content_type = NULL;
}

request->content_type = cfl_sds_create(content_type);

if (request->content_type == NULL) {
Expand All @@ -690,6 +712,12 @@ int flb_http_request_set_content_type(struct flb_http_request *request,
int flb_http_request_set_user_agent(struct flb_http_request *request,
char *user_agent)
{
if (request->user_agent != NULL) {
cfl_sds_destroy(request->user_agent);

request->user_agent = NULL;
}

request->user_agent = cfl_sds_create(user_agent);

if (request->user_agent == NULL) {
Expand Down
Loading
Loading