From de12f6c9ab8c35c7a21d33cae1c7f754d372db67 Mon Sep 17 00:00:00 2001 From: constanca Date: Mon, 9 Dec 2024 11:49:34 +0100 Subject: [PATCH] Split unmarshall metrics and logs in auto package --- receiver/awsfirehosereceiver/factory.go | 6 +- .../internal/unmarshaler/auto/unmarshaller.go | 88 ++++++++++++----- .../unmarshaler/auto/unmarshaller_test.go | 97 ++++++++++++------- .../internal/unmarshaler/cwlog/unmarshaler.go | 6 +- .../unmarshaler/cwlog/unmarshaler_test.go | 4 +- .../unmarshaler/cwmetricstream/unmarshaler.go | 6 +- .../cwmetricstream/unmarshaler_test.go | 2 +- .../otlpmetricstream/unmarshaler.go | 4 +- .../otlpmetricstream/unmarshaler_test.go | 2 +- .../internal/unmarshaler/unmarshaler.go | 15 +-- .../unmarshalertest/nop_logs_unmarshaler.go | 8 +- .../nop_logs_unmarshaler_test.go | 6 +- .../nop_metrics_unmarshaler.go | 8 +- .../nop_metrics_unmarshaler_test.go | 6 +- receiver/awsfirehosereceiver/logs_receiver.go | 2 +- .../awsfirehosereceiver/metrics_receiver.go | 2 +- 16 files changed, 169 insertions(+), 93 deletions(-) diff --git a/receiver/awsfirehosereceiver/factory.go b/receiver/awsfirehosereceiver/factory.go index e376595cae7f..c78a27419df4 100644 --- a/receiver/awsfirehosereceiver/factory.go +++ b/receiver/awsfirehosereceiver/factory.go @@ -61,17 +61,21 @@ func validateRecordType(recordType string) error { func defaultMetricsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.MetricsUnmarshaler { cwmsu := cwmetricstream.NewUnmarshaler(logger) otlpv1msu := otlpmetricstream.NewUnmarshaler(logger) + autoUnmarshaler := auto.NewUnmarshaler(logger) return map[string]unmarshaler.MetricsUnmarshaler{ cwmsu.Type(): cwmsu, otlpv1msu.Type(): otlpv1msu, + auto.TypeStr: autoUnmarshaler, } } // defaultLogsUnmarshalers creates a map of the available logs unmarshalers. func defaultLogsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.LogsUnmarshaler { u := cwlog.NewUnmarshaler(logger) + autoUnmarshaler := auto.NewUnmarshaler(logger) return map[string]unmarshaler.LogsUnmarshaler{ - u.Type(): u, + u.Type(): u, + auto.TypeStr: autoUnmarshaler, } } diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller.go b/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller.go index efe3cc519c74..be4f8b04fdf6 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller.go @@ -23,6 +23,7 @@ const ( var ( errInvalidRecords = errors.New("record format invalid") errUnsupportedContentType = errors.New("content type not supported") + errInvalidFormatStart = errors.New("unable to decode data length from message") ) // Unmarshaler for the CloudWatch Log JSON record format. @@ -115,8 +116,8 @@ func (u *Unmarshaler) addCloudwatchMetric( return nil } -func (u *Unmarshaler) unmarshalJSON(md pmetric.Metrics, ld plog.Logs, records [][]byte) error { - resourceLogs := make(map[cwlog.ResourceAttributes]*cwlog.ResourceLogsBuilder) +func (u *Unmarshaler) unmarshalJSONMetrics(records [][]byte) (pmetric.Metrics, error) { + md := pmetric.NewMetrics() resourceMetrics := make(map[cwmetricstream.ResourceAttributes]*cwmetricstream.ResourceMetricsBuilder) for i, compressed := range records { record, err := compression.Unzip(compressed) @@ -127,57 +128,100 @@ func (u *Unmarshaler) unmarshalJSON(md pmetric.Metrics, ld plog.Logs, records [] ) continue } - // Multiple metrics/logs in each record separated by newline character for j, single := range bytes.Split(record, []byte(recordDelimiter)) { if len(single) == 0 { continue } - if isCloudWatchLog(single) { - if err = u.addCloudwatchLog(single, resourceLogs, ld); err != nil { + if isCloudwatchMetrics(single) { + if err = u.addCloudwatchMetric(single, resourceMetrics, md); err != nil { u.logger.Error( - "Unable to unmarshal record to cloudwatch log", + "Unable to unmarshal input", zap.Error(err), - zap.Int("record_index", i), zap.Int("single_index", j), + zap.Int("record_index", i), ) continue } + } else { + u.logger.Error( + "Unsupported metric type", + zap.Int("record_index", i), + zap.Int("single_index", j), + ) + continue } + } + } - if isCloudwatchMetrics(single) { - if err = u.addCloudwatchMetric(single, resourceMetrics, md); err != nil { + if len(resourceMetrics) == 0 { + return md, errInvalidRecords + } + return md, nil +} + +func (u *Unmarshaler) unmarshalJSONLogs(records [][]byte) (plog.Logs, error) { + ld := plog.NewLogs() + resourceLogs := make(map[cwlog.ResourceAttributes]*cwlog.ResourceLogsBuilder) + for i, compressed := range records { + record, err := compression.Unzip(compressed) + if err != nil { + u.logger.Error("Failed to unzip record", + zap.Error(err), + zap.Int("record_index", i), + ) + continue + } + + for j, single := range bytes.Split(record, []byte(recordDelimiter)) { + if len(single) == 0 { + continue + } + + if isCloudWatchLog(single) { + if err = u.addCloudwatchLog(single, resourceLogs, ld); err != nil { u.logger.Error( - "Unable to unmarshal input", + "Unable to unmarshal record to cloudwatch log", zap.Error(err), - zap.Int("single_index", j), zap.Int("record_index", i), + zap.Int("single_index", j), ) continue } + } else { + u.logger.Error( + "Unsupported log type", + zap.Int("record_index", i), + zap.Int("single_index", j), + ) + continue } } } - if len(resourceLogs) == 0 && len(resourceMetrics) == 0 { - return errInvalidRecords + if len(resourceLogs) == 0 { + return ld, errInvalidRecords } - return nil + return ld, nil } -func (u *Unmarshaler) Unmarshal(contentType string, records [][]byte) (pmetric.Metrics, plog.Logs, error) { - ld := plog.NewLogs() - md := pmetric.NewMetrics() +func (u *Unmarshaler) UnmarshalLogs(contentType string, records [][]byte) (plog.Logs, error) { switch contentType { case "application/json": - return md, ld, u.unmarshalJSON(md, ld, records) - case "application/x-protobuf": - // TODO implement this - return md, ld, nil + return u.unmarshalJSONLogs(records) default: - return md, ld, errUnsupportedContentType + return plog.NewLogs(), errUnsupportedContentType } +} +func (u *Unmarshaler) UnmarshalMetrics(contentType string, records [][]byte) (pmetric.Metrics, error) { + switch contentType { + case "application/json": + return u.unmarshalJSONMetrics(records) + //TODO case "application/x-protobuf": + default: + return pmetric.NewMetrics(), errUnsupportedContentType + } } func (u *Unmarshaler) Type() string { diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller_test.go index 70fb84cb2bcf..184f1bd3cb4c 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller_test.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller_test.go @@ -16,20 +16,76 @@ func TestType(t *testing.T) { require.Equal(t, TypeStr, unmarshaler.Type()) } -// Unmarshall cloudwatch metrics and logs -func TestUnmarshal_JSON(t *testing.T) { +func TestUnmarshalMetrics_JSON(t *testing.T) { t.Parallel() unmarshaler := NewUnmarshaler(zap.NewNop()) testCases := map[string]struct { dir string filename string - logResourceCount int - logRecordCount int metricResourceCount int metricCount int metricDataPointCount int err error + }{ + "cwmetric:WithMultipleRecords": { + dir: "cwmetricstream", + filename: "multiple_records", + metricResourceCount: 6, + metricCount: 33, + metricDataPointCount: 127, + }, + "cwmetric:WithSingleRecord": { + dir: "cwmetricstream", + filename: "single_record", + metricResourceCount: 1, + metricCount: 1, + metricDataPointCount: 1, + }, + "cwmetric:WithInvalidRecords": { + dir: "cwmetricstream", + filename: "invalid_records", + err: errInvalidRecords, + }, + "cwmetric:WithSomeInvalidRecords": { + dir: "cwmetricstream", + filename: "some_invalid_records", + metricResourceCount: 5, + metricCount: 35, + metricDataPointCount: 88, + }, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + record, err := os.ReadFile(filepath.Join("..", testCase.dir, "testdata", testCase.filename)) + require.NoError(t, err) + + compressedRecord, err := compression.Zip(record) + require.NoError(t, err) + records := [][]byte{compressedRecord} + + metrics, err := unmarshaler.UnmarshalMetrics("application/json", records) + require.Equal(t, testCase.err, err) + + require.Equal(t, testCase.metricResourceCount, metrics.ResourceMetrics().Len()) + require.Equal(t, testCase.metricDataPointCount, metrics.DataPointCount()) + require.Equal(t, testCase.metricCount, metrics.MetricCount()) + }) + } +} + +// Unmarshall cloudwatch metrics and logs +func TestUnmarshalLogs_JSON(t *testing.T) { + t.Parallel() + + unmarshaler := NewUnmarshaler(zap.NewNop()) + testCases := map[string]struct { + dir string + filename string + logResourceCount int + logRecordCount int + err error }{ "cwlog:WithMultipleRecords": { dir: "cwlog", @@ -60,32 +116,6 @@ func TestUnmarshal_JSON(t *testing.T) { logResourceCount: 3, logRecordCount: 6, }, - "cwmetric:WithMultipleRecords": { - dir: "cwmetricstream", - filename: "multiple_records", - metricResourceCount: 6, - metricCount: 33, - metricDataPointCount: 127, - }, - "cwmetric:WithSingleRecord": { - dir: "cwmetricstream", - filename: "single_record", - metricResourceCount: 1, - metricCount: 1, - metricDataPointCount: 1, - }, - "cwmetric:WithInvalidRecords": { - dir: "cwmetricstream", - filename: "invalid_records", - err: errInvalidRecords, - }, - "cwmetric:WithSomeInvalidRecords": { - dir: "cwmetricstream", - filename: "some_invalid_records", - metricResourceCount: 5, - metricCount: 35, - metricDataPointCount: 88, - }, } for name, testCase := range testCases { @@ -97,16 +127,11 @@ func TestUnmarshal_JSON(t *testing.T) { require.NoError(t, err) records := [][]byte{compressedRecord} - metrics, logs, err := unmarshaler.Unmarshal("application/json", records) + logs, err := unmarshaler.UnmarshalLogs("application/json", records) require.Equal(t, testCase.err, err) require.Equal(t, testCase.logResourceCount, logs.ResourceLogs().Len()) require.Equal(t, testCase.logRecordCount, logs.LogRecordCount()) - - require.Equal(t, testCase.metricResourceCount, metrics.ResourceMetrics().Len()) - require.Equal(t, testCase.metricDataPointCount, metrics.DataPointCount()) - require.Equal(t, testCase.metricCount, metrics.MetricCount()) }) } - } diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go index f0a0a63feab0..f931aee57d10 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go @@ -34,10 +34,10 @@ func NewUnmarshaler(logger *zap.Logger) *Unmarshaler { return &Unmarshaler{logger} } -// Unmarshal deserializes the records into cWLogs and uses the +// UnmarshalLogs deserializes the records into CWLog and uses the // ResourceLogsBuilder to group them into a single plog.Logs. -// Skips invalid cWLogs received in the record. -func (u Unmarshaler) Unmarshal(records [][]byte) (plog.Logs, error) { +// Skips invalid CWLog received in the record. +func (u Unmarshaler) UnmarshalLogs(_ string, records [][]byte) (plog.Logs, error) { md := plog.NewLogs() builders := make(map[ResourceAttributes]*ResourceLogsBuilder) for recordIndex, compressedRecord := range records { diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go index 718b59d4eca0..8ef446f92bd4 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go @@ -61,7 +61,7 @@ func TestUnmarshal(t *testing.T) { require.NoError(t, err) records := [][]byte{compressedRecord} - got, err := unmarshaler.Unmarshal(records) + got, err := unmarshaler.UnmarshalLogs("", records) require.Equal(t, testCase.wantErr, err) require.NotNil(t, got) require.Equal(t, testCase.wantResourceCount, got.ResourceLogs().Len()) @@ -79,7 +79,7 @@ func TestLogTimestamp(t *testing.T) { require.NoError(t, err) records := [][]byte{compressedRecord} - got, err := unmarshaler.Unmarshal(records) + got, err := unmarshaler.UnmarshalLogs("", records) require.NoError(t, err) require.NotNil(t, got) require.Equal(t, 1, got.ResourceLogs().Len()) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go index 015b774bcfa1..009deee3e1db 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go @@ -36,10 +36,10 @@ func NewUnmarshaler(logger *zap.Logger) *Unmarshaler { return &Unmarshaler{logger} } -// Unmarshal deserializes the records into cWMetrics and uses the +// UnmarshalMetrics deserializes the records into CWMetric and uses the // ResourceMetricsBuilder to group them into a single pmetric.Metrics. -// Skips invalid cWMetrics received in the record and -func (u Unmarshaler) Unmarshal(records [][]byte) (pmetric.Metrics, error) { +// Skips invalid CWMetric received in the record. +func (u Unmarshaler) UnmarshalMetrics(_ string, records [][]byte) (pmetric.Metrics, error) { md := pmetric.NewMetrics() builders := make(map[ResourceAttributes]*ResourceMetricsBuilder) for recordIndex, record := range records { diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go index e3aab6551abe..b284f59dfad5 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go @@ -56,7 +56,7 @@ func TestUnmarshal(t *testing.T) { records := [][]byte{record} - got, err := unmarshaler.Unmarshal(records) + got, err := unmarshaler.UnmarshalMetrics("", records) require.Equal(t, testCase.wantErr, err) require.NotNil(t, got) require.Equal(t, testCase.wantResourceCount, got.ResourceMetrics().Len()) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler.go index c3dde9699e90..4a82e68dea34 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler.go @@ -36,8 +36,8 @@ func NewUnmarshaler(logger *zap.Logger) *Unmarshaler { return &Unmarshaler{logger} } -// Unmarshal deserializes the records into pmetric.Metrics -func (u Unmarshaler) Unmarshal(records [][]byte) (pmetric.Metrics, error) { +// UnmarshalMetrics deserializes the records into pmetric.Metrics +func (u Unmarshaler) UnmarshalMetrics(_ string, records [][]byte) (pmetric.Metrics, error) { md := pmetric.NewMetrics() for recordIndex, record := range records { dataLen, pos := len(record), 0 diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler_test.go index 3edff7005b82..85d57a92c90a 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler_test.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler_test.go @@ -97,7 +97,7 @@ func TestUnmarshal(t *testing.T) { } for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - got, err := unmarshaler.Unmarshal(testCase.records) + got, err := unmarshaler.UnmarshalMetrics("", testCase.records) require.NoError(t, err) require.Equal(t, testCase.wantResourceCount, got.ResourceMetrics().Len()) require.Equal(t, testCase.wantMetricCount, got.MetricCount()) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshaler.go index 37a57503daf2..bd4302c31968 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshaler.go @@ -10,8 +10,8 @@ import ( // MetricsUnmarshaler deserializes the message body type MetricsUnmarshaler interface { - // Unmarshal deserializes the records into metrics. - Unmarshal(records [][]byte) (pmetric.Metrics, error) + // UnmarshalMetrics deserializes the records into metrics. + UnmarshalMetrics(contentType string, records [][]byte) (pmetric.Metrics, error) // Type of the serialized messages. Type() string @@ -19,8 +19,8 @@ type MetricsUnmarshaler interface { // LogsUnmarshaler deserializes the message body type LogsUnmarshaler interface { - // Unmarshal deserializes the records into logs. - Unmarshal(records [][]byte) (plog.Logs, error) + // UnmarshalLogs deserializes the records into logs. + UnmarshalLogs(contentType string, records [][]byte) (plog.Logs, error) // Type of the serialized messages. Type() string @@ -28,8 +28,11 @@ type LogsUnmarshaler interface { // Unmarshaler deserializes the message body type Unmarshaler interface { - // Unmarshal deserializes the records into metrics or logs. - Unmarshal(contentType string, records [][]byte) (pmetric.Metrics, plog.Logs, error) + // UnmarshalMetrics deserializes the records into metrics. + UnmarshalMetrics(contentType string, records [][]byte) (pmetric.Metrics, error) + + // UnmarshalLogs deserializes the records into logs. + UnmarshalLogs(contentType string, records [][]byte) (plog.Logs, error) // Type of the serialized messages. Type() string diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler.go index 79f29caecfdb..3aa722451682 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler.go @@ -25,19 +25,19 @@ func NewNopLogs() *NopLogsUnmarshaler { } // NewWithLogs provides a nop logs unmarshaler with the passed -// in logs as the result of the Unmarshal and no error. +// in logs as the result of the UnmarshalLogs and no error. func NewWithLogs(logs plog.Logs) *NopLogsUnmarshaler { return &NopLogsUnmarshaler{logs: logs} } // NewErrLogs provides a nop logs unmarshaler with the passed -// in error as the Unmarshal error. +// in error as the UnmarshalLogs error. func NewErrLogs(err error) *NopLogsUnmarshaler { return &NopLogsUnmarshaler{err: err} } -// Unmarshal deserializes the records into logs. -func (u *NopLogsUnmarshaler) Unmarshal([][]byte) (plog.Logs, error) { +// UnmarshalLogs deserializes the records into logs. +func (u *NopLogsUnmarshaler) UnmarshalLogs(string, [][]byte) (plog.Logs, error) { return u.logs, u.err } diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler_test.go index ce90c351cfbd..612420dd8e49 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler_test.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler_test.go @@ -13,7 +13,7 @@ import ( func TestNewNopLogs(t *testing.T) { unmarshaler := NewNopLogs() - got, err := unmarshaler.Unmarshal(nil) + got, err := unmarshaler.UnmarshalLogs("", nil) require.NoError(t, err) require.NotNil(t, got) require.Equal(t, typeStr, unmarshaler.Type()) @@ -23,7 +23,7 @@ func TestNewWithLogs(t *testing.T) { logs := plog.NewLogs() logs.ResourceLogs().AppendEmpty() unmarshaler := NewWithLogs(logs) - got, err := unmarshaler.Unmarshal(nil) + got, err := unmarshaler.UnmarshalLogs("", nil) require.NoError(t, err) require.NotNil(t, got) require.Equal(t, logs, got) @@ -33,7 +33,7 @@ func TestNewWithLogs(t *testing.T) { func TestNewErrLogs(t *testing.T) { wantErr := errors.New("test error") unmarshaler := NewErrLogs(wantErr) - got, err := unmarshaler.Unmarshal(nil) + got, err := unmarshaler.UnmarshalLogs("", nil) require.Error(t, err) require.Equal(t, wantErr, err) require.NotNil(t, got) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler.go index a8f5c36319e3..402736cdba94 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler.go @@ -27,19 +27,19 @@ func NewNopMetrics() *NopMetricsUnmarshaler { } // NewWithMetrics provides a nop metrics unmarshaler with the passed -// in metrics as the result of the Unmarshal and no error. +// in metrics as the result of the UnmarshalMetrics and no error. func NewWithMetrics(metrics pmetric.Metrics) *NopMetricsUnmarshaler { return &NopMetricsUnmarshaler{metrics: metrics} } // NewErrMetrics provides a nop metrics unmarshaler with the passed -// in error as the Unmarshal error. +// in error as the UnmarshalMetrics error. func NewErrMetrics(err error) *NopMetricsUnmarshaler { return &NopMetricsUnmarshaler{err: err} } -// Unmarshal deserializes the records into metrics. -func (u *NopMetricsUnmarshaler) Unmarshal([][]byte) (pmetric.Metrics, error) { +// UnmarshalMetrics deserializes the records into metrics. +func (u *NopMetricsUnmarshaler) UnmarshalMetrics(string, [][]byte) (pmetric.Metrics, error) { return u.metrics, u.err } diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler_test.go index 572c39bc475c..ddb32ce966e2 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler_test.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler_test.go @@ -13,7 +13,7 @@ import ( func TestNewNopMetrics(t *testing.T) { unmarshaler := NewNopMetrics() - got, err := unmarshaler.Unmarshal(nil) + got, err := unmarshaler.UnmarshalMetrics("", nil) require.NoError(t, err) require.NotNil(t, got) require.Equal(t, typeStr, unmarshaler.Type()) @@ -23,7 +23,7 @@ func TestNewWithMetrics(t *testing.T) { metrics := pmetric.NewMetrics() metrics.ResourceMetrics().AppendEmpty() unmarshaler := NewWithMetrics(metrics) - got, err := unmarshaler.Unmarshal(nil) + got, err := unmarshaler.UnmarshalMetrics("", nil) require.NoError(t, err) require.NotNil(t, got) require.Equal(t, metrics, got) @@ -33,7 +33,7 @@ func TestNewWithMetrics(t *testing.T) { func TestNewErrMetrics(t *testing.T) { wantErr := errors.New("test error") unmarshaler := NewErrMetrics(wantErr) - got, err := unmarshaler.Unmarshal(nil) + got, err := unmarshaler.UnmarshalMetrics("", nil) require.Error(t, err) require.Equal(t, wantErr, err) require.NotNil(t, got) diff --git a/receiver/awsfirehosereceiver/logs_receiver.go b/receiver/awsfirehosereceiver/logs_receiver.go index 862b9b2840fc..09d7c7252b51 100644 --- a/receiver/awsfirehosereceiver/logs_receiver.go +++ b/receiver/awsfirehosereceiver/logs_receiver.go @@ -68,7 +68,7 @@ func (mc *logsConsumer) Consume( records [][]byte, commonAttributes map[string]string, ) (int, error) { - md, err := mc.unmarshaler.Unmarshal(records) + md, err := mc.unmarshaler.UnmarshalLogs(contentType, records) if err != nil { return http.StatusBadRequest, err diff --git a/receiver/awsfirehosereceiver/metrics_receiver.go b/receiver/awsfirehosereceiver/metrics_receiver.go index 788316bbdd32..757160e02274 100644 --- a/receiver/awsfirehosereceiver/metrics_receiver.go +++ b/receiver/awsfirehosereceiver/metrics_receiver.go @@ -70,7 +70,7 @@ func (mc *metricsConsumer) Consume( records [][]byte, commonAttributes map[string]string, ) (int, error) { - md, err := mc.unmarshaler.Unmarshal(records) + md, err := mc.unmarshaler.UnmarshalMetrics(contentType, records) if err != nil { return http.StatusBadRequest, err }