diff --git a/internal/datastore/common/errors.go b/internal/datastore/common/errors.go
index 430b7f2daa..f38759966a 100644
--- a/internal/datastore/common/errors.go
+++ b/internal/datastore/common/errors.go
@@ -13,6 +13,32 @@ import (
 	"github.com/authzed/spicedb/pkg/tuple"
 )
 
+// SerializationError wraps a datastore error that was caused by a
+// serialization failure; GRPCStatus reports it to clients as codes.Aborted.
+type SerializationError struct {
+	error
+}
+
+func (err SerializationError) GRPCStatus() *status.Status {
+	return spiceerrors.WithCodeAndDetails(
+		err,
+		codes.Aborted,
+		spiceerrors.ForReason(
+			v1.ErrorReason_ERROR_REASON_WRITE_OR_DELETE_PRECONDITION_FAILURE,
+			map[string]string{},
+		),
+	)
+}
+
+func (err SerializationError) Unwrap() error {
+	return err.error
+}
+
+// NewSerializationError wraps err in a SerializationError.
+func NewSerializationError(err error) error {
+	return SerializationError{err}
+}
+
 // CreateRelationshipExistsError is an error returned when attempting to CREATE an already-existing
 // relationship.
type CreateRelationshipExistsError struct { diff --git a/internal/datastore/crdb/pool/pool.go b/internal/datastore/crdb/pool/pool.go index a2c179357e..443a681166 100644 --- a/internal/datastore/crdb/pool/pool.go +++ b/internal/datastore/crdb/pool/pool.go @@ -8,13 +8,13 @@ import ( "sync" "time" - "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" "github.com/prometheus/client_golang/prometheus" "golang.org/x/time/rate" + "github.com/authzed/spicedb/internal/datastore/postgres/common" log "github.com/authzed/spicedb/internal/logging" ) @@ -291,7 +291,7 @@ func (p *RetryPool) withRetries(ctx context.Context, fn func(conn *pgxpool.Conn) p.healthTracker.SetNodeHealth(nodeID, false) } - SleepOnErr(ctx, err, retries) + common.SleepOnErr(ctx, err, retries) conn, err = p.acquireFromDifferentNode(ctx, nodeID) if err != nil { @@ -301,7 +301,7 @@ func (p *RetryPool) withRetries(ctx context.Context, fn func(conn *pgxpool.Conn) } if errors.As(err, &retryable) { log.Ctx(ctx).Info().Err(err).Uint8("retries", retries).Msg("retryable error") - SleepOnErr(ctx, err, retries) + common.SleepOnErr(ctx, err, retries) continue } conn.Release() @@ -323,13 +323,6 @@ func (p *RetryPool) GC(conn *pgx.Conn) { delete(p.nodeForConn, conn) } -// SleepOnErr sleeps for a short period of time after an error has occurred. 
-func SleepOnErr(ctx context.Context, err error, retries uint8) { - after := retry.BackoffExponentialWithJitter(100*time.Millisecond, 0.5)(ctx, uint(retries+1)) // add one so we always wait at least a little bit - log.Ctx(ctx).Warn().Err(err).Dur("after", after).Msg("retrying on database error") - time.Sleep(after) -} - func (p *RetryPool) acquireFromDifferentNode(ctx context.Context, nodeID uint32) (*pgxpool.Conn, error) { log.Ctx(ctx).Info().Uint32("node_id", nodeID).Msg("acquiring a connection from a different node") for { diff --git a/internal/datastore/crdb/watch.go b/internal/datastore/crdb/watch.go index 51f3b9526b..25c03e1955 100644 --- a/internal/datastore/crdb/watch.go +++ b/internal/datastore/crdb/watch.go @@ -15,6 +15,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/common" "github.com/authzed/spicedb/internal/datastore/crdb/pool" + pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" "github.com/authzed/spicedb/pkg/datastore" core "github.com/authzed/spicedb/pkg/proto/core/v1" "github.com/authzed/spicedb/pkg/spiceerrors" @@ -270,7 +271,7 @@ func (cds *crdbDatastore) WatchSchema(ctx context.Context, afterRevision datasto running = true // Sleep a bit for retrying. 
-				pool.SleepOnErr(ctx, err, uint8(retryCount))
+				pgxcommon.SleepOnErr(ctx, err, uint8(retryCount))
 				return
 			}
 
diff --git a/internal/datastore/postgres/common/errors.go b/internal/datastore/postgres/common/errors.go
index 550fa685a5..f7ed9f41dc 100644
--- a/internal/datastore/postgres/common/errors.go
+++ b/internal/datastore/postgres/common/errors.go
@@ -14,6 +14,7 @@ import (
 const (
 	pgUniqueConstraintViolation = "23505"
 	pgSerializationFailure      = "40001"
+	pgTransactionAborted        = "25P02"
 )
 
 var (
diff --git a/internal/datastore/postgres/common/pgx.go b/internal/datastore/postgres/common/pgx.go
index 156b8d7af0..ecb6d70ef8 100644
--- a/internal/datastore/postgres/common/pgx.go
+++ b/internal/datastore/postgres/common/pgx.go
@@ -8,6 +8,7 @@ import (
 	"time"
 
 	"github.com/exaring/otelpgx"
+	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
 	zerologadapter "github.com/jackc/pgx-zerolog"
 	"github.com/jackc/pgx/v5"
 	"github.com/jackc/pgx/v5/pgconn"
@@ -87,16 +88,13 @@ func ConfigurePGXLogger(connConfig *pgx.ConnConfig) {
 				level = tracelog.LogLevelDebug
 			}
 
-			// do not log cancelled queries as errors
-			// do not log serialization failues as errors
+			truncateLargeSQL(data)
+
+			// log cancellation and serialization errors at debug level
 			if errArg, ok := data["err"]; ok {
 				err, ok := errArg.(error)
-				if ok && errors.Is(err, context.Canceled) {
-					return
-				}
-
-				var pgerr *pgconn.PgError
-				if errors.As(err, &pgerr) && pgerr.SQLState() == pgSerializationFailure {
+				if ok && (IsCancellationError(err) || IsSerializationError(err)) {
+					logger.Log(ctx, tracelog.LogLevelDebug, msg, data)
 					return
 				}
 			}
@@ -109,6 +107,58 @@ func ConfigurePGXLogger(connConfig *pgx.ConnConfig) {
 	addTracer(connConfig, &tracelog.TraceLog{Logger: levelMappingFn(l), LogLevel: tracelog.LogLevelInfo})
 }
 
+// truncateLargeSQL shortens, in place, the "sql" and "args" entries of a pgx log
+// record so that very large statements and argument lists do not flood the logs.
+func truncateLargeSQL(data map[string]any) {
+	const (
+		maxSQLLen     = 350
+		maxSQLArgsLen = 50
+	)
+
+	if sqlData, ok := data["sql"]; ok {
+		sqlString, ok := sqlData.(string)
+		if ok && len(sqlString) > maxSQLLen {
+			data["sql"] = sqlString[:maxSQLLen] + "..."
+		}
+	}
+	if argsData, ok := data["args"]; ok {
+		argsSlice, ok := argsData.([]any)
+		if ok && len(argsSlice) > maxSQLArgsLen {
+			data["args"] = argsSlice[:maxSQLArgsLen]
+		}
+	}
+}
+
+// IsCancellationError reports whether err stems from a canceled or expired context,
+// including pgx's bare "conn closed" error. A nil err is never a cancellation.
+func IsCancellationError(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	return errors.Is(err, context.Canceled) ||
+		errors.Is(err, context.DeadlineExceeded) ||
+		err.Error() == "conn closed" // conns are sometimes closed async upon cancellation
+}
+
+// IsSerializationError reports whether err is a Postgres serialization failure, or
+// one of the errors handled like one: a unique constraint violation, an aborted
+// transaction (25P02), or pgx.ErrTxCommitRollback.
+func IsSerializationError(err error) bool {
+	var pgerr *pgconn.PgError
+	if errors.As(err, &pgerr) {
+		switch pgerr.SQLState() {
+		// We need to check unique constraint here because some versions of postgres have an error where
+		// unique constraint violations are raised instead of serialization errors.
+		// (e.g. https://www.postgresql.org/message-id/flat/CAGPCyEZG76zjv7S31v_xPeLNRuzj-m%3DY2GOY7PEzu7vhB%3DyQog%40mail.gmail.com)
+		case pgSerializationFailure, pgUniqueConstraintViolation, pgTransactionAborted:
+			return true
+		}
+	}
+
+	return errors.Is(err, pgx.ErrTxCommitRollback)
+}
+
 // ConfigureOTELTracer adds OTEL tracing to a pgx.ConnConfig
 func ConfigureOTELTracer(connConfig *pgx.ConnConfig) {
 	addTracer(connConfig, otelpgx.NewTracer(otelpgx.WithTrimSQLInSpanName()))
@@ -233,3 +283,14 @@ func (t *QuerierFuncs) QueryRowFunc(ctx context.Context, rowFunc func(ctx contex
 func QuerierFuncsFor(d Querier) DBFuncQuerier {
 	return &QuerierFuncs{d: d}
 }
+
+// SleepOnErr sleeps for a short period of time after an error has occurred.
+func SleepOnErr(ctx context.Context, err error, retries uint8) {
+	after := retry.BackoffExponentialWithJitter(25*time.Millisecond, 0.5)(ctx, uint(retries)+1) // add one (after widening, so 255 cannot wrap to 0) so we always wait at least a little bit
+	log.Ctx(ctx).Debug().Err(err).Dur("after", after).Uint("retry", uint(retries)+1).Msg("retrying on database error")
+
+	select {
+	case <-time.After(after):
+	case <-ctx.Done():
+	}
+}
diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go
index 08ae8bdbbe..f5321c38af 100644
--- a/internal/datastore/postgres/postgres.go
+++ b/internal/datastore/postgres/postgres.go
@@ -71,9 +71,6 @@ const (
 
 	batchDeleteSize = 1000
 
-	pgSerializationFailure      = "40001"
-	pgUniqueConstraintViolation = "23505"
-
 	livingTupleConstraint = "uq_relation_tuple_living_xid"
 )
 
@@ -328,7 +325,7 @@ func (pgd *pgDatastore) ReadWriteTx(
 	for i := uint8(0); i <= pgd.maxRetries; i++ {
 		var newXID xid8
 		var newSnapshot pgSnapshot
-		err = pgx.BeginTxFunc(ctx, pgd.writePool, pgx.TxOptions{IsoLevel: pgx.Serializable}, func(tx pgx.Tx) error {
+		err = wrapError(pgx.BeginTxFunc(ctx, pgd.writePool, pgx.TxOptions{IsoLevel: pgx.Serializable}, func(tx pgx.Tx) error {
 			var err error
 			newXID, newSnapshot, err = createNewTransaction(ctx, tx)
 			if err != nil {
@@ -351,22 +348,56 @@ func (pgd *pgDatastore) ReadWriteTx(
 			}
 
 			return fn(rwt)
-		})
+		}))
+
 		if err != nil {
 			if !config.DisableRetries && errorRetryable(err) {
+				// Back off only when another attempt follows; sleeping after the last one just delays the error.
+				if i < pgd.maxRetries {
+					pgxcommon.SleepOnErr(ctx, err, i)
+				}
 				continue
 			}
+
 			return datastore.NoRevision, err
 		}
 
+		if i > 0 {
+			log.Debug().Uint8("retries", i).Msg("transaction succeeded after retry")
+		}
+
 		return postgresRevision{newSnapshot.markComplete(newXID.Uint64)}, nil
 	}
+
 	if !config.DisableRetries {
 		err = fmt.Errorf("max retries exceeded: %w", err)
 	}
+
 	return datastore.NoRevision, err
 }
 
+// wrapError translates errors returned by the transaction: serialization
+// failures become common.SerializationError and a bare "conn closed" error is
+// joined with context.Canceled. Any other error, including nil, is returned as-is.
+func wrapError(err error) error {
+	if pgxcommon.IsSerializationError(err) {
+		return common.NewSerializationError(err)
+	}
+
+	// hack: pgx asyncClose usually happens after cancellation,
+	// but the reason for it being closed is not propagated
+	// and all we get is attempting to perform an operation
+	// on cancelled connection. This keeps the same error,
+	// but wrapped along a cancellation so that:
+	// - pgx logger does not log it
+	// - response is sent as canceled back to the client
+	if err != nil && err.Error() == "conn closed" {
+		return errors.Join(err, context.Canceled)
+	}
+
+	return err
+}
+
 func (pgd *pgDatastore) Close() error {
 	pgd.cancelGc()
 
@@ -381,16 +412,22 @@ func (pgd *pgDatastore) Close() error {
 }
 
+// errorRetryable reports whether a failed transaction attempt may be retried.
+// Canceled or expired contexts are never retried.
 func errorRetryable(err error) bool {
-	var pgerr *pgconn.PgError
-	if !errors.As(err, &pgerr) {
-		log.Debug().Err(err).Msg("couldn't determine a sqlstate error code")
+	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
 		return false
 	}
 
-	// We need to check unique constraint here because some versions of postgres have an error where
-	// unique constraint violations are raised instead of serialization errors.
-	// (e.g. https://www.postgresql.org/message-id/flat/CAGPCyEZG76zjv7S31v_xPeLNRuzj-m%3DY2GOY7PEzu7vhB%3DyQog%40mail.gmail.com)
-	return pgerr.SQLState() == pgSerializationFailure || pgerr.SQLState() == pgUniqueConstraintViolation
+	if pgconn.SafeToRetry(err) {
+		return true
+	}
+
+	if pgxcommon.IsSerializationError(err) {
+		return true
+	}
+
+	log.Warn().Err(err).Msg("unable to determine if pgx error is retryable")
+	return false
 }
 
 func (pgd *pgDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) {
diff --git a/internal/datastore/postgres/postgres_test.go b/internal/datastore/postgres/postgres_test.go
index ad00fbde38..891073d026 100644
--- a/internal/datastore/postgres/postgres_test.go
+++ b/internal/datastore/postgres/postgres_test.go
@@ -34,6 +34,8 @@ import (
 	"github.com/authzed/spicedb/pkg/tuple"
 )
 
+const pgSerializationFailure = "40001"
+
 // Implement the TestableDatastore interface
 func (pgd *pgDatastore) ExampleRetryableError() error {
 	return &pgconn.PgError{