Skip to content

Commit

Permalink
Remove misleading metric filter warning (#845)
Browse files Browse the repository at this point in the history
  • Loading branch information
jefchien authored Sep 7, 2023
1 parent c8a4d15 commit dfa89da
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 50 deletions.
34 changes: 20 additions & 14 deletions receiver/adapter/accumulator/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ type otelAccumulator struct {
mutex sync.Mutex
}

var emptyMetricsBeforeFilteringError = errors.New("empty metrics before filtering metrics")
var (
errEmptyAfterConvert = errors.New("empty metrics after converting fields")
)

func NewAccumulator(input *models.RunningInput, ctx context.Context, consumer consumer.Metrics, logger *zap.Logger) OtelAccumulator {
_, isServiceInput := input.Input.(telegraf.ServiceInput)
Expand Down Expand Up @@ -118,15 +120,19 @@ func (o *otelAccumulator) addMetric(
// convertToOtelMetricsAndAddMetric converts Telegraf's Metric model to OTEL Stream Model
// and add the OTEl Metric to channel
func (o *otelAccumulator) convertToOtelMetricsAndAddMetric(m telegraf.Metric) {
mMetric, err := o.modifyMetricandConvertToOtelValue(m)
mMetric, err := o.modifyMetricAndConvertToOtelValue(m)
if err != nil {
if !errors.Is(err, emptyMetricsBeforeFilteringError) {
o.logger.Warn("Filter and convert failed",
zap.String("name", m.Name()),
zap.Any("tags", m.Tags()),
zap.Any("fields", m.Fields()),
zap.Any("type", m.Type()), zap.Error(err))
}
o.logger.Warn(
"Conversion of metric values failed",
zap.String("name", m.Name()),
zap.Any("tags", m.Tags()),
zap.Any("fields", m.Fields()),
zap.Any("type", m.Type()),
zap.Error(err),
)
}

if mMetric == nil {
return
}

Expand Down Expand Up @@ -161,19 +167,19 @@ func (o *otelAccumulator) GetOtelMetrics() pmetric.Metrics {
return finalMetrics
}

// modifyMetricandConvertToOtelValue modifies metric by filtering metrics, add prefix for each field in metrics, etc
// modifyMetricAndConvertToOtelValue modifies metric by filtering metrics, add prefix for each field in metrics, etc
// and convert to value supported by OTEL (int64 and float64).
// Distributions are not modified yet.
func (o *otelAccumulator) modifyMetricandConvertToOtelValue(m telegraf.Metric) (telegraf.Metric, error) {
func (o *otelAccumulator) modifyMetricAndConvertToOtelValue(m telegraf.Metric) (telegraf.Metric, error) {
if len(m.Fields()) == 0 {
return nil, emptyMetricsBeforeFilteringError
return nil, nil
}

// MakeMetric modifies metrics (e.g filter metrics, add prefix for measurement) by customer config
// https://github.com/influxdata/telegraf/blob/5479df2eb5e8401773d604a83590d789a158c735/models/running_input.go#L91-L114
mMetric := o.input.MakeMetric(m)
if mMetric == nil {
return nil, errors.New("empty metrics after filtering metrics")
return nil, nil
}

if m.Type() == telegraf.Histogram {
Expand All @@ -194,7 +200,7 @@ func (o *otelAccumulator) modifyMetricandConvertToOtelValue(m telegraf.Metric) (
}

if len(mMetric.Fields()) == 0 {
return nil, errors.New("empty metrics after final conversion")
return nil, errEmptyAfterConvert
}

return mMetric, nil
Expand Down
121 changes: 86 additions & 35 deletions receiver/adapter/accumulator/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/consumer/consumertest"
Expand Down Expand Up @@ -192,46 +193,96 @@ func Test_Accumulator_WithUnsupportedValueAndEmptyFields(t *testing.T) {
as.Equal(0, otelMetrics.ResourceMetrics().Len())
}

func Test_ModifyMetricandConvertMetricValue(t *testing.T) {
t.Helper()

func Test_ModifyMetricAndConvertMetricValue(t *testing.T) {
as := assert.New(t)
cfg := &models.InputConfig{
Filter: models.Filter{
FieldDrop: []string{"filtered_field"},
},
}
acc := newOtelAccumulatorWithConfig(as, nil, false, cfg)

acc := newOtelAccumulatorWithTestRunningInputs(as, nil, false)

metric := testutil.MustMetric(
"cpu",
map[string]string{
"instance_id": "mock",
testCases := map[string]struct {
metric telegraf.Metric
wantErr error
wantFields map[string]interface{}
wantDroppedFields []string
}{
"WithEmpty": {
metric: testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{},
time.Now(),
telegraf.Gauge,
),
},
map[string]interface{}{
"tx": float64(4.5),
"rx": int32(3),
"error": false,
"client": "redis",
"WithFiltered": {
metric: testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"filtered_field": 1,
},
time.Now(),
telegraf.Gauge,
),
},
time.Now(),
telegraf.Gauge,
)

modifiedMetric, err := acc.modifyMetricandConvertToOtelValue(metric)
as.NoError(err)

txMetricValue, txMetricExist := modifiedMetric.GetField("tx")
as.True(txMetricExist)
as.Equal(float64(4.5), txMetricValue)

rxMetricValue, rxMetricExist := modifiedMetric.GetField("rx")
as.True(rxMetricExist)
as.Equal(int64(3), rxMetricValue)

errorMetricValue, errorMetricExist := modifiedMetric.GetField("error")
as.True(errorMetricExist)
as.Equal(int64(0), errorMetricValue)

_, clientMetricExist := modifiedMetric.GetField("client")
as.False(clientMetricExist)
"WithInvalidConvert": {
metric: testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"client": "redis",
},
time.Now(),
telegraf.Gauge,
),
wantErr: errEmptyAfterConvert,
},
"WithValid": {
metric: testutil.MustMetric(
"cpu",
map[string]string{
"instance_id": "mock",
},
map[string]interface{}{
"tx": 4.5,
"rx": int32(3),
"error": false,
"client": "redis",
},
time.Now(),
telegraf.Gauge,
),
wantFields: map[string]interface{}{
"tx": 4.5,
"rx": int64(3),
"error": int64(0),
},
wantDroppedFields: []string{"client"},
},
}

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
got, err := acc.modifyMetricAndConvertToOtelValue(testCase.metric)
as.Equal(testCase.wantErr, err)
if len(testCase.wantFields) == 0 {
as.Nil(got)
} else {
for field, wantValue := range testCase.wantFields {
value, ok := got.GetField(field)
as.True(ok)
as.Equal(wantValue, value)
}
for _, field := range testCase.wantDroppedFields {
_, ok := got.GetField(field)
as.False(ok)
}
}
})
}
}

func Test_Accumulator_AddMetric(t *testing.T) {
Expand Down
5 changes: 4 additions & 1 deletion receiver/adapter/accumulator/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,17 @@ func generateExpectedAttributes() pcommon.Map {
}

func newOtelAccumulatorWithTestRunningInputs(as *assert.Assertions, consumer consumer.Metrics, isServiceInput bool) *otelAccumulator {
return newOtelAccumulatorWithConfig(as, consumer, isServiceInput, &models.InputConfig{})
}

func newOtelAccumulatorWithConfig(as *assert.Assertions, consumer consumer.Metrics, isServiceInput bool, cfg *models.InputConfig) *otelAccumulator {
var input telegraf.Input
if isServiceInput {
input = &TestServiceRunningInput{}
} else {
input = &TestRunningInput{}
}
ri := models.NewRunningInput(input, &models.InputConfig{})
ri := models.NewRunningInput(input, cfg)
as.NoError(ri.Config.Filter.Compile())

return &otelAccumulator{
Expand Down

0 comments on commit dfa89da

Please sign in to comment.