diff --git a/internal/datastore/memdb/errors.go b/internal/datastore/memdb/errors.go new file mode 100644 index 0000000000..92343840a0 --- /dev/null +++ b/internal/datastore/memdb/errors.go @@ -0,0 +1,37 @@ +package memdb + +import ( + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" + + "github.com/authzed/spicedb/pkg/spiceerrors" +) + +// ErrSerializationMaxRetriesReached occurs when a write request has reached its maximum number +// of retries due to serialization errors. +type ErrSerializationMaxRetriesReached struct { + error +} + +// NewSerializationMaxRetriesReachedErr constructs a new max retries reached error. +func NewSerializationMaxRetriesReachedErr(baseErr error) error { + return ErrSerializationMaxRetriesReached{ + error: baseErr, + } +} + +// GRPCStatus implements retrieving the gRPC status for the error. +func (err ErrSerializationMaxRetriesReached) GRPCStatus() *status.Status { + return spiceerrors.WithCodeAndDetails( + err, + codes.DeadlineExceeded, + spiceerrors.ForReason( + v1.ErrorReason_ERROR_REASON_UNSPECIFIED, + map[string]string{ + "details": "too many updates were made to the in-memory datastore at once; this datastore has limited write throughput capability", + }, + ), + ) +} diff --git a/internal/datastore/memdb/memdb.go b/internal/datastore/memdb/memdb.go index 6cd7fd7633..162c827c54 100644 --- a/internal/datastore/memdb/memdb.go +++ b/internal/datastore/memdb/memdb.go @@ -248,7 +248,7 @@ func (mdb *memdbDatastore) ReadWriteTx( return newRevision, nil } - return datastore.NoRevision, errors.New("serialization max retries exceeded") + return datastore.NoRevision, NewSerializationMaxRetriesReachedErr(errors.New("serialization max retries exceeded; please reduce your parallel writes")) } func (mdb *memdbDatastore) ReadyState(_ context.Context) (datastore.ReadyState, error) { diff --git a/internal/datastore/memdb/memdb_test.go b/internal/datastore/memdb/memdb_test.go index a9a5b7a432..a484dece16 100644 --- a/internal/datastore/memdb/memdb_test.go +++ b/internal/datastore/memdb/memdb_test.go @@ -12,9 +12,11 @@ import ( "golang.org/x/sync/errgroup" "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/datastore/options" test "github.com/authzed/spicedb/pkg/datastore/test" ns "github.com/authzed/spicedb/pkg/namespace" corev1 "github.com/authzed/spicedb/pkg/proto/core/v1" + "github.com/authzed/spicedb/pkg/tuple" ) type memDBTest struct{} @@ -77,3 +79,37 @@ func TestConcurrentWritePanic(t *testing.T) { }, 1*time.Second, 10*time.Millisecond) require.ErrorIs(err, recoverErr) } + +func TestConcurrentWriteRelsError(t *testing.T) { + require := require.New(t) + + ds, err := NewMemdbDatastore(0, 1*time.Hour, 1*time.Hour) + require.NoError(err) + + ctx := context.Background() + + // Kick off a number of writes to ensure at least one hits an error. + g := errgroup.Group{} + + for i := 0; i < 50; i++ { + i := i + g.Go(func() error { + _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + updates := []*corev1.RelationTupleUpdate{} + for j := 0; j < 500; j++ { + updates = append(updates, &corev1.RelationTupleUpdate{ + Operation: corev1.RelationTupleUpdate_TOUCH, + Tuple: tuple.MustParse(fmt.Sprintf("document:doc-%d-%d#viewer@user:tom", i, j)), + }) + } + + return rwt.WriteRelationships(ctx, updates) + }, options.WithDisableRetries(true)) + return err + }) + } + + werr := g.Wait() + require.Error(werr) + require.ErrorContains(werr, "serialization max retries exceeded") +} diff --git a/internal/services/v1/relationships_test.go b/internal/services/v1/relationships_test.go index 2e49c18b2a..2683fdfbf4 100644 --- a/internal/services/v1/relationships_test.go +++ b/internal/services/v1/relationships_test.go @@ -12,6 +12,7 @@ import ( v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" "github.com/authzed/grpcutil" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" @@ -1462,3 +1463,38 @@ func standardTuplesWithout(without map[string]struct{}) map[string]struct{} { } return out } + +func TestManyConcurrentWriteRelationshipsReturnsSerializationErrorOnMemdb(t *testing.T) { + require := require.New(t) + + conn, cleanup, _, _ := testserver.NewTestServer(require, 0, memdb.DisableGC, true, tf.StandardDatastoreWithData) + client := v1.NewPermissionsServiceClient(conn) + t.Cleanup(cleanup) + + // Kick off a number of writes to ensure at least one hits an error, as memdb's write throughput + // is limited. + g := errgroup.Group{} + + for i := 0; i < 50; i++ { + i := i + g.Go(func() error { + updates := []*v1.RelationshipUpdate{} + for j := 0; j < 500; j++ { + updates = append(updates, &v1.RelationshipUpdate{ + Operation: v1.RelationshipUpdate_OPERATION_CREATE, + Relationship: tuple.MustToRelationship(tuple.MustParse(fmt.Sprintf("document:doc-%d-%d#viewer@user:tom", i, j))), + }) + } + + _, err := client.WriteRelationships(context.Background(), &v1.WriteRelationshipsRequest{ + Updates: updates, + }) + return err + }) + } + + werr := g.Wait() + require.Error(werr) + require.ErrorContains(werr, "serialization max retries exceeded") + grpcutil.RequireStatus(t, codes.DeadlineExceeded, werr) +}