Skip to content

Commit

Permalink
revert avro writer, testing cause of failure
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 24, 2024
1 parent 3dd81e0 commit 4e58de9
Showing 1 changed file with 10 additions and 12 deletions.
22 changes: 10 additions & 12 deletions flow/connectors/utils/avro/avro_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"context"
"fmt"
"io"
"log"
"log/slog"
"os"
"sync"
"sync/atomic"

"github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -186,13 +186,14 @@ func (p *peerDBOCFWriter) WriteOCF(w io.Writer) (int, error) {

func (p *peerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils.S3PeerCredentials) (*AvroFile, error) {
r, w := io.Pipe()
var writeOcfError error
var numRows int
var wg sync.WaitGroup
wg.Add(1)
numRowsWritten := make(chan int, 1)
go func() {
numRows, writeOcfError = p.WriteOCF(w)
wg.Done()
defer w.Close()
numRows, err := p.WriteOCF(w)
if err != nil {
log.Fatalf("%v", err)
}
numRowsWritten <- numRows
}()

s3svc, err := utils.CreateS3Client(s3Creds)
Expand All @@ -213,12 +214,9 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils
}

slog.Info("file uploaded to " + fmt.Sprintf("%s/%s", bucketName, key))
wg.Wait()
if writeOcfError != nil {
return nil, writeOcfError
}

return &AvroFile{
NumRecords: numRows,
NumRecords: <-numRowsWritten,
StorageLocation: AvroS3Storage,
FilePath: key,
}, nil
Expand Down

0 comments on commit 4e58de9

Please sign in to comment.