diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index e9b0a5558e..cb436fd7e3 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -375,8 +375,8 @@ func corePullQRepRecords( executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot, config.FlowJobName, partition.PartitionId) - var numRecords int - if err := executor.conn.QueryRow(ctx, query, rangeStart, rangeEnd).Scan(&numRecords); err != nil { + numRecords, err := executor.ExecuteQueryIntoSink(ctx, sink, query, rangeStart, rangeEnd) + if err != nil { return 0, err } diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 6ed6e572f6..a10af84672 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -286,35 +286,6 @@ func (qe *QRepQueryExecutor) ExecuteQueryIntoSink( return sink.ExecuteQueryWithTx(ctx, qe, tx, query, args...) } -func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamGettingCurrentSnapshotXmin( - ctx context.Context, - stream *model.QRecordStream, - query string, - args ...interface{}, -) (int, int64, error) { - var currentSnapshotXmin pgtype.Int8 - qe.logger.Info("Executing and processing query stream", slog.String("query", query)) - defer stream.Close(nil) - - tx, err := qe.conn.BeginTx(ctx, pgx.TxOptions{ - AccessMode: pgx.ReadOnly, - IsoLevel: pgx.RepeatableRead, - }) - if err != nil { - qe.logger.Error("[pg_query_executor] failed to begin transaction", slog.Any("error", err)) - return 0, currentSnapshotXmin.Int64, fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err) - } - - err = tx.QueryRow(ctx, "select txid_snapshot_xmin(txid_current_snapshot())").Scan(¤tSnapshotXmin) - if err != nil { - qe.logger.Error("[pg_query_executor] failed to get current snapshot xmin", slog.Any("error", err)) - return 0, currentSnapshotXmin.Int64, err - } - - totalRecordsFetched, err := qe.ExecuteAndProcessQueryStreamWithTx(ctx, tx, stream, query, args...) - return totalRecordsFetched, currentSnapshotXmin.Int64, err -} - func (qe *QRepQueryExecutor) ExecuteQueryIntoSinkGettingCurrentSnapshotXmin( ctx context.Context, sink QuerySinkWriter, diff --git a/flow/connectors/postgres/sink.go b/flow/connectors/postgres/sink.go index 831bc75977..fd0c3a067b 100644 --- a/flow/connectors/postgres/sink.go +++ b/flow/connectors/postgres/sink.go @@ -138,10 +138,6 @@ func (p PgCopyWriter) SetSchema(schema []string) { } } -func (p PgCopyWriter) IsSchemaSet() bool { - return p.schema.schemaSet -} - func (p PgCopyWriter) ExecuteQueryWithTx( ctx context.Context, qe *QRepQueryExecutor, @@ -162,8 +158,7 @@ func (p PgCopyWriter) ExecuteQueryWithTx( } } - schemaQuery := query + " limit 0" - norows, err := tx.Query(ctx, schemaQuery, args...) + norows, err := tx.Query(ctx, query+" limit 0", args...) if err != nil { return 0, err } @@ -173,9 +168,7 @@ func (p PgCopyWriter) ExecuteQueryWithTx( for _, fd := range fieldDescriptions { cols = append(cols, QuoteIdentifier(fd.Name)) } - if !p.IsSchemaSet() { - p.SetSchema(cols) - } + p.SetSchema(cols) norows.Close() // TODO use pgx simple query arg parsing code (it's internal, need to copy) @@ -187,9 +180,9 @@ func (p PgCopyWriter) ExecuteQueryWithTx( copyQuery := fmt.Sprintf("COPY %s (%s) TO STDOUT", query, strings.Join(cols, ",")) qe.logger.Info(fmt.Sprintf("[pg_query_executor] executing cursor declaration for %v with args %v", copyQuery, args)) if _, err := qe.conn.PgConn().CopyTo(ctx, p.PipeWriter, copyQuery); err != nil { - qe.logger.Info("[pg_query_executor] failed to declare cursor", + qe.logger.Info("[pg_query_executor] failed to copy", slog.String("copyQuery", copyQuery), slog.Any("error", err)) - err = fmt.Errorf("[pg_query_executor] failed to declare cursor: %w", err) + err = fmt.Errorf("[pg_query_executor] failed to copy: %w", err) p.Close(err) return 0, err } @@ -220,10 +213,6 @@ func (p PgCopyReader) GetColumnNames() []string { func (p PgCopyReader) CopyInto(ctx context.Context, c *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) { <-p.schema.schemaLatch - cols := make([]string, 0, len(p.schema.schema)) - for _, col := range p.schema.schema { - cols = append(cols, QuoteIdentifier(col)) - } - _, err := c.conn.PgConn().CopyFrom(ctx, p.PipeReader, fmt.Sprintf("COPY %s (%s) FROM STDIN", table.Sanitize(), strings.Join(cols, ","))) + _, err := c.conn.PgConn().CopyFrom(ctx, p.PipeReader, fmt.Sprintf("COPY %s (%s) FROM STDIN", table.Sanitize(), strings.Join(p.schema.schema, ","))) return 0, err }