diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index a10af84672..a8ceefe307 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -148,8 +148,9 @@ func (qe *QRepQueryExecutor) processRowsStream( record, err := qe.mapRowToQRecord(rows, fieldDescriptions) if err != nil { qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err)) - stream.Close(fmt.Errorf("failed to map row to QRecord: %w", err)) - return 0, fmt.Errorf("failed to map row to QRecord: %w", err) + err := fmt.Errorf("failed to map row to QRecord: %w", err) + stream.Close(err) + return 0, err } stream.Records <- record diff --git a/flow/connectors/postgres/sink.go b/flow/connectors/postgres/sink.go index d8d2230a36..e1aa183b7a 100644 --- a/flow/connectors/postgres/sink.go +++ b/flow/connectors/postgres/sink.go @@ -45,7 +45,7 @@ type PgCopyReader struct { func NewPgCopyPipe() (PgCopyReader, PgCopyWriter) { read, write := io.Pipe() - schema := PgCopyShared{} + schema := PgCopyShared{schemaLatch: make(chan struct{}, 0)} return PgCopyReader{PipeReader: read, schema: &schema}, PgCopyWriter{PipeWriter: write, schema: &schema} }