out_chronicle: handle 1MB or larger chunks
The actual size of out_chronicle payloads is determined by the result of `chronicle_format`.
The first attempt to create a payload can exceed Chronicle's limit of 1 MiB (= 1 * 1024 * 1024 bytes) because of the added metadata and timestamps. When that happens, set a smaller threshold and retry sending the data.

Signed-off-by: lecaros <[email protected]>
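
A minimal standalone sketch of the retry strategy described above (illustration only, not the plugin code): the formatting threshold starts at roughly 0.8 MiB and, while the resulting payload is still 1 MiB or larger, shrinks to (retry_limit - retries) / 10.0 of a mebibyte before reformatting, for up to 8 attempts. The payload_size_for() helper is a hypothetical stand-in for chronicle_format(); the real plugin measures the size of the JSON payload it actually built.

#include <stdio.h>
#include <stddef.h>

#define ONE_MEBIBYTE  (1024 * 1024)
#define RETRY_LIMIT   8

/* Hypothetical stand-in for chronicle_format(): pretend the JSON payload
 * ends up about one third larger than the msgpack selected under the
 * threshold, due to metadata and timestamps. */
static size_t payload_size_for(size_t threshold)
{
    return threshold + threshold / 3;
}

int main(void)
{
    size_t threshold = (size_t) (0.8 * ONE_MEBIBYTE);
    size_t payload_size;
    int retries = 0;

    while (retries < RETRY_LIMIT) {
        if (retries > 0) {
            /* Same shrink factor as the commit: 0.7, 0.6, ... 0.1 MiB */
            threshold = (size_t) ((RETRY_LIMIT - retries) / 10.0 * ONE_MEBIBYTE);
        }

        payload_size = payload_size_for(threshold);
        printf("retry=%d threshold=%zu payload=%zu\n",
               retries, threshold, payload_size);

        if (payload_size < ONE_MEBIBYTE) {
            printf("payload fits, sending\n");
            return 0;
        }
        retries++;
    }

    printf("retry limit reached, giving up\n");
    return 1;
}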
lecaros authored and edsiper committed Apr 26, 2024
1 parent 3c2eece commit fead302
237 changes: 182 additions & 55 deletions plugins/out_chronicle/chronicle.c
@@ -627,9 +627,45 @@ static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, uint64_t by
return out_buf;
}

static int count_mp_with_threshold(size_t last_offset, size_t threshold,
struct flb_log_event_decoder *log_decoder,
struct flb_chronicle *ctx)
{
int ret;
int array_size = 0;
size_t off = 0;
struct flb_log_event log_event;

/* Adjust decoder offset */
if (last_offset != 0) {
log_decoder->offset = last_offset;
}

while ((ret = flb_log_event_decoder_next(
log_decoder,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
off = log_decoder->offset;
array_size++;

if (off >= (threshold + last_offset)) {
flb_plg_debug(ctx->ins,
"the offset %zu exceeded the threshold %zu. "
"Splitting the payload; the processed array size is %d",
off, threshold, array_size);

break;
}
}

return array_size;
}

static int chronicle_format(const void *data, size_t bytes,
const char *tag, size_t tag_len,
char **out_data, size_t *out_size,
size_t last_offset,
size_t threshold, size_t *out_offset,
struct flb_log_event_decoder *log_decoder,
struct flb_chronicle *ctx)
{
int len;
@@ -643,24 +679,16 @@ static int chronicle_format(const void *data, size_t bytes,
/* Parameters for Timestamp */
struct tm tm;
flb_sds_t out_buf;
struct flb_log_event_decoder log_decoder;
struct flb_log_event log_event;
msgpack_sbuffer mp_sbuf;
msgpack_packer mp_pck;
flb_sds_t log_text = NULL;
int log_text_size;

/* Count number of records */
ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
array_size = count_mp_with_threshold(last_offset, threshold, log_decoder, ctx);

if (ret != FLB_EVENT_DECODER_SUCCESS) {
flb_plg_error(ctx->ins,
"Log event decoder initialization error : %d", ret);

return -1;
}

array_size = flb_mp_count(data, bytes);
/* Reset the decoder state */
flb_log_event_decoder_reset(log_decoder, (char *) data, bytes);

/* Create temporary msgpack buffer */
msgpack_sbuffer_init(&mp_sbuf);
@@ -707,10 +735,16 @@
/* Append entries */
msgpack_pack_array(&mp_pck, array_size);

flb_plg_trace(ctx->ins, "last offset is %zu", last_offset);
/* Adjust decoder offset */
if (last_offset != 0) {
log_decoder->offset = last_offset;
}

while ((ret = flb_log_event_decoder_next(
&log_decoder,
log_decoder,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
off = log_decoder.offset;
off = log_decoder->offset;
alloc_size = (off - last_off) + 128; /* JSON is larger than msgpack */
last_off = off;

@@ -764,19 +798,27 @@

msgpack_pack_str(&mp_pck, s);
msgpack_pack_str_body(&mp_pck, time_formatted, s);

if (off >= (threshold + last_offset)) {
flb_plg_debug(ctx->ins,
"the offset %zu exceeded the threshold %zu. "
"Splitting the payload; the processed array size is %d.",
off, threshold, array_size);

break;
}
}

/* Convert from msgpack to JSON */
out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);

flb_log_event_decoder_destroy(&log_decoder);
msgpack_sbuffer_destroy(&mp_sbuf);

if (!out_buf) {
flb_plg_error(ctx->ins, "error formatting JSON payload");
return -1;
}

*out_offset = last_off;
*out_data = out_buf;
*out_size = flb_sds_len(out_buf);

@@ -800,6 +842,14 @@ static void cb_chronicle_flush(struct flb_event_chunk *event_chunk,
struct flb_chronicle *ctx = out_context;
struct flb_connection *u_conn;
struct flb_http_client *c;
struct flb_log_event_decoder log_decoder;
size_t threshold = 0.8 * 1024 * 1024;
size_t offset = 0;
size_t out_offset = 0;
int need_loop = FLB_TRUE;
const int retry_limit = 8;
int retries = 0;
const size_t one_mebibyte = 1024 * 1024;

flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size);

@@ -818,61 +868,138 @@
FLB_OUTPUT_RETURN(FLB_RETRY);
}

/* Reformat msgpack to chronicle JSON payload */
ret = chronicle_format(event_chunk->data, event_chunk->size,
event_chunk->tag, flb_sds_len(event_chunk->tag),
&payload_buf, &payload_size, ctx);
if (ret != 0) {
flb_upstream_conn_release(u_conn);
flb_sds_destroy(token);
FLB_OUTPUT_RETURN(FLB_RETRY);
}
flb_plg_trace(ctx->ins, "msgpack payload size is %zu", event_chunk->size);

/* Prepare log decoder */
ret = flb_log_event_decoder_init(&log_decoder, (char *) event_chunk->data, event_chunk->size);

if (ret != FLB_EVENT_DECODER_SUCCESS) {
flb_plg_error(ctx->ins,
"Log event decoder initialization error : %d", ret);

/* Compose HTTP Client request */
c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->endpoint,
payload_buf, payload_size, NULL, 0, NULL, 0);
if (!c) {
flb_plg_error(ctx->ins, "cannot create HTTP client context");
flb_upstream_conn_release(u_conn);
flb_sds_destroy(token);
flb_sds_destroy(payload_buf);
FLB_OUTPUT_RETURN(FLB_RETRY);
}

flb_http_buffer_size(c, 4192);
flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
while (need_loop) {
retry:
if (retries > 0) {
/* (retry_limit - retries) / 10.0 is a factor used to shrink the
 * formatting threshold.
 * For the first retry it yields:
 * (8 - 1) / 10.0 = 0.7
 * For the second retry:
 * (8 - 2) / 10.0 = 0.6
 * ...
 * For the 7th retry:
 * (8 - 7) / 10.0 = 0.1
 * On the 8th retry we don't shrink any further; we just give up
 * on formatting. :)
 */
threshold = (retry_limit - retries)/10.0 * one_mebibyte;
}

/* Compose and append Authorization header */
flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token));
/* Reformat msgpack to chronicle JSON payload */
ret = chronicle_format(event_chunk->data, event_chunk->size,
event_chunk->tag, flb_sds_len(event_chunk->tag),
&payload_buf, &payload_size,
offset, threshold, &out_offset,
&log_decoder, ctx);
if (ret != 0) {
flb_upstream_conn_release(u_conn);
flb_sds_destroy(token);
flb_sds_destroy(payload_buf);
flb_log_event_decoder_destroy(&log_decoder);

FLB_OUTPUT_RETURN(FLB_RETRY);
}

/* Send HTTP request */
ret = flb_http_do(c, &b_sent);
flb_plg_debug(ctx->ins, "the last offset of msgpack decoder is %zu", out_offset);

/* validate response */
if (ret != 0) {
flb_plg_warn(ctx->ins, "http_do=%i", ret);
ret_code = FLB_RETRY;
}
else {
/* The request was issued successfully, validate the 'error' field */
flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status);
if (c->resp.status == 200) {
ret_code = FLB_OK;
if (payload_size >= one_mebibyte) {
retries++;
if (retries >= retry_limit) {
flb_plg_error(ctx->ins, "Retry limit is exeeced for chronicle_format");

flb_upstream_conn_release(u_conn);
flb_sds_destroy(token);
flb_sds_destroy(payload_buf);
flb_log_event_decoder_destroy(&log_decoder);

FLB_OUTPUT_RETURN(FLB_ERROR);
}

flb_plg_debug(ctx->ins,
"HTTP request body exceeded the limit of %zu bytes. actual: %zu. attempt(s) left: %d",
one_mebibyte, payload_size, retry_limit - retries);
flb_sds_destroy(payload_buf);

goto retry;
}
else {
if (c->resp.payload && c->resp.payload_size > 0) {
/* we got an error */
flb_plg_warn(ctx->ins, "response\n%s", c->resp.payload);
}
retries = 0;
}

/* Compose HTTP Client request */
c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->endpoint,
payload_buf, payload_size, NULL, 0, NULL, 0);
if (!c) {
flb_plg_error(ctx->ins, "cannot create HTTP client context");
flb_upstream_conn_release(u_conn);
flb_sds_destroy(token);
flb_sds_destroy(payload_buf);
flb_log_event_decoder_destroy(&log_decoder);

FLB_OUTPUT_RETURN(FLB_RETRY);
}

flb_http_buffer_size(c, 4192);
flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
flb_http_add_header(c, "Content-Type", 12, "application/json", 16);

/* Compose and append Authorization header */
flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token));

/* Send HTTP request */
ret = flb_http_do(c, &b_sent);

/* validate response */
if (ret != 0) {
flb_plg_warn(ctx->ins, "http_do=%i", ret);
ret_code = FLB_RETRY;
}
else {
/* The request was issued successfully, validate the 'error' field */
flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status);
if (c->resp.status == 200) {
ret_code = FLB_OK;
}
else {
if (c->resp.payload && c->resp.payload_size > 0) {
/* we got an error */
flb_plg_warn(ctx->ins, "response\n%s", c->resp.payload);
}
ret_code = FLB_RETRY;
}
}

/* Validate all chunks are processed or not */
if (out_offset >= event_chunk->size) {
need_loop = FLB_FALSE;
}
/* Clean up HTTP client stuffs */
flb_sds_destroy(payload_buf);
flb_http_client_destroy(c);

/* The next loop uses the returned offset */
offset = out_offset;
}

/* Cleanup */
flb_sds_destroy(payload_buf);
/* Cleanup decoder */
flb_log_event_decoder_destroy(&log_decoder);

/* Cleanup token and conn */
flb_sds_destroy(token);
flb_http_client_destroy(c);
flb_upstream_conn_release(u_conn);

/* Done */
