diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index 58413cbac74..237e50e626a 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -92,10 +92,50 @@ static int flb_input_chunk_is_task_safe_delete(struct flb_task *task); static int flb_input_chunk_drop_task_route( struct flb_task *task, - struct flb_output_instance *o_ins); + struct flb_output_instance *o_ins, + ssize_t *dropped_record_count); static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic); +static ssize_t get_input_chunk_record_count(struct flb_input_chunk *input_chunk) +{ + ssize_t record_count; + char *chunk_buffer; + size_t chunk_size; + int set_down; + int ret; + + ret = cio_chunk_is_up(input_chunk->chunk); + set_down = FLB_FALSE; + + if (ret == CIO_FALSE) { + ret = cio_chunk_up_force(input_chunk->chunk); + + if (ret == -1) { + return -1; + } + + set_down = FLB_TRUE; + } + + ret = cio_chunk_get_content(input_chunk->chunk, + &chunk_buffer, + &chunk_size); + + if (ret == CIO_OK) { + record_count = flb_mp_count(chunk_buffer, chunk_size); + } + else { + record_count = -1; + } + + if (set_down) { + cio_chunk_down(input_chunk->chunk); + } + + return record_count; +} + static int flb_input_chunk_release_space( struct flb_input_chunk *new_input_chunk, struct flb_input_instance *input_plugin, @@ -105,6 +145,7 @@ static int flb_input_chunk_release_space( { struct mk_list *input_chunk_iterator_tmp; struct mk_list *input_chunk_iterator; + ssize_t dropped_record_count; int chunk_destroy_flag; struct flb_input_chunk *old_input_chunk; ssize_t released_space; @@ -130,7 +171,8 @@ static int flb_input_chunk_release_space( } if (flb_input_chunk_drop_task_route(old_input_chunk->task, - output_plugin) == FLB_FALSE) { + output_plugin, + &dropped_record_count) == FLB_FALSE) { continue; } @@ -154,6 +196,27 @@ static int flb_input_chunk_release_space( chunk_destroy_flag = FLB_TRUE; } +#ifdef FLB_HAVE_METRICS + if (dropped_record_count == 0) { + dropped_record_count = get_input_chunk_record_count(old_input_chunk); + + if (dropped_record_count == -1) { + flb_debug("[task] error getting chunk record count : %s", + old_input_chunk->in->name); + } + else { + cmt_counter_add(output_plugin->cmt_dropped_records, + cfl_time_now(), + dropped_record_count, + 1, (char *[]) {(char *) flb_output_name(output_plugin)}); + + flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS, + dropped_record_count, + output_plugin->metrics); + } + } +#endif + if (chunk_destroy_flag) { if (old_input_chunk->task != NULL) { /* @@ -271,11 +334,14 @@ int flb_input_chunk_write_at(void *data, off_t offset, static int flb_input_chunk_drop_task_route( struct flb_task *task, - struct flb_output_instance *output_plugin) + struct flb_output_instance *output_plugin, + ssize_t *dropped_record_count) { int route_status; int result; + *dropped_record_count = 0; + if (task == NULL) { return FLB_TRUE; } @@ -295,6 +361,8 @@ static int flb_input_chunk_drop_task_route( output_plugin, FLB_TASK_ROUTE_DROPPED); + *dropped_record_count = (ssize_t) task->records; + result = FLB_TRUE; }