Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove unnecessary routes matching and restore old fallback next hand… #208

Merged
merged 6 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 16 additions & 20 deletions layer4/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,19 @@ func (cx *Connection) Wrap(conn net.Conn) *Connection {
// prefetch tries to read all bytes that a client initially sent us without blocking.
func (cx *Connection) prefetch() (err error) {
var n int
var tmp []byte

for len(cx.buf) < MaxMatchingBytes {
// read once
if len(cx.buf) < MaxMatchingBytes {
free := cap(cx.buf) - len(cx.buf)
if free >= prefetchChunkSize {
n, err = cx.Conn.Read(cx.buf[len(cx.buf) : len(cx.buf)+prefetchChunkSize])
cx.buf = cx.buf[:len(cx.buf)+n]
} else {
if tmp == nil {
tmp = bufPool.Get().([]byte)
tmp = tmp[:prefetchChunkSize]
defer bufPool.Put(tmp)
}
var tmp []byte
tmp = bufPool.Get().([]byte)
tmp = tmp[:prefetchChunkSize]
defer bufPool.Put(tmp)

n, err = cx.Conn.Read(tmp)
cx.buf = append(cx.buf, tmp[:n]...)
}
Expand All @@ -159,23 +159,17 @@ func (cx *Connection) prefetch() (err error) {
return err
}

if n < prefetchChunkSize {
break
if cx.Logger.Core().Enabled(zap.DebugLevel) {
cx.Logger.Debug("prefetched",
zap.String("remote", cx.RemoteAddr().String()),
zap.Int("bytes", len(cx.buf)),
)
}
}

if cx.Logger.Core().Enabled(zap.DebugLevel) {
cx.Logger.Debug("prefetched",
zap.String("remote", cx.RemoteAddr().String()),
zap.Int("bytes", len(cx.buf)),
)
}

if len(cx.buf) >= MaxMatchingBytes {
return ErrMatchingBufferFull
return nil
}

return nil
return ErrMatchingBufferFull
}

// freeze activates the matching mode that only reads from cx.buf.
Expand Down Expand Up @@ -215,6 +209,8 @@ func (cx *Connection) GetVar(key string) interface{} {

// MatchingBytes returns all bytes currently available for matching. This is only intended for reading.
// Do not write into the slice. It's a view of the internal buffer, and you will likely mess up the connection.
// Use of this for matching purpose should be accompanied by corresponding error value,
// ErrConsumedAllPrefetchedBytes and ErrMatchingBufferFull, if not matched.
func (cx *Connection) MatchingBytes() []byte {
return cx.buf[cx.offset:]
}
Expand Down
9 changes: 6 additions & 3 deletions layer4/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,16 @@ type nopHandler struct{}

func (nopHandler) Handle(_ *Connection) error { return nil }

type nopNextHandler struct{}
// forwardNextHandler will forward the handling to the next handler in the chain.
type forwardNextHandler struct{}

func (nopNextHandler) Handle(cx *Connection, next Handler) error { return next.Handle(cx) }
func (forwardNextHandler) Handle(cx *Connection, next Handler) error {
return next.Handle(cx)
}

// listenerHandler is a connection handler that pipe incoming connection to channel as a listener wrapper
type listenerHandler struct{}

func (listenerHandler) Handle(conn *Connection, _ Handler) error {
func (listenerHandler) Handle(conn *Connection) error {
return conn.Context.Value(listenerCtxKey).(*listener).pipeConnection(conn)
}
81 changes: 57 additions & 24 deletions layer4/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ func (r *Route) Provision(ctx caddy.Context) error {
for _, midhandler := range handlers {
r.middleware = append(r.middleware, wrapHandler(midhandler))
}

return nil
}

Expand All @@ -95,26 +94,40 @@ func (routes RouteList) Provision(ctx caddy.Context) error {
return nil
}

const (
// routes that need more data to determine the match
routeNeedsMore = iota
// routes definitely not matched
routeNotMatched
routeMatched
)

// Compile prepares a middleware chain from the route list.
// This should only be done once: after all the routes have
// been provisioned, and before the server loop begins.
func (routes RouteList) Compile(logger *zap.Logger, matchingTimeout time.Duration, next NextHandler) Handler {
func (routes RouteList) Compile(logger *zap.Logger, matchingTimeout time.Duration, next Handler) Handler {
return HandlerFunc(func(cx *Connection) error {
deadline := time.Now().Add(matchingTimeout)
lastMatchedRouteIdx := -1
router:

var (
lastMatchedRouteIdx = -1
lastNeedsMoreIdx = -1
routesStatus = make(map[int]int)
matcherNeedMore bool
)
// this loop should only be done if there are matchers that can't determine the match,
// i.e. some of the matchers returned false, ErrConsumedAllPrefetchedBytes. The index which
// the loop begins depends upon if there is a matched route.
loop:
// timeout matching to protect against malicious or very slow clients
err := cx.Conn.SetReadDeadline(deadline)
if err != nil {
return err
}

for i := 0; i < 10000; i++ { // retry prefetching and matching routes until timeout

// Do not call prefetch if this is the first loop iteration and there already is some data available,
// since this means we are at the start of a subroute handler and previous prefetch calls likely already fetched all bytes available from the client.
// Which means it would block the subroute handler. In the second iteration (if no subroute routes match) blocking is the correct behaviour.
if i != 0 || cx.buf == nil || len(cx.buf[cx.offset:]) == 0 {
for {
// only read more because matchers require more (no matcher in the simplest case).
// can happen if this routes list is embedded in another
if matcherNeedMore {
err = cx.prefetch()
if err != nil {
logFunc := logger.Error
Expand All @@ -128,25 +141,32 @@ func (routes RouteList) Compile(logger *zap.Logger, matchingTimeout time.Duratio
}

for i, route := range routes {
// After a match continue with the routes after the matched one, instead of starting at the beginning.
// This is done for backwards compatibility with configs written before the "Non blocking matchers & matching timeout" rewrite.
// See https://github.com/mholt/caddy-l4/pull/192 and https://github.com/mholt/caddy-l4/pull/192#issuecomment-2143681952.
if i <= lastMatchedRouteIdx {
continue
}
// Only skip once after a match, so it behaves like we continued after the match.
lastMatchedRouteIdx = -1

// If the route is definitely not matched, skip it
if s, ok := routesStatus[i]; ok && s == routeNotMatched && i <= lastNeedsMoreIdx {
continue
}
// now the matcher is after a matched route and current route needs more data to determine if more data is needed.
// note a matcher is skipped if the one after it can determine it is matched

// A route must match at least one of the matcher sets
matched, err := route.matcherSets.AnyMatch(cx)
if errors.Is(err, ErrConsumedAllPrefetchedBytes) {
lastNeedsMoreIdx = i
routesStatus[i] = routeNeedsMore
continue // ignore and try next route
}
if err != nil {
logger.Error("matching connection", zap.String("remote", cx.RemoteAddr().String()), zap.Error(err))
return nil
}
if matched {
routesStatus[i] = routeMatched
lastMatchedRouteIdx = i
lastNeedsMoreIdx = i
// remove deadline after we matched
err = cx.Conn.SetReadDeadline(time.Time{})
if err != nil {
Expand All @@ -163,7 +183,7 @@ func (routes RouteList) Compile(logger *zap.Logger, matchingTimeout time.Duratio
return nil
})
// compile the route handler stack with lastHandler being called last
handler := wrapHandler(next)(lastHandler)
handler := wrapHandler(forwardNextHandler{})(lastHandler)
for i := len(route.middleware) - 1; i >= 0; i-- {
handler = route.middleware[i](handler)
}
Expand All @@ -173,18 +193,31 @@ func (routes RouteList) Compile(logger *zap.Logger, matchingTimeout time.Duratio
}

// If handler is terminal we stop routing,
// otherwise we jump back to the start of the routing loop to peel of more protocol layers.
// otherwise we try the next handler.
if isTerminal {
return nil
} else {
lastMatchedRouteIdx = i
goto router
}
} else {
routesStatus[i] = routeNotMatched
}
}
// end of match
if lastMatchedRouteIdx == len(routes)-1 {
// next is called because if the last handler is terminal, it's already returned
return next.Handle(cx)
}
var indetermined int
for i, s := range routesStatus {
if i > lastMatchedRouteIdx && s == routeNeedsMore {
indetermined++
}
}
// some of the matchers can't reach a conclusion
if indetermined > 0 {
matcherNeedMore = true
goto loop
}
return next.Handle(cx)
}

logger.Error("matching connection", zap.String("remote", cx.RemoteAddr().String()), zap.Error(errors.New("number of prefetch calls exhausted")))
return nil
})
}
31 changes: 28 additions & 3 deletions layer4/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package layer4

import (
"context"
"encoding/json"
"errors"
"github.com/caddyserver/caddy/v2/modules/caddyhttp"
"io"
"net"
"testing"
"time"
Expand All @@ -13,11 +16,33 @@ import (
"go.uber.org/zap/zaptest/observer"
)

type testIoMatcher struct {
}

func (testIoMatcher) CaddyModule() caddy.ModuleInfo {
return caddy.ModuleInfo{
ID: "layer4.matchers.testIoMatcher",
New: func() caddy.Module { return new(testIoMatcher) },
}
}

func (m *testIoMatcher) Match(cx *Connection) (bool, error) {
buf := make([]byte, 1)
n, err := io.ReadFull(cx, buf)
return n > 0, err
}

func TestMatchingTimeoutWorks(t *testing.T) {
ctx, cancel := caddy.NewContext(caddy.Context{Context: context.Background()})
defer cancel()

routes := RouteList{&Route{}}
caddy.RegisterModule(testIoMatcher{})

routes := RouteList{&Route{
MatcherSetsRaw: caddyhttp.RawMatcherSets{
caddy.ModuleMap{"testIoMatcher": json.RawMessage("{}")}, // any io using matcher
},
}}

err := routes.Provision(ctx)
if err != nil {
Expand All @@ -27,9 +52,9 @@ func TestMatchingTimeoutWorks(t *testing.T) {
matched := false
loggerCore, logs := observer.New(zapcore.WarnLevel)
compiledRoutes := routes.Compile(zap.New(loggerCore), 5*time.Millisecond,
NextHandlerFunc(func(con *Connection, next Handler) error {
HandlerFunc(func(con *Connection) error {
matched = true
return next.Handle(con)
return nil
}))

in, out := net.Pipe()
Expand Down
2 changes: 1 addition & 1 deletion layer4/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *Server) Provision(ctx caddy.Context, logger *zap.Logger) error {
if err != nil {
return err
}
s.compiledRoute = s.Routes.Compile(s.logger, time.Duration(s.MatchingTimeout), nopNextHandler{})
s.compiledRoute = s.Routes.Compile(s.logger, time.Duration(s.MatchingTimeout), nopHandler{})

return nil
}
Expand Down
17 changes: 13 additions & 4 deletions modules/l4http/httpmatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,14 @@ func (m *MatchHTTP) Match(cx *layer4.Connection) (bool, error) {
var err error

data := cx.MatchingBytes()
if !m.isHttp(data) {
needMore, matched := m.isHttp(data)
if needMore {
if len(data) >= layer4.MaxMatchingBytes {
return false, layer4.ErrMatchingBufferFull
}
return false, layer4.ErrConsumedAllPrefetchedBytes
}
if !matched {
return false, nil
}

Expand Down Expand Up @@ -124,11 +131,13 @@ func (m *MatchHTTP) Match(cx *layer4.Connection) (bool, error) {
return m.matcherSets.AnyMatch(req), nil
}

func (m *MatchHTTP) isHttp(data []byte) bool {
// isHttp test if the buffered data looks like HTTP by looking at the first line.
// first boolean determines if more data is required
func (m MatchHTTP) isHttp(data []byte) (bool, bool) {
// try to find the end of a http request line, for example " HTTP/1.1\r\n"
i := bytes.IndexByte(data, 0x0a) // find first new line
if i < 10 {
return false
return true, false
}
// assume only \n line ending
start := i - 9 // position of space in front of HTTP
Expand All @@ -138,7 +147,7 @@ func (m *MatchHTTP) isHttp(data []byte) bool {
start -= 1
end -= 1
}
return bytes.Compare(data[start:end], []byte(" HTTP/")) == 0
return false, bytes.Compare(data[start:end], []byte(" HTTP/")) == 0
}

// Parses information from a http2 request with prior knowledge (RFC 7540 Section 3.4)
Expand Down
Loading
Loading