From 426b5f4f1bee17ac72983d3ff30ea3a407b6bb0b Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 12 Jan 2024 21:21:52 -0500 Subject: [PATCH] add error logging (#1075) --- flow/connectors/postgres/qrep_query_executor.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 7a2d74673d..0bbd10561d 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -73,6 +73,7 @@ func (qe *QRepQueryExecutor) SetTestEnv(testEnv bool) { func (qe *QRepQueryExecutor) ExecuteQuery(query string, args ...interface{}) (pgx.Rows, error) { rows, err := qe.pool.Query(qe.ctx, query, args...) if err != nil { + qe.logger.Error("[pg_query_executor] failed to execute query", slog.Any("error", err)) return nil, err } return rows, nil @@ -92,6 +93,7 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc rows, err := tx.Query(qe.ctx, q) if err != nil { + qe.logger.Error("[pg_query_executor] failed to execute query in tx", slog.Any("error", err)) return nil, err } @@ -135,6 +137,7 @@ func (qe *QRepQueryExecutor) ProcessRows( for rows.Next() { record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap) if err != nil { + qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err)) return nil, fmt.Errorf("failed to map row to QRecord: %w", err) } records = append(records, record) @@ -169,6 +172,7 @@ func (qe *QRepQueryExecutor) processRowsStream( for rows.Next() { record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap) if err != nil { + qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err)) stream.Records <- model.QRecordOrError{ Err: fmt.Errorf("failed to map row to QRecord: %w", err), } @@ -267,6 +271,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery( return nil, err case schema := <-stream.SchemaChan(): if schema.Err != nil { + qe.logger.Error("[pg_query_executor] failed to get schema from stream", slog.Any("error", schema.Err)) return nil, fmt.Errorf("failed to get schema from stream: %w", schema.Err) } batch := &model.QRecordBatch{ @@ -327,6 +332,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamGettingCurrentSnapshotX err = tx.QueryRow(qe.ctx, "select txid_snapshot_xmin(txid_current_snapshot())").Scan(¤tSnapshotXmin) if err != nil { + qe.logger.Error("[pg_query_executor] failed to get current snapshot xmin", slog.Any("error", err)) return 0, currentSnapshotXmin.Int64, err } @@ -363,6 +369,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx( randomUint, err := shared.RandomUInt64() if err != nil { + qe.logger.Error("[pg_query_executor] failed to generate random uint", slog.Any("error", err)) stream.Records <- model.QRecordOrError{ Err: fmt.Errorf("failed to generate random uint: %w", err), } @@ -390,6 +397,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx( for { numRows, err := qe.processFetchedRows(query, tx, cursorName, fetchSize, stream) if err != nil { + qe.logger.Error("[pg_query_executor] failed to process fetched rows", slog.Any("error", err)) return 0, err } @@ -407,6 +415,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx( qe.logger.Info("Committing transaction") err = tx.Commit(qe.ctx) if err != nil { + qe.logger.Error("[pg_query_executor] failed to commit transaction", slog.Any("error", err)) stream.Records <- model.QRecordOrError{ Err: fmt.Errorf("failed to commit transaction: %w", err), } @@ -426,6 +435,7 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription, values, err := row.Values() if err != nil { + slog.Error("[pg_query_executor] failed to get values from row", slog.Any("error", err)) return model.QRecord{}, fmt.Errorf("failed to scan row: %w", err) } @@ -435,6 +445,7 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription, if !ok { tmp, err := parseFieldFromPostgresOID(fd.DataTypeOID, values[i]) if err != nil { + slog.Error("[pg_query_executor] failed to parse field", slog.Any("error", err)) return model.QRecord{}, fmt.Errorf("failed to parse field: %w", err) } record.Set(i, tmp)