diff --git a/client/client.go b/client/client.go
new file mode 100644
index 0000000..e3a67a0
--- /dev/null
+++ b/client/client.go
@@ -0,0 +1,266 @@
+package client
+
+import (
+	"bufio"
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"log/slog"
+	"net/http"
+	"net/url"
+	"strings"
+	"time"
+
+	"github.com/cenkalti/backoff/v4"
+	"github.com/prometheus-community/pushprox/util"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+var (
+	scrapeErrorCounter = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "pushprox_client_scrape_errors_total",
+			Help: "Number of scrape errors",
+		},
+	)
+	pushErrorCounter = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "pushprox_client_push_errors_total",
+			Help: "Number of push errors",
+		},
+	)
+	pollErrorCounter = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "pushprox_client_poll_errors_total",
+			Help: "Number of poll errors",
+		},
+	)
+)
+
+func init() {
+	prometheus.MustRegister(pushErrorCounter, pollErrorCounter, scrapeErrorCounter)
+}
+
+// DefaultBackoff returns the exponential backoff that NewCoordinator falls
+// back to when it is given a nil backoff: intervals start at 1s, grow by a
+// factor of 1.5 and are capped at 5s.
+func DefaultBackoff() backoff.BackOff {
+	b := backoff.NewExponentialBackOff()
+	b.InitialInterval = 1 * time.Second
+	b.Multiplier = 1.5
+	b.MaxInterval = 5 * time.Second
+	// A zero MaxElapsedTime means the backoff never gives up on its own.
+	b.MaxElapsedTime = time.Duration(0)
+	return b
+}
+
+// Coordinator polls the proxy for scrape requests, performs them with its
+// HTTP client and pushes the results back to the proxy.
+type Coordinator struct {
+	logger   *slog.Logger
+	client   *http.Client
+	bo       backoff.BackOff
+	fqdn     string
+	proxyURL string
+}
+
+// NewCoordinator validates its arguments and returns a Coordinator.
+//
+// logger, client, fqdn and proxyURL are required and an error is returned if
+// any of them is missing. A nil bo is replaced by DefaultBackoff.
+func NewCoordinator(logger *slog.Logger, bo backoff.BackOff, client *http.Client, fqdn, proxyURL string) (*Coordinator, error) {
+	if logger == nil {
+		return nil, errors.New("logger must be specified")
+	}
+	if client == nil {
+		return nil, errors.New("client must be specified")
+	}
+	if fqdn == "" {
+		return nil, errors.New("fqdn must be specified")
+	}
+	if proxyURL == "" {
+		return nil, errors.New("proxyURL must be specified")
+	}
+	if bo == nil {
+		logger.Warn("No backoff provided, using default")
+		bo = DefaultBackoff()
+	}
+	c := &Coordinator{
+		logger:   logger,
+		client:   client,
+		bo:       bo,
+		fqdn:     fqdn,
+		proxyURL: proxyURL,
+	}
+	return c, nil
+}
+
+// Start runs the poll loop and blocks until ctx is done.
+func (c *Coordinator) Start(ctx context.Context) {
+	c.loop(ctx)
+}
+
+// handleErr logs a failed scrape, counts it, and pushes a synthetic 500
+// response carrying the error text to the proxy.
+func (c *Coordinator) handleErr(request *http.Request, err error) {
+	c.logger.Error("Coordinator error", "error", err)
+	scrapeErrorCounter.Inc()
+	resp := &http.Response{
+		StatusCode: http.StatusInternalServerError,
+		Body:       io.NopCloser(strings.NewReader(err.Error())),
+		Header:     http.Header{},
+	}
+	if err = c.doPush(resp, request); err != nil {
+		pushErrorCounter.Inc()
+		c.logger.Warn("Failed to push failed scrape response:", "err", err)
+		return
+	}
+	c.logger.Info("Pushed failed scrape response")
+}
+
+// doScrape performs one scrape request received from the proxy and pushes
+// the response, or an error response, back to the proxy.
+func (c *Coordinator) doScrape(request *http.Request) {
+	logger := c.logger.With("scrape_id", request.Header.Get("id"))
+	timeout, err := util.GetHeaderTimeout(request.Header)
+	if err != nil {
+		c.handleErr(request, err)
+		return
+	}
+	ctx, cancel := context.WithTimeout(request.Context(), timeout)
+	defer cancel()
+	request = request.WithContext(ctx)
+	// We cannot handle https requests at the proxy, as we would only
+	// see a CONNECT, so use a URL parameter to trigger it.
+	params := request.URL.Query()
+	if params.Get("_scheme") == "https" {
+		request.URL.Scheme = "https"
+		params.Del("_scheme")
+		request.URL.RawQuery = params.Encode()
+	}
+
+	if request.URL.Hostname() != c.fqdn {
+		c.handleErr(request, errors.New("scrape target doesn't match client fqdn"))
+		return
+	}
+
+	scrapeResp, err := c.client.Do(request)
+	if err != nil {
+		c.handleErr(request, fmt.Errorf("failed to scrape %s: %w", request.URL.String(), err))
+		return
+	}
+	logger.Info("Retrieved scrape response")
+	if err = c.doPush(scrapeResp, request); err != nil {
+		pushErrorCounter.Inc()
+		logger.Warn("Failed to push scrape response:", "err", err)
+		return
+	}
+	logger.Info("Pushed scrape result")
+}
+
+// doPush reports the result of the scrape back up to the proxy.
+func (c *Coordinator) doPush(resp *http.Response, origRequest *http.Request) error {
+	resp.Header.Set("id", origRequest.Header.Get("id")) // Link the request and response
+	// Remaining scrape deadline.
+	// NOTE(review): the ok result is discarded, so a context without a
+	// deadline yields a zero deadline here -- confirm the proxy tolerates it.
+	deadline, _ := origRequest.Context().Deadline()
+	resp.Header.Set("X-Prometheus-Scrape-Timeout", fmt.Sprintf("%f", float64(time.Until(deadline))/1e9))
+
+	base, err := url.Parse(c.proxyURL)
+	if err != nil {
+		return err
+	}
+	u, err := url.Parse("push")
+	if err != nil {
+		return err
+	}
+	pushURL := base.ResolveReference(u)
+
+	buf := &bytes.Buffer{}
+	//nolint:errcheck // https://github.com/prometheus-community/PushProx/issues/111
+	resp.Write(buf)
+	request := &http.Request{
+		Method:        "POST",
+		URL:           pushURL,
+		Body:          io.NopCloser(buf),
+		ContentLength: int64(buf.Len()),
+	}
+	request = request.WithContext(origRequest.Context())
+	pushResp, err := c.client.Do(request)
+	if err != nil {
+		return err
+	}
+	// Drain and close the proxy's reply; a body that is never closed keeps
+	// its connection from being reused.
+	defer pushResp.Body.Close()
+	_, _ = io.Copy(io.Discard, pushResp.Body) // best effort, the reply is not used
+	return nil
+}
+
+// doPoll posts the client's fqdn to the proxy's poll endpoint, reads a scrape
+// request from the response body and starts doScrape for it in a new
+// goroutine. It returns an error if any step before the scrape fails.
+func (c *Coordinator) doPoll(ctx context.Context) error {
+	base, err := url.Parse(c.proxyURL)
+	if err != nil {
+		c.logger.Error("Error parsing url:", "err", err)
+		return fmt.Errorf("error parsing url: %w", err)
+	}
+	u, err := url.Parse("poll")
+	if err != nil {
+		c.logger.Error("Error parsing url:", "err", err)
+		return fmt.Errorf("error parsing url poll: %w", err)
+	}
+	pollURL := base.ResolveReference(u)
+	req, err := http.NewRequestWithContext(ctx, http.MethodPost, pollURL.String(), strings.NewReader(c.fqdn))
+	if err != nil {
+		c.logger.Error("Error creating request:", "err", err)
+		return fmt.Errorf("error creating request: %w", err)
+	}
+	resp, err := c.client.Do(req)
+	if err != nil {
+		c.logger.Error("Error polling:", "err", err)
+		return fmt.Errorf("error polling: %w", err)
+	}
+	defer resp.Body.Close()
+
+	request, err := http.ReadRequest(bufio.NewReader(resp.Body))
+	if err != nil {
+		c.logger.Error("Error reading request:", "err", err)
+		return fmt.Errorf("error reading request: %w", err)
+	}
+	c.logger.Info("Got scrape request", "scrape_id", request.Header.Get("id"), "url", request.URL)
+
+	request.RequestURI = ""
+
+	// http.ReadRequest leaves the request with a background context; attach
+	// ctx so an in-flight scrape is cancelled when polling is shut down.
+	go c.doScrape(request.WithContext(ctx))
+
+	return nil
+}
+
+// loop polls the proxy until ctx is done, retrying failed polls with the
+// coordinator's backoff.
+func (c *Coordinator) loop(ctx context.Context) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	op := func() error {
+		return c.doPoll(ctx)
+	}
+
+	for ctx.Err() == nil {
+		// Bind the backoff to ctx: a backoff that never stops would otherwise
+		// keep RetryNotify retrying after cancellation, and the loop
+		// condition above would never be re-checked.
+		err := backoff.RetryNotify(op, backoff.WithContext(c.bo, ctx), func(err error, _ time.Duration) {
+			pollErrorCounter.Inc()
+		})
+		if err != nil && ctx.Err() == nil {
+			c.logger.Error("backoff returned error", "error", err)
+		}
+	}
+}
diff --git a/cmd/client/main_test.go b/client/client_test.go
similarity index 82%
rename from cmd/client/main_test.go
rename to client/client_test.go
index 2625461..2929c1e 100644
--- a/cmd/client/main_test.go
+++ b/client/client_test.go
@@ -11,9 +11,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package main
+package client
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"net/http"
@@ -23,13 +24,13 @@ import (
 	"github.com/prometheus/common/promslog"
 )
 
-func prepareTest() (*httptest.Server, Coordinator) {
+func prepareTest() (*httptest.Server, *Coordinator) {
 	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		w.WriteHeader(http.StatusOK)
 		fmt.Fprintln(w, "GET /index.html HTTP/1.0\n\nOK")
 	}))
-	c := Coordinator{logger: promslog.NewNopLogger()}
-	*proxyURL = ts.URL
+	c, _ := NewCoordinator(promslog.NewNopLogger(), nil, ts.Client(), ts.URL, ts.URL)
+
 	return ts, c
 }
 
@@ -42,8 +43,7 @@ func TestDoScrape(t *testing.T) {
 		t.Fatal(err)
 	}
 	req.Header.Add("X-Prometheus-Scrape-Timeout-Seconds", "10.0")
-	*myFqdn = ts.URL
-	c.doScrape(req, ts.Client())
+	c.doScrape(req)
 }
 
 func TestHandleErr(t *testing.T) {
@@ -54,13 +54,13 @@ func TestHandleErr(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	c.handleErr(req, ts.Client(), errors.New("test error"))
+	c.handleErr(req, errors.New("test error"))
 }
 
 func TestLoop(t *testing.T) {
 	ts, c := prepareTest()
 	defer ts.Close()
-	if err := c.doPoll(ts.Client()); err != nil {
+	if err := c.doPoll(context.Background()); err != nil {
 		t.Fatal(err)
 	}
 }
diff --git a/cmd/client/main.go b/cmd/client/main.go
index efe6de2..5186ae7 100644
--- a/cmd/client/main.go
+++ b/cmd/client/main.go
@@ -14,18 +14,11 @@
 package main
 
 import (
-	"bufio"
-	"bytes"
 	"context"
 	"crypto/tls"
 	"crypto/x509"
-	"errors"
-	"fmt"
-	"io"
-	"log/slog"
 	"net"
 	"net/http"
-	"net/url"
 	"os"
 	"strings"
 	"time"
@@ -33,8 +26,7 @@ import (
 	"github.com/Showmax/go-fqdn"
 	"github.com/alecthomas/kingpin/v2"
 	"github.com/cenkalti/backoff/v4"
-	"github.com/prometheus-community/pushprox/util"
-	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus-community/pushprox/client"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"github.com/prometheus/common/promslog"
 	"github.com/prometheus/common/promslog/flag"
@@ -52,31 +44,6 @@ var (
 	retryMaxWait     = kingpin.Flag("proxy.retry.max-wait", "Maximum amount of time to wait between proxy poll retries").Default("5s").Duration()
 )
 
-var (
-	scrapeErrorCounter = prometheus.NewCounter(
-		prometheus.CounterOpts{
-			Name: "pushprox_client_scrape_errors_total",
-			Help: "Number of scrape errors",
-		},
-	)
-	pushErrorCounter = prometheus.NewCounter(
-		prometheus.CounterOpts{
-			Name: "pushprox_client_push_errors_total",
-			Help: "Number of push errors",
-		},
-	)
-	pollErrorCounter = prometheus.NewCounter(
-		prometheus.CounterOpts{
-			Name: "pushprox_client_poll_errors_total",
-			Help: "Number of poll errors",
-		},
-	)
-)
-
-func init() {
-	prometheus.MustRegister(pushErrorCounter, pollErrorCounter, scrapeErrorCounter)
-}
-
 func newBackOffFromFlags() backoff.BackOff {
 	b := backoff.NewExponentialBackOff()
 	b.InitialInterval = *retryInitialWait
@@ -85,167 +52,26 @@ func newBackOffFromFlags() backoff.BackOff {
 	b.MaxElapsedTime = time.Duration(0)
 	return b
 }
-
-// Coordinator for scrape requests and responses
-type Coordinator struct {
-	logger *slog.Logger
-}
-
-func (c *Coordinator) handleErr(request *http.Request, client *http.Client, err error) {
-	c.logger.Error("Coordinator error", "error", err)
-	scrapeErrorCounter.Inc()
-	resp := &http.Response{
-		StatusCode: http.StatusInternalServerError,
-		Body:       io.NopCloser(strings.NewReader(err.Error())),
-		Header:     http.Header{},
-	}
-	if err = c.doPush(resp, request, client); err != nil {
-		pushErrorCounter.Inc()
-		c.logger.Warn("Failed to push failed scrape response:", "err", err)
-		return
-	}
-	c.logger.Info("Pushed failed scrape response")
-}
-
-func (c *Coordinator) doScrape(request *http.Request, client *http.Client) {
-	logger := c.logger.With("scrape_id", request.Header.Get("id"))
-	timeout, err := util.GetHeaderTimeout(request.Header)
-	if err != nil {
-		c.handleErr(request, client, err)
-		return
-	}
-	ctx, cancel := context.WithTimeout(request.Context(), timeout)
-	defer cancel()
-	request = request.WithContext(ctx)
-	// We cannot handle https requests at the proxy, as we would only
-	// see a CONNECT, so use a URL parameter to trigger it.
-	params := request.URL.Query()
-	if params.Get("_scheme") == "https" {
-		request.URL.Scheme = "https"
-		params.Del("_scheme")
-		request.URL.RawQuery = params.Encode()
-	}
-
-	if request.URL.Hostname() != *myFqdn {
-		c.handleErr(request, client, errors.New("scrape target doesn't match client fqdn"))
-		return
-	}
-
-	scrapeResp, err := client.Do(request)
-	if err != nil {
-		c.handleErr(request, client, fmt.Errorf("failed to scrape %s: %w", request.URL.String(), err))
-		return
-	}
-	logger.Info("Retrieved scrape response")
-	if err = c.doPush(scrapeResp, request, client); err != nil {
-		pushErrorCounter.Inc()
-		logger.Warn("Failed to push scrape response:", "err", err)
-		return
-	}
-	logger.Info("Pushed scrape result")
-}
-
-// Report the result of the scrape back up to the proxy.
-func (c *Coordinator) doPush(resp *http.Response, origRequest *http.Request, client *http.Client) error {
-	resp.Header.Set("id", origRequest.Header.Get("id")) // Link the request and response
-	// Remaining scrape deadline.
-	deadline, _ := origRequest.Context().Deadline()
-	resp.Header.Set("X-Prometheus-Scrape-Timeout", fmt.Sprintf("%f", float64(time.Until(deadline))/1e9))
-
-	base, err := url.Parse(*proxyURL)
-	if err != nil {
-		return err
-	}
-	u, err := url.Parse("push")
-	if err != nil {
-		return err
-	}
-	url := base.ResolveReference(u)
-
-	buf := &bytes.Buffer{}
-	//nolint:errcheck // https://github.com/prometheus-community/PushProx/issues/111
-	resp.Write(buf)
-	request := &http.Request{
-		Method:        "POST",
-		URL:           url,
-		Body:          io.NopCloser(buf),
-		ContentLength: int64(buf.Len()),
-	}
-	request = request.WithContext(origRequest.Context())
-	if _, err = client.Do(request); err != nil {
-		return err
-	}
-	return nil
-}
-
-func (c *Coordinator) doPoll(client *http.Client) error {
-	base, err := url.Parse(*proxyURL)
-	if err != nil {
-		c.logger.Error("Error parsing url:", "err", err)
-		return fmt.Errorf("error parsing url: %w", err)
-	}
-	u, err := url.Parse("poll")
-	if err != nil {
-		c.logger.Error("Error parsing url:", "err", err)
-		return fmt.Errorf("error parsing url poll: %w", err)
-	}
-	url := base.ResolveReference(u)
-	resp, err := client.Post(url.String(), "", strings.NewReader(*myFqdn))
-	if err != nil {
-		c.logger.Error("Error polling:", "err", err)
-		return fmt.Errorf("error polling: %w", err)
-	}
-	defer resp.Body.Close()
-
-	request, err := http.ReadRequest(bufio.NewReader(resp.Body))
-	if err != nil {
-		c.logger.Error("Error reading request:", "err", err)
-		return fmt.Errorf("error reading request: %w", err)
-	}
-	c.logger.Info("Got scrape request", "scrape_id", request.Header.Get("id"), "url", request.URL)
-
-	request.RequestURI = ""
-
-	go c.doScrape(request, client)
-
-	return nil
-}
-
-func (c *Coordinator) loop(bo backoff.BackOff, client *http.Client) {
-	op := func() error {
-		return c.doPoll(client)
-	}
-
-	for {
-		if err := backoff.RetryNotify(op, bo, func(err error, _ time.Duration) {
-			pollErrorCounter.Inc()
-		}); err != nil {
-			c.logger.Error("backoff returned error", "error", err)
-		}
-	}
-}
-
 func main() {
 	promslogConfig := promslog.Config{}
 	flag.AddFlags(kingpin.CommandLine, &promslogConfig)
 	kingpin.HelpFlag.Short('h')
 	kingpin.Parse()
 	logger := promslog.New(&promslogConfig)
-	coordinator := Coordinator{logger: logger}
 
 	if *proxyURL == "" {
-		coordinator.logger.Error("--proxy-url flag must be specified.")
+		logger.Error("--proxy-url flag must be specified.")
 		os.Exit(1)
 	}
 	// Make sure proxyURL ends with a single '/'
 	*proxyURL = strings.TrimRight(*proxyURL, "/") + "/"
-	coordinator.logger.Info("URL and FQDN info", "proxy_url", *proxyURL, "fqdn", *myFqdn)
+	logger.Info("URL and FQDN info", "proxy_url", *proxyURL, "fqdn", *myFqdn)
 
 	tlsConfig := &tls.Config{}
 	if *tlsCert != "" {
 		cert, err := tls.LoadX509KeyPair(*tlsCert, *tlsKey)
 		if err != nil {
-			coordinator.logger.Error("Certificate or Key is invalid", "err", err)
+			logger.Error("Certificate or Key is invalid", "err", err)
 			os.Exit(1)
 		}
 
@@ -256,12 +82,12 @@ func main() {
 	if *caCertFile != "" {
 		caCert, err := os.ReadFile(*caCertFile)
 		if err != nil {
-			coordinator.logger.Error("Not able to read cacert file", "err", err)
+			logger.Error("Not able to read cacert file", "err", err)
 			os.Exit(1)
 		}
 		caCertPool := x509.NewCertPool()
 		if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
-			coordinator.logger.Error("Failed to use cacert file as ca certificate")
+			logger.Error("Failed to use cacert file as ca certificate")
 			os.Exit(1)
 		}
 
@@ -271,7 +97,7 @@ func main() {
 	if *metricsAddr != "" {
 		go func() {
 			if err := http.ListenAndServe(*metricsAddr, promhttp.Handler()); err != nil {
-				coordinator.logger.Warn("ListenAndServe", "err", err)
+				logger.Warn("ListenAndServe", "err", err)
 			}
 		}()
 	}
@@ -290,7 +116,12 @@ func main() {
 		TLSClientConfig:       tlsConfig,
 	}
 
-	client := &http.Client{Transport: transport}
+	c := &http.Client{Transport: transport}
 
-	coordinator.loop(newBackOffFromFlags(), client)
+	coordinator, err := client.NewCoordinator(logger, newBackOffFromFlags(), c, *myFqdn, *proxyURL)
+	if err != nil {
+		logger.Error("Failed to create coordinator", "err", err)
+		os.Exit(1)
+	}
+	coordinator.Start(context.Background())
 }