Skip to content

Commit

Permalink
pkg/sqlutil: add QueryHook for wrapping Queryers with timeouts, prome…
Browse files Browse the repository at this point in the history
…theus metrics and slow query logging (#396)
  • Loading branch information
jmank88 authored Mar 12, 2024
1 parent 7ec0dab commit 9bf02a1
Show file tree
Hide file tree
Showing 7 changed files with 510 additions and 7 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/jmoiron/sqlx v1.3.5
github.com/jonboulle/clockwork v0.4.0
github.com/jpillora/backoff v1.0.0
github.com/lib/pq v1.2.0
github.com/linkedin/goavro/v2 v2.12.0
github.com/mitchellh/mapstructure v1.5.0
github.com/mwitkow/grpc-proxy v0.0.0-20230212185441-f345521cb9c9
Expand Down
126 changes: 126 additions & 0 deletions pkg/sqlutil/hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package sqlutil

import (
"context"
"database/sql"

"github.com/jmoiron/sqlx"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

var _ DB = &WrappedDB{}

// WrappedDB is a [DB] which invokes a [QueryHook] on each call.
type WrappedDB struct {
db DB
lggr logger.Logger
hook QueryHook
}

// QueryHook is a func that is executed for each query, providing an opportunity to measure, log, inspect/modify errors, etc.
// The do func *must* be called.
// Logs emitted through the provided logger.Logger will have the caller's line info.
//
// See [MonitorHook] and [TimeoutHook] for examples.
type QueryHook func(ctx context.Context, lggr logger.Logger, do func(context.Context) error, query string, args ...any) error

// NewWrappedDB returns a new [WrappedDB] that calls each [QueryHook] in the provided order.
func NewWrappedDB(db DB, l logger.Logger, hs ...QueryHook) *WrappedDB {
iq := WrappedDB{db: db,
lggr: logger.Helper(logger.Named(l, "WrappedDB"), 2), // skip our own wrapper and one interceptor
hook: noopHook,
}
switch len(hs) {
case 0:
return &iq
case 1:
iq.hook = hs[0]
return &iq
}

// Nest the QueryHook calls so that they are wrapped from first to last.
// Example:
// [A, B, C] => A(B(C(do())))
for i := len(hs) - 1; i >= 0; i-- {
next := hs[i]
prev := iq.hook
iq.hook = func(ctx context.Context, lggr logger.Logger, do func(context.Context) error, query string, args ...any) error {
// opt: cache the construction of these loggers
lggr = logger.Helper(lggr, 1) // skip one more for this wrapper
return next(ctx, lggr, func(ctx context.Context) error {
lggr = logger.Helper(lggr, 2) // skip two more for do() and this extra wrapper
return prev(ctx, lggr, do, query, args...)
}, query, args...)
}
}
return &iq
}

func noopHook(ctx context.Context, lggr logger.Logger, do func(context.Context) error, query string, args ...any) error {
return do(ctx)
}

func (w *WrappedDB) DriverName() string {
return w.db.DriverName()
}

func (w *WrappedDB) Rebind(s string) string {
return w.db.Rebind(s)
}

func (w *WrappedDB) BindNamed(s string, i interface{}) (string, []any, error) {
return w.db.BindNamed(s, i)
}

func (w *WrappedDB) QueryContext(ctx context.Context, query string, args ...any) (rows *sql.Rows, err error) {
err = w.hook(ctx, w.lggr, func(ctx context.Context) (err error) {
rows, err = w.db.QueryContext(ctx, query, args...) //nolint
return
}, query, args...)
return
}

func (w *WrappedDB) QueryxContext(ctx context.Context, query string, args ...any) (rows *sqlx.Rows, err error) {
err = w.hook(ctx, w.lggr, func(ctx context.Context) (err error) {
rows, err = w.db.QueryxContext(ctx, query, args...) //nolint:sqlclosecheck
return
}, query, args...)
return
}

func (w *WrappedDB) QueryRowxContext(ctx context.Context, query string, args ...any) (row *sqlx.Row) {
_ = w.hook(ctx, w.lggr, func(ctx context.Context) error {
row = w.db.QueryRowxContext(ctx, query, args...)
return nil
}, query, args...)
return
}

func (w *WrappedDB) ExecContext(ctx context.Context, query string, args ...any) (res sql.Result, err error) {
err = w.hook(ctx, w.lggr, func(ctx context.Context) (err error) {
res, err = w.db.ExecContext(ctx, query, args...)
return
}, query, args...)
return
}

func (w *WrappedDB) PrepareContext(ctx context.Context, query string) (stmt *sql.Stmt, err error) {
err = w.hook(ctx, w.lggr, func(ctx context.Context) (err error) {
stmt, err = w.db.PrepareContext(ctx, query) //nolint:sqlclosecheck
return
}, query, nil)
return
}

func (w *WrappedDB) GetContext(ctx context.Context, dest interface{}, query string, args ...any) error {
return w.hook(ctx, w.lggr, func(ctx context.Context) error {
return w.db.GetContext(ctx, dest, query, args...)
}, query, args...)
}

func (w *WrappedDB) SelectContext(ctx context.Context, dest interface{}, query string, args ...any) error {
return w.hook(ctx, w.lggr, func(ctx context.Context) error {
return w.db.SelectContext(ctx, dest, query, args...)
}, query, args...)
}
124 changes: 124 additions & 0 deletions pkg/sqlutil/hook_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package sqlutil

import (
"context"
"database/sql"
"errors"
"fmt"
"runtime"
"testing"
"time"

"github.com/jmoiron/sqlx"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
)

const (
getDur = 10 * time.Millisecond
selDur = 200 * time.Millisecond
)

func TestNewInterceptedQueryer(t *testing.T) {
lggr, ol := logger.TestObserved(t, zapcore.InfoLevel)
var db DB = &database{}
var sentinelErr = errors.New("intercepted error")
const fakeError = "fake warning"
db = NewWrappedDB(db, lggr, TimeoutHook(selDur/2), noopHook, MonitorHook(func() bool { return true }), noopHook, func(ctx context.Context, lggr logger.Logger, do func(context.Context) error, query string, args ...any) error {
err := do(ctx)
if err != nil {
return err
}
lggr.Error(fakeError)
return sentinelErr
})
ctx := tests.Context(t)

// Error intercepted
err := db.GetContext(ctx, "test", "foo", 42, "bar")
_, file, line, ok := runtime.Caller(0)
require.True(t, ok)
expCaller := fmt.Sprintf("%s:%d", file, line-1)
require.ErrorIs(t, err, sentinelErr)
logs := ol.FilterMessage(slowMsg).All()
require.Len(t, logs, 1)
assert.Equal(t, zapcore.WarnLevel, logs[0].Level)
assert.Equal(t, expCaller, logs[0].Caller.String())
logs = ol.FilterMessage(fakeError).All()
require.Len(t, logs, 1)
assert.Equal(t, zapcore.ErrorLevel, logs[0].Level)
assert.Equal(t, expCaller, logs[0].Caller.String())
_ = ol.TakeAll()

// Timeout applied
err = db.SelectContext(ctx, "test", "foo", 42, "bar")
require.ErrorIs(t, err, context.DeadlineExceeded)
logs = ol.FilterMessage(slowMsg).All()
require.Len(t, logs, 1)
assert.Equal(t, zapcore.DPanicLevel, logs[0].Level)
_ = ol.TakeAll()

// Without default timeout
err = db.SelectContext(WithoutDefaultTimeout(ctx), "test", "foo", 42, "bar")
require.ErrorIs(t, err, sentinelErr)

// W/o default, but with our own
ctx2, cancel := context.WithTimeout(WithoutDefaultTimeout(ctx), selDur/100)
t.Cleanup(cancel)
err = db.SelectContext(ctx2, "test", "foo", 42, "bar")
require.ErrorIs(t, err, context.DeadlineExceeded)
}

var _ DB = &database{}

type database struct{}

func (q *database) DriverName() string { return "" }

func (q *database) Rebind(s string) string { return "" }

func (q *database) BindNamed(s string, i interface{}) (string, []interface{}, error) {
return "", nil, nil
}

func (q *database) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
return nil, nil
}

func (q *database) QueryxContext(ctx context.Context, query string, args ...interface{}) (*sqlx.Rows, error) {
return nil, nil
}

func (q *database) QueryRowxContext(ctx context.Context, query string, args ...interface{}) *sqlx.Row {
return nil
}

func (q *database) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
return nil, nil
}

func (q *database) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) {
return nil, nil
}

func (q *database) GetContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(getDur):
}
return nil
}

func (q *database) SelectContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(selDur):
}
return nil
}
Loading

0 comments on commit 9bf02a1

Please sign in to comment.