From e06861fbec3a3c5e3d28a993691811db0be78a12 Mon Sep 17 00:00:00 2001 From: WeidiDeng Date: Mon, 28 Oct 2024 11:13:07 +0800 Subject: [PATCH 1/5] stop the loop when listener is closed --- layer4/listener.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/layer4/listener.go b/layer4/listener.go index 70c2733..b64067e 100644 --- a/layer4/listener.go +++ b/layer4/listener.go @@ -7,6 +7,7 @@ import ( "net" "runtime" "sync" + "sync/atomic" "time" "github.com/caddyserver/caddy/v2" @@ -114,6 +115,7 @@ type listener struct { logger *zap.Logger compiledRoute Handler + closed atomic.Bool // closed when there is a non-recoverable error and all handle goroutines are done connChan chan net.Conn err error @@ -122,10 +124,21 @@ type listener struct { wg *sync.WaitGroup } +func (l *listener) Close() error { + err := l.Listener.Close() + l.closed.Store(true) + return err +} + // loop accept connection from underlying listener and pipe the connection if there are any func (l *listener) loop() { for { conn, err := l.Listener.Accept() + // listener closed + if l.closed.Load() { + break + } + var nerr net.Error if errors.As(err, &nerr) && nerr.Temporary() { l.logger.Error("temporary error accepting connection", zap.Error(err)) From 20ad8720564575a91e9c8c39c163a5e443660deb Mon Sep 17 00:00:00 2001 From: WeidiDeng Date: Mon, 28 Oct 2024 11:22:12 +0800 Subject: [PATCH 2/5] simplify check --- layer4/listener.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/layer4/listener.go b/layer4/listener.go index b64067e..33622d7 100644 --- a/layer4/listener.go +++ b/layer4/listener.go @@ -125,9 +125,8 @@ type listener struct { } func (l *listener) Close() error { - err := l.Listener.Close() l.closed.Store(true) - return err + return l.Listener.Close() } // loop accept connection from underlying listener and pipe the connection if there are any From 892a5e563a0cdd8a7c9bb45d149b6823c236e5c7 Mon Sep 17 00:00:00 2001 From: WeidiDeng Date: Mon, 28 Oct 2024 14:10:46 +0800 Subject: [PATCH 3/5] set error after listener closed --- layer4/listener.go | 1 + 1 file changed, 1 insertion(+) diff --git a/layer4/listener.go b/layer4/listener.go index 33622d7..d2e4ae5 100644 --- a/layer4/listener.go +++ b/layer4/listener.go @@ -135,6 +135,7 @@ func (l *listener) loop() { conn, err := l.Listener.Accept() // listener closed if l.closed.Load() { + l.err = net.ErrClosed break } From c422a1b083938067765e1c1c5c6bb31cef9e024a Mon Sep 17 00:00:00 2001 From: WeidiDeng Date: Mon, 28 Oct 2024 15:48:26 +0800 Subject: [PATCH 4/5] stop queuing new connections after listener closed --- layer4/listener.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/layer4/listener.go b/layer4/listener.go index d2e4ae5..85d5193 100644 --- a/layer4/listener.go +++ b/layer4/listener.go @@ -66,6 +66,7 @@ func (lw *ListenerWrapper) WrapListener(l net.Listener) net.Listener { Listener: l, logger: lw.logger, compiledRoute: lw.compiledRoute, + done: make(chan struct{}), connChan: connChan, wg: new(sync.WaitGroup), } @@ -116,9 +117,9 @@ type listener struct { compiledRoute Handler closed atomic.Bool + done chan struct{} // closed when there is a non-recoverable error and all handle goroutines are done connChan chan net.Conn - err error // count running handles wg *sync.WaitGroup @@ -135,7 +136,6 @@ func (l *listener) loop() { conn, err := l.Listener.Accept() // listener closed if l.closed.Load() { - l.err = net.ErrClosed break } @@ -145,7 +145,6 @@ func (l *listener) loop() { continue } if err != nil { - l.err = err break } @@ -158,6 +157,7 @@ func (l *listener) loop() { l.wg.Wait() close(l.connChan) }() + close(l.done) for conn := range l.connChan { _ = conn.Close() } @@ -198,11 +198,15 @@ func (l *listener) handle(conn net.Conn) { } func (l *listener) Accept() (net.Conn, error) { - for conn := range l.connChan { - return conn, nil + select { + case conn, ok := <-l.connChan: + if ok { + return conn, nil + } + return nil, net.ErrClosed + case <-l.done: + return nil, net.ErrClosed } - return nil, l.err - } func (l *listener) pipeConnection(conn *Connection) error { From 1025d32826e1e51e7f5e68f44a9c87a8c8d5f9ed Mon Sep 17 00:00:00 2001 From: WeidiDeng Date: Thu, 31 Oct 2024 07:56:51 +0800 Subject: [PATCH 5/5] simplify close condition check --- layer4/listener.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/layer4/listener.go b/layer4/listener.go index 85d5193..b7d532b 100644 --- a/layer4/listener.go +++ b/layer4/listener.go @@ -134,13 +134,8 @@ func (l *listener) Close() error { func (l *listener) loop() { for { conn, err := l.Listener.Accept() - // listener closed - if l.closed.Load() { - break - } - var nerr net.Error - if errors.As(err, &nerr) && nerr.Temporary() { + if errors.As(err, &nerr) && nerr.Temporary() && !l.closed.Load() { l.logger.Error("temporary error accepting connection", zap.Error(err)) continue }