diff --git a/.github/workflows/replication-test.yml b/.github/workflows/replication-test.yml index fa260c4..0c9f01b 100644 --- a/.github/workflows/replication-test.yml +++ b/.github/workflows/replication-test.yml @@ -118,7 +118,7 @@ jobs: apecloud/myduckserver:latest # Wait and check container status - for i in {1..10}; do + for i in {1..15}; do if ! docker ps | grep -q myduck; then echo "MyDuck container exited unexpectedly" docker logs myduck diff --git a/pgserver/connection_handler.go b/pgserver/connection_handler.go index 039cf57..212c080 100644 --- a/pgserver/connection_handler.go +++ b/pgserver/connection_handler.go @@ -1420,20 +1420,29 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedStatement, copyTo } done := make(chan struct{}) - var sendErr atomic.Value + var globalErr atomic.Value + var blocked atomic.Bool + blocked.Store(true) go func() { defer close(done) // Open the pipe for reading. ctx.GetLogger().Tracef("Opening FIFO pipe for reading: %s", pipePath) pipe, err := os.OpenFile(pipePath, os.O_RDONLY, os.ModeNamedPipe) + blocked.Store(false) if err != nil { - sendErr.Store(fmt.Errorf("failed to open pipe for reading: %w", err)) + globalErr.Store(fmt.Errorf("failed to open pipe for reading: %w", err)) cancel() return } defer pipe.Close() + // If the error has been set, then we should cancel the operation. + if globalErr.Load() != nil { + cancel() + return + } + ctx.GetLogger().Debug("Copying data from the pipe to the client") defer func() { ctx.GetLogger().Debug("Finished copying data from the pipe to the client") @@ -1464,7 +1473,7 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedStatement, copyTo if err == io.EOF { break } - sendErr.Store(err) + globalErr.Store(err) cancel() return } @@ -1473,14 +1482,14 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedStatement, copyTo count := bytes.Count(line, []byte{'\t'}) err := sendCopyOutResponse(count + 1) if err != nil { - sendErr.Store(err) + globalErr.Store(err) cancel() return } } err = sendCopyData(line) if err != nil { - sendErr.Store(err) + globalErr.Store(err) cancel() return } @@ -1488,7 +1497,7 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedStatement, copyTo default: err := sendCopyOutResponse(1) if err != nil { - sendErr.Store(err) + globalErr.Store(err) cancel() return } @@ -1500,14 +1509,14 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedStatement, copyTo if err == io.EOF { break } - sendErr.Store(err) + globalErr.Store(err) cancel() return } if n > 0 { err := sendCopyData(buf[:n]) if err != nil { - sendErr.Store(err) + globalErr.Store(err) cancel() return } @@ -1519,16 +1528,29 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedStatement, copyTo select { case <-ctx.Done(): // Context is canceled <-done - err, _ := sendErr.Load().(error) + err, _ := globalErr.Load().(error) return errors.Join(ctx.Err(), err) case result := <-ch: + if blocked.Load() { + // If the pipe is still opened for reading but the writer has exited, + // then we need to open the pipe for writing again to unblock the reader. + globalErr.Store(errors.Join( + fmt.Errorf("pipe is opened for reading but the writer has exited"), + result.Err, + )) + pipe, _ := os.OpenFile(pipePath, os.O_WRONLY, os.ModeNamedPipe) + if pipe != nil { + pipe.Close() + } + } + <-done if result.Err != nil { return fmt.Errorf("failed to copy data: %w", result.Err) } - if err, ok := sendErr.Load().(error); ok { + if err, ok := globalErr.Load().(error); ok { return err } diff --git a/pgserver/dataloader.go b/pgserver/dataloader.go index 4aa019e..846e6d6 100644 --- a/pgserver/dataloader.go +++ b/pgserver/dataloader.go @@ -57,7 +57,7 @@ type PipeDataLoader struct { pipePath string read func() pipe atomic.Pointer[os.File] // for writing - opening atomic.Bool // for writing + blocked atomic.Bool // for writing errPipe atomic.Pointer[os.File] // for error handling rowCount chan int64 err atomic.Pointer[error] @@ -65,6 +65,8 @@ type PipeDataLoader struct { } func (loader *PipeDataLoader) Start() <-chan error { + loader.blocked.Store(true) + // Open the reader. go loader.read() @@ -76,9 +78,8 @@ func (loader *PipeDataLoader) Start() <-chan error { // Open the pipe for writing. // This operation will block until the reader opens the pipe for reading. loader.logger.Debugf("Opening pipe for writing: %s", loader.pipePath) - loader.opening.Store(true) pipe, err := os.OpenFile(loader.pipePath, os.O_WRONLY, os.ModeNamedPipe) - loader.opening.Store(false) + loader.blocked.Store(false) if err != nil { ready <- err return @@ -259,7 +260,7 @@ func (loader *CsvDataLoader) executeCopy(sql string, pipePath string) { if err != nil { loader.ctx.GetLogger().Error(err) loader.err.Store(&err) - if loader.opening.Load() { + if loader.blocked.Load() { // Open the pipe once to unblock the writer pipe, _ := os.OpenFile(pipePath, os.O_RDONLY, os.ModeNamedPipe) loader.errPipe.Store(pipe)