Skip to content

Commit

Permalink
http_server: refactored compression code
Browse files Browse the repository at this point in the history
Signed-off-by: Leonardo Alminana <[email protected]>
  • Loading branch information
leonardo-albertovich committed Aug 26, 2024
1 parent 7f5d149 commit a025f3d
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 215 deletions.
9 changes: 5 additions & 4 deletions include/fluent-bit/http_server/flb_http_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@
#define HTTP_SERVER_MAXIMUM_BUFFER_SIZE (10 * (1000 * 1024))

#define FLB_HTTP_SERVER_FLAG_KEEPALIVE (((uint64_t) 1) << 0)
#define FLB_HTTP_SERVER_FLAG_AUTO_INFLATE (((uint64_t) 1) << 1)
#define FLB_HTTP_SERVER_FLAG_AUTO_DEFLATE (((uint64_t) 1) << 1)
#define FLB_HTTP_SERVER_FLAG_AUTO_INFLATE (((uint64_t) 1) << 2)

#define HTTP_SERVER_SUCCESS 0
#define HTTP_SERVER_PROVIDER_ERROR -1
Expand Down Expand Up @@ -100,7 +101,7 @@ struct flb_http_server_session {

/* HTTP SERVER */

int flb_http_server_init(struct flb_http_server *session,
int flb_http_server_init(struct flb_http_server *session,
int protocol_version,
uint64_t flags,
flb_http_server_request_processor_callback
Expand Down Expand Up @@ -128,8 +129,8 @@ struct flb_http_server_session *flb_http_server_session_create(int version);

void flb_http_server_session_destroy(struct flb_http_server_session *session);

int flb_http_server_session_ingest(struct flb_http_server_session *session,
unsigned char *buffer,
int flb_http_server_session_ingest(struct flb_http_server_session *session,
unsigned char *buffer,
size_t length);

#endif
218 changes: 7 additions & 211 deletions src/http_server/flb_http_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,44 +21,6 @@

#include <fluent-bit/http_server/flb_http_server.h>

#include <fluent-bit/flb_snappy.h>
#include <fluent-bit/flb_gzip.h>

static \
int uncompress_zlib(char **output_buffer,
size_t *output_size,
char *input_buffer,
size_t input_size);

static \
int uncompress_zstd(char **output_buffer,
size_t *output_size,
char *input_buffer,
size_t input_size);

static \
int uncompress_deflate(char **output_buffer,
size_t *output_size,
char *input_buffer,
size_t input_size);

static \
int uncompress_snappy(char **output_buffer,
size_t *output_size,
char *input_buffer,
size_t input_size);

static \
int uncompress_gzip(char **output_buffer,
size_t *output_size,
char *input_buffer,
size_t input_size);

/* COMMON */





/* PRIVATE */

Expand Down Expand Up @@ -128,101 +90,6 @@ static int flb_http_server_session_write(struct flb_http_server_session *session
return 0;
}



static int flb_http_server_inflate_request_body(
struct flb_http_request *request)
{
char *content_encoding_header_value;
char new_content_length[21];
struct flb_http_server_session *parent_session;
cfl_sds_t inflated_body;
char *output_buffer;
size_t output_size;
struct flb_http_server *server;
int result;

parent_session = (struct flb_http_server_session *) request->stream->parent;

server = parent_session->parent;
result = 0;

if (request->body == NULL) {
return 0;
}

if ((server->flags & FLB_HTTP_SERVER_FLAG_AUTO_INFLATE) == 0) {
return 0;
}

content_encoding_header_value = flb_http_request_get_header(
request,
"content-encoding");

if (content_encoding_header_value == NULL) {
return 0;
}

if (strncasecmp(content_encoding_header_value, "gzip", 4) == 0) {
result = uncompress_gzip(&output_buffer,
&output_size,
request->body,
cfl_sds_len(request->body));
}
else if (strncasecmp(content_encoding_header_value, "zlib", 4) == 0) {
result = uncompress_zlib(&output_buffer,
&output_size,
request->body,
cfl_sds_len(request->body));
}
else if (strncasecmp(content_encoding_header_value, "zstd", 4) == 0) {
result = uncompress_zstd(&output_buffer,
&output_size,
request->body,
cfl_sds_len(request->body));
}
else if (strncasecmp(content_encoding_header_value, "snappy", 6) == 0) {
result = uncompress_snappy(&output_buffer,
&output_size,
request->body,
cfl_sds_len(request->body));
}
else if (strncasecmp(content_encoding_header_value, "deflate", 4) == 0) {
result = uncompress_deflate(&output_buffer,
&output_size,
request->body,
cfl_sds_len(request->body));
}

if (result == 1) {
inflated_body = cfl_sds_create_len(output_buffer, output_size);

flb_free(output_buffer);

if (inflated_body == NULL) {
return -1;
}

cfl_sds_destroy(request->body);

request->body = inflated_body;

snprintf(new_content_length,
sizeof(new_content_length),
"%zu",
output_size);

flb_http_request_unset_header(request, "content-encoding");
flb_http_request_set_header(request,
"content-length", strlen("content-length"),
new_content_length, strlen(new_content_length));

request->content_length = output_size;
}

return 0;
}

static int flb_http_server_should_connection_be_closed(
struct flb_http_request *request)
{
Expand Down Expand Up @@ -331,12 +198,15 @@ static int flb_http_server_client_activity_event_handler(void *data)
request->content_length = cfl_sds_len(request->body);
}

result = flb_http_server_inflate_request_body(request);
if ((server->flags & FLB_HTTP_SERVER_FLAG_AUTO_INFLATE) == 0) {
return 0;
result = flb_http_request_uncompress_body(request);

if (result != 0) {
flb_http_server_session_destroy(session);
if (result != 0) {
flb_http_server_session_destroy(session);

return -1;
return -1;
}
}

if (server->request_callback != NULL) {
Expand Down Expand Up @@ -706,77 +576,3 @@ int flb_http_server_session_ingest(struct flb_http_server_session *session,
return -1;
}


/* PRIVATE */

static \
int uncompress_zlib(char **output_buffer,
size_t *output_size,
char *input_buffer,
size_t input_size)
{
return 0;
}

static \
int uncompress_zstd(char **output_buffer,
size_t *output_size,
char *input_buffer,
size_t input_size)
{
return 0;
}

static \
int uncompress_deflate(char **output_buffer,
size_t *output_size,
char *input_buffer,
size_t input_size)
{
return 0;
}

static \
int uncompress_snappy(char **output_buffer,
size_t *output_size,
char *input_buffer,
size_t input_size)
{
int ret;

ret = flb_snappy_uncompress_framed_data(input_buffer,
input_size,
output_buffer,
output_size);

if (ret != 0) {
flb_error("[opentelemetry] snappy decompression failed");

return -1;
}

return 1;
}

static \
int uncompress_gzip(char **output_buffer,
size_t *output_size,
char *input_buffer,
size_t input_size)
{
int ret;

ret = flb_gzip_uncompress(input_buffer,
input_size,
(void *) output_buffer,
output_size);

if (ret == -1) {
flb_error("[opentelemetry] gzip decompression failed");

return -1;
}

return 1;
}

0 comments on commit a025f3d

Please sign in to comment.