Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve experience on pg serialization errors #1552

Merged
merged 5 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion e2e/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.21
toolchain go1.21.1

require (
github.com/authzed/authzed-go v0.9.1-0.20230830212047-e1e7da6e877d
github.com/authzed/authzed-go v0.10.1-0.20231003161414-9c9116f212b7
github.com/authzed/grpcutil v0.0.0-20230908193239-4286bb1d6403
github.com/authzed/spicedb v1.23.1
github.com/brianvoe/gofakeit/v6 v6.23.0
Expand Down
4 changes: 2 additions & 2 deletions e2e/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2Aawl
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230512164433-5d1fd1a340c9 h1:goHVqTbFX3AIo0tzGr14pgfAW2ZfPChKO21Z9MGf/gk=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230512164433-5d1fd1a340c9/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
github.com/authzed/authzed-go v0.9.1-0.20230830212047-e1e7da6e877d h1:wSt3hHgOOS2vFvRZC0r5hzr9BijmHn9El7pRs88ublI=
github.com/authzed/authzed-go v0.9.1-0.20230830212047-e1e7da6e877d/go.mod h1:9Pl5jDQJHrjbMDuCrsa+Q6Tqmi1f2pDdIn/qNGI++vA=
github.com/authzed/authzed-go v0.10.1-0.20231003161414-9c9116f212b7 h1:2sKwbLnVbfe0LkPuWkpntlXRby2bSjkIb7aotZUQQ7I=
github.com/authzed/authzed-go v0.10.1-0.20231003161414-9c9116f212b7/go.mod h1:9Pl5jDQJHrjbMDuCrsa+Q6Tqmi1f2pDdIn/qNGI++vA=
github.com/authzed/cel-go v0.17.5 h1:lfpkNrR99B5QRHg5qdG9oLu/kguVlZC68VJuMk8tH9Y=
github.com/authzed/cel-go v0.17.5/go.mod h1:XL/zEq5hKGVF8aOdMbG7w+BQPihLjY2W8N+UIygDA2I=
github.com/authzed/grpcutil v0.0.0-20230908193239-4286bb1d6403 h1:bQeIwWWRI9bl93poTqpix4sYHi+gnXUPK7N6bMtXzBE=
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
cloud.google.com/go/spanner v1.47.0
github.com/IBM/pgxpoolprometheus v1.1.1
github.com/Masterminds/squirrel v1.5.4
github.com/authzed/authzed-go v0.9.1-0.20230830212047-e1e7da6e877d
github.com/authzed/authzed-go v0.10.1-0.20231003161414-9c9116f212b7
github.com/authzed/cel-go v0.17.5
github.com/authzed/consistent v0.1.0
github.com/authzed/grpcutil v0.0.0-20230908193239-4286bb1d6403
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,8 @@ github.com/ashanbrown/forbidigo v1.6.0 h1:D3aewfM37Yb3pxHujIPSpTf6oQk9sc9WZi8ger
github.com/ashanbrown/forbidigo v1.6.0/go.mod h1:Y8j9jy9ZYAEHXdu723cUlraTqbzjKF1MUyfOKL+AjcU=
github.com/ashanbrown/makezero v1.1.1 h1:iCQ87C0V0vSyO+M9E/FZYbu65auqH0lnsOkf5FcB28s=
github.com/ashanbrown/makezero v1.1.1/go.mod h1:i1bJLCRSCHOcOa9Y6MyF2FTfMZMFdHvxKHxgO5Z1axI=
github.com/authzed/authzed-go v0.9.1-0.20230830212047-e1e7da6e877d h1:wSt3hHgOOS2vFvRZC0r5hzr9BijmHn9El7pRs88ublI=
github.com/authzed/authzed-go v0.9.1-0.20230830212047-e1e7da6e877d/go.mod h1:9Pl5jDQJHrjbMDuCrsa+Q6Tqmi1f2pDdIn/qNGI++vA=
github.com/authzed/authzed-go v0.10.1-0.20231003161414-9c9116f212b7 h1:2sKwbLnVbfe0LkPuWkpntlXRby2bSjkIb7aotZUQQ7I=
github.com/authzed/authzed-go v0.10.1-0.20231003161414-9c9116f212b7/go.mod h1:9Pl5jDQJHrjbMDuCrsa+Q6Tqmi1f2pDdIn/qNGI++vA=
github.com/authzed/cel-go v0.17.5 h1:lfpkNrR99B5QRHg5qdG9oLu/kguVlZC68VJuMk8tH9Y=
github.com/authzed/cel-go v0.17.5/go.mod h1:XL/zEq5hKGVF8aOdMbG7w+BQPihLjY2W8N+UIygDA2I=
github.com/authzed/consistent v0.1.0 h1:tlh1wvKoRbjRhMm2P+X5WQQyR54SRoS4MyjLOg17Mp8=
Expand Down
26 changes: 26 additions & 0 deletions internal/datastore/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,32 @@ import (
"github.com/authzed/spicedb/pkg/tuple"
)

// SerializationError wraps an underlying datastore error caused by a
// transaction serialization failure so that it can be surfaced to gRPC
// callers with the appropriate status code and error reason.
type SerializationError struct {
	error
}

// GRPCStatus maps the error to an Aborted gRPC status carrying the
// serialization-failure error reason, for use by the gRPC middleware.
func (e SerializationError) GRPCStatus() *status.Status {
	details := spiceerrors.ForReason(
		v1.ErrorReason_ERROR_REASON_SERIALIZATION_FAILURE,
		map[string]string{},
	)
	return spiceerrors.WithCodeAndDetails(e, codes.Aborted, details)
}

// Unwrap exposes the wrapped error for errors.Is/errors.As chains.
func (e SerializationError) Unwrap() error {
	return e.error
}

// NewSerializationError wraps the given error in a SerializationError.
func NewSerializationError(err error) error {
	return SerializationError{error: err}
}

// CreateRelationshipExistsError is an error returned when attempting to CREATE an already-existing
// relationship.
type CreateRelationshipExistsError struct {
Expand Down
5 changes: 4 additions & 1 deletion internal/datastore/common/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ func startGarbageCollectorWithMaxElapsedTime(ctx context.Context, gc GarbageColl

case <-time.After(nextInterval):
log.Ctx(ctx).Info().
Dur("interval", nextInterval).
Dur("window", window).
Dur("timeout", timeout).
Msg("running garbage collection worker")

err := RunGarbageCollection(gc, window, timeout)
Expand Down Expand Up @@ -178,7 +181,7 @@ func RunGarbageCollection(gc GarbageCollector, window, timeout time.Duration) er
}

collectionDuration := time.Since(startTime)
log.Ctx(ctx).Debug().
log.Ctx(ctx).Info().
Stringer("highestTxID", watermark).
Dur("duration", collectionDuration).
Time("nowTime", now).
Expand Down
13 changes: 3 additions & 10 deletions internal/datastore/crdb/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
"sync"
"time"

"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"

"github.com/authzed/spicedb/internal/datastore/postgres/common"
log "github.com/authzed/spicedb/internal/logging"
)

Expand Down Expand Up @@ -291,7 +291,7 @@ func (p *RetryPool) withRetries(ctx context.Context, fn func(conn *pgxpool.Conn)
p.healthTracker.SetNodeHealth(nodeID, false)
}

SleepOnErr(ctx, err, retries)
common.SleepOnErr(ctx, err, retries)

conn, err = p.acquireFromDifferentNode(ctx, nodeID)
if err != nil {
Expand All @@ -301,7 +301,7 @@ func (p *RetryPool) withRetries(ctx context.Context, fn func(conn *pgxpool.Conn)
}
if errors.As(err, &retryable) {
log.Ctx(ctx).Info().Err(err).Uint8("retries", retries).Msg("retryable error")
SleepOnErr(ctx, err, retries)
common.SleepOnErr(ctx, err, retries)
continue
}
conn.Release()
Expand All @@ -323,13 +323,6 @@ func (p *RetryPool) GC(conn *pgx.Conn) {
delete(p.nodeForConn, conn)
}

// SleepOnErr sleeps for a short period of time after an error has occurred.
func SleepOnErr(ctx context.Context, err error, retries uint8) {
after := retry.BackoffExponentialWithJitter(100*time.Millisecond, 0.5)(ctx, uint(retries+1)) // add one so we always wait at least a little bit
log.Ctx(ctx).Warn().Err(err).Dur("after", after).Msg("retrying on database error")
time.Sleep(after)
}

func (p *RetryPool) acquireFromDifferentNode(ctx context.Context, nodeID uint32) (*pgxpool.Conn, error) {
log.Ctx(ctx).Info().Uint32("node_id", nodeID).Msg("acquiring a connection from a different node")
for {
Expand Down
3 changes: 2 additions & 1 deletion internal/datastore/crdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/internal/datastore/crdb/pool"
pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/spiceerrors"
Expand Down Expand Up @@ -270,7 +271,7 @@ func (cds *crdbDatastore) WatchSchema(ctx context.Context, afterRevision datasto
running = true

// Sleep a bit for retrying.
pool.SleepOnErr(ctx, err, uint8(retryCount))
pgxcommon.SleepOnErr(ctx, err, uint8(retryCount))
return
}

Expand Down
1 change: 1 addition & 0 deletions internal/datastore/postgres/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
const (
pgUniqueConstraintViolation = "23505"
pgSerializationFailure = "40001"
pgTransactionAborted = "25P02"
)

var (
Expand Down
76 changes: 68 additions & 8 deletions internal/datastore/postgres/common/pgx.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/exaring/otelpgx"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
zerologadapter "github.com/jackc/pgx-zerolog"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
Expand Down Expand Up @@ -87,16 +88,13 @@ func ConfigurePGXLogger(connConfig *pgx.ConnConfig) {
level = tracelog.LogLevelDebug
}

// do not log cancelled queries as errors
// do not log serialization failures as errors
truncateLargeSQL(data)

// log cancellation and serialization errors at debug level
if errArg, ok := data["err"]; ok {
err, ok := errArg.(error)
if ok && errors.Is(err, context.Canceled) {
return
}

var pgerr *pgconn.PgError
if errors.As(err, &pgerr) && pgerr.SQLState() == pgSerializationFailure {
if ok && (IsCancellationError(err) || IsSerializationError(err)) {
logger.Log(ctx, tracelog.LogLevelDebug, msg, data)
return
}
}
Expand All @@ -109,6 +107,57 @@ func ConfigurePGXLogger(connConfig *pgx.ConnConfig) {
addTracer(connConfig, &tracelog.TraceLog{Logger: levelMappingFn(l), LogLevel: tracelog.LogLevelInfo})
}

// truncateLargeSQL shortens the "sql" and "args" entries that pgx's
// tracelog.LoggerFunc places into the log data map: statements longer than a
// threshold are replaced with a truncated form, and excess arguments are
// dropped. This keeps log lines readable when a statement carries hundreds to
// thousands of placeholders. The map is modified in place.
func truncateLargeSQL(data map[string]any) {
	const (
		maxSQLLen     = 350
		maxSQLArgsLen = 50
	)

	if sqlString, ok := data["sql"].(string); ok && len(sqlString) > maxSQLLen {
		data["sql"] = sqlString[:maxSQLLen] + "..."
	}

	if argsSlice, ok := data["args"].([]any); ok && len(argsSlice) > maxSQLArgsLen {
		data["args"] = argsSlice[:maxSQLArgsLen]
	}
}

// IsCancellationError determines if an error returned by pgx has been caused by context cancellation.
func IsCancellationError(err error) bool {
if errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) ||
err.Error() == "conn closed" { // conns are sometimes closed async upon cancellation
return true
}
return false
}

func IsSerializationError(err error) bool {
var pgerr *pgconn.PgError
if errors.As(err, &pgerr) &&
// We need to check unique constraint here because some versions of postgres have an error where
// unique constraint violations are raised instead of serialization errors.
// (e.g. https://www.postgresql.org/message-id/flat/CAGPCyEZG76zjv7S31v_xPeLNRuzj-m%3DY2GOY7PEzu7vhB%3DyQog%40mail.gmail.com)
(pgerr.SQLState() == pgSerializationFailure || pgerr.SQLState() == pgUniqueConstraintViolation || pgerr.SQLState() == pgTransactionAborted) {
return true
}

if errors.Is(err, pgx.ErrTxCommitRollback) {
return true
}

return false
}

// ConfigureOTELTracer adds OTEL tracing to a pgx.ConnConfig
func ConfigureOTELTracer(connConfig *pgx.ConnConfig) {
addTracer(connConfig, otelpgx.NewTracer(otelpgx.WithTrimSQLInSpanName()))
Expand Down Expand Up @@ -233,3 +282,14 @@ func (t *QuerierFuncs) QueryRowFunc(ctx context.Context, rowFunc func(ctx contex
func QuerierFuncsFor(d Querier) DBFuncQuerier {
return &QuerierFuncs{d: d}
}

// SleepOnErr blocks for a short, exponentially increasing (jittered) period
// of time after an error has occurred, backing off before the next retry.
// It returns early if the context is canceled.
func SleepOnErr(ctx context.Context, err error, retries uint8) {
	backoff := retry.BackoffExponentialWithJitter(25*time.Millisecond, 0.5)
	// add one so we always wait at least a little bit
	after := backoff(ctx, uint(retries+1))
	log.Ctx(ctx).Debug().Err(err).Dur("after", after).Uint8("retry", retries+1).Msg("retrying on database error")

	select {
	case <-time.After(after):
	case <-ctx.Done():
	}
}
17 changes: 9 additions & 8 deletions internal/datastore/postgres/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,12 @@ func (pgd *pgDatastore) TxIDBefore(ctx context.Context, before time.Time) (datas
return postgresRevision{snapshot}, nil
}

func (pgd *pgDatastore) DeleteBeforeTx(ctx context.Context, txID datastore.Revision) (removed common.DeletionCounts, err error) {
func (pgd *pgDatastore) DeleteBeforeTx(ctx context.Context, txID datastore.Revision) (common.DeletionCounts, error) {
revision := txID.(postgresRevision)

minTxAlive := newXid8(revision.snapshot.xmin)

removed := common.DeletionCounts{}
var err error
// Delete any relationship rows that were already dead when this transaction started
removed.Relationships, err = pgd.batchDelete(
ctx,
Expand All @@ -79,7 +80,7 @@ func (pgd *pgDatastore) DeleteBeforeTx(ctx context.Context, txID datastore.Revis
sq.Lt{colDeletedXid: minTxAlive},
)
if err != nil {
return
return removed, fmt.Errorf("failed to GC relationships table: %w", err)
}

// Delete all transaction rows with ID < the transaction ID.
Expand All @@ -93,7 +94,7 @@ func (pgd *pgDatastore) DeleteBeforeTx(ctx context.Context, txID datastore.Revis
sq.Lt{colXID: minTxAlive},
)
if err != nil {
return
return removed, fmt.Errorf("failed to GC transactions table: %w", err)
}

// Delete any namespace rows with deleted_transaction <= the transaction ID.
Expand All @@ -104,10 +105,10 @@ func (pgd *pgDatastore) DeleteBeforeTx(ctx context.Context, txID datastore.Revis
sq.Lt{colDeletedXid: minTxAlive},
)
if err != nil {
return
return removed, fmt.Errorf("failed to GC namespaces table: %w", err)
}

return
return removed, err
}

func (pgd *pgDatastore) batchDelete(
Expand All @@ -116,7 +117,7 @@ func (pgd *pgDatastore) batchDelete(
pkCols []string,
filter sqlFilter,
) (int64, error) {
sql, args, err := psql.Select(pkCols...).From(tableName).Where(filter).Limit(batchDeleteSize).ToSql()
sql, args, err := psql.Select(pkCols...).From(tableName).Where(filter).Limit(gcBatchDeleteSize).ToSql()
if err != nil {
return -1, err
}
Expand All @@ -137,7 +138,7 @@ func (pgd *pgDatastore) batchDelete(

rowsDeleted := cr.RowsAffected()
deletedCount += rowsDeleted
if rowsDeleted < batchDeleteSize {
if rowsDeleted < gcBatchDeleteSize {
break
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package migrations

import (
"context"
"fmt"

"github.com/jackc/pgx/v5"
)

const createAliveRelByResourceRelationSubjectIndex = `CREATE INDEX CONCURRENTLY
IF NOT EXISTS ix_relation_tuple_alive_by_resource_rel_subject_covering
ON relation_tuple (namespace, relation, userset_namespace)
INCLUDE (userset_object_id, userset_relation, caveat_name, caveat_context)
WHERE deleted_xid = '9223372036854775807'::xid8;`

func init() {
if err := DatabaseMigrations.Register("add-rel-by-alive-resource-relation-subject", "add-tuned-gc-index",
func(ctx context.Context, conn *pgx.Conn) error {
if _, err := conn.Exec(ctx, createAliveRelByResourceRelationSubjectIndex); err != nil {
return fmt.Errorf("failed to create index for alive relationships by resource/relation/subject: %w", err)
}
return nil
},
noTxMigration); err != nil {
panic("failed to register migration: " + err.Error())
}
}
Loading