Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make COPY TO STDOUT robust to SQL errors #323

Merged
merged 1 commit into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/replication-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ jobs:
apecloud/myduckserver:latest

# Wait and check container status
for i in {1..10}; do
for i in {1..15}; do
if ! docker ps | grep -q myduck; then
echo "MyDuck container exited unexpectedly"
docker logs myduck
Expand Down
42 changes: 32 additions & 10 deletions pgserver/connection_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1404,20 +1404,29 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedStatement, copyTo
}

done := make(chan struct{})
var sendErr atomic.Value
var globalErr atomic.Value
var blocked atomic.Bool
blocked.Store(true)
go func() {
defer close(done)

// Open the pipe for reading.
ctx.GetLogger().Tracef("Opening FIFO pipe for reading: %s", pipePath)
pipe, err := os.OpenFile(pipePath, os.O_RDONLY, os.ModeNamedPipe)
blocked.Store(false)
if err != nil {
sendErr.Store(fmt.Errorf("failed to open pipe for reading: %w", err))
globalErr.Store(fmt.Errorf("failed to open pipe for reading: %w", err))
cancel()
return
}
defer pipe.Close()

// If the error has been set, then we should cancel the operation.
if globalErr.Load() != nil {
cancel()
return
}

ctx.GetLogger().Debug("Copying data from the pipe to the client")
defer func() {
ctx.GetLogger().Debug("Finished copying data from the pipe to the client")
Expand Down Expand Up @@ -1448,7 +1457,7 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedStatement, copyTo
if err == io.EOF {
break
}
sendErr.Store(err)
globalErr.Store(err)
cancel()
return
}
Expand All @@ -1457,22 +1466,22 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedStatement, copyTo
count := bytes.Count(line, []byte{'\t'})
err := sendCopyOutResponse(count + 1)
if err != nil {
sendErr.Store(err)
globalErr.Store(err)
cancel()
return
}
}
err = sendCopyData(line)
if err != nil {
sendErr.Store(err)
globalErr.Store(err)
cancel()
return
}
}
default:
err := sendCopyOutResponse(1)
if err != nil {
sendErr.Store(err)
globalErr.Store(err)
cancel()
return
}
Expand All @@ -1484,14 +1493,14 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedStatement, copyTo
if err == io.EOF {
break
}
sendErr.Store(err)
globalErr.Store(err)
cancel()
return
}
if n > 0 {
err := sendCopyData(buf[:n])
if err != nil {
sendErr.Store(err)
globalErr.Store(err)
cancel()
return
}
Expand All @@ -1503,16 +1512,29 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedStatement, copyTo
select {
case <-ctx.Done(): // Context is canceled
<-done
err, _ := sendErr.Load().(error)
err, _ := globalErr.Load().(error)
return errors.Join(ctx.Err(), err)
case result := <-ch:
if blocked.Load() {
// If the pipe is still opened for reading but the writer has exited,
// then we need to open the pipe for writing again to unblock the reader.
globalErr.Store(errors.Join(
fmt.Errorf("pipe is opened for reading but the writer has exited"),
result.Err,
))
pipe, _ := os.OpenFile(pipePath, os.O_WRONLY, os.ModeNamedPipe)
if pipe != nil {
pipe.Close()
}
}

<-done

if result.Err != nil {
return fmt.Errorf("failed to copy data: %w", result.Err)
}

if err, ok := sendErr.Load().(error); ok {
if err, ok := globalErr.Load().(error); ok {
return err
}

Expand Down
9 changes: 5 additions & 4 deletions pgserver/dataloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,16 @@ type PipeDataLoader struct {
pipePath string
read func()
pipe atomic.Pointer[os.File] // for writing
opening atomic.Bool // for writing
blocked atomic.Bool // for writing
errPipe atomic.Pointer[os.File] // for error handling
rowCount chan int64
err atomic.Pointer[error]
logger *logrus.Entry
}

func (loader *PipeDataLoader) Start() <-chan error {
loader.blocked.Store(true)

// Open the reader.
go loader.read()

Expand All @@ -76,9 +78,8 @@ func (loader *PipeDataLoader) Start() <-chan error {
// Open the pipe for writing.
// This operation will block until the reader opens the pipe for reading.
loader.logger.Debugf("Opening pipe for writing: %s", loader.pipePath)
loader.opening.Store(true)
pipe, err := os.OpenFile(loader.pipePath, os.O_WRONLY, os.ModeNamedPipe)
loader.opening.Store(false)
loader.blocked.Store(false)
if err != nil {
ready <- err
return
Expand Down Expand Up @@ -259,7 +260,7 @@ func (loader *CsvDataLoader) executeCopy(sql string, pipePath string) {
if err != nil {
loader.ctx.GetLogger().Error(err)
loader.err.Store(&err)
if loader.opening.Load() {
if loader.blocked.Load() {
// Open the pipe once to unblock the writer
pipe, _ := os.OpenFile(pipePath, os.O_RDONLY, os.ModeNamedPipe)
loader.errPipe.Store(pipe)
Expand Down
Loading