Skip to content

Commit

Permalink
Avoid double escape
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed May 25, 2024
1 parent 047a3f0 commit d9dec77
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 8 deletions.
11 changes: 6 additions & 5 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (c *PostgresConnector) getNumRowsPartitions(

if totalRows.Int64 == 0 {
c.logger.Warn("no records to replicate, returning")
return make([]*protos.QRepPartition, 0), nil
return nil, nil
}

// Calculate the number of partitions
Expand Down Expand Up @@ -543,14 +543,15 @@ func syncQRepRecords(

// construct the SET clause for the upsert operation
upsertMatchColsList := writeMode.UpsertKeyColumns
upsertMatchCols := make(map[string]struct{})
upsertMatchCols := make(map[string]struct{}, len(upsertMatchColsList))
for _, col := range upsertMatchColsList {
upsertMatchCols[col] = struct{}{}
}

setClauseArray := make([]string, 0)
selectStrArray := make([]string, 0)
for _, col := range sink.GetColumnNames() {
columnNames := sink.GetColumnNames()
setClauseArray := make([]string, 0, len(upsertMatchColsList)+1)
selectStrArray := make([]string, 0, len(columnNames))
for _, col := range columnNames {
_, ok := upsertMatchCols[col]
quotedCol := QuoteIdentifier(col)
if !ok {
Expand Down
11 changes: 8 additions & 3 deletions flow/connectors/postgres/sink_pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (p PgCopyWriter) ExecuteQueryWithTx(
fieldDescriptions := norows.FieldDescriptions()
cols := make([]string, 0, len(fieldDescriptions))
for _, fd := range fieldDescriptions {
cols = append(cols, QuoteIdentifier(fd.Name))
cols = append(cols, fd.Name)
}
p.SetSchema(cols)
norows.Close()
Expand Down Expand Up @@ -112,15 +112,20 @@ func (p PgCopyWriter) Close(err error) {
}

func (p PgCopyReader) GetColumnNames() []string {
<-p.schema.schemaLatch
return p.schema.schema
}

func (p PgCopyReader) CopyInto(ctx context.Context, c *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) {
<-p.schema.schemaLatch
cols := p.GetColumnNames()
quotedCols := make([]string, 0, len(cols))
for _, col := range cols {
quotedCols = append(cols, QuoteIdentifier(col))

Check failure on line 123 in flow/connectors/postgres/sink_pg.go

View workflow job for this annotation

GitHub Actions / lint

appendAssign: append result not assigned to the same slice (gocritic)
}
ct, err := c.conn.PgConn().CopyFrom(
ctx,
p.PipeReader,
fmt.Sprintf("COPY %s (%s) FROM STDIN", table.Sanitize(), strings.Join(p.schema.schema, ",")),
fmt.Sprintf("COPY %s (%s) FROM STDIN", table.Sanitize(), strings.Join(quotedCols, ",")),
)
return ct.RowsAffected(), err
}

0 comments on commit d9dec77

Please sign in to comment.