Skip to content

Commit

Permalink
Add http collector client (#112)
Browse files Browse the repository at this point in the history
  • Loading branch information
MatthewDolan authored Oct 10, 2017
1 parent 1e25a64 commit 51f65a8
Show file tree
Hide file tree
Showing 8 changed files with 382 additions and 189 deletions.
180 changes: 15 additions & 165 deletions grpc_collector_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ import (
"google.golang.org/grpc/credentials"

// N.B.(jmacd): Do not use google.golang.org/glog in this package.

google_protobuf "github.com/golang/protobuf/ptypes/timestamp"
cpb "github.com/lightstep/lightstep-tracer-go/collectorpb"
ot "github.com/opentracing/opentracing-go"
"github.com/lightstep/lightstep-tracer-go/collectorpb"
)

const (
Expand All @@ -34,23 +31,21 @@ type grpcCollectorClient struct {
attributes map[string]string
reporterID uint64

// accessToken is the access token used for explicit trace
// collection requests.
// accessToken is the access token used for explicit trace collection requests.
accessToken string

verbose bool // whether to print verbose messages
maxLogKeyLen int // see GrpcOptions.MaxLogKeyLen
maxLogValueLen int // see GrpcOptions.MaxLogValueLen
maxReportingPeriod time.Duration // set by GrpcOptions.MaxReportingPeriod
reconnectPeriod time.Duration // set by GrpcOptions.ReconnectPeriod
reportingTimeout time.Duration // set by GrpcOptions.ReportTimeout

// Remote service that will receive reports.
hostPort string
grpcClient cpb.CollectorServiceClient
grpcClient collectorpb.CollectorServiceClient
connTimestamp time.Time
dialOptions []grpc.DialOption

// converters
converter *protoConverter

// For testing purposes only
grpcConnectorFactory ConnectorFactory
}
Expand All @@ -61,12 +56,10 @@ func newGrpcCollectorClient(opts Options, reporterID uint64, attributes map[stri
attributes: attributes,
maxReportingPeriod: opts.ReportingPeriod,
reportingTimeout: opts.ReportTimeout,
verbose: opts.Verbose,
maxLogKeyLen: opts.MaxLogKeyLen,
maxLogValueLen: opts.MaxLogValueLen,
reporterID: reporterID,
hostPort: opts.Collector.HostPort(),
reconnectPeriod: time.Duration(float64(opts.ReconnectPeriod) * (1 + 0.2*rand.Float64())),
converter: newProtoConverter(opts),
grpcConnectorFactory: opts.ConnFactory,
}

Expand All @@ -89,7 +82,7 @@ func (client *grpcCollectorClient) ConnectClient() (Connection, error) {
return nil, err
}

grpcClient, ok := uncheckedClient.(cpb.CollectorServiceClient)
grpcClient, ok := uncheckedClient.(collectorpb.CollectorServiceClient)
if !ok {
return nil, fmt.Errorf("Grpc connector factory did not provide valid client!")
}
Expand All @@ -103,7 +96,7 @@ func (client *grpcCollectorClient) ConnectClient() (Connection, error) {
}

conn = transport
client.grpcClient = cpb.NewCollectorServiceClient(transport)
client.grpcClient = collectorpb.NewCollectorServiceClient(transport)
}
client.connTimestamp = now
return conn, nil
Expand All @@ -113,158 +106,15 @@ func (client *grpcCollectorClient) ShouldReconnect() bool {
return time.Now().Sub(client.connTimestamp) > client.reconnectPeriod
}

func (client *grpcCollectorClient) translateTags(tags ot.Tags) []*cpb.KeyValue {
kvs := make([]*cpb.KeyValue, 0, len(tags))
for key, tag := range tags {
kv := client.convertToKeyValue(key, tag)
kvs = append(kvs, kv)
}
return kvs
}

func (client *grpcCollectorClient) convertToKeyValue(key string, value interface{}) *cpb.KeyValue {
kv := cpb.KeyValue{Key: key}
v := reflect.ValueOf(value)
k := v.Kind()
switch k {
case reflect.String:
kv.Value = &cpb.KeyValue_StringValue{StringValue: v.String()}
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
kv.Value = &cpb.KeyValue_IntValue{IntValue: v.Convert(intType).Int()}
case reflect.Float32, reflect.Float64:
kv.Value = &cpb.KeyValue_DoubleValue{DoubleValue: v.Float()}
case reflect.Bool:
kv.Value = &cpb.KeyValue_BoolValue{BoolValue: v.Bool()}
default:
kv.Value = &cpb.KeyValue_StringValue{StringValue: fmt.Sprint(v)}
maybeLogInfof("value: %v, %T, is an unsupported type, and has been converted to string", client.verbose, v, v)
}
return &kv
}

func (client *grpcCollectorClient) translateLogs(lrs []ot.LogRecord, buffer *reportBuffer) []*cpb.Log {
logs := make([]*cpb.Log, len(lrs))
for i, lr := range lrs {
logs[i] = &cpb.Log{
Timestamp: translateTime(lr.Timestamp),
}
marshalFields(client, logs[i], lr.Fields, buffer)
}
return logs
}

func (client *grpcCollectorClient) translateRawSpan(rs RawSpan, buffer *reportBuffer) *cpb.Span {
s := &cpb.Span{
SpanContext: translateSpanContext(rs.Context),
OperationName: rs.Operation,
References: translateParentSpanID(rs.ParentSpanID),
StartTimestamp: translateTime(rs.Start),
DurationMicros: translateDuration(rs.Duration),
Tags: client.translateTags(rs.Tags),
Logs: client.translateLogs(rs.Logs, buffer),
}
return s
}

func (client *grpcCollectorClient) convertRawSpans(buffer *reportBuffer) []*cpb.Span {
spans := make([]*cpb.Span, len(buffer.rawSpans))
for i, rs := range buffer.rawSpans {
s := client.translateRawSpan(rs, buffer)
spans[i] = s
}
return spans
}

func (client *grpcCollectorClient) makeReportRequest(buffer *reportBuffer) *cpb.ReportRequest {
spans := client.convertRawSpans(buffer)
reporter := convertToReporter(client.attributes, client.reporterID)

req := cpb.ReportRequest{
Reporter: reporter,
Auth: &cpb.Auth{AccessToken: client.accessToken},
Spans: spans,
InternalMetrics: convertToInternalMetrics(buffer),
}
return &req

}

func (client *grpcCollectorClient) Report(ctx context.Context, buffer *reportBuffer) (collectorResponse, error) {
resp, err := client.grpcClient.Report(ctx, client.makeReportRequest(buffer))
resp, err := client.grpcClient.Report(ctx, client.converter.toReportRequest(
client.reporterID,
client.attributes,
client.accessToken,
buffer,
))
if err != nil {
return nil, err
}

return resp, nil
}

func translateAttributes(atts map[string]string) []*cpb.KeyValue {
tags := make([]*cpb.KeyValue, 0, len(atts))
for k, v := range atts {
tags = append(tags, &cpb.KeyValue{Key: k, Value: &cpb.KeyValue_StringValue{StringValue: v}})
}
return tags
}

func convertToReporter(atts map[string]string, id uint64) *cpb.Reporter {
return &cpb.Reporter{
ReporterId: id,
Tags: translateAttributes(atts),
}
}

func generateMetricsSample(b *reportBuffer) []*cpb.MetricsSample {
return []*cpb.MetricsSample{
&cpb.MetricsSample{
Name: spansDropped,
Value: &cpb.MetricsSample_IntValue{IntValue: b.droppedSpanCount},
},
&cpb.MetricsSample{
Name: logEncoderErrors,
Value: &cpb.MetricsSample_IntValue{IntValue: b.logEncoderErrorCount},
},
}
}

func convertToInternalMetrics(b *reportBuffer) *cpb.InternalMetrics {
return &cpb.InternalMetrics{
StartTimestamp: translateTime(b.reportStart),
DurationMicros: translateDurationFromOldestYoungest(b.reportStart, b.reportEnd),
Counts: generateMetricsSample(b),
}
}

func translateSpanContext(sc SpanContext) *cpb.SpanContext {
return &cpb.SpanContext{
TraceId: sc.TraceID,
SpanId: sc.SpanID,
Baggage: sc.Baggage,
}
}

func translateParentSpanID(pid uint64) []*cpb.Reference {
if pid == 0 {
return nil
}
return []*cpb.Reference{
&cpb.Reference{
Relationship: cpb.Reference_CHILD_OF,
SpanContext: &cpb.SpanContext{SpanId: pid},
},
}
}

func translateTime(t time.Time) *google_protobuf.Timestamp {
return &google_protobuf.Timestamp{
Seconds: t.Unix(),
Nanos: int32(t.Nanosecond()),
}
}

func translateDuration(d time.Duration) uint64 {
return uint64(d) / 1000
}

func translateDurationFromOldestYoungest(ot time.Time, yt time.Time) uint64 {
return translateDuration(yt.Sub(ot))
}
137 changes: 137 additions & 0 deletions http_collector_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package lightstep

import (
"bytes"
"github.com/golang/protobuf/proto"
"github.com/lightstep/lightstep-tracer-go/collectorpb"
"golang.org/x/net/context"
"golang.org/x/net/http2"
"io/ioutil"
"net/http"
"time"
)

const (
collectorHttpMethod = "POST"
collectorHttpPath = "/v1/report"
collectorHttpContentTypeHeaderValue = "application/octet-stream"

contentTypeHeaderKey = "Content-Type"
)

// grpcCollectorClient specifies how to send reports back to a LightStep
// collector via grpc.
type httpCollectorClient struct {
// auth and runtime information
reporterID uint64
accessToken string // accessToken is the access token used for explicit trace collection requests.
attributes map[string]string

reportTimeout time.Duration

// Remote service that will receive reports.
socketAddress string
client *http.Client

// converters
converter *protoConverter
}

type transportCloser struct {
transport http2.Transport
}

func (closer *transportCloser) Close() error {
closer.transport.CloseIdleConnections()

return nil
}

func newHttpCollectorClient(opts Options, reporterID uint64, attributes map[string]string) *httpCollectorClient {
return &httpCollectorClient{
reporterID: reporterID,
accessToken: opts.AccessToken,
attributes: attributes,
reportTimeout: opts.ReportTimeout,
socketAddress: opts.Collector.HostPort(),
converter: newProtoConverter(opts),
}
}

func (client *httpCollectorClient) ConnectClient() (Connection, error) {
transport := &http2.Transport{}

client.client = &http.Client{
Transport: transport,
Timeout: client.reportTimeout,
}

return &transportCloser{}, nil
}

func (client *httpCollectorClient) ShouldReconnect() bool {
// http2 will handle connection reuse under the hood
return false
}

func (client *httpCollectorClient) Report(context context.Context, buffer *reportBuffer) (collectorResponse, error) {
httpRequest, err := client.toRequest(context, buffer)
if err != nil {
return nil, err
}

httpResponse, err := client.client.Do(httpRequest)
if err != nil {
return nil, err
}
defer httpResponse.Body.Close()

response, err := client.toResponse(httpResponse)
if err != nil {
return nil, err
}

return response, nil
}

func (client *httpCollectorClient) toRequest(
context context.Context,
buffer *reportBuffer,
) (*http.Request, error) {
protoRequest := client.converter.toReportRequest(
client.reporterID,
client.attributes,
client.accessToken,
buffer,
)

buf, err := proto.Marshal(protoRequest)
if err != nil {
return nil, err
}

requestBody := bytes.NewReader(buf)

request, err := http.NewRequest(collectorHttpMethod, client.socketAddress+collectorHttpPath, requestBody)
if err != nil {
return nil, err
}
request = request.WithContext(context)
request.Header.Set(contentTypeHeaderKey, collectorHttpContentTypeHeaderValue)

return request, nil
}

func (client *httpCollectorClient) toResponse(response *http.Response) (collectorResponse, error) {
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil, err
}

protoResponse := &collectorpb.ReportResponse{}
if err := proto.Unmarshal(body, protoResponse); err != nil {
return nil, err
}

return protoResponse, nil
}
2 changes: 1 addition & 1 deletion lightstep-tracer-common
9 changes: 5 additions & 4 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,12 @@ type Options struct {
// Set Verbose to true to enable more text logging.
Verbose bool `yaml:"verbose"`

// DEPRECATED: set `UseThrift` to true if you do not want gRPC
UseGRPC bool `yaml:"usegrpc"`

// Switch to
// Force the use of a specific transport protocol.
// If multiple are set to true, the following order is used to select for the first option: thrift, http, grpc.
// If none are set to true, GRPC is defaulted to.
UseThrift bool `yaml:"use_thrift"`
UseHttp bool `yaml:"use_http"`
UseGRPC bool `yaml:"usegrpc"`

ReconnectPeriod time.Duration `yaml:"reconnect_period"`

Expand Down
Loading

0 comments on commit 51f65a8

Please sign in to comment.