Skip to content

Commit

Permalink
serialization errors are not well handled in the Postgres datastore:
Browse files Browse the repository at this point in the history
- they tend to be logged by the `pgx` driver with potentially very
  large SQL statements
- they are not retried, nor there is a backoff. There are also other
  types of errors that are not being retried, like `conn closed`
  (which pgx considers safe to retry)
- they return to the client as unknown instead of the recommended
  aborted gRPC code
- no information is logged on retries in `debug` level
- logs can get extremely verbose due to large SQL and args

Serialization errors can also manifest as a transaction rollback,
and so code is added to retry and wrap the error accordingly,
in which case gRPC `aborted` is returned.

Additionally, retries after serialization can lead to longer
requests, and longer requests can hit the deadline. Sometimes
a cancellation leads to the pgx connection being asynchronously
closed, losing information about the original cause that closed it.
Unfortunately, there isn't an exported type for this, so it's
detected via the string. The error is turned into a multi-error
that has cancellation. In a local load test, evidence shows that
in all cases the original reason was cancellation.

A call to `pgx.SafeToRetry` is added to handle various pgx internal
errors (like `conn closed`) and determine if they can be retried.
This has helped retry errors it didn't use to retry before.

Retries also now have a backoff, since the default maximum of 10
can knock down the database. The retry function is refactored, and
the minimum wait is reduced since the total backoff grows pretty quickly,
and will easily make the client hit the typical deadlines. The log
message for retries is moved to Debug since serialization errors can
be pretty common so it shouldn't be considered a warning.

I also detected that the backoff function wasn't respecting the
context cancellation, so even when the client had canceled, the
goroutine would be still asleep. The function was changed to cancel
immediately after the client signal.
  • Loading branch information
vroldanbet committed Sep 29, 2023
1 parent 84250fd commit 04200d5
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 31 deletions.
26 changes: 26 additions & 0 deletions internal/datastore/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,32 @@ import (
"github.com/authzed/spicedb/pkg/tuple"
)

// SerializationError is returned when there's been a serialization
// error while performing a datastore operation
type SerializationError struct {
error
}

func (err SerializationError) GRPCStatus() *status.Status {
return spiceerrors.WithCodeAndDetails(
err,
codes.Aborted,
spiceerrors.ForReason(
v1.ErrorReason_ERROR_REASON_WRITE_OR_DELETE_PRECONDITION_FAILURE,
map[string]string{},
),
)
}

func (err SerializationError) Unwrap() error {
return err.error
}

// NewSerializationError creates a new SerializationError
func NewSerializationError(err error) error {
return SerializationError{err}
}

// CreateRelationshipExistsError is an error returned when attempting to CREATE an already-existing
// relationship.
type CreateRelationshipExistsError struct {
Expand Down
13 changes: 3 additions & 10 deletions internal/datastore/crdb/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
"sync"
"time"

"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"

"github.com/authzed/spicedb/internal/datastore/postgres/common"
log "github.com/authzed/spicedb/internal/logging"
)

Expand Down Expand Up @@ -291,7 +291,7 @@ func (p *RetryPool) withRetries(ctx context.Context, fn func(conn *pgxpool.Conn)
p.healthTracker.SetNodeHealth(nodeID, false)
}

SleepOnErr(ctx, err, retries)
common.SleepOnErr(ctx, err, retries)

conn, err = p.acquireFromDifferentNode(ctx, nodeID)
if err != nil {
Expand All @@ -301,7 +301,7 @@ func (p *RetryPool) withRetries(ctx context.Context, fn func(conn *pgxpool.Conn)
}
if errors.As(err, &retryable) {
log.Ctx(ctx).Info().Err(err).Uint8("retries", retries).Msg("retryable error")
SleepOnErr(ctx, err, retries)
common.SleepOnErr(ctx, err, retries)
continue
}
conn.Release()
Expand All @@ -323,13 +323,6 @@ func (p *RetryPool) GC(conn *pgx.Conn) {
delete(p.nodeForConn, conn)
}

// SleepOnErr sleeps for a short period of time after an error has occurred.
func SleepOnErr(ctx context.Context, err error, retries uint8) {
after := retry.BackoffExponentialWithJitter(100*time.Millisecond, 0.5)(ctx, uint(retries+1)) // add one so we always wait at least a little bit
log.Ctx(ctx).Warn().Err(err).Dur("after", after).Msg("retrying on database error")
time.Sleep(after)
}

func (p *RetryPool) acquireFromDifferentNode(ctx context.Context, nodeID uint32) (*pgxpool.Conn, error) {
log.Ctx(ctx).Info().Uint32("node_id", nodeID).Msg("acquiring a connection from a different node")
for {
Expand Down
3 changes: 2 additions & 1 deletion internal/datastore/crdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/internal/datastore/crdb/pool"
pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/spiceerrors"
Expand Down Expand Up @@ -270,7 +271,7 @@ func (cds *crdbDatastore) WatchSchema(ctx context.Context, afterRevision datasto
running = true

// Sleep a bit for retrying.
pool.SleepOnErr(ctx, err, uint8(retryCount))
pgxcommon.SleepOnErr(ctx, err, uint8(retryCount))
return
}

Expand Down
1 change: 1 addition & 0 deletions internal/datastore/postgres/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
const (
pgUniqueConstraintViolation = "23505"
pgSerializationFailure = "40001"
pgTransactionAborted = "25P02"
)

var (
Expand Down
71 changes: 63 additions & 8 deletions internal/datastore/postgres/common/pgx.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/exaring/otelpgx"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
zerologadapter "github.com/jackc/pgx-zerolog"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
Expand Down Expand Up @@ -87,16 +88,13 @@ func ConfigurePGXLogger(connConfig *pgx.ConnConfig) {
level = tracelog.LogLevelDebug
}

// do not log cancelled queries as errors
// do not log serialization failues as errors
truncateLargeSQL(data)

// log cancellation and serialization errors at debug level
if errArg, ok := data["err"]; ok {
err, ok := errArg.(error)
if ok && errors.Is(err, context.Canceled) {
return
}

var pgerr *pgconn.PgError
if errors.As(err, &pgerr) && pgerr.SQLState() == pgSerializationFailure {
if ok && (IsCancellationError(err) || IsSerializationError(err)) {
logger.Log(ctx, tracelog.LogLevelDebug, msg, data)
return
}
}
Expand All @@ -109,6 +107,52 @@ func ConfigurePGXLogger(connConfig *pgx.ConnConfig) {
addTracer(connConfig, &tracelog.TraceLog{Logger: levelMappingFn(l), LogLevel: tracelog.LogLevelInfo})
}

func truncateLargeSQL(data map[string]any) {
const (
maxSQLLen = 350
maxSQLArgsLen = 50
)

if sqlData, ok := data["sql"]; ok {
sqlString, ok := sqlData.(string)
if ok && len(sqlString) > maxSQLLen {
data["sql"] = sqlString[:maxSQLLen] + "..."
}
}
if argsData, ok := data["args"]; ok {
argsSlice, ok := argsData.([]any)
if ok && len(argsSlice) > maxSQLArgsLen {
data["args"] = argsSlice[:maxSQLArgsLen]
}
}
}

func IsCancellationError(err error) bool {
if errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) ||
err.Error() == "conn closed" { // conns are sometimes closed async upon cancellation
return true
}
return false
}

func IsSerializationError(err error) bool {
var pgerr *pgconn.PgError
if errors.As(err, &pgerr) &&
// We need to check unique constraint here because some versions of postgres have an error where
// unique constraint violations are raised instead of serialization errors.
// (e.g. https://www.postgresql.org/message-id/flat/CAGPCyEZG76zjv7S31v_xPeLNRuzj-m%3DY2GOY7PEzu7vhB%3DyQog%40mail.gmail.com)
(pgerr.SQLState() == pgSerializationFailure || pgerr.SQLState() == pgUniqueConstraintViolation || pgerr.SQLState() == pgTransactionAborted) {
return true
}

if errors.Is(err, pgx.ErrTxCommitRollback) {
return true
}

return false
}

// ConfigureOTELTracer adds OTEL tracing to a pgx.ConnConfig
func ConfigureOTELTracer(connConfig *pgx.ConnConfig) {
addTracer(connConfig, otelpgx.NewTracer(otelpgx.WithTrimSQLInSpanName()))
Expand Down Expand Up @@ -233,3 +277,14 @@ func (t *QuerierFuncs) QueryRowFunc(ctx context.Context, rowFunc func(ctx contex
func QuerierFuncsFor(d Querier) DBFuncQuerier {
return &QuerierFuncs{d: d}
}

// SleepOnErr sleeps for a short period of time after an error has occurred.
func SleepOnErr(ctx context.Context, err error, retries uint8) {
after := retry.BackoffExponentialWithJitter(25*time.Millisecond, 0.5)(ctx, uint(retries+1)) // add one so we always wait at least a little bit
log.Ctx(ctx).Debug().Err(err).Dur("after", after).Uint8("retry", retries+1).Msg("retrying on database error")

select {
case <-time.After(after):
case <-ctx.Done():
}
}
53 changes: 41 additions & 12 deletions internal/datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ const (

batchDeleteSize = 1000

pgSerializationFailure = "40001"
pgUniqueConstraintViolation = "23505"

livingTupleConstraint = "uq_relation_tuple_living_xid"
)

Expand Down Expand Up @@ -328,7 +325,7 @@ func (pgd *pgDatastore) ReadWriteTx(
for i := uint8(0); i <= pgd.maxRetries; i++ {
var newXID xid8
var newSnapshot pgSnapshot
err = pgx.BeginTxFunc(ctx, pgd.writePool, pgx.TxOptions{IsoLevel: pgx.Serializable}, func(tx pgx.Tx) error {
err = wrapError(pgx.BeginTxFunc(ctx, pgd.writePool, pgx.TxOptions{IsoLevel: pgx.Serializable}, func(tx pgx.Tx) error {
var err error
newXID, newSnapshot, err = createNewTransaction(ctx, tx)
if err != nil {
Expand All @@ -351,22 +348,50 @@ func (pgd *pgDatastore) ReadWriteTx(
}

return fn(rwt)
})
}))

if err != nil {
if !config.DisableRetries && errorRetryable(err) {
pgxcommon.SleepOnErr(ctx, err, i)
continue
}

return datastore.NoRevision, err
}

if i > 0 {
log.Debug().Uint8("retries", i).Msg("transaction succeeded after retry")
}

return postgresRevision{newSnapshot.markComplete(newXID.Uint64)}, nil
}

if !config.DisableRetries {
err = fmt.Errorf("max retries exceeded: %w", err)
}

return datastore.NoRevision, err
}

func wrapError(err error) error {
if pgxcommon.IsSerializationError(err) {
return common.NewSerializationError(err)
}

// hack: pgx asyncClose usually happens after cancellation,
// but the reason for it being closed is not propagated
// and all we get is attempting to perform an operation
// on cancelled connection. This keeps the same error,
// but wrapped along a cancellation so that:
// - pgx logger does not log it
// - response is sent as canceled back to the client
if err != nil && err.Error() == "conn closed" {
return errors.Join(err, context.Canceled)
}

return err
}

func (pgd *pgDatastore) Close() error {
pgd.cancelGc()

Expand All @@ -381,16 +406,20 @@ func (pgd *pgDatastore) Close() error {
}

func errorRetryable(err error) bool {
var pgerr *pgconn.PgError
if !errors.As(err, &pgerr) {
log.Debug().Err(err).Msg("couldn't determine a sqlstate error code")
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return false
}

// We need to check unique constraint here because some versions of postgres have an error where
// unique constraint violations are raised instead of serialization errors.
// (e.g. https://www.postgresql.org/message-id/flat/CAGPCyEZG76zjv7S31v_xPeLNRuzj-m%3DY2GOY7PEzu7vhB%3DyQog%40mail.gmail.com)
return pgerr.SQLState() == pgSerializationFailure || pgerr.SQLState() == pgUniqueConstraintViolation
if pgconn.SafeToRetry(err) {
return true
}

if pgxcommon.IsSerializationError(err) {
return true
}

log.Warn().Err(err).Msg("unable to determine if pgx error is retryable")
return false
}

func (pgd *pgDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) {
Expand Down
2 changes: 2 additions & 0 deletions internal/datastore/postgres/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/authzed/spicedb/pkg/tuple"
)

const pgSerializationFailure = "40001"

// Implement the TestableDatastore interface
func (pgd *pgDatastore) ExampleRetryableError() error {
return &pgconn.PgError{
Expand Down

0 comments on commit 04200d5

Please sign in to comment.