Skip to content

Commit

Permalink
processor_content_modifier: add support for OTel Logs Resource and Sc…
Browse files Browse the repository at this point in the history
…opes

The following patch extends the processor to allow to modify the resources and scopes
of Logs generated by an OpenTelemetry source.

The following new contexts are supported:

 - otel_resource_attributes: alter resource attributes
 - otel_scope_name: manipulate the scope name
 - otel_scope_version: manipulate the scope version
 - otel_scope_attributes: alter the scope attributes

example:

----- fluent-bit.yaml -----

pipeline:
  inputs:
    - name: opentelemetry
      port: ${FLUENT_BIT_TEST_LISTENER_PORT}
      processors:
        logs:
          - name: content_modifier
            context: otel_resource_attributes
            action: upsert
            key: "new_attr"
            value: "my_val"

          - name: content_modifier
            context: otel_resource_attributes
            action: delete
            key: "service.name"

          - name: content_modifier
            context: otel_scope_attributes
            action: upsert
            key: "my_new_scope_attr"
            value: "123"

          - name: content_modifier
            context: otel_scope_name
            action: upsert
            value: "new scope name"

          - name: content_modifier
            context: otel_scope_version
            action: upsert
            value: "3.1.0"

  outputs:
    - name: stdout
      match: '*'

    - name: opentelemetry
      match: '*'
      host: 127.0.0.1
      port: ${TEST_SUITE_HTTP_PORT}

----- end of file -----

Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Jun 24, 2024
1 parent f5737c0 commit 7060979
Show file tree
Hide file tree
Showing 4 changed files with 273 additions and 7 deletions.
12 changes: 6 additions & 6 deletions plugins/processor_content_modifier/cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,15 @@ static int cb_process_traces(struct flb_processor_instance *ins,

static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "action", NULL,
0, FLB_TRUE, offsetof(struct content_modifier_ctx, action_str),
"Action to perform over the content: insert, upsert, delete, rename or hash."
FLB_CONFIG_MAP_STR, "context", NULL,
0, FLB_TRUE, offsetof(struct content_modifier_ctx, context_str),
"Context where the action will be applied."
},

{
FLB_CONFIG_MAP_STR, "context", NULL,
0, FLB_TRUE, offsetof(struct content_modifier_ctx, context_str),
"Context to apply the action."
FLB_CONFIG_MAP_STR, "action", NULL,
0, FLB_TRUE, offsetof(struct content_modifier_ctx, action_str),
"Action to perform over the content: insert, upsert, delete, rename or hash."
},

{
Expand Down
5 changes: 5 additions & 0 deletions plugins/processor_content_modifier/cm.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ enum {
CM_CONTEXT_LOG_METADATA,
CM_CONTEXT_LOG_BODY,

CM_CONTEXT_OTEL_RESOURCE_ATTR,
CM_CONTEXT_OTEL_SCOPE_NAME,
CM_CONTEXT_OTEL_SCOPE_VERSION,
CM_CONTEXT_OTEL_SCOPE_ATTR,

/* Metrics */
CM_CONTEXT_METRIC_NAME,
CM_CONTEXT_METRIC_DESCRIPTION,
Expand Down
88 changes: 87 additions & 1 deletion plugins/processor_content_modifier/cm_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,92 @@ static int set_context(struct content_modifier_ctx *ctx)
strcasecmp(ctx->context_str, "record") == 0) {
context = CM_CONTEXT_LOG_BODY;
}
/*
* OpenTelemetry contexts
* ----------------------
*/
else if (strcasecmp(ctx->context_str, "otel_resource_attributes") == 0) {
context = CM_CONTEXT_OTEL_RESOURCE_ATTR;
}
else if (strcasecmp(ctx->context_str, "otel_scope_name") == 0) {
/*
* scope name is restricted to specific actions, make sure the user
* cannot messed it up
*
* action allowed ?
* -----------------------------
* CM_ACTION_INSERT Yes
* CM_ACTION_UPSERT Yes
* CM_ACTION_DELETE Yes
* CM_ACTION_RENAME No
* CM_ACTION_HASH Yes
* CM_ACTION_EXTRACT No
* CM_ACTION_CONVERT No
*/

if (ctx->action_type == CM_ACTION_RENAME ||
ctx->action_type == CM_ACTION_EXTRACT ||
ctx->action_type == CM_ACTION_CONVERT) {
flb_plg_error(ctx->ins, "action '%s' is not allowed for context '%s'",
ctx->action_str, ctx->context_str);
return -1;
}

/* check that 'name' is the key set */
if (!ctx->key) {
ctx->key = flb_sds_create("name");
}
else if (strcasecmp(ctx->key, "name") != 0) {
flb_plg_error(ctx->ins, "context '%s' requires the name of the key to be 'name', no '%s'",
ctx->context_str, ctx->key);
return -1;
}

context = CM_CONTEXT_OTEL_SCOPE_NAME;
}
else if (strcasecmp(ctx->context_str, "otel_scope_version") == 0) {
/*
* scope version, same as the name, it's restricted to specific actions, make sure the user
* cannot messed it up
*
* action allowed ?
* -----------------------------
* CM_ACTION_INSERT Yes
* CM_ACTION_UPSERT Yes
* CM_ACTION_DELETE Yes
* CM_ACTION_RENAME No
* CM_ACTION_HASH Yes
* CM_ACTION_EXTRACT No
* CM_ACTION_CONVERT No
*/

if (ctx->action_type == CM_ACTION_RENAME ||
ctx->action_type == CM_ACTION_EXTRACT ||
ctx->action_type == CM_ACTION_CONVERT) {
flb_plg_error(ctx->ins, "action '%s' is not allowed for context '%s'",
ctx->action_str, ctx->context_str);
return -1;
}

/* check that 'version' is the key set */
if (!ctx->key) {
ctx->key = flb_sds_create("version");
}
else if (strcasecmp(ctx->key, "version") != 0) {
flb_plg_error(ctx->ins, "context '%s' requires the name of the key to be 'version', no '%s'",
ctx->context_str, ctx->key);
return -1;
}
context = CM_CONTEXT_OTEL_SCOPE_VERSION;
}
else if (strcasecmp(ctx->context_str, "otel_scope_attributes") == 0) {
context = CM_CONTEXT_OTEL_SCOPE_ATTR;
}
else if (strcasecmp(ctx->context_str, "otel_scope_name") == 0) {
}
else if (strcasecmp(ctx->context_str, "otel_scope_version") == 0) {
context = CM_CONTEXT_OTEL_SCOPE_VERSION;
}
else {
flb_plg_error(ctx->ins, "unknown logs context '%s'", ctx->context_str);
return -1;
Expand Down Expand Up @@ -177,7 +263,7 @@ static int check_action_requirements(struct content_modifier_ctx *ctx)
/* these only requires a key, already validated (useless code) */
}
else if (ctx->action_type == CM_ACTION_INSERT || ctx->action_type == CM_ACTION_UPSERT ||
ctx->action_type == CM_ACTION_RENAME) {
ctx->action_type == CM_ACTION_RENAME) {

if (!ctx->value) {
flb_plg_error(ctx->ins, "value is required for action '%s'", ctx->action_str);
Expand Down
175 changes: 175 additions & 0 deletions plugins/processor_content_modifier/cm_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -595,25 +595,200 @@ static int run_action_convert(struct content_modifier_ctx *ctx,
return 0;
}

static struct cfl_variant *otel_get_or_create_attributes(struct cfl_kvlist *kvlist)
{
int ret;
struct cfl_list *head;
struct cfl_list *tmp;
struct cfl_kvpair *kvpair;
struct cfl_variant *val;
struct cfl_kvlist *kvlist_tmp;

/* iterate resource to find the attributes field */
cfl_list_foreach_safe(head, tmp, &kvlist->list) {
kvpair = cfl_list_entry(head, struct cfl_kvpair, _head);
if (cfl_sds_len(kvpair->key) != 10) {
continue;
}

if (strncmp(kvpair->key, "attributes", 10) == 0) {
val = kvpair->val;
if (val->type != CFL_VARIANT_KVLIST) {
return NULL;
}

return val;
}
}

/* create an empty kvlist as the value of attributes */
kvlist_tmp = cfl_kvlist_create();
if (!kvlist_tmp) {
return NULL;
}

/* create the attributes kvpair */
ret = cfl_kvlist_insert_kvlist_s(kvlist, "attributes", 10, kvlist_tmp);
if (ret != 0) {
cfl_kvlist_destroy(kvlist_tmp);
return NULL;
}

/* get the last kvpair from the list */
kvpair = cfl_list_entry_last(&kvlist->list, struct cfl_kvpair, _head);
if (!kvpair) {
return NULL;
}

return kvpair->val;
}


static struct cfl_variant *otel_get_attributes(int context, struct flb_mp_chunk_record *record)
{
int key_len;
const char *key_buf;
struct cfl_list *head;
struct cfl_object *obj = NULL;
struct cfl_variant *val;
struct cfl_kvlist *kvlist;
struct cfl_kvpair *kvpair;
struct cfl_variant *var_attr;

if (context == CM_CONTEXT_OTEL_RESOURCE_ATTR) {
key_buf = "resource";
key_len = 8;
}
else if (context == CM_CONTEXT_OTEL_SCOPE_ATTR) {
key_buf = "scope";
key_len = 5;
}
else {
return NULL;
}

obj = record->cobj_record;
kvlist = obj->variant->data.as_kvlist;
cfl_list_foreach(head, &kvlist->list) {
kvpair = cfl_list_entry(head, struct cfl_kvpair, _head);

if (cfl_sds_len(kvpair->key) != key_len) {
continue;
}

if (strncmp(kvpair->key, key_buf, key_len) == 0) {
val = kvpair->val;
if (val->type != CFL_VARIANT_KVLIST) {
return NULL;
}

var_attr = otel_get_or_create_attributes(val->data.as_kvlist);
if (!var_attr) {
return NULL;
}

return var_attr;
}
}

return NULL;
}

static struct cfl_variant *otel_get_scope(struct flb_mp_chunk_record *record)
{
struct cfl_list *head;
struct cfl_object *obj;
struct cfl_variant *val;
struct cfl_kvlist *kvlist;
struct cfl_kvpair *kvpair;

obj = record->cobj_record;
kvlist = obj->variant->data.as_kvlist;
cfl_list_foreach(head, &kvlist->list) {
kvpair = cfl_list_entry(head, struct cfl_kvpair, _head);

if (cfl_sds_len(kvpair->key) != 5) {
continue;
}

if (strncmp(kvpair->key, "scope", 5) == 0) {
val = kvpair->val;
if (val->type != CFL_VARIANT_KVLIST) {
return NULL;
}

return val;
}
}

return NULL;
}
int cm_logs_process(struct flb_processor_instance *ins,
struct content_modifier_ctx *ctx,
struct flb_mp_chunk_cobj *chunk_cobj,
const char *tag,
int tag_len)
{
int ret = -1;
int record_type;
struct flb_mp_chunk_record *record;
struct cfl_object *obj = NULL;
struct cfl_object obj_static;
struct cfl_variant *var;

/* Iterate records */
while ((ret = flb_mp_chunk_cobj_record_next(chunk_cobj, &record)) == FLB_MP_CHUNK_RECORD_OK) {
obj = NULL;

/* Retrieve information about the record type */
ret = flb_log_event_decoder_get_record_type(&record->event, &record_type);
if (ret != 0) {
flb_plg_error(ctx->ins, "record has invalid event type");
continue;
}

/* retrieve the target cfl object */
if (ctx->context_type == CM_CONTEXT_LOG_METADATA) {
obj = record->cobj_metadata;
}
else if (ctx->context_type == CM_CONTEXT_LOG_BODY) {
obj = record->cobj_record;
}
else if (ctx->context_type == CM_CONTEXT_OTEL_RESOURCE_ATTR &&
record_type == FLB_LOG_EVENT_GROUP_START) {
var = otel_get_attributes(CM_CONTEXT_OTEL_RESOURCE_ATTR, record);
if (!var) {
continue;
}

obj_static.type = CFL_VARIANT_KVLIST;
obj_static.variant = var;
obj = &obj_static;
}
else if (ctx->context_type == CM_CONTEXT_OTEL_SCOPE_ATTR &&
record_type == FLB_LOG_EVENT_GROUP_START) {

var = otel_get_attributes(CM_CONTEXT_OTEL_SCOPE_ATTR, record);
if (!var) {
continue;
}

obj_static.type = CFL_VARIANT_KVLIST;
obj_static.variant = var;
obj = &obj_static;
}
else if ((ctx->context_type == CM_CONTEXT_OTEL_SCOPE_NAME || ctx->context_type == CM_CONTEXT_OTEL_SCOPE_VERSION) &&
record_type == FLB_LOG_EVENT_GROUP_START) {

var = otel_get_scope(record);
obj_static.type = CFL_VARIANT_KVLIST;
obj_static.variant = var;
obj = &obj_static;
}

if (!obj) {
continue;
}

/* the operation on top of the data type is unsupported */
if (obj->variant->type != CFL_VARIANT_KVLIST) {
Expand Down

0 comments on commit 7060979

Please sign in to comment.