diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index 3c6afea5aa8e..80032b8c1875 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -18,11 +18,13 @@ package eslegclient import ( + "bufio" "bytes" "encoding/base64" "encoding/json" "fmt" "io" + "net" "net/http" "net/url" "time" @@ -312,13 +314,70 @@ func (conn *Connection) Close() error { return nil } +type httpClientProxySettings struct { + *httpcommon.HTTPClientProxySettings +} + +// ProxyDialer is a dialer that can be registered to golang.org/x/net/proxy +func (settings *httpClientProxySettings) ProxyDialer(_ *url.URL, forward proxy.Dialer) (proxy.Dialer, error) { + return transport.DialerFunc(func(_, address string) (net.Conn, error) { + + // Headers given to the CONNECT request + hdr := settings.Headers.Headers() + if settings.URL.User != nil { + username := settings.URL.User.Username() + password, _ := settings.URL.User.Password() + if len(hdr) == 0 { + hdr = http.Header{} + } + hdr.Add("Proxy-Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(username+":"+password))) + } + + req := &http.Request{ + Method: "CONNECT", + URL: &url.URL{Opaque: address}, + Host: address, + Header: hdr, + } + + // Dial the proxy host + c, err := forward.Dial("tcp", settings.URL.Host) + if err != nil { + return nil, err + } + + // Write the CONNECT request + err = req.Write(c) + if err != nil { + c.Close() + return nil, err + } + + res, err := http.ReadResponse(bufio.NewReader(c), req) + if err != nil { + c.Close() + return nil, err + } + res.Body.Close() + + if res.StatusCode != http.StatusOK { + c.Close() + return nil, fmt.Errorf("proxy server returned status code %d", res.StatusCode) + } + + return c, nil + }), nil +} + func (conn *Connection) Test(d testing.Driver) { testProxyDialer := func(d testing.Driver, forward transport.Dialer, settings *httpcommon.HTTPClientProxySettings) transport.Dialer { switch scheme := settings.URL.Scheme; scheme { case "http", "https": - proxy.RegisterDialerType(scheme, settings.ProxyDialer) + proxy.RegisterDialerType(scheme, (&httpClientProxySettings{HTTPClientProxySettings: settings}).ProxyDialer) } - return transport.TestProxyDialer(d, forward, &transport.ProxyConfig{URL: settings.URL.String()}) + dialer, err := transport.ProxyDialer(logp.L(), &transport.ProxyConfig{URL: settings.URL.String()}, forward) + d.Fatal("proxy", err) + return dialer } d.Run("elasticsearch: "+conn.URL, func(d testing.Driver) {