Skip to content

Commit

Permalink
Avoid log.Fatalf in connector code, return error instead
Browse files Browse the repository at this point in the history
Also prefer `fmt.Sprint(x)` to `fmt.Sprintf("%v", x)`
  • Loading branch information
serprex committed Jan 24, 2024
1 parent cd6086c commit 3dd81e0
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 23 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,7 @@ func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest,
if err != nil {
return nil, fmt.Errorf("error getting pkey column value: %w", err)
}
pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprintf("%v", pkeyColVal.Value))...)
pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprint(pkeyColVal.Value))...)
}

return &model.TableWithPkey{
Expand Down
22 changes: 12 additions & 10 deletions flow/connectors/utils/avro/avro_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"context"
"fmt"
"io"
"log"
"log/slog"
"os"
"sync"
"sync/atomic"

"github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -186,14 +186,13 @@ func (p *peerDBOCFWriter) WriteOCF(w io.Writer) (int, error) {

func (p *peerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils.S3PeerCredentials) (*AvroFile, error) {
r, w := io.Pipe()
numRowsWritten := make(chan int, 1)
var writeOcfError error
var numRows int
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer w.Close()
numRows, err := p.WriteOCF(w)
if err != nil {
log.Fatalf("%v", err)
}
numRowsWritten <- numRows
numRows, writeOcfError = p.WriteOCF(w)
wg.Done()
}()

s3svc, err := utils.CreateS3Client(s3Creds)
Expand All @@ -214,9 +213,12 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils
}

slog.Info("file uploaded to " + fmt.Sprintf("%s/%s", bucketName, key))

wg.Wait()
if writeOcfError != nil {
return nil, writeOcfError
}
return &AvroFile{
NumRecords: <-numRowsWritten,
NumRecords: numRows,
StorageLocation: AvroS3Storage,
FilePath: key,
}, nil
Expand Down
7 changes: 1 addition & 6 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,12 +505,7 @@ func NewTStructuredLogger(logger slog.Logger) *TStructuredLogger {
}

func (l *TStructuredLogger) keyvalsToFields(keyvals []interface{}) slog.Attr {
var attrs []any
for i := 0; i < len(keyvals); i += 1 {
key := fmt.Sprintf("%v", keyvals[i])
attrs = append(attrs, key)
}
return slog.Group("test-log", attrs...)
return slog.Group("test-log", keyvals...)
}

func (l *TStructuredLogger) Debug(msg string, keyvals ...interface{}) {
Expand Down
8 changes: 2 additions & 6 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,13 +236,9 @@ func (q *QRepFlowExecution) processPartitions(
chunkSize = 1
}

batches := make([][]*protos.QRepPartition, 0)
batches := make([][]*protos.QRepPartition, 0, len(partitions)/chunkSize+1)
for i := 0; i < len(partitions); i += chunkSize {
end := i + chunkSize
if end > len(partitions) {
end = len(partitions)
}

end := min(i+chunkSize, len(partitions))
batches = append(batches, partitions[i:end])
}

Expand Down

0 comments on commit 3dd81e0

Please sign in to comment.