Skip to content

Commit

Permalink
Check command tag for row counts
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed May 24, 2024
1 parent 3857e84 commit aeffe5b
Showing 1 changed file with 7 additions and 8 deletions.
15 changes: 7 additions & 8 deletions flow/connectors/postgres/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,9 @@ func (p PgCopyWriter) ExecuteQueryWithTx(
}

copyQuery := fmt.Sprintf("COPY (%s) TO STDOUT", query)
qe.logger.Info(fmt.Sprintf("[pg_query_executor] executing cursor declaration for %v with args %v", copyQuery, args))
if _, err := qe.conn.PgConn().CopyTo(ctx, p.PipeWriter, copyQuery); err != nil {
qe.logger.Info("[pg_query_executor] executing copy", slog.String("query", copyQuery))
ct, err := qe.conn.PgConn().CopyTo(ctx, p.PipeWriter, copyQuery)
if err != nil {
qe.logger.Info("[pg_query_executor] failed to copy",
slog.String("copyQuery", copyQuery), slog.Any("error", err))
err = fmt.Errorf("[pg_query_executor] failed to copy: %w", err)
Expand All @@ -213,12 +214,10 @@ func (p PgCopyWriter) ExecuteQueryWithTx(
return 0, err
}

// TODO row count, GET DIAGNOSTICS x = ROW_COUNT
totalRecordsFetched := 0

totalRecordsFetched := ct.RowsAffected()
qe.logger.Info(fmt.Sprintf("[pg_query_executor] committed transaction for query '%s', rows = %d",
query, totalRecordsFetched))
return totalRecordsFetched, nil
return int(totalRecordsFetched), nil
}

func (p PgCopyWriter) Close(err error) {
Expand All @@ -231,10 +230,10 @@ func (p PgCopyReader) GetColumnNames() []string {

func (p PgCopyReader) CopyInto(ctx context.Context, c *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) {
<-p.schema.schemaLatch
_, err := c.conn.PgConn().CopyFrom(
ct, err := c.conn.PgConn().CopyFrom(
ctx,
p.PipeReader,
fmt.Sprintf("COPY %s (%s) FROM STDIN", table.Sanitize(), strings.Join(p.schema.schema, ",")),
)
return 0, err
return ct.RowsAffected(), err
}

0 comments on commit aeffe5b

Please sign in to comment.