From 85838c90f574306ef1988a9a29fdb7fc8aea7eaf Mon Sep 17 00:00:00 2001 From: seblaz Date: Fri, 5 Apr 2024 16:00:08 -0300 Subject: [PATCH] engine_dispatch: remove chunks from memory if the task fails to be created When the tasks_map is filled and the new arriving chunks try to create a new task, then an error is returned. Additionally, under that condition the new chunks don't get deleted from memory, and they occupy a space of the storage.max_chunks_up. Eventually, the new chunks end up using the entire space of the storage.max_chunks_up. This causes the old chunks that have a task assigned to be unable to be brought up into memory, and therefore they never get flushed. This pr fixes the problem by deleting the new chunks from memory in case they failed to create a task and the filesystem storage is in use. Additionally, it runs a check to not bring it up to memory in the first place if possible. Signed-off-by: seblaz --- src/flb_engine_dispatch.c | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/flb_engine_dispatch.c b/src/flb_engine_dispatch.c index bbfd87abbb6..b161888e679 100644 --- a/src/flb_engine_dispatch.c +++ b/src/flb_engine_dispatch.c @@ -29,6 +29,7 @@ #include #include #include +#include /* It creates a new output thread using a 'Retry' context */ @@ -270,6 +271,14 @@ int flb_engine_dispatch(uint64_t id, struct flb_input_instance *in, continue; } + if (flb_task_map_get_task_id(config) == -1) { + /* + * There isn't a task available, no more chunks can have a task + * assigned. + */ + break; + } + /* There is a match, get the buffer */ buf_data = flb_input_chunk_flush(ic, &buf_size); if (buf_size == 0) { @@ -312,6 +321,12 @@ int flb_engine_dispatch(uint64_t id, struct flb_input_instance *in, */ if (t_err == FLB_TRUE) { flb_input_chunk_release_lock(ic); + + /* + * If the Storage type is 'filesystem' we need to put + * the file content down. + */ + flb_input_chunk_down(ic); } continue; }