From f6c25f52f98976c87d6c2cb4bc27dae203838a52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADctor=20Rold=C3=A1n=20Betancort?= Date: Thu, 28 Sep 2023 23:35:59 +0100 Subject: [PATCH] improve experience with serialization errors serialization errors are not well handled in the postgres datastore: - they tend to be logged by the pgx driver with potentially very large SQL statements - they are not retried - they return to the client as unknown instead of the recommended aborted gRPC code - no information is logged on retries in Debug level Serialization can also manifest as a transaction rollback, and so code is added to retry and wrap the error accordingly so gRPC Aborted is returned. Additionally, retries after serialization can lead to longer requests, and longer requests can hit the deadline. Sometimes a cancellation leads to the pgx connection to be asynchronously closed, loosing information about the original cause it was closed. Unfortunately there isn't an exported type for this, so it's detected via the string. The error is turned into a multi error that has cancellation. In a local load-test, evidence shows that in all cases the original reason was cancellation. A call to pgx.SafeToRetry is added to handle various pgx internal errors (like "conn closed") and determine if they can be retried. --- internal/datastore/common/errors.go | 26 +++++++++++ internal/datastore/postgres/common/errors.go | 1 + internal/datastore/postgres/common/pgx.go | 8 +++- internal/datastore/postgres/postgres.go | 45 ++++++++++++++++++-- 4 files changed, 75 insertions(+), 5 deletions(-) diff --git a/internal/datastore/common/errors.go b/internal/datastore/common/errors.go index 430b7f2daa..f38759966a 100644 --- a/internal/datastore/common/errors.go +++ b/internal/datastore/common/errors.go @@ -13,6 +13,32 @@ import ( "github.com/authzed/spicedb/pkg/tuple" ) +// SerializationError is returned when there's been a serialization +// error while performing a datastore operation +type SerializationError struct { + error +} + +func (err SerializationError) GRPCStatus() *status.Status { + return spiceerrors.WithCodeAndDetails( + err, + codes.Aborted, + spiceerrors.ForReason( + v1.ErrorReason_ERROR_REASON_WRITE_OR_DELETE_PRECONDITION_FAILURE, + map[string]string{}, + ), + ) +} + +func (err SerializationError) Unwrap() error { + return err.error +} + +// NewSerializationError creates a new SerializationError +func NewSerializationError(err error) error { + return SerializationError{err} +} + // CreateRelationshipExistsError is an error returned when attempting to CREATE an already-existing // relationship. type CreateRelationshipExistsError struct { diff --git a/internal/datastore/postgres/common/errors.go b/internal/datastore/postgres/common/errors.go index 550fa685a5..f7ed9f41dc 100644 --- a/internal/datastore/postgres/common/errors.go +++ b/internal/datastore/postgres/common/errors.go @@ -14,6 +14,7 @@ import ( const ( pgUniqueConstraintViolation = "23505" pgSerializationFailure = "40001" + pgTransactionAborted = "25P02" ) var ( diff --git a/internal/datastore/postgres/common/pgx.go b/internal/datastore/postgres/common/pgx.go index 156b8d7af0..319a699f54 100644 --- a/internal/datastore/postgres/common/pgx.go +++ b/internal/datastore/postgres/common/pgx.go @@ -91,12 +91,16 @@ func ConfigurePGXLogger(connConfig *pgx.ConnConfig) { // do not log serialization failues as errors if errArg, ok := data["err"]; ok { err, ok := errArg.(error) - if ok && errors.Is(err, context.Canceled) { + if ok && (errors.Is(err, context.Canceled) || + errors.Is(err, context.DeadlineExceeded) || + errors.Is(err, pgx.ErrTxCommitRollback)) || + err.Error() == "conn closed" { // conns are sometimes closed async upon cancellation return } var pgerr *pgconn.PgError - if errors.As(err, &pgerr) && pgerr.SQLState() == pgSerializationFailure { + if errors.As(err, &pgerr) && + (pgerr.SQLState() == pgSerializationFailure || pgerr.SQLState() == pgTransactionAborted) { return } } diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go index 08ae8bdbbe..d5dba64479 100644 --- a/internal/datastore/postgres/postgres.go +++ b/internal/datastore/postgres/postgres.go @@ -73,6 +73,7 @@ const ( pgSerializationFailure = "40001" pgUniqueConstraintViolation = "23505" + pgTransactionAborted = "25P02" livingTupleConstraint = "uq_relation_tuple_living_xid" ) @@ -328,7 +329,7 @@ func (pgd *pgDatastore) ReadWriteTx( for i := uint8(0); i <= pgd.maxRetries; i++ { var newXID xid8 var newSnapshot pgSnapshot - err = pgx.BeginTxFunc(ctx, pgd.writePool, pgx.TxOptions{IsoLevel: pgx.Serializable}, func(tx pgx.Tx) error { + err = wrapError(pgx.BeginTxFunc(ctx, pgd.writePool, pgx.TxOptions{IsoLevel: pgx.Serializable}, func(tx pgx.Tx) error { var err error newXID, newSnapshot, err = createNewTransaction(ctx, tx) if err != nil { @@ -351,22 +352,48 @@ func (pgd *pgDatastore) ReadWriteTx( } return fn(rwt) - }) + })) if err != nil { if !config.DisableRetries && errorRetryable(err) { + log.Debug().Err(err).Msg("retrying transaction") continue } + return datastore.NoRevision, err } + if i > 0 { + log.Debug().Uint8("retries", i).Msg("transaction succeeded after retry") + } return postgresRevision{newSnapshot.markComplete(newXID.Uint64)}, nil } + if !config.DisableRetries { err = fmt.Errorf("max retries exceeded: %w", err) } + return datastore.NoRevision, err } +func wrapError(err error) error { + if errors.Is(err, pgx.ErrTxCommitRollback) { + return common.NewSerializationError(err) + } + + // workaround: pgx asyncClose usually happens after cancellation, + // but the reason for it being closed is not propagated + // and all we get is attempting to perform an operation + // on cancelled connection. This keeps the same error, + // but wrapped along a cancellation so that: + // - pgx logger does not log it + // - response is sent as canceled back to the client + if err != nil && err.Error() == "conn closed" { + return errors.Join(err, context.Canceled) + } + + return err +} + func (pgd *pgDatastore) Close() error { pgd.cancelGc() @@ -382,6 +409,18 @@ func (pgd *pgDatastore) Close() error { func errorRetryable(err error) bool { var pgerr *pgconn.PgError + if errors.Is(err, pgx.ErrTxCommitRollback) { + return true + } + + if pgconn.SafeToRetry(err) { + return true + } + + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return false + } + if !errors.As(err, &pgerr) { log.Debug().Err(err).Msg("couldn't determine a sqlstate error code") return false @@ -390,7 +429,7 @@ func errorRetryable(err error) bool { // We need to check unique constraint here because some versions of postgres have an error where // unique constraint violations are raised instead of serialization errors. // (e.g. https://www.postgresql.org/message-id/flat/CAGPCyEZG76zjv7S31v_xPeLNRuzj-m%3DY2GOY7PEzu7vhB%3DyQog%40mail.gmail.com) - return pgerr.SQLState() == pgSerializationFailure || pgerr.SQLState() == pgUniqueConstraintViolation + return pgerr.SQLState() == pgSerializationFailure || pgerr.SQLState() == pgUniqueConstraintViolation || pgerr.SQLState() == pgTransactionAborted } func (pgd *pgDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) {