diff --git a/prometheus-to-sd/translator/prometheus.go b/prometheus-to-sd/translator/prometheus.go index f49d39399..517c76d2b 100644 --- a/prometheus-to-sd/translator/prometheus.go +++ b/prometheus-to-sd/translator/prometheus.go @@ -17,9 +17,10 @@ limitations under the License. package translator import ( + "bytes" "crypto/tls" "fmt" - "io/ioutil" + "io" "net/http" "strings" "time" @@ -34,7 +35,8 @@ const customMetricsPrefix = "custom.googleapis.com" // PrometheusResponse represents unprocessed response from Prometheus endpoint. type PrometheusResponse struct { - rawResponse string + rawResponse []byte + header http.Header } var prometheusClient *http.Client @@ -71,14 +73,14 @@ func getPrometheusMetrics(config *config.SourceConfig) (*PrometheusResponse, err } defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("failed to read response body - %v", err) } if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("request failed - %q, response: %q", resp.Status, string(body)) } - return &PrometheusResponse{rawResponse: string(body)}, nil + return &PrometheusResponse{rawResponse: body, header: resp.Header}, nil } func doPrometheusRequest(url string, auth config.AuthConfig) (resp *http.Response, err error) { @@ -86,6 +88,7 @@ func doPrometheusRequest(url string, auth config.AuthConfig) (resp *http.Respons if err != nil { return nil, err } + request.Header.Set("Accept", string(expfmt.FmtProtoDelim)) if len(auth.Username) > 0 { request.SetBasicAuth(auth.Username, auth.Password) } else if len(auth.Token) > 0 { @@ -96,11 +99,24 @@ func doPrometheusRequest(url string, auth config.AuthConfig) (resp *http.Respons // Build performs parsing and processing of the prometheus metrics response. func (p *PrometheusResponse) Build(config *config.CommonConfig, metricDescriptorCache *MetricDescriptorCache) (map[string]*dto.MetricFamily, error) { - parser := &expfmt.TextParser{} - metrics, err := parser.TextToMetricFamilies(strings.NewReader(p.rawResponse)) - if err != nil { - return nil, err + format := expfmt.ResponseFormat(p.header) + if format == expfmt.FmtUnknown { + return nil, fmt.Errorf("failed to parse format from header: %s", p.header.Get("Content-Type")) } + decoder := expfmt.NewDecoder(bytes.NewReader(p.rawResponse), format) + metrics := make(map[string]*dto.MetricFamily) + for { + metric := &dto.MetricFamily{} + err := decoder.Decode(metric) + if err == io.EOF { + break + } + if err != nil { + return nil, err + } + metrics[metric.GetName()] = metric + } + if config.OmitComponentName { metrics = OmitComponentName(metrics, config.SourceConfig.Component) } diff --git a/prometheus-to-sd/translator/translator.go b/prometheus-to-sd/translator/translator.go index 2d86b963d..5da71b76f 100644 --- a/prometheus-to-sd/translator/translator.go +++ b/prometheus-to-sd/translator/translator.go @@ -358,11 +358,13 @@ func convertToDistributionValue(h *dto.Histogram) *v3.Distribution { prevVal := uint64(0) lower := float64(0) + infSeen := false for _, b := range h.Bucket { upper := b.GetUpperBound() if !math.IsInf(b.GetUpperBound(), 1) { bounds = append(bounds, b.GetUpperBound()) } else { + infSeen = true upper = lower } val := b.GetCumulativeCount() - prevVal @@ -375,6 +377,11 @@ func convertToDistributionValue(h *dto.Histogram) *v3.Distribution { prevVal = b.GetCumulativeCount() } + // +Inf Bucket is implicit so it needs to be added + if !infSeen && count > int64(prevVal) { + values = append(values, count-int64(prevVal)) + } + return &v3.Distribution{ Count: count, Mean: mean, diff --git a/prometheus-to-sd/translator/translator_test.go b/prometheus-to-sd/translator/translator_test.go index 297d3147d..4b88fa960 100644 --- a/prometheus-to-sd/translator/translator_test.go +++ b/prometheus-to-sd/translator/translator_test.go @@ -18,6 +18,7 @@ package translator import ( "math" + "net/http" "reflect" "sort" "strings" @@ -82,7 +83,7 @@ var testLabelValue2 = "labelValue2" var now = time.Now() -var metricsResponse = &PrometheusResponse{rawResponse: ` +var metricsResponse = &PrometheusResponse{rawResponse: []byte(` # TYPE test_name counter test_name{labelName="labelValue1"} 42.0 test_name{labelName="labelValue2"} 106.0 @@ -105,8 +106,7 @@ test_histogram_sum 13.0 test_histogram_count 5 # TYPE untyped_metric untyped untyped_metric 98.6 -`, -} +`), header: http.Header{"Content-Type": []string{"text/plain; version=0.0.4; charset=UTF-8"}}} var metrics = map[string]*dto.MetricFamily{ testMetricName: { @@ -704,7 +704,7 @@ func TestTranslatePrometheusToStackdriverWithLabelFiltering(t *testing.T) { } func TestTranslateSummary(t *testing.T) { - var intSummaryMetricsResponse = &PrometheusResponse{rawResponse: ` + var intSummaryMetricsResponse = &PrometheusResponse{rawResponse: []byte(` # TYPE process_start_time_seconds gauge process_start_time_seconds 1234567890 # TYPE int_summary_metric summary @@ -713,8 +713,8 @@ int_summary_metric{quantile="0.9"} 8 int_summary_metric{quantile="0.99"} 8 int_summary_metric_sum 42 int_summary_metric_count 101010 -`} - var floatSummaryMetricsResponse = &PrometheusResponse{rawResponse: ` +`), header: http.Header{"Content-Type": []string{"text/plain; version=0.0.4; charset=UTF-8"}}} + var floatSummaryMetricsResponse = &PrometheusResponse{rawResponse: []byte(` # TYPE process_start_time_seconds gauge process_start_time_seconds 1234567890 # TYPE float_summary_metric summary @@ -723,8 +723,8 @@ float_summary_metric{quantile="0.9"} 8.123 float_summary_metric{quantile="0.99"} 8.123 float_summary_metric_sum 0.42 float_summary_metric_count 50 -`} - var labelIntSummaryMetricsResponse = &PrometheusResponse{rawResponse: ` +`), header: http.Header{"Content-Type": []string{"text/plain; version=0.0.4; charset=UTF-8"}}} + var labelIntSummaryMetricsResponse = &PrometheusResponse{rawResponse: []byte(` # TYPE process_start_time_seconds gauge process_start_time_seconds 1234567890 # TYPE int_summary_metric summary @@ -738,7 +738,7 @@ int_summary_metric_sum{label="l1"} 7 int_summary_metric_sum{label="l2"} 8 int_summary_metric_count{label="l1"} 9 int_summary_metric_count{label="l2"} 10 -`} +`), header: http.Header{"Content-Type": []string{"text/plain; version=0.0.4; charset=UTF-8"}}} type summaryTest struct { description string @@ -928,7 +928,7 @@ func createDoublePoint(d float64, start time.Time, end time.Time) *v3.Point { func TestUpdateScrapes(t *testing.T) { tsb := NewTimeSeriesBuilder(CommonConfigWithMetrics([]string{testMetricName, floatMetricName}), buildCacheForTesting()) - scrape := &PrometheusResponse{rawResponse: ` + scrape := &PrometheusResponse{rawResponse: []byte(` # TYPE test_name counter test_name{labelName="labelValue1"} 42.0 test_name{labelName="labelValue2"} 106.0 @@ -936,17 +936,15 @@ test_name{labelName="labelValue2"} 106.0 float_metric 123.17 # TYPE test_name counter process_start_time_seconds 1234567890.0 -`, - } +`), header: http.Header{"Content-Type": []string{"text/plain; version=0.0.4; charset=UTF-8"}}} tsb.Update(scrape, now) - scrape = &PrometheusResponse{rawResponse: ` + scrape = &PrometheusResponse{rawResponse: []byte(` # TYPE test_name counter test_name{labelName="labelValue1"} 42.0 test_name{labelName="labelValue2"} 601.0 # TYPE process_start_time_seconds gauge process_start_time_seconds 1234567890.0 -`, - } +`), header: http.Header{"Content-Type": []string{"text/plain; version=0.0.4; charset=UTF-8"}}} tsb.Update(scrape, now) ts, timestamp, err := tsb.Build() assert.Equal(t, timestamp, now)