From 251eaad5b49075651a2f7e874d002a3cb064d014 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Thu, 25 Jan 2024 16:25:37 +0000
Subject: [PATCH] Avoid log.Fatalf in connector code, return error instead
 (#1138)

We don't want temporal activities calling `os.Exit`
Also prefer `fmt.Sprint(x)` to `fmt.Sprintf("%v", x)`
---
 flow/connectors/postgres/cdc.go           |  2 +-
 flow/connectors/utils/avro/avro_writer.go | 33 +++++++++++++----------
 flow/e2e/test_utils.go                    |  7 +----
 flow/workflows/qrep_flow.go               |  8 ++----
 4 files changed, 23 insertions(+), 27 deletions(-)

diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index 8029b4ae60..c3fd393522 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -897,7 +897,7 @@ func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest,
 		if err != nil {
 			return nil, fmt.Errorf("error getting pkey column value: %w", err)
 		}
-		pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprintf("%v", pkeyColVal.Value))...)
+		pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprint(pkeyColVal.Value))...)
 	}
 
 	return &model.TableWithPkey{
diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go
index 6148ae2a6a..a8b5c41be8 100644
--- a/flow/connectors/utils/avro/avro_writer.go
+++ b/flow/connectors/utils/avro/avro_writer.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 	"io"
-	"log"
 	"log/slog"
 	"os"
 	"sync/atomic"
@@ -185,23 +184,23 @@ func (p *peerDBOCFWriter) WriteOCF(w io.Writer) (int, error) {
 }
 
 func (p *peerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils.S3PeerCredentials) (*AvroFile, error) {
-	r, w := io.Pipe()
-	numRowsWritten := make(chan int, 1)
-	go func() {
-		defer w.Close()
-		numRows, err := p.WriteOCF(w)
-		if err != nil {
-			log.Fatalf("%v", err)
-		}
-		numRowsWritten <- numRows
-	}()
-
 	s3svc, err := utils.CreateS3Client(s3Creds)
 	if err != nil {
 		slog.Error("failed to create S3 client: ", slog.Any("error", err))
 		return nil, fmt.Errorf("failed to create S3 client: %w", err)
 	}
 
+	r, w := io.Pipe()
+	defer r.Close()
+	var writeOcfError error
+	var numRows int
+	var noPanic bool
+	go func() {
+		defer w.Close()
+		numRows, writeOcfError = p.WriteOCF(w)
+		noPanic = true
+	}()
+
 	_, err = manager.NewUploader(s3svc).Upload(p.ctx, &s3.PutObjectInput{
 		Bucket: aws.String(bucketName),
 		Key:    aws.String(key),
@@ -212,11 +211,17 @@
 		slog.Error("failed to upload file: ", slog.Any("error", err), slog.Any("s3_path", s3Path))
 		return nil, fmt.Errorf("failed to upload file to path %s: %w", s3Path, err)
 	}
 
-	slog.Info("file uploaded to " + fmt.Sprintf("%s/%s", bucketName, key))
+	if !noPanic {
+		return nil, fmt.Errorf("WriteOCF panicked while writing avro to S3 %s/%s", bucketName, key)
+	}
+	if writeOcfError != nil {
+		return nil, writeOcfError
+	}
+
 	return &AvroFile{
-		NumRecords:      <-numRowsWritten,
+		NumRecords:      numRows,
 		StorageLocation: AvroS3Storage,
 		FilePath:        key,
 	}, nil
diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go
index 0638ecae25..772cdc78aa 100644
--- a/flow/e2e/test_utils.go
+++ b/flow/e2e/test_utils.go
@@ -518,12 +518,7 @@ func NewTStructuredLogger(logger slog.Logger) *TStructuredLogger {
 }
 
 func (l *TStructuredLogger) keyvalsToFields(keyvals []interface{}) slog.Attr {
-	var attrs []any
-	for i := 0; i < len(keyvals); i += 1 {
-		key := fmt.Sprintf("%v", keyvals[i])
-		attrs = append(attrs, key)
-	}
-	return slog.Group("test-log", attrs...)
+	return slog.Group("test-log", keyvals...)
 }
 
 func (l *TStructuredLogger) Debug(msg string, keyvals ...interface{}) {
diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go
index e2ddadd2ae..e7e87a41d5 100644
--- a/flow/workflows/qrep_flow.go
+++ b/flow/workflows/qrep_flow.go
@@ -235,13 +235,9 @@ func (q *QRepFlowExecution) processPartitions(
 		chunkSize = 1
 	}
 
-	batches := make([][]*protos.QRepPartition, 0)
+	batches := make([][]*protos.QRepPartition, 0, len(partitions)/chunkSize+1)
 	for i := 0; i < len(partitions); i += chunkSize {
-		end := i + chunkSize
-		if end > len(partitions) {
-			end = len(partitions)
-		}
-
+		end := min(i+chunkSize, len(partitions))
 		batches = append(batches, partitions[i:end])
 	}
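The main behavioral change is in `WriteRecordsToS3`: `WriteOCF` now runs on a goroutine that feeds an `io.Pipe`, its error and a completion flag are recorded instead of calling `log.Fatalf` (which exits the whole worker process via `os.Exit`), and those results are only inspected after the S3 upload has drained the pipe. The sketch below is a minimal, self-contained illustration of that producer/consumer pattern, not the PeerDB code itself: `writeData`, `streamUpload`, and the `bytes.Buffer` sink are hypothetical stand-ins for `WriteOCF` and the `manager.Uploader`.

```go
// Minimal sketch of the pattern adopted by this patch: stream data through an
// io.Pipe from a producer goroutine, let the consumer drain it, and surface the
// producer's error to the caller instead of calling log.Fatalf.
package main

import (
	"bytes"
	"fmt"
	"io"
)

// writeData stands in for p.WriteOCF(w): it streams rows into w and reports
// how many it wrote, or an error.
func writeData(w io.Writer) (int, error) {
	for i := 0; i < 3; i++ {
		if _, err := fmt.Fprintf(w, "row %d\n", i); err != nil {
			return i, err
		}
	}
	return 3, nil
}

func streamUpload() (int, error) {
	r, w := io.Pipe()
	defer r.Close()

	var (
		numRows  int
		writeErr error
		noPanic  bool
	)
	go func() {
		// Closing the write end unblocks the consumer, which then sees EOF.
		defer w.Close()
		numRows, writeErr = writeData(w)
		// noPanic stays false if writeData panicked before returning.
		noPanic = true
	}()

	// Stand-in for manager.NewUploader(s3svc).Upload(...): drain the read end.
	var sink bytes.Buffer
	if _, err := io.Copy(&sink, r); err != nil {
		return 0, fmt.Errorf("upload failed: %w", err)
	}

	// Only inspect the producer's results once the consumer has drained the pipe.
	if !noPanic {
		return 0, fmt.Errorf("producer did not finish writing")
	}
	if writeErr != nil {
		return 0, writeErr
	}
	return numRows, nil
}

func main() {
	n, err := streamUpload()
	if err != nil {
		// Propagate the error to the caller; a Temporal activity must not call os.Exit.
		fmt.Println("error:", err)
		return
	}
	fmt.Println("rows written:", n)
}
```

Returning the error this way lets the activity report the failure to Temporal and be retried, whereas `log.Fatalf` would terminate the entire worker.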