Skip to content

Commit

Permalink
Merge pull request rancher-sandbox#7896 from bcxpro/2094-closewrite-p…
Browse files Browse the repository at this point in the history
…roxy

Fix: docker exec hangs indefinitely when reading from stdin
  • Loading branch information
Nino-K authored Feb 12, 2025
2 parents 7695d4f + 0697c13 commit b74dcc4
Show file tree
Hide file tree
Showing 6 changed files with 453 additions and 62 deletions.
7 changes: 6 additions & 1 deletion src/go/wsl-helper/pkg/dockerproxy/platform/serve_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,12 @@ func Listen(endpoint string) (net.Listener, error) {
return nil, fmt.Errorf("endpoint %s does not start with protocol %s", endpoint, prefix)
}

listener, err := winio.ListenPipe(endpoint[len(prefix):], nil)
// Configure pipe in MessageMode to support Docker's half-close semantics
// - Enables zero-byte writes as EOF signals (CloseWrite)
// - Crucial for stdin stream termination in interactive containers
pipeConfig := &winio.PipeConfig{MessageMode: true}

listener, err := winio.ListenPipe(endpoint[len(prefix):], pipeConfig)
if err != nil {
return nil, fmt.Errorf("could not listen on %s: %w", endpoint, err)
}
Expand Down
13 changes: 5 additions & 8 deletions src/go/wsl-helper/pkg/dockerproxy/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"log"
"net"
"net/http"
"net/http/httputil"
"os"
"os/signal"
"regexp"
Expand All @@ -38,6 +37,7 @@ import (

"github.com/rancher-sandbox/rancher-desktop/src/go/wsl-helper/pkg/dockerproxy/models"
"github.com/rancher-sandbox/rancher-desktop/src/go/wsl-helper/pkg/dockerproxy/platform"
"github.com/rancher-sandbox/rancher-desktop/src/go/wsl-helper/pkg/dockerproxy/util"
)

// RequestContextValue contains things we attach to incoming requests
Expand Down Expand Up @@ -74,7 +74,10 @@ func Serve(endpoint string, dialer func() (net.Conn, error)) error {
logWriter := logrus.StandardLogger().Writer()
defer logWriter.Close()
munger := newRequestMunger()
proxy := &httputil.ReverseProxy{
proxy := &util.ReverseProxy{
Dial: func(string, string) (net.Conn, error) {
return dialer()
},
Director: func(req *http.Request) {
logrus.WithField("request", req).
WithField("headers", req.Header).
Expand All @@ -96,12 +99,6 @@ func Serve(endpoint string, dialer func() (net.Conn, error)) error {
Error("could not munge request")
}
},
Transport: &http.Transport{
Dial: func(string, string) (net.Conn, error) {
return dialer()
},
DisableCompression: true, // for debugging
},
ModifyResponse: func(resp *http.Response) error {
logEntry := logrus.WithField("response", resp)
defer func() { logEntry.Debug("got backend response") }()
Expand Down
19 changes: 16 additions & 3 deletions src/go/wsl-helper/pkg/dockerproxy/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,22 @@ func handleConnection(conn net.Conn, dockerPath string) {
return
}
defer dockerConn.Close()
err = util.Pipe(conn, dockerConn)
if err != nil {
logrus.Errorf("error forwarding docker connection: %s", err)

// Cast both client and docker connections to HalfReadWriteCloser for further handling.
xConn, ok := conn.(util.HalfReadWriteCloser)
if !ok {
logrus.Errorf("client connection does not implement HalfReadWriteCloser")
return
}

xDockerConn, ok := dockerConn.(util.HalfReadWriteCloser)
if !ok {
logrus.Errorf("docker connection does not implement HalfReadWriteCloser")
return
}

// Pipe data between the client and Docker, ensuring bidirectional data flow.
if err := util.Pipe(xConn, xDockerConn); err != nil {
logrus.Errorf("error forwarding data between client and docker: %s", err)
}
}
44 changes: 27 additions & 17 deletions src/go/wsl-helper/pkg/dockerproxy/util/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ limitations under the License.
package util

import (
"fmt"
"io"
)

// Pipe bidirectionally between two streams.
func Pipe(c1, c2 io.ReadWriteCloser) error {
func Pipe(c1, c2 HalfReadWriteCloser) error {
ioCopy := func(reader io.Reader, writer io.Writer) <-chan error {
ch := make(chan error)
go func() {
Expand All @@ -33,22 +33,32 @@ func Pipe(c1, c2 io.ReadWriteCloser) error {

ch1 := ioCopy(c1, c2)
ch2 := ioCopy(c2, c1)
select {
case err := <-ch1:
c1.Close()
c2.Close()
<-ch2
if err != io.EOF {
return err
}
case err := <-ch2:
c1.Close()
c2.Close()
<-ch1
if err != io.EOF {
return err
for i := 0; i < 2; i++ {
select {
case err := <-ch1:
cwErr := c2.CloseWrite()
if cwErr != nil {
return fmt.Errorf("error closing write end of c2: %w", cwErr)
}
if err != nil && err != io.EOF {
return err
}
case err := <-ch2:
cwErr := c1.CloseWrite()
if cwErr != nil {
return fmt.Errorf("error closing write end of c1: %w", cwErr)
}
if err != nil && err != io.EOF {
return err
}
}
}

return nil
}

type HalfReadWriteCloser interface {
// CloseWrite closes the write-side of the connection.
CloseWrite() error
// Write is a passthrough to the underlying connection.
io.ReadWriteCloser
}
131 changes: 98 additions & 33 deletions src/go/wsl-helper/pkg/dockerproxy/util/pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,55 +18,120 @@ package util

import (
"bytes"
"errors"
"io"
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

type nopReadWriteCloser struct {
io.ReadWriter
// bidirectionalHalfClosePipe is a testing utility that simulates a bidirectional pipe
// with the ability to half-close connections. It's designed to mimic scenarios
// like interactive command-line operations where a client can send data and
// then half-close the connection while waiting for a response.
type bidirectionalHalfClosePipe struct {
r io.ReadCloser
w io.WriteCloser
}

func (nopReadWriteCloser) Close() error {
return nil
// newBidirectionalHalfClosePipe creates two interconnected bidirectional pipe endpoints.
//
// The function returns two bidirectionalHalfClosePipe instances that are connected
// such that what is written to one's write endpoint can be read from the other's
// read endpoint, and vice versa.
//
// Returns:
// - h1: First bidirectional pipe endpoint
// - h2: Second bidirectional pipe endpoint
func newBidirectionalHalfClosePipe() (h1, h2 *bidirectionalHalfClosePipe) {
pr1, pw1 := io.Pipe()
pr2, pw2 := io.Pipe()

h1 = &bidirectionalHalfClosePipe{
r: pr1, w: pw2,
}

h2 = &bidirectionalHalfClosePipe{
r: pr2, w: pw1,
}
return
}

type passthroughReadWriteCloser struct {
io.ReadCloser
io.WriteCloser
func (h *bidirectionalHalfClosePipe) CloseWrite() error {
return h.w.Close()
}

func newPipeReadWriter() io.ReadWriteCloser {
r, w := io.Pipe()
return &passthroughReadWriteCloser{
ReadCloser: r,
WriteCloser: w,
func (h *bidirectionalHalfClosePipe) Close() error {
wErr := h.w.Close()
rErr := h.r.Close()

if wErr != nil {
return wErr
}
return rErr
}

func (p *passthroughReadWriteCloser) Close() error {
err := p.ReadCloser.Close()
if err != nil && !errors.Is(err, io.ErrClosedPipe) {
return err
}
err = p.WriteCloser.Close()
if err != nil && !errors.Is(err, io.ErrClosedPipe) {
return err
}
return nil
func (h *bidirectionalHalfClosePipe) Read(p []byte) (n int, err error) {
return h.r.Read(p)
}

func (h *bidirectionalHalfClosePipe) Write(p []byte) (n int, err error) {
return h.w.Write(p)
}

// TestPipe verifies the functionality of the bidirectional pipe utility.
//
// The test simulates a scenario similar to interactive command execution,
// such as a docker run -i command, which requires bidirectional communication.
// This test case mimics scenarios like:
// - Sending input to a Docker container via stdin
// - Half-closing the input stream
// - Receiving output from the container
//
// The test steps are:
// 1. A client sends data
// 2. The client half-closes the connection
// 3. The server reads the data
// 4. The server sends a return response
// 5. The server half-closes the connection
//
// This approach is particularly relevant for interactive Docker runs where
// the client needs to send input and then wait for the container's response,
// while maintaining the ability to close streams independently.
func TestPipe(t *testing.T) {
rw := newPipeReadWriter()
output := bytes.Buffer{}
data := &passthroughReadWriteCloser{
ReadCloser: nopReadWriteCloser{bytes.NewBufferString("some data")},
WriteCloser: nopReadWriteCloser{&output},
}
err := Pipe(rw, data)
if assert.NoError(t, err) {
assert.Equal(t, "some data", output.String())
}
h1a, h1b := newBidirectionalHalfClosePipe()
h2a, h2b := newBidirectionalHalfClosePipe()
var wg sync.WaitGroup
wg.Add(2)

// Goroutine simulating the client-side operation
go func() {
defer wg.Done()
dataToSend := bytes.NewBufferString("some data")
_, err := h1a.Write(dataToSend.Bytes())
assert.NoError(t, err)
h1a.CloseWrite()

output, err := io.ReadAll(h1a)
assert.NoError(t, err)
assert.EqualValues(t, output, "return data")
}()

// Goroutine simulating the server-side operation
go func() {
defer wg.Done()
output, err := io.ReadAll(h2b)
assert.NoError(t, err)
assert.EqualValues(t, output, "some data")

dataToSend := bytes.NewBufferString("return data")
_, err = h2b.Write(dataToSend.Bytes())
assert.NoError(t, err)

h2b.CloseWrite()
}()

err := Pipe(h1b, h2a)
assert.NoError(t, err)
wg.Wait()
}
Loading

0 comments on commit b74dcc4

Please sign in to comment.