diff --git a/flow/connectors/postgres/sink.go b/flow/connectors/postgres/sink.go index f49e702317..f8bb1b3a47 100644 --- a/flow/connectors/postgres/sink.go +++ b/flow/connectors/postgres/sink.go @@ -174,7 +174,6 @@ func (p PgCopyWriter) ExecuteQueryWithTx( norows.Close() // TODO use pgx simple query arg parsing code (it's internal, need to copy) - // TODO correctly interpolate for i, arg := range args { var str string switch arg := arg.(type) { @@ -196,7 +195,7 @@ func (p PgCopyWriter) ExecuteQueryWithTx( query = strings.ReplaceAll(query, fmt.Sprintf("$%d", i+1), str) } - copyQuery := fmt.Sprintf("COPY %s (%s) TO STDOUT", query, strings.Join(cols, ",")) + copyQuery := fmt.Sprintf("COPY (%s) TO STDOUT", query) qe.logger.Info(fmt.Sprintf("[pg_query_executor] executing cursor declaration for %v with args %v", copyQuery, args)) if _, err := qe.conn.PgConn().CopyTo(ctx, p.PipeWriter, copyQuery); err != nil { qe.logger.Info("[pg_query_executor] failed to copy",