diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller.go b/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller.go index 9c4f1f1344fb..efe3cc519c74 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller.go @@ -20,7 +20,10 @@ const ( recordDelimiter = "\n" ) -var errInvalidRecords = errors.New("record format invalid") +var ( + errInvalidRecords = errors.New("record format invalid") + errUnsupportedContentType = errors.New("content type not supported") +) // Unmarshaler for the CloudWatch Log JSON record format. type Unmarshaler struct { @@ -65,13 +68,56 @@ func isCloudwatchMetrics(data []byte) bool { return true } -func (u Unmarshaler) Unmarshal(records [][]byte) (pmetric.Metrics, plog.Logs, error) { - ld := plog.NewLogs() - resourceLogs := make(map[cwlog.ResourceAttributes]*cwlog.ResourceLogsBuilder) +func (u *Unmarshaler) addCloudwatchLog( + record []byte, + resourceLogs map[cwlog.ResourceAttributes]*cwlog.ResourceLogsBuilder, + ld plog.Logs, +) error { + var log cwlog.CWLog + if err := json.Unmarshal(record, &log); err != nil { + return err + } + attrs := cwlog.ResourceAttributes{ + Owner: log.Owner, + LogGroup: log.LogGroup, + LogStream: log.LogStream, + } + lb, exists := resourceLogs[attrs] + if !exists { + lb = cwlog.NewResourceLogsBuilder(ld, attrs) + resourceLogs[attrs] = lb + } + lb.AddLog(log) + return nil +} - md := pmetric.NewMetrics() - resourceMetrics := make(map[cwmetricstream.ResourceAttributes]*cwmetricstream.ResourceMetricsBuilder) +func (u *Unmarshaler) addCloudwatchMetric( + record []byte, + resourceMetrics map[cwmetricstream.ResourceAttributes]*cwmetricstream.ResourceMetricsBuilder, + md pmetric.Metrics, +) error { + var metric cwmetricstream.CWMetric + if err := json.Unmarshal(record, &metric); err != nil { + return err + } + attrs := cwmetricstream.ResourceAttributes{ + MetricStreamName: metric.MetricStreamName, + Namespace: metric.Namespace, + AccountID: metric.AccountID, + Region: metric.Region, + } + mb, exists := resourceMetrics[attrs] + if !exists { + mb = cwmetricstream.NewResourceMetricsBuilder(md, attrs) + resourceMetrics[attrs] = mb + } + mb.AddMetric(metric) + return nil +} +func (u *Unmarshaler) unmarshalJSON(md pmetric.Metrics, ld plog.Logs, records [][]byte) error { + resourceLogs := make(map[cwlog.ResourceAttributes]*cwlog.ResourceLogsBuilder) + resourceMetrics := make(map[cwmetricstream.ResourceAttributes]*cwmetricstream.ResourceMetricsBuilder) for i, compressed := range records { record, err := compression.Unzip(compressed) if err != nil { @@ -88,8 +134,7 @@ func (u Unmarshaler) Unmarshal(records [][]byte) (pmetric.Metrics, plog.Logs, er } if isCloudWatchLog(single) { - var log cwlog.CWLog - if err = json.Unmarshal(single, &log); err != nil { + if err = u.addCloudwatchLog(single, resourceLogs, ld); err != nil { u.logger.Error( "Unable to unmarshal record to cloudwatch log", zap.Error(err), @@ -98,22 +143,10 @@ func (u Unmarshaler) Unmarshal(records [][]byte) (pmetric.Metrics, plog.Logs, er ) continue } - attrs := cwlog.ResourceAttributes{ - Owner: log.Owner, - LogGroup: log.LogGroup, - LogStream: log.LogStream, - } - lb, exists := resourceLogs[attrs] - if !exists { - lb = cwlog.NewResourceLogsBuilder(ld, attrs) - resourceLogs[attrs] = lb - } - lb.AddLog(log) } if isCloudwatchMetrics(single) { - var metric cwmetricstream.CWMetric - if err = json.Unmarshal(single, &metric); err != nil { + if err = u.addCloudwatchMetric(single, resourceMetrics, md); err != nil { u.logger.Error( "Unable to unmarshal input", zap.Error(err), @@ -122,29 +155,31 @@ func (u Unmarshaler) Unmarshal(records [][]byte) (pmetric.Metrics, plog.Logs, er ) continue } - attrs := cwmetricstream.ResourceAttributes{ - MetricStreamName: metric.MetricStreamName, - Namespace: metric.Namespace, - AccountID: metric.AccountID, - Region: metric.Region, - } - mb, exists := resourceMetrics[attrs] - if !exists { - mb = cwmetricstream.NewResourceMetricsBuilder(md, attrs) - resourceMetrics[attrs] = mb - } - mb.AddMetric(metric) } } } if len(resourceLogs) == 0 && len(resourceMetrics) == 0 { - return md, ld, errInvalidRecords + return errInvalidRecords + } + return nil +} + +func (u *Unmarshaler) Unmarshal(contentType string, records [][]byte) (pmetric.Metrics, plog.Logs, error) { + ld := plog.NewLogs() + md := pmetric.NewMetrics() + switch contentType { + case "application/json": + return md, ld, u.unmarshalJSON(md, ld, records) + case "application/x-protobuf": + // TODO implement this + return md, ld, nil + default: + return md, ld, errUnsupportedContentType } - return md, ld, nil } -func (u Unmarshaler) Type() string { +func (u *Unmarshaler) Type() string { return TypeStr } diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller_test.go index ffc36c01293e..70fb84cb2bcf 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller_test.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller_test.go @@ -17,9 +17,10 @@ func TestType(t *testing.T) { } // Unmarshall cloudwatch metrics and logs -func TestUnmarshal_CW(t *testing.T) { - unmarshaler := NewUnmarshaler(zap.NewNop()) +func TestUnmarshal_JSON(t *testing.T) { + t.Parallel() + unmarshaler := NewUnmarshaler(zap.NewNop()) testCases := map[string]struct { dir string filename string @@ -96,7 +97,7 @@ func TestUnmarshal_CW(t *testing.T) { require.NoError(t, err) records := [][]byte{compressedRecord} - metrics, logs, err := unmarshaler.Unmarshal(records) + metrics, logs, err := unmarshaler.Unmarshal("application/json", records) require.Equal(t, testCase.err, err) require.Equal(t, testCase.logResourceCount, logs.ResourceLogs().Len()) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go index 16a517db1756..e3aab6551abe 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go @@ -57,28 +57,11 @@ func TestUnmarshal(t *testing.T) { records := [][]byte{record} got, err := unmarshaler.Unmarshal(records) - if testCase.wantErr != nil { - require.Error(t, err) - require.Equal(t, testCase.wantErr, err) - } else { - require.NoError(t, err) - require.NotNil(t, got) - require.Equal(t, testCase.wantResourceCount, got.ResourceMetrics().Len()) - gotMetricCount := 0 - gotDatapointCount := 0 - for i := 0; i < got.ResourceMetrics().Len(); i++ { - rm := got.ResourceMetrics().At(i) - require.Equal(t, 1, rm.ScopeMetrics().Len()) - ilm := rm.ScopeMetrics().At(0) - gotMetricCount += ilm.Metrics().Len() - for j := 0; j < ilm.Metrics().Len(); j++ { - metric := ilm.Metrics().At(j) - gotDatapointCount += metric.Summary().DataPoints().Len() - } - } - require.Equal(t, testCase.wantMetricCount, gotMetricCount) - require.Equal(t, testCase.wantDatapointCount, gotDatapointCount) - } + require.Equal(t, testCase.wantErr, err) + require.NotNil(t, got) + require.Equal(t, testCase.wantResourceCount, got.ResourceMetrics().Len()) + require.Equal(t, testCase.wantMetricCount, got.MetricCount()) + require.Equal(t, testCase.wantDatapointCount, got.DataPointCount()) }) } } diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshaler.go index a9b55c996f8d..37a57503daf2 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshaler.go @@ -29,7 +29,7 @@ type LogsUnmarshaler interface { // Unmarshaler deserializes the message body type Unmarshaler interface { // Unmarshal deserializes the records into metrics or logs. - Unmarshal(records [][]byte) (pmetric.Metrics, plog.Logs, error) + Unmarshal(contentType string, records [][]byte) (pmetric.Metrics, plog.Logs, error) // Type of the serialized messages. Type() string diff --git a/receiver/awsfirehosereceiver/logs_receiver.go b/receiver/awsfirehosereceiver/logs_receiver.go index 570e6cf1e745..862b9b2840fc 100644 --- a/receiver/awsfirehosereceiver/logs_receiver.go +++ b/receiver/awsfirehosereceiver/logs_receiver.go @@ -62,8 +62,14 @@ func newLogsReceiver( // Consume uses the configured unmarshaler to deserialize the records into a // single plog.Logs. It will send the final result // to the next consumer. -func (mc *logsConsumer) Consume(ctx context.Context, records [][]byte, commonAttributes map[string]string) (int, error) { +func (mc *logsConsumer) Consume( + ctx context.Context, + contentType string, + records [][]byte, + commonAttributes map[string]string, +) (int, error) { md, err := mc.unmarshaler.Unmarshal(records) + if err != nil { return http.StatusBadRequest, err } diff --git a/receiver/awsfirehosereceiver/logs_receiver_test.go b/receiver/awsfirehosereceiver/logs_receiver_test.go index 6739f8137929..8d75b37f0141 100644 --- a/receiver/awsfirehosereceiver/logs_receiver_test.go +++ b/receiver/awsfirehosereceiver/logs_receiver_test.go @@ -100,7 +100,7 @@ func TestLogsConsumer(t *testing.T) { unmarshaler: unmarshalertest.NewErrLogs(testCase.unmarshalerErr), consumer: consumertest.NewErr(testCase.consumerErr), } - gotStatus, gotErr := mc.Consume(context.TODO(), nil, nil) + gotStatus, gotErr := mc.Consume(context.TODO(), "", nil, nil) require.Equal(t, testCase.wantStatus, gotStatus) require.Equal(t, testCase.wantErr, gotErr) }) @@ -114,7 +114,7 @@ func TestLogsConsumer(t *testing.T) { unmarshaler: unmarshalertest.NewWithLogs(base), consumer: &rc, } - gotStatus, gotErr := mc.Consume(context.TODO(), nil, map[string]string{ + gotStatus, gotErr := mc.Consume(context.TODO(), "", nil, map[string]string{ "CommonAttributes": "Test", }) require.Equal(t, http.StatusOK, gotStatus) diff --git a/receiver/awsfirehosereceiver/metrics_receiver.go b/receiver/awsfirehosereceiver/metrics_receiver.go index 4a5128583ac0..788316bbdd32 100644 --- a/receiver/awsfirehosereceiver/metrics_receiver.go +++ b/receiver/awsfirehosereceiver/metrics_receiver.go @@ -64,7 +64,12 @@ func newMetricsReceiver( // single pmetric.Metrics. If there are common attributes available, then it will // attach those to each of the pcommon.Resources. It will send the final result // to the next consumer. -func (mc *metricsConsumer) Consume(ctx context.Context, records [][]byte, commonAttributes map[string]string) (int, error) { +func (mc *metricsConsumer) Consume( + ctx context.Context, + contentType string, + records [][]byte, + commonAttributes map[string]string, +) (int, error) { md, err := mc.unmarshaler.Unmarshal(records) if err != nil { return http.StatusBadRequest, err diff --git a/receiver/awsfirehosereceiver/metrics_receiver_test.go b/receiver/awsfirehosereceiver/metrics_receiver_test.go index d32ec4efc8a5..46fd27b6e09b 100644 --- a/receiver/awsfirehosereceiver/metrics_receiver_test.go +++ b/receiver/awsfirehosereceiver/metrics_receiver_test.go @@ -101,7 +101,7 @@ func TestMetricsConsumer(t *testing.T) { unmarshaler: unmarshalertest.NewErrMetrics(testCase.unmarshalerErr), consumer: consumertest.NewErr(testCase.consumerErr), } - gotStatus, gotErr := mc.Consume(context.TODO(), nil, nil) + gotStatus, gotErr := mc.Consume(context.TODO(), "", nil, nil) require.Equal(t, testCase.wantStatus, gotStatus) require.Equal(t, testCase.wantErr, gotErr) }) @@ -115,7 +115,7 @@ func TestMetricsConsumer(t *testing.T) { unmarshaler: unmarshalertest.NewWithMetrics(base), consumer: &rc, } - gotStatus, gotErr := mc.Consume(context.TODO(), nil, map[string]string{ + gotStatus, gotErr := mc.Consume(context.TODO(), "", nil, map[string]string{ "CommonAttributes": "Test", }) require.Equal(t, http.StatusOK, gotStatus) diff --git a/receiver/awsfirehosereceiver/receiver.go b/receiver/awsfirehosereceiver/receiver.go index baa9750b6162..b79193e7f2bb 100644 --- a/receiver/awsfirehosereceiver/receiver.go +++ b/receiver/awsfirehosereceiver/receiver.go @@ -12,7 +12,6 @@ import ( "io" "net" "net/http" - "strconv" "sync" "time" @@ -41,7 +40,7 @@ var ( // The firehoseConsumer is responsible for using the unmarshaler and the consumer. type firehoseConsumer interface { // Consume unmarshalls and consumes the records. - Consume(ctx context.Context, records [][]byte, commonAttributes map[string]string) (int, error) + Consume(ctx context.Context, contentType string, records [][]byte, commonAttributes map[string]string) (int, error) } // firehoseReceiver @@ -154,6 +153,7 @@ func (fmr *firehoseReceiver) Shutdown(context.Context) error { func (fmr *firehoseReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() + // extract and validate request id requestID := r.Header.Get(headerFirehoseRequestID) if requestID == "" { fmr.settings.Logger.Error( @@ -165,6 +165,7 @@ func (fmr *firehoseReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { } fmr.settings.Logger.Debug("Processing Firehose request", zap.String("RequestID", requestID)) + // validate access key if statusCode, err := fmr.validate(r); err != nil { fmr.settings.Logger.Error( "Invalid Firehose request", @@ -174,18 +175,21 @@ func (fmr *firehoseReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + // parse the body body, err := fmr.getBody(r) if err != nil { fmr.sendResponse(w, requestID, http.StatusBadRequest, err) return } + // unmarshall request var fr firehoseRequest if err = json.Unmarshal(body, &fr); err != nil { fmr.sendResponse(w, requestID, http.StatusBadRequest, err) return } + // validate request id if fr.RequestID == "" { fmr.sendResponse(w, requestID, http.StatusBadRequest, errInBodyMissingRequestID) return @@ -194,24 +198,14 @@ func (fmr *firehoseReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - records := make([][]byte, 0, len(fr.Records)) - for index, record := range fr.Records { - if record.Data != "" { - var decoded []byte - decoded, err = base64.StdEncoding.DecodeString(record.Data) - if err != nil { - fmr.sendResponse( - w, - requestID, - http.StatusBadRequest, - fmt.Errorf("unable to base64 decode the record at index %d: %w", index, err), - ) - return - } - records = append(records, decoded) - } + // decode records + records, err := fmr.decodeRecords(fr.Records) + if err != nil { + fmr.sendResponse(w, requestID, http.StatusBadRequest, err) + return } + // extract common attributes commonAttributes, err := fmr.getCommonAttributes(r) if err != nil { fmr.settings.Logger.Error( @@ -220,7 +214,9 @@ func (fmr *firehoseReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { ) } - statusCode, err := fmr.consumer.Consume(ctx, records, commonAttributes) + // consume telemetry + contentType := r.Header.Get(headerContentType) + statusCode, err := fmr.consumer.Consume(ctx, contentType, records, commonAttributes) if err != nil { fmr.settings.Logger.Error( "Unable to consume records", @@ -233,6 +229,21 @@ func (fmr *firehoseReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { fmr.sendResponse(w, requestID, http.StatusOK, nil) } +func (fmr *firehoseReceiver) decodeRecords(records []firehoseRecord) ([][]byte, error) { + decodedRecords := make([][]byte, 0, len(records)) + for index, record := range records { + if record.Data == "" { + continue + } + decoded, err := base64.StdEncoding.DecodeString(record.Data) + if err != nil { + return nil, fmt.Errorf("unable to base64 decode the record at index %d: %w", index, err) + } + decodedRecords = append(decodedRecords, decoded) + } + return decodedRecords, nil +} + // validate checks the Firehose access key in the header against // the one passed into the Config func (fmr *firehoseReceiver) validate(r *http.Request) (int, error) { @@ -252,11 +263,7 @@ func (fmr *firehoseReceiver) getBody(r *http.Request) ([]byte, error) { if err != nil { return nil, err } - err = r.Body.Close() - if err != nil { - return nil, err - } - return body, nil + return body, r.Body.Close() } // getCommonAttributes unmarshalls the common attributes from the request header @@ -285,7 +292,6 @@ func (fmr *firehoseReceiver) sendResponse(w http.ResponseWriter, requestID strin } payload, _ := json.Marshal(body) w.Header().Set(headerContentType, "application/json") - w.Header().Set(headerContentLength, strconv.Itoa(len(payload))) w.WriteHeader(statusCode) if _, err = w.Write(payload); err != nil { fmr.settings.Logger.Error("Failed to send response", zap.Error(err)) diff --git a/receiver/awsfirehosereceiver/receiver_test.go b/receiver/awsfirehosereceiver/receiver_test.go index 2a70846462f5..8ab3a826ee46 100644 --- a/receiver/awsfirehosereceiver/receiver_test.go +++ b/receiver/awsfirehosereceiver/receiver_test.go @@ -40,7 +40,7 @@ func newNopFirehoseConsumer(statusCode int, err error) *nopFirehoseConsumer { return &nopFirehoseConsumer{statusCode, err} } -func (nfc *nopFirehoseConsumer) Consume(context.Context, [][]byte, map[string]string) (int, error) { +func (nfc *nopFirehoseConsumer) Consume(context.Context, string, [][]byte, map[string]string) (int, error) { return nfc.statusCode, nfc.err }