From 3857e84f7fb228347b7071ead708ac685bb500d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 24 May 2024 16:54:51 +0000 Subject: [PATCH] fix copy syntax --- flow/connectors/postgres/sink.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flow/connectors/postgres/sink.go b/flow/connectors/postgres/sink.go index f49e702317..f8bb1b3a47 100644 --- a/flow/connectors/postgres/sink.go +++ b/flow/connectors/postgres/sink.go @@ -174,7 +174,6 @@ func (p PgCopyWriter) ExecuteQueryWithTx( norows.Close() // TODO use pgx simple query arg parsing code (it's internal, need to copy) - // TODO correctly interpolate for i, arg := range args { var str string switch arg := arg.(type) { @@ -196,7 +195,7 @@ func (p PgCopyWriter) ExecuteQueryWithTx( query = strings.ReplaceAll(query, fmt.Sprintf("$%d", i+1), str) } - copyQuery := fmt.Sprintf("COPY %s (%s) TO STDOUT", query, strings.Join(cols, ",")) + copyQuery := fmt.Sprintf("COPY (%s) TO STDOUT", query) qe.logger.Info(fmt.Sprintf("[pg_query_executor] executing cursor declaration for %v with args %v", copyQuery, args)) if _, err := qe.conn.PgConn().CopyTo(ctx, p.PipeWriter, copyQuery); err != nil { qe.logger.Info("[pg_query_executor] failed to copy",