From 239dc2d45471bf895f714bc1e4a1004905aa6918 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 27 Mar 2024 15:15:43 +0200 Subject: [PATCH] Add additional unit tests for expected behavior and fix errors for BulkLoad --- internal/datastore/crdb/crdb.go | 6 +- internal/datastore/postgres/common/errors.go | 6 +- internal/datastore/postgres/postgres.go | 6 +- .../taskrunner/preloadedtaskrunner_test.go | 12 +++ pkg/datastore/test/bulk.go | 86 +++++++++++++++++++ pkg/datastore/test/datastore.go | 2 + 6 files changed, 110 insertions(+), 8 deletions(-) diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go index 5f0a3b9e37..d6c686177b 100644 --- a/internal/datastore/crdb/crdb.go +++ b/internal/datastore/crdb/crdb.go @@ -72,10 +72,10 @@ const ( queryTransactionNowPreV23 = querySelectNow queryTransactionNow = "SHOW COMMIT TIMESTAMP" queryShowZoneConfig = "SHOW ZONE CONFIGURATION FOR RANGE default;" - - livingTupleConstraint = "pk_relation_tuple" ) +var livingTupleConstraints = []string{"pk_relation_tuple"} + func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datastore.Datastore, error) { config, err := generateConfig(options) if err != nil { @@ -348,7 +348,7 @@ func (cds *crdbDatastore) ReadWriteTx( func wrapError(err error) error { // If a unique constraint violation is returned, then its likely that the cause // was an existing relationship. - if cerr := pgxcommon.ConvertToWriteConstraintError(livingTupleConstraint, err); cerr != nil { + if cerr := pgxcommon.ConvertToWriteConstraintError(livingTupleConstraints, err); cerr != nil { return cerr } return err diff --git a/internal/datastore/postgres/common/errors.go b/internal/datastore/postgres/common/errors.go index f7ed9f41dc..61d00c6002 100644 --- a/internal/datastore/postgres/common/errors.go +++ b/internal/datastore/postgres/common/errors.go @@ -3,6 +3,7 @@ package common import ( "errors" "regexp" + "slices" "strings" "github.com/jackc/pgx/v5/pgconn" @@ -24,9 +25,10 @@ var ( // ConvertToWriteConstraintError converts the given Postgres error into a CreateRelationshipExistsError // if applicable. If not applicable, returns nils. -func ConvertToWriteConstraintError(livingTupleConstraint string, err error) error { +func ConvertToWriteConstraintError(livingTupleConstraints []string, err error) error { var pgerr *pgconn.PgError - if errors.As(err, &pgerr) && pgerr.Code == pgUniqueConstraintViolation && pgerr.ConstraintName == livingTupleConstraint { + + if errors.As(err, &pgerr) && pgerr.Code == pgUniqueConstraintViolation && slices.Contains(livingTupleConstraints, pgerr.ConstraintName) { found := createConflictDetailsRegex.FindStringSubmatch(pgerr.Detail) if found != nil { return dscommon.NewCreateRelationshipExistsError(&core.RelationTuple{ diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go index 8313b90edb..c0144b741c 100644 --- a/internal/datastore/postgres/postgres.go +++ b/internal/datastore/postgres/postgres.go @@ -74,10 +74,10 @@ const ( tracingDriverName = "postgres-tracing" gcBatchDeleteSize = 1000 - - livingTupleConstraint = "uq_relation_tuple_living_xid" ) +var livingTupleConstraints = []string{"uq_relation_tuple_living_xid", "pk_relation_tuple"} + func init() { dbsql.Register(tracingDriverName, sqlmw.Driver(stdlib.GetDefaultDriver(), new(traceInterceptor))) } @@ -476,7 +476,7 @@ func (pgd *pgDatastore) RepairOperations() []datastore.RepairOperation { func wrapError(err error) error { // If a unique constraint violation is returned, then its likely that the cause // was an existing relationship given as a CREATE. - if cerr := pgxcommon.ConvertToWriteConstraintError(livingTupleConstraint, err); cerr != nil { + if cerr := pgxcommon.ConvertToWriteConstraintError(livingTupleConstraints, err); cerr != nil { return cerr } diff --git a/internal/taskrunner/preloadedtaskrunner_test.go b/internal/taskrunner/preloadedtaskrunner_test.go index d04248131e..acee93d83c 100644 --- a/internal/taskrunner/preloadedtaskrunner_test.go +++ b/internal/taskrunner/preloadedtaskrunner_test.go @@ -152,3 +152,15 @@ func TestPreloadedTaskRunnerReturnsError(t *testing.T) { require.GreaterOrEqual(t, count, 1) require.Less(t, count, 9) } + +func TestPreloadedTaskRunnerEmpty(t *testing.T) { + defer goleak.VerifyNone(t) + + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + tr := NewPreloadedTaskRunner(ctx, 3, 10) + err := tr.StartAndWait() + require.NoError(t, err) +} diff --git a/pkg/datastore/test/bulk.go b/pkg/datastore/test/bulk.go index 2e8f3b23aa..a5fa31c576 100644 --- a/pkg/datastore/test/bulk.go +++ b/pkg/datastore/test/bulk.go @@ -7,9 +7,13 @@ import ( "testing" "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + + "github.com/authzed/grpcutil" "github.com/authzed/spicedb/internal/testfixtures" "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/datastore/options" core "github.com/authzed/spicedb/pkg/proto/core/v1" ) @@ -77,6 +81,88 @@ func BulkUploadErrorsTest(t *testing.T, tester DatastoreTester) { require.Error(err) } +func BulkUploadAlreadyExistsSameCallErrorTest(t *testing.T, tester DatastoreTester) { + require := require.New(t) + ctx := context.Background() + + rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) + require.NoError(err) + + ds, _ := testfixtures.StandardDatastoreWithSchema(rawDS, require) + + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + inserted, err := rwt.BulkLoad(ctx, testfixtures.NewBulkTupleGenerator( + testfixtures.DocumentNS.Name, + "viewer", + testfixtures.UserNS.Name, + 1, + t, + )) + require.NoError(err) + require.Equal(uint64(1), inserted) + + _, serr := rwt.BulkLoad(ctx, testfixtures.NewBulkTupleGenerator( + testfixtures.DocumentNS.Name, + "viewer", + testfixtures.UserNS.Name, + 1, + t, + )) + return serr + }, options.WithDisableRetries(true)) + + // NOTE: spanner does not return an error for duplicates. + if err == nil { + return + } + + grpcutil.RequireStatus(t, codes.AlreadyExists, err) +} + +func BulkUploadAlreadyExistsErrorTest(t *testing.T, tester DatastoreTester) { + require := require.New(t) + ctx := context.Background() + + rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) + require.NoError(err) + + ds, _ := testfixtures.StandardDatastoreWithSchema(rawDS, require) + + // Bulk write a single relationship. + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + inserted, err := rwt.BulkLoad(ctx, testfixtures.NewBulkTupleGenerator( + testfixtures.DocumentNS.Name, + "viewer", + testfixtures.UserNS.Name, + 1, + t, + )) + require.NoError(err) + require.Equal(uint64(1), inserted) + return nil + }, options.WithDisableRetries(true)) + require.NoError(err) + + // Bulk write it again and ensure we get the expected error. + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + _, serr := rwt.BulkLoad(ctx, testfixtures.NewBulkTupleGenerator( + testfixtures.DocumentNS.Name, + "viewer", + testfixtures.UserNS.Name, + 1, + t, + )) + return serr + }, options.WithDisableRetries(true)) + + // NOTE: spanner does not return an error for duplicates. + if err == nil { + return + } + + grpcutil.RequireStatus(t, codes.AlreadyExists, err) +} + type onlyErrorSource struct{} var errOnlyError = errors.New("source iterator error") diff --git a/pkg/datastore/test/datastore.go b/pkg/datastore/test/datastore.go index b841d879e1..40c007c6f0 100644 --- a/pkg/datastore/test/datastore.go +++ b/pkg/datastore/test/datastore.go @@ -131,6 +131,8 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories) t.Run("TestBulkUpload", func(t *testing.T) { BulkUploadTest(t, tester) }) t.Run("TestBulkUploadErrors", func(t *testing.T) { BulkUploadErrorsTest(t, tester) }) + t.Run("TestBulkUploadAlreadyExistsError", func(t *testing.T) { BulkUploadAlreadyExistsErrorTest(t, tester) }) + t.Run("TestBulkUploadAlreadyExistsSameCallError", func(t *testing.T) { BulkUploadAlreadyExistsSameCallErrorTest(t, tester) }) t.Run("TestStats", func(t *testing.T) { StatsTest(t, tester) })