From 43af795ec5bc8f4b290933c6fc9d1d8002b8fbbf Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 13 Sep 2024 20:27:21 +0900 Subject: [PATCH 1/4] http_client: Implement response testing framework Signed-off-by: Hiroshi Hatake --- include/fluent-bit/flb_http_client.h | 82 ++++++++++++ src/flb_http_client.c | 178 +++++++++++++++++++++++---- 2 files changed, 238 insertions(+), 22 deletions(-) diff --git a/include/fluent-bit/flb_http_client.h b/include/fluent-bit/flb_http_client.h index 8d7058622e0..7f11db8e83d 100644 --- a/include/fluent-bit/flb_http_client.h +++ b/include/fluent-bit/flb_http_client.h @@ -100,6 +100,71 @@ struct flb_http_debug { int (*cb_debug_request_payload); }; +/* To make opaque struct */ +struct flb_http_client; + +/* + * Tests callbacks + * =============== + */ +struct flb_test_http_response { + /* + * Response Test Mode + * ==================== + * When the response test enable the test response mode, it needs to + * keep a reference of the context and other information: + * + * - rt_ctx : flb_http_client context + * + * - rt_status : HTTP response code + * + * - rt_in_callback: intermediary function to receive the results of + * the http response test function. + * + * - rt_data: opaque data type for rt_in_callback() + */ + + /* runtime library context */ + void *rt_ctx; + + /* HTTP status */ + int rt_status; + + /* optional response context */ + void *response_ctx; + + /* + * "response test callback": this function pointer is used by Fluent Bit + * http client testing mode to reference a test function that must retrieve the + * results of 'callback'. Consider this an intermediary function to + * transfer the results to the runtime test. + * + * This function is private and should not be set manually in the plugin + * code, it's set on src/flb_http_client.c . + */ + void (*rt_resp_callback) (void *, int, void *, size_t, void *); + + /* + * opaque data type passed by the runtime library to be used on + * rt_in_callback(). + */ + void *rt_data; + + /* + * Callback + * ========= + * "HTTP response callback": it references the plugin function that performs + * to validate HTTP response by HTTP client. This entry is mostly to + * expose the plugin local function. + */ + int (*callback) (/* plugin that ingested the records */ + struct flb_http_client *, + const void *, /* incoming response data */ + size_t, /* incoming response size */ + void **, /* output buffer */ + size_t *); /* output buffer size */ +}; + /* Set a request type */ struct flb_http_client { /* Upstream connection */ @@ -133,6 +198,10 @@ struct flb_http_client { /* Response */ struct flb_http_client_response resp; + /* Tests */ + int test_mode; + struct flb_test_http_response test_response; + /* Reference to Callback context */ void *cb_ctx; }; @@ -146,6 +215,13 @@ struct flb_http_client *flb_http_client(struct flb_connection *u_conn, const char *host, int port, const char *proxy, int flags); +/* For fulfilling HTTP response testing (dummy client) */ +struct flb_http_client *flb_http_dummy_client(struct flb_connection *u_conn, + int method, const char *uri, + const char *body, size_t body_len, + const char *host, int port, + const char *proxy, int flags); + int flb_http_add_header(struct flb_http_client *c, const char *key, size_t key_len, const char *val, size_t val_len); @@ -161,6 +237,12 @@ int flb_http_set_keepalive(struct flb_http_client *c); int flb_http_set_content_encoding_gzip(struct flb_http_client *c); int flb_http_set_callback_context(struct flb_http_client *c, struct flb_callback *cb_ctx); +int flb_http_set_response_test(struct flb_http_client *c, char *test_name, + const void *data, size_t len, + int status, + void (*resp_callback) (void *, int, void *, size_t, void *), + void *resp_callback_data); +int flb_http_push_response(struct flb_http_client *c, const void *data, size_t len); int flb_http_get_response_data(struct flb_http_client *c, size_t bytes_consumed); int flb_http_do_request(struct flb_http_client *c, size_t *bytes); diff --git a/src/flb_http_client.c b/src/flb_http_client.c index 72a1fadbbd1..e2ad9f30b4d 100644 --- a/src/flb_http_client.c +++ b/src/flb_http_client.c @@ -633,7 +633,7 @@ static int add_host_and_content_length(struct flb_http_client *c) return 0; } -struct flb_http_client *flb_http_client(struct flb_connection *u_conn, +struct flb_http_client *create_http_client(struct flb_connection *u_conn, int method, const char *uri, const char *body, size_t body_len, const char *host, int port, @@ -745,24 +745,81 @@ struct flb_http_client *flb_http_client(struct flb_connection *u_conn, c->query_string = p; } - /* Is Upstream connection using keepalive mode ? */ - if (flb_stream_get_flag_status(&u_conn->upstream->base, FLB_IO_TCP_KA)) { - c->flags |= FLB_HTTP_KA; - } - /* Response */ c->resp.content_length = -1; c->resp.connection_close = -1; - if ((flags & FLB_HTTP_10) == 0) { - c->flags |= FLB_HTTP_11; - } - if (body && body_len > 0) { c->body_buf = body; c->body_len = body_len; } + /* 'Read' buffer size */ + c->resp.data = flb_malloc(FLB_HTTP_DATA_SIZE_MAX); + if (!c->resp.data) { + flb_errno(); + flb_http_client_destroy(c); + return NULL; + } + c->resp.data[0] = '\0'; + c->resp.data_len = 0; + c->resp.data_size = FLB_HTTP_DATA_SIZE_MAX; + c->resp.data_size_max = FLB_HTTP_DATA_SIZE_MAX; + + /* Tests */ + c->test_mode = FLB_FALSE; + c->test_response.callback = NULL; + + return c; +} + +struct flb_http_client *flb_http_dummy_client(struct flb_connection *u_conn, + int method, const char *uri, + const char *body, size_t body_len, + const char *host, int port, + const char *proxy, int flags) +{ + struct flb_http_client *c; + + c = create_http_client(u_conn, method, uri, + body, body_len, + host, port, + proxy, flags); + + if (!c) { + return NULL; + } + + return c; +} + +struct flb_http_client *flb_http_client(struct flb_connection *u_conn, + int method, const char *uri, + const char *body, size_t body_len, + const char *host, int port, + const char *proxy, int flags) +{ + int ret; + struct flb_http_client *c; + + c = create_http_client(u_conn, method, uri, + body, body_len, + host, port, + proxy, flags); + + if (!c) { + return NULL; + } + + /* Is Upstream connection using keepalive mode ? */ + if (flb_stream_get_flag_status(&u_conn->upstream->base, FLB_IO_TCP_KA)) { + c->flags |= FLB_HTTP_KA; + } + + if ((flags & FLB_HTTP_10) == 0) { + c->flags |= FLB_HTTP_11; + } + add_host_and_content_length(c); /* Check proxy data */ @@ -776,18 +833,6 @@ struct flb_http_client *flb_http_client(struct flb_connection *u_conn, } } - /* 'Read' buffer size */ - c->resp.data = flb_malloc(FLB_HTTP_DATA_SIZE_MAX); - if (!c->resp.data) { - flb_errno(); - flb_http_client_destroy(c); - return NULL; - } - c->resp.data[0] = '\0'; - c->resp.data_len = 0; - c->resp.data_size = FLB_HTTP_DATA_SIZE_MAX; - c->resp.data_size_max = FLB_HTTP_DATA_SIZE_MAX; - return c; } @@ -1071,6 +1116,91 @@ int flb_http_set_callback_context(struct flb_http_client *c, return 0; } +int flb_http_set_response_test(struct flb_http_client *c, char *test_name, + const void *data, size_t len, + int status, + void (*resp_callback) (void *, int, void *, size_t, void *), + void *resp_callback_data) +{ + if (!c) { + return -1; + } + + /* + * Enabling a test, set the http_client instance in 'test' mode, so no real + * http request is invoked, only the desired implemented test. + */ + + /* Response test */ + if (strcmp(test_name, "response") == 0) { + c->test_mode = FLB_TRUE; + c->test_response.rt_ctx = c; + c->test_response.rt_status = status; + c->test_response.rt_resp_callback = resp_callback; + c->test_response.rt_data = resp_callback_data; + if (data != NULL && len > 0) { + c->resp.payload = (char *)data; + c->resp.payload_size = len; + c->resp.status = status; + } + } + else { + return -1; + } + + return 0; +} + +static int flb_http_run_response_test(struct flb_http_client *c, + const void *data, size_t len) +{ + int ret = 0; + void *out_buf = NULL; + size_t out_size = 0; + struct flb_test_http_response *htr; + + if (!c) { + return -1; + } + + htr = &c->test_response; + + /* Invoke the output plugin formatter test callback */ + ret = htr->callback(c, + data, len, + &out_buf, &out_size); + + /* Call the runtime test callback checker */ + if (htr->rt_resp_callback) { + htr->rt_resp_callback(htr->rt_ctx, + ret, + out_buf, out_size, + htr->rt_data); + } + else { + flb_free(out_buf); + } + + return 0; +} + +/* Push some response into the http client */ +static int flb_http_stub_response(struct flb_http_client *c) +{ + int ret = 0; + + if (!c) { + return -1; + } + + /* If http client's test_responses is registered, run the stub. */ + if (c->test_response.callback != NULL && c->resp.payload != NULL) { + ret = flb_http_run_response_test(c, c->resp.payload, c->resp.payload_size); + } + + return ret; +} + int flb_http_add_auth_header(struct flb_http_client *c, const char *user, const char *passwd, const char *header) { int ret; @@ -1362,6 +1492,10 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes) { int ret; + if (c->test_mode == FLB_TRUE) { + return flb_http_stub_response(c); + } + ret = flb_http_do_request(c, bytes); if (ret != 0) { return ret; From 5b382ef3a4ba24abf202b0d5e0949c4c920dd51c Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 17 Sep 2024 18:00:31 +0900 Subject: [PATCH 2/4] output: Add a capability to inject HTTP response testing environment Signed-off-by: Hiroshi Hatake --- include/fluent-bit/flb_output.h | 62 +++++++++++++++++++++++++++++++++ src/flb_output.c | 1 + 2 files changed, 63 insertions(+) diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index 0dab6a8604d..cc2d203ca12 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -168,6 +168,66 @@ struct flb_test_out_formatter { size_t *); /* output buffer size */ }; +struct flb_test_out_response { + /* + * Runtime Library Mode + * ==================== + * When the runtime library enable the test formatter mode, it needs to + * keep a reference of the context and other information: + * + * - rt_ctx : context created by flb_create() + * + * - rt_ffd : this plugin assigned 'integer' created by flb_output() + * + * - rt_step_calback: intermediary function to receive the results of + * the formatter plugin test function. + * + * - rt_data: opaque data type for rt_step_callback() + */ + + /* runtime library context */ + void *rt_ctx; + + /* runtime library: assigned plugin integer */ + int rt_ffd; + + /* + * "runtime step callback": this function pointer is used by Fluent Bit + * library mode to reference a test function that must retrieve the + * results of 'callback'. Consider this an intermediary function to + * transfer the results to the runtime test. + * + * This function is private and should not be set manually in the plugin + * code, it's set on src/flb_lib.c . + */ + void (*rt_out_response) (void *, int, int, void *, size_t, void *); + + /* + * opaque data type passed by the runtime library to be used on + * rt_step_test(). + */ + void *rt_data; + + /* optional context for flush callback */ + void *flush_ctx; + + /* + * Callback + * ========= + * "Formatter callback": it references the plugin function that performs + * data formatting (msgpack -> local data). This entry is mostly to + * expose the plugin local function. + */ + int (*callback) (/* Fluent Bit context */ + struct flb_config *, + void *, /* plugin instance context */ + int status, /* HTTP status code */ + const void *, /* respond msgpack data */ + size_t, /* respond msgpack size */ + void **, /* output buffer */ + size_t *); /* output buffer size */ +}; + struct flb_output_plugin { /* * a 'mask' to define what kind of data the plugin can manage: @@ -241,6 +301,7 @@ struct flb_output_plugin { /* Tests */ struct flb_test_out_formatter test_formatter; + struct flb_test_out_response test_response; /* Link to global list from flb_config->outputs */ struct mk_list _head; @@ -391,6 +452,7 @@ struct flb_output_instance { /* Tests */ struct flb_test_out_formatter test_formatter; + struct flb_test_out_response test_response; /* * Buffer counter: it counts the total of disk space (filesystem) used by buffers diff --git a/src/flb_output.c b/src/flb_output.c index 90593905960..288fc9dbb1f 100644 --- a/src/flb_output.c +++ b/src/flb_output.c @@ -729,6 +729,7 @@ struct flb_output_instance *flb_output_new(struct flb_config *config, /* Tests */ instance->test_formatter.callback = plugin->test_formatter.callback; + instance->test_response.callback = plugin->test_response.callback; return instance; From 304524cd7e400728d84147cdb7bfa11afa8607ca Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 17 Sep 2024 18:02:42 +0900 Subject: [PATCH 3/4] lib: Implement injecting HTTP response mechanism Signed-off-by: Hiroshi Hatake --- include/fluent-bit/flb_lib.h | 6 +++ src/flb_lib.c | 89 ++++++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+) diff --git a/include/fluent-bit/flb_lib.h b/include/fluent-bit/flb_lib.h index f96b06eff86..897ea0098ee 100644 --- a/include/fluent-bit/flb_lib.h +++ b/include/fluent-bit/flb_lib.h @@ -68,6 +68,9 @@ FLB_EXPORT int flb_output_set_test(flb_ctx_t *ctx, int ffd, char *test_name, void *test_ctx); FLB_EXPORT int flb_output_set_callback(flb_ctx_t *ctx, int ffd, char *name, void (*cb)(char *, void *, void *)); +FLB_EXPORT int flb_output_set_http_test(flb_ctx_t *ctx, int ffd, char *test_name, + void (*out_response) (void *, int, int, void *, size_t, void *), + void *out_callback_data); FLB_EXPORT int flb_filter_set(flb_ctx_t *ctx, int ffd, ...); FLB_EXPORT int flb_service_set(flb_ctx_t *ctx, ...); @@ -84,6 +87,9 @@ FLB_EXPORT int flb_loop(flb_ctx_t *ctx); FLB_EXPORT int flb_lib_push(flb_ctx_t *ctx, int ffd, const void *data, size_t len); FLB_EXPORT int flb_lib_config_file(flb_ctx_t *ctx, const char *path); +/* Emulate ingestions of HTTP responses for output plugins */ +FLB_EXPORT int flb_lib_response(flb_ctx_t *ctx, int ffd, int status, const void *data, size_t len); + /* library context handling */ FLB_EXPORT void flb_context_set(flb_ctx_t *ctx); FLB_EXPORT flb_ctx_t *flb_context_get(); diff --git a/src/flb_lib.c b/src/flb_lib.c index 1821a2e61ad..b9674e06824 100644 --- a/src/flb_lib.c +++ b/src/flb_lib.c @@ -347,6 +347,37 @@ int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc) return 0; } +int flb_output_set_http_test(flb_ctx_t *ctx, int ffd, char *test_name, + void (*out_response) (void *, int, int, void *, size_t, void *), + void *out_callback_data) +{ + struct flb_output_instance *o_ins; + + o_ins = out_instance_get(ctx, ffd); + if (!o_ins) { + return -1; + } + + /* + * Enabling a test, set the output instance in 'test' mode, so no real + * flush callback is invoked, only the desired implemented test. + */ + + /* Response test */ + if (strcmp(test_name, "response") == 0) { + o_ins->test_mode = FLB_TRUE; + o_ins->test_response.rt_ctx = ctx; + o_ins->test_response.rt_ffd = ffd; + o_ins->test_response.rt_out_response = out_response; + o_ins->test_response.rt_data = out_callback_data; + } + else { + return -1; + } + + return 0; +} + static inline int flb_config_map_property_check(char *plugin_name, struct mk_list *config_map, char *key, char *val) { struct flb_kv *kv; @@ -638,6 +669,41 @@ int flb_lib_free(void* data) } +static int flb_output_run_response(flb_ctx_t *ctx, struct flb_output_instance *o_ins, + int status, const void *data, size_t len) +{ + int ret; + void *out_buf = NULL; + size_t out_size = 0; + struct flb_test_out_response *resp; + + if (!o_ins) { + return -1; + } + + resp = &o_ins->test_response; + + /* Invoke the input plugin formatter test callback */ + ret = resp->callback(ctx->config, + o_ins->context, + status, data, len, + &out_buf, &out_size); + + /* Call the runtime test callback checker */ + if (resp->rt_out_response) { + resp->rt_out_response(resp->rt_ctx, + resp->rt_ffd, + ret, + out_buf, out_size, + resp->rt_data); + } + else { + flb_free(out_buf); + } + + return 0; +} + /* Push some data into the Engine */ int flb_lib_push(flb_ctx_t *ctx, int ffd, const void *data, size_t len) { @@ -662,6 +728,29 @@ int flb_lib_push(flb_ctx_t *ctx, int ffd, const void *data, size_t len) return ret; } +/* Emulate some data from the response */ +int flb_lib_response(flb_ctx_t *ctx, int ffd, int status, const void *data, size_t len) +{ + int ret; + struct flb_output_instance *o_ins; + + if (ctx->status == FLB_LIB_NONE || ctx->status == FLB_LIB_ERROR) { + flb_error("[lib] cannot push data, engine is not running"); + return -1; + } + + o_ins = out_instance_get(ctx, ffd); + if (!o_ins) { + return -1; + } + + /* If input's test_formatter is registered, priorize to run it. */ + if (o_ins->test_response.callback != NULL) { + ret = flb_output_run_response(ctx, o_ins, status, data, len); + } + return ret; +} + static void flb_lib_worker(void *data) { int ret; From 04b7e158d8fc7c25eb4a90b7769b4b75daaeb81c Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 18 Sep 2024 14:06:59 +0900 Subject: [PATCH 4/4] out_es: tests: Add HTTP response testing Signed-off-by: Hiroshi Hatake --- plugins/out_es/es.c | 73 ++++++++++++++ tests/runtime/data/es/json_es.h | 34 +++++++ tests/runtime/out_elasticsearch.c | 161 ++++++++++++++++++++++++++++++ 3 files changed, 268 insertions(+) diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index b0773a40991..c896b1ed258 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -985,6 +985,78 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, FLB_OUTPUT_RETURN(FLB_RETRY); } +static int elasticsearch_response_test(struct flb_config *config, + void *plugin_context, + int status, + const void *data, size_t bytes, + void **out_data, size_t *out_size) +{ + int ret = 0; + struct flb_elasticsearch *ctx = plugin_context; + struct flb_connection *u_conn; + struct flb_http_client *c; + size_t b_sent; + + /* Not retrieve upstream connection */ + u_conn = NULL; + + /* Compose HTTP Client request (dummy client) */ + c = flb_http_dummy_client(u_conn, FLB_HTTP_POST, ctx->uri, + NULL, 0, NULL, 0, NULL, 0); + + flb_http_buffer_size(c, ctx->buffer_size); + + /* Just stubbing the HTTP responses */ + flb_http_set_response_test(c, "response", data, bytes, status, NULL, NULL); + + ret = flb_http_do(c, &b_sent); + if (ret != 0) { + flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri); + goto error; + } + if (ret != 0) { + flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri); + goto error; + } + else { + /* The request was issued successfully, validate the 'error' field */ + flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ctx->uri); + if (c->resp.status != 200 && c->resp.status != 201) { + if (c->resp.payload_size > 0) { + flb_plg_error(ctx->ins, "HTTP status=%i URI=%s, response:\n%s\n", + c->resp.status, ctx->uri, c->resp.payload); + } + else { + flb_plg_error(ctx->ins, "HTTP status=%i URI=%s", + c->resp.status, ctx->uri); + } + goto error; + } + + if (c->resp.payload_size > 0) { + /* + * Elasticsearch payload should be JSON, we convert it to msgpack + * and lookup the 'error' field. + */ + ret = elasticsearch_error_check(ctx, c); + } + else { + goto error; + } + } + + /* Cleanup */ + flb_http_client_destroy(c); + + return ret; + +error: + /* Cleanup */ + flb_http_client_destroy(c); + + return -2; +} + static int cb_es_exit(void *data, struct flb_config *config) { struct flb_elasticsearch *ctx = data; @@ -1231,6 +1303,7 @@ struct flb_output_plugin out_es_plugin = { /* Test */ .test_formatter.callback = elasticsearch_format, + .test_response.callback = elasticsearch_response_test, /* Plugin flags */ .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS, diff --git a/tests/runtime/data/es/json_es.h b/tests/runtime/data/es/json_es.h index 40f8ab1cac4..91348ec47db 100755 --- a/tests/runtime/data/es/json_es.h +++ b/tests/runtime/data/es/json_es.h @@ -15,3 +15,37 @@ #define JSON_DOTS \ "[1448403340," \ "{\".le.vel\":\"error\", \".fo.o\":[{\".o.k\": [{\".b.ar\": \"baz\"}]}]}]" + +#define JSON_RESPONSE_SUCCESSES "{\"errors\":false,\"took\":0,\"items\":[" \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"dcfJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":6,\"_primary_term\":1,\"status\":201}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"dsfJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":7,\"_primary_term\":1,\"status\":201}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"d8fJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":8,\"_primary_term\":1,\"status\":201}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"eMfJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":9,\"_primary_term\":1,\"status\":201}}]}" + +#define JSON_RESPONSE_SUCCESSES_SIZE 783 + +#define JSON_RESPONSE_PARTIALLY_SUCCESS "{\"errors\":true,\"took\":316737025,\"items\":" \ + "[{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"hxELapEB_XqxG5Ydupgb\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":7,\"_primary_term\":1,\"status\":201}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"iBELapEB_XqxG5Ydupgb\",\"status\":400," \ + "\"error\":{\"type\":\"document_parsing_exception\"," \ + "\"reason\":\"[1:65] failed to parse field [_id] of type [_id] in document with id 'iBELapEB_XqxG5Ydupgb'. " \ + "Preview of field's value: 'fhHraZEB_XqxG5Ydzpjv'\"," \ + "\"caused_by\":{\"type\":\"document_parsing_exception\"," \ + "\"reason\":\"[1:65] Field [_id] is a metadata field and cannot be added inside a document. " \ + "Use the index API request parameters.\"}}}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"iRELapEB_XqxG5Ydupgb\",\"status\":400," \ + "\"error\":{\"type\":\"document_parsing_exception\"," \ + "\"reason\":\"[1:65] failed to parse field [_id] of type [_id] in document with id 'iRELapEB_XqxG5Ydupgb'. " \ + "Preview of field's value: 'fhHraZEB_XqxG5Ydzpjv'\"," \ + "\"caused_by\":{\"type\":\"document_parsing_exception\"," \ + "\"reason\":\"[1:65] Field [_id] is a metadata field and cannot be added inside a document. " \ + "Use the index API request parameters.\"}}}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"ihELapEB_XqxG5Ydupgb\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":8,\"_primary_term\":1,\"status\":201}}]}" + +#define JSON_RESPONSE_PARTIALLY_SUCCESS_SIZE 1322 diff --git a/tests/runtime/out_elasticsearch.c b/tests/runtime/out_elasticsearch.c index 9efe7610a95..eac72cbf321 100644 --- a/tests/runtime/out_elasticsearch.c +++ b/tests/runtime/out_elasticsearch.c @@ -799,6 +799,164 @@ void flb_test_logstash_prefix_separator() flb_destroy(ctx); } +static void cb_check_response_success(void *ctx, int ffd, + int res_ret, void *res_data, + size_t res_size, void *data) +{ + TEST_CHECK(res_ret == 1); +} + +void flb_test_response_success() +{ + int ret; + char *response = "{\"took\":1,\"errors\":false,\"items\":[]}"; + int size = 37; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Override defaults of index and type */ + flb_output_set(ctx, out_ffd, + "write_operation", "create", + NULL); + + /* Enable test mode */ + ret = flb_output_set_http_test(ctx, out_ffd, "response", + cb_check_response_success, + NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_response(ctx, out_ffd, 200, response, size); + TEST_CHECK(ret == 0); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_response_successes() +{ + int ret; + char *response = JSON_RESPONSE_SUCCESSES; + int size = JSON_RESPONSE_SUCCESSES_SIZE; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Override defaults of index and type */ + flb_output_set(ctx, out_ffd, + "write_operation", "create", + NULL); + + /* Enable test mode */ + ret = flb_output_set_http_test(ctx, out_ffd, "response", + cb_check_response_success, + NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_response(ctx, out_ffd, 200, response, size); + TEST_CHECK(ret == 0); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +static void cb_check_response_partially_success(void *ctx, int ffd, + int res_ret, void *res_data, + size_t res_size, void *data) +{ + int composed_ret = 0; + composed_ret |= (1 << 0); + composed_ret |= (1 << 7); + + TEST_CHECK(res_ret == composed_ret); + /* Check whether contains a success flag or not */ + TEST_CHECK((res_ret & (1 << 0))); +} + +void flb_test_response_partially_success() +{ + int ret; + char *response = JSON_RESPONSE_PARTIALLY_SUCCESS; + int size = JSON_RESPONSE_PARTIALLY_SUCCESS_SIZE; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Override defaults of index and type */ + flb_output_set(ctx, out_ffd, + "write_operation", "create", + NULL); + + /* Enable test mode */ + ret = flb_output_set_http_test(ctx, out_ffd, "response", + cb_check_response_partially_success, + NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_response(ctx, out_ffd, 200, response, size); + TEST_CHECK(ret == 0); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + /* Test list */ TEST_LIST = { {"long_index" , flb_test_long_index }, @@ -814,5 +972,8 @@ TEST_LIST = { {"replace_dots" , flb_test_replace_dots }, {"id_key" , flb_test_id_key }, {"logstash_prefix_separator" , flb_test_logstash_prefix_separator }, + {"response_success" , flb_test_response_success }, + {"response_successes", flb_test_response_successes }, + {"response_partially_success" , flb_test_response_partially_success }, {NULL, NULL} };