Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http_client: Add Ability to Process Http Chunked Stream #8316

Merged
merged 2 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions include/fluent-bit/flb_http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#define FLB_HTTP_MORE 0
#define FLB_HTTP_OK 1
#define FLB_HTTP_NOT_FOUND 2 /* header not found */
#define FLB_HTTP_CHUNK_AVAILABLE 3 /* means chunk is available, but there is more data. end of all chunks returns FLB_HTTP_OK */

/* Useful headers */
#define FLB_HTTP_HEADER_AUTH "Authorization"
Expand All @@ -66,8 +67,6 @@ struct flb_http_client_response {
int content_length; /* Content length set by headers */
int chunked_encoding; /* Chunked transfer encoding ? */
int connection_close; /* connection: close ? */
long chunked_cur_size;
long chunked_exp_size; /* expected chunked size */
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these were completely unused excluding in a mocked aws test, so I cleaned this up.

char *chunk_processed_end; /* Position to mark last chunk */
char *headers_end; /* Headers end (\r\n\r\n) */

Expand Down Expand Up @@ -162,6 +161,8 @@ int flb_http_set_content_encoding_gzip(struct flb_http_client *c);
int flb_http_set_callback_context(struct flb_http_client *c,
struct flb_callback *cb_ctx);

int flb_http_get_response_data(struct flb_http_client *c, size_t bytes_consumed);
int flb_http_do_request(struct flb_http_client *c, size_t *bytes);
int flb_http_do(struct flb_http_client *c, size_t *bytes);
int flb_http_client_proxy_connect(struct flb_connection *u_conn);
void flb_http_client_destroy(struct flb_http_client *c);
Expand Down
116 changes: 90 additions & 26 deletions src/flb_http_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,10 @@ static int process_chunked_data(struct flb_http_client *c)
long val;
char *p;
char tmp[32];
int found_full_chunk = FLB_FALSE;
struct flb_http_client_response *r = &c->resp;


chunk_start:
p = strstr(r->chunk_processed_end, "\r\n");
if (!p) {
Expand Down Expand Up @@ -327,6 +329,7 @@ static int process_chunked_data(struct flb_http_client *c)
* 3. remove chunk ending \r\n
*/

found_full_chunk = FLB_TRUE;
/* 1. Validate ending chunk */
if (val - 2 == 0) {
/*
Expand Down Expand Up @@ -365,10 +368,11 @@ static int process_chunked_data(struct flb_http_client *c)
/* Always append a NULL byte */
r->data[r->data_len] = '\0';

/* Always update payload size after full chunk */
r->payload_size = r->data_len - (r->headers_end - r->data);

/* Is this the last chunk ? */
if ((val - 2 == 0)) {
/* Update payload size */
r->payload_size = r->data_len - (r->headers_end - r->data);
return FLB_HTTP_OK;
}

Expand All @@ -378,7 +382,10 @@ static int process_chunked_data(struct flb_http_client *c)
goto chunk_start;
}

return FLB_HTTP_MORE;
if (found_full_chunk == FLB_TRUE) {
return FLB_HTTP_CHUNK_AVAILABLE;
}
return FLB_HTTP_MORE;
}

static int process_data(struct flb_http_client *c)
Expand Down Expand Up @@ -460,8 +467,8 @@ static int process_data(struct flb_http_client *c)
if (ret == FLB_HTTP_ERROR) {
return FLB_HTTP_ERROR;
}
else if (ret == FLB_HTTP_OK) {
return FLB_HTTP_OK;
else if (ret == FLB_HTTP_OK || ret == FLB_HTTP_CHUNK_AVAILABLE) {
return ret;
}
}
else {
Expand Down Expand Up @@ -1173,15 +1180,16 @@ int flb_http_bearer_auth(struct flb_http_client *c, const char *token)
return result;
}


int flb_http_do(struct flb_http_client *c, size_t *bytes)
/* flb_http_do_request only sends the http request the data.
* This is useful for processing the chunked responses on your own.
* If you do not want to process the response on your own or expect
* all response data before you process data, use flb_http_do instead.
*/
int flb_http_do_request(struct flb_http_client *c, size_t *bytes)
{
int ret;
int r_bytes;
int crlf = 2;
int new_size;
ssize_t available;
size_t out_size;
size_t bytes_header = 0;
size_t bytes_body = 0;
char *tmp;
Expand All @@ -1192,7 +1200,7 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes)
/* Append pending headers */
ret = http_headers_compose(c);
if (ret == -1) {
return -1;
return FLB_HTTP_ERROR;
}

/* check enough space for the ending CRLF */
Expand All @@ -1201,7 +1209,7 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes)
tmp = flb_realloc(c->header_buf, new_size);
if (!tmp) {
flb_errno();
return -1;
return FLB_HTTP_ERROR;
}
c->header_buf = tmp;
c->header_size = new_size;
Expand Down Expand Up @@ -1230,7 +1238,7 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes)
if (errno != 0) {
flb_errno();
}
return -1;
return FLB_HTTP_ERROR;
}

if (c->body_len > 0) {
Expand All @@ -1239,16 +1247,56 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes)
&bytes_body);
if (ret == -1) {
flb_errno();
return -1;
return FLB_HTTP_ERROR;
}
}

/* number of sent bytes */
*bytes = (bytes_header + bytes_body);

/* Read the server response, we need at least 19 bytes */
/* prep c->resp for incoming data */
c->resp.data_len = 0;
while (1) {

/* at this point we've sent our request so we expect more data in response*/
return FLB_HTTP_MORE;
}

int flb_http_get_response_data(struct flb_http_client *c, size_t bytes_consumed)
{
/* returns
* FLB_HTTP_MORE - if we are waiting for more data to be received
* FLB_HTTP_CHUNK_AVAILABLE - if this is a chunked transfer and one or more chunks
* have been received and it is not the end of the stream
* FLB_HTTP_OK - if we have collected all response data and no errors were thrown
* (in chunked transfers this means we've received the end chunk
* and any remaining data to process from the end of stream, will be
* contained in the response payload)
* FLB_HTTP_ERROR - for any error
*/
int ret = FLB_HTTP_MORE;
int r_bytes;
ssize_t available;
size_t out_size;

// if the caller has consumed some of the payload (via bytes_consumed)
// we consume those bytes off the payload
if( bytes_consumed > 0 ) {
if(bytes_consumed > c->resp.payload_size) {
flb_error("[http_client] attempting to consume more bytes than "
"available. Attempted bytes_consumed=%zu payload_size=%zu ",
bytes_consumed,
c->resp.payload_size);
return FLB_HTTP_ERROR;
}

c->resp.payload_size -= bytes_consumed;
c->resp.data_len -= bytes_consumed;
memmove(c->resp.payload, c->resp.payload+bytes_consumed, c->resp.payload_size);
c->resp.chunk_processed_end = c->resp.payload+c->resp.payload_size;
c->resp.data[c->resp.data_len] = '\0';
}

while (ret == FLB_HTTP_MORE) {
available = flb_http_buffer_available(c) - 1;
if (available <= 1) {
/*
Expand All @@ -1267,7 +1315,7 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes)
c->resp.data_size + FLB_HTTP_DATA_CHUNK,
c->resp.data_size_max);
flb_upstream_conn_recycle(c->u_conn, FLB_FALSE);
return 0;
return FLB_HTTP_ERROR;
}
available = flb_http_buffer_available(c) - 1;
}
Expand All @@ -1277,7 +1325,7 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes)
available);
if (r_bytes <= 0) {
if (c->flags & FLB_HTTP_10) {
break;
return FLB_HTTP_OK;
}
}

Expand All @@ -1293,23 +1341,39 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes)
c->u_conn->upstream->tcp_host,
c->u_conn->upstream->tcp_port,
c->u_conn->fd);
return -1;
}
else if (ret == FLB_HTTP_OK) {
break;
}
else if (ret == FLB_HTTP_MORE) {
continue;
return FLB_HTTP_ERROR;
}
}
else {
flb_error("[http_client] broken connection to %s:%i ?",
c->u_conn->upstream->tcp_host,
c->u_conn->upstream->tcp_port);
return -1;
return FLB_HTTP_ERROR;
}
}

return ret;
}

int flb_http_do(struct flb_http_client *c, size_t *bytes)
{
int ret;

ret = flb_http_do_request(c, bytes);
if (ret != 0) {
return ret;
}

/* Read the server response, we need at least 19 bytes */
while (ret == FLB_HTTP_MORE || ret == FLB_HTTP_CHUNK_AVAILABLE) {
/* flb_http_do does not consume any bytes during processing
* so we always pass 0 consumed_bytes because we fetch until
* the end chunk before returning to the caller
*/

ret = flb_http_get_response_data(c, 0);
}

/* Check 'Connection' response header */
ret = check_connection(c);
if (ret == FLB_HTTP_OK) {
Expand Down
2 changes: 0 additions & 2 deletions tests/include/aws_client_mock_client_resp.def
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(status, STATUS,
EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(content_length, CONTENT_LENGTH, T_INT)) /* Content length set by headers */
EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(chunked_encoding, CHUNKED_ENCODEING, T_INT)) /* Chunked transfer encoding ? */
EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(connection_close, CONNECTION_CLOSE, T_INT)) /* connection: close ? */
EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(chunked_cur_size, CHUNKED_CUR_SIZE, T_LONG))
EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(chunked_exp_size, CHUNKED_EXP_SIZE, T_LONG)) /* expected chunked size */
EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(chunk_processed_end, CHUNK_PROCESSED_END, T_CHAR_STAR)) /* Position to mark last chunk */
EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(headers_end, HEADERS_END, T_CHAR_STAR)) /* Headers end (\r\n\r\n) */

Expand Down
Loading