diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index beba9961d1..6148ae2a6a 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -4,9 +4,9 @@ import ( "context" "fmt" "io" + "log" "log/slog" "os" - "sync" "sync/atomic" "github.com/aws/aws-sdk-go-v2/aws" @@ -186,13 +186,14 @@ func (p *peerDBOCFWriter) WriteOCF(w io.Writer) (int, error) { func (p *peerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils.S3PeerCredentials) (*AvroFile, error) { r, w := io.Pipe() - var writeOcfError error - var numRows int - var wg sync.WaitGroup - wg.Add(1) + numRowsWritten := make(chan int, 1) go func() { - numRows, writeOcfError = p.WriteOCF(w) - wg.Done() + defer w.Close() + numRows, err := p.WriteOCF(w) + if err != nil { + log.Fatalf("%v", err) + } + numRowsWritten <- numRows }() s3svc, err := utils.CreateS3Client(s3Creds) @@ -213,12 +214,9 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils } slog.Info("file uploaded to " + fmt.Sprintf("%s/%s", bucketName, key)) - wg.Wait() - if writeOcfError != nil { - return nil, writeOcfError - } + return &AvroFile{ - NumRecords: numRows, + NumRecords: <-numRowsWritten, StorageLocation: AvroS3Storage, FilePath: key, }, nil