diff --git a/receiver/adapter/accumulator/accumulator.go b/receiver/adapter/accumulator/accumulator.go index 1b1150da52..3a22bfc955 100644 --- a/receiver/adapter/accumulator/accumulator.go +++ b/receiver/adapter/accumulator/accumulator.go @@ -48,7 +48,9 @@ type otelAccumulator struct { mutex sync.Mutex } -var emptyMetricsBeforeFilteringError = errors.New("empty metrics before filtering metrics") +var ( + errEmptyAfterConvert = errors.New("empty metrics after converting fields") +) func NewAccumulator(input *models.RunningInput, ctx context.Context, consumer consumer.Metrics, logger *zap.Logger) OtelAccumulator { _, isServiceInput := input.Input.(telegraf.ServiceInput) @@ -118,15 +120,19 @@ func (o *otelAccumulator) addMetric( // convertToOtelMetricsAndAddMetric converts Telegraf's Metric model to OTEL Stream Model // and add the OTEl Metric to channel func (o *otelAccumulator) convertToOtelMetricsAndAddMetric(m telegraf.Metric) { - mMetric, err := o.modifyMetricandConvertToOtelValue(m) + mMetric, err := o.modifyMetricAndConvertToOtelValue(m) if err != nil { - if !errors.Is(err, emptyMetricsBeforeFilteringError) { - o.logger.Warn("Filter and convert failed", - zap.String("name", m.Name()), - zap.Any("tags", m.Tags()), - zap.Any("fields", m.Fields()), - zap.Any("type", m.Type()), zap.Error(err)) - } + o.logger.Warn( + "Conversion of metric values failed", + zap.String("name", m.Name()), + zap.Any("tags", m.Tags()), + zap.Any("fields", m.Fields()), + zap.Any("type", m.Type()), + zap.Error(err), + ) + } + + if mMetric == nil { return } @@ -161,19 +167,19 @@ func (o *otelAccumulator) GetOtelMetrics() pmetric.Metrics { return finalMetrics } -// modifyMetricandConvertToOtelValue modifies metric by filtering metrics, add prefix for each field in metrics, etc +// modifyMetricAndConvertToOtelValue modifies metric by filtering metrics, add prefix for each field in metrics, etc // and convert to value supported by OTEL (int64 and float64). // Distributions are not modified yet. -func (o *otelAccumulator) modifyMetricandConvertToOtelValue(m telegraf.Metric) (telegraf.Metric, error) { +func (o *otelAccumulator) modifyMetricAndConvertToOtelValue(m telegraf.Metric) (telegraf.Metric, error) { if len(m.Fields()) == 0 { - return nil, emptyMetricsBeforeFilteringError + return nil, nil } // MakeMetric modifies metrics (e.g filter metrics, add prefix for measurement) by customer config // https://github.com/influxdata/telegraf/blob/5479df2eb5e8401773d604a83590d789a158c735/models/running_input.go#L91-L114 mMetric := o.input.MakeMetric(m) if mMetric == nil { - return nil, errors.New("empty metrics after filtering metrics") + return nil, nil } if m.Type() == telegraf.Histogram { @@ -194,7 +200,7 @@ func (o *otelAccumulator) modifyMetricandConvertToOtelValue(m telegraf.Metric) ( } if len(mMetric.Fields()) == 0 { - return nil, errors.New("empty metrics after final conversion") + return nil, errEmptyAfterConvert } return mMetric, nil diff --git a/receiver/adapter/accumulator/accumulator_test.go b/receiver/adapter/accumulator/accumulator_test.go index 0ca082a7eb..04a626149f 100644 --- a/receiver/adapter/accumulator/accumulator_test.go +++ b/receiver/adapter/accumulator/accumulator_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/consumer/consumertest" @@ -192,46 +193,96 @@ func Test_Accumulator_WithUnsupportedValueAndEmptyFields(t *testing.T) { as.Equal(0, otelMetrics.ResourceMetrics().Len()) } -func Test_ModifyMetricandConvertMetricValue(t *testing.T) { - t.Helper() - +func Test_ModifyMetricAndConvertMetricValue(t *testing.T) { as := assert.New(t) + cfg := &models.InputConfig{ + Filter: models.Filter{ + FieldDrop: []string{"filtered_field"}, + }, + } + acc := newOtelAccumulatorWithConfig(as, nil, false, cfg) - acc := newOtelAccumulatorWithTestRunningInputs(as, nil, false) - - metric := testutil.MustMetric( - "cpu", - map[string]string{ - "instance_id": "mock", + testCases := map[string]struct { + metric telegraf.Metric + wantErr error + wantFields map[string]interface{} + wantDroppedFields []string + }{ + "WithEmpty": { + metric: testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{}, + time.Now(), + telegraf.Gauge, + ), }, - map[string]interface{}{ - "tx": float64(4.5), - "rx": int32(3), - "error": false, - "client": "redis", + "WithFiltered": { + metric: testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "filtered_field": 1, + }, + time.Now(), + telegraf.Gauge, + ), }, - time.Now(), - telegraf.Gauge, - ) - - modifiedMetric, err := acc.modifyMetricandConvertToOtelValue(metric) - as.NoError(err) - - txMetricValue, txMetricExist := modifiedMetric.GetField("tx") - as.True(txMetricExist) - as.Equal(float64(4.5), txMetricValue) - - rxMetricValue, rxMetricExist := modifiedMetric.GetField("rx") - as.True(rxMetricExist) - as.Equal(int64(3), rxMetricValue) - - errorMetricValue, errorMetricExist := modifiedMetric.GetField("error") - as.True(errorMetricExist) - as.Equal(int64(0), errorMetricValue) - - _, clientMetricExist := modifiedMetric.GetField("client") - as.False(clientMetricExist) + "WithInvalidConvert": { + metric: testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "client": "redis", + }, + time.Now(), + telegraf.Gauge, + ), + wantErr: errEmptyAfterConvert, + }, + "WithValid": { + metric: testutil.MustMetric( + "cpu", + map[string]string{ + "instance_id": "mock", + }, + map[string]interface{}{ + "tx": 4.5, + "rx": int32(3), + "error": false, + "client": "redis", + }, + time.Now(), + telegraf.Gauge, + ), + wantFields: map[string]interface{}{ + "tx": 4.5, + "rx": int64(3), + "error": int64(0), + }, + wantDroppedFields: []string{"client"}, + }, + } + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + got, err := acc.modifyMetricAndConvertToOtelValue(testCase.metric) + as.Equal(testCase.wantErr, err) + if len(testCase.wantFields) == 0 { + as.Nil(got) + } else { + for field, wantValue := range testCase.wantFields { + value, ok := got.GetField(field) + as.True(ok) + as.Equal(wantValue, value) + } + for _, field := range testCase.wantDroppedFields { + _, ok := got.GetField(field) + as.False(ok) + } + } + }) + } } func Test_Accumulator_AddMetric(t *testing.T) { diff --git a/receiver/adapter/accumulator/testutil.go b/receiver/adapter/accumulator/testutil.go index 26712bde9a..4454a93544 100644 --- a/receiver/adapter/accumulator/testutil.go +++ b/receiver/adapter/accumulator/testutil.go @@ -45,14 +45,17 @@ func generateExpectedAttributes() pcommon.Map { } func newOtelAccumulatorWithTestRunningInputs(as *assert.Assertions, consumer consumer.Metrics, isServiceInput bool) *otelAccumulator { + return newOtelAccumulatorWithConfig(as, consumer, isServiceInput, &models.InputConfig{}) +} +func newOtelAccumulatorWithConfig(as *assert.Assertions, consumer consumer.Metrics, isServiceInput bool, cfg *models.InputConfig) *otelAccumulator { var input telegraf.Input if isServiceInput { input = &TestServiceRunningInput{} } else { input = &TestRunningInput{} } - ri := models.NewRunningInput(input, &models.InputConfig{}) + ri := models.NewRunningInput(input, cfg) as.NoError(ri.Config.Filter.Compile()) return &otelAccumulator{