From d9dec7798c607e74f5ddf10ef6c2705be80cf83b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Sat, 25 May 2024 15:35:47 +0000 Subject: [PATCH] Avoid double escape --- flow/connectors/postgres/qrep.go | 11 ++++++----- flow/connectors/postgres/sink_pg.go | 11 ++++++++--- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 2478a19613..ac45baeefe 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -123,7 +123,7 @@ func (c *PostgresConnector) getNumRowsPartitions( if totalRows.Int64 == 0 { c.logger.Warn("no records to replicate, returning") - return make([]*protos.QRepPartition, 0), nil + return nil, nil } // Calculate the number of partitions @@ -543,14 +543,15 @@ func syncQRepRecords( // construct the SET clause for the upsert operation upsertMatchColsList := writeMode.UpsertKeyColumns - upsertMatchCols := make(map[string]struct{}) + upsertMatchCols := make(map[string]struct{}, len(upsertMatchColsList)) for _, col := range upsertMatchColsList { upsertMatchCols[col] = struct{}{} } - setClauseArray := make([]string, 0) - selectStrArray := make([]string, 0) - for _, col := range sink.GetColumnNames() { + columnNames := sink.GetColumnNames() + setClauseArray := make([]string, 0, len(upsertMatchColsList)+1) + selectStrArray := make([]string, 0, len(columnNames)) + for _, col := range columnNames { _, ok := upsertMatchCols[col] quotedCol := QuoteIdentifier(col) if !ok { diff --git a/flow/connectors/postgres/sink_pg.go b/flow/connectors/postgres/sink_pg.go index 9591c66121..fac5d89c9e 100644 --- a/flow/connectors/postgres/sink_pg.go +++ b/flow/connectors/postgres/sink_pg.go @@ -72,7 +72,7 @@ func (p PgCopyWriter) ExecuteQueryWithTx( fieldDescriptions := norows.FieldDescriptions() cols := make([]string, 0, len(fieldDescriptions)) for _, fd := range fieldDescriptions { - cols = append(cols, QuoteIdentifier(fd.Name)) + cols = append(cols, fd.Name) } p.SetSchema(cols) norows.Close() @@ -112,15 +112,20 @@ func (p PgCopyWriter) Close(err error) { } func (p PgCopyReader) GetColumnNames() []string { + <-p.schema.schemaLatch return p.schema.schema } func (p PgCopyReader) CopyInto(ctx context.Context, c *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) { - <-p.schema.schemaLatch + cols := p.GetColumnNames() + quotedCols := make([]string, 0, len(cols)) + for _, col := range cols { + quotedCols = append(cols, QuoteIdentifier(col)) + } ct, err := c.conn.PgConn().CopyFrom( ctx, p.PipeReader, - fmt.Sprintf("COPY %s (%s) FROM STDIN", table.Sanitize(), strings.Join(p.schema.schema, ",")), + fmt.Sprintf("COPY %s (%s) FROM STDIN", table.Sanitize(), strings.Join(quotedCols, ",")), ) return ct.RowsAffected(), err }