From cc1ab5bf27bf91853cbd6e27e7877c102e130ddb Mon Sep 17 00:00:00 2001 From: Yannick Dylla <17772145+ydylla@users.noreply.github.com> Date: Mon, 6 May 2024 16:00:57 +0200 Subject: [PATCH 01/14] feat: add matching timeout and not blocking matchers --- layer4/connection.go | 143 ++++++++++++++++++++++++++------------ layer4/connection_test.go | 19 +++-- layer4/handlers.go | 2 + layer4/matchers.go | 4 +- layer4/routes.go | 129 +++++++++++++++++----------------- layer4/routes_test.go | 57 +++++++++++++++ layer4/server.go | 4 +- 7 files changed, 234 insertions(+), 124 deletions(-) create mode 100644 layer4/routes_test.go diff --git a/layer4/connection.go b/layer4/connection.go index be4973c..ce6d6cf 100644 --- a/layer4/connection.go +++ b/layer4/connection.go @@ -15,9 +15,8 @@ package layer4 import ( - "bytes" "context" - "io" + "errors" "net" "sync" @@ -30,7 +29,7 @@ import ( // and variable table. This function is intended for use at the start of a // connection handler chain where the underlying connection is not yet a layer4 // Connection value. -func WrapConnection(underlying net.Conn, buf *bytes.Buffer, logger *zap.Logger) *Connection { +func WrapConnection(underlying net.Conn, buf []byte, logger *zap.Logger) *Connection { repl := caddy.NewReplacer() repl.Set("l4.conn.remote_addr", underlying.RemoteAddr()) repl.Set("l4.conn.local_addr", underlying.LocalAddr()) @@ -66,31 +65,34 @@ type Connection struct { Logger *zap.Logger - buf *bytes.Buffer // stores recordings - bufReader io.Reader // used to read buf so it doesn't discard bytes - recording bool + buf []byte // stores matching data + offset int + matching bool bytesRead, bytesWritten uint64 } +var ErrConsumedAllPrefetchedBytes = errors.New("consumed all prefetched bytes") +var ErrMatchingBufferFull = errors.New("matching buffer is full") + // Read implements io.Reader in such a way that reads first // deplete any associated buffer from the prior recording, // and once depleted (or if there isn't one), it continues // reading from the underlying connection. func (cx *Connection) Read(p []byte) (n int, err error) { + if cx.matching { + if len(cx.buf) == 0 || len(cx.buf[cx.offset:]) == 0 { + return 0, ErrConsumedAllPrefetchedBytes + } + } + // if there is a buffer we should read from, start // with that; we only read from the underlying conn // after the buffer has been "depleted" - if cx.bufReader != nil { - n, err = cx.bufReader.Read(p) - if err == io.EOF { - cx.bufReader = nil - err = nil - } - // prevent first read from returning 0 bytes because of empty bufReader - if !(n == 0 && err == nil) { - return - } + if cx.offset < len(cx.buf) { + n := copy(p, cx.buf[cx.offset:]) + cx.offset += n + return n, nil } // buffer has been "depleted" so read from @@ -98,19 +100,6 @@ func (cx *Connection) Read(p []byte) (n int, err error) { n, err = cx.Conn.Read(p) cx.bytesRead += uint64(n) - if !cx.recording { - return - } - - // since we're recording at this point, anything that - // was read needs to be written to the buffer, even - // if there was an error - if n > 0 { - if nw, errw := cx.buf.Write(p[:n]); errw != nil { - return nw, errw - } - } - return } @@ -130,33 +119,82 @@ func (cx *Connection) Wrap(conn net.Conn) *Connection { Context: cx.Context, Logger: cx.Logger, buf: cx.buf, - bufReader: cx.bufReader, - recording: cx.recording, + offset: cx.offset, + matching: cx.matching, bytesRead: cx.bytesRead, bytesWritten: cx.bytesWritten, } } -// record starts recording the stream into cx.buf. It also creates a reader -// to read from the buffer but not to discard any byte. -func (cx *Connection) record() { - cx.recording = true - cx.bufReader = bytes.NewReader(cx.buf.Bytes()) // Don't discard bytes. +// prefetch tries to read all bytes that a client initially sent us without blocking. +func (cx *Connection) prefetch() (err error) { + var n int + var tmp []byte + + for len(cx.buf) < MaxMatchingBytes { + free := cap(cx.buf) - len(cx.buf) + if free >= PrefetchChunkSize { + n, err = cx.Conn.Read(cx.buf[len(cx.buf) : len(cx.buf)+PrefetchChunkSize]) + cx.buf = cx.buf[:len(cx.buf)+n] + } else { + if tmp == nil { + tmp = bufPool.Get().([]byte) + tmp = tmp[:PrefetchChunkSize] + defer bufPool.Put(tmp) + } + n, err = cx.Conn.Read(tmp) + cx.buf = append(cx.buf, tmp[:n]...) + } + + cx.bytesRead += uint64(n) + + if err != nil { + return err + } + + if n < PrefetchChunkSize { + break + } + } + + if cx.Logger.Core().Enabled(zap.DebugLevel) { + cx.Logger.Debug("prefetched", + zap.String("remote", cx.RemoteAddr().String()), + zap.Int("bytes", len(cx.buf)), + ) + } + + if len(cx.buf) >= MaxMatchingBytes { + return ErrMatchingBufferFull + } + + return nil +} + +// freeze activates the matching mode that only reads from cx.buf. +func (cx *Connection) freeze() { + cx.matching = true +} + +// unfreeze stops the matching mode and resets the buffer offset +// so that the next reads come from the buffer first. +func (cx *Connection) unfreeze() { + cx.matching = false + cx.offset = 0 } -// rewind stops recording and creates a reader for the -// buffer so that the next reads from an associated -// recordableConn come from the buffer first, then -// continue with the underlying conn. -func (cx *Connection) rewind() { - cx.recording = false - cx.bufReader = cx.buf // Actually consume bytes. +// clear discards cx.buf and resets cx.offset to prepare the connection for the next prefetch & matching phase +func (cx *Connection) clear() { + if cx.buf != nil { + cx.buf = cx.buf[:0] + } + cx.offset = 0 } // SetVar sets a value in the context's variable table with // the given key. It overwrites any previous value with the // same key. -func (cx Connection) SetVar(key string, value interface{}) { +func (cx *Connection) SetVar(key string, value interface{}) { varMap, ok := cx.Context.Value(VarsCtxKey).(map[string]interface{}) if !ok { return @@ -167,7 +205,7 @@ func (cx Connection) SetVar(key string, value interface{}) { // GetVar gets a value from the context's variable table with // the given key. It returns the value if found, and true if // it found a value with that key; false otherwise. -func (cx Connection) GetVar(key string) interface{} { +func (cx *Connection) GetVar(key string) interface{} { varMap, ok := cx.Context.Value(VarsCtxKey).(map[string]interface{}) if !ok { return nil @@ -175,6 +213,12 @@ func (cx Connection) GetVar(key string) interface{} { return varMap[key] } +// MatchingBytes returns all bytes currently available for matching. This is only intended for reading. +// Do not write into the slice it's a view of the internal buffer and you will likely mess up the connection. +func (cx *Connection) MatchingBytes() ([]byte, error) { + return cx.buf[cx.offset:], nil +} + var ( // VarsCtxKey is the key used to store the variables table // in a Connection's context. @@ -187,8 +231,15 @@ var ( listenerCtxKey caddy.CtxKey = "listener" ) +const PrefetchChunkSize = 1024 + +// MaxMatchingBytes is the amount of bytes that are at most prefetched during matching. +// This is probably most relevant for the http matcher since http requests do not have a size limit. +// 8 KiB should cover most use-cases and is similar to popular webservers. +const MaxMatchingBytes = 8 * 1024 + var bufPool = sync.Pool{ New: func() interface{} { - return new(bytes.Buffer) + return make([]byte, 0, PrefetchChunkSize) }, } diff --git a/layer4/connection_test.go b/layer4/connection_test.go index e610582..61786f4 100644 --- a/layer4/connection_test.go +++ b/layer4/connection_test.go @@ -8,7 +8,7 @@ import ( "go.uber.org/zap" ) -func TestConnection_RecordAndRewind(t *testing.T) { +func TestConnection_FreezeAndUnfreeze(t *testing.T) { in, out := net.Pipe() defer in.Close() defer out.Close() @@ -26,9 +26,14 @@ func TestConnection_RecordAndRewind(t *testing.T) { in.Write(consumeData) }() - // 1st matcher + // prefetch like server handler would + err := cx.prefetch() + if err != nil { + t.Fatal(err) + } - cx.record() + // 1st matcher + cx.freeze() n, err := cx.Read(buf) if err != nil { @@ -41,11 +46,11 @@ func TestConnection_RecordAndRewind(t *testing.T) { t.Fatalf("expected %s but received %s", matcherData, buf) } - cx.rewind() + cx.unfreeze() // 2nd matcher (reads same data) - cx.record() + cx.freeze() n, err = cx.Read(buf) if err != nil { @@ -58,9 +63,9 @@ func TestConnection_RecordAndRewind(t *testing.T) { t.Fatalf("expected %s but received %s", matcherData, buf) } - cx.rewind() + cx.unfreeze() - // 1st consumer (no record call) + // 1st consumer (no freeze call) n, err = cx.Read(buf) if err != nil { diff --git a/layer4/handlers.go b/layer4/handlers.go index 14ac43c..b095a36 100644 --- a/layer4/handlers.go +++ b/layer4/handlers.go @@ -35,6 +35,8 @@ func (h Handlers) Compile() Handler { // as part of a middleware chain. type NextHandler interface { Handle(*Connection, Handler) error + // Must return true if this is the last handler in a handler chain. + IsTerminal() bool } // Handler is a type that can handle connections. diff --git a/layer4/matchers.go b/layer4/matchers.go index 0b3a9ed..14b6006 100644 --- a/layer4/matchers.go +++ b/layer4/matchers.go @@ -48,9 +48,9 @@ type MatcherSet []ConnMatcher // or if there are no matchers. Any error terminates matching. func (mset MatcherSet) Match(cx *Connection) (matched bool, err error) { for _, m := range mset { - cx.record() + cx.freeze() matched, err = m.Match(cx) - cx.rewind() + cx.unfreeze() if cx.Logger.Core().Enabled(zap.DebugLevel) { matcher := "unknown" if cm, ok := m.(caddy.Module); ok { diff --git a/layer4/routes.go b/layer4/routes.go index 673341c..2e8328d 100644 --- a/layer4/routes.go +++ b/layer4/routes.go @@ -16,7 +16,10 @@ package layer4 import ( "encoding/json" + "errors" "fmt" + "os" + "time" "github.com/caddyserver/caddy/v2" "go.uber.org/zap" @@ -39,10 +42,15 @@ type Route struct { // executed in sequential order if the route's matchers match. HandlersRaw []json.RawMessage `json:"handle,omitempty" caddy:"namespace=layer4.handlers inline_key=handler"` + // Is set to true during Provision if any of the handlers IsTerminal. + terminal bool + matcherSets MatcherSets middleware []Middleware } +var ErrMatchingTimeout = errors.New("aborted matching according to timeout") + // Provision sets up a route. func (r *Route) Provision(ctx caddy.Context) error { // matchers @@ -62,7 +70,11 @@ func (r *Route) Provision(ctx caddy.Context) error { } var handlers Handlers for _, mod := range mods.([]interface{}) { - handlers = append(handlers, mod.(NextHandler)) + handler := mod.(NextHandler) + handlers = append(handlers, handler) + if handler.IsTerminal() { + r.terminal = true + } } for _, midhandler := range handlers { r.middleware = append(r.middleware, wrapHandler(midhandler)) @@ -93,74 +105,57 @@ func (routes RouteList) Provision(ctx caddy.Context) error { // This should only be done once: after all the routes have // been provisioned, and before the server loop begins. func (routes RouteList) Compile(next Handler, logger *zap.Logger) Handler { - mid := make([]Middleware, 0, len(routes)) - for _, route := range routes { - mid = append(mid, wrapRoute(route, logger)) - } - stack := next - for i := len(mid) - 1; i >= 0; i-- { - stack = mid[i](stack) - } - return stack -} - -// wrapRoute wraps route with a middleware and handler so that it can -// be chained in and defer evaluation of its matchers to request-time. -// Like wrapMiddleware, it is vital that this wrapping takes place in -// its own stack frame so as to not overwrite the reference to the -// intended route by looping and changing the reference each time. -func wrapRoute(route *Route, logger *zap.Logger) Middleware { - return func(next Handler) Handler { - return HandlerFunc(func(cx *Connection) error { - // TODO: Update this comment, it seems we've moved the copy into the handler? - // copy the next handler (it's an interface, so it's just - // a very lightweight copy of a pointer); this is important - // because this is a closure to the func below, which - // re-assigns the value as it compiles the middleware stack; - // if we don't make this copy, we'd affect the underlying - // pointer for all future request (yikes); we could - // alternatively solve this by moving the func below out of - // this closure and into a standalone package-level func, - // but I just thought this made more sense - nextCopy := next - - // route must match at least one of the matcher sets - matched, err := route.matcherSets.AnyMatch(cx) + return HandlerFunc(func(cx *Connection) error { + deadline := time.Now().Add(100 * time.Millisecond) // TODO: make this configurable + router: + // timeout matching to protect against malicious or very slow clients + err := cx.Conn.SetReadDeadline(deadline) + if err != nil { + return err + } + for { // retry prefetching and matching routes until timeout + err = cx.prefetch() if err != nil { - logger.Error("matching connection", zap.String("remote", cx.RemoteAddr().String()), zap.Error(err)) + logFunc := logger.Error + if errors.Is(err, os.ErrDeadlineExceeded) { + err = ErrMatchingTimeout + logFunc = logger.Warn + } + logFunc("matching connection", zap.String("remote", cx.RemoteAddr().String()), zap.Error(err)) return nil // return nil so the error does not get logged again } - if !matched { - return nextCopy.Handle(cx) + for _, route := range routes { + // route must match at least one of the matcher sets + matched, err := route.matcherSets.AnyMatch(cx) + if errors.Is(err, ErrConsumedAllPrefetchedBytes) { + continue // ignore and try next route + } + if err != nil { + logger.Error("matching connection", zap.String("remote", cx.RemoteAddr().String()), zap.Error(err)) + return nil + } + if matched { + // remove deadline after we matched + err = cx.Conn.SetReadDeadline(time.Time{}) + if err != nil { + return err + } + + // compile this route's handler stack + stack := next + for i := len(route.middleware) - 1; i >= 0; i-- { + stack = route.middleware[i](stack) + } + err = stack.Handle(cx) + if err != nil { + return err + } + if !route.terminal { + cx.clear() + goto router + } + } } - - // TODO: other routing features? - - // // if route is part of a group, ensure only the - // // first matching route in the group is applied - // if route.Group != "" { - // groups := req.Context().Value(routeGroupCtxKey).(map[string]struct{}) - - // if _, ok := groups[route.Group]; ok { - // // this group has already been - // // satisfied by a matching route - // return nextCopy.ServeHTTP(rw, req) - // } - - // // this matching route satisfies the group - // groups[route.Group] = struct{}{} - // } - - // // make terminal routes terminate - // if route.Terminal { - // nextCopy = emptyHandler - // } - - // compile this route's handler stack - for i := len(route.middleware) - 1; i >= 0; i-- { - nextCopy = route.middleware[i](nextCopy) - } - return nextCopy.Handle(cx) - }) - } + } + }) } diff --git a/layer4/routes_test.go b/layer4/routes_test.go new file mode 100644 index 0000000..0983231 --- /dev/null +++ b/layer4/routes_test.go @@ -0,0 +1,57 @@ +package layer4 + +import ( + "context" + "errors" + "net" + "testing" + "time" + + "github.com/caddyserver/caddy/v2" + "go.uber.org/zap" +) + +type sentinelHandler struct { + Called bool +} + +func (h *sentinelHandler) Handle(_ *Connection) error { + h.Called = true + return nil +} + +func TestMatchingTimeoutWorks(t *testing.T) { + ctx, cancel := caddy.NewContext(caddy.Context{Context: context.Background()}) + defer cancel() + + routes := RouteList{&Route{}} + + err := routes.Provision(ctx) + if err != nil { + t.Fatalf("provision failed | %s", err) + } + + sentinel := &sentinelHandler{} + compiledRoutes := routes.Compile(sentinel, zap.NewNop(), 5*time.Millisecond) + + in, out := net.Pipe() + defer in.Close() + defer out.Close() + + cx := WrapConnection(out, []byte{}, zap.NewNop()) + defer cx.Close() + + err = compiledRoutes.Handle(cx) + if err == nil { + t.Fatalf("missing error") + } + + if !errors.Is(err, ErrMatchingTimeout) { + t.Fatalf("unexpected handler error | %v", err) + } + + // since matching failed no handler should be called + if sentinel.Called != false { + t.Fatal("sentinel handler was called but should not") + } +} diff --git a/layer4/server.go b/layer4/server.go index 6f1609f..81395c4 100644 --- a/layer4/server.go +++ b/layer4/server.go @@ -99,8 +99,8 @@ func (s Server) servePacket(pc net.PacketConn) error { func (s Server) handle(conn net.Conn) { defer conn.Close() - buf := bufPool.Get().(*bytes.Buffer) - buf.Reset() + buf := bufPool.Get().([]byte) + buf = buf[:0] defer bufPool.Put(buf) cx := WrapConnection(conn, buf, s.logger) From 5f37ae2c1c3f052bd0994a785aae2364000bad9c Mon Sep 17 00:00:00 2001 From: Yannick Dylla <17772145+ydylla@users.noreply.github.com> Date: Sat, 4 May 2024 15:27:49 +0200 Subject: [PATCH 02/14] test: adapt tests to changed WrapConnection --- layer4/connection_test.go | 2 +- layer4/listener.go | 10 +++++----- layer4/matchers_test.go | 8 -------- modules/l4http/httpmatcher_test.go | 14 ++++++++------ modules/l4proxyprotocol/handler_test.go | 7 +++---- modules/l4proxyprotocol/matcher_test.go | 7 +++---- modules/l4socks/socks4_matcher_test.go | 3 +-- modules/l4socks/socks5_handler_test.go | 2 +- modules/l4socks/socks5_matcher_test.go | 3 +-- 9 files changed, 23 insertions(+), 33 deletions(-) diff --git a/layer4/connection_test.go b/layer4/connection_test.go index 61786f4..d7626e6 100644 --- a/layer4/connection_test.go +++ b/layer4/connection_test.go @@ -13,7 +13,7 @@ func TestConnection_FreezeAndUnfreeze(t *testing.T) { defer in.Close() defer out.Close() - cx := WrapConnection(out, &bytes.Buffer{}, zap.NewNop()) + cx := WrapConnection(out, []byte{}, zap.NewNop()) defer cx.Close() matcherData := []byte("foo") diff --git a/layer4/listener.go b/layer4/listener.go index 81c4ae1..1927a41 100644 --- a/layer4/listener.go +++ b/layer4/listener.go @@ -1,16 +1,16 @@ package layer4 import ( - "bytes" "context" "crypto/tls" "errors" - "github.com/caddyserver/caddy/v2" - "go.uber.org/zap" "net" "runtime" "sync" "time" + + "github.com/caddyserver/caddy/v2" + "go.uber.org/zap" ) func init() { @@ -116,8 +116,8 @@ func (l *listener) handle(conn net.Conn) { } }() - buf := bufPool.Get().(*bytes.Buffer) - buf.Reset() + buf := bufPool.Get().([]byte) + buf = buf[:0] defer bufPool.Put(buf) cx := WrapConnection(conn, buf, l.logger) diff --git a/layer4/matchers_test.go b/layer4/matchers_test.go index 4391a67..0f447fc 100644 --- a/layer4/matchers_test.go +++ b/layer4/matchers_test.go @@ -1,7 +1,6 @@ package layer4 import ( - "bytes" "net" "testing" @@ -70,7 +69,6 @@ func TestNotMatcher(t *testing.T) { localAddr: dummyAddr{ip: "127.0.0.1", network: "tcp"}, remoteAddr: dummyAddr{ip: "127.0.0.1", network: "tcp"}, }, - buf: &bytes.Buffer{}, Logger: zap.NewNop(), }, matcher: MatchNot{ @@ -89,7 +87,6 @@ func TestNotMatcher(t *testing.T) { localAddr: dummyAddr{ip: "127.0.0.1", network: "tcp"}, remoteAddr: dummyAddr{ip: "192.168.0.1", network: "tcp"}, }, - buf: &bytes.Buffer{}, Logger: zap.NewNop(), }, matcher: MatchNot{ @@ -108,7 +105,6 @@ func TestNotMatcher(t *testing.T) { localAddr: dummyAddr{ip: "127.0.0.1", network: "tcp"}, remoteAddr: dummyAddr{ip: "192.168.0.1", network: "tcp"}, }, - buf: &bytes.Buffer{}, Logger: zap.NewNop(), }, matcher: MatchNot{ @@ -130,7 +126,6 @@ func TestNotMatcher(t *testing.T) { localAddr: dummyAddr{ip: "127.0.0.1", network: "tcp"}, remoteAddr: dummyAddr{ip: "172.16.0.1", network: "tcp"}, }, - buf: &bytes.Buffer{}, Logger: zap.NewNop(), }, matcher: MatchNot{ @@ -152,7 +147,6 @@ func TestNotMatcher(t *testing.T) { localAddr: dummyAddr{ip: "192.168.0.1", network: "tcp"}, remoteAddr: dummyAddr{ip: "192.168.0.1", network: "tcp"}, }, - buf: &bytes.Buffer{}, Logger: zap.NewNop(), }, matcher: MatchNot{ @@ -174,7 +168,6 @@ func TestNotMatcher(t *testing.T) { localAddr: dummyAddr{ip: "127.0.0.1", network: "tcp"}, remoteAddr: dummyAddr{ip: "172.16.0.1", network: "tcp"}, }, - buf: &bytes.Buffer{}, Logger: zap.NewNop(), }, matcher: MatchNot{ @@ -194,7 +187,6 @@ func TestNotMatcher(t *testing.T) { localAddr: dummyAddr{ip: "127.0.0.1", network: "tcp"}, remoteAddr: dummyAddr{ip: "192.168.0.1", network: "tcp"}, }, - buf: &bytes.Buffer{}, Logger: zap.NewNop(), }, matcher: MatchNot{ diff --git a/modules/l4http/httpmatcher_test.go b/modules/l4http/httpmatcher_test.go index 555658c..eb43d73 100644 --- a/modules/l4http/httpmatcher_test.go +++ b/modules/l4http/httpmatcher_test.go @@ -1,11 +1,11 @@ package l4http import ( - "bytes" "context" "crypto/tls" "encoding/base64" "encoding/json" + "errors" "io" "net" "sync" @@ -33,7 +33,7 @@ func httpMatchTester(t *testing.T, matcherSets caddyhttp.RawMatcherSets, data [] _ = out.Close() }() - cx := layer4.WrapConnection(in, &bytes.Buffer{}, zap.NewNop()) + cx := layer4.WrapConnection(in, make([]byte, 0, layer4.PrefetchChunkSize), zap.NewNop()) go func() { wg.Add(1) defer func() { @@ -51,7 +51,8 @@ func httpMatchTester(t *testing.T, matcherSets caddyhttp.RawMatcherSets, data [] err := matcher.Provision(ctx) assertNoError(t, err) - matched, err := matcher.Match(cx) + mset := layer4.MatcherSet{matcher} // use MatcherSet to correctly call freeze() before matching + matched, err := mset.Match(cx) _, _ = io.Copy(io.Discard, in) @@ -202,7 +203,7 @@ func TestHttpMatchingByProtocolWithHttps(t *testing.T) { _ = out.Close() }() - cx := layer4.WrapConnection(in, &bytes.Buffer{}, zap.NewNop()) + cx := layer4.WrapConnection(in, []byte{}, zap.NewNop()) go func() { wg.Add(1) defer func() { @@ -223,7 +224,8 @@ func TestHttpMatchingByProtocolWithHttps(t *testing.T) { err := matcher.Provision(ctx) assertNoError(t, err) - matched, err := matcher.Match(cx) + mset := layer4.MatcherSet{matcher} // use MatcherSet to correctly call freeze() before matching + matched, err := mset.Match(cx) assertNoError(t, err) if !matched { t.Fatalf("matcher did not match") @@ -247,7 +249,7 @@ func TestHttpMatchingGarbage(t *testing.T) { if matched { t.Fatalf("matcher did match") } - if err == nil || err.Error() != "unexpected EOF" { + if !errors.Is(err, layer4.ErrConsumedAllPrefetchedBytes) { t.Fatalf("handler did not return an error or the wrong error -> %v", err) } } diff --git a/modules/l4proxyprotocol/handler_test.go b/modules/l4proxyprotocol/handler_test.go index 4f3610f..741592a 100644 --- a/modules/l4proxyprotocol/handler_test.go +++ b/modules/l4proxyprotocol/handler_test.go @@ -1,7 +1,6 @@ package l4proxyprotocol import ( - "bytes" "context" "io" "net" @@ -25,7 +24,7 @@ func TestProxyProtocolHandleV1(t *testing.T) { in, out := net.Pipe() defer closePipe(wg, in, out) - cx := layer4.WrapConnection(in, &bytes.Buffer{}, zap.NewNop()) + cx := layer4.WrapConnection(in, []byte{}, zap.NewNop()) go func() { wg.Add(1) defer wg.Done() @@ -63,7 +62,7 @@ func TestProxyProtocolHandleV2(t *testing.T) { in, out := net.Pipe() defer closePipe(wg, in, out) - cx := layer4.WrapConnection(in, &bytes.Buffer{}, zap.NewNop()) + cx := layer4.WrapConnection(in, []byte{}, zap.NewNop()) go func() { wg.Add(1) defer wg.Done() @@ -101,7 +100,7 @@ func TestProxyProtocolHandleGarbage(t *testing.T) { in, out := net.Pipe() defer closePipe(wg, in, out) - cx := layer4.WrapConnection(in, &bytes.Buffer{}, zap.NewNop()) + cx := layer4.WrapConnection(in, []byte{}, zap.NewNop()) go func() { wg.Add(1) defer wg.Done() diff --git a/modules/l4proxyprotocol/matcher_test.go b/modules/l4proxyprotocol/matcher_test.go index 00dc598..c34d33c 100644 --- a/modules/l4proxyprotocol/matcher_test.go +++ b/modules/l4proxyprotocol/matcher_test.go @@ -1,7 +1,6 @@ package l4proxyprotocol import ( - "bytes" "encoding/hex" "io" "net" @@ -33,7 +32,7 @@ func TestProxyProtocolMatchV1(t *testing.T) { in, out := net.Pipe() defer closePipe(wg, in, out) - cx := layer4.WrapConnection(in, &bytes.Buffer{}, zap.NewNop()) + cx := layer4.WrapConnection(in, []byte{}, zap.NewNop()) go func() { wg.Add(1) defer wg.Done() @@ -59,7 +58,7 @@ func TestProxyProtocolMatchV2(t *testing.T) { in, out := net.Pipe() defer closePipe(wg, in, out) - cx := layer4.WrapConnection(in, &bytes.Buffer{}, zap.NewNop()) + cx := layer4.WrapConnection(in, []byte{}, zap.NewNop()) go func() { wg.Add(1) defer wg.Done() @@ -85,7 +84,7 @@ func TestProxyProtocolMatchGarbage(t *testing.T) { in, out := net.Pipe() defer closePipe(wg, in, out) - cx := layer4.WrapConnection(in, &bytes.Buffer{}, zap.NewNop()) + cx := layer4.WrapConnection(in, []byte{}, zap.NewNop()) go func() { wg.Add(1) defer wg.Done() diff --git a/modules/l4socks/socks4_matcher_test.go b/modules/l4socks/socks4_matcher_test.go index 77fb0d8..6757b70 100644 --- a/modules/l4socks/socks4_matcher_test.go +++ b/modules/l4socks/socks4_matcher_test.go @@ -1,7 +1,6 @@ package l4socks import ( - "bytes" "context" "io" "net" @@ -78,7 +77,7 @@ func TestSocks4Matcher_Match(t *testing.T) { _ = out.Close() }() - cx := layer4.WrapConnection(out, &bytes.Buffer{}, zap.NewNop()) + cx := layer4.WrapConnection(out, []byte{}, zap.NewNop()) go func() { _, err := in.Write(tc.data) assertNoError(t, err) diff --git a/modules/l4socks/socks5_handler_test.go b/modules/l4socks/socks5_handler_test.go index e6170a5..a85f02e 100644 --- a/modules/l4socks/socks5_handler_test.go +++ b/modules/l4socks/socks5_handler_test.go @@ -17,7 +17,7 @@ import ( func replay(t *testing.T, handler *Socks5Handler, expectedError string, messages [][]byte) { t.Helper() in, out := net.Pipe() - cx := layer4.WrapConnection(out, &bytes.Buffer{}, zap.NewNop()) + cx := layer4.WrapConnection(out, []byte{}, zap.NewNop()) defer func() { _ = in.Close() _, _ = io.Copy(io.Discard, out) diff --git a/modules/l4socks/socks5_matcher_test.go b/modules/l4socks/socks5_matcher_test.go index b45b57e..dabf47e 100644 --- a/modules/l4socks/socks5_matcher_test.go +++ b/modules/l4socks/socks5_matcher_test.go @@ -1,7 +1,6 @@ package l4socks import ( - "bytes" "context" "io" "net" @@ -55,7 +54,7 @@ func TestSocks5Matcher_Match(t *testing.T) { _ = out.Close() }() - cx := layer4.WrapConnection(out, &bytes.Buffer{}, zap.NewNop()) + cx := layer4.WrapConnection(out, []byte{}, zap.NewNop()) go func() { _, err := in.Write(tc.data) assertNoError(t, err) From 35298311383ef4aeed5a667e54c7efc8763b0a88 Mon Sep 17 00:00:00 2001 From: Yannick Dylla <17772145+ydylla@users.noreply.github.com> Date: Sun, 6 Nov 2022 23:17:22 +0100 Subject: [PATCH 03/14] fix: workaround buffered reader in http matcher --- modules/l4http/httpmatcher.go | 38 ++++++++++++++++++---- modules/l4http/httpmatcher_test.go | 51 ++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 6 deletions(-) diff --git a/modules/l4http/httpmatcher.go b/modules/l4http/httpmatcher.go index 7bfdbc2..2036137 100644 --- a/modules/l4http/httpmatcher.go +++ b/modules/l4http/httpmatcher.go @@ -16,17 +16,19 @@ package l4http import ( "bufio" + "bytes" "encoding/json" "fmt" + "io" + "net/http" + "net/url" + "github.com/caddyserver/caddy/v2" "github.com/caddyserver/caddy/v2/modules/caddyhttp" "github.com/mholt/caddy-l4/layer4" "github.com/mholt/caddy-l4/modules/l4tls" "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" - "io" - "net/http" - "net/url" ) func init() { @@ -78,11 +80,18 @@ func (m MatchHTTP) Match(cx *layer4.Connection) (bool, error) { req, ok := cx.GetVar("http_request").(*http.Request) if !ok { var err error - bufReader := bufio.NewReader(cx) + + data, err := cx.MatchingBytes() + if !m.isHttp(data) { + return false, nil + } + + // use bufio reader which exactly matches the size of prefetched data, + // to not trigger all bytes consumed error + bufReader := bufio.NewReaderSize(cx, len(data)) req, err = http.ReadRequest(bufReader) if err != nil { - // TODO: find a way to distinguish actual errors from mismatches - return false, nil + return false, err } // check if req is a http2 request made with prior knowledge and if so parse it @@ -113,6 +122,23 @@ func (m MatchHTTP) Match(cx *layer4.Connection) (bool, error) { return m.matcherSets.AnyMatch(req), nil } +func (m MatchHTTP) isHttp(data []byte) bool { + // try to find the end of a http request line, for example " HTTP/1.1\r\n" + i := bytes.IndexByte(data, 0x0a) // find first new line + if i < 10 { + return false + } + // assume only \n line ending + start := i - 9 // position of space in front of HTTP + end := i - 3 // cut off version number "1.1" or "2.0" + // if we got a correct \r\n line ending shift the calculated start & end to the left + if data[i-1] == 0x0d { + start -= 1 + end -= 1 + } + return bytes.Compare(data[start:end], []byte(" HTTP/")) == 0 +} + // Parses information from a http2 request with prior knowledge (RFC 7540 Section 3.4) func (m MatchHTTP) handleHttp2WithPriorKnowledge(reader io.Reader, req *http.Request) error { // Does req contain a valid http2 magic? diff --git a/modules/l4http/httpmatcher_test.go b/modules/l4http/httpmatcher_test.go index eb43d73..3a2605a 100644 --- a/modules/l4http/httpmatcher_test.go +++ b/modules/l4http/httpmatcher_test.go @@ -253,3 +253,54 @@ func TestHttpMatchingGarbage(t *testing.T) { t.Fatalf("handler did not return an error or the wrong error -> %v", err) } } + +func TestMatchHTTP_isHttp(t *testing.T) { + for _, tc := range []struct { + name string + data []byte + shouldMatch bool + }{ + { + name: "http/1.1-only-lf", + data: []byte("GET /foo/bar?aaa=bbb HTTP/1.1\nHost: localhost:10443\n\n"), + shouldMatch: true, + }, + { + name: "http/1.1-cr-lf", + data: []byte("GET /foo/bar?aaa=bbb HTTP/1.1\r\nHost: localhost:10443\r\n\r\n"), + shouldMatch: true, + }, + { + name: "http/1.0-cr-lf", + data: []byte("GET /foo/bar?aaa=bbb HTTP/1.0\r\nHost: localhost:10443\r\n\r\n"), + shouldMatch: true, + }, + { + name: "http/2.0-cr-lf", + data: []byte("PRI * HTTP/2.0\r\n\r\n"), + shouldMatch: true, + }, + { + name: "dummy-short", + data: []byte("dum\n"), + shouldMatch: false, + }, + { + name: "dummy-long", + data: []byte("dummydummydummy\n"), + shouldMatch: false, + }, + { + name: "http/1.1-without-space-in-front", + data: []byte("HTTP/1.1\n"), + shouldMatch: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + matched := MatchHTTP{}.isHttp(tc.data) + if matched != tc.shouldMatch { + t.Fatalf("test %v | matched: %v != shouldMatch: %v", tc.name, matched, tc.shouldMatch) + } + }) + } +} From bb022596fa794d99a60b399a1a1cd001b4938826 Mon Sep 17 00:00:00 2001 From: Yannick Dylla <17772145+ydylla@users.noreply.github.com> Date: Fri, 3 May 2024 19:36:19 +0200 Subject: [PATCH 04/14] fix: catch wrapped connections in main handler --- layer4/routes.go | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/layer4/routes.go b/layer4/routes.go index 2e8328d..bf137fa 100644 --- a/layer4/routes.go +++ b/layer4/routes.go @@ -141,16 +141,28 @@ func (routes RouteList) Compile(next Handler, logger *zap.Logger) Handler { return err } - // compile this route's handler stack - stack := next - for i := len(route.middleware) - 1; i >= 0; i-- { - stack = route.middleware[i](stack) - } - err = stack.Handle(cx) - if err != nil { - return err - } - if !route.terminal { + var handler Handler + if route.terminal { + handler = next + for i := len(route.middleware) - 1; i >= 0; i-- { + handler = route.middleware[i](handler) + } + return handler.Handle(cx) + } else { + // Catch potentially wrapped connection to use it as input for next round of route matching. + // This is for example required for matchers after a tls handler. + catcher := HandlerFunc(func(conn *Connection) error { + cx = conn + return nil + }) + handler = &catcher + for i := len(route.middleware) - 1; i >= 0; i-- { + handler = route.middleware[i](handler) + } + err = handler.Handle(cx) + if err != nil { + return err + } cx.clear() goto router } From e8ec165853c8123fce5de86ac80a8a9769088752 Mon Sep 17 00:00:00 2001 From: Yannick Dylla <17772145+ydylla@users.noreply.github.com> Date: Fri, 3 May 2024 20:01:51 +0200 Subject: [PATCH 05/14] fix: implement IsTerminal for existing handlers --- layer4/routes.go | 6 +++--- modules/l4echo/echo.go | 4 ++++ modules/l4proxy/proxy.go | 4 ++++ modules/l4proxyprotocol/handler.go | 4 ++++ modules/l4socks/socks5_handler.go | 9 +++++++-- modules/l4subroute/handler.go | 12 ++++++++++++ modules/l4tee/tee.go | 4 ++++ modules/l4throttle/throttle.go | 4 ++++ modules/l4tls/handler.go | 4 ++++ 9 files changed, 46 insertions(+), 5 deletions(-) diff --git a/layer4/routes.go b/layer4/routes.go index bf137fa..8be8986 100644 --- a/layer4/routes.go +++ b/layer4/routes.go @@ -43,7 +43,7 @@ type Route struct { HandlersRaw []json.RawMessage `json:"handle,omitempty" caddy:"namespace=layer4.handlers inline_key=handler"` // Is set to true during Provision if any of the handlers IsTerminal. - terminal bool + Terminal bool matcherSets MatcherSets middleware []Middleware @@ -73,7 +73,7 @@ func (r *Route) Provision(ctx caddy.Context) error { handler := mod.(NextHandler) handlers = append(handlers, handler) if handler.IsTerminal() { - r.terminal = true + r.Terminal = true } } for _, midhandler := range handlers { @@ -142,7 +142,7 @@ func (routes RouteList) Compile(next Handler, logger *zap.Logger) Handler { } var handler Handler - if route.terminal { + if route.Terminal { handler = next for i := len(route.middleware) - 1; i >= 0; i-- { handler = route.middleware[i](handler) diff --git a/modules/l4echo/echo.go b/modules/l4echo/echo.go index ac80255..85dd179 100644 --- a/modules/l4echo/echo.go +++ b/modules/l4echo/echo.go @@ -42,5 +42,9 @@ func (Handler) Handle(cx *layer4.Connection, _ layer4.Handler) error { return err } +func (h Handler) IsTerminal() bool { + return true +} + // Interface guard var _ layer4.NextHandler = (*Handler)(nil) diff --git a/modules/l4proxy/proxy.go b/modules/l4proxy/proxy.go index 02160ce..9b075ac 100644 --- a/modules/l4proxy/proxy.go +++ b/modules/l4proxy/proxy.go @@ -184,6 +184,10 @@ func (h Handler) Handle(down *layer4.Connection, _ layer4.Handler) error { return nil } +func (h *Handler) IsTerminal() bool { + return true +} + func (h *Handler) dialPeers(upstream *Upstream, repl *caddy.Replacer, down *layer4.Connection) ([]net.Conn, error) { var upConns []net.Conn diff --git a/modules/l4proxyprotocol/handler.go b/modules/l4proxyprotocol/handler.go index caee671..4b66c16 100644 --- a/modules/l4proxyprotocol/handler.go +++ b/modules/l4proxyprotocol/handler.go @@ -164,6 +164,10 @@ func (h *Handler) Handle(cx *layer4.Connection, next layer4.Handler) error { return next.Handle(cx.Wrap(conn)) } +func (h *Handler) IsTerminal() bool { + return false +} + // GetConn gets the connection which holds the information received from the PROXY protocol. func GetConn(cx *layer4.Connection) net.Conn { if val := cx.GetVar("l4.proxy_protocol.conn"); val != nil { diff --git a/modules/l4socks/socks5_handler.go b/modules/l4socks/socks5_handler.go index bfefeb5..d38e431 100644 --- a/modules/l4socks/socks5_handler.go +++ b/modules/l4socks/socks5_handler.go @@ -2,12 +2,13 @@ package l4socks import ( "fmt" + "net" + "strings" + "github.com/caddyserver/caddy/v2" "github.com/mholt/caddy-l4/layer4" "github.com/things-go/go-socks5" "go.uber.org/zap" - "net" - "strings" ) func init() { @@ -78,6 +79,10 @@ func (h *Socks5Handler) Handle(cx *layer4.Connection, _ layer4.Handler) error { return h.server.ServeConn(cx) } +func (h *Socks5Handler) IsTerminal() bool { + return true +} + var ( _ caddy.Provisioner = (*Socks5Handler)(nil) _ layer4.NextHandler = (*Socks5Handler)(nil) diff --git a/modules/l4subroute/handler.go b/modules/l4subroute/handler.go index 4a671c6..a29c312 100644 --- a/modules/l4subroute/handler.go +++ b/modules/l4subroute/handler.go @@ -64,6 +64,18 @@ func (h *Handler) Handle(cx *layer4.Connection, next layer4.Handler) error { return subroute.Handle(cx) } +func (h *Handler) IsTerminal() bool { + // try to be clever but maybe this needs to be configurable? + terminal := true + for _, route := range h.Routes { + if !route.Terminal { + terminal = false + break + } + } + return terminal +} + // Interface guards var ( _ caddy.Provisioner = (*Handler)(nil) diff --git a/modules/l4tee/tee.go b/modules/l4tee/tee.go index 73b321c..84413f0 100644 --- a/modules/l4tee/tee.go +++ b/modules/l4tee/tee.go @@ -110,6 +110,10 @@ func (t Handler) Handle(cx *layer4.Connection, next layer4.Handler) error { return next.Handle(&nextc) } +func (h *Handler) IsTerminal() bool { + return false +} + // teeConn is a connection wrapper that reads // from a different reader. type teeConn struct { diff --git a/modules/l4throttle/throttle.go b/modules/l4throttle/throttle.go index f4454d3..0990bf9 100644 --- a/modules/l4throttle/throttle.go +++ b/modules/l4throttle/throttle.go @@ -114,6 +114,10 @@ func (h Handler) Handle(cx *layer4.Connection, next layer4.Handler) error { return next.Handle(cx) } +func (h Handler) IsTerminal() bool { + return false +} + type throttledConn struct { net.Conn ctx context.Context diff --git a/modules/l4tls/handler.go b/modules/l4tls/handler.go index 7c77412..a32605b 100644 --- a/modules/l4tls/handler.go +++ b/modules/l4tls/handler.go @@ -104,6 +104,10 @@ func (t *Handler) Handle(cx *layer4.Connection, next layer4.Handler) error { return next.Handle(cx.Wrap(tlsConn)) } +func (t *Handler) IsTerminal() bool { + return false +} + func appendClientHello(cx *layer4.Connection, chi ClientHelloInfo) { var clientHellos []ClientHelloInfo if val := cx.GetVar("tls_client_hellos"); val != nil { From 102e264985b48ac622add468d134291cda6a58a6 Mon Sep 17 00:00:00 2001 From: Yannick Dylla <17772145+ydylla@users.noreply.github.com> Date: Fri, 3 May 2024 22:02:30 +0200 Subject: [PATCH 06/14] feat: configurable matching timeout --- layer4/listener.go | 9 ++++++++- layer4/routes.go | 4 ++-- layer4/server.go | 9 ++++++++- modules/l4subroute/handler.go | 10 +++++++++- 4 files changed, 27 insertions(+), 5 deletions(-) diff --git a/layer4/listener.go b/layer4/listener.go index 1927a41..1f906bd 100644 --- a/layer4/listener.go +++ b/layer4/listener.go @@ -22,6 +22,9 @@ type ListenerWrapper struct { // Routes express composable logic for handling byte streams. Routes RouteList `json:"routes,omitempty"` + // Maximum time connections have to complete the matching phase (the first terminal handler is matched). Default: 500ms. + MatchingTimeout caddy.Duration `json:"matching_timeout,omitempty"` + compiledRoute Handler logger *zap.Logger @@ -41,11 +44,15 @@ func (lw *ListenerWrapper) Provision(ctx caddy.Context) error { lw.ctx = ctx lw.logger = ctx.Logger() + if lw.MatchingTimeout <= 0 { + lw.MatchingTimeout = caddy.Duration(500 * time.Millisecond) + } + err := lw.Routes.Provision(ctx) if err != nil { return err } - lw.compiledRoute = lw.Routes.Compile(listenerHandler{}, lw.logger) + lw.compiledRoute = lw.Routes.Compile(listenerHandler{}, lw.logger, time.Duration(lw.MatchingTimeout)) return nil } diff --git a/layer4/routes.go b/layer4/routes.go index 8be8986..bfcd8b7 100644 --- a/layer4/routes.go +++ b/layer4/routes.go @@ -104,9 +104,9 @@ func (routes RouteList) Provision(ctx caddy.Context) error { // Compile prepares a middleware chain from the route list. // This should only be done once: after all the routes have // been provisioned, and before the server loop begins. -func (routes RouteList) Compile(next Handler, logger *zap.Logger) Handler { +func (routes RouteList) Compile(next Handler, logger *zap.Logger, matchingTimeout time.Duration) Handler { return HandlerFunc(func(cx *Connection) error { - deadline := time.Now().Add(100 * time.Millisecond) // TODO: make this configurable + deadline := time.Now().Add(matchingTimeout) router: // timeout matching to protect against malicious or very slow clients err := cx.Conn.SetReadDeadline(deadline) diff --git a/layer4/server.go b/layer4/server.go index 81395c4..6bc9ec5 100644 --- a/layer4/server.go +++ b/layer4/server.go @@ -35,6 +35,9 @@ type Server struct { // Routes express composable logic for handling byte streams. Routes RouteList `json:"routes,omitempty"` + // Maximum time connections have to complete the matching phase (the first terminal handler is matched). Default: 500ms. + MatchingTimeout caddy.Duration `json:"matching_timeout,omitempty"` + logger *zap.Logger listenAddrs []caddy.NetworkAddress compiledRoute Handler @@ -44,6 +47,10 @@ type Server struct { func (s *Server) Provision(ctx caddy.Context, logger *zap.Logger) error { s.logger = logger + if s.MatchingTimeout <= 0 { + s.MatchingTimeout = caddy.Duration(500 * time.Millisecond) + } + for i, address := range s.Listen { addr, err := caddy.ParseNetworkAddress(address) if err != nil { @@ -56,7 +63,7 @@ func (s *Server) Provision(ctx caddy.Context, logger *zap.Logger) error { if err != nil { return err } - s.compiledRoute = s.Routes.Compile(nopHandler{}, s.logger) + s.compiledRoute = s.Routes.Compile(nopHandler{}, s.logger, time.Duration(s.MatchingTimeout)) return nil } diff --git a/modules/l4subroute/handler.go b/modules/l4subroute/handler.go index a29c312..f4ab200 100644 --- a/modules/l4subroute/handler.go +++ b/modules/l4subroute/handler.go @@ -16,6 +16,7 @@ package l4subroute import ( "fmt" + "time" "github.com/caddyserver/caddy/v2" "github.com/mholt/caddy-l4/layer4" @@ -34,6 +35,9 @@ type Handler struct { // The primary list of routes to compile and execute. Routes layer4.RouteList `json:"routes,omitempty"` + // Maximum time connections have to complete the matching phase (the first terminal handler is matched). Default: 500ms. + MatchingTimeout caddy.Duration `json:"matching_timeout,omitempty"` + logger *zap.Logger } @@ -49,6 +53,10 @@ func (Handler) CaddyModule() caddy.ModuleInfo { func (h *Handler) Provision(ctx caddy.Context) error { h.logger = ctx.Logger(h) + if h.MatchingTimeout <= 0 { + h.MatchingTimeout = caddy.Duration(500 * time.Millisecond) + } + if h.Routes != nil { err := h.Routes.Provision(ctx) if err != nil { @@ -60,7 +68,7 @@ func (h *Handler) Provision(ctx caddy.Context) error { // Handle handles the connections. func (h *Handler) Handle(cx *layer4.Connection, next layer4.Handler) error { - subroute := h.Routes.Compile(next, h.logger) + subroute := h.Routes.Compile(next, h.logger, time.Duration(h.MatchingTimeout)) return subroute.Handle(cx) } From 6b17ab3b0398e42cff0874d1bc51881e66eedc78 Mon Sep 17 00:00:00 2001 From: Yannick Dylla <17772145+ydylla@users.noreply.github.com> Date: Mon, 6 May 2024 16:30:25 +0200 Subject: [PATCH 07/14] fix: increase default matching timeout 500ms is too low for tls handshake of far away servers. --- layer4/listener.go | 4 ++-- layer4/server.go | 4 ++-- modules/l4subroute/handler.go | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/layer4/listener.go b/layer4/listener.go index 1f906bd..eb93bb7 100644 --- a/layer4/listener.go +++ b/layer4/listener.go @@ -22,7 +22,7 @@ type ListenerWrapper struct { // Routes express composable logic for handling byte streams. Routes RouteList `json:"routes,omitempty"` - // Maximum time connections have to complete the matching phase (the first terminal handler is matched). Default: 500ms. + // Maximum time connections have to complete the matching phase (the first terminal handler is matched). Default: 3s. MatchingTimeout caddy.Duration `json:"matching_timeout,omitempty"` compiledRoute Handler @@ -45,7 +45,7 @@ func (lw *ListenerWrapper) Provision(ctx caddy.Context) error { lw.logger = ctx.Logger() if lw.MatchingTimeout <= 0 { - lw.MatchingTimeout = caddy.Duration(500 * time.Millisecond) + lw.MatchingTimeout = caddy.Duration(3 * time.Second) } err := lw.Routes.Provision(ctx) diff --git a/layer4/server.go b/layer4/server.go index 6bc9ec5..60846aa 100644 --- a/layer4/server.go +++ b/layer4/server.go @@ -35,7 +35,7 @@ type Server struct { // Routes express composable logic for handling byte streams. Routes RouteList `json:"routes,omitempty"` - // Maximum time connections have to complete the matching phase (the first terminal handler is matched). Default: 500ms. + // Maximum time connections have to complete the matching phase (the first terminal handler is matched). Default: 3s. MatchingTimeout caddy.Duration `json:"matching_timeout,omitempty"` logger *zap.Logger @@ -48,7 +48,7 @@ func (s *Server) Provision(ctx caddy.Context, logger *zap.Logger) error { s.logger = logger if s.MatchingTimeout <= 0 { - s.MatchingTimeout = caddy.Duration(500 * time.Millisecond) + s.MatchingTimeout = caddy.Duration(3 * time.Second) } for i, address := range s.Listen { diff --git a/modules/l4subroute/handler.go b/modules/l4subroute/handler.go index f4ab200..416af57 100644 --- a/modules/l4subroute/handler.go +++ b/modules/l4subroute/handler.go @@ -35,7 +35,7 @@ type Handler struct { // The primary list of routes to compile and execute. Routes layer4.RouteList `json:"routes,omitempty"` - // Maximum time connections have to complete the matching phase (the first terminal handler is matched). Default: 500ms. + // Maximum time connections have to complete the matching phase (the first terminal handler is matched). Default: 3s. MatchingTimeout caddy.Duration `json:"matching_timeout,omitempty"` logger *zap.Logger @@ -54,7 +54,7 @@ func (h *Handler) Provision(ctx caddy.Context) error { h.logger = ctx.Logger(h) if h.MatchingTimeout <= 0 { - h.MatchingTimeout = caddy.Duration(500 * time.Millisecond) + h.MatchingTimeout = caddy.Duration(3 * time.Second) } if h.Routes != nil { From 571dbce1b15b766471f6bc44889da2c830843af2 Mon Sep 17 00:00:00 2001 From: Yannick Dylla <17772145+ydylla@users.noreply.github.com> Date: Mon, 6 May 2024 21:32:12 +0200 Subject: [PATCH 08/14] refactor: improve readability of matched route code path --- layer4/routes.go | 43 +++++++++++++++++++++++-------------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/layer4/routes.go b/layer4/routes.go index bfcd8b7..7ad7b51 100644 --- a/layer4/routes.go +++ b/layer4/routes.go @@ -125,7 +125,7 @@ func (routes RouteList) Compile(next Handler, logger *zap.Logger, matchingTimeou return nil // return nil so the error does not get logged again } for _, route := range routes { - // route must match at least one of the matcher sets + // A route must match at least one of the matcher sets matched, err := route.matcherSets.AnyMatch(cx) if errors.Is(err, ErrConsumedAllPrefetchedBytes) { continue // ignore and try next route @@ -141,28 +141,31 @@ func (routes RouteList) Compile(next Handler, logger *zap.Logger, matchingTimeou return err } + lastHandler := HandlerFunc(func(conn *Connection) error { + // Catch potentially wrapped connection to use it as input for the next round of route matching. + // This is for example required for matchers after a tls handler. + cx = conn + // call the default/fallback handler + return next.Handle(conn) + }) var handler Handler + handler = &lastHandler + // compile the route handler stack with lastHandler being called last + for i := len(route.middleware) - 1; i >= 0; i-- { + handler = route.middleware[i](handler) + } + err = handler.Handle(cx) + if err != nil { + return err + } + + // If matched route is terminal we stop routing, + // otherwise we jump back to the start of the routing loop to peel of more protocol layers. if route.Terminal { - handler = next - for i := len(route.middleware) - 1; i >= 0; i-- { - handler = route.middleware[i](handler) - } - return handler.Handle(cx) + return nil } else { - // Catch potentially wrapped connection to use it as input for next round of route matching. - // This is for example required for matchers after a tls handler. - catcher := HandlerFunc(func(conn *Connection) error { - cx = conn - return nil - }) - handler = &catcher - for i := len(route.middleware) - 1; i >= 0; i-- { - handler = route.middleware[i](handler) - } - err = handler.Handle(cx) - if err != nil { - return err - } + // Reset buffer and offsets after a handler was executed, + // we assume it consumed the buffer and maybe even wrote to the connection. cx.clear() goto router } From 59375bbbcf568c821745c2d293d01380db9d2a83 Mon Sep 17 00:00:00 2001 From: Yannick Dylla <17772145+ydylla@users.noreply.github.com> Date: Mon, 6 May 2024 22:23:46 +0200 Subject: [PATCH 09/14] feat: auto reset buf after it got consumed & allow handler to partially consume buf Not sure if partially consumed buf after handler can happen in real life because most non terminal handlers do some kind of handshake and thus would also write to the connection. But with this it should work. --- layer4/connection.go | 32 +++++++++++++++----------------- layer4/routes.go | 3 --- 2 files changed, 15 insertions(+), 20 deletions(-) diff --git a/layer4/connection.go b/layer4/connection.go index ce6d6cf..b4fd8e4 100644 --- a/layer4/connection.go +++ b/layer4/connection.go @@ -65,9 +65,10 @@ type Connection struct { Logger *zap.Logger - buf []byte // stores matching data - offset int - matching bool + buf []byte // stores matching data + offset int + frozenOffset int + matching bool bytesRead, bytesWritten uint64 } @@ -80,18 +81,22 @@ var ErrMatchingBufferFull = errors.New("matching buffer is full") // and once depleted (or if there isn't one), it continues // reading from the underlying connection. func (cx *Connection) Read(p []byte) (n int, err error) { - if cx.matching { - if len(cx.buf) == 0 || len(cx.buf[cx.offset:]) == 0 { - return 0, ErrConsumedAllPrefetchedBytes - } + // if we are matching and consumed the buffer exit with error + if cx.matching && (len(cx.buf) == 0 || len(cx.buf) == cx.offset) { + return 0, ErrConsumedAllPrefetchedBytes } // if there is a buffer we should read from, start // with that; we only read from the underlying conn // after the buffer has been "depleted" - if cx.offset < len(cx.buf) { + if len(cx.buf) > 0 && cx.offset < len(cx.buf) { n := copy(p, cx.buf[cx.offset:]) cx.offset += n + if !cx.matching && cx.offset == len(cx.buf) { + // if we are not in matching mode reset buf automatically after it was consumed + cx.offset = 0 + cx.buf = cx.buf[:0] + } return n, nil } @@ -174,21 +179,14 @@ func (cx *Connection) prefetch() (err error) { // freeze activates the matching mode that only reads from cx.buf. func (cx *Connection) freeze() { cx.matching = true + cx.frozenOffset = cx.offset } // unfreeze stops the matching mode and resets the buffer offset // so that the next reads come from the buffer first. func (cx *Connection) unfreeze() { cx.matching = false - cx.offset = 0 -} - -// clear discards cx.buf and resets cx.offset to prepare the connection for the next prefetch & matching phase -func (cx *Connection) clear() { - if cx.buf != nil { - cx.buf = cx.buf[:0] - } - cx.offset = 0 + cx.offset = cx.frozenOffset } // SetVar sets a value in the context's variable table with diff --git a/layer4/routes.go b/layer4/routes.go index 7ad7b51..368c733 100644 --- a/layer4/routes.go +++ b/layer4/routes.go @@ -164,9 +164,6 @@ func (routes RouteList) Compile(next Handler, logger *zap.Logger, matchingTimeou if route.Terminal { return nil } else { - // Reset buffer and offsets after a handler was executed, - // we assume it consumed the buffer and maybe even wrote to the connection. - cx.clear() goto router } } From 1cc4504e7b7c0ca9e78099ad7c4aa0222d978cb7 Mon Sep 17 00:00:00 2001 From: Yannick Dylla <17772145+ydylla@users.noreply.github.com> Date: Mon, 6 May 2024 22:25:03 +0200 Subject: [PATCH 10/14] test: fix http matcher & routes test --- layer4/routes_test.go | 42 +++--- modules/l4http/httpmatcher_test.go | 216 ++++++++++++++--------------- 2 files changed, 129 insertions(+), 129 deletions(-) diff --git a/layer4/routes_test.go b/layer4/routes_test.go index 0983231..62c9903 100644 --- a/layer4/routes_test.go +++ b/layer4/routes_test.go @@ -9,17 +9,10 @@ import ( "github.com/caddyserver/caddy/v2" "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" ) -type sentinelHandler struct { - Called bool -} - -func (h *sentinelHandler) Handle(_ *Connection) error { - h.Called = true - return nil -} - func TestMatchingTimeoutWorks(t *testing.T) { ctx, cancel := caddy.NewContext(caddy.Context{Context: context.Background()}) defer cancel() @@ -31,8 +24,12 @@ func TestMatchingTimeoutWorks(t *testing.T) { t.Fatalf("provision failed | %s", err) } - sentinel := &sentinelHandler{} - compiledRoutes := routes.Compile(sentinel, zap.NewNop(), 5*time.Millisecond) + matched := false + loggerCore, logs := observer.New(zapcore.WarnLevel) + compiledRoutes := routes.Compile(HandlerFunc(func(con *Connection) error { + matched = true + return nil + }), zap.New(loggerCore), 5*time.Millisecond) in, out := net.Pipe() defer in.Close() @@ -42,16 +39,27 @@ func TestMatchingTimeoutWorks(t *testing.T) { defer cx.Close() err = compiledRoutes.Handle(cx) - if err == nil { - t.Fatalf("missing error") + if err != nil { + t.Fatalf("handle failed | %s", err) } - if !errors.Is(err, ErrMatchingTimeout) { - t.Fatalf("unexpected handler error | %v", err) + // verify the matching aborted error was logged + if logs.Len() != 1 { + t.Fatalf("logs should contain 1 entry but has %d", logs.Len()) + } + logEntry := logs.All()[0] + if logEntry.Level != zapcore.WarnLevel { + t.Fatalf("wrong log level | %s", logEntry.Level) + } + if logEntry.Message != "matching connection" { + t.Fatalf("wrong log message | %s", logEntry.Message) + } + if !(logEntry.Context[1].Key == "error" && errors.Is(logEntry.Context[1].Interface.(error), ErrMatchingTimeout)) { + t.Fatalf("wrong error | %v", logEntry.Context[1].Interface) } // since matching failed no handler should be called - if sentinel.Called != false { - t.Fatal("sentinel handler was called but should not") + if matched { + t.Fatal("handler was called but should not") } } diff --git a/modules/l4http/httpmatcher_test.go b/modules/l4http/httpmatcher_test.go index 3a2605a..dd0d045 100644 --- a/modules/l4http/httpmatcher_test.go +++ b/modules/l4http/httpmatcher_test.go @@ -5,15 +5,14 @@ import ( "crypto/tls" "encoding/base64" "encoding/json" - "errors" - "io" "net" - "sync" "testing" + "time" "github.com/caddyserver/caddy/v2" "github.com/caddyserver/caddy/v2/modules/caddyhttp" "github.com/mholt/caddy-l4/layer4" + _ "github.com/mholt/caddy-l4/modules/l4echo" "go.uber.org/zap" ) @@ -24,22 +23,13 @@ func assertNoError(t *testing.T, err error) { } } -func httpMatchTester(t *testing.T, matcherSets caddyhttp.RawMatcherSets, data []byte) (bool, error) { - wg := &sync.WaitGroup{} +func httpMatchTester(t *testing.T, matchers json.RawMessage, data []byte) (bool, error) { in, out := net.Pipe() - defer func() { - wg.Wait() - _ = in.Close() - _ = out.Close() - }() + defer in.Close() + defer out.Close() cx := layer4.WrapConnection(in, make([]byte, 0, layer4.PrefetchChunkSize), zap.NewNop()) go func() { - wg.Add(1) - defer func() { - wg.Done() - _ = out.Close() - }() _, err := out.Write(data) assertNoError(t, err) }() @@ -47,14 +37,22 @@ func httpMatchTester(t *testing.T, matcherSets caddyhttp.RawMatcherSets, data [] ctx, cancel := caddy.NewContext(caddy.Context{Context: context.Background()}) defer cancel() - matcher := MatchHTTP{MatcherSetsRaw: matcherSets} - err := matcher.Provision(ctx) + routes := layer4.RouteList{&layer4.Route{ + MatcherSetsRaw: caddyhttp.RawMatcherSets{ + caddy.ModuleMap{"http": matchers}, + }, + }} + err := routes.Provision(ctx) assertNoError(t, err) - mset := layer4.MatcherSet{matcher} // use MatcherSet to correctly call freeze() before matching - matched, err := mset.Match(cx) + matched := false + compiledRoute := routes.Compile(layer4.HandlerFunc(func(con *layer4.Connection) error { + matched = true + return nil + }), zap.NewNop(), 10*time.Millisecond) // FIXME: routes without handlers are not terminal thus each test runs at least this long - _, _ = io.Copy(io.Discard, in) + err = compiledRoute.Handle(cx) + assertNoError(t, err) return matched, err } @@ -63,43 +61,43 @@ func TestHttp1Matching(t *testing.T) { http1RequestExample := []byte("GET /foo/bar?aaa=bbb HTTP/1.1\nHost: localhost:10443\nUser-Agent: curl/7.82.0\nAccept: */*\n\n") for _, tc := range []struct { - name string - matcherSets caddyhttp.RawMatcherSets - data []byte + name string + matchers json.RawMessage + data []byte }{ { - name: "match-by-host", - matcherSets: caddyhttp.RawMatcherSets{caddy.ModuleMap{"host": json.RawMessage("[\"localhost\"]")}}, - data: http1RequestExample, + name: "match-by-host", + matchers: json.RawMessage("[{\"host\":[\"localhost\"]}]"), + data: http1RequestExample, }, { - name: "match-by-method", - matcherSets: caddyhttp.RawMatcherSets{caddy.ModuleMap{"method": json.RawMessage("[\"GET\"]")}}, - data: http1RequestExample, + name: "match-by-method", + matchers: json.RawMessage("[{\"method\":[\"GET\"]}]"), + data: http1RequestExample, }, { - name: "match-by-path", - matcherSets: caddyhttp.RawMatcherSets{caddy.ModuleMap{"path": json.RawMessage("[\"/foo/bar\"]")}}, - data: http1RequestExample, + name: "match-by-path", + matchers: json.RawMessage("[{\"path\":[\"/foo/bar\"]}]"), + data: http1RequestExample, }, { - name: "match-by-query", - matcherSets: caddyhttp.RawMatcherSets{caddy.ModuleMap{"query": json.RawMessage("{\"aaa\":[\"bbb\"]}")}}, - data: http1RequestExample, + name: "match-by-query", + matchers: json.RawMessage("[{\"query\":{\"aaa\":[\"bbb\"]}}]"), + data: http1RequestExample, }, { - name: "match-by-header", - matcherSets: caddyhttp.RawMatcherSets{caddy.ModuleMap{"header": json.RawMessage("{\"user-agent\":[\"curl*\"]}")}}, - data: http1RequestExample, + name: "match-by-header", + matchers: json.RawMessage("[{\"header\":{\"user-agent\":[\"curl*\"]}}]"), + data: http1RequestExample, }, { - name: "match-by-protocol", - matcherSets: caddyhttp.RawMatcherSets{caddy.ModuleMap{"protocol": json.RawMessage("\"http\"")}}, - data: http1RequestExample, + name: "match-by-protocol", + matchers: json.RawMessage("[{\"protocol\":\"http\"}]"), + data: http1RequestExample, }, } { t.Run(tc.name, func(t *testing.T) { - matched, err := httpMatchTester(t, tc.matcherSets, tc.data) + matched, err := httpMatchTester(t, tc.matchers, tc.data) assertNoError(t, err) if !matched { t.Errorf("matcher did not match") @@ -116,74 +114,74 @@ func TestHttp2Matching(t *testing.T) { assertNoError(t, err) for _, tc := range []struct { - name string - matcherSets caddyhttp.RawMatcherSets - data []byte + name string + matchers json.RawMessage + data []byte }{ { - name: "match-by-host", - matcherSets: caddyhttp.RawMatcherSets{caddy.ModuleMap{"host": json.RawMessage("[\"localhost\"]")}}, - data: http2PriorKnowledgeRequestExample, + name: "match-by-host", + matchers: json.RawMessage("[{\"host\":[\"localhost\"]}]"), + data: http2PriorKnowledgeRequestExample, }, { - name: "match-by-method", - matcherSets: caddyhttp.RawMatcherSets{caddy.ModuleMap{"method": json.RawMessage("[\"GET\"]")}}, - data: http2PriorKnowledgeRequestExample, + name: "match-by-method", + matchers: json.RawMessage("[{\"method\":[\"GET\"]}]"), + data: http2PriorKnowledgeRequestExample, }, { - name: "match-by-path", - matcherSets: caddyhttp.RawMatcherSets{caddy.ModuleMap{"path": json.RawMessage("[\"/foo/bar\"]")}}, - data: http2PriorKnowledgeRequestExample, + name: "match-by-path", + matchers: json.RawMessage("[{\"path\":[\"/foo/bar\"]}]"), + data: http2PriorKnowledgeRequestExample, }, { - name: "match-by-query", - matcherSets: caddyhttp.RawMatcherSets{caddy.ModuleMap{"query": json.RawMessage("{\"aaa\":[\"bbb\"]}")}}, - data: http2PriorKnowledgeRequestExample, + name: "match-by-query", + matchers: json.RawMessage("[{\"query\":{\"aaa\":[\"bbb\"]}}]"), + data: http2PriorKnowledgeRequestExample, }, { - name: "match-by-header", - matcherSets: caddyhttp.RawMatcherSets{caddy.ModuleMap{"header": json.RawMessage("{\"user-agent\":[\"curl*\"]}")}}, - data: http2PriorKnowledgeRequestExample, + name: "match-by-header", + matchers: json.RawMessage("[{\"header\":{\"user-agent\":[\"curl*\"]}}]"), + data: http2PriorKnowledgeRequestExample, }, { - name: "match-by-protocol", - matcherSets: caddyhttp.RawMatcherSets{caddy.ModuleMap{"protocol": json.RawMessage("\"http\"")}}, - data: http2PriorKnowledgeRequestExample, + name: "match-by-protocol", + matchers: json.RawMessage("[{\"protocol\":\"http\"}]"), + data: http2PriorKnowledgeRequestExample, }, { - name: "upgrade-match-by-host", - matcherSets: caddyhttp.RawMatcherSets{caddy.ModuleMap{"host": json.RawMessage("[\"localhost\"]")}}, - data: http2UpgradeRequestExample, + name: "upgrade-match-by-host", + matchers: json.RawMessage("[{\"host\":[\"localhost\"]}]"), + data: http2UpgradeRequestExample, }, { - name: "upgrade-match-by-method", - matcherSets: caddyhttp.RawMatcherSets{caddy.ModuleMap{"method": json.RawMessage("[\"GET\"]")}}, - data: http2UpgradeRequestExample, + name: "upgrade-match-by-method", + matchers: json.RawMessage("[{\"method\":[\"GET\"]}]"), + data: http2UpgradeRequestExample, }, { - name: "upgrade-match-by-path", - matcherSets: caddyhttp.RawMatcherSets{caddy.ModuleMap{"path": json.RawMessage("[\"/foo/bar\"]")}}, - data: http2UpgradeRequestExample, + name: "upgrade-match-by-path", + matchers: json.RawMessage("[{\"path\":[\"/foo/bar\"]}]"), + data: http2UpgradeRequestExample, }, { - name: "upgrade-match-by-query", - matcherSets: caddyhttp.RawMatcherSets{caddy.ModuleMap{"query": json.RawMessage("{\"aaa\":[\"bbb\"]}")}}, - data: http2UpgradeRequestExample, + name: "upgrade-match-by-query", + matchers: json.RawMessage("[{\"query\":{\"aaa\":[\"bbb\"]}}]"), + data: http2UpgradeRequestExample, }, { - name: "upgrade-match-by-header", - matcherSets: caddyhttp.RawMatcherSets{caddy.ModuleMap{"header": json.RawMessage("{\"user-agent\":[\"curl*\"]}")}}, - data: http2UpgradeRequestExample, + name: "upgrade-match-by-header", + matchers: json.RawMessage("[{\"header\":{\"user-agent\":[\"curl*\"]}}]"), + data: http2UpgradeRequestExample, }, { - name: "upgrade-match-by-protocol", - matcherSets: caddyhttp.RawMatcherSets{caddy.ModuleMap{"protocol": json.RawMessage("\"http\"")}}, - data: http2UpgradeRequestExample, + name: "upgrade-match-by-protocol", + matchers: json.RawMessage("[{\"protocol\":\"http\"}]"), + data: http2UpgradeRequestExample, }, } { t.Run(tc.name, func(t *testing.T) { - matched, err := httpMatchTester(t, tc.matcherSets, tc.data) + matched, err := httpMatchTester(t, tc.matchers, tc.data) assertNoError(t, err) if !matched { t.Errorf("matcher did not match") @@ -193,23 +191,30 @@ func TestHttp2Matching(t *testing.T) { } func TestHttpMatchingByProtocolWithHttps(t *testing.T) { - matcherSets := caddyhttp.RawMatcherSets{caddy.ModuleMap{"protocol": json.RawMessage("\"https\"")}} + ctx, cancel := caddy.NewContext(caddy.Context{Context: context.Background()}) + defer cancel() + + routes := layer4.RouteList{&layer4.Route{ + MatcherSetsRaw: caddyhttp.RawMatcherSets{ + caddy.ModuleMap{"http": json.RawMessage("[{\"protocol\":\"https\"}]")}, + }, + }} + + err := routes.Provision(ctx) + assertNoError(t, err) + + handlerCalled := false + compiledRoute := routes.Compile(layer4.HandlerFunc(func(con *layer4.Connection) error { + handlerCalled = true + return nil + }), zap.NewNop(), 100*time.Millisecond) - wg := &sync.WaitGroup{} in, out := net.Pipe() - defer func() { - wg.Wait() - _ = in.Close() - _ = out.Close() - }() + defer in.Close() + defer out.Close() cx := layer4.WrapConnection(in, []byte{}, zap.NewNop()) go func() { - wg.Add(1) - defer func() { - wg.Done() - _ = out.Close() - }() _, err := out.Write([]byte("GET /foo/bar?aaa=bbb HTTP/1.1\nHost: localhost:10443\n\n")) assertNoError(t, err) }() @@ -217,27 +222,17 @@ func TestHttpMatchingByProtocolWithHttps(t *testing.T) { // pretend the tls handler was executed before, not an ideal test setup but better then nothing cx.SetVar("tls_connection_states", []*tls.ConnectionState{{ServerName: "localhost"}}) - ctx, cancel := caddy.NewContext(caddy.Context{Context: context.Background()}) - defer cancel() - - matcher := MatchHTTP{MatcherSetsRaw: matcherSets} - err := matcher.Provision(ctx) - assertNoError(t, err) - - mset := layer4.MatcherSet{matcher} // use MatcherSet to correctly call freeze() before matching - matched, err := mset.Match(cx) + err = compiledRoute.Handle(cx) assertNoError(t, err) - if !matched { + if !handlerCalled { t.Fatalf("matcher did not match") } - - _, _ = io.Copy(io.Discard, in) } func TestHttpMatchingGarbage(t *testing.T) { - matcherSets := caddyhttp.RawMatcherSets{caddy.ModuleMap{"host": json.RawMessage("[\"localhost\"]")}} + matchers := json.RawMessage("[{\"host\":[\"localhost\"]}]") - matched, err := httpMatchTester(t, matcherSets, []byte("not a valid http request")) + matched, err := httpMatchTester(t, matchers, []byte("not a valid http request")) assertNoError(t, err) if matched { t.Fatalf("matcher did match") @@ -245,13 +240,10 @@ func TestHttpMatchingGarbage(t *testing.T) { validHttp2MagicWithoutHeadersFrame, err := base64.StdEncoding.DecodeString("UFJJICogSFRUUC8yLjANCg0KU00NCg0KAAASBAAAAAAAAAMAAABkAAQCAAAAAAIAAAAATm8gbG9uZ2VyIHZhbGlkIGh0dHAyIHJlcXVlc3QgZnJhbWVz") assertNoError(t, err) - matched, err = httpMatchTester(t, matcherSets, validHttp2MagicWithoutHeadersFrame) + matched, err = httpMatchTester(t, matchers, validHttp2MagicWithoutHeadersFrame) if matched { t.Fatalf("matcher did match") } - if !errors.Is(err, layer4.ErrConsumedAllPrefetchedBytes) { - t.Fatalf("handler did not return an error or the wrong error -> %v", err) - } } func TestMatchHTTP_isHttp(t *testing.T) { From ef3ab679a958d11f6e5faed64fbec57b4291eb51 Mon Sep 17 00:00:00 2001 From: Yannick Dylla <17772145+ydylla@users.noreply.github.com> Date: Wed, 8 May 2024 14:47:53 +0200 Subject: [PATCH 11/14] refactor: remove err from MatchingBytes --- layer4/connection.go | 4 ++-- modules/l4http/httpmatcher.go | 2 +- modules/l4http/httpmatcher_test.go | 1 - 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/layer4/connection.go b/layer4/connection.go index b4fd8e4..ec149df 100644 --- a/layer4/connection.go +++ b/layer4/connection.go @@ -213,8 +213,8 @@ func (cx *Connection) GetVar(key string) interface{} { // MatchingBytes returns all bytes currently available for matching. This is only intended for reading. // Do not write into the slice it's a view of the internal buffer and you will likely mess up the connection. -func (cx *Connection) MatchingBytes() ([]byte, error) { - return cx.buf[cx.offset:], nil +func (cx *Connection) MatchingBytes() []byte { + return cx.buf[cx.offset:] } var ( diff --git a/modules/l4http/httpmatcher.go b/modules/l4http/httpmatcher.go index 2036137..c80fa7b 100644 --- a/modules/l4http/httpmatcher.go +++ b/modules/l4http/httpmatcher.go @@ -81,7 +81,7 @@ func (m MatchHTTP) Match(cx *layer4.Connection) (bool, error) { if !ok { var err error - data, err := cx.MatchingBytes() + data := cx.MatchingBytes() if !m.isHttp(data) { return false, nil } diff --git a/modules/l4http/httpmatcher_test.go b/modules/l4http/httpmatcher_test.go index dd0d045..9ecafd4 100644 --- a/modules/l4http/httpmatcher_test.go +++ b/modules/l4http/httpmatcher_test.go @@ -12,7 +12,6 @@ import ( "github.com/caddyserver/caddy/v2" "github.com/caddyserver/caddy/v2/modules/caddyhttp" "github.com/mholt/caddy-l4/layer4" - _ "github.com/mholt/caddy-l4/modules/l4echo" "go.uber.org/zap" ) From 9a1883ea33fd7a99b4f0aa72419b9a5f3d539662 Mon Sep 17 00:00:00 2001 From: Yannick Dylla <17772145+ydylla@users.noreply.github.com> Date: Wed, 8 May 2024 14:54:01 +0200 Subject: [PATCH 12/14] refactor: remove IsTerminal() from Handler --- layer4/handlers.go | 13 ++++++++++--- layer4/routes.go | 21 ++++++++------------- layer4/routes_test.go | 4 ++-- layer4/server.go | 2 +- modules/l4echo/echo.go | 4 ---- modules/l4http/httpmatcher_test.go | 6 +++--- modules/l4proxy/proxy.go | 4 ---- modules/l4proxyprotocol/handler.go | 4 ---- modules/l4socks/socks5_handler.go | 4 ---- modules/l4subroute/handler.go | 16 +++------------- modules/l4tee/tee.go | 4 ---- modules/l4throttle/throttle.go | 4 ---- modules/l4tls/handler.go | 4 ---- 13 files changed, 27 insertions(+), 63 deletions(-) diff --git a/layer4/handlers.go b/layer4/handlers.go index b095a36..d41e5dd 100644 --- a/layer4/handlers.go +++ b/layer4/handlers.go @@ -35,8 +35,6 @@ func (h Handlers) Compile() Handler { // as part of a middleware chain. type NextHandler interface { Handle(*Connection, Handler) error - // Must return true if this is the last handler in a handler chain. - IsTerminal() bool } // Handler is a type that can handle connections. @@ -62,6 +60,11 @@ type HandlerFunc func(*Connection) error // Handle handles a connection; it implements the Handler interface. func (h HandlerFunc) Handle(cx *Connection) error { return h(cx) } +// NextHandlerFunc can turn a function into a NextHandler type. +type NextHandlerFunc func(cx *Connection, next Handler) error + +func (h NextHandlerFunc) Handle(cx *Connection, next Handler) error { return h(cx, next) } + // nopHandler is a connection handler that does nothing with the // connection, not even reading from it; it simply returns. It is // the default end of all handler chains. @@ -77,9 +80,13 @@ type nopHandler struct{} func (nopHandler) Handle(_ *Connection) error { return nil } +type nopNextHandler struct{} + +func (nopNextHandler) Handle(cx *Connection, next Handler) error { return next.Handle(cx) } + // listenerHandler is a connection handler that pipe incoming connection to channel as a listener wrapper type listenerHandler struct{} -func (listenerHandler) Handle(conn *Connection) error { +func (listenerHandler) Handle(conn *Connection, _ Handler) error { return conn.Context.Value(listenerCtxKey).(*listener).pipeConnection(conn) } diff --git a/layer4/routes.go b/layer4/routes.go index 368c733..d3ef20f 100644 --- a/layer4/routes.go +++ b/layer4/routes.go @@ -42,9 +42,6 @@ type Route struct { // executed in sequential order if the route's matchers match. HandlersRaw []json.RawMessage `json:"handle,omitempty" caddy:"namespace=layer4.handlers inline_key=handler"` - // Is set to true during Provision if any of the handlers IsTerminal. - Terminal bool - matcherSets MatcherSets middleware []Middleware } @@ -72,9 +69,6 @@ func (r *Route) Provision(ctx caddy.Context) error { for _, mod := range mods.([]interface{}) { handler := mod.(NextHandler) handlers = append(handlers, handler) - if handler.IsTerminal() { - r.Terminal = true - } } for _, midhandler := range handlers { r.middleware = append(r.middleware, wrapHandler(midhandler)) @@ -104,7 +98,7 @@ func (routes RouteList) Provision(ctx caddy.Context) error { // Compile prepares a middleware chain from the route list. // This should only be done once: after all the routes have // been provisioned, and before the server loop begins. -func (routes RouteList) Compile(next Handler, logger *zap.Logger, matchingTimeout time.Duration) Handler { +func (routes RouteList) Compile(next NextHandler, logger *zap.Logger, matchingTimeout time.Duration) Handler { return HandlerFunc(func(cx *Connection) error { deadline := time.Now().Add(matchingTimeout) router: @@ -141,16 +135,17 @@ func (routes RouteList) Compile(next Handler, logger *zap.Logger, matchingTimeou return err } + isTerminal := true lastHandler := HandlerFunc(func(conn *Connection) error { // Catch potentially wrapped connection to use it as input for the next round of route matching. // This is for example required for matchers after a tls handler. cx = conn - // call the default/fallback handler - return next.Handle(conn) + // If this handler is called all handlers before where not terminal + isTerminal = false + return nil }) - var handler Handler - handler = &lastHandler // compile the route handler stack with lastHandler being called last + handler := wrapHandler(next)(lastHandler) for i := len(route.middleware) - 1; i >= 0; i-- { handler = route.middleware[i](handler) } @@ -159,9 +154,9 @@ func (routes RouteList) Compile(next Handler, logger *zap.Logger, matchingTimeou return err } - // If matched route is terminal we stop routing, + // If handler is terminal we stop routing, // otherwise we jump back to the start of the routing loop to peel of more protocol layers. - if route.Terminal { + if isTerminal { return nil } else { goto router diff --git a/layer4/routes_test.go b/layer4/routes_test.go index 62c9903..86021bf 100644 --- a/layer4/routes_test.go +++ b/layer4/routes_test.go @@ -26,9 +26,9 @@ func TestMatchingTimeoutWorks(t *testing.T) { matched := false loggerCore, logs := observer.New(zapcore.WarnLevel) - compiledRoutes := routes.Compile(HandlerFunc(func(con *Connection) error { + compiledRoutes := routes.Compile(NextHandlerFunc(func(con *Connection, next Handler) error { matched = true - return nil + return next.Handle(con) }), zap.New(loggerCore), 5*time.Millisecond) in, out := net.Pipe() diff --git a/layer4/server.go b/layer4/server.go index 60846aa..24138fb 100644 --- a/layer4/server.go +++ b/layer4/server.go @@ -63,7 +63,7 @@ func (s *Server) Provision(ctx caddy.Context, logger *zap.Logger) error { if err != nil { return err } - s.compiledRoute = s.Routes.Compile(nopHandler{}, s.logger, time.Duration(s.MatchingTimeout)) + s.compiledRoute = s.Routes.Compile(nopNextHandler{}, s.logger, time.Duration(s.MatchingTimeout)) return nil } diff --git a/modules/l4echo/echo.go b/modules/l4echo/echo.go index 85dd179..ac80255 100644 --- a/modules/l4echo/echo.go +++ b/modules/l4echo/echo.go @@ -42,9 +42,5 @@ func (Handler) Handle(cx *layer4.Connection, _ layer4.Handler) error { return err } -func (h Handler) IsTerminal() bool { - return true -} - // Interface guard var _ layer4.NextHandler = (*Handler)(nil) diff --git a/modules/l4http/httpmatcher_test.go b/modules/l4http/httpmatcher_test.go index 9ecafd4..8b0438d 100644 --- a/modules/l4http/httpmatcher_test.go +++ b/modules/l4http/httpmatcher_test.go @@ -45,10 +45,10 @@ func httpMatchTester(t *testing.T, matchers json.RawMessage, data []byte) (bool, assertNoError(t, err) matched := false - compiledRoute := routes.Compile(layer4.HandlerFunc(func(con *layer4.Connection) error { + compiledRoute := routes.Compile(layer4.NextHandlerFunc(func(con *layer4.Connection, _ layer4.Handler) error { matched = true return nil - }), zap.NewNop(), 10*time.Millisecond) // FIXME: routes without handlers are not terminal thus each test runs at least this long + }), zap.NewNop(), 10*time.Millisecond) err = compiledRoute.Handle(cx) assertNoError(t, err) @@ -203,7 +203,7 @@ func TestHttpMatchingByProtocolWithHttps(t *testing.T) { assertNoError(t, err) handlerCalled := false - compiledRoute := routes.Compile(layer4.HandlerFunc(func(con *layer4.Connection) error { + compiledRoute := routes.Compile(layer4.NextHandlerFunc(func(con *layer4.Connection, _ layer4.Handler) error { handlerCalled = true return nil }), zap.NewNop(), 100*time.Millisecond) diff --git a/modules/l4proxy/proxy.go b/modules/l4proxy/proxy.go index 9b075ac..02160ce 100644 --- a/modules/l4proxy/proxy.go +++ b/modules/l4proxy/proxy.go @@ -184,10 +184,6 @@ func (h Handler) Handle(down *layer4.Connection, _ layer4.Handler) error { return nil } -func (h *Handler) IsTerminal() bool { - return true -} - func (h *Handler) dialPeers(upstream *Upstream, repl *caddy.Replacer, down *layer4.Connection) ([]net.Conn, error) { var upConns []net.Conn diff --git a/modules/l4proxyprotocol/handler.go b/modules/l4proxyprotocol/handler.go index 4b66c16..caee671 100644 --- a/modules/l4proxyprotocol/handler.go +++ b/modules/l4proxyprotocol/handler.go @@ -164,10 +164,6 @@ func (h *Handler) Handle(cx *layer4.Connection, next layer4.Handler) error { return next.Handle(cx.Wrap(conn)) } -func (h *Handler) IsTerminal() bool { - return false -} - // GetConn gets the connection which holds the information received from the PROXY protocol. func GetConn(cx *layer4.Connection) net.Conn { if val := cx.GetVar("l4.proxy_protocol.conn"); val != nil { diff --git a/modules/l4socks/socks5_handler.go b/modules/l4socks/socks5_handler.go index d38e431..8bb1c33 100644 --- a/modules/l4socks/socks5_handler.go +++ b/modules/l4socks/socks5_handler.go @@ -79,10 +79,6 @@ func (h *Socks5Handler) Handle(cx *layer4.Connection, _ layer4.Handler) error { return h.server.ServeConn(cx) } -func (h *Socks5Handler) IsTerminal() bool { - return true -} - var ( _ caddy.Provisioner = (*Socks5Handler)(nil) _ layer4.NextHandler = (*Socks5Handler)(nil) diff --git a/modules/l4subroute/handler.go b/modules/l4subroute/handler.go index 416af57..4261326 100644 --- a/modules/l4subroute/handler.go +++ b/modules/l4subroute/handler.go @@ -68,22 +68,12 @@ func (h *Handler) Provision(ctx caddy.Context) error { // Handle handles the connections. func (h *Handler) Handle(cx *layer4.Connection, next layer4.Handler) error { - subroute := h.Routes.Compile(next, h.logger, time.Duration(h.MatchingTimeout)) + subroute := h.Routes.Compile(layer4.NextHandlerFunc(func(cx *layer4.Connection, _ layer4.Handler) error { + return next.Handle(cx) // continue with original chain after subroute + }), h.logger, time.Duration(h.MatchingTimeout)) return subroute.Handle(cx) } -func (h *Handler) IsTerminal() bool { - // try to be clever but maybe this needs to be configurable? - terminal := true - for _, route := range h.Routes { - if !route.Terminal { - terminal = false - break - } - } - return terminal -} - // Interface guards var ( _ caddy.Provisioner = (*Handler)(nil) diff --git a/modules/l4tee/tee.go b/modules/l4tee/tee.go index 84413f0..73b321c 100644 --- a/modules/l4tee/tee.go +++ b/modules/l4tee/tee.go @@ -110,10 +110,6 @@ func (t Handler) Handle(cx *layer4.Connection, next layer4.Handler) error { return next.Handle(&nextc) } -func (h *Handler) IsTerminal() bool { - return false -} - // teeConn is a connection wrapper that reads // from a different reader. type teeConn struct { diff --git a/modules/l4throttle/throttle.go b/modules/l4throttle/throttle.go index 0990bf9..f4454d3 100644 --- a/modules/l4throttle/throttle.go +++ b/modules/l4throttle/throttle.go @@ -114,10 +114,6 @@ func (h Handler) Handle(cx *layer4.Connection, next layer4.Handler) error { return next.Handle(cx) } -func (h Handler) IsTerminal() bool { - return false -} - type throttledConn struct { net.Conn ctx context.Context diff --git a/modules/l4tls/handler.go b/modules/l4tls/handler.go index a32605b..7c77412 100644 --- a/modules/l4tls/handler.go +++ b/modules/l4tls/handler.go @@ -104,10 +104,6 @@ func (t *Handler) Handle(cx *layer4.Connection, next layer4.Handler) error { return next.Handle(cx.Wrap(tlsConn)) } -func (t *Handler) IsTerminal() bool { - return false -} - func appendClientHello(cx *layer4.Connection, chi ClientHelloInfo) { var clientHellos []ClientHelloInfo if val := cx.GetVar("tls_client_hellos"); val != nil { From 8b58826b18da7345b06906833d8df80913a0260d Mon Sep 17 00:00:00 2001 From: Yannick Dylla <17772145+ydylla@users.noreply.github.com> Date: Thu, 9 May 2024 14:06:55 +0200 Subject: [PATCH 13/14] refactor: const MatchingTimeoutDefault & NextHandler as last arg --- layer4/listener.go | 4 ++-- layer4/routes.go | 2 +- layer4/routes_test.go | 9 +++++---- layer4/server.go | 6 ++++-- modules/l4http/httpmatcher_test.go | 18 ++++++++++-------- modules/l4subroute/handler.go | 9 +++++---- 6 files changed, 27 insertions(+), 21 deletions(-) diff --git a/layer4/listener.go b/layer4/listener.go index eb93bb7..de60abe 100644 --- a/layer4/listener.go +++ b/layer4/listener.go @@ -45,14 +45,14 @@ func (lw *ListenerWrapper) Provision(ctx caddy.Context) error { lw.logger = ctx.Logger() if lw.MatchingTimeout <= 0 { - lw.MatchingTimeout = caddy.Duration(3 * time.Second) + lw.MatchingTimeout = caddy.Duration(MatchingTimeoutDefault) } err := lw.Routes.Provision(ctx) if err != nil { return err } - lw.compiledRoute = lw.Routes.Compile(listenerHandler{}, lw.logger, time.Duration(lw.MatchingTimeout)) + lw.compiledRoute = lw.Routes.Compile(lw.logger, time.Duration(lw.MatchingTimeout), listenerHandler{}) return nil } diff --git a/layer4/routes.go b/layer4/routes.go index d3ef20f..62ebfc3 100644 --- a/layer4/routes.go +++ b/layer4/routes.go @@ -98,7 +98,7 @@ func (routes RouteList) Provision(ctx caddy.Context) error { // Compile prepares a middleware chain from the route list. // This should only be done once: after all the routes have // been provisioned, and before the server loop begins. -func (routes RouteList) Compile(next NextHandler, logger *zap.Logger, matchingTimeout time.Duration) Handler { +func (routes RouteList) Compile(logger *zap.Logger, matchingTimeout time.Duration, next NextHandler) Handler { return HandlerFunc(func(cx *Connection) error { deadline := time.Now().Add(matchingTimeout) router: diff --git a/layer4/routes_test.go b/layer4/routes_test.go index 86021bf..1db54c7 100644 --- a/layer4/routes_test.go +++ b/layer4/routes_test.go @@ -26,10 +26,11 @@ func TestMatchingTimeoutWorks(t *testing.T) { matched := false loggerCore, logs := observer.New(zapcore.WarnLevel) - compiledRoutes := routes.Compile(NextHandlerFunc(func(con *Connection, next Handler) error { - matched = true - return next.Handle(con) - }), zap.New(loggerCore), 5*time.Millisecond) + compiledRoutes := routes.Compile(zap.New(loggerCore), 5*time.Millisecond, + NextHandlerFunc(func(con *Connection, next Handler) error { + matched = true + return next.Handle(con) + })) in, out := net.Pipe() defer in.Close() diff --git a/layer4/server.go b/layer4/server.go index 24138fb..b238238 100644 --- a/layer4/server.go +++ b/layer4/server.go @@ -25,6 +25,8 @@ import ( "go.uber.org/zap" ) +const MatchingTimeoutDefault = 3 * time.Second + // Server represents a Caddy layer4 server. type Server struct { // The network address to bind to. Any Caddy network address @@ -48,7 +50,7 @@ func (s *Server) Provision(ctx caddy.Context, logger *zap.Logger) error { s.logger = logger if s.MatchingTimeout <= 0 { - s.MatchingTimeout = caddy.Duration(3 * time.Second) + s.MatchingTimeout = caddy.Duration(MatchingTimeoutDefault) } for i, address := range s.Listen { @@ -63,7 +65,7 @@ func (s *Server) Provision(ctx caddy.Context, logger *zap.Logger) error { if err != nil { return err } - s.compiledRoute = s.Routes.Compile(nopNextHandler{}, s.logger, time.Duration(s.MatchingTimeout)) + s.compiledRoute = s.Routes.Compile(s.logger, time.Duration(s.MatchingTimeout), nopNextHandler{}) return nil } diff --git a/modules/l4http/httpmatcher_test.go b/modules/l4http/httpmatcher_test.go index 8b0438d..18ab993 100644 --- a/modules/l4http/httpmatcher_test.go +++ b/modules/l4http/httpmatcher_test.go @@ -45,10 +45,11 @@ func httpMatchTester(t *testing.T, matchers json.RawMessage, data []byte) (bool, assertNoError(t, err) matched := false - compiledRoute := routes.Compile(layer4.NextHandlerFunc(func(con *layer4.Connection, _ layer4.Handler) error { - matched = true - return nil - }), zap.NewNop(), 10*time.Millisecond) + compiledRoute := routes.Compile(zap.NewNop(), 10*time.Millisecond, + layer4.NextHandlerFunc(func(con *layer4.Connection, _ layer4.Handler) error { + matched = true + return nil + })) err = compiledRoute.Handle(cx) assertNoError(t, err) @@ -203,10 +204,11 @@ func TestHttpMatchingByProtocolWithHttps(t *testing.T) { assertNoError(t, err) handlerCalled := false - compiledRoute := routes.Compile(layer4.NextHandlerFunc(func(con *layer4.Connection, _ layer4.Handler) error { - handlerCalled = true - return nil - }), zap.NewNop(), 100*time.Millisecond) + compiledRoute := routes.Compile(zap.NewNop(), 100*time.Millisecond, + layer4.NextHandlerFunc(func(con *layer4.Connection, _ layer4.Handler) error { + handlerCalled = true + return nil + })) in, out := net.Pipe() defer in.Close() diff --git a/modules/l4subroute/handler.go b/modules/l4subroute/handler.go index 4261326..4bb67d2 100644 --- a/modules/l4subroute/handler.go +++ b/modules/l4subroute/handler.go @@ -54,7 +54,7 @@ func (h *Handler) Provision(ctx caddy.Context) error { h.logger = ctx.Logger(h) if h.MatchingTimeout <= 0 { - h.MatchingTimeout = caddy.Duration(3 * time.Second) + h.MatchingTimeout = caddy.Duration(layer4.MatchingTimeoutDefault) } if h.Routes != nil { @@ -68,9 +68,10 @@ func (h *Handler) Provision(ctx caddy.Context) error { // Handle handles the connections. func (h *Handler) Handle(cx *layer4.Connection, next layer4.Handler) error { - subroute := h.Routes.Compile(layer4.NextHandlerFunc(func(cx *layer4.Connection, _ layer4.Handler) error { - return next.Handle(cx) // continue with original chain after subroute - }), h.logger, time.Duration(h.MatchingTimeout)) + subroute := h.Routes.Compile(h.logger, time.Duration(h.MatchingTimeout), + layer4.NextHandlerFunc(func(cx *layer4.Connection, _ layer4.Handler) error { + return next.Handle(cx) // continue with original chain after subroute + })) return subroute.Handle(cx) } From 703fca12e5337f38220d43d1ddf920b4b8e96367 Mon Sep 17 00:00:00 2001 From: Yannick Dylla <17772145+ydylla@users.noreply.github.com> Date: Thu, 9 May 2024 21:18:38 +0200 Subject: [PATCH 14/14] refactor: make PrefetchChunkSize private & fix typo --- layer4/connection.go | 14 +++++++------- modules/l4http/httpmatcher_test.go | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/layer4/connection.go b/layer4/connection.go index ec149df..7bd0184 100644 --- a/layer4/connection.go +++ b/layer4/connection.go @@ -138,13 +138,13 @@ func (cx *Connection) prefetch() (err error) { for len(cx.buf) < MaxMatchingBytes { free := cap(cx.buf) - len(cx.buf) - if free >= PrefetchChunkSize { - n, err = cx.Conn.Read(cx.buf[len(cx.buf) : len(cx.buf)+PrefetchChunkSize]) + if free >= prefetchChunkSize { + n, err = cx.Conn.Read(cx.buf[len(cx.buf) : len(cx.buf)+prefetchChunkSize]) cx.buf = cx.buf[:len(cx.buf)+n] } else { if tmp == nil { tmp = bufPool.Get().([]byte) - tmp = tmp[:PrefetchChunkSize] + tmp = tmp[:prefetchChunkSize] defer bufPool.Put(tmp) } n, err = cx.Conn.Read(tmp) @@ -157,7 +157,7 @@ func (cx *Connection) prefetch() (err error) { return err } - if n < PrefetchChunkSize { + if n < prefetchChunkSize { break } } @@ -212,7 +212,7 @@ func (cx *Connection) GetVar(key string) interface{} { } // MatchingBytes returns all bytes currently available for matching. This is only intended for reading. -// Do not write into the slice it's a view of the internal buffer and you will likely mess up the connection. +// Do not write into the slice. It's a view of the internal buffer and you will likely mess up the connection. func (cx *Connection) MatchingBytes() []byte { return cx.buf[cx.offset:] } @@ -229,7 +229,7 @@ var ( listenerCtxKey caddy.CtxKey = "listener" ) -const PrefetchChunkSize = 1024 +const prefetchChunkSize = 1024 // MaxMatchingBytes is the amount of bytes that are at most prefetched during matching. // This is probably most relevant for the http matcher since http requests do not have a size limit. @@ -238,6 +238,6 @@ const MaxMatchingBytes = 8 * 1024 var bufPool = sync.Pool{ New: func() interface{} { - return make([]byte, 0, PrefetchChunkSize) + return make([]byte, 0, prefetchChunkSize) }, } diff --git a/modules/l4http/httpmatcher_test.go b/modules/l4http/httpmatcher_test.go index 18ab993..19d1e0a 100644 --- a/modules/l4http/httpmatcher_test.go +++ b/modules/l4http/httpmatcher_test.go @@ -27,7 +27,7 @@ func httpMatchTester(t *testing.T, matchers json.RawMessage, data []byte) (bool, defer in.Close() defer out.Close() - cx := layer4.WrapConnection(in, make([]byte, 0, layer4.PrefetchChunkSize), zap.NewNop()) + cx := layer4.WrapConnection(in, make([]byte, 0), zap.NewNop()) go func() { _, err := out.Write(data) assertNoError(t, err)