Skip to content

Commit

Permalink
Sketching out implementations, bit annoying that conn is private on p…
Browse files Browse the repository at this point in the history
…gx.Tx, as is query sanitization
  • Loading branch information
serprex committed May 23, 2024
1 parent c138f2a commit 4978121
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 36 deletions.
18 changes: 10 additions & 8 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,9 @@ func (c *PostgresConnector) PullPgQRepRecords(
ctx context.Context,
config *protos.QRepConfig,
partition *protos.QRepPartition,
stream *io.PipeReader,
stream *io.PipeWriter,
) (int, error) {
return corePullQRepRecords(c, ctx, config, partition, &PgCopySinkWrite{PipeReader: stream})
return corePullQRepRecords(c, ctx, config, partition, PgCopyWriter{PipeWriter: stream})
}

func corePullQRepRecords(
Expand All @@ -335,7 +335,7 @@ func corePullQRepRecords(
executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)

_, err := sink.ExecuteQuery(ctx, executor, config.Query)
_, err := executor.ExecuteQueryIntoSink(ctx, sink, config.Query)
return 0, err
}
c.logger.Info("Obtained ranges for partition for PullQRepStream", partitionIdLog)
Expand Down Expand Up @@ -391,16 +391,18 @@ func (c *PostgresConnector) SyncQRepRecords(
partition *protos.QRepPartition,
stream *model.QRecordStream,
) (int, error) {
return syncQRepRecords(c, ctx, config, partition, model.NewQRecordCopyFromSource(stream))
return syncQRepRecords(c, ctx, config, partition, RecordStreamSink{
QRecordStream: stream,
})
}

func (c *PostgresConnector) SyncPgQRepRecords(
ctx context.Context,
config *protos.QRepConfig,
partition *protos.QRepPartition,
pipe *io.PipeReader,
pipe PgCopyReader,
) (int, error) {
return syncQRepRecords(c, ctx, config, partition, model.NewPgRecordCopyFromSource(pipe))
return syncQRepRecords(c, ctx, config, partition, pipe)
}

func syncQRepRecords(
Expand Down Expand Up @@ -669,14 +671,14 @@ func pullXminRecordStream(
var err error
var numRecords int
if partition.Range != nil {
numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentSnapshotXmin(
numRecords, currentSnapshotXmin, err = executor.ExecuteQueryIntoSinkGettingCurrentSnapshotXmin(
ctx,
sink,
query,
oldxid,
)
} else {
numRecords, currentSnapshotXmin, err = executor.executeAndProcessQueryStreamGettingCurrentSnapshotXmin(
numRecords, currentSnapshotXmin, err = executor.ExecuteQueryIntoSinkGettingCurrentSnapshotXmin(
ctx,
sink,
query,
Expand Down
74 changes: 46 additions & 28 deletions flow/connectors/postgres/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,26 @@ func (stream RecordStreamSink) ExecuteQueryWithTx(
return totalRecordsFetched, nil
}

// CopyInto loads the records of the wrapped QRecordStream into table by
// handing them to tx.CopyFrom, with the sink's column names as the column
// list. The count and error are exactly what tx.CopyFrom returns.
func (stream RecordStreamSink) CopyInto(ctx context.Context, tx pgx.Tx, table pgx.Identifier) (int64, error) {
	columns := stream.GetColumnNames()
	source := model.NewQRecordCopyFromSource(stream.QRecordStream)
	return tx.CopyFrom(ctx, table, columns, source)
}

// GetColumnNames returns the column names reported by the schema obtained
// from stream.Schema().
func (stream RecordStreamSink) GetColumnNames() []string {
	schema := stream.Schema()
	return schema.GetColumnNames()
}

// SetSchema stores schema as the field descriptions for this copy and closes
// schemaLatch. Once the schemaSet flag is raised, further calls do nothing.
func (p PgCopyWriter) SetSchema(schema []pgconn.FieldDescription) {
	if p.schema.schemaSet {
		return
	}

	// Store the schema before closing the latch, so anything woken by the
	// latch finds the schema already in place.
	// NOTE(review): the flag check and close are not synchronised here;
	// presumably only one goroutine calls SetSchema — confirm.
	p.schema.schema = schema
	close(p.schema.schemaLatch)
	p.schema.schemaSet = true
}

// IsSchemaSet reports whether the schemaSet flag has been raised, which
// SetSchema does after it has stored a schema.
func (p PgCopyWriter) IsSchemaSet() bool {
	alreadySet := p.schema.schemaSet
	return alreadySet
}

func (p PgCopyWriter) ExecuteQueryWithTx(
ctx context.Context,
qe *QRepQueryExecutor,
Expand All @@ -145,47 +165,36 @@ func (p PgCopyWriter) ExecuteQueryWithTx(
}

schemaQuery := query + " limit 0"
norows, err := tx.Query(ctx, schemaQuery, ...args)
norows, err := tx.Query(ctx, schemaQuery, args...)
if err != nil {
return 0, err
}
fieldDescriptions := norows.FieldDescriptions()
cols := make([]string, 0, len(fieldDescriptions))
for _, fd := range fieldDescriptions {
cols = append(cols, fd.Name)
cols = append(cols, QuoteIdentifier(fd.Name))
}
if !p.schema.IsSchemaSet() {
p.schema.SetSchema(fieldDescriptions)
if !p.IsSchemaSet() {
p.SetSchema(fieldDescriptions)
}
norows.Close()

// TODO use pgx simple query arg parsing code (it's internal, need to copy)
// TODO correctly interpolate
for i, arg := range args {
query = strings.Replace(query, fmt.Sprintf("$%d", i), fmt.Sprint(arg), -1)
}

copyQuery := fmt.Sprintf("COPY %s (%s) TO STDOUT", query, strings.Join(cols, ","))
qe.logger.Info(fmt.Sprintf("[pg_query_executor] executing cursor declaration for %v with args %v", copyQuery, args))
if _, err := qe.conn.PgConn().CopyTo(ctx, copyQuery, args...); err != nil {
if _, err := qe.conn.PgConn().CopyTo(ctx, p.PipeWriter, copyQuery); err != nil {
qe.logger.Info("[pg_query_executor] failed to declare cursor",
slog.String("cursorQuery", cursorQuery), slog.Any("error", err))
slog.String("copyQuery", copyQuery), slog.Any("error", err))
err = fmt.Errorf("[pg_query_executor] failed to declare cursor: %w", err)
p.Close(err)
return 0, err
}

qe.logger.Info(fmt.Sprintf("[pg_query_executor] declared cursor '%s' for query '%s'", cursorName, query))

totalRecordsFetched := 0
for {
numRows, err := qe.processFetchedRows(ctx, query, tx, cursorName, fetchSize, stream.QRecordStream)
if err != nil {
qe.logger.Error("[pg_query_executor] failed to process fetched rows", slog.Any("error", err))
return 0, err
}

qe.logger.Info(fmt.Sprintf("[pg_query_executor] fetched %d rows for query '%s'", numRows, query))
totalRecordsFetched += numRows

if numRows == 0 {
break
}
}

qe.logger.Info("Committing transaction")
if err := tx.Commit(ctx); err != nil {
qe.logger.Error("[pg_query_executor] failed to commit transaction", slog.Any("error", err))
Expand All @@ -194,15 +203,24 @@ func (p PgCopyWriter) ExecuteQueryWithTx(
return 0, err
}

// TODO row count, GET DIAGNOSTICS x = ROW_COUNT
totalRecordsFetched := 0

qe.logger.Info(fmt.Sprintf("[pg_query_executor] committed transaction for query '%s', rows = %d",
query, totalRecordsFetched))
return totalRecordsFetched, nil
}

// GetColumnNames returns the column names of the stream's schema, i.e. the
// result of stream.Schema().GetColumnNames().
func (stream RecordStreamSink) GetColumnNames() []string {
return stream.Schema().GetColumnNames()
}

// Close closes the write half of the pipe, passing err to
// PipeWriter.CloseWithError.
func (p PgCopyWriter) Close(err error) {
	// io.PipeWriter.CloseWithError is documented to always return nil, so
	// its result is deliberately discarded.
	_ = p.PipeWriter.CloseWithError(err)
}

// CopyInto streams the data read from the pipe into table using
// COPY ... FROM STDIN on the executor's connection.
//
// It first waits for schemaLatch to be closed so the column list is known.
// The wait is abandoned with ctx.Err() if ctx is cancelled first, instead of
// blocking forever when the schema is never set. The column list is built
// from the field names in the shared schema, each passed through
// QuoteIdentifier.
//
// tx is not used: the copy is issued through qe.conn.PgConn().
//
// It returns the number of rows reported by the command tag of the COPY, or
// 0 and the error if the wait or the copy fails.
func (p PgCopyReader) CopyInto(ctx context.Context, qe *QRepQueryExecutor, tx pgx.Tx, table pgx.Identifier) (int64, error) {
	select {
	case <-p.schema.schemaLatch:
	case <-ctx.Done():
		return 0, ctx.Err()
	}

	cols := make([]string, 0, len(p.schema.schema))
	for _, fd := range p.schema.schema {
		cols = append(cols, QuoteIdentifier(fd.Name))
	}

	copyQuery := fmt.Sprintf("COPY %s (%s) FROM STDIN", table.Sanitize(), strings.Join(cols, ","))
	ct, err := qe.conn.PgConn().CopyFrom(ctx, p.PipeReader, copyQuery)
	if err != nil {
		return 0, err
	}
	// Report the real row count instead of a hard-coded 0.
	return ct.RowsAffected(), nil
}

0 comments on commit 4978121

Please sign in to comment.