Skip to content

Commit

Permalink
Improve code readability; add content type
Browse files Browse the repository at this point in the history
  • Loading branch information
constanca-m committed Dec 9, 2024
1 parent 6a2fb81 commit 0d0b685
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 94 deletions.
107 changes: 71 additions & 36 deletions receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ const (
recordDelimiter = "\n"
)

var errInvalidRecords = errors.New("record format invalid")
var (
errInvalidRecords = errors.New("record format invalid")
errUnsupportedContentType = errors.New("content type not supported")
)

// Unmarshaler for the CloudWatch Log JSON record format.
type Unmarshaler struct {
Expand Down Expand Up @@ -65,13 +68,56 @@ func isCloudwatchMetrics(data []byte) bool {
return true
}

func (u Unmarshaler) Unmarshal(records [][]byte) (pmetric.Metrics, plog.Logs, error) {
ld := plog.NewLogs()
resourceLogs := make(map[cwlog.ResourceAttributes]*cwlog.ResourceLogsBuilder)
func (u *Unmarshaler) addCloudwatchLog(
record []byte,
resourceLogs map[cwlog.ResourceAttributes]*cwlog.ResourceLogsBuilder,
ld plog.Logs,
) error {
var log cwlog.CWLog
if err := json.Unmarshal(record, &log); err != nil {
return err
}
attrs := cwlog.ResourceAttributes{
Owner: log.Owner,
LogGroup: log.LogGroup,
LogStream: log.LogStream,
}
lb, exists := resourceLogs[attrs]
if !exists {
lb = cwlog.NewResourceLogsBuilder(ld, attrs)
resourceLogs[attrs] = lb
}
lb.AddLog(log)
return nil
}

md := pmetric.NewMetrics()
resourceMetrics := make(map[cwmetricstream.ResourceAttributes]*cwmetricstream.ResourceMetricsBuilder)
func (u *Unmarshaler) addCloudwatchMetric(
record []byte,
resourceMetrics map[cwmetricstream.ResourceAttributes]*cwmetricstream.ResourceMetricsBuilder,
md pmetric.Metrics,
) error {
var metric cwmetricstream.CWMetric
if err := json.Unmarshal(record, &metric); err != nil {
return err
}
attrs := cwmetricstream.ResourceAttributes{
MetricStreamName: metric.MetricStreamName,
Namespace: metric.Namespace,
AccountID: metric.AccountID,
Region: metric.Region,
}
mb, exists := resourceMetrics[attrs]
if !exists {
mb = cwmetricstream.NewResourceMetricsBuilder(md, attrs)
resourceMetrics[attrs] = mb
}
mb.AddMetric(metric)
return nil
}

func (u *Unmarshaler) unmarshalJSON(md pmetric.Metrics, ld plog.Logs, records [][]byte) error {
resourceLogs := make(map[cwlog.ResourceAttributes]*cwlog.ResourceLogsBuilder)
resourceMetrics := make(map[cwmetricstream.ResourceAttributes]*cwmetricstream.ResourceMetricsBuilder)
for i, compressed := range records {
record, err := compression.Unzip(compressed)
if err != nil {
Expand All @@ -88,8 +134,7 @@ func (u Unmarshaler) Unmarshal(records [][]byte) (pmetric.Metrics, plog.Logs, er
}

if isCloudWatchLog(single) {
var log cwlog.CWLog
if err = json.Unmarshal(single, &log); err != nil {
if err = u.addCloudwatchLog(single, resourceLogs, ld); err != nil {
u.logger.Error(
"Unable to unmarshal record to cloudwatch log",
zap.Error(err),
Expand All @@ -98,22 +143,10 @@ func (u Unmarshaler) Unmarshal(records [][]byte) (pmetric.Metrics, plog.Logs, er
)
continue
}
attrs := cwlog.ResourceAttributes{
Owner: log.Owner,
LogGroup: log.LogGroup,
LogStream: log.LogStream,
}
lb, exists := resourceLogs[attrs]
if !exists {
lb = cwlog.NewResourceLogsBuilder(ld, attrs)
resourceLogs[attrs] = lb
}
lb.AddLog(log)
}

if isCloudwatchMetrics(single) {
var metric cwmetricstream.CWMetric
if err = json.Unmarshal(single, &metric); err != nil {
if err = u.addCloudwatchMetric(single, resourceMetrics, md); err != nil {
u.logger.Error(
"Unable to unmarshal input",
zap.Error(err),
Expand All @@ -122,29 +155,31 @@ func (u Unmarshaler) Unmarshal(records [][]byte) (pmetric.Metrics, plog.Logs, er
)
continue
}
attrs := cwmetricstream.ResourceAttributes{
MetricStreamName: metric.MetricStreamName,
Namespace: metric.Namespace,
AccountID: metric.AccountID,
Region: metric.Region,
}
mb, exists := resourceMetrics[attrs]
if !exists {
mb = cwmetricstream.NewResourceMetricsBuilder(md, attrs)
resourceMetrics[attrs] = mb
}
mb.AddMetric(metric)
}
}
}

if len(resourceLogs) == 0 && len(resourceMetrics) == 0 {
return md, ld, errInvalidRecords
return errInvalidRecords
}
return nil
}

func (u *Unmarshaler) Unmarshal(contentType string, records [][]byte) (pmetric.Metrics, plog.Logs, error) {
ld := plog.NewLogs()
md := pmetric.NewMetrics()
switch contentType {
case "application/json":
return md, ld, u.unmarshalJSON(md, ld, records)
case "application/x-protobuf":
// TODO implement this
return md, ld, nil
default:
return md, ld, errUnsupportedContentType
}

return md, ld, nil
}

func (u Unmarshaler) Type() string {
func (u *Unmarshaler) Type() string {
return TypeStr
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ func TestType(t *testing.T) {
}

// Unmarshall cloudwatch metrics and logs
func TestUnmarshal_CW(t *testing.T) {
unmarshaler := NewUnmarshaler(zap.NewNop())
func TestUnmarshal_JSON(t *testing.T) {
t.Parallel()

unmarshaler := NewUnmarshaler(zap.NewNop())
testCases := map[string]struct {
dir string
filename string
Expand Down Expand Up @@ -96,7 +97,7 @@ func TestUnmarshal_CW(t *testing.T) {
require.NoError(t, err)
records := [][]byte{compressedRecord}

metrics, logs, err := unmarshaler.Unmarshal(records)
metrics, logs, err := unmarshaler.Unmarshal("application/json", records)
require.Equal(t, testCase.err, err)

require.Equal(t, testCase.logResourceCount, logs.ResourceLogs().Len())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,28 +57,11 @@ func TestUnmarshal(t *testing.T) {
records := [][]byte{record}

got, err := unmarshaler.Unmarshal(records)
if testCase.wantErr != nil {
require.Error(t, err)
require.Equal(t, testCase.wantErr, err)
} else {
require.NoError(t, err)
require.NotNil(t, got)
require.Equal(t, testCase.wantResourceCount, got.ResourceMetrics().Len())
gotMetricCount := 0
gotDatapointCount := 0
for i := 0; i < got.ResourceMetrics().Len(); i++ {
rm := got.ResourceMetrics().At(i)
require.Equal(t, 1, rm.ScopeMetrics().Len())
ilm := rm.ScopeMetrics().At(0)
gotMetricCount += ilm.Metrics().Len()
for j := 0; j < ilm.Metrics().Len(); j++ {
metric := ilm.Metrics().At(j)
gotDatapointCount += metric.Summary().DataPoints().Len()
}
}
require.Equal(t, testCase.wantMetricCount, gotMetricCount)
require.Equal(t, testCase.wantDatapointCount, gotDatapointCount)
}
require.Equal(t, testCase.wantErr, err)
require.NotNil(t, got)
require.Equal(t, testCase.wantResourceCount, got.ResourceMetrics().Len())
require.Equal(t, testCase.wantMetricCount, got.MetricCount())
require.Equal(t, testCase.wantDatapointCount, got.DataPointCount())
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type LogsUnmarshaler interface {
// Unmarshaler deserializes the message body
type Unmarshaler interface {
// Unmarshal deserializes the records into metrics or logs.
Unmarshal(records [][]byte) (pmetric.Metrics, plog.Logs, error)
Unmarshal(contentType string, records [][]byte) (pmetric.Metrics, plog.Logs, error)

// Type of the serialized messages.
Type() string
Expand Down
8 changes: 7 additions & 1 deletion receiver/awsfirehosereceiver/logs_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,14 @@ func newLogsReceiver(
// Consume uses the configured unmarshaler to deserialize the records into a
// single plog.Logs. It will send the final result
// to the next consumer.
func (mc *logsConsumer) Consume(ctx context.Context, records [][]byte, commonAttributes map[string]string) (int, error) {
func (mc *logsConsumer) Consume(
ctx context.Context,
contentType string,
records [][]byte,
commonAttributes map[string]string,
) (int, error) {
md, err := mc.unmarshaler.Unmarshal(records)

if err != nil {
return http.StatusBadRequest, err
}
Expand Down
4 changes: 2 additions & 2 deletions receiver/awsfirehosereceiver/logs_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestLogsConsumer(t *testing.T) {
unmarshaler: unmarshalertest.NewErrLogs(testCase.unmarshalerErr),
consumer: consumertest.NewErr(testCase.consumerErr),
}
gotStatus, gotErr := mc.Consume(context.TODO(), nil, nil)
gotStatus, gotErr := mc.Consume(context.TODO(), "", nil, nil)
require.Equal(t, testCase.wantStatus, gotStatus)
require.Equal(t, testCase.wantErr, gotErr)
})
Expand All @@ -114,7 +114,7 @@ func TestLogsConsumer(t *testing.T) {
unmarshaler: unmarshalertest.NewWithLogs(base),
consumer: &rc,
}
gotStatus, gotErr := mc.Consume(context.TODO(), nil, map[string]string{
gotStatus, gotErr := mc.Consume(context.TODO(), "", nil, map[string]string{
"CommonAttributes": "Test",
})
require.Equal(t, http.StatusOK, gotStatus)
Expand Down
7 changes: 6 additions & 1 deletion receiver/awsfirehosereceiver/metrics_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@ func newMetricsReceiver(
// single pmetric.Metrics. If there are common attributes available, then it will
// attach those to each of the pcommon.Resources. It will send the final result
// to the next consumer.
func (mc *metricsConsumer) Consume(ctx context.Context, records [][]byte, commonAttributes map[string]string) (int, error) {
func (mc *metricsConsumer) Consume(
ctx context.Context,
contentType string,
records [][]byte,
commonAttributes map[string]string,
) (int, error) {
md, err := mc.unmarshaler.Unmarshal(records)
if err != nil {
return http.StatusBadRequest, err
Expand Down
4 changes: 2 additions & 2 deletions receiver/awsfirehosereceiver/metrics_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestMetricsConsumer(t *testing.T) {
unmarshaler: unmarshalertest.NewErrMetrics(testCase.unmarshalerErr),
consumer: consumertest.NewErr(testCase.consumerErr),
}
gotStatus, gotErr := mc.Consume(context.TODO(), nil, nil)
gotStatus, gotErr := mc.Consume(context.TODO(), "", nil, nil)
require.Equal(t, testCase.wantStatus, gotStatus)
require.Equal(t, testCase.wantErr, gotErr)
})
Expand All @@ -115,7 +115,7 @@ func TestMetricsConsumer(t *testing.T) {
unmarshaler: unmarshalertest.NewWithMetrics(base),
consumer: &rc,
}
gotStatus, gotErr := mc.Consume(context.TODO(), nil, map[string]string{
gotStatus, gotErr := mc.Consume(context.TODO(), "", nil, map[string]string{
"CommonAttributes": "Test",
})
require.Equal(t, http.StatusOK, gotStatus)
Expand Down
Loading

0 comments on commit 0d0b685

Please sign in to comment.