Skip to content

Commit

Permalink
comment about ExecuteAndProcessQuery waiting on channel, also defer t…
Browse files Browse the repository at this point in the history
…o not get stuck on panic
  • Loading branch information
serprex committed Jan 29, 2024
1 parent db04baf commit 8089989
Showing 1 changed file with 3 additions and 1 deletion.
4 changes: 3 additions & 1 deletion flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,15 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery(
stream := model.NewQRecordStream(1024)
errors := make(chan error, 1)
qe.logger.Info("Executing and processing query", slog.String("query", query))

// must wait on errors to close before returning to maintain qe.conn exclusion
go func() {
defer close(errors)
_, err := qe.ExecuteAndProcessQueryStream(stream, query, args...)
if err != nil {
qe.logger.Error("[pg_query_executor] failed to execute and process query stream", slog.Any("error", err))
errors <- err
}
close(errors)
}()

select {
Expand Down

0 comments on commit 8089989

Please sign in to comment.