diff --git a/flow/connectors/postgres/sink.go b/flow/connectors/postgres/sink.go index f8bb1b3a47..b6f4324491 100644 --- a/flow/connectors/postgres/sink.go +++ b/flow/connectors/postgres/sink.go @@ -196,8 +196,9 @@ func (p PgCopyWriter) ExecuteQueryWithTx( } copyQuery := fmt.Sprintf("COPY (%s) TO STDOUT", query) - qe.logger.Info(fmt.Sprintf("[pg_query_executor] executing cursor declaration for %v with args %v", copyQuery, args)) - if _, err := qe.conn.PgConn().CopyTo(ctx, p.PipeWriter, copyQuery); err != nil { + qe.logger.Info("[pg_query_executor] executing copy", slog.String("query", copyQuery)) + ct, err := qe.conn.PgConn().CopyTo(ctx, p.PipeWriter, copyQuery) + if err != nil { qe.logger.Info("[pg_query_executor] failed to copy", slog.String("copyQuery", copyQuery), slog.Any("error", err)) err = fmt.Errorf("[pg_query_executor] failed to copy: %w", err) @@ -213,12 +214,10 @@ func (p PgCopyWriter) ExecuteQueryWithTx( return 0, err } - // TODO row count, GET DIAGNOSTICS x = ROW_COUNT - totalRecordsFetched := 0 - + totalRecordsFetched := ct.RowsAffected() qe.logger.Info(fmt.Sprintf("[pg_query_executor] committed transaction for query '%s', rows = %d", query, totalRecordsFetched)) - return totalRecordsFetched, nil + return int(totalRecordsFetched), nil } func (p PgCopyWriter) Close(err error) { @@ -231,10 +230,10 @@ func (p PgCopyReader) GetColumnNames() []string { func (p PgCopyReader) CopyInto(ctx context.Context, c *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) { <-p.schema.schemaLatch - _, err := c.conn.PgConn().CopyFrom( + ct, err := c.conn.PgConn().CopyFrom( ctx, p.PipeReader, fmt.Sprintf("COPY %s (%s) FROM STDIN", table.Sanitize(), strings.Join(p.schema.schema, ",")), ) - return 0, err + return ct.RowsAffected(), err }