From f6c161968e7c72c7f0745d981951b445e7897e18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADctor=20Rold=C3=A1n=20Betancort?= Date: Thu, 28 Sep 2023 23:35:59 +0100 Subject: [PATCH] serialization errors are not well handled in the Postgres datastore: - they tend to be logged by the `pgx` driver with potentially very large SQL statements - they are not retried, nor there is a backoff. There are also other types of errors that are not being retried, like `conn closed` (which pgx considers safe to retry) - they return to the client as unknown instead of the recommended aborted gRPC code - no information is logged on retries in `debug` level - logs can get extremely verbose due to large SQL and args Serialization errors can also manifest as a transaction rollback, and so code is added to retry and wrap the error accordingly, in which case gRPC `aborted` is returned. Additionally, retries after serialization can lead to longer requests, and longer requests can hit the deadline. Sometimes a cancellation leads to the pgx connection being asynchronously closed, losing information about the original cause that closed it. Unfortunately, there isn't an exported type for this, so it's detected via the string. The error is turned into a multi-error that has cancellation. In a local load test, evidence shows that in all cases the original reason was cancellation. A call to `pgx.SafeToRetry` is added to handle various pgx internal errors (like `conn closed`) and determine if they can be retried. This has helped retry errors it didn't use to retry before. Retries also now have a backoff, since the default maximum of 10 can knock down the database. The retry function is refactored, and the minimum wait is reduced since the total backoff grows pretty quickly, and will easily make the client hit the typical deadlines. The log message for retries is moved to Debug since serialization errors can be pretty common so it shouldn't be considered a warning. I also detected that the backoff function wasn't respecting the context cancellation, so even when the client had canceled, the goroutine would be still asleep. The function was changed to cancel immediately after the client signal. --- internal/datastore/common/errors.go | 26 +++++++ internal/datastore/crdb/pool/pool.go | 13 +--- internal/datastore/crdb/watch.go | 3 +- internal/datastore/postgres/common/errors.go | 1 + internal/datastore/postgres/common/pgx.go | 71 +++++++++++++++++--- internal/datastore/postgres/postgres.go | 53 +++++++++++---- internal/datastore/postgres/postgres_test.go | 2 + 7 files changed, 138 insertions(+), 31 deletions(-) diff --git a/internal/datastore/common/errors.go b/internal/datastore/common/errors.go index 430b7f2daa..f38759966a 100644 --- a/internal/datastore/common/errors.go +++ b/internal/datastore/common/errors.go @@ -13,6 +13,32 @@ import ( "github.com/authzed/spicedb/pkg/tuple" ) +// SerializationError is returned when there's been a serialization +// error while performing a datastore operation +type SerializationError struct { + error +} + +func (err SerializationError) GRPCStatus() *status.Status { + return spiceerrors.WithCodeAndDetails( + err, + codes.Aborted, + spiceerrors.ForReason( + v1.ErrorReason_ERROR_REASON_WRITE_OR_DELETE_PRECONDITION_FAILURE, + map[string]string{}, + ), + ) +} + +func (err SerializationError) Unwrap() error { + return err.error +} + +// NewSerializationError creates a new SerializationError +func NewSerializationError(err error) error { + return SerializationError{err} +} + // CreateRelationshipExistsError is an error returned when attempting to CREATE an already-existing // relationship. type CreateRelationshipExistsError struct { diff --git a/internal/datastore/crdb/pool/pool.go b/internal/datastore/crdb/pool/pool.go index a2c179357e..443a681166 100644 --- a/internal/datastore/crdb/pool/pool.go +++ b/internal/datastore/crdb/pool/pool.go @@ -8,13 +8,13 @@ import ( "sync" "time" - "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" "github.com/prometheus/client_golang/prometheus" "golang.org/x/time/rate" + "github.com/authzed/spicedb/internal/datastore/postgres/common" log "github.com/authzed/spicedb/internal/logging" ) @@ -291,7 +291,7 @@ func (p *RetryPool) withRetries(ctx context.Context, fn func(conn *pgxpool.Conn) p.healthTracker.SetNodeHealth(nodeID, false) } - SleepOnErr(ctx, err, retries) + common.SleepOnErr(ctx, err, retries) conn, err = p.acquireFromDifferentNode(ctx, nodeID) if err != nil { @@ -301,7 +301,7 @@ func (p *RetryPool) withRetries(ctx context.Context, fn func(conn *pgxpool.Conn) } if errors.As(err, &retryable) { log.Ctx(ctx).Info().Err(err).Uint8("retries", retries).Msg("retryable error") - SleepOnErr(ctx, err, retries) + common.SleepOnErr(ctx, err, retries) continue } conn.Release() @@ -323,13 +323,6 @@ func (p *RetryPool) GC(conn *pgx.Conn) { delete(p.nodeForConn, conn) } -// SleepOnErr sleeps for a short period of time after an error has occurred. -func SleepOnErr(ctx context.Context, err error, retries uint8) { - after := retry.BackoffExponentialWithJitter(100*time.Millisecond, 0.5)(ctx, uint(retries+1)) // add one so we always wait at least a little bit - log.Ctx(ctx).Warn().Err(err).Dur("after", after).Msg("retrying on database error") - time.Sleep(after) -} - func (p *RetryPool) acquireFromDifferentNode(ctx context.Context, nodeID uint32) (*pgxpool.Conn, error) { log.Ctx(ctx).Info().Uint32("node_id", nodeID).Msg("acquiring a connection from a different node") for { diff --git a/internal/datastore/crdb/watch.go b/internal/datastore/crdb/watch.go index 51f3b9526b..25c03e1955 100644 --- a/internal/datastore/crdb/watch.go +++ b/internal/datastore/crdb/watch.go @@ -15,6 +15,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/common" "github.com/authzed/spicedb/internal/datastore/crdb/pool" + pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" "github.com/authzed/spicedb/pkg/datastore" core "github.com/authzed/spicedb/pkg/proto/core/v1" "github.com/authzed/spicedb/pkg/spiceerrors" @@ -270,7 +271,7 @@ func (cds *crdbDatastore) WatchSchema(ctx context.Context, afterRevision datasto running = true // Sleep a bit for retrying. - pool.SleepOnErr(ctx, err, uint8(retryCount)) + pgxcommon.SleepOnErr(ctx, err, uint8(retryCount)) return } diff --git a/internal/datastore/postgres/common/errors.go b/internal/datastore/postgres/common/errors.go index 550fa685a5..f7ed9f41dc 100644 --- a/internal/datastore/postgres/common/errors.go +++ b/internal/datastore/postgres/common/errors.go @@ -14,6 +14,7 @@ import ( const ( pgUniqueConstraintViolation = "23505" pgSerializationFailure = "40001" + pgTransactionAborted = "25P02" ) var ( diff --git a/internal/datastore/postgres/common/pgx.go b/internal/datastore/postgres/common/pgx.go index 156b8d7af0..97c381c14f 100644 --- a/internal/datastore/postgres/common/pgx.go +++ b/internal/datastore/postgres/common/pgx.go @@ -8,6 +8,7 @@ import ( "time" "github.com/exaring/otelpgx" + "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry" zerologadapter "github.com/jackc/pgx-zerolog" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" @@ -87,16 +88,13 @@ func ConfigurePGXLogger(connConfig *pgx.ConnConfig) { level = tracelog.LogLevelDebug } - // do not log cancelled queries as errors - // do not log serialization failues as errors + truncateLargeSQL(data) + + // log cancellation and serialization errors at debug level if errArg, ok := data["err"]; ok { err, ok := errArg.(error) - if ok && errors.Is(err, context.Canceled) { - return - } - - var pgerr *pgconn.PgError - if errors.As(err, &pgerr) && pgerr.SQLState() == pgSerializationFailure { + if ok && (IsCancellationError(err) || IsSerializationError(err)) { + logger.Log(ctx, tracelog.LogLevelDebug, msg, data) return } } @@ -109,6 +107,52 @@ func ConfigurePGXLogger(connConfig *pgx.ConnConfig) { addTracer(connConfig, &tracelog.TraceLog{Logger: levelMappingFn(l), LogLevel: tracelog.LogLevelInfo}) } +func truncateLargeSQL(data map[string]any) { + const ( + maxSQLLen = 250 + maxSQLArgsLen = 50 + ) + + if sqlData, ok := data["sql"]; ok { + sqlString, ok := sqlData.(string) + if ok && len(sqlString) > maxSQLLen { + data["sql"] = sqlString[:maxSQLLen] + "..." + } + } + if argsData, ok := data["args"]; ok { + argsSlice, ok := argsData.([]any) + if ok && len(argsSlice) > maxSQLArgsLen { + data["args"] = argsSlice[:maxSQLArgsLen] + } + } +} + +func IsCancellationError(err error) bool { + if errors.Is(err, context.Canceled) || + errors.Is(err, context.DeadlineExceeded) || + err.Error() == "conn closed" { // conns are sometimes closed async upon cancellation + return true + } + return false +} + +func IsSerializationError(err error) bool { + var pgerr *pgconn.PgError + if errors.As(err, &pgerr) && + // We need to check unique constraint here because some versions of postgres have an error where + // unique constraint violations are raised instead of serialization errors. + // (e.g. https://www.postgresql.org/message-id/flat/CAGPCyEZG76zjv7S31v_xPeLNRuzj-m%3DY2GOY7PEzu7vhB%3DyQog%40mail.gmail.com) + (pgerr.SQLState() == pgSerializationFailure || pgerr.SQLState() == pgUniqueConstraintViolation || pgerr.SQLState() == pgTransactionAborted) { + return true + } + + if errors.Is(err, pgx.ErrTxCommitRollback) { + return true + } + + return false +} + // ConfigureOTELTracer adds OTEL tracing to a pgx.ConnConfig func ConfigureOTELTracer(connConfig *pgx.ConnConfig) { addTracer(connConfig, otelpgx.NewTracer(otelpgx.WithTrimSQLInSpanName())) @@ -233,3 +277,14 @@ func (t *QuerierFuncs) QueryRowFunc(ctx context.Context, rowFunc func(ctx contex func QuerierFuncsFor(d Querier) DBFuncQuerier { return &QuerierFuncs{d: d} } + +// SleepOnErr sleeps for a short period of time after an error has occurred. +func SleepOnErr(ctx context.Context, err error, retries uint8) { + after := retry.BackoffExponentialWithJitter(25*time.Millisecond, 0.5)(ctx, uint(retries+1)) // add one so we always wait at least a little bit + log.Ctx(ctx).Debug().Err(err).Dur("after", after).Uint8("retry", retries+1).Msg("retrying on database error") + + select { + case <-time.After(after): + case <-ctx.Done(): + } +} diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go index 08ae8bdbbe..f5321c38af 100644 --- a/internal/datastore/postgres/postgres.go +++ b/internal/datastore/postgres/postgres.go @@ -71,9 +71,6 @@ const ( batchDeleteSize = 1000 - pgSerializationFailure = "40001" - pgUniqueConstraintViolation = "23505" - livingTupleConstraint = "uq_relation_tuple_living_xid" ) @@ -328,7 +325,7 @@ func (pgd *pgDatastore) ReadWriteTx( for i := uint8(0); i <= pgd.maxRetries; i++ { var newXID xid8 var newSnapshot pgSnapshot - err = pgx.BeginTxFunc(ctx, pgd.writePool, pgx.TxOptions{IsoLevel: pgx.Serializable}, func(tx pgx.Tx) error { + err = wrapError(pgx.BeginTxFunc(ctx, pgd.writePool, pgx.TxOptions{IsoLevel: pgx.Serializable}, func(tx pgx.Tx) error { var err error newXID, newSnapshot, err = createNewTransaction(ctx, tx) if err != nil { @@ -351,22 +348,50 @@ func (pgd *pgDatastore) ReadWriteTx( } return fn(rwt) - }) + })) + if err != nil { if !config.DisableRetries && errorRetryable(err) { + pgxcommon.SleepOnErr(ctx, err, i) continue } + return datastore.NoRevision, err } + if i > 0 { + log.Debug().Uint8("retries", i).Msg("transaction succeeded after retry") + } + return postgresRevision{newSnapshot.markComplete(newXID.Uint64)}, nil } + if !config.DisableRetries { err = fmt.Errorf("max retries exceeded: %w", err) } + return datastore.NoRevision, err } +func wrapError(err error) error { + if pgxcommon.IsSerializationError(err) { + return common.NewSerializationError(err) + } + + // hack: pgx asyncClose usually happens after cancellation, + // but the reason for it being closed is not propagated + // and all we get is attempting to perform an operation + // on cancelled connection. This keeps the same error, + // but wrapped along a cancellation so that: + // - pgx logger does not log it + // - response is sent as canceled back to the client + if err != nil && err.Error() == "conn closed" { + return errors.Join(err, context.Canceled) + } + + return err +} + func (pgd *pgDatastore) Close() error { pgd.cancelGc() @@ -381,16 +406,20 @@ func (pgd *pgDatastore) Close() error { } func errorRetryable(err error) bool { - var pgerr *pgconn.PgError - if !errors.As(err, &pgerr) { - log.Debug().Err(err).Msg("couldn't determine a sqlstate error code") + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return false } - // We need to check unique constraint here because some versions of postgres have an error where - // unique constraint violations are raised instead of serialization errors. - // (e.g. https://www.postgresql.org/message-id/flat/CAGPCyEZG76zjv7S31v_xPeLNRuzj-m%3DY2GOY7PEzu7vhB%3DyQog%40mail.gmail.com) - return pgerr.SQLState() == pgSerializationFailure || pgerr.SQLState() == pgUniqueConstraintViolation + if pgconn.SafeToRetry(err) { + return true + } + + if pgxcommon.IsSerializationError(err) { + return true + } + + log.Warn().Err(err).Msg("unable to determine if pgx error is retryable") + return false } func (pgd *pgDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) { diff --git a/internal/datastore/postgres/postgres_test.go b/internal/datastore/postgres/postgres_test.go index ad00fbde38..891073d026 100644 --- a/internal/datastore/postgres/postgres_test.go +++ b/internal/datastore/postgres/postgres_test.go @@ -34,6 +34,8 @@ import ( "github.com/authzed/spicedb/pkg/tuple" ) +const pgSerializationFailure = "40001" + // Implement the TestableDatastore interface func (pgd *pgDatastore) ExampleRetryableError() error { return &pgconn.PgError{