Skip to content

Commit

Permalink
wtf numRecords
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed May 24, 2024
1 parent 817b46c commit 6abba93
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 47 deletions.
4 changes: 2 additions & 2 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,8 @@ func corePullQRepRecords(
executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)

var numRecords int
if err := executor.conn.QueryRow(ctx, query, rangeStart, rangeEnd).Scan(&numRecords); err != nil {
numRecords, err := executor.ExecuteQueryIntoSink(ctx, sink, query, rangeStart, rangeEnd)
if err != nil {
return 0, err
}

Expand Down
29 changes: 0 additions & 29 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,35 +286,6 @@ func (qe *QRepQueryExecutor) ExecuteQueryIntoSink(
return sink.ExecuteQueryWithTx(ctx, qe, tx, query, args...)
}

func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamGettingCurrentSnapshotXmin(
ctx context.Context,
stream *model.QRecordStream,
query string,
args ...interface{},
) (int, int64, error) {
var currentSnapshotXmin pgtype.Int8
qe.logger.Info("Executing and processing query stream", slog.String("query", query))
defer stream.Close(nil)

tx, err := qe.conn.BeginTx(ctx, pgx.TxOptions{
AccessMode: pgx.ReadOnly,
IsoLevel: pgx.RepeatableRead,
})
if err != nil {
qe.logger.Error("[pg_query_executor] failed to begin transaction", slog.Any("error", err))
return 0, currentSnapshotXmin.Int64, fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
}

err = tx.QueryRow(ctx, "select txid_snapshot_xmin(txid_current_snapshot())").Scan(&currentSnapshotXmin)
if err != nil {
qe.logger.Error("[pg_query_executor] failed to get current snapshot xmin", slog.Any("error", err))
return 0, currentSnapshotXmin.Int64, err
}

totalRecordsFetched, err := qe.ExecuteAndProcessQueryStreamWithTx(ctx, tx, stream, query, args...)
return totalRecordsFetched, currentSnapshotXmin.Int64, err
}

func (qe *QRepQueryExecutor) ExecuteQueryIntoSinkGettingCurrentSnapshotXmin(
ctx context.Context,
sink QuerySinkWriter,
Expand Down
21 changes: 5 additions & 16 deletions flow/connectors/postgres/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,6 @@ func (p PgCopyWriter) SetSchema(schema []string) {
}
}

func (p PgCopyWriter) IsSchemaSet() bool {
return p.schema.schemaSet
}

func (p PgCopyWriter) ExecuteQueryWithTx(
ctx context.Context,
qe *QRepQueryExecutor,
Expand All @@ -162,8 +158,7 @@ func (p PgCopyWriter) ExecuteQueryWithTx(
}
}

schemaQuery := query + " limit 0"
norows, err := tx.Query(ctx, schemaQuery, args...)
norows, err := tx.Query(ctx, query+" limit 0", args...)
if err != nil {
return 0, err
}
Expand All @@ -173,9 +168,7 @@ func (p PgCopyWriter) ExecuteQueryWithTx(
for _, fd := range fieldDescriptions {
cols = append(cols, QuoteIdentifier(fd.Name))
}
if !p.IsSchemaSet() {
p.SetSchema(cols)
}
p.SetSchema(cols)
norows.Close()

// TODO use pgx simple query arg parsing code (it's internal, need to copy)
Expand All @@ -187,9 +180,9 @@ func (p PgCopyWriter) ExecuteQueryWithTx(
copyQuery := fmt.Sprintf("COPY %s (%s) TO STDOUT", query, strings.Join(cols, ","))
qe.logger.Info(fmt.Sprintf("[pg_query_executor] executing cursor declaration for %v with args %v", copyQuery, args))
if _, err := qe.conn.PgConn().CopyTo(ctx, p.PipeWriter, copyQuery); err != nil {
qe.logger.Info("[pg_query_executor] failed to declare cursor",
qe.logger.Info("[pg_query_executor] failed to copy",
slog.String("copyQuery", copyQuery), slog.Any("error", err))
err = fmt.Errorf("[pg_query_executor] failed to declare cursor: %w", err)
err = fmt.Errorf("[pg_query_executor] failed to copy: %w", err)
p.Close(err)
return 0, err
}
Expand Down Expand Up @@ -220,10 +213,6 @@ func (p PgCopyReader) GetColumnNames() []string {

func (p PgCopyReader) CopyInto(ctx context.Context, c *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) {
<-p.schema.schemaLatch
cols := make([]string, 0, len(p.schema.schema))
for _, col := range p.schema.schema {
cols = append(cols, QuoteIdentifier(col))
}
_, err := c.conn.PgConn().CopyFrom(ctx, p.PipeReader, fmt.Sprintf("COPY %s (%s) FROM STDIN", table.Sanitize(), strings.Join(cols, ",")))
_, err := c.conn.PgConn().CopyFrom(ctx, p.PipeReader, fmt.Sprintf("COPY %s (%s) FROM STDIN", table.Sanitize(), strings.Join(p.schema.schema, ",")))
return 0, err
}

0 comments on commit 6abba93

Please sign in to comment.