Skip to content

Commit

Permalink
Separate trace handling from Observable. (#319)
Browse files Browse the repository at this point in the history
This allows trace spans to be created with a different granularity
than that of Observable metrics. In particular, this commit replaces
the nested send / receive spans into a single send and receive span
created in the client. In addition, encode / decode reporters are
created without access to context making them unable to attach tracing
data to the active trace. With the reduced granularity of emitted
spans, it should now be appropriate to always enable trace
handling. As such, the EnableTracing function has been marked as
deprecated and will have no effect.

The client send and receive spans have been renamed with the prefix
"cloudevents" in order to distinguish them from send and receive spans
created by other libraries. Additionally these spans are annotated
with cloud event attributes from handled events.

Signed-off-by: Ian Milligan <[email protected]>
  • Loading branch information
ian-mi authored Feb 28, 2020
1 parent 16dc438 commit b72bc2a
Show file tree
Hide file tree
Showing 15 changed files with 118 additions and 213 deletions.
4 changes: 0 additions & 4 deletions cmd/samples/stats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec"
"github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/json"
"github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/xml"
"github.com/cloudevents/sdk-go/pkg/cloudevents/observability"

"contrib.go.opencensus.io/exporter/prometheus"
"go.opencensus.io/stats/view"
Expand All @@ -35,9 +34,6 @@ func main() {
log.Fatalf("failed to create client, %v", err)
}

// Uncomment the following to see that tracing can be disabled.
observability.EnableTracing(false)

go mainSender()
go mainMetrics()

Expand Down
4 changes: 2 additions & 2 deletions pkg/binding/example_using_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const count = 3 // Example ends after this many events.

// The sender uses the cloudevents.Client API, not the transport APIs directly.
func runSender(w io.Writer) error {
c, err := client.New(NewExTransport(nil, w))
c, err := client.New(NewExTransport(nil, w), client.WithoutTracePropagation())
if err != nil {
return err
}
Expand Down Expand Up @@ -44,7 +44,7 @@ func runReceiver(r io.Reader) error {
}
return nil
}
c, err := client.New(NewExTransport(r, nil))
c, err := client.New(NewExTransport(r, nil), client.WithoutTracePropagation())
if err != nil {
return err
}
Expand Down
24 changes: 18 additions & 6 deletions pkg/cloudevents/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ type ceClient struct {
// error.
func (c *ceClient) Send(ctx context.Context, event cloudevents.Event) (context.Context, *cloudevents.Event, error) {
ctx, r := observability.NewReporter(ctx, reportSend)

ctx, span := trace.StartSpan(ctx, clientSpanName, trace.WithSpanKind(trace.SpanKindClient))
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(eventTraceAttributes(event.Context)...)
}

rctx, resp, err := c.obsSend(ctx, event)
if err != nil {
r.Error()
Expand Down Expand Up @@ -130,17 +137,22 @@ func (c *ceClient) obsSend(ctx context.Context, event cloudevents.Event) (contex

// Receive is called from from the transport on event delivery.
func (c *ceClient) Receive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
var r observability.Reporter
ctx, r := observability.NewReporter(ctx, reportReceive)

var span *trace.Span
if !c.transport.HasTracePropagation() {
if ext, ok := extensions.GetDistributedTracingExtension(event); ok {
if sc, err := ext.ToSpanContext(); err == nil {
ctx, r = observability.NewReporterWithRemoteParent(ctx, reportReceive, sc)
}
ctx, span = ext.StartChildSpan(ctx, clientSpanName, trace.WithSpanKind(trace.SpanKindServer))
}
}
if r == nil {
ctx, r = observability.NewReporter(ctx, reportReceive)
if span == nil {
ctx, span = trace.StartSpan(ctx, clientSpanName, trace.WithSpanKind(trace.SpanKindServer))
}
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(eventTraceAttributes(event.Context)...)
}

err := c.obsReceive(ctx, event, resp)
if err != nil {
r.Error()
Expand Down
105 changes: 45 additions & 60 deletions pkg/cloudevents/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package client_test
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io/ioutil"
"net/http"
Expand All @@ -15,9 +14,9 @@ import (

"github.com/cloudevents/sdk-go/pkg/cloudevents"
"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
"github.com/cloudevents/sdk-go/pkg/cloudevents/observability"
cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http"
"github.com/cloudevents/sdk-go/pkg/cloudevents/types"
"github.com/lightstep/tracecontext.go/traceparent"
"go.opencensus.io/trace"

"github.com/google/go-cmp/cmp"
Expand All @@ -44,6 +43,22 @@ func simpleBinaryClient(target string) client.Client {
return nil
}

c, err := client.New(t, client.WithoutTracePropagation())
if err != nil {
return nil
}
return c
}

func simpleTracingBinaryClient(target string) client.Client {
t, err := cehttp.New(
cehttp.WithTarget(target),
cehttp.WithBinaryEncoding(),
)
if err != nil {
return nil
}

c, err := client.New(t)
if err != nil {
return nil
Expand All @@ -60,7 +75,7 @@ func simpleStructuredClient(target string) client.Client {
return nil
}

c, err := client.New(t)
c, err := client.New(t, client.WithoutTracePropagation())
if err != nil {
return nil
}
Expand Down Expand Up @@ -280,15 +295,14 @@ func TestTracingClientSend(t *testing.T) {
now := time.Now()

testCases := map[string]struct {
c func(target string) client.Client
event cloudevents.Event
resp *http.Response
want *requestValidation
wantErr string
sample bool
c func(target string) client.Client
event cloudevents.Event
resp *http.Response
tpHeader string
sample bool
}{
"send unsampled": {
c: simpleBinaryClient,
c: simpleTracingBinaryClient,
event: cloudevents.Event{
Context: cloudevents.EventContextV01{
EventType: "unit.test.client",
Expand All @@ -304,19 +318,10 @@ func TestTracingClientSend(t *testing.T) {
resp: &http.Response{
StatusCode: http.StatusAccepted,
},
want: &requestValidation{
Headers: map[string][]string{
"ce-specversion": {"1.0"},
"ce-id": {"AABBCCDDEE"},
"ce-time": {now.UTC().Format(time.RFC3339Nano)},
"ce-type": {"unit.test.client"},
"ce-source": {"/unit/test/client"},
},
Body: `{"msg":"hello","sq":42}`,
},
tpHeader: "ce-traceparent",
},
"send sampled": {
c: simpleBinaryClient,
c: simpleTracingBinaryClient,
event: cloudevents.Event{
Context: cloudevents.EventContextV01{
EventType: "unit.test.client",
Expand All @@ -332,17 +337,8 @@ func TestTracingClientSend(t *testing.T) {
resp: &http.Response{
StatusCode: http.StatusAccepted,
},
want: &requestValidation{
Headers: map[string][]string{
"ce-specversion": {"1.0"},
"ce-id": {"AABBCCDDEE"},
"ce-time": {now.UTC().Format(time.RFC3339Nano)},
"ce-type": {"unit.test.client"},
"ce-source": {"/unit/test/client"},
},
Body: `{"msg":"hello","sq":42}`,
},
sample: true,
sample: true,
tpHeader: "ce-traceparent",
},
}
for n, tc := range testCases {
Expand All @@ -365,41 +361,31 @@ func TestTracingClientSend(t *testing.T) {
}
ctx, span := trace.StartSpan(context.TODO(), "test-span", trace.WithSampler(sampler))
sc := span.SpanContext()
want := requestValidation{
Host: tc.want.Host,
Headers: tc.want.Headers.Clone(),
Body: tc.want.Body,
}
tp := "00-" + hex.EncodeToString(sc.TraceID[:]) + "-" + hex.EncodeToString(sc.SpanID[:])
if tc.sample {
tp += "-01"
} else {
tp += "-00"
}
want.Headers.Add("ce-traceparent", tp)

_, _, err := c.Send(ctx, tc.event)
span.End()

if tc.wantErr != "" {
if err == nil {
t.Fatalf("failed to return expected error, got nil")
}
want := tc.wantErr
got := err.Error()
if !strings.Contains(got, want) {
t.Fatalf("failed to return expected error, got %q, want %q", err, want)
}
return
} else {
if err != nil {
t.Fatalf("failed to send event: %s", err)
}
if err != nil {
t.Fatalf("failed to send event: %s", err)
}

rv := handler.popRequest(t)

assertEquality(t, server.URL, want, rv)
var got traceparent.TraceParent
if tp := rv.Headers.Get(tc.tpHeader); tp == "" {
t.Fatal("missing traceparent header")
} else {
got, err = traceparent.ParseString(tp)
if err != nil {
t.Fatalf("invalid traceparent: %s", err)
}
}
if got.TraceID != sc.TraceID {
t.Errorf("unexpected trace id: want %s got %s", sc.TraceID, got.TraceID)
}
if got.Flags.Recorded != tc.sample {
t.Errorf("unexpected recorded flag: want %t got %t", tc.sample, got.Flags.Recorded)
}
})
}
}
Expand Down Expand Up @@ -691,7 +677,6 @@ func TestClientReceive(t *testing.T) {

func TestTracedClientReceive(t *testing.T) {
now := time.Now()
observability.EnableTracing(true)

testCases := map[string]struct {
optsFn func(port int, path string) []cehttp.Option
Expand Down
37 changes: 25 additions & 12 deletions pkg/cloudevents/client/observability.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package client

import (
"github.com/cloudevents/sdk-go/pkg/cloudevents"
"github.com/cloudevents/sdk-go/pkg/cloudevents/observability"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/trace"
)

var (
Expand All @@ -29,22 +31,18 @@ type observed int32
var _ observability.Observable = observed(0)

const (
clientSpanName = "cloudevents.client"

specversionAttr = "cloudevents.specversion"
typeAttr = "cloudevents.type"
sourceAttr = "cloudevents.source"
subjectAttr = "cloudevents.subject"
datacontenttypeAttr = "cloudevents.datacontenttype"

reportSend observed = iota
reportReceive
)

// TraceName implements Observable.TraceName
func (o observed) TraceName() string {
switch o {
case reportSend:
return "client/send"
case reportReceive:
return "client/receive"
default:
return "client/unknown"
}
}

// MethodName implements Observable.MethodName
func (o observed) MethodName() string {
switch o {
Expand All @@ -61,3 +59,18 @@ func (o observed) MethodName() string {
func (o observed) LatencyMs() *stats.Float64Measure {
return LatencyMs
}

func eventTraceAttributes(e cloudevents.EventContextReader) []trace.Attribute {
as := []trace.Attribute{
trace.StringAttribute(specversionAttr, e.GetSpecVersion()),
trace.StringAttribute(typeAttr, e.GetType()),
trace.StringAttribute(sourceAttr, e.GetSource()),
}
if sub := e.GetSubject(); sub != "" {
as = append(as, trace.StringAttribute(subjectAttr, sub))
}
if dct := e.GetDataContentType(); dct != "" {
as = append(as, trace.StringAttribute(datacontenttypeAttr, dct))
}
return as
}
12 changes: 0 additions & 12 deletions pkg/cloudevents/datacodec/json/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,6 @@ const (
reportDecode
)

// TraceName implements Observable.TraceName
func (o observed) TraceName() string {
switch o {
case reportEncode:
return "datacodec/json/encode"
case reportDecode:
return "datacodec/json/decode"
default:
return "datacodec/json/unknown"
}
}

// MethodName implements Observable.MethodName
func (o observed) MethodName() string {
switch o {
Expand Down
12 changes: 0 additions & 12 deletions pkg/cloudevents/datacodec/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,6 @@ const (
reportDecode
)

// TraceName implements Observable.TraceName
func (o observed) TraceName() string {
switch o {
case reportEncode:
return "datacodec/encode"
case reportDecode:
return "datacodec/decode"
default:
return "datacodec/unknown"
}
}

// MethodName implements Observable.MethodName
func (o observed) MethodName() string {
switch o {
Expand Down
12 changes: 0 additions & 12 deletions pkg/cloudevents/datacodec/xml/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,6 @@ const (
reportDecode
)

// TraceName implements Observable.TraceName
func (o observed) TraceName() string {
switch o {
case reportEncode:
return "datacodec/xml/encode"
case reportDecode:
return "datacodec/xml/decode"
default:
return "datacodec/xml/unknown"
}
}

// MethodName implements Observable.MethodName
func (o observed) MethodName() string {
switch o {
Expand Down
Loading

0 comments on commit b72bc2a

Please sign in to comment.