Skip to content

Commit

Permalink
Merge pull request #1552 from authzed/improve-experience-on-pg-serial…
Browse files Browse the repository at this point in the history
…ization-errors

Improve experience on pg serialization errors
  • Loading branch information
vroldanbet authored Oct 4, 2023
2 parents 9035a64 + a4cc965 commit 21f67a5
Show file tree
Hide file tree
Showing 15 changed files with 210 additions and 47 deletions.
2 changes: 1 addition & 1 deletion e2e/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.21
toolchain go1.21.1

require (
github.com/authzed/authzed-go v0.9.1-0.20230830212047-e1e7da6e877d
github.com/authzed/authzed-go v0.10.1-0.20231003161414-9c9116f212b7
github.com/authzed/grpcutil v0.0.0-20230908193239-4286bb1d6403
github.com/authzed/spicedb v1.23.1
github.com/brianvoe/gofakeit/v6 v6.23.0
Expand Down
4 changes: 2 additions & 2 deletions e2e/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2Aawl
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230512164433-5d1fd1a340c9 h1:goHVqTbFX3AIo0tzGr14pgfAW2ZfPChKO21Z9MGf/gk=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230512164433-5d1fd1a340c9/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
github.com/authzed/authzed-go v0.9.1-0.20230830212047-e1e7da6e877d h1:wSt3hHgOOS2vFvRZC0r5hzr9BijmHn9El7pRs88ublI=
github.com/authzed/authzed-go v0.9.1-0.20230830212047-e1e7da6e877d/go.mod h1:9Pl5jDQJHrjbMDuCrsa+Q6Tqmi1f2pDdIn/qNGI++vA=
github.com/authzed/authzed-go v0.10.1-0.20231003161414-9c9116f212b7 h1:2sKwbLnVbfe0LkPuWkpntlXRby2bSjkIb7aotZUQQ7I=
github.com/authzed/authzed-go v0.10.1-0.20231003161414-9c9116f212b7/go.mod h1:9Pl5jDQJHrjbMDuCrsa+Q6Tqmi1f2pDdIn/qNGI++vA=
github.com/authzed/cel-go v0.17.5 h1:lfpkNrR99B5QRHg5qdG9oLu/kguVlZC68VJuMk8tH9Y=
github.com/authzed/cel-go v0.17.5/go.mod h1:XL/zEq5hKGVF8aOdMbG7w+BQPihLjY2W8N+UIygDA2I=
github.com/authzed/grpcutil v0.0.0-20230908193239-4286bb1d6403 h1:bQeIwWWRI9bl93poTqpix4sYHi+gnXUPK7N6bMtXzBE=
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
cloud.google.com/go/spanner v1.47.0
github.com/IBM/pgxpoolprometheus v1.1.1
github.com/Masterminds/squirrel v1.5.4
github.com/authzed/authzed-go v0.9.1-0.20230830212047-e1e7da6e877d
github.com/authzed/authzed-go v0.10.1-0.20231003161414-9c9116f212b7
github.com/authzed/cel-go v0.17.5
github.com/authzed/consistent v0.1.0
github.com/authzed/grpcutil v0.0.0-20230908193239-4286bb1d6403
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,8 @@ github.com/ashanbrown/forbidigo v1.6.0 h1:D3aewfM37Yb3pxHujIPSpTf6oQk9sc9WZi8ger
github.com/ashanbrown/forbidigo v1.6.0/go.mod h1:Y8j9jy9ZYAEHXdu723cUlraTqbzjKF1MUyfOKL+AjcU=
github.com/ashanbrown/makezero v1.1.1 h1:iCQ87C0V0vSyO+M9E/FZYbu65auqH0lnsOkf5FcB28s=
github.com/ashanbrown/makezero v1.1.1/go.mod h1:i1bJLCRSCHOcOa9Y6MyF2FTfMZMFdHvxKHxgO5Z1axI=
github.com/authzed/authzed-go v0.9.1-0.20230830212047-e1e7da6e877d h1:wSt3hHgOOS2vFvRZC0r5hzr9BijmHn9El7pRs88ublI=
github.com/authzed/authzed-go v0.9.1-0.20230830212047-e1e7da6e877d/go.mod h1:9Pl5jDQJHrjbMDuCrsa+Q6Tqmi1f2pDdIn/qNGI++vA=
github.com/authzed/authzed-go v0.10.1-0.20231003161414-9c9116f212b7 h1:2sKwbLnVbfe0LkPuWkpntlXRby2bSjkIb7aotZUQQ7I=
github.com/authzed/authzed-go v0.10.1-0.20231003161414-9c9116f212b7/go.mod h1:9Pl5jDQJHrjbMDuCrsa+Q6Tqmi1f2pDdIn/qNGI++vA=
github.com/authzed/cel-go v0.17.5 h1:lfpkNrR99B5QRHg5qdG9oLu/kguVlZC68VJuMk8tH9Y=
github.com/authzed/cel-go v0.17.5/go.mod h1:XL/zEq5hKGVF8aOdMbG7w+BQPihLjY2W8N+UIygDA2I=
github.com/authzed/consistent v0.1.0 h1:tlh1wvKoRbjRhMm2P+X5WQQyR54SRoS4MyjLOg17Mp8=
Expand Down
26 changes: 26 additions & 0 deletions internal/datastore/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,32 @@ import (
"github.com/authzed/spicedb/pkg/tuple"
)

// SerializationError is returned when there's been a serialization
// error while performing a datastore operation
type SerializationError struct {
	error
}

// GRPCStatus implements the gRPC status interface so this error is surfaced
// to clients as codes.Aborted with a machine-readable
// ERROR_REASON_SERIALIZATION_FAILURE detail, signaling the request is retryable.
func (err SerializationError) GRPCStatus() *status.Status {
	return spiceerrors.WithCodeAndDetails(
		err,
		codes.Aborted,
		spiceerrors.ForReason(
			v1.ErrorReason_ERROR_REASON_SERIALIZATION_FAILURE,
			map[string]string{},
		),
	)
}

// Unwrap returns the underlying datastore error so errors.Is/errors.As
// checks still see the original cause.
func (err SerializationError) Unwrap() error {
	return err.error
}

// NewSerializationError creates a new SerializationError
func NewSerializationError(err error) error {
	return SerializationError{err}
}

// CreateRelationshipExistsError is an error returned when attempting to CREATE an already-existing
// relationship.
type CreateRelationshipExistsError struct {
Expand Down
5 changes: 4 additions & 1 deletion internal/datastore/common/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ func startGarbageCollectorWithMaxElapsedTime(ctx context.Context, gc GarbageColl

case <-time.After(nextInterval):
log.Ctx(ctx).Info().
Dur("interval", nextInterval).
Dur("window", window).
Dur("timeout", timeout).
Msg("running garbage collection worker")

err := RunGarbageCollection(gc, window, timeout)
Expand Down Expand Up @@ -178,7 +181,7 @@ func RunGarbageCollection(gc GarbageCollector, window, timeout time.Duration) er
}

collectionDuration := time.Since(startTime)
log.Ctx(ctx).Debug().
log.Ctx(ctx).Info().
Stringer("highestTxID", watermark).
Dur("duration", collectionDuration).
Time("nowTime", now).
Expand Down
13 changes: 3 additions & 10 deletions internal/datastore/crdb/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
"sync"
"time"

"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"

"github.com/authzed/spicedb/internal/datastore/postgres/common"
log "github.com/authzed/spicedb/internal/logging"
)

Expand Down Expand Up @@ -291,7 +291,7 @@ func (p *RetryPool) withRetries(ctx context.Context, fn func(conn *pgxpool.Conn)
p.healthTracker.SetNodeHealth(nodeID, false)
}

SleepOnErr(ctx, err, retries)
common.SleepOnErr(ctx, err, retries)

conn, err = p.acquireFromDifferentNode(ctx, nodeID)
if err != nil {
Expand All @@ -301,7 +301,7 @@ func (p *RetryPool) withRetries(ctx context.Context, fn func(conn *pgxpool.Conn)
}
if errors.As(err, &retryable) {
log.Ctx(ctx).Info().Err(err).Uint8("retries", retries).Msg("retryable error")
SleepOnErr(ctx, err, retries)
common.SleepOnErr(ctx, err, retries)
continue
}
conn.Release()
Expand All @@ -323,13 +323,6 @@ func (p *RetryPool) GC(conn *pgx.Conn) {
delete(p.nodeForConn, conn)
}

// SleepOnErr sleeps for a short period of time after an error has occurred.
func SleepOnErr(ctx context.Context, err error, retries uint8) {
after := retry.BackoffExponentialWithJitter(100*time.Millisecond, 0.5)(ctx, uint(retries+1)) // add one so we always wait at least a little bit
log.Ctx(ctx).Warn().Err(err).Dur("after", after).Msg("retrying on database error")
time.Sleep(after)
}

func (p *RetryPool) acquireFromDifferentNode(ctx context.Context, nodeID uint32) (*pgxpool.Conn, error) {
log.Ctx(ctx).Info().Uint32("node_id", nodeID).Msg("acquiring a connection from a different node")
for {
Expand Down
3 changes: 2 additions & 1 deletion internal/datastore/crdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/internal/datastore/crdb/pool"
pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/spiceerrors"
Expand Down Expand Up @@ -270,7 +271,7 @@ func (cds *crdbDatastore) WatchSchema(ctx context.Context, afterRevision datasto
running = true

// Sleep a bit for retrying.
pool.SleepOnErr(ctx, err, uint8(retryCount))
pgxcommon.SleepOnErr(ctx, err, uint8(retryCount))
return
}

Expand Down
1 change: 1 addition & 0 deletions internal/datastore/postgres/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
const (
pgUniqueConstraintViolation = "23505"
pgSerializationFailure = "40001"
pgTransactionAborted = "25P02"
)

var (
Expand Down
76 changes: 68 additions & 8 deletions internal/datastore/postgres/common/pgx.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/exaring/otelpgx"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
zerologadapter "github.com/jackc/pgx-zerolog"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
Expand Down Expand Up @@ -87,16 +88,13 @@ func ConfigurePGXLogger(connConfig *pgx.ConnConfig) {
level = tracelog.LogLevelDebug
}

// do not log cancelled queries as errors
// do not log serialization failures as errors
truncateLargeSQL(data)

// log cancellation and serialization errors at debug level
if errArg, ok := data["err"]; ok {
err, ok := errArg.(error)
if ok && errors.Is(err, context.Canceled) {
return
}

var pgerr *pgconn.PgError
if errors.As(err, &pgerr) && pgerr.SQLState() == pgSerializationFailure {
if ok && (IsCancellationError(err) || IsSerializationError(err)) {
logger.Log(ctx, tracelog.LogLevelDebug, msg, data)
return
}
}
Expand All @@ -109,6 +107,57 @@ func ConfigurePGXLogger(connConfig *pgx.ConnConfig) {
addTracer(connConfig, &tracelog.TraceLog{Logger: levelMappingFn(l), LogLevel: tracelog.LogLevelInfo})
}

// truncateLargeSQL takes arguments of a SQL statement provided via pgx's tracelog.LoggerFunc and
// replaces SQL statements and SQL arguments with placeholders when the statements and/or arguments
// exceed a certain length. This helps de-clutter logs when statements have hundreds to thousands of placeholders.
// The change is done in place.
func truncateLargeSQL(data map[string]any) {
	const (
		maxSQLLen     = 350
		maxSQLArgsLen = 50
	)

	// Cap the statement text, marking the truncation with an ellipsis.
	if raw, present := data["sql"]; present {
		if stmt, isString := raw.(string); isString && len(stmt) > maxSQLLen {
			data["sql"] = stmt[:maxSQLLen] + "..."
		}
	}

	// Cap the argument list; excess placeholders are simply dropped.
	if raw, present := data["args"]; present {
		if args, isSlice := raw.([]any); isSlice && len(args) > maxSQLArgsLen {
			data["args"] = args[:maxSQLArgsLen]
		}
	}
}

// IsCancellationError determines if an error returned by pgx has been caused by context cancellation.
func IsCancellationError(err error) bool {
if errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) ||
err.Error() == "conn closed" { // conns are sometimes closed async upon cancellation
return true
}
return false
}

func IsSerializationError(err error) bool {
var pgerr *pgconn.PgError
if errors.As(err, &pgerr) &&
// We need to check unique constraint here because some versions of postgres have an error where
// unique constraint violations are raised instead of serialization errors.
// (e.g. https://www.postgresql.org/message-id/flat/CAGPCyEZG76zjv7S31v_xPeLNRuzj-m%3DY2GOY7PEzu7vhB%3DyQog%40mail.gmail.com)
(pgerr.SQLState() == pgSerializationFailure || pgerr.SQLState() == pgUniqueConstraintViolation || pgerr.SQLState() == pgTransactionAborted) {
return true
}

if errors.Is(err, pgx.ErrTxCommitRollback) {
return true
}

return false
}

// ConfigureOTELTracer adds OTEL tracing to a pgx.ConnConfig
func ConfigureOTELTracer(connConfig *pgx.ConnConfig) {
addTracer(connConfig, otelpgx.NewTracer(otelpgx.WithTrimSQLInSpanName()))
Expand Down Expand Up @@ -233,3 +282,14 @@ func (t *QuerierFuncs) QueryRowFunc(ctx context.Context, rowFunc func(ctx contex
func QuerierFuncsFor(d Querier) DBFuncQuerier {
return &QuerierFuncs{d: d}
}

// SleepOnErr sleeps for a short period of time after an error has occurred.
// The wait grows exponentially (with jitter) with the retry count and is cut
// short if the context is cancelled.
func SleepOnErr(ctx context.Context, err error, retries uint8) {
	// add one so we always wait at least a little bit
	backoff := retry.BackoffExponentialWithJitter(25*time.Millisecond, 0.5)(ctx, uint(retries+1))
	log.Ctx(ctx).Debug().Err(err).Dur("after", backoff).Uint8("retry", retries+1).Msg("retrying on database error")

	timer := time.NewTimer(backoff)
	defer timer.Stop()
	select {
	case <-timer.C:
	case <-ctx.Done():
	}
}
17 changes: 9 additions & 8 deletions internal/datastore/postgres/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,12 @@ func (pgd *pgDatastore) TxIDBefore(ctx context.Context, before time.Time) (datas
return postgresRevision{snapshot}, nil
}

func (pgd *pgDatastore) DeleteBeforeTx(ctx context.Context, txID datastore.Revision) (removed common.DeletionCounts, err error) {
func (pgd *pgDatastore) DeleteBeforeTx(ctx context.Context, txID datastore.Revision) (common.DeletionCounts, error) {
revision := txID.(postgresRevision)

minTxAlive := newXid8(revision.snapshot.xmin)

removed := common.DeletionCounts{}
var err error
// Delete any relationship rows that were already dead when this transaction started
removed.Relationships, err = pgd.batchDelete(
ctx,
Expand All @@ -79,7 +80,7 @@ func (pgd *pgDatastore) DeleteBeforeTx(ctx context.Context, txID datastore.Revis
sq.Lt{colDeletedXid: minTxAlive},
)
if err != nil {
return
return removed, fmt.Errorf("failed to GC relationships table: %w", err)
}

// Delete all transaction rows with ID < the transaction ID.
Expand All @@ -93,7 +94,7 @@ func (pgd *pgDatastore) DeleteBeforeTx(ctx context.Context, txID datastore.Revis
sq.Lt{colXID: minTxAlive},
)
if err != nil {
return
return removed, fmt.Errorf("failed to GC transactions table: %w", err)
}

// Delete any namespace rows with deleted_transaction <= the transaction ID.
Expand All @@ -104,10 +105,10 @@ func (pgd *pgDatastore) DeleteBeforeTx(ctx context.Context, txID datastore.Revis
sq.Lt{colDeletedXid: minTxAlive},
)
if err != nil {
return
return removed, fmt.Errorf("failed to GC namespaces table: %w", err)
}

return
return removed, err
}

func (pgd *pgDatastore) batchDelete(
Expand All @@ -116,7 +117,7 @@ func (pgd *pgDatastore) batchDelete(
pkCols []string,
filter sqlFilter,
) (int64, error) {
sql, args, err := psql.Select(pkCols...).From(tableName).Where(filter).Limit(batchDeleteSize).ToSql()
sql, args, err := psql.Select(pkCols...).From(tableName).Where(filter).Limit(gcBatchDeleteSize).ToSql()
if err != nil {
return -1, err
}
Expand All @@ -137,7 +138,7 @@ func (pgd *pgDatastore) batchDelete(

rowsDeleted := cr.RowsAffected()
deletedCount += rowsDeleted
if rowsDeleted < batchDeleteSize {
if rowsDeleted < gcBatchDeleteSize {
break
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Package migrations registers the ordered set of Postgres schema migrations.
package migrations

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

// Covering index over alive relationship rows keyed by resource
// namespace/relation and subject namespace. INCLUDE carries the remaining
// subject and caveat columns so lookups can be served from the index alone.
// The deleted_xid predicate matches the "alive" sentinel (max xid8), so only
// live rows are indexed. CONCURRENTLY avoids blocking writes while building.
const createAliveRelByResourceRelationSubjectIndex = `CREATE INDEX CONCURRENTLY
IF NOT EXISTS ix_relation_tuple_alive_by_resource_rel_subject_covering
ON relation_tuple (namespace, relation, userset_namespace)
INCLUDE (userset_object_id, userset_relation, caveat_name, caveat_context)
WHERE deleted_xid = '9223372036854775807'::xid8;`

func init() {
	// Registered as a non-transactional migration (noTxMigration) because
	// CREATE INDEX CONCURRENTLY cannot run inside a transaction block.
	if err := DatabaseMigrations.Register("add-rel-by-alive-resource-relation-subject", "add-tuned-gc-index",
		func(ctx context.Context, conn *pgx.Conn) error {
			if _, err := conn.Exec(ctx, createAliveRelByResourceRelationSubjectIndex); err != nil {
				return fmt.Errorf("failed to create index for alive relationships by resource/relation/subject: %w", err)
			}
			return nil
		},
		noTxMigration); err != nil {
		panic("failed to register migration: " + err.Error())
	}
}
Loading

0 comments on commit 21f67a5

Please sign in to comment.