Skip to content

Commit

Permalink
input_chunk: added proper accounting of chunks that are rolled over
Browse files Browse the repository at this point in the history
Signed-off-by: Leonardo Alminana <[email protected]>
  • Loading branch information
leonardo-albertovich authored and edsiper committed Jan 31, 2024
1 parent 44327fa commit 1832113
Showing 1 changed file with 71 additions and 3 deletions.
74 changes: 71 additions & 3 deletions src/flb_input_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,50 @@ static int flb_input_chunk_is_task_safe_delete(struct flb_task *task);

static int flb_input_chunk_drop_task_route(
struct flb_task *task,
struct flb_output_instance *o_ins);
struct flb_output_instance *o_ins,
ssize_t *dropped_record_count);

static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic);

static ssize_t get_input_chunk_record_count(struct flb_input_chunk *input_chunk)
{
ssize_t record_count;
char *chunk_buffer;
size_t chunk_size;
int set_down;
int ret;

ret = cio_chunk_is_up(input_chunk->chunk);
set_down = FLB_FALSE;

if (ret == CIO_FALSE) {
ret = cio_chunk_up_force(input_chunk->chunk);

if (ret == -1) {
return -1;
}

set_down = FLB_TRUE;
}

ret = cio_chunk_get_content(input_chunk->chunk,
&chunk_buffer,
&chunk_size);

if (ret == CIO_OK) {
record_count = flb_mp_count(chunk_buffer, chunk_size);
}
else {
record_count = -1;
}

if (set_down) {
cio_chunk_down(input_chunk->chunk);
}

return record_count;
}

static int flb_input_chunk_release_space(
struct flb_input_chunk *new_input_chunk,
struct flb_input_instance *input_plugin,
Expand All @@ -105,6 +145,7 @@ static int flb_input_chunk_release_space(
{
struct mk_list *input_chunk_iterator_tmp;
struct mk_list *input_chunk_iterator;
ssize_t dropped_record_count;
int chunk_destroy_flag;
struct flb_input_chunk *old_input_chunk;
ssize_t released_space;
Expand All @@ -130,7 +171,8 @@ static int flb_input_chunk_release_space(
}

if (flb_input_chunk_drop_task_route(old_input_chunk->task,
output_plugin) == FLB_FALSE) {
output_plugin,
&dropped_record_count) == FLB_FALSE) {
continue;
}

Expand All @@ -154,6 +196,27 @@ static int flb_input_chunk_release_space(
chunk_destroy_flag = FLB_TRUE;
}

#ifdef FLB_HAVE_METRICS
if (dropped_record_count == 0) {
dropped_record_count = get_input_chunk_record_count(old_input_chunk);

if (dropped_record_count == -1) {
flb_debug("[task] error getting chunk record count : %s",
old_input_chunk->in->name);
}
else {
cmt_counter_add(output_plugin->cmt_dropped_records,
cfl_time_now(),
dropped_record_count,
1, (char *[]) {(char *) flb_output_name(output_plugin)});

flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS,
dropped_record_count,
output_plugin->metrics);
}
}
#endif

if (chunk_destroy_flag) {
if (old_input_chunk->task != NULL) {
/*
Expand Down Expand Up @@ -271,11 +334,14 @@ int flb_input_chunk_write_at(void *data, off_t offset,

static int flb_input_chunk_drop_task_route(
struct flb_task *task,
struct flb_output_instance *output_plugin)
struct flb_output_instance *output_plugin,
ssize_t *dropped_record_count)
{
int route_status;
int result;

*dropped_record_count = 0;

if (task == NULL) {
return FLB_TRUE;
}
Expand All @@ -295,6 +361,8 @@ static int flb_input_chunk_drop_task_route(
output_plugin,
FLB_TASK_ROUTE_DROPPED);

*dropped_record_count = (ssize_t) task->records;

result = FLB_TRUE;
}

Expand Down

0 comments on commit 1832113

Please sign in to comment.