From b5f226703a6f9ceb4c745ebb2eac9cda2c6c41de Mon Sep 17 00:00:00 2001 From: ryanohnemus Date: Wed, 20 Dec 2023 15:19:47 -0600 Subject: [PATCH] http_client: Add Ability to Process Chunked Stream Current flb_http_do processes chunked streams, but requires that all chunks are received before allowing interaction with the response payload. This change allows a user to only initiate the http request with flb_http_do_request and then process the live stream of data by fetching available chunks with flb_http_get_response_data. Signed-off-by: ryanohnemus --- include/fluent-bit/flb_http_client.h | 5 +- src/flb_http_client.c | 116 ++++++++++++++---- tests/include/aws_client_mock_client_resp.def | 2 - 3 files changed, 92 insertions(+), 31 deletions(-) diff --git a/include/fluent-bit/flb_http_client.h b/include/fluent-bit/flb_http_client.h index 56e7448e4fa..c79c988e0f5 100644 --- a/include/fluent-bit/flb_http_client.h +++ b/include/fluent-bit/flb_http_client.h @@ -52,6 +52,7 @@ #define FLB_HTTP_MORE 0 #define FLB_HTTP_OK 1 #define FLB_HTTP_NOT_FOUND 2 /* header not found */ +#define FLB_HTTP_CHUNK_AVAILABLE 3 /* means chunk is available, but there is more data. end of all chunks returns FLB_HTTP_OK */ /* Useful headers */ #define FLB_HTTP_HEADER_AUTH "Authorization" @@ -66,8 +67,6 @@ struct flb_http_response { int content_length; /* Content length set by headers */ int chunked_encoding; /* Chunked transfer encoding ? */ int connection_close; /* connection: close ? 
*/ - long chunked_cur_size; - long chunked_exp_size; /* expected chunked size */ char *chunk_processed_end; /* Position to mark last chunk */ char *headers_end; /* Headers end (\r\n\r\n) */ @@ -162,6 +161,8 @@ int flb_http_set_content_encoding_gzip(struct flb_http_client *c); int flb_http_set_callback_context(struct flb_http_client *c, struct flb_callback *cb_ctx); +int flb_http_get_response_data(struct flb_http_client *c, size_t bytes_consumed); +int flb_http_do_request(struct flb_http_client *c, size_t *bytes); int flb_http_do(struct flb_http_client *c, size_t *bytes); int flb_http_client_proxy_connect(struct flb_connection *u_conn); void flb_http_client_destroy(struct flb_http_client *c); diff --git a/src/flb_http_client.c b/src/flb_http_client.c index 404df849a3a..b450121487b 100644 --- a/src/flb_http_client.c +++ b/src/flb_http_client.c @@ -270,7 +270,8 @@ static int process_chunked_data(struct flb_http_client *c) long val; char *p; char tmp[32]; - struct flb_http_response *r = &c->resp; + struct flb_http_response *r = &c->resp; + int found_full_chunk = FLB_FALSE; chunk_start: p = strstr(r->chunk_processed_end, "\r\n"); @@ -327,6 +328,7 @@ static int process_chunked_data(struct flb_http_client *c) * 3. remove chunk ending \r\n */ + found_full_chunk = FLB_TRUE; /* 1. Validate ending chunk */ if (val - 2 == 0) { /* @@ -365,10 +367,11 @@ static int process_chunked_data(struct flb_http_client *c) /* Always append a NULL byte */ r->data[r->data_len] = '\0'; + /* Always update payload size after full chunk */ + r->payload_size = r->data_len - (r->headers_end - r->data); + /* Is this the last chunk ? 
*/ if ((val - 2 == 0)) { - /* Update payload size */ - r->payload_size = r->data_len - (r->headers_end - r->data); return FLB_HTTP_OK; } @@ -378,7 +381,10 @@ static int process_chunked_data(struct flb_http_client *c) goto chunk_start; } - return FLB_HTTP_MORE; + if (found_full_chunk == FLB_TRUE) { + return FLB_HTTP_CHUNK_AVAILABLE; + } + return FLB_HTTP_MORE; } static int process_data(struct flb_http_client *c) @@ -460,8 +466,8 @@ static int process_data(struct flb_http_client *c) if (ret == FLB_HTTP_ERROR) { return FLB_HTTP_ERROR; } - else if (ret == FLB_HTTP_OK) { - return FLB_HTTP_OK; + else if (ret == FLB_HTTP_OK || ret == FLB_HTTP_CHUNK_AVAILABLE) { + return ret; } } else { @@ -1173,15 +1179,16 @@ int flb_http_bearer_auth(struct flb_http_client *c, const char *token) return result; } - -int flb_http_do(struct flb_http_client *c, size_t *bytes) +/* flb_http_do_request only sends the http request the data. +* This is useful for processing the chunked responses on your own. +* If you do not want to process the response on your own or expect +* all response data before you process data, use flb_http_do instead. 
+*/ +int flb_http_do_request(struct flb_http_client *c, size_t *bytes) { int ret; - int r_bytes; int crlf = 2; int new_size; - ssize_t available; - size_t out_size; size_t bytes_header = 0; size_t bytes_body = 0; char *tmp; @@ -1192,7 +1199,7 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes) /* Append pending headers */ ret = http_headers_compose(c); if (ret == -1) { - return -1; + return FLB_HTTP_ERROR; } /* check enough space for the ending CRLF */ @@ -1201,7 +1208,7 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes) tmp = flb_realloc(c->header_buf, new_size); if (!tmp) { flb_errno(); - return -1; + return FLB_HTTP_ERROR; } c->header_buf = tmp; c->header_size = new_size; @@ -1230,7 +1237,7 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes) if (errno != 0) { flb_errno(); } - return -1; + return FLB_HTTP_ERROR; } if (c->body_len > 0) { @@ -1239,16 +1246,55 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes) &bytes_body); if (ret == -1) { flb_errno(); - return -1; + return FLB_HTTP_ERROR; } } /* number of sent bytes */ *bytes = (bytes_header + bytes_body); - /* Read the server response, we need at least 19 bytes */ + /* prep c->resp for incoming data */ c->resp.data_len = 0; - while (1) { + + /* at this point we've sent our request so we expect more data in response*/ + return FLB_HTTP_MORE; +} + +int flb_http_get_response_data(struct flb_http_client *c, size_t bytes_consumed) +{ + /* returns + * FLB_HTTP_MORE - if no chunks are ready and we need to fetch more data + * FLB_HTTP_CHUNK_AVAILABLE - if one or more chunks have been found and + * it is not the end of the stream, meaning more data will come + * FLB_HTTP_OK - if the 'end' chunk was found signifying end of the stream + * if there is still chunked data to process at the end of + * stream, it will be contained in the response payload + * FLB_HTTP_ERROR - for any error + */ + int ret = FLB_HTTP_MORE; + int r_bytes; + ssize_t available; + size_t out_size; + + // if the 
caller has consumed some of the payload (via bytes_consumed) + // we consume those bytes off the payload + if( bytes_consumed > 0 ) { + if(bytes_consumed > c->resp.payload_size) { + flb_error("[http_client] attempting to consume more bytes than " + "available. Attempted bytes_consumed=%zu payload_size=%zu ", + bytes_consumed, + c->resp.payload_size); + return FLB_HTTP_ERROR; + } + + c->resp.payload_size -= bytes_consumed; + c->resp.data_len -= bytes_consumed; + memmove(c->resp.payload, c->resp.payload+bytes_consumed, c->resp.payload_size); + c->resp.chunk_processed_end = c->resp.payload+c->resp.payload_size; + c->resp.data[c->resp.data_len] = '\0'; + } + + while (ret == FLB_HTTP_MORE) { available = flb_http_buffer_available(c) - 1; if (available <= 1) { /* @@ -1267,7 +1313,7 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes) c->resp.data_size + FLB_HTTP_DATA_CHUNK, c->resp.data_size_max); flb_upstream_conn_recycle(c->u_conn, FLB_FALSE); - return 0; + return FLB_HTTP_ERROR; } available = flb_http_buffer_available(c) - 1; } @@ -1277,7 +1323,7 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes) available); if (r_bytes <= 0) { if (c->flags & FLB_HTTP_10) { - break; + return FLB_HTTP_OK; } } @@ -1293,23 +1339,39 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes) c->u_conn->upstream->tcp_host, c->u_conn->upstream->tcp_port, c->u_conn->fd); - return -1; - } - else if (ret == FLB_HTTP_OK) { - break; - } - else if (ret == FLB_HTTP_MORE) { - continue; + return FLB_HTTP_ERROR; } } else { flb_error("[http_client] broken connection to %s:%i ?", c->u_conn->upstream->tcp_host, c->u_conn->upstream->tcp_port); - return -1; + return FLB_HTTP_ERROR; } } + return ret; +} + +int flb_http_do(struct flb_http_client *c, size_t *bytes) +{ + int ret; + + ret = flb_http_do_request(c, bytes); + if (ret != 0) { + return ret; + } + + /* Read the server response, we need at least 19 bytes */ + while (ret == FLB_HTTP_MORE || ret == FLB_HTTP_CHUNK_AVAILABLE) { + /* 
flb_http_do does not consume any bytes during processing + * so we always pass 0 consumed_bytes because we fetch until + * the end chunk before returning to the caller + */ + + ret = flb_http_get_response_data(c, 0); + } + /* Check 'Connection' response header */ ret = check_connection(c); if (ret == FLB_HTTP_OK) { diff --git a/tests/include/aws_client_mock_client_resp.def b/tests/include/aws_client_mock_client_resp.def index 225cc2ea18f..6f06f8a972e 100644 --- a/tests/include/aws_client_mock_client_resp.def +++ b/tests/include/aws_client_mock_client_resp.def @@ -11,8 +11,6 @@ EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(status, STATUS, EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(content_length, CONTENT_LENGTH, T_INT)) /* Content length set by headers */ EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(chunked_encoding, CHUNKED_ENCODEING, T_INT)) /* Chunked transfer encoding ? */ EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(connection_close, CONNECTION_CLOSE, T_INT)) /* connection: close ? */ -EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(chunked_cur_size, CHUNKED_CUR_SIZE, T_LONG)) -EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(chunked_exp_size, CHUNKED_EXP_SIZE, T_LONG)) /* expected chunked size */ EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(chunk_processed_end, CHUNK_PROCESSED_END, T_CHAR_STAR)) /* Position to mark last chunk */ EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(headers_end, HEADERS_END, T_CHAR_STAR)) /* Headers end (\r\n\r\n) */