Split unmarshall metrics and logs in auto package
constanca-m committed Dec 9, 2024
1 parent 0d0b685 commit de12f6c
Showing 16 changed files with 169 additions and 93 deletions.
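For orientation, the commit splits the combined Unmarshal path into separate metrics and logs entry points. A minimal sketch of the unmarshaler contracts this implies follows; the interface names MetricsUnmarshaler and LogsUnmarshaler are taken from the map types in the factory.go hunk below, while the method sets are inferred from the UnmarshalMetrics/UnmarshalLogs implementations later in the diff, so treat the exact definitions as assumptions rather than the committed code.

// Sketch only: interface names come from factory.go, method shapes are
// inferred from the implementations in this diff.
package unmarshaler

import (
	"go.opentelemetry.io/collector/pdata/plog"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

// MetricsUnmarshaler turns raw Firehose records into pmetric.Metrics.
type MetricsUnmarshaler interface {
	// UnmarshalMetrics deserializes the records for the given content type.
	UnmarshalMetrics(contentType string, records [][]byte) (pmetric.Metrics, error)
	// Type identifies the record format handled by this unmarshaler.
	Type() string
}

// LogsUnmarshaler turns raw Firehose records into plog.Logs.
type LogsUnmarshaler interface {
	// UnmarshalLogs deserializes the records for the given content type.
	UnmarshalLogs(contentType string, records [][]byte) (plog.Logs, error)
	// Type identifies the record format handled by this unmarshaler.
	Type() string
}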
6 changes: 5 additions & 1 deletion receiver/awsfirehosereceiver/factory.go
@@ -61,17 +61,21 @@ func validateRecordType(recordType string) error {
func defaultMetricsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.MetricsUnmarshaler {
cwmsu := cwmetricstream.NewUnmarshaler(logger)
otlpv1msu := otlpmetricstream.NewUnmarshaler(logger)
autoUnmarshaler := auto.NewUnmarshaler(logger)
return map[string]unmarshaler.MetricsUnmarshaler{
cwmsu.Type(): cwmsu,
otlpv1msu.Type(): otlpv1msu,
auto.TypeStr: autoUnmarshaler,
}
}

// defaultLogsUnmarshalers creates a map of the available logs unmarshalers.
func defaultLogsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.LogsUnmarshaler {
u := cwlog.NewUnmarshaler(logger)
autoUnmarshaler := auto.NewUnmarshaler(logger)
return map[string]unmarshaler.LogsUnmarshaler{
u.Type(): u,
auto.TypeStr: autoUnmarshaler,
}
}

@@ -23,6 +23,7 @@ const (
var (
errInvalidRecords = errors.New("record format invalid")
errUnsupportedContentType = errors.New("content type not supported")
errInvalidFormatStart = errors.New("unable to decode data length from message")
)

// Unmarshaler for the CloudWatch Log JSON record format.
@@ -115,8 +116,8 @@ func (u *Unmarshaler) addCloudwatchMetric(
return nil
}

func (u *Unmarshaler) unmarshalJSON(md pmetric.Metrics, ld plog.Logs, records [][]byte) error {
resourceLogs := make(map[cwlog.ResourceAttributes]*cwlog.ResourceLogsBuilder)
func (u *Unmarshaler) unmarshalJSONMetrics(records [][]byte) (pmetric.Metrics, error) {
md := pmetric.NewMetrics()
resourceMetrics := make(map[cwmetricstream.ResourceAttributes]*cwmetricstream.ResourceMetricsBuilder)
for i, compressed := range records {
record, err := compression.Unzip(compressed)
@@ -127,57 +128,100 @@ func (u *Unmarshaler) unmarshalJSON(md pmetric.Metrics, ld plog.Logs, records [][]byte) error {
)
continue
}
// Multiple metrics/logs in each record separated by newline character
for j, single := range bytes.Split(record, []byte(recordDelimiter)) {
if len(single) == 0 {
continue
}

if isCloudWatchLog(single) {
if err = u.addCloudwatchLog(single, resourceLogs, ld); err != nil {
if isCloudwatchMetrics(single) {
if err = u.addCloudwatchMetric(single, resourceMetrics, md); err != nil {
u.logger.Error(
"Unable to unmarshal record to cloudwatch log",
"Unable to unmarshal input",
zap.Error(err),
zap.Int("record_index", i),
zap.Int("single_index", j),
zap.Int("record_index", i),
)
continue
}
} else {
u.logger.Error(
"Unsupported metric type",
zap.Int("record_index", i),
zap.Int("single_index", j),
)
continue
}
}
}

if isCloudwatchMetrics(single) {
if err = u.addCloudwatchMetric(single, resourceMetrics, md); err != nil {
if len(resourceMetrics) == 0 {
return md, errInvalidRecords
}
return md, nil
}

func (u *Unmarshaler) unmarshalJSONLogs(records [][]byte) (plog.Logs, error) {
ld := plog.NewLogs()
resourceLogs := make(map[cwlog.ResourceAttributes]*cwlog.ResourceLogsBuilder)
for i, compressed := range records {
record, err := compression.Unzip(compressed)
if err != nil {
u.logger.Error("Failed to unzip record",
zap.Error(err),
zap.Int("record_index", i),
)
continue
}

for j, single := range bytes.Split(record, []byte(recordDelimiter)) {
if len(single) == 0 {
continue
}

if isCloudWatchLog(single) {
if err = u.addCloudwatchLog(single, resourceLogs, ld); err != nil {
u.logger.Error(
"Unable to unmarshal input",
"Unable to unmarshal record to cloudwatch log",
zap.Error(err),
zap.Int("single_index", j),
zap.Int("record_index", i),
zap.Int("single_index", j),
)
continue
}
} else {
u.logger.Error(
"Unsupported log type",
zap.Int("record_index", i),
zap.Int("single_index", j),
)
continue
}
}
}

if len(resourceLogs) == 0 && len(resourceMetrics) == 0 {
return errInvalidRecords
if len(resourceLogs) == 0 {
return ld, errInvalidRecords
}
return nil
return ld, nil
}

func (u *Unmarshaler) Unmarshal(contentType string, records [][]byte) (pmetric.Metrics, plog.Logs, error) {
ld := plog.NewLogs()
md := pmetric.NewMetrics()
func (u *Unmarshaler) UnmarshalLogs(contentType string, records [][]byte) (plog.Logs, error) {
switch contentType {
case "application/json":
return md, ld, u.unmarshalJSON(md, ld, records)
case "application/x-protobuf":
// TODO implement this
return md, ld, nil
return u.unmarshalJSONLogs(records)
default:
return md, ld, errUnsupportedContentType
return plog.NewLogs(), errUnsupportedContentType
}
}

func (u *Unmarshaler) UnmarshalMetrics(contentType string, records [][]byte) (pmetric.Metrics, error) {
switch contentType {
case "application/json":
return u.unmarshalJSONMetrics(records)
//TODO case "application/x-protobuf":
default:
return pmetric.NewMetrics(), errUnsupportedContentType
}
}

func (u *Unmarshaler) Type() string {
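Caller-side, the split entry points added above could be exercised roughly as in the sketch below. The import path is assumed from the package layout referenced in factory.go (auto is an internal package, so this only compiles from inside the receiver module), and records is a placeholder for gzip-compressed Firehose payloads.

package main

import (
	"fmt"

	"go.uber.org/zap"

	// Assumed path; auto lives under the receiver's internal packages.
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/auto"
)

func main() {
	u := auto.NewUnmarshaler(zap.NewNop())
	records := [][]byte{ /* gzip-compressed Firehose records */ }

	// Metrics and logs are now requested separately instead of through a
	// single Unmarshal call that returned both.
	metrics, err := u.UnmarshalMetrics("application/json", records)
	if err != nil {
		fmt.Println("metrics:", err)
	}
	logs, err := u.UnmarshalLogs("application/json", records)
	if err != nil {
		fmt.Println("logs:", err)
	}
	fmt.Println(metrics.MetricCount(), "metrics,", logs.LogRecordCount(), "log records")
}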
@@ -16,20 +16,76 @@ func TestType(t *testing.T) {
require.Equal(t, TypeStr, unmarshaler.Type())
}

// Unmarshall cloudwatch metrics and logs
func TestUnmarshal_JSON(t *testing.T) {
func TestUnmarshalMetrics_JSON(t *testing.T) {
t.Parallel()

unmarshaler := NewUnmarshaler(zap.NewNop())
testCases := map[string]struct {
dir string
filename string
logResourceCount int
logRecordCount int
metricResourceCount int
metricCount int
metricDataPointCount int
err error
}{
"cwmetric:WithMultipleRecords": {
dir: "cwmetricstream",
filename: "multiple_records",
metricResourceCount: 6,
metricCount: 33,
metricDataPointCount: 127,
},
"cwmetric:WithSingleRecord": {
dir: "cwmetricstream",
filename: "single_record",
metricResourceCount: 1,
metricCount: 1,
metricDataPointCount: 1,
},
"cwmetric:WithInvalidRecords": {
dir: "cwmetricstream",
filename: "invalid_records",
err: errInvalidRecords,
},
"cwmetric:WithSomeInvalidRecords": {
dir: "cwmetricstream",
filename: "some_invalid_records",
metricResourceCount: 5,
metricCount: 35,
metricDataPointCount: 88,
},
}

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
record, err := os.ReadFile(filepath.Join("..", testCase.dir, "testdata", testCase.filename))
require.NoError(t, err)

compressedRecord, err := compression.Zip(record)
require.NoError(t, err)
records := [][]byte{compressedRecord}

metrics, err := unmarshaler.UnmarshalMetrics("application/json", records)
require.Equal(t, testCase.err, err)

require.Equal(t, testCase.metricResourceCount, metrics.ResourceMetrics().Len())
require.Equal(t, testCase.metricDataPointCount, metrics.DataPointCount())
require.Equal(t, testCase.metricCount, metrics.MetricCount())
})
}
}

// Unmarshall cloudwatch metrics and logs
func TestUnmarshalLogs_JSON(t *testing.T) {
t.Parallel()

unmarshaler := NewUnmarshaler(zap.NewNop())
testCases := map[string]struct {
dir string
filename string
logResourceCount int
logRecordCount int
err error
}{
"cwlog:WithMultipleRecords": {
dir: "cwlog",
@@ -60,32 +116,6 @@ func TestUnmarshal_JSON(t *testing.T) {
logResourceCount: 3,
logRecordCount: 6,
},
"cwmetric:WithMultipleRecords": {
dir: "cwmetricstream",
filename: "multiple_records",
metricResourceCount: 6,
metricCount: 33,
metricDataPointCount: 127,
},
"cwmetric:WithSingleRecord": {
dir: "cwmetricstream",
filename: "single_record",
metricResourceCount: 1,
metricCount: 1,
metricDataPointCount: 1,
},
"cwmetric:WithInvalidRecords": {
dir: "cwmetricstream",
filename: "invalid_records",
err: errInvalidRecords,
},
"cwmetric:WithSomeInvalidRecords": {
dir: "cwmetricstream",
filename: "some_invalid_records",
metricResourceCount: 5,
metricCount: 35,
metricDataPointCount: 88,
},
}

for name, testCase := range testCases {
@@ -97,16 +127,11 @@ func TestUnmarshal_JSON(t *testing.T) {
require.NoError(t, err)
records := [][]byte{compressedRecord}

metrics, logs, err := unmarshaler.Unmarshal("application/json", records)
logs, err := unmarshaler.UnmarshalLogs("application/json", records)
require.Equal(t, testCase.err, err)

require.Equal(t, testCase.logResourceCount, logs.ResourceLogs().Len())
require.Equal(t, testCase.logRecordCount, logs.LogRecordCount())

require.Equal(t, testCase.metricResourceCount, metrics.ResourceMetrics().Len())
require.Equal(t, testCase.metricDataPointCount, metrics.DataPointCount())
require.Equal(t, testCase.metricCount, metrics.MetricCount())
})
}

}
@@ -34,10 +34,10 @@ func NewUnmarshaler(logger *zap.Logger) *Unmarshaler {
return &Unmarshaler{logger}
}

// Unmarshal deserializes the records into cWLogs and uses the
// UnmarshalLogs deserializes the records into CWLog and uses the
// ResourceLogsBuilder to group them into a single plog.Logs.
// Skips invalid cWLogs received in the record.
func (u Unmarshaler) Unmarshal(records [][]byte) (plog.Logs, error) {
// Skips invalid CWLog received in the record.
func (u Unmarshaler) UnmarshalLogs(_ string, records [][]byte) (plog.Logs, error) {
md := plog.NewLogs()
builders := make(map[ResourceAttributes]*ResourceLogsBuilder)
for recordIndex, compressedRecord := range records {
@@ -61,7 +61,7 @@ func TestUnmarshal(t *testing.T) {
require.NoError(t, err)
records := [][]byte{compressedRecord}

got, err := unmarshaler.Unmarshal(records)
got, err := unmarshaler.UnmarshalLogs("", records)
require.Equal(t, testCase.wantErr, err)
require.NotNil(t, got)
require.Equal(t, testCase.wantResourceCount, got.ResourceLogs().Len())
@@ -79,7 +79,7 @@ func TestLogTimestamp(t *testing.T) {
require.NoError(t, err)
records := [][]byte{compressedRecord}

got, err := unmarshaler.Unmarshal(records)
got, err := unmarshaler.UnmarshalLogs("", records)
require.NoError(t, err)
require.NotNil(t, got)
require.Equal(t, 1, got.ResourceLogs().Len())
@@ -36,10 +36,10 @@ func NewUnmarshaler(logger *zap.Logger) *Unmarshaler {
return &Unmarshaler{logger}
}

// Unmarshal deserializes the records into cWMetrics and uses the
// UnmarshalMetrics deserializes the records into CWMetric and uses the
// ResourceMetricsBuilder to group them into a single pmetric.Metrics.
// Skips invalid cWMetrics received in the record and
func (u Unmarshaler) Unmarshal(records [][]byte) (pmetric.Metrics, error) {
// Skips invalid CWMetric received in the record.
func (u Unmarshaler) UnmarshalMetrics(_ string, records [][]byte) (pmetric.Metrics, error) {
md := pmetric.NewMetrics()
builders := make(map[ResourceAttributes]*ResourceMetricsBuilder)
for recordIndex, record := range records {
@@ -56,7 +56,7 @@ func TestUnmarshal(t *testing.T) {

records := [][]byte{record}

got, err := unmarshaler.Unmarshal(records)
got, err := unmarshaler.UnmarshalMetrics("", records)
require.Equal(t, testCase.wantErr, err)
require.NotNil(t, got)
require.Equal(t, testCase.wantResourceCount, got.ResourceMetrics().Len())
@@ -36,8 +36,8 @@ func NewUnmarshaler(logger *zap.Logger) *Unmarshaler {
return &Unmarshaler{logger}
}

// Unmarshal deserializes the records into pmetric.Metrics
func (u Unmarshaler) Unmarshal(records [][]byte) (pmetric.Metrics, error) {
// UnmarshalMetrics deserializes the records into pmetric.Metrics
func (u Unmarshaler) UnmarshalMetrics(_ string, records [][]byte) (pmetric.Metrics, error) {
md := pmetric.NewMetrics()
for recordIndex, record := range records {
dataLen, pos := len(record), 0
@@ -97,7 +97,7 @@ func TestUnmarshal(t *testing.T) {
}
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
got, err := unmarshaler.Unmarshal(testCase.records)
got, err := unmarshaler.UnmarshalMetrics("", testCase.records)
require.NoError(t, err)
require.Equal(t, testCase.wantResourceCount, got.ResourceMetrics().Len())
require.Equal(t, testCase.wantMetricCount, got.MetricCount())