Skip to content

Commit

Permalink
fix: make COPY TO STDOUT robust to SQL errors (#323)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanyang01 authored Dec 26, 2024
1 parent 76120eb commit 680ced2
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/replication-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ jobs:
apecloud/myduckserver:latest
# Wait and check container status
for i in {1..10}; do
for i in {1..15}; do
if ! docker ps | grep -q myduck; then
echo "MyDuck container exited unexpectedly"
docker logs myduck
Expand Down
42 changes: 32 additions & 10 deletions pgserver/connection_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1420,20 +1420,29 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedStatement, copyTo
}

done := make(chan struct{})
var sendErr atomic.Value
var globalErr atomic.Value
var blocked atomic.Bool
blocked.Store(true)
go func() {
defer close(done)

// Open the pipe for reading.
ctx.GetLogger().Tracef("Opening FIFO pipe for reading: %s", pipePath)
pipe, err := os.OpenFile(pipePath, os.O_RDONLY, os.ModeNamedPipe)
blocked.Store(false)
if err != nil {
sendErr.Store(fmt.Errorf("failed to open pipe for reading: %w", err))
globalErr.Store(fmt.Errorf("failed to open pipe for reading: %w", err))
cancel()
return
}
defer pipe.Close()

// If the error has been set, then we should cancel the operation.
if globalErr.Load() != nil {
cancel()
return
}

ctx.GetLogger().Debug("Copying data from the pipe to the client")
defer func() {
ctx.GetLogger().Debug("Finished copying data from the pipe to the client")
Expand Down Expand Up @@ -1464,7 +1473,7 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedStatement, copyTo
if err == io.EOF {
break
}
sendErr.Store(err)
globalErr.Store(err)
cancel()
return
}
Expand All @@ -1473,22 +1482,22 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedStatement, copyTo
count := bytes.Count(line, []byte{'\t'})
err := sendCopyOutResponse(count + 1)
if err != nil {
sendErr.Store(err)
globalErr.Store(err)
cancel()
return
}
}
err = sendCopyData(line)
if err != nil {
sendErr.Store(err)
globalErr.Store(err)
cancel()
return
}
}
default:
err := sendCopyOutResponse(1)
if err != nil {
sendErr.Store(err)
globalErr.Store(err)
cancel()
return
}
Expand All @@ -1500,14 +1509,14 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedStatement, copyTo
if err == io.EOF {
break
}
sendErr.Store(err)
globalErr.Store(err)
cancel()
return
}
if n > 0 {
err := sendCopyData(buf[:n])
if err != nil {
sendErr.Store(err)
globalErr.Store(err)
cancel()
return
}
Expand All @@ -1519,16 +1528,29 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedStatement, copyTo
select {
case <-ctx.Done(): // Context is canceled
<-done
err, _ := sendErr.Load().(error)
err, _ := globalErr.Load().(error)
return errors.Join(ctx.Err(), err)
case result := <-ch:
if blocked.Load() {
// If the pipe is still opened for reading but the writer has exited,
// then we need to open the pipe for writing again to unblock the reader.
globalErr.Store(errors.Join(
fmt.Errorf("pipe is opened for reading but the writer has exited"),
result.Err,
))
pipe, _ := os.OpenFile(pipePath, os.O_WRONLY, os.ModeNamedPipe)
if pipe != nil {
pipe.Close()
}
}

<-done

if result.Err != nil {
return fmt.Errorf("failed to copy data: %w", result.Err)
}

if err, ok := sendErr.Load().(error); ok {
if err, ok := globalErr.Load().(error); ok {
return err
}

Expand Down
9 changes: 5 additions & 4 deletions pgserver/dataloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,16 @@ type PipeDataLoader struct {
pipePath string
read func()
pipe atomic.Pointer[os.File] // for writing
opening atomic.Bool // for writing
blocked atomic.Bool // for writing
errPipe atomic.Pointer[os.File] // for error handling
rowCount chan int64
err atomic.Pointer[error]
logger *logrus.Entry
}

func (loader *PipeDataLoader) Start() <-chan error {
loader.blocked.Store(true)

// Open the reader.
go loader.read()

Expand All @@ -76,9 +78,8 @@ func (loader *PipeDataLoader) Start() <-chan error {
// Open the pipe for writing.
// This operation will block until the reader opens the pipe for reading.
loader.logger.Debugf("Opening pipe for writing: %s", loader.pipePath)
loader.opening.Store(true)
pipe, err := os.OpenFile(loader.pipePath, os.O_WRONLY, os.ModeNamedPipe)
loader.opening.Store(false)
loader.blocked.Store(false)
if err != nil {
ready <- err
return
Expand Down Expand Up @@ -259,7 +260,7 @@ func (loader *CsvDataLoader) executeCopy(sql string, pipePath string) {
if err != nil {
loader.ctx.GetLogger().Error(err)
loader.err.Store(&err)
if loader.opening.Load() {
if loader.blocked.Load() {
// Open the pipe once to unblock the writer
pipe, _ := os.OpenFile(pipePath, os.O_RDONLY, os.ModeNamedPipe)
loader.errPipe.Store(pipe)
Expand Down

0 comments on commit 680ced2

Please sign in to comment.