Skip to content

Commit

Permalink
Remove QRecordOrError (#1620)
Browse files Browse the repository at this point in the history
Errors are non recoverable, indicating end of stream
Match existing APIs which have an Err method which should be checked after iteration
  • Loading branch information
serprex authored Apr 16, 2024
1 parent 2de2c6f commit ec943f5
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 90 deletions.
68 changes: 27 additions & 41 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,11 @@ func (qe *QRepQueryExecutor) processRowsStream(
record, err := qe.mapRowToQRecord(rows, fieldDescriptions)
if err != nil {
qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err))
stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to map row to QRecord: %w", err),
}
stream.Close(fmt.Errorf("failed to map row to QRecord: %w", err))
return 0, fmt.Errorf("failed to map row to QRecord: %w", err)
}

stream.Records <- model.QRecordOrError{
Record: record,
Err: nil,
}
stream.Records <- record

if numRows%heartBeatNumRows == 0 {
qe.logger.Info("processing row stream", slog.String("cursor", cursorName), slog.Int("records", numRows))
Expand All @@ -180,9 +175,7 @@ func (qe *QRepQueryExecutor) processFetchedRows(
) (int, error) {
rows, err := qe.executeQueryInTx(ctx, tx, cursorName, fetchSize)
if err != nil {
stream.Records <- model.QRecordOrError{
Err: err,
}
stream.Close(err)
qe.logger.Error("[pg_query_executor] failed to execute query in tx",
slog.Any("error", err), slog.String("query", query))
return 0, fmt.Errorf("[pg_query_executor] failed to execute query in tx: %w", err)
Expand All @@ -201,10 +194,8 @@ func (qe *QRepQueryExecutor) processFetchedRows(
return 0, fmt.Errorf("failed to process rows: %w", err)
}

if rows.Err() != nil {
stream.Records <- model.QRecordOrError{
Err: rows.Err(),
}
if err := rows.Err(); err != nil {
stream.Close(err)
qe.logger.Error("[pg_query_executor] row iteration failed",
slog.String("query", query), slog.Any("error", rows.Err()))
return 0, fmt.Errorf("[pg_query_executor] row iteration failed '%s': %w", query, rows.Err())
Expand Down Expand Up @@ -241,14 +232,14 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery(
Records: make([][]qvalue.QValue, 0),
}
for record := range stream.Records {
if record.Err == nil {
batch.Records = append(batch.Records, record.Record)
} else {
<-errors
return nil, fmt.Errorf("[pg] failed to get record from stream: %w", record.Err)
}
batch.Records = append(batch.Records, record)
}
if err := <-errors; err != nil {
return nil, err
}
if err := stream.Err(); err != nil {
return nil, fmt.Errorf("[pg] failed to get record from stream: %w", err)
}
<-errors
return batch, nil
}
}
Expand All @@ -260,7 +251,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStream(
args ...interface{},
) (int, error) {
qe.logger.Info("Executing and processing query stream", slog.String("query", query))
defer close(stream.Records)
defer stream.Close(nil)

tx, err := qe.conn.BeginTx(ctx, pgx.TxOptions{
AccessMode: pgx.ReadOnly,
Expand All @@ -271,8 +262,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStream(
return 0, fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
}

totalRecordsFetched, err := qe.ExecuteAndProcessQueryStreamWithTx(ctx, tx, stream, query, args...)
return totalRecordsFetched, err
return qe.ExecuteAndProcessQueryStreamWithTx(ctx, tx, stream, query, args...)
}

func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamGettingCurrentSnapshotXmin(
Expand All @@ -283,7 +273,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamGettingCurrentSnapshotX
) (int, int64, error) {
var currentSnapshotXmin pgtype.Int8
qe.logger.Info("Executing and processing query stream", slog.String("query", query))
defer close(stream.Records)
defer stream.Close(nil)

tx, err := qe.conn.BeginTx(ctx, pgx.TxOptions{
AccessMode: pgx.ReadOnly,
Expand Down Expand Up @@ -323,22 +313,20 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
if qe.snapshot != "" {
_, err = tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot))
if err != nil {
stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to set snapshot: %w", err),
}
qe.logger.Error("[pg_query_executor] failed to set snapshot",
slog.Any("error", err), slog.String("query", query))
return 0, fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
stream.Close(err)
return 0, err
}
}

randomUint, err := shared.RandomUInt64()
if err != nil {
qe.logger.Error("[pg_query_executor] failed to generate random uint", slog.Any("error", err))
stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to generate random uint: %w", err),
}
return 0, fmt.Errorf("[pg_query_executor] failed to generate random uint: %w", err)
err = fmt.Errorf("[pg_query_executor] failed to generate random uint: %w", err)
stream.Close(err)
return 0, err
}

cursorName := fmt.Sprintf("peerdb_cursor_%d", randomUint)
Expand All @@ -347,12 +335,11 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
qe.logger.Info(fmt.Sprintf("[pg_query_executor] executing cursor declaration for %v with args %v", cursorQuery, args))
_, err = tx.Exec(ctx, cursorQuery, args...)
if err != nil {
stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to declare cursor: %w", err),
}
qe.logger.Info("[pg_query_executor] failed to declare cursor",
slog.String("cursorQuery", cursorQuery), slog.Any("error", err))
return 0, fmt.Errorf("[pg_query_executor] failed to declare cursor: %w", err)
err = fmt.Errorf("[pg_query_executor] failed to declare cursor: %w", err)
stream.Close(err)
return 0, err
}

qe.logger.Info(fmt.Sprintf("[pg_query_executor] declared cursor '%s' for query '%s'", cursorName, query))
Expand All @@ -377,10 +364,9 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
err = tx.Commit(ctx)
if err != nil {
qe.logger.Error("[pg_query_executor] failed to commit transaction", slog.Any("error", err))
stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to commit transaction: %w", err),
}
return 0, fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err)
err = fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err)
stream.Close(err)
return 0, err
}

qe.logger.Info(fmt.Sprintf("[pg_query_executor] committed transaction for query '%s', rows = %d",
Expand Down
13 changes: 6 additions & 7 deletions flow/connectors/utils/avro/avro_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,8 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, ocfWriter
)

numRows := 0
for qRecordOrErr := range p.stream.Records {
if qRecordOrErr.Err != nil {
logger.Error("[avro] failed to get record from stream", slog.Any("error", qRecordOrErr.Err))
return 0, fmt.Errorf("[avro] failed to get record from stream: %w", qRecordOrErr.Err)
}

avroMap, err := avroConverter.Convert(qRecordOrErr.Record)
for qrecord := range p.stream.Records {
avroMap, err := avroConverter.Convert(qrecord)
if err != nil {
logger.Error("failed to convert QRecord to Avro compatible map: ", slog.Any("error", err))
return 0, fmt.Errorf("failed to convert QRecord to Avro compatible map: %w", err)
Expand All @@ -150,6 +145,10 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, ocfWriter

numRows += 1
}
if err := p.stream.Err(); err != nil {
logger.Error("[avro] failed to get record from stream", slog.Any("error", err))
return 0, fmt.Errorf("[avro] failed to get record from stream: %w", err)
}

return numRows, nil
}
Expand Down
35 changes: 14 additions & 21 deletions flow/connectors/utils/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,25 +61,28 @@ func RecordsToRawTableStream[Items model.Items](req *model.RecordsToStreamReques
go func() {
for record := range req.GetRecords() {
record.PopulateCountMap(req.TableMapping)
qRecordOrError := recordToQRecordOrError(req.BatchID, record)
recordStream.Records <- qRecordOrError
qRecord, err := recordToQRecordOrError(req.BatchID, record)
if err != nil {
recordStream.Close(err)
return
} else {
recordStream.Records <- qRecord
}
}

close(recordStream.Records)
}()
return recordStream, nil
}

func recordToQRecordOrError[Items model.Items](batchID int64, record model.Record[Items]) model.QRecordOrError {
func recordToQRecordOrError[Items model.Items](batchID int64, record model.Record[Items]) ([]qvalue.QValue, error) {
var entries [8]qvalue.QValue
switch typedRecord := record.(type) {
case *model.InsertRecord[Items]:
// json.Marshal converts bytes in Hex automatically to BASE64 string.
itemsJSON, err := model.ItemsToJSON(typedRecord.Items)
if err != nil {
return model.QRecordOrError{
Err: fmt.Errorf("failed to serialize insert record items to JSON: %w", err),
}
return nil, fmt.Errorf("failed to serialize insert record items to JSON: %w", err)
}

entries[3] = qvalue.QValueString{Val: itemsJSON}
Expand All @@ -89,15 +92,11 @@ func recordToQRecordOrError[Items model.Items](batchID int64, record model.Recor
case *model.UpdateRecord[Items]:
newItemsJSON, err := model.ItemsToJSON(typedRecord.NewItems)
if err != nil {
return model.QRecordOrError{
Err: fmt.Errorf("failed to serialize update record new items to JSON: %w", err),
}
return nil, fmt.Errorf("failed to serialize update record new items to JSON: %w", err)
}
oldItemsJSON, err := model.ItemsToJSON(typedRecord.OldItems)
if err != nil {
return model.QRecordOrError{
Err: fmt.Errorf("failed to serialize update record old items to JSON: %w", err),
}
return nil, fmt.Errorf("failed to serialize update record old items to JSON: %w", err)
}

entries[3] = qvalue.QValueString{Val: newItemsJSON}
Expand All @@ -108,9 +107,7 @@ func recordToQRecordOrError[Items model.Items](batchID int64, record model.Recor
case *model.DeleteRecord[Items]:
itemsJSON, err := model.ItemsToJSON(typedRecord.Items)
if err != nil {
return model.QRecordOrError{
Err: fmt.Errorf("failed to serialize delete record items to JSON: %w", err),
}
return nil, fmt.Errorf("failed to serialize delete record items to JSON: %w", err)
}

entries[3] = qvalue.QValueString{Val: itemsJSON}
Expand All @@ -119,19 +116,15 @@ func recordToQRecordOrError[Items model.Items](batchID int64, record model.Recor
entries[7] = qvalue.QValueString{Val: KeysToString(typedRecord.UnchangedToastColumns)}

default:
return model.QRecordOrError{
Err: fmt.Errorf("unknown record type: %T", typedRecord),
}
return nil, fmt.Errorf("unknown record type: %T", typedRecord)
}

entries[0] = qvalue.QValueString{Val: uuid.New().String()}
entries[1] = qvalue.QValueInt64{Val: time.Now().UnixNano()}
entries[2] = qvalue.QValueString{Val: record.GetDestinationTableName()}
entries[6] = qvalue.QValueInt64{Val: batchID}

return model.QRecordOrError{
Record: entries[:],
}
return entries[:], nil
}

func InitialiseTableRowsMap(tableMaps []*protos.TableMapping) map[string]*model.RecordTypeCounts {
Expand Down
17 changes: 7 additions & 10 deletions flow/model/qrecord_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (q *QRecordBatch) FeedToQRecordStream(stream *QRecordStream) {
stream.SetSchema(q.Schema)

for _, record := range q.Records {
stream.Records <- QRecordOrError{Record: record}
stream.Records <- record
}
close(stream.Records)
}
Expand All @@ -49,7 +49,7 @@ func constructArray[T any](qValue qvalue.QValue, typeName string) (*pgtype.Array
type QRecordBatchCopyFromSource struct {
err error
stream *QRecordStream
currentRecord QRecordOrError
currentRecord []qvalue.QValue
numRecords int
}

Expand All @@ -59,14 +59,15 @@ func NewQRecordBatchCopyFromSource(
return &QRecordBatchCopyFromSource{
numRecords: 0,
stream: stream,
currentRecord: QRecordOrError{},
currentRecord: nil,
err: nil,
}
}

func (src *QRecordBatchCopyFromSource) Next() bool {
rec, ok := <-src.stream.Records
if !ok {
src.err = src.stream.Err()
return false
}

Expand All @@ -76,16 +77,12 @@ func (src *QRecordBatchCopyFromSource) Next() bool {
}

func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) {
if src.currentRecord.Err != nil {
src.err = src.currentRecord.Err
if src.err != nil {
return nil, src.err
}

record := src.currentRecord.Record
numEntries := len(record)

values := make([]interface{}, numEntries)
for i, qValue := range record {
values := make([]interface{}, len(src.currentRecord))
for i, qValue := range src.currentRecord {
if qValue.Value() == nil {
values[i] = nil
continue
Expand Down
29 changes: 18 additions & 11 deletions flow/model/qrecord_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,10 @@ type RecordTypeCounts struct {
DeleteCount int
}

type QRecordOrError struct {
Err error
Record []qvalue.QValue
}

type QRecordSchemaOrError struct {
Schema *qvalue.QRecordSchema
}

type QRecordStream struct {
schemaLatch chan struct{}
Records chan QRecordOrError
Records chan []qvalue.QValue
err error
schema qvalue.QRecordSchema
schemaSet bool
}
Expand Down Expand Up @@ -51,8 +43,9 @@ func (r *RecordsToStreamRequest[T]) GetRecords() <-chan Record[T] {
func NewQRecordStream(buffer int) *QRecordStream {
return &QRecordStream{
schemaLatch: make(chan struct{}),
Records: make(chan QRecordOrError, buffer),
Records: make(chan []qvalue.QValue, buffer),
schema: qvalue.QRecordSchema{},
err: nil,
schemaSet: false,
}
}
Expand All @@ -77,3 +70,17 @@ func (s *QRecordStream) IsSchemaSet() bool {
func (s *QRecordStream) SchemaChan() <-chan struct{} {
return s.schemaLatch
}

func (s *QRecordStream) Err() error {
return s.err
}

// Set error & close stream. Calling with multiple errors only tracks first error & does not panic.
// Close(nil) after an error won't panic, but Close after Close(nil) will panic,
// this is enough to be able to safely `defer stream.Close(nil)`.
func (s *QRecordStream) Close(err error) {
if s.err == nil {
s.err = err
close(s.Records)
}
}

0 comments on commit ec943f5

Please sign in to comment.