-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[filebeat][streaming] - Added support for TLS & forward proxy configs…
… for websockets (#41934) (#41985) (cherry picked from commit fd81074) Co-authored-by: ShourieG <[email protected]>
- Loading branch information
1 parent
2ae64cc
commit 092f0ea
Showing
9 changed files
with
402 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ package streaming | |
|
||
import ( | ||
"context" | ||
"crypto/tls" | ||
"errors" | ||
"fmt" | ||
"net/http" | ||
|
@@ -41,6 +42,7 @@ type WebSocketHandler func(*testing.T, *websocket.Conn, []string) | |
var inputTests = []struct { | ||
name string | ||
server func(*testing.T, WebSocketHandler, map[string]interface{}, []string) | ||
proxyServer func(*testing.T, WebSocketHandler, map[string]interface{}, []string) *httptest.Server | ||
handler WebSocketHandler | ||
config map[string]interface{} | ||
response []string | ||
|
@@ -450,6 +452,140 @@ var inputTests = []struct { | |
}, | ||
wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake"), | ||
}, | ||
{ | ||
name: "single_event_tls", | ||
server: webSocketServerWithTLS(httptest.NewUnstartedServer), | ||
handler: defaultHandler, | ||
config: map[string]interface{}{ | ||
"program": ` | ||
bytes(state.response).decode_json().as(inner_body,{ | ||
"events": [inner_body], | ||
})`, | ||
"ssl": map[string]interface{}{ | ||
"enabled": true, | ||
"certificate_authorities": []string{"testdata/certs/ca.crt"}, | ||
"certificate": "testdata/certs/cert.pem", | ||
"key": "testdata/certs/key.pem", | ||
}, | ||
}, | ||
response: []string{` | ||
{ | ||
"pps": { | ||
"agent": "example.proofpoint.com", | ||
"cid": "mmeng_uivm071" | ||
}, | ||
"ts": "2017-08-17T14:54:12.949180-07:00", | ||
"data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr=<[email protected]> (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", | ||
"sm": { | ||
"tls": { | ||
"verify": "NONE" | ||
}, | ||
"stat": "Sent", | ||
"qid": "v7HLqYbx029423", | ||
"dsn": "2.0.0", | ||
"mailer": "*file*", | ||
"to": [ | ||
"/dev/null" | ||
], | ||
"ctladdr": "<[email protected]> (8/0)", | ||
"delay": "00:00:00", | ||
"xdelay": "00:00:00", | ||
"pri": 35342 | ||
}, | ||
"id": "ZeYGULpZmL5N0151HN1OyA" | ||
}`}, | ||
want: []map[string]interface{}{ | ||
{ | ||
"pps": map[string]interface{}{ | ||
"agent": "example.proofpoint.com", | ||
"cid": "mmeng_uivm071", | ||
}, | ||
"ts": "2017-08-17T14:54:12.949180-07:00", | ||
"data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr=<[email protected]> (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", | ||
"sm": map[string]interface{}{ | ||
"tls": map[string]interface{}{ | ||
"verify": "NONE", | ||
}, | ||
"stat": "Sent", | ||
"qid": "v7HLqYbx029423", | ||
"dsn": "2.0.0", | ||
"mailer": "*file*", | ||
"to": []interface{}{ | ||
"/dev/null", | ||
}, | ||
"ctladdr": "<[email protected]> (8/0)", | ||
"delay": "00:00:00", | ||
"xdelay": "00:00:00", | ||
"pri": float64(35342), | ||
}, | ||
"id": "ZeYGULpZmL5N0151HN1OyA", | ||
}, | ||
}, | ||
}, | ||
{ | ||
name: "basic_proxy_forwarding", | ||
proxyServer: newWebSocketProxyTestServer, | ||
handler: defaultHandler, | ||
config: map[string]interface{}{ | ||
"program": ` | ||
bytes(state.response).decode_json().as(inner_body,{ | ||
"events": [inner_body], | ||
})`, | ||
}, | ||
response: []string{` | ||
{ | ||
"pps": { | ||
"agent": "example.proofpoint.com", | ||
"cid": "mmeng_uivm071" | ||
}, | ||
"ts": "2017-08-17T14:54:12.949180-07:00", | ||
"data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr=<[email protected]> (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", | ||
"sm": { | ||
"tls": { | ||
"verify": "NONE" | ||
}, | ||
"stat": "Sent", | ||
"qid": "v7HLqYbx029423", | ||
"dsn": "2.0.0", | ||
"mailer": "*file*", | ||
"to": [ | ||
"/dev/null" | ||
], | ||
"ctladdr": "<[email protected]> (8/0)", | ||
"delay": "00:00:00", | ||
"xdelay": "00:00:00", | ||
"pri": 35342 | ||
}, | ||
"id": "ZeYGULpZmL5N0151HN1OyA" | ||
}`}, | ||
want: []map[string]interface{}{ | ||
{ | ||
"pps": map[string]interface{}{ | ||
"agent": "example.proofpoint.com", | ||
"cid": "mmeng_uivm071", | ||
}, | ||
"ts": "2017-08-17T14:54:12.949180-07:00", | ||
"data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr=<[email protected]> (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", | ||
"sm": map[string]interface{}{ | ||
"tls": map[string]interface{}{ | ||
"verify": "NONE", | ||
}, | ||
"stat": "Sent", | ||
"qid": "v7HLqYbx029423", | ||
"dsn": "2.0.0", | ||
"mailer": "*file*", | ||
"to": []interface{}{ | ||
"/dev/null", | ||
}, | ||
"ctladdr": "<[email protected]> (8/0)", | ||
"delay": "00:00:00", | ||
"xdelay": "00:00:00", | ||
"pri": float64(35342), | ||
}, | ||
"id": "ZeYGULpZmL5N0151HN1OyA", | ||
}, | ||
}, | ||
}, | ||
} | ||
|
||
var urlEvalTests = []struct { | ||
|
@@ -560,6 +696,9 @@ func TestInput(t *testing.T) { | |
if test.server != nil { | ||
test.server(t, test.handler, test.config, test.response) | ||
} | ||
if test.proxyServer != nil { | ||
test.proxyServer(t, test.handler, test.config, test.response) | ||
} | ||
|
||
cfg := conf.MustNewConfigFrom(test.config) | ||
|
||
|
@@ -771,6 +910,46 @@ func webSocketServerWithRetry(serve func(http.Handler) *httptest.Server) func(*t | |
} | ||
} | ||
|
||
// webSocketServerWithTLS simulates a WebSocket server with TLS based authentication. | ||
func webSocketServerWithTLS(serve func(http.Handler) *httptest.Server) func(*testing.T, WebSocketHandler, map[string]interface{}, []string) { | ||
return func(t *testing.T, handler WebSocketHandler, config map[string]interface{}, response []string) { | ||
server := serve(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
upgrader := websocket.Upgrader{ | ||
CheckOrigin: func(r *http.Request) bool { | ||
return true | ||
}, | ||
} | ||
|
||
conn, err := upgrader.Upgrade(w, r, nil) | ||
if err != nil { | ||
t.Fatalf("error upgrading connection to WebSocket: %v", err) | ||
return | ||
} | ||
|
||
handler(t, conn, response) | ||
})) | ||
//nolint:gosec // there is no need to use a secure cert for testing | ||
server.TLS = &tls.Config{ | ||
Certificates: []tls.Certificate{generateSelfSignedCert(t)}, | ||
} | ||
server.StartTLS() | ||
|
||
if config["url"] == nil { | ||
config["url"] = "ws" + server.URL[4:] | ||
} | ||
t.Cleanup(server.Close) | ||
} | ||
} | ||
|
||
// generateSelfSignedCert returns a self-signed certificate for testing purposes based on the dummy certs in the testdata directory | ||
func generateSelfSignedCert(t *testing.T) tls.Certificate { | ||
cert, err := tls.LoadX509KeyPair("testdata/certs/cert.pem", "testdata/certs/key.pem") | ||
if err != nil { | ||
t.Fatalf("failed to generate self-signed cert: %v", err) | ||
} | ||
return cert | ||
} | ||
|
||
// defaultHandler is a default handler for WebSocket connections. | ||
func defaultHandler(t *testing.T, conn *websocket.Conn, response []string) { | ||
for _, r := range response { | ||
|
@@ -780,3 +959,73 @@ func defaultHandler(t *testing.T, conn *websocket.Conn, response []string) { | |
} | ||
} | ||
} | ||
|
||
// webSocketTestServer creates a WebSocket target server that communicates with the proxy handler. | ||
func webSocketTestServer(t *testing.T, handler WebSocketHandler, response []string) *httptest.Server { | ||
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
upgrader := websocket.Upgrader{ | ||
CheckOrigin: func(r *http.Request) bool { | ||
return true | ||
}, | ||
} | ||
conn, err := upgrader.Upgrade(w, r, nil) | ||
if err != nil { | ||
t.Fatalf("failed to upgrade WebSocket connection: %v", err) | ||
return | ||
} | ||
handler(t, conn, response) | ||
})) | ||
} | ||
|
||
// webSocketProxyHandler forwards WebSocket connections to the target server. | ||
// | ||
//nolint:errcheck //we can safely ignore errors checks here | ||
func webSocketProxyHandler(targetURL string) http.HandlerFunc { | ||
return func(w http.ResponseWriter, r *http.Request) { | ||
defer r.Response.Body.Close() | ||
//nolint:bodyclose // we can ignore the body close here | ||
targetConn, _, err := websocket.DefaultDialer.Dial(targetURL, nil) | ||
if err != nil { | ||
http.Error(w, "failed to connect to backend WebSocket server", http.StatusBadGateway) | ||
return | ||
} | ||
defer targetConn.Close() | ||
|
||
upgrader := websocket.Upgrader{ | ||
CheckOrigin: func(r *http.Request) bool { | ||
return true | ||
}, | ||
} | ||
clientConn, err := upgrader.Upgrade(w, r, nil) | ||
if err != nil { | ||
http.Error(w, "failed to upgrade client connection", http.StatusInternalServerError) | ||
return | ||
} | ||
defer clientConn.Close() | ||
// forward messages between client and target server | ||
go func() { | ||
for { | ||
messageType, message, err := targetConn.ReadMessage() | ||
if err != nil { | ||
break | ||
} | ||
clientConn.WriteMessage(messageType, message) | ||
} | ||
}() | ||
for { | ||
messageType, message, err := clientConn.ReadMessage() | ||
if err != nil { | ||
break | ||
} | ||
targetConn.WriteMessage(messageType, message) | ||
} | ||
} | ||
} | ||
|
||
// newWebSocketProxyTestServer creates a proxy server forwarding WebSocket traffic. | ||
func newWebSocketProxyTestServer(t *testing.T, handler WebSocketHandler, config map[string]interface{}, response []string) *httptest.Server { | ||
backendServer := webSocketTestServer(t, handler, response) | ||
config["url"] = "ws" + backendServer.URL[4:] | ||
config["proxy_url"] = "ws" + backendServer.URL[4:] | ||
return httptest.NewServer(webSocketProxyHandler(config["url"].(string))) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
-----BEGIN CERTIFICATE----- | ||
MIIDbTCCAlWgAwIBAgIUS/rm8sWDc2a+eD9L+q+9XQpBa5MwDQYJKoZIhvcNAQEL | ||
BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM | ||
GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAgFw0yNDEyMDUxMjM4NThaGA8yMTI0 | ||
MTExMTEyMzg1OFowRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUx | ||
ITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDCCASIwDQYJKoZIhvcN | ||
AQEBBQADggEPADCCAQoCggEBALhEaFVqFuYwSwH4GHhMeqhilC+sWXKaQP8QmaH7 | ||
HWRST8Ko6YTT9NixUL4Qs5OmzCQFavRN9qtEo4wtqCJBOEyXQG1wAHuLWIY+KOCB | ||
twUg8fP+uYaYUOQOYNLkBz7SLlejuZYTyGxepIkc+UeJRcOE36anIPHpc2KSr3Hm | ||
vKJxZUVpQEbJvQ7pe7+iLL4jSOfzpQNcV9S/bzTo6taZXuo+ryEPlshkU/ME5VCN | ||
LFrU3AW2fzKW0Xa/skkW5izCiAU8KNEy84UQM6aZkJfFi9O394i97sGgYg+q36XL | ||
sXbZ+sCXHI3CGx+pwOx0h7S8n7iJJ7BbmwM6QuLFF6bFYkkCAwEAAaNTMFEwHQYD | ||
VR0OBBYEFEHtfvey8SdncMr7VDqA2YhtEiGYMB8GA1UdIwQYMBaAFEHtfvey8Sdn | ||
cMr7VDqA2YhtEiGYMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEB | ||
AKFcAeh9yhIbkkxvXl6ebmLcj817NVjhpcvDZlKP2MVu+w/h70w+JwBktLUlZlXd | ||
UNkKEWZyRvrdmY+YN6rwY/QI75N17bcmDWy6QnNlVJF0AJkBSdbKonCgHrZm7K25 | ||
TOKpj0QF8l7k9wr5FWHHcBw/vFF9cGZ5TO4HbnI25N/cEKgdzZFEVA5Y/Rv7GIGU | ||
COjJG20Cr2HIKvVYoyWvN6sL7+gbzUMyjvQyGMCT7YoIqscUfrUU+T46QaOLAKa3 | ||
z91Obfmv6uTO/rsieoxVWVJ35GeHeNJkAPkr7Z1sWIrreJ/3WsecWuPPEDNDXiSV | ||
5h0bTbbPOyEIe5ydEIbr5kA= | ||
-----END CERTIFICATE----- |
Oops, something went wrong.