Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http_client: out_es: lib: implement http client response testing mechanism #9401

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions include/fluent-bit/flb_http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,71 @@ struct flb_http_debug {
int (*cb_debug_request_payload);
};

/* To make opaque struct */
struct flb_http_client;

/*
* Tests callbacks
* ===============
*/
struct flb_test_http_response {
/*
* Response Test Mode
* ====================
* When the response test enable the test response mode, it needs to
* keep a reference of the context and other information:
*
* - rt_ctx : flb_http_client context
*
* - rt_status : HTTP response code
*
* - rt_in_callback: intermediary function to receive the results of
* the http response test function.
*
* - rt_data: opaque data type for rt_in_callback()
*/

/* runtime library context */
void *rt_ctx;

/* HTTP status */
int rt_status;

/* optional response context */
void *response_ctx;

/*
* "response test callback": this function pointer is used by Fluent Bit
* http client testing mode to reference a test function that must retrieve the
* results of 'callback'. Consider this an intermediary function to
* transfer the results to the runtime test.
*
* This function is private and should not be set manually in the plugin
* code, it's set on src/flb_http_client.c .
*/
void (*rt_resp_callback) (void *, int, void *, size_t, void *);

/*
* opaque data type passed by the runtime library to be used on
* rt_in_callback().
*/
void *rt_data;

/*
* Callback
* =========
* "HTTP response callback": it references the plugin function that performs
* to validate HTTP response by HTTP client. This entry is mostly to
* expose the plugin local function.
*/
int (*callback) (/* plugin that ingested the records */
struct flb_http_client *,
const void *, /* incoming response data */
size_t, /* incoming response size */
void **, /* output buffer */
size_t *); /* output buffer size */
};

/* Set a request type */
struct flb_http_client {
/* Upstream connection */
Expand Down Expand Up @@ -133,6 +198,10 @@ struct flb_http_client {
/* Response */
struct flb_http_client_response resp;

/* Tests */
int test_mode;
struct flb_test_http_response test_response;

/* Reference to Callback context */
void *cb_ctx;
};
Expand All @@ -146,6 +215,13 @@ struct flb_http_client *flb_http_client(struct flb_connection *u_conn,
const char *host, int port,
const char *proxy, int flags);

/* For fulfilling HTTP response testing (dummy client) */
struct flb_http_client *flb_http_dummy_client(struct flb_connection *u_conn,
int method, const char *uri,
const char *body, size_t body_len,
const char *host, int port,
const char *proxy, int flags);

int flb_http_add_header(struct flb_http_client *c,
const char *key, size_t key_len,
const char *val, size_t val_len);
Expand All @@ -161,6 +237,12 @@ int flb_http_set_keepalive(struct flb_http_client *c);
int flb_http_set_content_encoding_gzip(struct flb_http_client *c);
int flb_http_set_callback_context(struct flb_http_client *c,
struct flb_callback *cb_ctx);
int flb_http_set_response_test(struct flb_http_client *c, char *test_name,
const void *data, size_t len,
int status,
void (*resp_callback) (void *, int, void *, size_t, void *),
void *resp_callback_data);
int flb_http_push_response(struct flb_http_client *c, const void *data, size_t len);

int flb_http_get_response_data(struct flb_http_client *c, size_t bytes_consumed);
int flb_http_do_request(struct flb_http_client *c, size_t *bytes);
Expand Down
6 changes: 6 additions & 0 deletions include/fluent-bit/flb_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ FLB_EXPORT int flb_output_set_test(flb_ctx_t *ctx, int ffd, char *test_name,
void *test_ctx);
FLB_EXPORT int flb_output_set_callback(flb_ctx_t *ctx, int ffd, char *name,
void (*cb)(char *, void *, void *));
FLB_EXPORT int flb_output_set_http_test(flb_ctx_t *ctx, int ffd, char *test_name,
void (*out_response) (void *, int, int, void *, size_t, void *),
void *out_callback_data);

FLB_EXPORT int flb_filter_set(flb_ctx_t *ctx, int ffd, ...);
FLB_EXPORT int flb_service_set(flb_ctx_t *ctx, ...);
Expand All @@ -84,6 +87,9 @@ FLB_EXPORT int flb_loop(flb_ctx_t *ctx);
FLB_EXPORT int flb_lib_push(flb_ctx_t *ctx, int ffd, const void *data, size_t len);
FLB_EXPORT int flb_lib_config_file(flb_ctx_t *ctx, const char *path);

/* Emulate ingestions of HTTP responses for output plugins */
FLB_EXPORT int flb_lib_response(flb_ctx_t *ctx, int ffd, int status, const void *data, size_t len);

/* library context handling */
FLB_EXPORT void flb_context_set(flb_ctx_t *ctx);
FLB_EXPORT flb_ctx_t *flb_context_get();
Expand Down
62 changes: 62 additions & 0 deletions include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,66 @@ struct flb_test_out_formatter {
size_t *); /* output buffer size */
};

struct flb_test_out_response {
/*
* Runtime Library Mode
* ====================
* When the runtime library enable the test formatter mode, it needs to
* keep a reference of the context and other information:
*
* - rt_ctx : context created by flb_create()
*
* - rt_ffd : this plugin assigned 'integer' created by flb_output()
*
* - rt_step_calback: intermediary function to receive the results of
* the formatter plugin test function.
*
* - rt_data: opaque data type for rt_step_callback()
*/

/* runtime library context */
void *rt_ctx;

/* runtime library: assigned plugin integer */
int rt_ffd;

/*
* "runtime step callback": this function pointer is used by Fluent Bit
* library mode to reference a test function that must retrieve the
* results of 'callback'. Consider this an intermediary function to
* transfer the results to the runtime test.
*
* This function is private and should not be set manually in the plugin
* code, it's set on src/flb_lib.c .
*/
void (*rt_out_response) (void *, int, int, void *, size_t, void *);

/*
* opaque data type passed by the runtime library to be used on
* rt_step_test().
*/
void *rt_data;

/* optional context for flush callback */
void *flush_ctx;

/*
* Callback
* =========
* "Formatter callback": it references the plugin function that performs
* data formatting (msgpack -> local data). This entry is mostly to
* expose the plugin local function.
*/
int (*callback) (/* Fluent Bit context */
struct flb_config *,
void *, /* plugin instance context */
int status, /* HTTP status code */
const void *, /* respond msgpack data */
size_t, /* respond msgpack size */
void **, /* output buffer */
size_t *); /* output buffer size */
};

struct flb_output_plugin {
/*
* a 'mask' to define what kind of data the plugin can manage:
Expand Down Expand Up @@ -241,6 +301,7 @@ struct flb_output_plugin {

/* Tests */
struct flb_test_out_formatter test_formatter;
struct flb_test_out_response test_response;

/* Link to global list from flb_config->outputs */
struct mk_list _head;
Expand Down Expand Up @@ -391,6 +452,7 @@ struct flb_output_instance {

/* Tests */
struct flb_test_out_formatter test_formatter;
struct flb_test_out_response test_response;

/*
* Buffer counter: it counts the total of disk space (filesystem) used by buffers
Expand Down
73 changes: 73 additions & 0 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,78 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk,
FLB_OUTPUT_RETURN(FLB_RETRY);
}

static int elasticsearch_response_test(struct flb_config *config,
void *plugin_context,
int status,
const void *data, size_t bytes,
void **out_data, size_t *out_size)
{
int ret = 0;
struct flb_elasticsearch *ctx = plugin_context;
struct flb_connection *u_conn;
struct flb_http_client *c;
size_t b_sent;

/* Not retrieve upstream connection */
u_conn = NULL;

/* Compose HTTP Client request (dummy client) */
c = flb_http_dummy_client(u_conn, FLB_HTTP_POST, ctx->uri,
NULL, 0, NULL, 0, NULL, 0);

flb_http_buffer_size(c, ctx->buffer_size);

/* Just stubbing the HTTP responses */
flb_http_set_response_test(c, "response", data, bytes, status, NULL, NULL);

ret = flb_http_do(c, &b_sent);
if (ret != 0) {
flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri);
goto error;
}
if (ret != 0) {
flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri);
goto error;
}
else {
/* The request was issued successfully, validate the 'error' field */
flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ctx->uri);
if (c->resp.status != 200 && c->resp.status != 201) {
if (c->resp.payload_size > 0) {
flb_plg_error(ctx->ins, "HTTP status=%i URI=%s, response:\n%s\n",
c->resp.status, ctx->uri, c->resp.payload);
}
else {
flb_plg_error(ctx->ins, "HTTP status=%i URI=%s",
c->resp.status, ctx->uri);
}
goto error;
}

if (c->resp.payload_size > 0) {
/*
* Elasticsearch payload should be JSON, we convert it to msgpack
* and lookup the 'error' field.
*/
ret = elasticsearch_error_check(ctx, c);
}
else {
goto error;
}
}

/* Cleanup */
flb_http_client_destroy(c);

return ret;

error:
/* Cleanup */
flb_http_client_destroy(c);

return -2;
}

static int cb_es_exit(void *data, struct flb_config *config)
{
struct flb_elasticsearch *ctx = data;
Expand Down Expand Up @@ -1231,6 +1303,7 @@ struct flb_output_plugin out_es_plugin = {

/* Test */
.test_formatter.callback = elasticsearch_format,
.test_response.callback = elasticsearch_response_test,

/* Plugin flags */
.flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
Expand Down
Loading
Loading