From 17fbf59446c267827d5f6888490de8b1903abb9c Mon Sep 17 00:00:00 2001 From: alrex Date: Tue, 31 Mar 2020 15:20:35 -0700 Subject: [PATCH] Adding metrics (#249) This change adds metrics reporting to the tracer. Co-Authored-By: Isobel Redelmeier --- CHANGELOG.md | 6 +- README.md | 2 + VERSION | 2 +- constants/constants.go | 10 + events.go | 101 ++++++++- go.mod | 5 +- go.sum | 17 +- internal/metrics/internal_test.go | 47 +++++ internal/metrics/metrics.go | 116 ++++++++++ internal/metrics/package.go | 1 + internal/metrics/reporter.go | 340 ++++++++++++++++++++++++++++++ internal/metrics/reporter_test.go | 147 +++++++++++++ lightstepoc/options.go | 3 +- options.go | 59 +++++- tracer.go | 80 ++++++- tracer_test.go | 1 + version.go | 2 +- 17 files changed, 904 insertions(+), 35 deletions(-) create mode 100644 constants/constants.go create mode 100644 internal/metrics/internal_test.go create mode 100644 internal/metrics/metrics.go create mode 100644 internal/metrics/package.go create mode 100644 internal/metrics/reporter.go create mode 100644 internal/metrics/reporter_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index e048be801..98dd9e6f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog -## [Pending Release](https://github.com/lightstep/lightstep-tracer-go/compare/v0.19.0...HEAD) +## [Pending Release](https://github.com/lightstep/lightstep-tracer-go/compare/v0.20.0...HEAD) + +## [v0.20.0](https://github.com/lightstep/lightstep-tracer-go/compare/v0.19.0...v0.20.0) +* Adding support for reporting metrics to Lightstep +* Updating opencensus dependency ## [v0.19.0](https://github.com/lightstep/lightstep-tracer-go/compare/v0.18.1...v0.19.0) * Add flush duration to status report struct. 
diff --git a/README.md b/README.md index bf607c1f8..bce062667 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,8 @@ logAndMetricsHandler := func(event lightstep.Event){ switch event := event.(type) { case EventStatusReport: metrics.Count("tracer.dropped_spans", event.DroppedSpans()) + case MetricEventStatusReport: + metrics.Count("tracer.sent_metrics", event.SentMetrics()) case ErrorEvent: logger.Error("LS Tracer error: %s", event) default: diff --git a/VERSION b/VERSION index 1cf0537c3..5a03fb737 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.19.0 +0.20.0 diff --git a/constants/constants.go b/constants/constants.go new file mode 100644 index 000000000..d40c085a8 --- /dev/null +++ b/constants/constants.go @@ -0,0 +1,10 @@ +package constants + +const ( + // ComponentNameKey is the tag key identifying the service + ComponentNameKey = "lightstep.component_name" + // HostnameKey is the tag key identifying hostname + HostnameKey = "lightstep.hostname" + // ServiceVersionKey is the tag key identifying service version + ServiceVersionKey = "service.version" +) diff --git a/events.go b/events.go index 4c6b7b679..d7e816ea4 100644 --- a/events.go +++ b/events.go @@ -101,11 +101,11 @@ func (e *eventFlushError) State() EventFlushErrorState { } func (e *eventFlushError) String() string { - return e.err.Error() + return e.Error() } func (e *eventFlushError) Error() string { - return e.err.Error() + return e.Err().Error() } func (e *eventFlushError) Err() error { @@ -131,11 +131,11 @@ func (*eventConnectionError) Event() {} func (*eventConnectionError) EventConnectionError() {} func (e *eventConnectionError) String() string { - return e.err.Error() + return e.Error() } func (e *eventConnectionError) Error() string { - return e.err.Error() + return e.Err().Error() } func (e *eventConnectionError) Err() error { @@ -173,6 +173,22 @@ type EventStatusReport interface { FlushDuration() time.Duration } +// MetricEventStatusReport occurs every time metrics are sent successfully. 
It +// contains all metrics collected since the previous successful flush. +type MetricEventStatusReport interface { + Event + MetricEventStatusReport() + + // StartTime is the start of the interval covered by this metrics report. + StartTime() time.Time + + // FinishTime is the end of the interval covered by this metrics report. + FinishTime() time.Time + + // SentMetrics is the number of metrics sent in the report buffer. + SentMetrics() int +} + type eventStatusReport struct { startTime time.Time finishTime time.Time @@ -270,11 +286,11 @@ func (e *eventUnsupportedTracer) Tracer() opentracing.Tracer { } func (e *eventUnsupportedTracer) String() string { - return e.err.Error() + return e.Error() } func (e *eventUnsupportedTracer) Error() string { - return e.err.Error() + return e.Err().Error() } func (e *eventUnsupportedTracer) Err() error { @@ -322,11 +338,11 @@ func (e *eventUnsupportedValue) Value() interface{} { } func (e *eventUnsupportedValue) String() string { - return e.err.Error() + return e.Error() } func (e *eventUnsupportedValue) Error() string { - return e.err.Error() + return e.Err().Error() } func (e *eventUnsupportedValue) Err() error { @@ -353,3 +369,72 @@ func (eventTracerDisabled) EventTracerDisabled() {} func (eventTracerDisabled) String() string { return tracerDisabled } + +type EventSystemMetricsMeasurementFailed interface { + ErrorEvent +} + +type eventSystemMetricsMeasurementFailed struct { + err error +} + +func newEventSystemMetricsMeasurementFailed(err error) *eventSystemMetricsMeasurementFailed { + return &eventSystemMetricsMeasurementFailed{ + err: err, + } +} + +func (e *eventSystemMetricsMeasurementFailed) Event() {} + +func (e *eventSystemMetricsMeasurementFailed) String() string { + return e.Error() +} + +func (e *eventSystemMetricsMeasurementFailed) Error() string { + return e.Err().Error() +} + +func (e *eventSystemMetricsMeasurementFailed) Err() error { + return e.err +} + +type eventSystemMetricsStatusReport struct { + startTime 
time.Time + finishTime time.Time + sentMetrics int +} + +func newEventSystemMetricsStatusReport( + startTime, finishTime time.Time, + sentMetrics int, +) *eventSystemMetricsStatusReport { + return &eventSystemMetricsStatusReport{ + startTime: startTime, + finishTime: finishTime, + sentMetrics: sentMetrics, + } +} + +func (e *eventSystemMetricsStatusReport) Event() {} + +func (e *eventSystemMetricsStatusReport) MetricEventStatusReport() {} + +func (e *eventSystemMetricsStatusReport) String() string { + return fmt.Sprint( + "METRICS STATUS REPORT start: ", e.startTime, + ", end: ", e.finishTime, + ", sent metrics: ", e.sentMetrics, + ) +} + +func (e *eventSystemMetricsStatusReport) StartTime() time.Time { + return e.startTime +} + +func (e *eventSystemMetricsStatusReport) FinishTime() time.Time { + return e.finishTime +} + +func (e *eventSystemMetricsStatusReport) SentMetrics() int { + return e.sentMetrics +} diff --git a/go.mod b/go.mod index 056a8ae81..bbf24c573 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,15 @@ module github.com/lightstep/lightstep-tracer-go go 1.12 require ( + github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-ole/go-ole v1.2.4 // indirect github.com/gogo/protobuf v1.2.1 - github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743 + github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20200305213919-a88bf8de3718 github.com/onsi/ginkgo v1.7.0 github.com/onsi/gomega v1.4.3 github.com/opentracing/opentracing-go v1.0.2 + github.com/shirou/gopsutil v2.20.1+incompatible go.opencensus.io v0.22.3 google.golang.org/grpc v1.21.0 ) diff --git a/go.sum b/go.sum index 4b5e2a954..76f25f21e 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,15 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= 
+github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk= +github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= +github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= @@ -22,8 +26,8 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743 h1:143Bb8f8DuGWck/xpNUOckBVYfFbBTnLevfRZ1aVVqo= -github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= +github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20200305213919-a88bf8de3718 h1:lrdADj7ifyBpqGJ+cT4vE5ztUoAF87uUf76+epwPViY= 
+github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20200305213919-a88bf8de3718/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -33,6 +37,8 @@ github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/shirou/gopsutil v2.20.1+incompatible h1:oIq9Cq4i84Hk8uQAUOG3eNdI/29hBawGrD5YRl6JRDY= +github.com/shirou/gopsutil v2.20.1+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -47,24 +53,19 @@ golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190603091049-60506f45cf65 h1:+rhAzEzT3f4JtomfC371qB+0Ola2caSKcY69NUBZrRQ= golang.org/x/net 
v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6 h1:bjcUS9ztw9kFmmIxJInhon/0Is3p+EHBKNgquIzo1OI= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd h1:r7DufRZuZbWB7j439YfAzP8RPDa9unLkpwQKUYbIMPI= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -79,7 +80,6 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= 
google.golang.org/genproto v0.0.0-20190530194941-fb225487d101 h1:wuGevabY6r+ivPNagjUXGGxF+GqgMd+dBhjsxW4q9u4= google.golang.org/genproto v0.0.0-20190530194941-fb225487d101/go.mod h1:z3L6/3dTEVtUr6QSP8miRzeRqwQOioJ9I66odjN4I7s= -google.golang.org/grpc v1.19.0 h1:cfg4PD8YEdSFnm7qLV4++93WcmhH2nIUhMjhdCvl3j8= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.0 h1:G+97AoqBnmZIT91cLG/EkCoK9NSelj64P8bOHHNmGn0= @@ -90,7 +90,6 @@ gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= -gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/metrics/internal_test.go b/internal/metrics/internal_test.go new file mode 100644 index 000000000..1b9675180 --- /dev/null +++ b/internal/metrics/internal_test.go @@ -0,0 +1,47 @@ +package metrics + +import ( + "fmt" + "time" + + "github.com/onsi/ginkgo" + "github.com/onsi/gomega" +) + +var _ = ginkgo.Describe("the reporter", func() { + ginkgo.Describe("configuration", func() { + ginkgo.It("uses default values", func() { + defaultReporter := NewReporter() + gomega.Expect(defaultReporter.tracerID).To(gomega.Equal(uint64(0))) + gomega.Expect(defaultReporter.attributes).To(gomega.Equal(map[string]string{})) + gomega.Expect(defaultReporter.address).To(gomega.Equal(fmt.Sprintf("%s%s", defaultReporterAddress, reporterPath))) + 
gomega.Expect(defaultReporter.timeout).To(gomega.Equal(defaultReporterTimeout)) + gomega.Expect(defaultReporter.measurementDuration).To(gomega.Equal(defaultReporterMeasurementDuration)) + gomega.Expect(defaultReporter.accessToken).To(gomega.Equal("")) + }) + ginkgo.It("uses configured values", func() { + duration := time.Second * 1 + accessToken := "token-1234" + tracerID := uint64(1234) + attributes := map[string]string{ + "key1": "val1", + "key2": "val2", + } + address := "http://localhost:8080" + defaultReporter := NewReporter( + WithReporterTracerID(tracerID), + WithReporterAttributes(attributes), + WithReporterAddress(address), + WithReporterTimeout(duration), + WithReporterMeasurementDuration(duration), + WithReporterAccessToken(accessToken), + ) + gomega.Expect(defaultReporter.tracerID).To(gomega.Equal(tracerID)) + gomega.Expect(defaultReporter.attributes).To(gomega.Equal(attributes)) + gomega.Expect(defaultReporter.address).To(gomega.Equal(fmt.Sprintf("%s%s", address, reporterPath))) + gomega.Expect(defaultReporter.timeout).To(gomega.Equal(duration)) + gomega.Expect(defaultReporter.measurementDuration).To(gomega.Equal(duration)) + gomega.Expect(defaultReporter.accessToken).To(gomega.Equal(accessToken)) + }) + }) +}) diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go new file mode 100644 index 000000000..b00bbe97d --- /dev/null +++ b/internal/metrics/metrics.go @@ -0,0 +1,116 @@ +package metrics + +import ( + "context" + "os" + "runtime" + + "github.com/shirou/gopsutil/cpu" + "github.com/shirou/gopsutil/mem" + "github.com/shirou/gopsutil/net" + "github.com/shirou/gopsutil/process" +) + +type Metrics struct { + ProcessCPU ProcessCPU + CPU map[string]CPU + NIC map[string]NIC + Memory Memory + Runtime Runtime +} + +type Runtime struct { + NumGC uint64 + NumGoroutine uint64 +} + +type ProcessCPU struct { + User float64 + System float64 +} + +type CPU struct { + User float64 + System float64 + Usage float64 + Total float64 +} + +type NIC 
struct { + BytesReceived uint64 + BytesSent uint64 +} + +type Memory struct { + Available uint64 + Total uint64 + HeapAlloc uint64 +} + +func Measure(ctx context.Context) (Metrics, error) { + p, err := process.NewProcess(int32(os.Getpid())) // TODO: cache the process + if err != nil { + return Metrics{}, err + } + + processTimes, err := p.TimesWithContext(ctx) // returns user and system time for process + if err != nil { + return Metrics{}, err + } + + systemTimes, err := cpu.TimesWithContext(ctx, false) + if err != nil { + return Metrics{}, err + } + + netStats, err := net.IOCountersWithContext(ctx, false) + if err != nil { + return Metrics{}, err + } + + var rtm runtime.MemStats + runtime.ReadMemStats(&rtm) + runtime := Runtime{ + NumGC: uint64(rtm.NumGC), + NumGoroutine: uint64(runtime.NumGoroutine()), + } + + memStats, err := mem.VirtualMemoryWithContext(ctx) + if err != nil { + return Metrics{}, err + } + + metrics := Metrics{ + ProcessCPU: ProcessCPU{ + User: processTimes.User, + System: processTimes.System, + }, + CPU: make(map[string]CPU, len(systemTimes)), + NIC: make(map[string]NIC, len(netStats)), + Memory: Memory{ + Available: memStats.Available, + Total: memStats.Total, + HeapAlloc: rtm.HeapAlloc, + }, + Runtime: runtime, + } + + for _, t := range systemTimes { + usage := t.User + t.System + t.Nice + t.Iowait + t.Irq + t.Softirq + t.Steal + metrics.CPU[t.CPU] = CPU{ + User: t.User, + System: t.System, + Usage: usage, + Total: usage + t.Idle, + } + } + + for _, counters := range netStats { + metrics.NIC[counters.Name] = NIC{ + BytesReceived: counters.BytesRecv, + BytesSent: counters.BytesSent, + } + } + + return metrics, nil +} diff --git a/internal/metrics/package.go b/internal/metrics/package.go new file mode 100644 index 000000000..06e6cbd66 --- /dev/null +++ b/internal/metrics/package.go @@ -0,0 +1 @@ +package metrics // import "github.com/lightstep/lightstep-tracer-go/internal/metrics" diff --git a/internal/metrics/reporter.go 
b/internal/metrics/reporter.go new file mode 100644 index 000000000..e65eee6c5 --- /dev/null +++ b/internal/metrics/reporter.go @@ -0,0 +1,340 @@ +package metrics + +import ( + "bytes" + "context" + "crypto/rand" + "encoding/base32" + "fmt" + "io" + mathrand "math/rand" + "net/http" + "strings" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/gogo/protobuf/types" + "github.com/lightstep/lightstep-tracer-common/golang/gogo/collectorpb" + "github.com/lightstep/lightstep-tracer-common/golang/gogo/metricspb" + "github.com/lightstep/lightstep-tracer-go/constants" +) + +const ( + defaultReporterAddress = "https://ingest.lightstep.com:443" + defaultReporterTimeout = time.Second * 5 + defaultReporterMeasurementDuration = time.Second * 30 + defaultMaxDuration = time.Minute * 10 +) + +var ( + acceptHeader = http.CanonicalHeaderKey("Accept") + contentTypeHeader = http.CanonicalHeaderKey("Content-Type") + accessTokenHeader = http.CanonicalHeaderKey("Lightstep-Access-Token") +) + +const ( + reporterPath = "/metrics" + + idempotencyKeyByteLength = 30 + protoContentType = "application/octet-stream" + + ReporterPlatformKey = "lightstep.reporter_platform" + ReporterPlatformVersionKey = "lightstep.reporter_platform_version" + ReporterVersionKey = "lightstep.reporter_version" +) + +type Reporter struct { + client *http.Client + tracerID uint64 + attributes map[string]string + address string + timeout time.Duration + measurementDuration time.Duration + accessToken string + stored Metrics + collectorReporter *collectorpb.Reporter + labels []*collectorpb.KeyValue + Start time.Time + End time.Time + MetricsCount int + skippedInitialReport bool +} + +func attributesToTags(attributes map[string]string) []*collectorpb.KeyValue { + tags := []*collectorpb.KeyValue{} + for k, v := range attributes { + tags = append(tags, &collectorpb.KeyValue{Key: k, Value: &collectorpb.KeyValue_StringValue{StringValue: v}}) + } + return tags +} + +func getLabels(attributes map[string]string) 
[]*collectorpb.KeyValue { + labels := []*collectorpb.KeyValue{} + filters := []string{ + constants.ComponentNameKey, + constants.ServiceVersionKey, + constants.HostnameKey, + } + for k, v := range attributes { + for _, l := range filters { + if k == l { + if len(v) > 0 { + labels = append(labels, &collectorpb.KeyValue{Key: k, Value: &collectorpb.KeyValue_StringValue{StringValue: v}}) + } + break + } + } + } + return labels +} + +func NewReporter(opts ...ReporterOption) *Reporter { + c := newConfig(opts...) + + return &Reporter{ + client: &http.Client{}, + tracerID: c.tracerID, + attributes: c.attributes, + address: fmt.Sprintf("%s%s", c.address, reporterPath), + timeout: c.timeout, + accessToken: c.accessToken, + collectorReporter: &collectorpb.Reporter{ + ReporterId: c.tracerID, + Tags: attributesToTags(c.attributes), + }, + measurementDuration: c.measurementDuration, + labels: getLabels(c.attributes), + } +} + +func (r *Reporter) prepareRequest(m Metrics) (*metricspb.IngestRequest, error) { + idempotencyKey, err := generateIdempotencyKey() + if err != nil { + return nil, err + } + return &metricspb.IngestRequest{ + IdempotencyKey: idempotencyKey, + Reporter: r.collectorReporter, + }, nil +} + +func (r *Reporter) addFloat(key string, value float64, kind metricspb.MetricKind, intervals int64) *metricspb.MetricPoint { + return &metricspb.MetricPoint{ + Kind: kind, + MetricName: key, + Labels: r.labels, + Value: &metricspb.MetricPoint_DoubleValue{ + DoubleValue: value, + }, + Start: &types.Timestamp{ + Seconds: r.Start.Unix(), + Nanos: int32(r.Start.Nanosecond()), + }, + Duration: &types.Duration{ + Seconds: int64(r.measurementDuration.Seconds()) * intervals, + }, + } +} + +// Measure takes a snapshot of system metrics and sends them +// to a LightStep endpoint. 
+func (r *Reporter) Measure(ctx context.Context, intervals int64) error { + start := time.Now() + r.Start = start + ctx, cancel := context.WithTimeout(ctx, r.timeout) + defer cancel() + + m, err := Measure(ctx) + if err != nil { + return err + } + + if !r.skippedInitialReport { + // intentionally throw away the initial delta report + // and measure again + r.skippedInitialReport = true + r.stored = m + m, err = Measure(ctx) + if err != nil { + return err + } + } + + pb, err := r.prepareRequest(m) + if err != nil { + return err + } + + pb.Points = append(pb.Points, r.addFloat("runtime.go.cpu.user", m.ProcessCPU.User-r.stored.ProcessCPU.User, metricspb.MetricKind_COUNTER, intervals)) + pb.Points = append(pb.Points, r.addFloat("runtime.go.cpu.sys", m.ProcessCPU.System-r.stored.ProcessCPU.System, metricspb.MetricKind_COUNTER, intervals)) + pb.Points = append(pb.Points, r.addFloat("runtime.go.gc.count", float64(m.Runtime.NumGC-r.stored.Runtime.NumGC), metricspb.MetricKind_COUNTER, intervals)) + pb.Points = append(pb.Points, r.addFloat("runtime.go.goroutine", float64(m.Runtime.NumGoroutine), metricspb.MetricKind_GAUGE, intervals)) + + pb.Points = append(pb.Points, r.addFloat("mem.available", float64(m.Memory.Available), metricspb.MetricKind_GAUGE, intervals)) + pb.Points = append(pb.Points, r.addFloat("mem.total", float64(m.Memory.Total), metricspb.MetricKind_GAUGE, intervals)) + pb.Points = append(pb.Points, r.addFloat("runtime.go.mem.heap_alloc", float64(m.Memory.HeapAlloc), metricspb.MetricKind_GAUGE, intervals)) + + for label, cpu := range m.CPU { + pb.Points = append(pb.Points, r.addFloat("cpu.sys", cpu.System-r.stored.CPU[label].System, metricspb.MetricKind_COUNTER, intervals)) + pb.Points = append(pb.Points, r.addFloat("cpu.user", cpu.User-r.stored.CPU[label].User, metricspb.MetricKind_COUNTER, intervals)) + pb.Points = append(pb.Points, r.addFloat("cpu.total", cpu.Total-r.stored.CPU[label].Total, metricspb.MetricKind_COUNTER, intervals)) + pb.Points = 
append(pb.Points, r.addFloat("cpu.usage", cpu.Usage-r.stored.CPU[label].Usage, metricspb.MetricKind_COUNTER, intervals)) + } + for label, nic := range m.NIC { + pb.Points = append(pb.Points, r.addFloat("net.bytes_recv", float64(nic.BytesReceived-r.stored.NIC[label].BytesReceived), metricspb.MetricKind_COUNTER, intervals)) + pb.Points = append(pb.Points, r.addFloat("net.bytes_sent", float64(nic.BytesSent-r.stored.NIC[label].BytesSent), metricspb.MetricKind_COUNTER, intervals)) + } + // ingest drops metrics with duration greater than defaultMaxDuration + if (intervals * int64(r.measurementDuration.Seconds())) <= int64(defaultMaxDuration.Seconds()) { + err = r.send(ctx, pb) + if err != nil { + return err + } + } + + r.stored = m + r.MetricsCount = len(pb.Points) + r.End = time.Now() + return nil +} + +func (r *Reporter) send(ctx context.Context, ingestRequest *metricspb.IngestRequest) error { + b, err := proto.Marshal(ingestRequest) + if err != nil { + return err + } + req, err := http.NewRequest(http.MethodPost, r.address, bytes.NewReader(b)) + if err != nil { + return err + } + + req = req.WithContext(ctx) + + req.Header.Set(contentTypeHeader, protoContentType) + req.Header.Set(acceptHeader, protoContentType) + req.Header.Set(accessTokenHeader, r.accessToken) + + retries := uint(0) + waited := 0 + for { + res, err := r.client.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + if res.StatusCode == http.StatusOK { + return nil + } + if !retryable(res.StatusCode) { + return fmt.Errorf("request to %s failed: %d", r.address, res.StatusCode) + } + if (time.Duration(waited) * time.Millisecond) > r.timeout { + return fmt.Errorf("request to %s failed: too many retries", r.address) + } + res.Body.Close() + retries++ + backoff := calculateBackoff(retries) + time.Sleep(time.Duration(backoff) * time.Millisecond) + waited += backoff + } +} + +type ReporterOption func(*config) + +// WithReporterTracerID sets the tracer ID reported back to LightStep +func 
WithReporterTracerID(tracerID uint64) ReporterOption { + return func(c *config) { + c.tracerID = tracerID + } +} + +// WithReporterAttributes sets attributes reported back to LightStep +func WithReporterAttributes(attributes map[string]string) ReporterOption { + return func(c *config) { + c.attributes = make(map[string]string, len(attributes)) + for k, v := range attributes { + c.attributes[k] = v + } + } +} + +// WithReporterAddress sets the address of the LightStep endpoint +func WithReporterAddress(address string) ReporterOption { + return func(c *config) { + c.address = address + } +} + +// WithReporterTimeout sets the timeout when communicating with LightStep +func WithReporterTimeout(timeout time.Duration) ReporterOption { + return func(c *config) { + if timeout > 0 { + c.timeout = timeout + } + } +} + +// WithReporterMeasurementDuration sets the duration reported back to LightStep +func WithReporterMeasurementDuration(measurementDuration time.Duration) ReporterOption { + return func(c *config) { + if measurementDuration > 0 { + c.measurementDuration = measurementDuration + } + } +} + +// WithReporterAccessToken sets an access token for communicating with LightStep +func WithReporterAccessToken(accessToken string) ReporterOption { + return func(c *config) { + c.accessToken = accessToken + } +} + +type config struct { + tracerID uint64 + attributes map[string]string + address string + timeout time.Duration + measurementDuration time.Duration + accessToken string +} + +func newConfig(opts ...ReporterOption) config { + var c config + + defaultOpts := []ReporterOption{ + WithReporterAttributes(make(map[string]string)), + WithReporterAddress(defaultReporterAddress), + WithReporterTimeout(defaultReporterTimeout), + WithReporterMeasurementDuration(defaultReporterMeasurementDuration), + } + + for _, opt := range append(defaultOpts, opts...) 
{ + opt(&c) + } + + return c +} + +func generateIdempotencyKey() (string, error) { + b := make([]byte, idempotencyKeyByteLength) + if _, err := io.ReadFull(rand.Reader, b); err != nil { + return "", err + } + + return strings.ToLower(base32.StdEncoding.EncodeToString(b)), nil +} + +func retryable(code int) bool { + return code == http.StatusTooManyRequests || + code == http.StatusBadGateway || + code == http.StatusGatewayTimeout || + code == http.StatusServiceUnavailable || + code == http.StatusRequestTimeout +} + +func calculateBackoff(retries uint) int { + secondInMillis := 1000 + multiplier := 1 << (retries - 1) + return (multiplier * mathrand.Intn(secondInMillis)) + (multiplier * secondInMillis) +} diff --git a/internal/metrics/reporter_test.go b/internal/metrics/reporter_test.go new file mode 100644 index 000000000..71061cb5f --- /dev/null +++ b/internal/metrics/reporter_test.go @@ -0,0 +1,147 @@ +package metrics_test + +import ( + "context" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/lightstep/lightstep-tracer-common/golang/gogo/metricspb" + "github.com/lightstep/lightstep-tracer-go/internal/metrics" + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" +) + +var ingestRequest metricspb.IngestRequest +var server *httptest.Server +var url string +var statusCode int + +var _ = BeforeSuite(func() { + h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + status := http.StatusOK + if statusCode != 0 { + status = statusCode + } + w.WriteHeader(status) + body, _ := ioutil.ReadAll(r.Body) + err := proto.Unmarshal(body, &ingestRequest) + if !Expect(err).To(BeNil()) { + return + } + }) + server = httptest.NewServer(h) + url = fmt.Sprintf("http://%s", server.Listener.Addr().String()) +}) + +var _ = AfterSuite(func() { + server.Close() +}) + +var _ = Describe("Reporter", func() { + var reporter *metrics.Reporter + + BeforeEach(func() { + reporter = metrics.NewReporter( + metrics.WithReporterAddress(url), + ) + ingestRequest = metricspb.IngestRequest{} + }) + + Describe("Measure", func() { + It("should return an IngestRequest", func() { + err := reporter.Measure(context.Background(), 1) + if !Expect(err).To(BeNil()) { + return + } + // check expected metrics are present and of the right type + points := ingestRequest.GetPoints() + + expected := map[string]interface{}{ + "cpu.user": metricspb.MetricKind_COUNTER, + "cpu.sys": metricspb.MetricKind_COUNTER, + "cpu.usage": metricspb.MetricKind_COUNTER, + "cpu.total": metricspb.MetricKind_COUNTER, + "net.bytes_sent": metricspb.MetricKind_COUNTER, + "net.bytes_recv": metricspb.MetricKind_COUNTER, + "mem.total": metricspb.MetricKind_GAUGE, + "mem.available": metricspb.MetricKind_GAUGE, + "runtime.go.cpu.user": metricspb.MetricKind_COUNTER, + "runtime.go.cpu.sys": metricspb.MetricKind_COUNTER, + "runtime.go.mem.heap_alloc": metricspb.MetricKind_GAUGE, + "runtime.go.gc.count": metricspb.MetricKind_COUNTER, + "runtime.go.goroutine": metricspb.MetricKind_GAUGE, + } + Expect(points).To(HaveLen(len(expected))) + for _, point := range points { + name := point.GetMetricName() + Expect(point.Kind).To(Equal(expected[name])) + } + }) + }) + 
Describe("Measure fails unretryably", func() { + It("should return an error", func() { + var unretryable = []struct { + code int + expected string + }{ + {http.StatusBadRequest, fmt.Sprintf("%d", http.StatusBadRequest)}, + {http.StatusUnauthorized, fmt.Sprintf("%d", http.StatusUnauthorized)}, + {http.StatusForbidden, fmt.Sprintf("%d", http.StatusForbidden)}, + {http.StatusNotFound, fmt.Sprintf("%d", http.StatusNotFound)}, + {http.StatusNotImplemented, fmt.Sprintf("%d", http.StatusNotImplemented)}, + } + for _, t := range unretryable { + statusCode = t.code + err := reporter.Measure(context.Background(), 1) + Expect(err).To(Not(BeNil())) + Expect(err.Error()).To(ContainSubstring(t.expected)) + } + }) + }) + Describe("Measure fails retryably", func() { + It("should return after retrying", func() { + var retryable = []struct { + code int + expected string + }{ + {http.StatusTooManyRequests, "context deadline exceeded"}, + {http.StatusBadGateway, "context deadline exceeded"}, + {http.StatusGatewayTimeout, "context deadline exceeded"}, + {http.StatusServiceUnavailable, "context deadline exceeded"}, + {http.StatusRequestTimeout, "context deadline exceeded"}, + } + reporter = metrics.NewReporter( + metrics.WithReporterAddress(url), + metrics.WithReporterTimeout(10*time.Millisecond), + ) + for _, t := range retryable { + statusCode = t.code + err := reporter.Measure(context.Background(), 1) + Expect(err).To(Not(BeNil())) + Expect(err.Error()).To(ContainSubstring(t.expected)) + } + }) + }) + Describe("Duration exceeds max", func() { + It("should not send a report", func() { + tenMinutesOfIntervals := int64(21) // 21 * 30s > 10min + err := reporter.Measure(context.Background(), tenMinutesOfIntervals) + if !Expect(err).To(BeNil()) { + return + } + // check expected metrics are present and of the right type + points := ingestRequest.GetPoints() + Expect(points).To(HaveLen(0)) + }) + }) +}) + +func TestLightstepMetricsGo(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, 
"LightstepMetricsGo Suite") +} diff --git a/lightstepoc/options.go b/lightstepoc/options.go index 2fb9555e3..34af0fb32 100644 --- a/lightstepoc/options.go +++ b/lightstepoc/options.go @@ -2,6 +2,7 @@ package lightstepoc import ( "github.com/lightstep/lightstep-tracer-go" + "github.com/lightstep/lightstep-tracer-go/constants" "github.com/opentracing/opentracing-go" ) @@ -58,7 +59,7 @@ func WithMetaEventReportingEnabled(metaEventReportingEnabled bool) Option { func WithComponentName(componentName string) Option { return func(c *config) { if componentName != "" { - c.tracerOptions.Tags[lightstep.ComponentNameKey] = componentName + c.tracerOptions.Tags[constants.ComponentNameKey] = componentName } } } diff --git a/options.go b/options.go index ff385b7c8..390f65e51 100644 --- a/options.go +++ b/options.go @@ -9,6 +9,7 @@ import ( "strings" "time" // N.B.(jmacd): Do not use google.golang.org/glog in this package. + "github.com/lightstep/lightstep-tracer-go/constants" "github.com/opentracing/opentracing-go" "google.golang.org/grpc" ) @@ -20,6 +21,11 @@ const ( DefaultSecurePort = 443 DefaultGRPCCollectorHost = "collector-grpc.lightstep.com" + DefaultSystemMetricsHost = "ingest.lightstep.com" + + DefaultSystemMetricsMeasurementFrequency = 30 * time.Second + DefaultSystemMetricsTimeout = 5 * time.Second + DefaultMaxReportingPeriod = 2500 * time.Millisecond DefaultMinReportingPeriod = 500 * time.Millisecond DefaultMaxSpans = 1000 @@ -35,11 +41,12 @@ const ( // Tag and Tracer Attribute keys. const ( - ParentSpanGUIDKey = "parent_span_guid" // ParentSpanGUIDKey is the tag key used to record the relationship between child and parent spans. - ComponentNameKey = "lightstep.component_name" - GUIDKey = "lightstep.guid" // <- runtime guid, not span guid - HostnameKey = "lightstep.hostname" + ParentSpanGUIDKey = "parent_span_guid" // ParentSpanGUIDKey is the tag key used to record the relationship between child and parent spans. 
+ ComponentNameKey = constants.ComponentNameKey // NOTE: these will be deprecated in favour of the constants package + GUIDKey = "lightstep.guid" // <- runtime guid, not span guid + HostnameKey = constants.HostnameKey // NOTE: these will be deprecated in favour of the constants package CommandLineKey = "lightstep.command_line" + ServiceVersionKey = constants.ServiceVersionKey // NOTE: these will be deprecated in favour of the constants package TracerPlatformKey = "lightstep.tracer_platform" TracerPlatformValue = "go" @@ -106,6 +113,13 @@ func (e Endpoint) scheme() string { return secureScheme } +type SystemMetricsOptions struct { + Disabled bool `yaml:"disabled"` + Endpoint Endpoint `yaml:"endpoint"` + MeasurementFrequency time.Duration `yaml:"measurement_frequency"` + Timeout time.Duration `yaml:"timeout"` +} + // Options control how the LightStep Tracer behaves. type Options struct { // AccessToken is the unique API key for your LightStep project. It is @@ -198,6 +212,8 @@ type Options struct { // Enable LightStep Meta Event Logging MetaEventReportingEnabled bool `yaml:"meta_event_reporting_enabled" json:"meta_event_reporting_enabled"` + + SystemMetrics SystemMetricsOptions `yaml:"system_metrics"` } // Initialize validates options, and sets default values for unset options. 
@@ -241,12 +257,12 @@ func (opts *Options) Initialize() error { } // Set some default attributes if not found in options - if _, found := opts.Tags[ComponentNameKey]; !found { - opts.Tags[ComponentNameKey] = path.Base(os.Args[0]) + if _, found := opts.Tags[constants.ComponentNameKey]; !found { + opts.Tags[constants.ComponentNameKey] = path.Base(os.Args[0]) } - if _, found := opts.Tags[HostnameKey]; !found { + if _, found := opts.Tags[constants.HostnameKey]; !found { hostname, _ := os.Hostname() - opts.Tags[HostnameKey] = hostname + opts.Tags[constants.HostnameKey] = hostname } if _, found := opts.Tags[CommandLineKey]; !found { opts.Tags[CommandLineKey] = strings.Join(os.Args, " ") @@ -266,6 +282,26 @@ func (opts *Options) Initialize() error { } } + if opts.SystemMetrics.Endpoint.Host == "" { + opts.SystemMetrics.Endpoint.Host = DefaultSystemMetricsHost + } + + if opts.SystemMetrics.Endpoint.Port <= 0 { + opts.SystemMetrics.Endpoint.Port = DefaultSecurePort + + if opts.SystemMetrics.Endpoint.Plaintext { + opts.SystemMetrics.Endpoint.Port = DefaultPlainPort + } + } + + if opts.SystemMetrics.MeasurementFrequency <= 0 { + opts.SystemMetrics.MeasurementFrequency = DefaultSystemMetricsMeasurementFrequency + } + + if opts.SystemMetrics.Timeout <= 0 { + opts.SystemMetrics.Timeout = DefaultSystemMetricsTimeout + } + return nil } @@ -281,6 +317,13 @@ func (opts *Options) Validate() error { return err } } + + if !opts.SystemMetrics.Disabled && len(opts.SystemMetrics.Endpoint.CustomCACertFile) != 0 { + if _, err := os.Stat(opts.SystemMetrics.Endpoint.CustomCACertFile); os.IsNotExist(err) { + return err + } + } + return nil } diff --git a/tracer.go b/tracer.go index 04fb9fa58..395732cd7 100644 --- a/tracer.go +++ b/tracer.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/lightstep/lightstep-tracer-go/constants" + "github.com/lightstep/lightstep-tracer-go/internal/metrics" "github.com/opentracing/opentracing-go" ) @@ -42,14 +44,18 @@ type tracerImpl struct { opts Options 
// report loop management - closeOnce sync.Once - closeReportLoopChannel chan struct{} - reportLoopClosedChannel chan struct{} + closeOnce sync.Once + closeReportLoopChannel chan struct{} + closeSystemMetricsLoopChannel chan struct{} + reportLoopClosedChannel chan struct{} converter *protoConverter accessToken string attributes map[string]string + metricsReporter *metrics.Reporter + metricsMeasurementFrequency time.Duration + ////////////////////////////////////////////////////////// // MUTABLE MUTABLE MUTABLE MUTABLE MUTABLE MUTABLE MUTABLE ////////////////////////////////////////////////////////// @@ -116,10 +122,11 @@ func CreateTracer(opts Options) (Tracer, error) { attributes[TracerPlatformVersionKey] = runtime.Version() attributes[TracerVersionKey] = TracerVersionValue + tracerID := genSeededGUID() now := time.Now() impl := &tracerImpl{ opts: opts, - reporterID: genSeededGUID(), + reporterID: tracerID, buffer: newSpansBuffer(opts.MaxBufferedSpans), flushing: newSpansBuffer(opts.MaxBufferedSpans), closeReportLoopChannel: make(chan struct{}), @@ -127,6 +134,22 @@ func CreateTracer(opts Options) (Tracer, error) { converter: newProtoConverter(opts), accessToken: opts.AccessToken, attributes: attributes, + metricsReporter: metrics.NewReporter( + metrics.WithReporterTracerID(tracerID), + metrics.WithReporterAccessToken(opts.AccessToken), + metrics.WithReporterTimeout(opts.SystemMetrics.Timeout), + metrics.WithReporterAddress(opts.SystemMetrics.Endpoint.urlWithoutPath()), + metrics.WithReporterAttributes(map[string]string{ + metrics.ReporterPlatformKey: TracerPlatformValue, + metrics.ReporterPlatformVersionKey: runtime.Version(), + metrics.ReporterVersionKey: TracerVersionValue, + constants.HostnameKey: attributes[constants.HostnameKey], + constants.ServiceVersionKey: attributes[constants.ServiceVersionKey], + constants.ComponentNameKey: attributes[constants.ComponentNameKey], + }), + metrics.WithReporterMeasurementDuration(opts.SystemMetrics.MeasurementFrequency), 
+ ), + metricsMeasurementFrequency: opts.SystemMetrics.MeasurementFrequency, } impl.buffer.setCurrent(now) @@ -158,6 +181,10 @@ func CreateTracer(opts Options) (Tracer, error) { impl.propagators[builtin] = propagator } + if !opts.SystemMetrics.Disabled { + go impl.systemMetricsLoop() + } + return impl, nil } @@ -222,7 +249,12 @@ func (tracer *tracerImpl) reconnectClient(now time.Time) { func (tracer *tracerImpl) Close(ctx context.Context) { tracer.closeOnce.Do(func() { // notify report loop that we are closing - close(tracer.closeReportLoopChannel) + if tracer.closeReportLoopChannel != nil { + close(tracer.closeReportLoopChannel) + } + if tracer.closeSystemMetricsLoopChannel != nil { + close(tracer.closeSystemMetricsLoopChannel) + } select { case <-tracer.reportLoopClosedChannel: tracer.Flush(ctx) @@ -448,3 +480,41 @@ func (tracer *tracerImpl) reportLoop() { } } } + +func (tracer *tracerImpl) systemMetricsLoop() { + ticker := time.NewTicker(tracer.metricsMeasurementFrequency) + intervals := int64(1) + + measure := func(intervals int64) int64 { + ctx, cancel := context.WithTimeout(context.Background(), tracer.metricsMeasurementFrequency) + defer cancel() + + if err := tracer.metricsReporter.Measure(ctx, intervals); err != nil { + emitEvent(newEventSystemMetricsMeasurementFailed(err)) + intervals++ + } else { + intervals = 1 + } + emitEvent(newEventSystemMetricsStatusReport( + tracer.metricsReporter.Start, + tracer.metricsReporter.End, + tracer.metricsReporter.MetricsCount), + ) + return intervals + } + + intervals = measure(intervals) + + for { + select { + case <-ticker.C: + if tracer.disabled { + return + } + + intervals = measure(intervals) + case <-tracer.closeSystemMetricsLoopChannel: + return + } + } +} diff --git a/tracer_test.go b/tracer_test.go index b205f6225..e1533e8e7 100644 --- a/tracer_test.go +++ b/tracer_test.go @@ -36,6 +36,7 @@ var _ = Describe("Tracer", func() { BeforeEach(func() { opts.UseGRPC = true + opts.SystemMetrics.Disabled = true 
fakeClient = new(collectorpbfakes.FakeCollectorServiceClient) fakeClient.ReportReturns(&collectorpb.ReportResponse{}, nil) diff --git a/version.go b/version.go index d0c208c78..391a7ddd8 100644 --- a/version.go +++ b/version.go @@ -1,4 +1,4 @@ package lightstep // TracerVersionValue provides the current version of the lightstep-tracer-go release -const TracerVersionValue = "0.19.0" +const TracerVersionValue = "0.20.0"