From 3dd81e0e7866e95bf89d1b92c2332c8ae0b5552f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 24 Jan 2024 00:13:03 +0000 Subject: [PATCH] Avoid log.Fatalf in connector code, return error instead Also prefer `fmt.Sprint(x)` to `fmt.Sprintf("%v", x)` --- flow/connectors/postgres/cdc.go | 2 +- flow/connectors/utils/avro/avro_writer.go | 22 ++++++++++++---------- flow/e2e/test_utils.go | 7 +------ flow/workflows/qrep_flow.go | 8 ++------ 4 files changed, 16 insertions(+), 23 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 7959e5ff67..b680c9b832 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -897,7 +897,7 @@ func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest, if err != nil { return nil, fmt.Errorf("error getting pkey column value: %w", err) } - pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprintf("%v", pkeyColVal.Value))...) + pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprint(pkeyColVal.Value))...) } return &model.TableWithPkey{ diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index 6148ae2a6a..beba9961d1 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -4,9 +4,9 @@ import ( "context" "fmt" "io" - "log" "log/slog" "os" + "sync" "sync/atomic" "github.com/aws/aws-sdk-go-v2/aws" @@ -186,14 +186,13 @@ func (p *peerDBOCFWriter) WriteOCF(w io.Writer) (int, error) { func (p *peerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils.S3PeerCredentials) (*AvroFile, error) { r, w := io.Pipe() - numRowsWritten := make(chan int, 1) + var writeOcfError error + var numRows int + var wg sync.WaitGroup + wg.Add(1) go func() { - defer w.Close() - numRows, err := p.WriteOCF(w) - if err != nil { - log.Fatalf("%v", err) - } - numRowsWritten <- numRows + numRows, writeOcfError = p.WriteOCF(w) + wg.Done() }() s3svc, err := utils.CreateS3Client(s3Creds) @@ -214,9 +213,12 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils } slog.Info("file uploaded to " + fmt.Sprintf("%s/%s", bucketName, key)) - + wg.Wait() + if writeOcfError != nil { + return nil, writeOcfError + } return &AvroFile{ - NumRecords: <-numRowsWritten, + NumRecords: numRows, StorageLocation: AvroS3Storage, FilePath: key, }, nil diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 94ee47a8e2..5aa5ba730c 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -505,12 +505,7 @@ func NewTStructuredLogger(logger slog.Logger) *TStructuredLogger { } func (l *TStructuredLogger) keyvalsToFields(keyvals []interface{}) slog.Attr { - var attrs []any - for i := 0; i < len(keyvals); i += 1 { - key := fmt.Sprintf("%v", keyvals[i]) - attrs = append(attrs, key) - } - return slog.Group("test-log", attrs...) + return slog.Group("test-log", keyvals...) 
} func (l *TStructuredLogger) Debug(msg string, keyvals ...interface{}) { diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 1ae1518e21..e7fc99dc40 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -236,13 +236,9 @@ func (q *QRepFlowExecution) processPartitions( chunkSize = 1 } - batches := make([][]*protos.QRepPartition, 0) + batches := make([][]*protos.QRepPartition, 0, len(partitions)/chunkSize+1) for i := 0; i < len(partitions); i += chunkSize { - end := i + chunkSize - if end > len(partitions) { - end = len(partitions) - } - + end := min(i+chunkSize, len(partitions)) batches = append(batches, partitions[i:end]) }
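
Note on the WriteRecordsToS3 hunk above: log.Fatalf inside the writer goroutine would terminate the whole process on an OCF encoding error instead of surfacing it to the caller, so the patch captures the goroutine's row count and error in local variables and synchronizes on a sync.WaitGroup before returning them. Below is a minimal, self-contained sketch of that pattern; doWork, run, and the byte-counting stand in for WriteOCF and the S3 upload and are illustrative only, not code from the repository. In this sketch doWork closes the pipe writer itself so the reader sees EOF.

package main

import (
	"fmt"
	"io"
	"sync"
)

// doWork stands in for the patch's p.WriteOCF: it streams data into the
// pipe writer and reports how many bytes it wrote, or an error.
func doWork(w io.WriteCloser) (int, error) {
	defer w.Close()
	n, err := w.Write([]byte("some avro rows"))
	if err != nil {
		return 0, fmt.Errorf("write failed: %w", err)
	}
	return n, nil
}

func run() (int, error) {
	r, w := io.Pipe()

	var (
		wg       sync.WaitGroup
		numRows  int
		writeErr error
	)
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Capture the result instead of calling log.Fatalf, so the caller
		// decides how the failure is handled.
		numRows, writeErr = doWork(w)
	}()

	// The reader side (the S3 upload in the real code) consumes the pipe
	// concurrently with the writer goroutine.
	if _, err := io.ReadAll(r); err != nil {
		return 0, fmt.Errorf("upload failed: %w", err)
	}

	// Wait for the goroutine before touching numRows/writeErr; reading them
	// earlier would be a data race.
	wg.Wait()
	if writeErr != nil {
		return 0, writeErr
	}
	return numRows, nil
}

func main() {
	n, err := run()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("rows written:", n)
}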
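
Note on the processPartitions hunk above: the rewritten loop still chunks partitions into batches of at most chunkSize, but it preallocates the batch slice with capacity len(partitions)/chunkSize+1 (the worst-case batch count) and uses the built-in min, available since Go 1.21, in place of the manual clamp on the slice end. A standalone sketch of the same chunking logic, using a hypothetical chunkPartitions helper over strings rather than *protos.QRepPartition:

package main

import "fmt"

// chunkPartitions splits items into consecutive batches of at most chunkSize,
// mirroring the batching loop in processPartitions.
func chunkPartitions(items []string, chunkSize int) [][]string {
	if chunkSize < 1 {
		chunkSize = 1
	}
	// Capacity covers the worst-case number of batches, so append never reallocates.
	batches := make([][]string, 0, len(items)/chunkSize+1)
	for i := 0; i < len(items); i += chunkSize {
		// min clamps the final batch to the end of the slice.
		end := min(i+chunkSize, len(items))
		batches = append(batches, items[i:end])
	}
	return batches
}

func main() {
	parts := []string{"p1", "p2", "p3", "p4", "p5"}
	fmt.Println(chunkPartitions(parts, 2)) // [[p1 p2] [p3 p4] [p5]]
}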