diff --git a/exporter/awsxrayexporter/awsxray.go b/exporter/awsxrayexporter/awsxray.go index 78ba0906ebb3..07ec1230bde6 100644 --- a/exporter/awsxrayexporter/awsxray.go +++ b/exporter/awsxrayexporter/awsxray.go @@ -5,7 +5,9 @@ package awsxrayexporter // import "github.com/open-telemetry/opentelemetry-colle import ( "context" + "encoding/base64" "errors" + "fmt" "github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware" "github.com/aws/aws-sdk-go/aws/awserr" @@ -15,6 +17,7 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsxrayexporter/internal/translator" @@ -24,7 +27,10 @@ import ( ) const ( - maxSegmentsPerPut = int(50) // limit imposed by PutTraceSegments API + maxSegmentsPerPut = int(50) // limit imposed by PutTraceSegments API + otlpFormatPrefix = "T1S" // X-Ray PutTraceSegment API uses this prefix to detect the format + otlpFormatKeyIndexAllAttributes = "aws.xray.exporter.config.index_all_attributes" + otlpFormatKeyIndexAttributes = "aws.xray.exporter.config.indexed_attributes" ) // newTracesExporter creates an exporter.Traces that converts to an X-Ray PutTraceSegments @@ -57,7 +63,15 @@ func newTracesExporter( var err error logger.Debug("TracesExporter", typeLog, nameLog, zap.Int("#spans", td.SpanCount())) - documents := extractResourceSpans(cfg, logger, td) + var documents []*string + if cfg.TransitSpansInOtlpFormat { + documents, err = encodeOtlpAsBase64(td, cfg) + if err != nil { + return err + } + } else { // by default use xray format + documents = extractResourceSpans(cfg, logger, td) + } for offset := 0; offset < len(documents); offset += maxSegmentsPerPut { var nextOffset int @@ -137,3 +151,37 @@ func wrapErrorIfBadRequest(err error) error { } return err } + +// encodeOtlpAsBase64 builds bytes from traces and generate base64 value for them +func encodeOtlpAsBase64(td ptrace.Traces, cfg *Config) ([]*string, error) { + var documents []*string + for i := 0; i < td.ResourceSpans().Len(); i++ { + // 1. build a new trace with one resource span + singleTrace := ptrace.NewTraces() + td.ResourceSpans().At(i).CopyTo(singleTrace.ResourceSpans().AppendEmpty()) + + // 2. append index configuration to resource span as attributes, such that X-Ray Service build indexes based on them. + injectIndexConfigIntoOtlpPayload(singleTrace.ResourceSpans().At(0), cfg) + + // 3. Marshal single trace into proto bytes + bytes, err := ptraceotlp.NewExportRequestFromTraces(singleTrace).MarshalProto() + if err != nil { + return nil, fmt.Errorf("failed to marshal traces: %w", err) + } + + // 4. build bytes into base64 and append with PROTOCOL HEADER at the beginning + base64Str := otlpFormatPrefix + base64.StdEncoding.EncodeToString(bytes) + documents = append(documents, &base64Str) + } + + return documents, nil +} + +func injectIndexConfigIntoOtlpPayload(resourceSpan ptrace.ResourceSpans, cfg *Config) { + attributes := resourceSpan.Resource().Attributes() + attributes.PutBool(otlpFormatKeyIndexAllAttributes, cfg.IndexAllAttributes) + indexAttributes := attributes.PutEmptySlice(otlpFormatKeyIndexAttributes) + for _, indexAttribute := range cfg.IndexedAttributes { + indexAttributes.AppendEmpty().SetStr(indexAttribute) + } +} diff --git a/exporter/awsxrayexporter/awsxray_test.go b/exporter/awsxrayexporter/awsxray_test.go index a27c6044dfcc..4a76aee04928 100644 --- a/exporter/awsxrayexporter/awsxray_test.go +++ b/exporter/awsxrayexporter/awsxray_test.go @@ -6,6 +6,7 @@ package awsxrayexporter import ( "context" "crypto/rand" + "encoding/base64" "encoding/binary" "fmt" "testing" @@ -120,6 +121,79 @@ func TestMiddleware(t *testing.T) { handler.AssertCalled(t, "HandleResponse", mock.Anything, mock.Anything) } +func TestTraceExportOtlpFormat(t *testing.T) { + config := generateConfig(t) + config.TransitSpansInOtlpFormat = true + + traceExporter := initializeTracesExporter(t, generateConfig(t), telemetrytest.NewNopRegistry()) + ctx := context.Background() + td := constructSpanData() + err := traceExporter.ConsumeTraces(ctx, td) + assert.Error(t, err) + err = traceExporter.Shutdown(ctx) + assert.NoError(t, err) +} + +func TestEncodingOtlpFormat(t *testing.T) { + config1 := generateConfig(t) + config1.IndexAllAttributes = true + config1.IndexedAttributes = []string{"test", "test1", "test2"} + testEncodingOtlpFormatWithIndexConfiguration(t, config1) + + config2 := generateConfig(t) + config2.IndexedAttributes = []string{"test", "test1", "test2"} + testEncodingOtlpFormatWithIndexConfiguration(t, config2) + + config3 := generateConfig(t) + config3.IndexAllAttributes = true + testEncodingOtlpFormatWithIndexConfiguration(t, config3) + + config4 := generateConfig(t) + config4.IndexedAttributes = []string{} + testEncodingOtlpFormatWithIndexConfiguration(t, config4) + + config5 := generateConfig(t) + config5.IndexAllAttributes = false + testEncodingOtlpFormatWithIndexConfiguration(t, config5) + + config6 := generateConfig(t) + config6.IndexAllAttributes = false + config6.IndexedAttributes = []string{} + testEncodingOtlpFormatWithIndexConfiguration(t, config6) +} + +func testEncodingOtlpFormatWithIndexConfiguration(t *testing.T, config *Config) { + // 1. prepare 50 resource spans and encode them + td := constructMultiSpanData(50) + documents, err := encodeOtlpAsBase64(td, config) + assert.NoError(t, err) + assert.EqualValues(t, td.ResourceSpans().Len(), len(documents), "ensure #resourcespans same as #documents") + + // 2. ensure documents can be decoded back + unmarshaler := &ptrace.ProtoUnmarshaler{} + for i, document := range documents { + assert.EqualValues(t, "T1S", (*document)[0:3], "ensure protocol prefix") + decodedBytes, err := base64.StdEncoding.DecodeString((*document)[3:]) + assert.NoError(t, err) + + trace, err := unmarshaler.UnmarshalTraces(decodedBytes) + assert.NoError(t, err) + + trace.CopyTo(trace) + + // 3. ensure index configurations are carried + injectIndexConfigIntoOtlpPayload(td.ResourceSpans().At(i), config) + assert.EqualValues(t, td.ResourceSpans().At(i), trace.ResourceSpans().At(0)) + } +} + +func TestEncodingOtlpFormatWithEmptySpans(t *testing.T) { + td := constructMultiSpanData(0) + documents, err := encodeOtlpAsBase64(td, generateConfig(t)) + assert.NoError(t, err) + assert.EqualValues(t, 0, len(documents), "expect 0 document") +} + func BenchmarkForTracesExporter(b *testing.B) { traceExporter := initializeTracesExporter(b, generateConfig(b), telemetrytest.NewNopRegistry()) for i := 0; i < b.N; i++ { @@ -156,7 +230,6 @@ func generateConfig(t testing.TB) *Config { func constructSpanData() ptrace.Traces { resource := constructResource() - traces := ptrace.NewTraces() rspans := traces.ResourceSpans().AppendEmpty() resource.CopyTo(rspans.Resource()) @@ -165,6 +238,18 @@ func constructSpanData() ptrace.Traces { return traces } +func constructMultiSpanData(numSpans int) ptrace.Traces { + traces := ptrace.NewTraces() + for range numSpans { + resource := constructResource() + rspans := traces.ResourceSpans().AppendEmpty() + resource.CopyTo(rspans.Resource()) + ispans := rspans.ScopeSpans().AppendEmpty() + constructXrayTraceSpanData(ispans) + } + return traces +} + func constructW3CSpanData() ptrace.Traces { resource := constructResource() traces := ptrace.NewTraces() diff --git a/exporter/awsxrayexporter/config.go b/exporter/awsxrayexporter/config.go index 5f97907ce8e1..f7cbd9087473 100644 --- a/exporter/awsxrayexporter/config.go +++ b/exporter/awsxrayexporter/config.go @@ -30,6 +30,9 @@ type Config struct { // AWS client. MiddlewareID *component.ID `mapstructure:"middleware,omitempty"` + // X-Ray Export sends spans in its original otlp format to X-Ray Service when this flag is on + TransitSpansInOtlpFormat bool `mapstructure:"transit_spans_in_otlp_format,omitempty"` + // skipTimestampValidation if enabled, will skip timestamp validation logic on the trace ID skipTimestampValidation bool }