Skip to content

Commit

Permalink
add error logging (#1075)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Jan 13, 2024
1 parent e1ba45f commit 426b5f4
Showing 1 changed file with 11 additions and 0 deletions.
11 changes: 11 additions & 0 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (qe *QRepQueryExecutor) SetTestEnv(testEnv bool) {
func (qe *QRepQueryExecutor) ExecuteQuery(query string, args ...interface{}) (pgx.Rows, error) {
rows, err := qe.pool.Query(qe.ctx, query, args...)
if err != nil {
qe.logger.Error("[pg_query_executor] failed to execute query", slog.Any("error", err))
return nil, err
}
return rows, nil
Expand All @@ -92,6 +93,7 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc

rows, err := tx.Query(qe.ctx, q)
if err != nil {
qe.logger.Error("[pg_query_executor] failed to execute query in tx", slog.Any("error", err))
return nil, err
}

Expand Down Expand Up @@ -135,6 +137,7 @@ func (qe *QRepQueryExecutor) ProcessRows(
for rows.Next() {
record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap)
if err != nil {
qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err))
return nil, fmt.Errorf("failed to map row to QRecord: %w", err)
}
records = append(records, record)
Expand Down Expand Up @@ -169,6 +172,7 @@ func (qe *QRepQueryExecutor) processRowsStream(
for rows.Next() {
record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap)
if err != nil {
qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err))
stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to map row to QRecord: %w", err),
}
Expand Down Expand Up @@ -267,6 +271,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery(
return nil, err
case schema := <-stream.SchemaChan():
if schema.Err != nil {
qe.logger.Error("[pg_query_executor] failed to get schema from stream", slog.Any("error", schema.Err))
return nil, fmt.Errorf("failed to get schema from stream: %w", schema.Err)
}
batch := &model.QRecordBatch{
Expand Down Expand Up @@ -327,6 +332,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamGettingCurrentSnapshotX

err = tx.QueryRow(qe.ctx, "select txid_snapshot_xmin(txid_current_snapshot())").Scan(&currentSnapshotXmin)
if err != nil {
qe.logger.Error("[pg_query_executor] failed to get current snapshot xmin", slog.Any("error", err))
return 0, currentSnapshotXmin.Int64, err
}

Expand Down Expand Up @@ -363,6 +369,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(

randomUint, err := shared.RandomUInt64()
if err != nil {
qe.logger.Error("[pg_query_executor] failed to generate random uint", slog.Any("error", err))
stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to generate random uint: %w", err),
}
Expand Down Expand Up @@ -390,6 +397,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
for {
numRows, err := qe.processFetchedRows(query, tx, cursorName, fetchSize, stream)
if err != nil {
qe.logger.Error("[pg_query_executor] failed to process fetched rows", slog.Any("error", err))
return 0, err
}

Expand All @@ -407,6 +415,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
qe.logger.Info("Committing transaction")
err = tx.Commit(qe.ctx)
if err != nil {
qe.logger.Error("[pg_query_executor] failed to commit transaction", slog.Any("error", err))
stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to commit transaction: %w", err),
}
Expand All @@ -426,6 +435,7 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,

values, err := row.Values()
if err != nil {
slog.Error("[pg_query_executor] failed to get values from row", slog.Any("error", err))
return model.QRecord{}, fmt.Errorf("failed to scan row: %w", err)
}

Expand All @@ -435,6 +445,7 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
if !ok {
tmp, err := parseFieldFromPostgresOID(fd.DataTypeOID, values[i])
if err != nil {
slog.Error("[pg_query_executor] failed to parse field", slog.Any("error", err))
return model.QRecord{}, fmt.Errorf("failed to parse field: %w", err)
}
record.Set(i, tmp)
Expand Down

0 comments on commit 426b5f4

Please sign in to comment.