Skip to content

Commit

Permalink
move encoder into internal package
Browse files Browse the repository at this point in the history
  • Loading branch information
mterhar committed Dec 19, 2024
1 parent d53a881 commit 43cda73
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 135 deletions.
125 changes: 0 additions & 125 deletions receiver/libhoneyreceiver/encoder.go

This file was deleted.

125 changes: 125 additions & 0 deletions receiver/libhoneyreceiver/encoder/encoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package encoder // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/encoder"

import (
"bytes"

"github.com/gogo/protobuf/jsonpb"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
spb "google.golang.org/genproto/googleapis/rpc/status"
)

const (
PbContentType = "application/x-protobuf"
JsonContentType = "application/json"
MsgpackContentType = "application/x-msgpack"
)

var (
JsEncoder = &JsonEncoder{}
JsonPbMarshaler = &jsonpb.Marshaler{}
MpEncoder = &msgpackEncoder{}
)

type Encoder interface {
UnmarshalTracesRequest(buf []byte) (ptraceotlp.ExportRequest, error)
UnmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error)
UnmarshalLogsRequest(buf []byte) (plogotlp.ExportRequest, error)

MarshalTracesResponse(ptraceotlp.ExportResponse) ([]byte, error)
MarshalMetricsResponse(pmetricotlp.ExportResponse) ([]byte, error)
MarshalLogsResponse(plogotlp.ExportResponse) ([]byte, error)

MarshalStatus(rsp *spb.Status) ([]byte, error)

ContentType() string
}

type JsonEncoder struct{}

func (JsonEncoder) UnmarshalTracesRequest(buf []byte) (ptraceotlp.ExportRequest, error) {
req := ptraceotlp.NewExportRequest()
err := req.UnmarshalJSON(buf)
return req, err
}

func (JsonEncoder) UnmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error) {
req := pmetricotlp.NewExportRequest()
err := req.UnmarshalJSON(buf)
return req, err
}

func (JsonEncoder) UnmarshalLogsRequest(buf []byte) (plogotlp.ExportRequest, error) {
req := plogotlp.NewExportRequest()
err := req.UnmarshalJSON(buf)
return req, err
}

func (JsonEncoder) MarshalTracesResponse(resp ptraceotlp.ExportResponse) ([]byte, error) {
return resp.MarshalJSON()
}

func (JsonEncoder) MarshalMetricsResponse(resp pmetricotlp.ExportResponse) ([]byte, error) {
return resp.MarshalJSON()
}

func (JsonEncoder) MarshalLogsResponse(resp plogotlp.ExportResponse) ([]byte, error) {
return resp.MarshalJSON()
}

func (JsonEncoder) MarshalStatus(resp *spb.Status) ([]byte, error) {
buf := new(bytes.Buffer)
err := JsonPbMarshaler.Marshal(buf, resp)
return buf.Bytes(), err
}

func (JsonEncoder) ContentType() string {
return JsonContentType
}

// messagepack responses seem to work in JSON so leaving this alone for now.
type msgpackEncoder struct{}

func (msgpackEncoder) UnmarshalTracesRequest(buf []byte) (ptraceotlp.ExportRequest, error) {
req := ptraceotlp.NewExportRequest()
err := req.UnmarshalJSON(buf)
return req, err
}

func (msgpackEncoder) UnmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error) {
req := pmetricotlp.NewExportRequest()
err := req.UnmarshalJSON(buf)
return req, err
}

func (msgpackEncoder) UnmarshalLogsRequest(buf []byte) (plogotlp.ExportRequest, error) {
req := plogotlp.NewExportRequest()
err := req.UnmarshalJSON(buf)
return req, err
}

func (msgpackEncoder) MarshalTracesResponse(resp ptraceotlp.ExportResponse) ([]byte, error) {
return resp.MarshalJSON()
}

func (msgpackEncoder) MarshalMetricsResponse(resp pmetricotlp.ExportResponse) ([]byte, error) {
return resp.MarshalJSON()
}

func (msgpackEncoder) MarshalLogsResponse(resp plogotlp.ExportResponse) ([]byte, error) {
return resp.MarshalJSON()
}

func (msgpackEncoder) MarshalStatus(resp *spb.Status) ([]byte, error) {
buf := new(bytes.Buffer)
err := JsonPbMarshaler.Marshal(buf, resp)
return buf.Bytes(), err
}

func (msgpackEncoder) ContentType() string {
return MsgpackContentType
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ type AttributesConfig struct {
DurationFields []string `mapstructure:"durationFields"`
}

// LibhoneyEvent is the event structure from libhoney
type LibhoneyEvent struct {
Samplerate int `json:"samplerate" msgpack:"samplerate"`
MsgPackTimestamp *time.Time `msgpack:"time"`
Time string `json:"time"` // should not be trusted. use MsgPackTimestamp
Data map[string]any `json:"data" msgpack:"data"`
}

// Overrides unmarshall to make sure the MsgPackTimestamp is set
// UnmarshalJSON overrides the unmarshall to make sure the MsgPackTimestamp is set
func (l *LibhoneyEvent) UnmarshalJSON(j []byte) error {
type _libhoneyEvent LibhoneyEvent
tstr := eventtime.GetEventTimeDefaultString()
Expand All @@ -80,23 +81,26 @@ func (l *LibhoneyEvent) UnmarshalJSON(j []byte) error {
return nil
}

// DebugString returns a string representation of the LibhoneyEvent
func (l *LibhoneyEvent) DebugString() string {
return fmt.Sprintf("%#v", l)
}

// returns log until we add the trace parser
// SignalType returns the type of signal this event represents. Only log is implemented for now.
func (l *LibhoneyEvent) SignalType() (string, error) {
return "log", nil
}

// GetService returns the service name from the event or the dataset name if no service name is found.
func (l *LibhoneyEvent) GetService(fields FieldMapConfig, seen *ServiceHistory, dataset string) (string, error) {
if serviceName, ok := l.Data[fields.Resources.ServiceName]; ok {
seen.NameCount[serviceName.(string)] += 1
seen.NameCount[serviceName.(string)]++
return serviceName.(string), nil
}
return dataset, errors.New("no service.name found in event")
}

// GetScope returns the scope key for the event. If the scope has not been seen before, it creates a new one.
func (l *LibhoneyEvent) GetScope(fields FieldMapConfig, seen *ScopeHistory, serviceName string) (string, error) {
if scopeLibraryName, ok := l.Data[fields.Scopes.LibraryName]; ok {
scopeKey := serviceName + scopeLibraryName.(string)
Expand All @@ -122,6 +126,7 @@ func (l *LibhoneyEvent) GetScope(fields FieldMapConfig, seen *ScopeHistory, serv
return "libhoney.receiver", errors.New("library name not found")
}

// SimpleScope is a simple struct to hold the scope data
type SimpleScope struct {
ServiceName string
LibraryName string
Expand Down
15 changes: 8 additions & 7 deletions receiver/libhoneyreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/errorutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/encoder"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/parser"
)
Expand Down Expand Up @@ -216,7 +217,7 @@ func (r *libhoneyReceiver) handleEvent(resp http.ResponseWriter, req *http.Reque
r.settings.Logger.Debug("Decoding with msgpack worked", zap.Time("timestamp.first.msgpacktimestamp", *libhoneyevents[0].MsgPackTimestamp), zap.String("timestamp.first.time", libhoneyevents[0].Time))
r.settings.Logger.Debug("event zero", zap.String("event.data", libhoneyevents[0].DebugString()))
}
case jsonContentType:
case encoder.JsonContentType:
err = json.Unmarshal(body, &libhoneyevents)
if err != nil {
errorutil.HTTPError(resp, err)
Expand All @@ -241,20 +242,20 @@ func (r *libhoneyReceiver) handleEvent(resp http.ResponseWriter, req *http.Reque
}

noErrors := []byte(`{"errors":[]}`)
writeResponse(resp, enc.contentType(), http.StatusAccepted, noErrors)
writeResponse(resp, enc.ContentType(), http.StatusAccepted, noErrors)
}

func readContentType(resp http.ResponseWriter, req *http.Request) (encoder, bool) {
func readContentType(resp http.ResponseWriter, req *http.Request) (encoder.Encoder, bool) {
if req.Method != http.MethodPost {
handleUnmatchedMethod(resp)
return nil, false
}

switch getMimeTypeFromContentType(req.Header.Get("Content-Type")) {
case jsonContentType:
return jsEncoder, true
case encoder.JsonContentType:
return encoder.JsEncoder, true
case "application/x-msgpack", "application/msgpack":
return mpEncoder, true
return encoder.MpEncoder, true
default:
handleUnmatchedContentType(resp)
return nil, false
Expand Down Expand Up @@ -282,5 +283,5 @@ func handleUnmatchedMethod(resp http.ResponseWriter) {

func handleUnmatchedContentType(resp http.ResponseWriter) {
status := http.StatusUnsupportedMediaType
writeResponse(resp, "text/plain", status, []byte(fmt.Sprintf("%v unsupported media type, supported: [%s, %s]", status, jsonContentType, pbContentType)))
writeResponse(resp, "text/plain", status, []byte(fmt.Sprintf("%v unsupported media type, supported: [%s, %s]", status, encoder.JsonContentType, encoder.PbContentType)))
}

0 comments on commit 43cda73

Please sign in to comment.