Skip to content

Commit

Permalink
fix copy syntax
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed May 24, 2024
1 parent 1319b8f commit 3857e84
Showing 1 changed file with 1 addition and 2 deletions.
3 changes: 1 addition & 2 deletions flow/connectors/postgres/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ func (p PgCopyWriter) ExecuteQueryWithTx(
norows.Close()

// TODO use pgx simple query arg parsing code (it's internal, need to copy)
// TODO correctly interpolate
for i, arg := range args {
var str string
switch arg := arg.(type) {
Expand All @@ -196,7 +195,7 @@ func (p PgCopyWriter) ExecuteQueryWithTx(
query = strings.ReplaceAll(query, fmt.Sprintf("$%d", i+1), str)
}

copyQuery := fmt.Sprintf("COPY %s (%s) TO STDOUT", query, strings.Join(cols, ","))
copyQuery := fmt.Sprintf("COPY (%s) TO STDOUT", query)
qe.logger.Info(fmt.Sprintf("[pg_query_executor] executing cursor declaration for %v with args %v", copyQuery, args))
if _, err := qe.conn.PgConn().CopyTo(ctx, p.PipeWriter, copyQuery); err != nil {
qe.logger.Info("[pg_query_executor] failed to copy",
Expand Down

0 comments on commit 3857e84

Please sign in to comment.