diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 7959e5ff67..b680c9b832 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -897,7 +897,7 @@ func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest, if err != nil { return nil, fmt.Errorf("error getting pkey column value: %w", err) } - pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprintf("%v", pkeyColVal.Value))...) + pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprint(pkeyColVal.Value))...) } return &model.TableWithPkey{ diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index 6148ae2a6a..beba9961d1 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -4,9 +4,9 @@ import ( "context" "fmt" "io" - "log" "log/slog" "os" + "sync" "sync/atomic" "github.com/aws/aws-sdk-go-v2/aws" @@ -186,14 +186,13 @@ func (p *peerDBOCFWriter) WriteOCF(w io.Writer) (int, error) { func (p *peerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils.S3PeerCredentials) (*AvroFile, error) { r, w := io.Pipe() - numRowsWritten := make(chan int, 1) + var writeOcfError error + var numRows int + var wg sync.WaitGroup + wg.Add(1) go func() { - defer w.Close() - numRows, err := p.WriteOCF(w) - if err != nil { - log.Fatalf("%v", err) - } - numRowsWritten <- numRows + numRows, writeOcfError = p.WriteOCF(w) + wg.Done() }() s3svc, err := utils.CreateS3Client(s3Creds) @@ -214,9 +213,12 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils } slog.Info("file uploaded to " + fmt.Sprintf("%s/%s", bucketName, key)) - + wg.Wait() + if writeOcfError != nil { + return nil, writeOcfError + } return &AvroFile{ - NumRecords: <-numRowsWritten, + NumRecords: numRows, StorageLocation: AvroS3Storage, FilePath: key, }, nil diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 94ee47a8e2..5aa5ba730c 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -505,12 +505,7 @@ func NewTStructuredLogger(logger slog.Logger) *TStructuredLogger { } func (l *TStructuredLogger) keyvalsToFields(keyvals []interface{}) slog.Attr { - var attrs []any - for i := 0; i < len(keyvals); i += 1 { - key := fmt.Sprintf("%v", keyvals[i]) - attrs = append(attrs, key) - } - return slog.Group("test-log", attrs...) + return slog.Group("test-log", keyvals...) } func (l *TStructuredLogger) Debug(msg string, keyvals ...interface{}) {