diff --git a/flow/connectors/postgres/sink.go b/flow/connectors/postgres/sink.go index e1aa183b7a..0da2387dc7 100644 --- a/flow/connectors/postgres/sink.go +++ b/flow/connectors/postgres/sink.go @@ -45,7 +45,7 @@ type PgCopyReader struct { func NewPgCopyPipe() (PgCopyReader, PgCopyWriter) { read, write := io.Pipe() - schema := PgCopyShared{schemaLatch: make(chan struct{}, 0)} + schema := PgCopyShared{schemaLatch: make(chan struct{})} return PgCopyReader{PipeReader: read, schema: &schema}, PgCopyWriter{PipeWriter: write, schema: &schema} } @@ -174,7 +174,7 @@ func (p PgCopyWriter) ExecuteQueryWithTx( // TODO use pgx simple query arg parsing code (it's internal, need to copy) // TODO correctly interpolate for i, arg := range args { - query = strings.ReplaceAll(query, fmt.Sprintf("$%d", i), fmt.Sprint(arg)) + query = strings.ReplaceAll(query, fmt.Sprintf("$%d", i+1), fmt.Sprint(arg)) } copyQuery := fmt.Sprintf("COPY %s (%s) TO STDOUT", query, strings.Join(cols, ","))