From 53ca68ef5339bd17e46618c0e9fed0efee02ba0e Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 21 Dec 2023 19:52:20 -0500 Subject: [PATCH 1/4] Cleaner revision handling This is in prep for optimization of revision types --- internal/datastore/common/changes_test.go | 43 ++- internal/datastore/common/gc_test.go | 11 +- internal/datastore/crdb/caveat.go | 5 +- internal/datastore/crdb/crdb.go | 27 +- internal/datastore/crdb/crdb_test.go | 4 +- internal/datastore/crdb/pool_test.go | 3 +- internal/datastore/crdb/reader.go | 7 +- internal/datastore/crdb/stats.go | 12 +- internal/datastore/crdb/watch.go | 26 +- internal/datastore/memdb/memdb.go | 35 ++- internal/datastore/memdb/revisions.go | 58 ++-- internal/datastore/memdb/watch.go | 8 +- internal/datastore/mysql/caveat.go | 15 +- internal/datastore/mysql/datastore.go | 18 +- internal/datastore/mysql/datastore_test.go | 3 +- internal/datastore/mysql/gc.go | 6 +- internal/datastore/mysql/reader.go | 15 +- internal/datastore/mysql/revisions.go | 37 +-- internal/datastore/mysql/revisions_test.go | 32 +-- internal/datastore/mysql/watch.go | 24 +- internal/datastore/postgres/postgres.go | 2 +- internal/datastore/proxy/hedging_test.go | 7 +- internal/datastore/proxy/readonly_test.go | 5 +- .../schemacaching/intervaltracker_test.go | 6 +- .../schemacaching/standardcaching_test.go | 11 +- .../proxy/schemacaching/watchingcache.go | 4 +- .../datastore/revisions/commonrevision.go | 79 ++++++ .../revisions/commonrevision_test.go | 248 ++++++++++++++++++ internal/datastore/revisions/hlcrevision.go | 102 +++++++ .../{common => }/revisions/optimized.go | 0 .../{common => }/revisions/optimized_test.go | 12 +- .../{common => }/revisions/remoteclock.go | 36 ++- .../revisions/remoteclock_test.go | 25 +- .../datastore/revisions/timestamprevision.go | 81 ++++++ internal/datastore/revisions/txidrevision.go | 66 +++++ internal/datastore/spanner/caveat.go | 5 +- internal/datastore/spanner/reader.go | 5 +- internal/datastore/spanner/revisions.go | 19 +- internal/datastore/spanner/spanner.go | 14 +- internal/datastore/spanner/watch.go | 19 +- internal/dispatch/graph/check_test.go | 1 - internal/dispatch/graph/expand_test.go | 2 - .../dispatch/graph/lookupsubjects_test.go | 2 - internal/graph/computed/computecheck_test.go | 3 +- .../consistency/consistency_test.go | 11 +- internal/services/v1/preconditions_test.go | 3 +- pkg/cursor/cursor_test.go | 28 +- pkg/datastore/revision/decimal.go | 75 ------ .../revisionparsing/revisionparsing.go | 16 +- pkg/datastore/test/caveat.go | 10 +- pkg/datastore/test/namespace.go | 2 - pkg/datastore/test/revisions.go | 2 - pkg/datastore/test/watch.go | 1 - pkg/zedtoken/zedtoken.go | 10 +- pkg/zedtoken/zedtoken_test.go | 120 ++++++--- 55 files changed, 960 insertions(+), 461 deletions(-) create mode 100644 internal/datastore/revisions/commonrevision.go create mode 100644 internal/datastore/revisions/commonrevision_test.go create mode 100644 internal/datastore/revisions/hlcrevision.go rename internal/datastore/{common => }/revisions/optimized.go (100%) rename internal/datastore/{common => }/revisions/optimized_test.go (93%) rename internal/datastore/{common => }/revisions/remoteclock.go (73%) rename internal/datastore/{common => }/revisions/remoteclock_test.go (82%) create mode 100644 internal/datastore/revisions/timestamprevision.go create mode 100644 internal/datastore/revisions/txidrevision.go delete mode 100644 pkg/datastore/revision/decimal.go diff --git a/internal/datastore/common/changes_test.go b/internal/datastore/common/changes_test.go index b364376d26..5ba2da2f5a 100644 --- a/internal/datastore/common/changes_test.go +++ b/internal/datastore/common/changes_test.go @@ -6,11 +6,10 @@ import ( "strings" "testing" - "github.com/shopspring/decimal" "github.com/stretchr/testify/require" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" core "github.com/authzed/spicedb/pkg/proto/core/v1" "github.com/authzed/spicedb/pkg/tuple" ) @@ -21,17 +20,13 @@ const ( ) var ( - rev1 = revision.NewFromDecimal(decimal.NewFromInt(1)) - rev2 = revision.NewFromDecimal(decimal.NewFromInt(2)) - rev3 = revision.NewFromDecimal(decimal.NewFromInt(3)) - revOneMillion = revision.NewFromDecimal(decimal.NewFromInt(1_000_000)) - revOneMillionOne = revision.NewFromDecimal(decimal.NewFromInt(1_000_001)) + rev1 = revisions.NewForTransactionID(1) + rev2 = revisions.NewForTransactionID(2) + rev3 = revisions.NewForTransactionID(3) + revOneMillion = revisions.NewForTransactionID(1_000_000) + revOneMillionOne = revisions.NewForTransactionID(1_000_001) ) -func revisionFromTransactionID(txID uint64) revision.Decimal { - return revision.NewFromDecimal(decimal.NewFromInt(int64(txID))) -} - func TestChanges(t *testing.T) { type changeEntry struct { revision uint64 @@ -310,30 +305,30 @@ func TestChanges(t *testing.T) { require := require.New(t) ctx := context.Background() - ch := NewChanges(revision.DecimalKeyFunc, datastore.WatchRelationships|datastore.WatchSchema) + ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchRelationships|datastore.WatchSchema) for _, step := range tc.script { if step.relationship != "" { rel := tuple.MustParse(step.relationship) - err := ch.AddRelationshipChange(ctx, revisionFromTransactionID(step.revision), rel, step.op) + err := ch.AddRelationshipChange(ctx, revisions.NewForTransactionID(step.revision), rel, step.op) require.NoError(err) } for _, changed := range step.changedDefinitions { - ch.AddChangedDefinition(ctx, revisionFromTransactionID(step.revision), changed) + ch.AddChangedDefinition(ctx, revisions.NewForTransactionID(step.revision), changed) } for _, ns := range step.deletedNamespaces { - ch.AddDeletedNamespace(ctx, revisionFromTransactionID(step.revision), ns) + ch.AddDeletedNamespace(ctx, revisions.NewForTransactionID(step.revision), ns) } for _, c := range step.deletedCaveats { - ch.AddDeletedCaveat(ctx, revisionFromTransactionID(step.revision), c) + ch.AddDeletedCaveat(ctx, revisions.NewForTransactionID(step.revision), c) } } require.Equal( canonicalize(tc.expected), - canonicalize(ch.AsRevisionChanges(revision.DecimalKeyLessThanFunc)), + canonicalize(ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc)), ) }) } @@ -341,7 +336,7 @@ func TestChanges(t *testing.T) { func TestFilteredSchemaChanges(t *testing.T) { ctx := context.Background() - ch := NewChanges(revision.DecimalKeyFunc, datastore.WatchSchema) + ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchSchema) require.True(t, ch.IsEmpty()) require.NoError(t, ch.AddRelationshipChange(ctx, rev1, tuple.MustParse("document:firstdoc#viewer@user:tom"), core.RelationTupleUpdate_TOUCH)) @@ -350,7 +345,7 @@ func TestFilteredSchemaChanges(t *testing.T) { func TestFilteredRelationshipChanges(t *testing.T) { ctx := context.Background() - ch := NewChanges(revision.DecimalKeyFunc, datastore.WatchRelationships) + ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchRelationships) require.True(t, ch.IsEmpty()) ch.AddDeletedNamespace(ctx, rev3, "deletedns3") @@ -359,7 +354,7 @@ func TestFilteredRelationshipChanges(t *testing.T) { func TestFilterAndRemoveRevisionChanges(t *testing.T) { ctx := context.Background() - ch := NewChanges(revision.DecimalKeyFunc, datastore.WatchRelationships|datastore.WatchSchema) + ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchRelationships|datastore.WatchSchema) require.True(t, ch.IsEmpty()) @@ -369,7 +364,7 @@ func TestFilterAndRemoveRevisionChanges(t *testing.T) { require.False(t, ch.IsEmpty()) - results := ch.FilterAndRemoveRevisionChanges(revision.DecimalKeyLessThanFunc, rev3) + results := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev3) require.Equal(t, 2, len(results)) require.False(t, ch.IsEmpty()) @@ -388,7 +383,7 @@ func TestFilterAndRemoveRevisionChanges(t *testing.T) { }, }, results) - remaining := ch.AsRevisionChanges(revision.DecimalKeyLessThanFunc) + remaining := ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc) require.Equal(t, 1, len(remaining)) require.Equal(t, []datastore.RevisionChanges{ @@ -400,11 +395,11 @@ func TestFilterAndRemoveRevisionChanges(t *testing.T) { }, }, remaining) - results = ch.FilterAndRemoveRevisionChanges(revision.DecimalKeyLessThanFunc, revOneMillion) + results = ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, revOneMillion) require.Equal(t, 1, len(results)) require.True(t, ch.IsEmpty()) - results = ch.FilterAndRemoveRevisionChanges(revision.DecimalKeyLessThanFunc, revOneMillionOne) + results = ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, revOneMillionOne) require.Equal(t, 0, len(results)) require.True(t, ch.IsEmpty()) } diff --git a/internal/datastore/common/gc_test.go b/internal/datastore/common/gc_test.go index 44b645456a..e7e794c69e 100644 --- a/internal/datastore/common/gc_test.go +++ b/internal/datastore/common/gc_test.go @@ -8,19 +8,18 @@ import ( "testing" "time" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" "github.com/prometheus/client_golang/prometheus" promclient "github.com/prometheus/client_model/go" - "github.com/shopspring/decimal" "github.com/stretchr/testify/require" ) // Fake garbage collector that returns a new incremented revision each time // TxIDBefore is called. type fakeGC struct { - lastRevision int64 + lastRevision uint64 deleter gcDeleter metrics gcMetrics lock sync.RWMutex @@ -56,7 +55,7 @@ func (gc *fakeGC) TxIDBefore(_ context.Context, _ time.Time) (datastore.Revision gc.lastRevision++ - rev := revision.NewFromDecimal(decimal.NewFromInt(gc.lastRevision)) + rev := revisions.NewForTransactionID(gc.lastRevision) return rev, nil } @@ -67,9 +66,9 @@ func (gc *fakeGC) DeleteBeforeTx(_ context.Context, rev datastore.Revision) (Del gc.metrics.deleteBeforeTxCount++ - revInt := rev.(revision.Decimal).Decimal.IntPart() + revInt := rev.(revisions.TransactionIDRevision).TransactionID() - return gc.deleter.DeleteBeforeTx(revInt) + return gc.deleter.DeleteBeforeTx(int64(revInt)) } func (gc *fakeGC) HasGCRun() bool { diff --git a/internal/datastore/crdb/caveat.go b/internal/datastore/crdb/caveat.go index 33125138cd..3f66b95810 100644 --- a/internal/datastore/crdb/caveat.go +++ b/internal/datastore/crdb/caveat.go @@ -9,6 +9,7 @@ import ( sq "github.com/Masterminds/squirrel" "github.com/jackc/pgx/v5" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" core "github.com/authzed/spicedb/pkg/proto/core/v1" ) @@ -58,7 +59,7 @@ func (cr *crdbReader) ReadCaveatByName(ctx context.Context, name string) (*core. return nil, datastore.NoRevision, fmt.Errorf(errReadCaveat, name, err) } cr.addOverlapKey(name) - return loaded, revisionFromTimestamp(timestamp), nil + return loaded, revisions.NewHLCForTime(timestamp), nil } func (cr *crdbReader) LookupCaveatsWithNames(ctx context.Context, caveatNames []string) ([]datastore.RevisionedCaveat, error) { @@ -116,7 +117,7 @@ func (cr *crdbReader) lookupCaveats(ctx context.Context, caveatNames []string) ( } caveats = append(caveats, datastore.RevisionedCaveat{ Definition: loaded, - LastWrittenRevision: revisionFromTimestamp(bat.timestamp), + LastWrittenRevision: revisions.NewHLCForTime(bat.timestamp), }) } diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go index 240acfb702..63b83a532d 100644 --- a/internal/datastore/crdb/crdb.go +++ b/internal/datastore/crdb/crdb.go @@ -21,20 +21,21 @@ import ( datastoreinternal "github.com/authzed/spicedb/internal/datastore" "github.com/authzed/spicedb/internal/datastore/common" - "github.com/authzed/spicedb/internal/datastore/common/revisions" "github.com/authzed/spicedb/internal/datastore/crdb/migrations" "github.com/authzed/spicedb/internal/datastore/crdb/pool" pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" + "github.com/authzed/spicedb/internal/datastore/revisions" log "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/options" - "github.com/authzed/spicedb/pkg/datastore/revision" ) func init() { datastore.Engines = append(datastore.Engines, Engine) } +var ParseRevisionString = revisions.RevisionParser(revisions.HybridLogicalClock) + var ( psql = sq.StatementBuilder.PlaceholderFormat(sq.Dollar) @@ -165,7 +166,7 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas config.followerReadDelay, config.revisionQuantization, ), - DecimalDecoder: revision.DecimalDecoder{}, + CommonDecoder: revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}, dburl: url, watchBufferLength: config.watchBufferLength, writeOverlapKeyer: keyer, @@ -244,7 +245,7 @@ func NewCRDBDatastore(ctx context.Context, url string, options ...Option) (datas type crdbDatastore struct { *revisions.RemoteClockRevisions - revision.DecimalDecoder + revisions.CommonDecoder dburl string readPool, writePool *pool.RetryPool @@ -279,7 +280,7 @@ func (cds *crdbDatastore) ReadWriteTx( f datastore.TxUserFunc, opts ...options.RWTOptionsOption, ) (datastore.Revision, error) { - var commitTimestamp revision.Decimal + var commitTimestamp datastore.Revision config := options.NewRWTOptionsWithOptions(opts...) if config.DisableRetries { @@ -415,13 +416,13 @@ func (cds *crdbDatastore) HeadRevision(ctx context.Context) (datastore.Revision, return cds.headRevisionInternal(ctx) } -func (cds *crdbDatastore) headRevisionInternal(ctx context.Context) (revision.Decimal, error) { - var hlcNow revision.Decimal +func (cds *crdbDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) { + var hlcNow datastore.Revision var fnErr error hlcNow, fnErr = readCRDBNow(ctx, cds.readPool) if fnErr != nil { - return revision.NoRevision, fmt.Errorf(errRevision, fnErr) + return datastore.NoRevision, fmt.Errorf(errRevision, fnErr) } return hlcNow, fnErr @@ -464,7 +465,7 @@ func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, er return &features, nil } -func readCRDBNow(ctx context.Context, reader pgxcommon.DBFuncQuerier) (revision.Decimal, error) { +func readCRDBNow(ctx context.Context, reader pgxcommon.DBFuncQuerier) (datastore.Revision, error) { ctx, span := tracer.Start(ctx, "readCRDBNow") defer span.End() @@ -472,10 +473,10 @@ func readCRDBNow(ctx context.Context, reader pgxcommon.DBFuncQuerier) (revision. if err := reader.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error { return row.Scan(&hlcNow) }, querySelectNow); err != nil { - return revision.NoRevision, fmt.Errorf("unable to read timestamp: %w", err) + return datastore.NoRevision, fmt.Errorf("unable to read timestamp: %w", err) } - return revision.NewFromDecimal(hlcNow), nil + return revisions.NewForHLC(hlcNow), nil } func readClusterTTLNanos(ctx context.Context, conn pgxcommon.DBFuncQuerier) (int64, error) { @@ -499,7 +500,3 @@ func readClusterTTLNanos(ctx context.Context, conn pgxcommon.DBFuncQuerier) (int return gcSeconds * 1_000_000_000, nil } - -func revisionFromTimestamp(t time.Time) revision.Decimal { - return revision.NewFromDecimal(decimal.NewFromInt(t.UnixNano())) -} diff --git a/internal/datastore/crdb/crdb_test.go b/internal/datastore/crdb/crdb_test.go index b0cfd9cbb1..b5deccb3b0 100644 --- a/internal/datastore/crdb/crdb_test.go +++ b/internal/datastore/crdb/crdb_test.go @@ -27,9 +27,9 @@ import ( crdbmigrations "github.com/authzed/spicedb/internal/datastore/crdb/migrations" "github.com/authzed/spicedb/internal/datastore/crdb/pool" + "github.com/authzed/spicedb/internal/datastore/revisions" testdatastore "github.com/authzed/spicedb/internal/testserver/datastore" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" "github.com/authzed/spicedb/pkg/datastore/test" "github.com/authzed/spicedb/pkg/migrate" ) @@ -102,7 +102,7 @@ func TestCRDBDatastoreWithFollowerReads(t *testing.T) { nowRevision, err := ds.HeadRevision(ctx) require.NoError(err) - diff := nowRevision.(revision.Decimal).IntPart() - testRevision.(revision.Decimal).IntPart() + diff := nowRevision.(revisions.HLCRevision).TimestampNanoSec() - testRevision.(revisions.HLCRevision).TimestampNanoSec() require.True(diff > followerReadDelay.Nanoseconds()) } }) diff --git a/internal/datastore/crdb/pool_test.go b/internal/datastore/crdb/pool_test.go index 0524c4cca9..2e440ec6c3 100644 --- a/internal/datastore/crdb/pool_test.go +++ b/internal/datastore/crdb/pool_test.go @@ -14,7 +14,6 @@ import ( "github.com/authzed/spicedb/internal/datastore/crdb/pool" testdatastore "github.com/authzed/spicedb/internal/testserver/datastore" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" "github.com/authzed/spicedb/pkg/namespace" ) @@ -134,7 +133,7 @@ func TestTxReset(t *testing.T) { require.Equal(datastore.NoRevision, rev) } else { require.NoError(err) - require.True(rev.GreaterThan(revision.NoRevision)) + require.NotEqual(datastore.NoRevision, rev) } }) } diff --git a/internal/datastore/crdb/reader.go b/internal/datastore/crdb/reader.go index 5633a1d326..ef5cef2872 100644 --- a/internal/datastore/crdb/reader.go +++ b/internal/datastore/crdb/reader.go @@ -11,6 +11,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/common" pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/options" core "github.com/authzed/spicedb/pkg/proto/core/v1" @@ -67,7 +68,7 @@ func (cr *crdbReader) ReadNamespaceByName( return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err) } - return config, revisionFromTimestamp(timestamp), nil + return config, revisions.NewHLCForTime(timestamp), nil } func (cr *crdbReader) ListAllNamespaces(ctx context.Context) ([]datastore.RevisionedNamespace, error) { @@ -190,7 +191,7 @@ func (cr crdbReader) lookupNamespaces(ctx context.Context, tx pgxcommon.DBFuncQu nsDefs = append(nsDefs, datastore.RevisionedNamespace{ Definition: loaded, - LastWrittenRevision: revisionFromTimestamp(timestamp), + LastWrittenRevision: revisions.NewHLCForTime(timestamp), }) } @@ -231,7 +232,7 @@ func loadAllNamespaces(ctx context.Context, tx pgxcommon.DBFuncQuerier, fromBuil nsDefs = append(nsDefs, datastore.RevisionedNamespace{ Definition: loaded, - LastWrittenRevision: revisionFromTimestamp(timestamp), + LastWrittenRevision: revisions.NewHLCForTime(timestamp), }) } diff --git a/internal/datastore/crdb/stats.go b/internal/datastore/crdb/stats.go index a9a9e0bd70..4d9f35a1f8 100644 --- a/internal/datastore/crdb/stats.go +++ b/internal/datastore/crdb/stats.go @@ -11,8 +11,8 @@ import ( "github.com/shopspring/decimal" pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" ) const ( @@ -88,22 +88,22 @@ func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, erro }, nil } -func updateCounter(ctx context.Context, tx pgx.Tx, change int64) (revision.Decimal, error) { +func updateCounter(ctx context.Context, tx pgx.Tx, change int64) (datastore.Revision, error) { counterID := make([]byte, 2) _, err := rand.New(rng).Read(counterID) if err != nil { - return revision.NoRevision, fmt.Errorf("unable to select random counter: %w", err) + return datastore.NoRevision, fmt.Errorf("unable to select random counter: %w", err) } sql, args, err := upsertCounterQuery.Values(counterID, change).ToSql() if err != nil { - return revision.NoRevision, fmt.Errorf("unable to prepare upsert counter sql: %w", err) + return datastore.NoRevision, fmt.Errorf("unable to prepare upsert counter sql: %w", err) } var timestamp decimal.Decimal if err := tx.QueryRow(ctx, sql, args...).Scan(×tamp); err != nil { - return revision.NoRevision, fmt.Errorf("unable to executed upsert counter query: %w", err) + return datastore.NoRevision, fmt.Errorf("unable to executed upsert counter query: %w", err) } - return revision.NewFromDecimal(timestamp), nil + return revisions.NewForHLC(timestamp), nil } diff --git a/internal/datastore/crdb/watch.go b/internal/datastore/crdb/watch.go index b085506098..5d3e2c8125 100644 --- a/internal/datastore/crdb/watch.go +++ b/internal/datastore/crdb/watch.go @@ -15,8 +15,8 @@ import ( "github.com/authzed/spicedb/internal/datastore/common" "github.com/authzed/spicedb/internal/datastore/crdb/pool" pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" core "github.com/authzed/spicedb/pkg/proto/core/v1" "github.com/authzed/spicedb/pkg/spiceerrors" ) @@ -158,7 +158,7 @@ func (cds *crdbDatastore) watch( // no return value so we're not really losing anything. defer func() { go changes.Close() }() - tracked := common.NewChanges(revision.DecimalKeyFunc, opts.Content) + tracked := common.NewChanges(revisions.HLCKeyFunc, opts.Content) for changes.Next() { var tableNameBytes []byte @@ -180,13 +180,13 @@ func (cds *crdbDatastore) watch( // Resolved indicates that the specified revision is "complete"; no additional updates can come in before or at it. // Therefore, at this point, we issue tracked updates from before that time, and the checkpoint update. if details.Resolved != "" { - rev, err := cds.RevisionFromString(details.Resolved) + rev, err := revisions.HLCRevisionFromString(details.Resolved) if err != nil { sendError(fmt.Errorf("malformed resolved timestamp: %w", err)) return } - for _, revChange := range tracked.FilterAndRemoveRevisionChanges(revision.DecimalKeyLessThanFunc, rev.(revision.Decimal)) { + for _, revChange := range tracked.FilterAndRemoveRevisionChanges(revisions.HLCKeyLessThanFunc, rev) { revChange := revChange if !sendChange(&revChange) { return @@ -241,19 +241,19 @@ func (cds *crdbDatastore) watch( Caveat: ctxCaveat, } - rev, err := cds.RevisionFromString(details.Updated) + rev, err := revisions.HLCRevisionFromString(details.Updated) if err != nil { sendError(fmt.Errorf("malformed update timestamp: %w", err)) return } if details.After == nil { - if err := tracked.AddRelationshipChange(ctx, rev.(revision.Decimal), tuple, core.RelationTupleUpdate_DELETE); err != nil { + if err := tracked.AddRelationshipChange(ctx, rev, tuple, core.RelationTupleUpdate_DELETE); err != nil { sendError(err) return } } else { - if err := tracked.AddRelationshipChange(ctx, rev.(revision.Decimal), tuple, core.RelationTupleUpdate_TOUCH); err != nil { + if err := tracked.AddRelationshipChange(ctx, rev, tuple, core.RelationTupleUpdate_TOUCH); err != nil { sendError(err) return } @@ -267,7 +267,7 @@ func (cds *crdbDatastore) watch( definitionName := pkValues[0] - rev, err := cds.RevisionFromString(details.Updated) + rev, err := revisions.HLCRevisionFromString(details.Updated) if err != nil { sendError(fmt.Errorf("malformed update timestamp: %w", err)) return @@ -285,9 +285,9 @@ func (cds *crdbDatastore) watch( sendError(fmt.Errorf("could not unmarshal namespace definition: %w", err)) return } - tracked.AddChangedDefinition(ctx, rev.(revision.Decimal), namespaceDef) + tracked.AddChangedDefinition(ctx, rev, namespaceDef) } else { - tracked.AddDeletedNamespace(ctx, rev.(revision.Decimal), definitionName) + tracked.AddDeletedNamespace(ctx, rev, definitionName) } case tableCaveat: @@ -298,7 +298,7 @@ func (cds *crdbDatastore) watch( definitionName := pkValues[0] - rev, err := cds.RevisionFromString(details.Updated) + rev, err := revisions.HLCRevisionFromString(details.Updated) if err != nil { sendError(fmt.Errorf("malformed update timestamp: %w", err)) return @@ -316,9 +316,9 @@ func (cds *crdbDatastore) watch( sendError(fmt.Errorf("could not unmarshal caveat definition: %w", err)) return } - tracked.AddChangedDefinition(ctx, rev.(revision.Decimal), caveatDef) + tracked.AddChangedDefinition(ctx, rev, caveatDef) } else { - tracked.AddDeletedCaveat(ctx, rev.(revision.Decimal), definitionName) + tracked.AddDeletedCaveat(ctx, rev, definitionName) } } } diff --git a/internal/datastore/memdb/memdb.go b/internal/datastore/memdb/memdb.go index 6d8d339766..7f31390e18 100644 --- a/internal/datastore/memdb/memdb.go +++ b/internal/datastore/memdb/memdb.go @@ -11,11 +11,10 @@ import ( "github.com/google/uuid" "github.com/hashicorp/go-memdb" - "github.com/shopspring/decimal" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/options" - "github.com/authzed/spicedb/pkg/datastore/revision" corev1 "github.com/authzed/spicedb/pkg/proto/core/v1" ) @@ -57,20 +56,20 @@ func NewMemdbDatastore( } uniqueID := uuid.NewString() - - negativeGCWindow := decimal.NewFromInt(gcWindow.Nanoseconds()).Mul(decimal.NewFromInt(-1)) - return &memdbDatastore{ + CommonDecoder: revisions.CommonDecoder{ + Kind: revisions.Timestamp, + }, db: db, revisions: []snapshot{ { - revision: revisionFromTimestamp(time.Now().UTC()).Decimal, + revision: nowRevision(), db: db, }, }, - negativeGCWindow: negativeGCWindow, - quantizationPeriod: decimal.NewFromInt(revisionQuantization.Nanoseconds()), + negativeGCWindow: gcWindow.Nanoseconds() * -1, + quantizationPeriod: revisionQuantization.Nanoseconds(), watchBufferLength: watchBufferLength, uniqueID: uniqueID, }, nil @@ -78,26 +77,24 @@ func NewMemdbDatastore( type memdbDatastore struct { sync.RWMutex - revision.DecimalDecoder + revisions.CommonDecoder db *memdb.MemDB revisions []snapshot activeWriteTxn *memdb.Txn - negativeGCWindow decimal.Decimal - quantizationPeriod decimal.Decimal + negativeGCWindow int64 + quantizationPeriod int64 watchBufferLength uint16 uniqueID string } type snapshot struct { - revision decimal.Decimal + revision revisions.TimestampRevision db *memdb.MemDB } -func (mdb *memdbDatastore) SnapshotReader(revisionRaw datastore.Revision) datastore.Reader { - dr := revisionRaw.(revision.Decimal) - +func (mdb *memdbDatastore) SnapshotReader(dr datastore.Revision) datastore.Reader { mdb.RLock() defer mdb.RUnlock() @@ -110,7 +107,7 @@ func (mdb *memdbDatastore) SnapshotReader(revisionRaw datastore.Revision) datast } revIndex := sort.Search(len(mdb.revisions), func(i int) bool { - return mdb.revisions[i].revision.GreaterThanOrEqual(dr.Decimal) + return mdb.revisions[i].revision.GreaterThan(dr) || mdb.revisions[i].revision.Equal(dr) }) // handle the case when there is no revision snapshot newer than the requested revision @@ -228,7 +225,7 @@ func (mdb *memdbDatastore) ReadWriteTx( } change := &changelog{ - revisionNanos: newRevision.IntPart(), + revisionNanos: newRevision.TimestampNanoSec(), changes: newChanges, } if err := tx.Insert(tableChangelog, change); err != nil { @@ -245,7 +242,7 @@ func (mdb *memdbDatastore) ReadWriteTx( } snap := mdb.db.Snapshot() - mdb.revisions = append(mdb.revisions, snapshot{newRevision.Decimal, snap}) + mdb.revisions = append(mdb.revisions, snapshot{newRevision, snap}) return newRevision, nil } @@ -274,7 +271,7 @@ func (mdb *memdbDatastore) Close() error { if db := mdb.db; db != nil { mdb.revisions = []snapshot{ { - revision: revisionFromTimestamp(time.Now().UTC()).Decimal, + revision: nowRevision(), db: db, }, } diff --git a/internal/datastore/memdb/revisions.go b/internal/datastore/memdb/revisions.go index 14037047cb..5717390366 100644 --- a/internal/datastore/memdb/revisions.go +++ b/internal/datastore/memdb/revisions.go @@ -5,22 +5,22 @@ import ( "fmt" "time" - "github.com/shopspring/decimal" - + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" ) -func revisionFromTimestamp(t time.Time) revision.Decimal { - return revision.NewFromDecimal(decimal.NewFromInt(t.UnixNano())) +var ParseRevisionString = revisions.RevisionParser(revisions.Timestamp) + +func nowRevision() revisions.TimestampRevision { + return revisions.NewForTime(time.Now().UTC()) } -func (mdb *memdbDatastore) newRevisionID() revision.Decimal { +func (mdb *memdbDatastore) newRevisionID() revisions.TimestampRevision { mdb.Lock() defer mdb.Unlock() existing := mdb.revisions[len(mdb.revisions)-1].revision - created := revisionFromTimestamp(time.Now().UTC()).Decimal + created := nowRevision() // NOTE: The time.Now().UTC() only appears to have *microsecond* level // precision on macOS Monterey in Go 1.19.1. This means that HeadRevision @@ -32,10 +32,11 @@ func (mdb *memdbDatastore) newRevisionID() revision.Decimal { // See: https://github.com/golang/go/issues/22037 which appeared to fix // this in Go 1.9.2, but there appears to have been a reversion with either // the new version of macOS or Go. - if created.Equals(existing) { - return revision.NewFromDecimal(created.Add(decimal.NewFromInt(1))) + if created.Equal(existing) { + return revisions.NewForTimestamp(created.TimestampNanoSec() + 1) } - return revision.NewFromDecimal(created) + + return created } func (mdb *memdbDatastore) HeadRevision(_ context.Context) (datastore.Revision, error) { @@ -45,19 +46,19 @@ func (mdb *memdbDatastore) HeadRevision(_ context.Context) (datastore.Revision, return nil, fmt.Errorf("datastore has been closed") } - return revision.NewFromDecimal(mdb.headRevisionNoLock()), nil + return mdb.headRevisionNoLock(), nil } func (mdb *memdbDatastore) SquashRevisionsForTesting() { mdb.revisions = []snapshot{ { - revision: revisionFromTimestamp(time.Now().UTC()).Decimal, + revision: nowRevision(), db: mdb.db, }, } } -func (mdb *memdbDatastore) headRevisionNoLock() decimal.Decimal { +func (mdb *memdbDatastore) headRevisionNoLock() revisions.TimestampRevision { return mdb.revisions[len(mdb.revisions)-1].revision } @@ -68,54 +69,51 @@ func (mdb *memdbDatastore) OptimizedRevision(_ context.Context) (datastore.Revis return nil, fmt.Errorf("datastore has been closed") } - now := revisionFromTimestamp(time.Now().UTC()) - return revision.NewFromDecimal(now.Sub(now.Mod(mdb.quantizationPeriod))), nil + now := nowRevision() + return revisions.NewForTimestamp(now.TimestampNanoSec() - now.TimestampNanoSec()%mdb.quantizationPeriod), nil } -func (mdb *memdbDatastore) CheckRevision(_ context.Context, revisionRaw datastore.Revision) error { +func (mdb *memdbDatastore) CheckRevision(_ context.Context, dr datastore.Revision) error { mdb.RLock() defer mdb.RUnlock() if mdb.db == nil { return fmt.Errorf("datastore has been closed") } - dr, ok := revisionRaw.(revision.Decimal) - if !ok { - return datastore.NewInvalidRevisionErr(revisionRaw, datastore.CouldNotDetermineRevision) - } return mdb.checkRevisionLocalCallerMustLock(dr) } -func (mdb *memdbDatastore) checkRevisionLocalCallerMustLock(revisionRaw revision.Decimal) error { - now := revisionFromTimestamp(time.Now().UTC()) +func (mdb *memdbDatastore) checkRevisionLocalCallerMustLock(dr datastore.Revision) error { + now := nowRevision() // Ensure the revision has not fallen outside of the GC window. If it has, it is considered // invalid. - if mdb.revisionOutsideGCWindow(now, revisionRaw) { - return datastore.NewInvalidRevisionErr(revisionRaw, datastore.RevisionStale) + if mdb.revisionOutsideGCWindow(now, dr) { + return datastore.NewInvalidRevisionErr(dr, datastore.RevisionStale) } // If the revision <= now and later than the GC window, it is assumed to be valid, even if // HEAD revision is behind it. - if revisionRaw.GreaterThan(now) { + if dr.GreaterThan(now) { // If the revision is in the "future", then check to ensure that it is <= of HEAD to handle // the microsecond granularity on macos (see comment above in newRevisionID) headRevision := mdb.headRevisionNoLock() - if revisionRaw.LessThanOrEqual(headRevision) { + if dr.LessThan(headRevision) || dr.Equal(headRevision) { return nil } - return datastore.NewInvalidRevisionErr(revisionRaw, datastore.CouldNotDetermineRevision) + return datastore.NewInvalidRevisionErr(dr, datastore.CouldNotDetermineRevision) } return nil } -func (mdb *memdbDatastore) revisionOutsideGCWindow(now revision.Decimal, revisionRaw revision.Decimal) bool { +func (mdb *memdbDatastore) revisionOutsideGCWindow(now revisions.TimestampRevision, revisionRaw datastore.Revision) bool { // make an exception for head revision - it will be acceptable even if outside GC Window - if revisionRaw.Equals(mdb.headRevisionNoLock()) { + if revisionRaw.Equal(mdb.headRevisionNoLock()) { return false } - oldest := revision.NewFromDecimal(now.Add(mdb.negativeGCWindow)) + + oldest := revisions.NewForTimestamp(now.TimestampNanoSec() + mdb.negativeGCWindow) return revisionRaw.LessThan(oldest) } diff --git a/internal/datastore/memdb/watch.go b/internal/datastore/memdb/watch.go index a238851d33..528eb37fba 100644 --- a/internal/datastore/memdb/watch.go +++ b/internal/datastore/memdb/watch.go @@ -7,15 +7,13 @@ import ( "github.com/hashicorp/go-memdb" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" ) const errWatchError = "watch error: %w" -func (mdb *memdbDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { - ar := afterRevision.(revision.Decimal) - +func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { updates := make(chan *datastore.RevisionChanges, mdb.watchBufferLength) errs := make(chan error, 1) @@ -28,7 +26,7 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, afterRevision datastore.Re defer close(updates) defer close(errs) - currentTxn := ar.IntPart() + currentTxn := ar.(revisions.TimestampRevision).TimestampNanoSec() for { var stagedUpdates []*datastore.RevisionChanges diff --git a/internal/datastore/mysql/caveat.go b/internal/datastore/mysql/caveat.go index 2168ca7d5d..84283a3bb6 100644 --- a/internal/datastore/mysql/caveat.go +++ b/internal/datastore/mysql/caveat.go @@ -7,12 +7,11 @@ import ( "fmt" "github.com/authzed/spicedb/internal/datastore/common" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" core "github.com/authzed/spicedb/pkg/proto/core/v1" sq "github.com/Masterminds/squirrel" - "github.com/shopspring/decimal" ) const ( @@ -36,8 +35,8 @@ func (mr *mysqlReader) ReadCaveatByName(ctx context.Context, name string) (*core defer common.LogOnError(ctx, txCleanup) var serializedDef []byte - var rev decimal.Decimal - err = tx.QueryRowContext(ctx, sqlStatement, args...).Scan(&serializedDef, &rev) + var txID uint64 + err = tx.QueryRowContext(ctx, sqlStatement, args...).Scan(&serializedDef, &txID) if err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, datastore.NoRevision, datastore.NewCaveatNameNotFoundErr(name) @@ -49,7 +48,7 @@ func (mr *mysqlReader) ReadCaveatByName(ctx context.Context, name string) (*core if err != nil { return nil, datastore.NoRevision, fmt.Errorf(errReadCaveat, err) } - return &def, revision.NewFromDecimal(rev), nil + return &def, revisions.NewForTransactionID(txID), nil } func (mr *mysqlReader) LookupCaveatsWithNames(ctx context.Context, caveatNames []string) ([]datastore.RevisionedCaveat, error) { @@ -90,9 +89,9 @@ func (mr *mysqlReader) lookupCaveats(ctx context.Context, caveatNames []string) var caveats []datastore.RevisionedCaveat for rows.Next() { var defBytes []byte - var version decimal.Decimal + var txID uint64 - err = rows.Scan(&defBytes, &version) + err = rows.Scan(&defBytes, &txID) if err != nil { return nil, fmt.Errorf(errListCaveats, err) } @@ -103,7 +102,7 @@ func (mr *mysqlReader) lookupCaveats(ctx context.Context, caveatNames []string) } caveats = append(caveats, datastore.RevisionedCaveat{ Definition: &c, - LastWrittenRevision: revision.NewFromDecimal(version), + LastWrittenRevision: revisions.NewForTransactionID(txID), }) } if rows.Err() != nil { diff --git a/internal/datastore/mysql/datastore.go b/internal/datastore/mysql/datastore.go index 72f91bc9cd..650c27673c 100644 --- a/internal/datastore/mysql/datastore.go +++ b/internal/datastore/mysql/datastore.go @@ -22,12 +22,11 @@ import ( datastoreinternal "github.com/authzed/spicedb/internal/datastore" "github.com/authzed/spicedb/internal/datastore/common" - "github.com/authzed/spicedb/internal/datastore/common/revisions" "github.com/authzed/spicedb/internal/datastore/mysql/migrations" + "github.com/authzed/spicedb/internal/datastore/revisions" log "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/options" - "github.com/authzed/spicedb/pkg/datastore/revision" core "github.com/authzed/spicedb/pkg/proto/core/v1" ) @@ -219,6 +218,9 @@ func newMySQLDatastore(ctx context.Context, uri string, options ...Option) (*Dat CachedOptimizedRevisions: revisions.NewCachedOptimizedRevisions( maxRevisionStaleness, ), + CommonDecoder: revisions.CommonDecoder{ + Kind: revisions.TransactionID, + }, } store.SetOptimizedRevisionFunc(store.optimizedRevisionFunc) @@ -250,9 +252,7 @@ func newMySQLDatastore(ctx context.Context, uri string, options ...Option) (*Dat } // TODO (@vroldanbet) dupe from postgres datastore - need to refactor -func (mds *Datastore) SnapshotReader(revisionRaw datastore.Revision) datastore.Reader { - rev := revisionRaw.(revision.Decimal) - +func (mds *Datastore) SnapshotReader(rev datastore.Revision) datastore.Reader { createTxFunc := func(ctx context.Context) (*sql.Tx, txCleanupFunc, error) { tx, err := mds.db.BeginTx(ctx, mds.readTxOptions) if err != nil { @@ -324,7 +324,7 @@ func (mds *Datastore) ReadWriteTx( return datastore.NoRevision, wrapError(err) } - return revisionFromTransaction(newTxnID), nil + return revisions.NewForTransactionID(newTxnID), nil } if !config.DisableRetries { err = fmt.Errorf("max retries exceeded: %w", err) @@ -450,7 +450,7 @@ type Datastore struct { *QueryBuilder *revisions.CachedOptimizedRevisions - revision.DecimalDecoder + revisions.CommonDecoder } // Close closes the data store. @@ -596,9 +596,9 @@ func (mds *Datastore) seedDatabase(ctx context.Context) error { } // TODO (@vroldanbet) dupe from postgres datastore - need to refactor -func buildLivingObjectFilterForRevision(revision revision.Decimal) queryFilterer { +func buildLivingObjectFilterForRevision(revision datastore.Revision) queryFilterer { return func(original sq.SelectBuilder) sq.SelectBuilder { - return original.Where(sq.LtOrEq{colCreatedTxn: transactionFromRevision(revision)}). + return original.Where(sq.LtOrEq{colCreatedTxn: revision.(revisions.TransactionIDRevision).TransactionID()}). Where(sq.Or{ sq.Eq{colDeletedTxn: liveDeletedTxnID}, sq.Gt{colDeletedTxn: revision}, diff --git a/internal/datastore/mysql/datastore_test.go b/internal/datastore/mysql/datastore_test.go index 8fdd97da80..aa7bbd0f6d 100644 --- a/internal/datastore/mysql/datastore_test.go +++ b/internal/datastore/mysql/datastore_test.go @@ -17,6 +17,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/common" "github.com/authzed/spicedb/internal/datastore/mysql/migrations" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/internal/testfixtures" testdatastore "github.com/authzed/spicedb/internal/testserver/datastore" "github.com/authzed/spicedb/pkg/datastore" @@ -654,7 +655,7 @@ func TransactionTimestampsTest(t *testing.T, ds datastore.Datastore) { revision, err := ds.OptimizedRevision(ctx) req.NoError(err) - req.Equal(revisionFromTransaction(txID), revision) + req.Equal(revisions.NewForTransactionID(txID), revision) } func TestMySQLMigrations(t *testing.T) { diff --git a/internal/datastore/mysql/gc.go b/internal/datastore/mysql/gc.go index 706d70d679..2b9a35a03b 100644 --- a/internal/datastore/mysql/gc.go +++ b/internal/datastore/mysql/gc.go @@ -6,12 +6,11 @@ import ( "time" sq "github.com/Masterminds/squirrel" - "github.com/shopspring/decimal" "github.com/authzed/spicedb/internal/datastore/common" + "github.com/authzed/spicedb/internal/datastore/revisions" log "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" ) var _ common.GarbageCollector = (*Datastore)(nil) @@ -66,7 +65,8 @@ func (mds *Datastore) TxIDBefore(ctx context.Context, before time.Time) (datasto log.Ctx(ctx).Debug().Time("before", before).Msg("no stale transactions found in the datastore") return datastore.NoRevision, nil } - return revision.NewFromDecimal(decimal.NewFromInt(value.Int64)), nil + + return revisions.NewForTransactionID(uint64(value.Int64)), nil } // TODO (@vroldanbet) dupe from postgres datastore - need to refactor diff --git a/internal/datastore/mysql/reader.go b/internal/datastore/mysql/reader.go index 285c764740..d624f99d6d 100644 --- a/internal/datastore/mysql/reader.go +++ b/internal/datastore/mysql/reader.go @@ -7,12 +7,11 @@ import ( "fmt" sq "github.com/Masterminds/squirrel" - "github.com/shopspring/decimal" "github.com/authzed/spicedb/internal/datastore/common" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/options" - "github.com/authzed/spicedb/pkg/datastore/revision" core "github.com/authzed/spicedb/pkg/proto/core/v1" ) @@ -121,8 +120,8 @@ func loadNamespace(ctx context.Context, namespace string, tx *sql.Tx, baseQuery } var config []byte - var version decimal.Decimal - err = tx.QueryRowContext(ctx, query, args...).Scan(&config, &version) + var txID uint64 + err = tx.QueryRowContext(ctx, query, args...).Scan(&config, &txID) if err != nil { if errors.Is(err, sql.ErrNoRows) { err = datastore.NewNamespaceNotFoundErr(namespace) @@ -135,7 +134,7 @@ func loadNamespace(ctx context.Context, namespace string, tx *sql.Tx, baseQuery return nil, datastore.NoRevision, err } - return loaded, revision.NewFromDecimal(version), nil + return loaded, revisions.NewForTransactionID(txID), nil } func (mr *mysqlReader) ListAllNamespaces(ctx context.Context) ([]datastore.RevisionedNamespace, error) { @@ -200,8 +199,8 @@ func loadAllNamespaces(ctx context.Context, tx *sql.Tx, queryBuilder sq.SelectBu for rows.Next() { var config []byte - var version decimal.Decimal - if err := rows.Scan(&config, &version); err != nil { + var txID uint64 + if err := rows.Scan(&config, &txID); err != nil { return nil, err } @@ -212,7 +211,7 @@ func loadAllNamespaces(ctx context.Context, tx *sql.Tx, queryBuilder sq.SelectBu nsDefs = append(nsDefs, datastore.RevisionedNamespace{ Definition: loaded, - LastWrittenRevision: revision.NewFromDecimal(version), + LastWrittenRevision: revisions.NewForTransactionID(txID), }) } if rows.Err() != nil { diff --git a/internal/datastore/mysql/revisions.go b/internal/datastore/mysql/revisions.go index ec155b1c75..8b86a66a80 100644 --- a/internal/datastore/mysql/revisions.go +++ b/internal/datastore/mysql/revisions.go @@ -5,15 +5,14 @@ import ( "database/sql" "errors" "fmt" - "math/big" "time" - "github.com/shopspring/decimal" - + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" ) +var ParseRevisionString = revisions.RevisionParser(revisions.TransactionID) + const ( errRevision = "unable to find revision: %w" errCheckRevision = "unable to check revision: %w" @@ -69,9 +68,9 @@ func (mds *Datastore) optimizedRevisionFunc(ctx context.Context) (datastore.Revi var validForNanos time.Duration if err := mds.db.QueryRowContext(ctx, mds.optimizedRevisionQuery). Scan(&rev, &validForNanos); err != nil { - return revision.NoRevision, 0, fmt.Errorf(errRevision, err) + return datastore.NoRevision, 0, fmt.Errorf(errRevision, err) } - return revisionFromTransaction(rev), validForNanos, nil + return revisions.NewForTransactionID(rev), validForNanos, nil } func (mds *Datastore) HeadRevision(ctx context.Context) (datastore.Revision, error) { @@ -85,20 +84,20 @@ func (mds *Datastore) HeadRevision(ctx context.Context) (datastore.Revision, err return datastore.NoRevision, nil } - return revisionFromTransaction(revision), nil + return revisions.NewForTransactionID(revision), nil } -func (mds *Datastore) CheckRevision(ctx context.Context, revisionRaw datastore.Revision) error { - if revisionRaw == datastore.NoRevision { - return datastore.NewInvalidRevisionErr(revisionRaw, datastore.CouldNotDetermineRevision) +func (mds *Datastore) CheckRevision(ctx context.Context, revision datastore.Revision) error { + if revision == datastore.NoRevision { + return datastore.NewInvalidRevisionErr(revision, datastore.CouldNotDetermineRevision) } - revision := revisionRaw.(revision.Decimal) - - // TODO (@vroldanbet) dupe from postgres datastore - need to refactor - - revisionTx := transactionFromRevision(revision) + rev, ok := revision.(revisions.TransactionIDRevision) + if !ok { + return fmt.Errorf("expected transaction revision, got %T", revision) + } + revisionTx := rev.TransactionID() freshEnough, unknown, err := mds.checkValidTransaction(ctx, revisionTx) if err != nil { return fmt.Errorf(errCheckRevision, err) @@ -179,11 +178,3 @@ func (mds *Datastore) createNewTransaction(ctx context.Context, tx *sql.Tx) (new return uint64(lastInsertID), nil } - -func revisionFromTransaction(txID uint64) revision.Decimal { - return revision.NewFromDecimal(decimal.NewFromBigInt(new(big.Int).SetUint64(txID), 0)) -} - -func transactionFromRevision(revision revision.Decimal) uint64 { - return uint64(revision.IntPart()) -} diff --git a/internal/datastore/mysql/revisions_test.go b/internal/datastore/mysql/revisions_test.go index 72dd1dd21d..c3ebe5d0d0 100644 --- a/internal/datastore/mysql/revisions_test.go +++ b/internal/datastore/mysql/revisions_test.go @@ -2,52 +2,30 @@ package mysql import ( "math" - "math/big" "testing" - "github.com/shopspring/decimal" "github.com/stretchr/testify/require" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" ) -func Test_revisionFromTransaction(t *testing.T) { +func TestRevisionFromTransaction(t *testing.T) { tests := []struct { name string txID uint64 want datastore.Revision }{ - {"0", 0, revision.NewFromDecimal(decimal.NewFromInt(0))}, - {"uint64 max", math.MaxUint64, revision.NewFromDecimal(decimal.NewFromBigInt(new(big.Int).SetUint64(math.MaxUint64), 0))}, + {"0", 0, revisions.NewForTransactionID(0)}, + {"uint64 max", math.MaxUint64, revisions.NewForTransactionID(math.MaxUint64)}, } for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { require := require.New(t) - got := revisionFromTransaction(tt.txID) + got := revisions.NewForTransactionID(tt.txID) require.True(tt.want.Equal(got)) }) } } - -func Test_transactionFromRevision(t *testing.T) { - tests := []struct { - name string - revision revision.Decimal - want uint64 - }{ - {"0", revision.NewFromDecimal(decimal.NewFromInt(0)), 0}, - {"uint64 max", revision.NewFromDecimal(decimal.NewFromBigInt(new(big.Int).SetUint64(math.MaxUint64), 0)), math.MaxUint64}, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - require := require.New(t) - got := transactionFromRevision(tt.revision) - require.Equal(tt.want, got) - }) - } -} diff --git a/internal/datastore/mysql/watch.go b/internal/datastore/mysql/watch.go index 25a1301f62..d9a8a93a55 100644 --- a/internal/datastore/mysql/watch.go +++ b/internal/datastore/mysql/watch.go @@ -6,8 +6,8 @@ import ( "time" "github.com/authzed/spicedb/internal/datastore/common" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" core "github.com/authzed/spicedb/pkg/proto/core/v1" sq "github.com/Masterminds/squirrel" @@ -20,11 +20,7 @@ const ( // Watch notifies the caller about all changes to tuples. // // All events following afterRevision will be sent to the caller. -// -// TODO (@vroldanbet) dupe from postgres datastore - need to refactor func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { - afterRevision := afterRevisionRaw.(revision.Decimal) - updates := make(chan *datastore.RevisionChanges, mds.watchBufferLength) errs := make(chan error, 1) @@ -33,12 +29,17 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi return updates, errs } + afterRevision, ok := afterRevisionRaw.(revisions.TransactionIDRevision) + if !ok { + errs <- datastore.NewInvalidRevisionErr(afterRevisionRaw, datastore.CouldNotDetermineRevision) + return updates, errs + } + go func() { defer close(updates) defer close(errs) - currentTxn := transactionFromRevision(afterRevision) - + currentTxn := afterRevision.TransactionID() for { var stagedUpdates []datastore.RevisionChanges var err error @@ -120,7 +121,7 @@ func (mds *Datastore) loadChanges( } defer common.LogOnError(ctx, rows.Close) - stagedChanges := common.NewChanges(revision.DecimalKeyFunc, options.Content) + stagedChanges := common.NewChanges(revisions.TransactionIDKeyFunc, options.Content) for rows.Next() { nextTuple := &core.RelationTuple{ @@ -153,13 +154,13 @@ func (mds *Datastore) loadChanges( } if createdTxn > afterRevision && createdTxn <= newRevision { - if err = stagedChanges.AddRelationshipChange(ctx, revisionFromTransaction(createdTxn), nextTuple, core.RelationTupleUpdate_TOUCH); err != nil { + if err = stagedChanges.AddRelationshipChange(ctx, revisions.NewForTransactionID(createdTxn), nextTuple, core.RelationTupleUpdate_TOUCH); err != nil { return } } if deletedTxn > afterRevision && deletedTxn <= newRevision { - if err = stagedChanges.AddRelationshipChange(ctx, revisionFromTransaction(deletedTxn), nextTuple, core.RelationTupleUpdate_DELETE); err != nil { + if err = stagedChanges.AddRelationshipChange(ctx, revisions.NewForTransactionID(deletedTxn), nextTuple, core.RelationTupleUpdate_DELETE); err != nil { return } } @@ -168,7 +169,6 @@ func (mds *Datastore) loadChanges( return } - changes = stagedChanges.AsRevisionChanges(revision.DecimalKeyLessThanFunc) - + changes = stagedChanges.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc) return } diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go index 49c9cfabc5..036418537b 100644 --- a/internal/datastore/postgres/postgres.go +++ b/internal/datastore/postgres/postgres.go @@ -24,9 +24,9 @@ import ( datastoreinternal "github.com/authzed/spicedb/internal/datastore" "github.com/authzed/spicedb/internal/datastore/common" - "github.com/authzed/spicedb/internal/datastore/common/revisions" pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" "github.com/authzed/spicedb/internal/datastore/postgres/migrations" + "github.com/authzed/spicedb/internal/datastore/revisions" log "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/options" diff --git a/internal/datastore/proxy/hedging_test.go b/internal/datastore/proxy/hedging_test.go index 9869297661..a2c5374f7a 100644 --- a/internal/datastore/proxy/hedging_test.go +++ b/internal/datastore/proxy/hedging_test.go @@ -8,16 +8,15 @@ import ( "time" "github.com/benbjohnson/clock" - "github.com/shopspring/decimal" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/goleak" "github.com/authzed/spicedb/internal/datastore/common" "github.com/authzed/spicedb/internal/datastore/proxy/proxy_test" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/options" - "github.com/authzed/spicedb/pkg/datastore/revision" core "github.com/authzed/spicedb/pkg/proto/core/v1" ) @@ -29,8 +28,8 @@ var ( errKnown = errors.New("known error") errAnotherKnown = errors.New("another known error") nsKnown = "namespace_name" - revisionKnown = revision.NewFromDecimal(decimal.NewFromInt(1)) - anotherRevisionKnown = revision.NewFromDecimal(decimal.NewFromInt(2)) + revisionKnown = revisions.NewForTransactionID(1) + anotherRevisionKnown = revisions.NewForTransactionID(2) emptyIterator = common.NewSliceRelationshipIterator(nil, options.Unsorted) ) diff --git a/internal/datastore/proxy/readonly_test.go b/internal/datastore/proxy/readonly_test.go index c2f3208bda..fd73df7129 100644 --- a/internal/datastore/proxy/readonly_test.go +++ b/internal/datastore/proxy/readonly_test.go @@ -4,14 +4,13 @@ import ( "context" "testing" - "github.com/shopspring/decimal" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/authzed/spicedb/internal/datastore/common" "github.com/authzed/spicedb/internal/datastore/proxy/proxy_test" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" core "github.com/authzed/spicedb/pkg/proto/core/v1" "github.com/authzed/spicedb/pkg/tuple" ) @@ -51,7 +50,7 @@ func TestRWOperationErrors(t *testing.T) { require.Equal(datastore.NoRevision, rev) } -var expectedRevision = revision.NewFromDecimal(decimal.NewFromInt(123)) +var expectedRevision = revisions.NewForTransactionID(123) func TestReadyStatePassthrough(t *testing.T) { require := require.New(t) diff --git a/internal/datastore/proxy/schemacaching/intervaltracker_test.go b/internal/datastore/proxy/schemacaching/intervaltracker_test.go index b55d1b1f88..561652f985 100644 --- a/internal/datastore/proxy/schemacaching/intervaltracker_test.go +++ b/internal/datastore/proxy/schemacaching/intervaltracker_test.go @@ -6,12 +6,14 @@ import ( "github.com/stretchr/testify/require" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" ) func rev(value string) datastore.Revision { - dd := revision.DecimalDecoder{} + dd := revisions.CommonDecoder{ + Kind: revisions.HybridLogicalClock, + } rev, _ := dd.RevisionFromString(value) return rev } diff --git a/internal/datastore/proxy/schemacaching/standardcaching_test.go b/internal/datastore/proxy/schemacaching/standardcaching_test.go index 77c21edef6..d31d969116 100644 --- a/internal/datastore/proxy/schemacaching/standardcaching_test.go +++ b/internal/datastore/proxy/schemacaching/standardcaching_test.go @@ -8,17 +8,16 @@ import ( "testing" "time" - "github.com/shopspring/decimal" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "github.com/authzed/spicedb/internal/datastore/memdb" "github.com/authzed/spicedb/internal/datastore/proxy/proxy_test" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/caveats" caveattypes "github.com/authzed/spicedb/pkg/caveats/types" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/options" - "github.com/authzed/spicedb/pkg/datastore/revision" "github.com/authzed/spicedb/pkg/genutil/mapz" ns "github.com/authzed/spicedb/pkg/namespace" core "github.com/authzed/spicedb/pkg/proto/core/v1" @@ -26,10 +25,10 @@ import ( ) var ( - old = revision.NewFromDecimal(decimal.NewFromInt(-100)) - zero = revision.NewFromDecimal(decimal.NewFromInt(0)) - one = revision.NewFromDecimal(decimal.NewFromInt(1)) - two = revision.NewFromDecimal(decimal.NewFromInt(2)) + old = revisions.NewForTransactionID(0) + zero = revisions.NewForTransactionID(1) + one = revisions.NewForTransactionID(2) + two = revisions.NewForTransactionID(3) nilOpts []options.RWTOptionsOption ) diff --git a/internal/datastore/proxy/schemacaching/watchingcache.go b/internal/datastore/proxy/schemacaching/watchingcache.go index 6afc63df2d..dbb604f84d 100644 --- a/internal/datastore/proxy/schemacaching/watchingcache.go +++ b/internal/datastore/proxy/schemacaching/watchingcache.go @@ -9,11 +9,11 @@ import ( "github.com/prometheus/client_golang/prometheus" pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" + "github.com/authzed/spicedb/internal/datastore/revisions" log "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/cache" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/options" - "github.com/authzed/spicedb/pkg/datastore/revision" "github.com/authzed/spicedb/pkg/genutil/mapz" core "github.com/authzed/spicedb/pkg/proto/core/v1" "github.com/authzed/spicedb/pkg/spiceerrors" @@ -264,7 +264,7 @@ func (p *watchingCachingProxy) startSync(ctx context.Context) error { log.Trace().Object("update", ss).Msg("received update from schema watch") if ss.IsCheckpoint { - if converted, ok := ss.Revision.(revision.Decimal); ok { + if converted, ok := ss.Revision.(revisions.WithInexactFloat64); ok { schemaCacheRevisionGauge.Set(converted.InexactFloat64()) } diff --git a/internal/datastore/revisions/commonrevision.go b/internal/datastore/revisions/commonrevision.go new file mode 100644 index 0000000000..15864fb26c --- /dev/null +++ b/internal/datastore/revisions/commonrevision.go @@ -0,0 +1,79 @@ +package revisions + +import ( + "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/spiceerrors" +) + +// RevisionKind is an enum of the different kinds of revisions that can be used. +type RevisionKind string + +const ( + // Timestamp is a revision that is a timestamp. + Timestamp RevisionKind = "timestamp" + + // TransactionID is a revision that is a transaction ID. + TransactionID = "txid" + + // HybridLogicalClock is a revision that is a hybrid logical clock. + HybridLogicalClock = "hlc" +) + +// ParsingFunc is a function that can parse a string into a revision. +type ParsingFunc func(revisionStr string) (rev datastore.Revision, err error) + +// RevisionParser returns a ParsingFunc for the given RevisionKind. +func RevisionParser(kind RevisionKind) ParsingFunc { + switch kind { + case TransactionID: + return parseTransactionIDRevisionString + + case Timestamp: + return parseTimestampRevisionString + + case HybridLogicalClock: + return parseHLCRevisionString + + default: + return func(revisionStr string) (rev datastore.Revision, err error) { + return nil, spiceerrors.MustBugf("unknown revision kind: %v", kind) + } + } +} + +// CommonDecoder is a revision decoder that can decode revisions of a given kind. +type CommonDecoder struct { + Kind RevisionKind +} + +func (cd CommonDecoder) RevisionFromString(s string) (datastore.Revision, error) { + switch cd.Kind { + case TransactionID: + return parseTransactionIDRevisionString(s) + + case Timestamp: + return parseTimestampRevisionString(s) + + case HybridLogicalClock: + return parseHLCRevisionString(s) + + default: + return nil, spiceerrors.MustBugf("unknown revision kind in decoder: %v", cd.Kind) + } +} + +// WithInexactFloat64 is an interface that can be implemented by a revision to +// provide an inexact float64 representation of the revision. +type WithInexactFloat64 interface { + // WithInexactFloat64 returns a float64 that is an inexact representation of the + // revision. + InexactFloat64() float64 +} + +// WithTimestampRevision is an interface that can be implemented by a revision to +// provide a timestamp. +type WithTimestampRevision interface { + datastore.Revision + TimestampNanoSec() int64 + ConstructForTimestamp(timestampNanoSec int64) WithTimestampRevision +} diff --git a/internal/datastore/revisions/commonrevision_test.go b/internal/datastore/revisions/commonrevision_test.go new file mode 100644 index 0000000000..36d28f9d95 --- /dev/null +++ b/internal/datastore/revisions/commonrevision_test.go @@ -0,0 +1,248 @@ +package revisions + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +var kinds = map[RevisionKind]bool{Timestamp: false, TransactionID: false, HybridLogicalClock: true} + +func TestRevisionEqual(t *testing.T) { + tcs := []struct { + left string + right string + isEqual bool + }{ + { + "1", + "2", + false, + }, + { + "2", + "1", + false, + }, + { + "1", + "1", + true, + }, + { + "1.1", + "1", + false, + }, + { + "1", + "1.1", + false, + }, + { + "1.1", + "1.1", + true, + }, + } + + for _, tc := range tcs { + t.Run(tc.left+"-"+tc.right, func(t *testing.T) { + for kind, supportsDecimals := range kinds { + t.Run(string(kind), func(t *testing.T) { + if !supportsDecimals && strings.Contains(tc.left, ".") { + return + } + + if !supportsDecimals && strings.Contains(tc.right, ".") { + return + } + + parser := RevisionParser(kind) + + leftRev, err := parser(tc.left) + require.NoError(t, err) + + rightRev, err := parser(tc.right) + require.NoError(t, err) + + require.Equal(t, tc.isEqual, leftRev.Equal(rightRev)) + require.Equal(t, tc.isEqual, rightRev.Equal(leftRev)) + }) + } + }) + } +} + +func TestRevisionComparison(t *testing.T) { + tcs := []struct { + left string + right string + isLeftGreaterThan bool + }{ + { + "1", + "2", + false, + }, + { + "2", + "1", + true, + }, + { + "1", + "1", + false, + }, + { + "1.1", + "1", + true, + }, + { + "1", + "1.1", + false, + }, + { + "1.1", + "1.1", + false, + }, + } + + for _, tc := range tcs { + t.Run(tc.left+"-"+tc.right, func(t *testing.T) { + for kind, supportsDecimals := range kinds { + t.Run(string(kind), func(t *testing.T) { + if !supportsDecimals && strings.Contains(tc.left, ".") { + return + } + + if !supportsDecimals && strings.Contains(tc.right, ".") { + return + } + + parser := RevisionParser(kind) + + leftRev, err := parser(tc.left) + require.NoError(t, err) + + rightRev, err := parser(tc.right) + require.NoError(t, err) + + if leftRev.Equal(rightRev) { + require.False(t, tc.isLeftGreaterThan) + return + } + + require.Equal(t, tc.isLeftGreaterThan, leftRev.GreaterThan(rightRev)) + require.Equal(t, !tc.isLeftGreaterThan, !leftRev.GreaterThan(rightRev)) + + require.Equal(t, !tc.isLeftGreaterThan, leftRev.LessThan(rightRev)) + require.Equal(t, tc.isLeftGreaterThan, !leftRev.LessThan(rightRev)) + }) + } + }) + } +} + +func TestRevisionBidirectionalParsing(t *testing.T) { + tcs := []string{ + "1", "2", "42", "192747564535", "1.1", "1.2", "1.42", "-1235", + } + + for _, tc := range tcs { + t.Run(tc, func(t *testing.T) { + for kind := range kinds { + t.Run(string(kind), func(t *testing.T) { + parser := RevisionParser(kind) + parsed, err := parser(tc) + if err != nil { + return + } + + require.Equal(t, tc, parsed.String()) + }) + } + }) + } +} + +func TestTimestampRevisionParsing(t *testing.T) { + tcs := map[string]bool{ + "1": false, + "2": false, + "42": false, + "1257894000000000000": false, + "-1": false, + "1.23": true, + } + + for tc, expectError := range tcs { + t.Run(tc, func(t *testing.T) { + parser := RevisionParser(Timestamp) + parsed, err := parser(tc) + if expectError { + require.Error(t, err) + return + } + + require.NoError(t, err) + require.Equal(t, tc, parsed.String()) + }) + } +} + +func TestTransactionIDRevisionParsing(t *testing.T) { + tcs := map[string]bool{ + "1": false, + "2": false, + "42": false, + "1257894000000000000": false, + "-1": true, + "1.23": true, + } + + for tc, expectError := range tcs { + t.Run(tc, func(t *testing.T) { + parser := RevisionParser(TransactionID) + parsed, err := parser(tc) + if expectError { + require.Error(t, err) + return + } + + require.NoError(t, err) + require.Equal(t, tc, parsed.String()) + }) + } +} + +func TestHLCRevisionParsing(t *testing.T) { + tcs := map[string]bool{ + "1": false, + "2": false, + "42": false, + "1257894000000000000": false, + "-1": false, + "1.23": false, + "9223372036854775807.2": false, + } + + for tc, expectError := range tcs { + t.Run(tc, func(t *testing.T) { + parser := RevisionParser(HybridLogicalClock) + parsed, err := parser(tc) + if expectError { + require.Error(t, err) + return + } + + require.NoError(t, err) + require.Equal(t, tc, parsed.String()) + }) + } +} diff --git a/internal/datastore/revisions/hlcrevision.go b/internal/datastore/revisions/hlcrevision.go new file mode 100644 index 0000000000..d5cddd3fe1 --- /dev/null +++ b/internal/datastore/revisions/hlcrevision.go @@ -0,0 +1,102 @@ +package revisions + +import ( + "time" + + "github.com/shopspring/decimal" + + "github.com/authzed/spicedb/pkg/datastore" +) + +// HLCRevision is a revision that is a hybrid logical clock, stored as a decimal. +type HLCRevision struct { + decimal decimal.Decimal +} + +// parseHLCRevisionString parses a string into a hybrid logical clock revision. +func parseHLCRevisionString(revisionStr string) (datastore.Revision, error) { + parsed, err := decimal.NewFromString(revisionStr) + if err != nil { + return datastore.NoRevision, err + } + return HLCRevision{parsed}, nil +} + +// HLCRevisionFromString parses a string into a hybrid logical clock revision. +func HLCRevisionFromString(revisionStr string) (HLCRevision, error) { + parsed, err := decimal.NewFromString(revisionStr) + if err != nil { + return HLCRevision{decimal.Zero}, err + } + return HLCRevision{parsed}, nil +} + +// NewForHLC creates a new revision for the given hybrid logical clock. +func NewForHLC(decimal decimal.Decimal) HLCRevision { + return HLCRevision{decimal} +} + +// NewHLCForTime creates a new revision for the given time. +func NewHLCForTime(time time.Time) HLCRevision { + return HLCRevision{decimal.NewFromInt(time.UnixNano())} +} + +func (hlc HLCRevision) Equal(rhs datastore.Revision) bool { + if rhs == datastore.NoRevision { + return false + } + + rhsD := rhs.(HLCRevision) + return hlc.decimal.Equal(rhsD.decimal) +} + +func (hlc HLCRevision) GreaterThan(rhs datastore.Revision) bool { + if rhs == datastore.NoRevision { + rhs = HLCRevision{decimal.Zero} + } + + rhsD := rhs.(HLCRevision) + return hlc.decimal.GreaterThan(rhsD.decimal) +} + +func (hlc HLCRevision) LessThan(rhs datastore.Revision) bool { + if rhs == datastore.NoRevision { + rhs = HLCRevision{decimal.Zero} + } + + rhsD := rhs.(HLCRevision) + return hlc.decimal.LessThan(rhsD.decimal) +} + +func (HLCRevision) MarshalBinary() (data []byte, err error) { + panic("unimplemented") +} + +func (hlc HLCRevision) String() string { + return hlc.decimal.String() +} + +func (hlc HLCRevision) TimestampNanoSec() int64 { + return hlc.decimal.IntPart() +} + +func (hlc HLCRevision) InexactFloat64() float64 { + return float64(hlc.decimal.IntPart()) +} + +func (hlc HLCRevision) ConstructForTimestamp(timestamp int64) WithTimestampRevision { + return HLCRevision{decimal.NewFromInt(timestamp)} +} + +var _ datastore.Revision = HLCRevision{} +var _ WithTimestampRevision = HLCRevision{} + +// HLCKeyFunc is used to convert a simple HLC to an int64 for use in maps. +func HLCKeyFunc(r HLCRevision) int64 { + return r.TimestampNanoSec() +} + +// HLCKeyLessThanFunc is used to compare keys created by the HLCKeyFunc. +func HLCKeyLessThanFunc(lhs, rhs int64) bool { + return lhs < rhs +} diff --git a/internal/datastore/common/revisions/optimized.go b/internal/datastore/revisions/optimized.go similarity index 100% rename from internal/datastore/common/revisions/optimized.go rename to internal/datastore/revisions/optimized.go diff --git a/internal/datastore/common/revisions/optimized_test.go b/internal/datastore/revisions/optimized_test.go similarity index 93% rename from internal/datastore/common/revisions/optimized_test.go rename to internal/datastore/revisions/optimized_test.go index 2e0e7b1de6..34f310a80b 100644 --- a/internal/datastore/common/revisions/optimized_test.go +++ b/internal/datastore/revisions/optimized_test.go @@ -8,13 +8,11 @@ import ( "github.com/benbjohnson/clock" "github.com/samber/lo" - "github.com/shopspring/decimal" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" ) type trackingRevisionFunction struct { @@ -23,13 +21,13 @@ type trackingRevisionFunction struct { func (m *trackingRevisionFunction) optimizedRevisionFunc(_ context.Context) (datastore.Revision, time.Duration, error) { args := m.Called() - return args.Get(0).(revision.Decimal), args.Get(1).(time.Duration), args.Error(2) + return args.Get(0).(datastore.Revision), args.Get(1).(time.Duration), args.Error(2) } var ( - one = revision.NewFromDecimal(decimal.NewFromInt(1)) - two = revision.NewFromDecimal(decimal.NewFromInt(2)) - three = revision.NewFromDecimal(decimal.NewFromInt(3)) + one = NewForTransactionID(1) + two = NewForTransactionID(2) + three = NewForTransactionID(3) ) func cand(revs ...datastore.Revision) []datastore.Revision { @@ -193,7 +191,7 @@ func BenchmarkOptimizedRevisions(b *testing.B) { nowNS := time.Now().UnixNano() validForNS := nowNS % quantization.Nanoseconds() roundedNS := nowNS - validForNS - rev := revision.NewFromDecimal(decimal.NewFromInt(roundedNS)) + rev := NewForTransactionID(uint64(roundedNS)) return rev, time.Duration(validForNS) * time.Nanosecond, nil }) diff --git a/internal/datastore/common/revisions/remoteclock.go b/internal/datastore/revisions/remoteclock.go similarity index 73% rename from internal/datastore/common/revisions/remoteclock.go rename to internal/datastore/revisions/remoteclock.go index 635669b81d..bdf47a0c59 100644 --- a/internal/datastore/common/revisions/remoteclock.go +++ b/internal/datastore/revisions/remoteclock.go @@ -4,15 +4,13 @@ import ( "context" "time" - "github.com/shopspring/decimal" - log "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" + "github.com/authzed/spicedb/pkg/spiceerrors" ) // RemoteNowFunction queries the datastore to get a current revision. -type RemoteNowFunction func(context.Context) (revision.Decimal, error) +type RemoteNowFunction func(context.Context) (datastore.Revision, error) // RemoteClockRevisions handles revision calculation for datastores that provide // their own clocks. @@ -42,12 +40,21 @@ func NewRemoteClockRevisions(gcWindow, maxRevisionStaleness, followerReadDelay, } func (rcr *RemoteClockRevisions) optimizedRevisionFunc(ctx context.Context) (datastore.Revision, time.Duration, error) { - nowHLC, err := rcr.nowFunc(ctx) + nowRev, err := rcr.nowFunc(ctx) if err != nil { - return revision.NoRevision, 0, err + return datastore.NoRevision, 0, err + } + + if nowRev == datastore.NoRevision { + return datastore.NoRevision, 0, datastore.NewInvalidRevisionErr(nowRev, datastore.CouldNotDetermineRevision) } - delayedNow := nowHLC.IntPart() - rcr.followerReadDelayNanos + nowTS, ok := nowRev.(WithTimestampRevision) + if !ok { + return datastore.NoRevision, 0, spiceerrors.MustBugf("expected with-timestamp revision, got %T", nowRev) + } + + delayedNow := nowTS.TimestampNanoSec() - rcr.followerReadDelayNanos quantized := delayedNow validForNanos := int64(0) if rcr.quantizationNanos > 0 { @@ -58,10 +65,10 @@ func (rcr *RemoteClockRevisions) optimizedRevisionFunc(ctx context.Context) (dat log.Ctx(ctx).Debug(). Time("quantized", time.Unix(0, quantized)). Int64("readSkew", rcr.followerReadDelayNanos). - Int64("totalSkew", nowHLC.IntPart()-quantized). + Int64("totalSkew", nowTS.TimestampNanoSec()-quantized). Msg("revision skews") - return revision.NewFromDecimal(decimal.NewFromInt(quantized)), time.Duration(validForNanos) * time.Nanosecond, nil + return nowTS.ConstructForTimestamp(quantized), time.Duration(validForNanos) * time.Nanosecond, nil } // SetNowFunc sets the function used to determine the head revision @@ -74,7 +81,7 @@ func (rcr *RemoteClockRevisions) CheckRevision(ctx context.Context, dsRevision d return datastore.NewInvalidRevisionErr(dsRevision, datastore.CouldNotDetermineRevision) } - revision := dsRevision.(revision.Decimal) + revision := dsRevision.(WithTimestampRevision) ctx, span := tracer.Start(ctx, "CheckRevision") defer span.End() @@ -85,8 +92,13 @@ func (rcr *RemoteClockRevisions) CheckRevision(ctx context.Context, dsRevision d return err } - nowNanos := now.IntPart() - revisionNanos := revision.IntPart() + nowTS, ok := now.(WithTimestampRevision) + if !ok { + return spiceerrors.MustBugf("expected HLC revision, got %T", now) + } + + nowNanos := nowTS.TimestampNanoSec() + revisionNanos := revision.TimestampNanoSec() isStale := revisionNanos < (nowNanos - rcr.gcWindowNanos) if isStale { diff --git a/internal/datastore/common/revisions/remoteclock_test.go b/internal/datastore/revisions/remoteclock_test.go similarity index 82% rename from internal/datastore/common/revisions/remoteclock_test.go rename to internal/datastore/revisions/remoteclock_test.go index e702e5fe45..6a7581fb59 100644 --- a/internal/datastore/common/revisions/remoteclock_test.go +++ b/internal/datastore/revisions/remoteclock_test.go @@ -6,11 +6,10 @@ import ( "time" "github.com/benbjohnson/clock" - "github.com/shopspring/decimal" "github.com/stretchr/testify/require" log "github.com/authzed/spicedb/internal/logging" - "github.com/authzed/spicedb/pkg/datastore/revision" + "github.com/authzed/spicedb/pkg/datastore" ) func TestRemoteClockOptimizedRevisions(t *testing.T) { @@ -80,20 +79,15 @@ func TestRemoteClockOptimizedRevisions(t *testing.T) { remoteClock := clock.NewMock() rcr.clockFn = remoteClock - rcr.SetNowFunc(func(ctx context.Context) (revision.Decimal, error) { + rcr.SetNowFunc(func(ctx context.Context) (datastore.Revision, error) { log.Debug().Stringer("now", remoteClock.Now()).Msg("current remote time") - return revision.NewFromDecimal( - decimal.NewFromInt(remoteClock.Now().UnixNano()), - ), nil + return NewForTime(remoteClock.Now()), nil }) for _, timeAndExpected := range tc.times { remoteClock.Set(time.Unix(timeAndExpected.unixTime, 0)) - expected := revision.NewFromDecimal( - decimal.NewFromInt(timeAndExpected.expected * 1_000_000_000), - ) - + expected := NewForTimestamp(timeAndExpected.expected * 1_000_000_000) optimized, err := rcr.OptimizedRevision(context.Background()) require.NoError(err) require.True( @@ -132,19 +126,14 @@ func TestRemoteClockCheckRevisions(t *testing.T) { remoteClock := clock.NewMock() rcr.clockFn = remoteClock - rcr.SetNowFunc(func(ctx context.Context) (revision.Decimal, error) { + rcr.SetNowFunc(func(ctx context.Context) (datastore.Revision, error) { log.Debug().Stringer("now", remoteClock.Now()).Msg("current remote time") - return revision.NewFromDecimal( - decimal.NewFromInt(remoteClock.Now().UnixNano()), - ), nil + return NewForTime(remoteClock.Now()), nil }) remoteClock.Set(time.Unix(tc.currentTime, 0)) - testRevision := revision.NewFromDecimal( - decimal.NewFromInt(tc.testRevisionSeconds * 1_000_000_000), - ) - + testRevision := NewForTimestamp(tc.testRevisionSeconds * 1_000_000_000) err := rcr.CheckRevision(context.Background(), testRevision) if tc.expectError { require.Error(err) diff --git a/internal/datastore/revisions/timestamprevision.go b/internal/datastore/revisions/timestamprevision.go new file mode 100644 index 0000000000..5490b1bcc9 --- /dev/null +++ b/internal/datastore/revisions/timestamprevision.go @@ -0,0 +1,81 @@ +package revisions + +import ( + "fmt" + "strconv" + "time" + + "github.com/authzed/spicedb/pkg/datastore" +) + +// TimestampRevision is a revision that is a timestamp. +type TimestampRevision int64 + +// NewForTime creates a new revision for the given time. +func NewForTime(time time.Time) TimestampRevision { + return TimestampRevision(time.UnixNano()) +} + +// NewForTimestamp creates a new revision for the given timestamp. +func NewForTimestamp(timestampNanosec int64) TimestampRevision { + return TimestampRevision(timestampNanosec) +} + +// parseTimestampRevisionString parses a string into a timestamp revision. +func parseTimestampRevisionString(revisionStr string) (rev datastore.Revision, err error) { + parsed, err := strconv.ParseInt(revisionStr, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid integer revision: %w", err) + } + + return TimestampRevision(parsed), nil +} + +func (ir TimestampRevision) Equal(other datastore.Revision) bool { + return int64(ir) == int64(other.(TimestampRevision)) +} + +func (ir TimestampRevision) GreaterThan(other datastore.Revision) bool { + return int64(ir) > int64(other.(TimestampRevision)) +} + +func (ir TimestampRevision) LessThan(other datastore.Revision) bool { + return int64(ir) < int64(other.(TimestampRevision)) +} + +func (TimestampRevision) MarshalBinary() (data []byte, err error) { + panic("unimplemented") +} + +func (ir TimestampRevision) TimestampNanoSec() int64 { + return int64(ir) +} + +func (ir TimestampRevision) String() string { + return strconv.FormatInt(int64(ir), 10) +} + +func (ir TimestampRevision) Time() time.Time { + return time.Unix(0, int64(ir)) +} + +func (ir TimestampRevision) WithInexactFloat64() float64 { + return float64(ir) +} + +func (ir TimestampRevision) ConstructForTimestamp(timestamp int64) WithTimestampRevision { + return TimestampRevision(timestamp) +} + +var _ datastore.Revision = TimestampRevision(0) +var _ WithTimestampRevision = TimestampRevision(0) + +// TimestampIDKeyFunc is used to create keys for timestamps. +func TimestampIDKeyFunc(r TimestampRevision) int64 { + return int64(r) +} + +// TimestampIDKeyLessThanFunc is used to create keys for timestamps. +func TimestampIDKeyLessThanFunc(l, r int64) bool { + return l < r +} diff --git a/internal/datastore/revisions/txidrevision.go b/internal/datastore/revisions/txidrevision.go new file mode 100644 index 0000000000..e0098af9f0 --- /dev/null +++ b/internal/datastore/revisions/txidrevision.go @@ -0,0 +1,66 @@ +package revisions + +import ( + "fmt" + "strconv" + + "github.com/authzed/spicedb/pkg/datastore" +) + +// TransactionIDRevision is a revision that is a transaction ID. +type TransactionIDRevision uint64 + +// NewForTransactionID creates a new revision for the given transaction ID. +func NewForTransactionID(transactionID uint64) TransactionIDRevision { + return TransactionIDRevision(transactionID) +} + +// parseTransactionIDRevisionString parses a string into a transaction ID revision. +func parseTransactionIDRevisionString(revisionStr string) (rev datastore.Revision, err error) { + parsed, err := strconv.ParseUint(revisionStr, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid integer revision: %w", err) + } + + return TransactionIDRevision(parsed), nil +} + +func (ir TransactionIDRevision) Equal(other datastore.Revision) bool { + return uint64(ir) == uint64(other.(TransactionIDRevision)) +} + +func (ir TransactionIDRevision) GreaterThan(other datastore.Revision) bool { + return uint64(ir) > uint64(other.(TransactionIDRevision)) +} + +func (ir TransactionIDRevision) LessThan(other datastore.Revision) bool { + return uint64(ir) < uint64(other.(TransactionIDRevision)) +} + +func (TransactionIDRevision) MarshalBinary() (data []byte, err error) { + panic("unimplemented") +} + +func (ir TransactionIDRevision) TransactionID() uint64 { + return uint64(ir) +} + +func (ir TransactionIDRevision) String() string { + return strconv.FormatInt(int64(ir), 10) +} + +func (ir TransactionIDRevision) WithInexactFloat64() float64 { + return float64(ir) +} + +var _ datastore.Revision = TransactionIDRevision(0) + +// TransactionIDKeyFunc is used to create keys for transaction IDs. +func TransactionIDKeyFunc(r TransactionIDRevision) uint64 { + return uint64(r) +} + +// TransactionIDKeyLessThanFunc is used to create keys for transaction IDs. +func TransactionIDKeyLessThanFunc(l, r uint64) bool { + return l < r +} diff --git a/internal/datastore/spanner/caveat.go b/internal/datastore/spanner/caveat.go index 32cbf4c5b0..33d2543010 100644 --- a/internal/datastore/spanner/caveat.go +++ b/internal/datastore/spanner/caveat.go @@ -9,6 +9,7 @@ import ( "google.golang.org/grpc/codes" "github.com/authzed/spicedb/internal/datastore/common" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" core "github.com/authzed/spicedb/pkg/proto/core/v1" ) @@ -32,7 +33,7 @@ func (sr spannerReader) ReadCaveatByName(ctx context.Context, name string) (*cor if err := loaded.UnmarshalVT(serialized); err != nil { return nil, datastore.NoRevision, err } - return loaded, revisionFromTimestamp(updated), nil + return loaded, revisions.NewForTime(updated), nil } func (sr spannerReader) ListAllCaveats(ctx context.Context) ([]datastore.RevisionedCaveat, error) { @@ -77,7 +78,7 @@ func (sr spannerReader) listCaveats(ctx context.Context, caveatNames []string) ( } caveats = append(caveats, datastore.RevisionedCaveat{ Definition: loaded, - LastWrittenRevision: revisionFromTimestamp(updated), + LastWrittenRevision: revisions.NewForTime(updated), }) return nil diff --git a/internal/datastore/spanner/reader.go b/internal/datastore/spanner/reader.go index 1642e182a4..8a34ae66e1 100644 --- a/internal/datastore/spanner/reader.go +++ b/internal/datastore/spanner/reader.go @@ -11,6 +11,7 @@ import ( "google.golang.org/grpc/codes" "github.com/authzed/spicedb/internal/datastore/common" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/options" core "github.com/authzed/spicedb/pkg/proto/core/v1" @@ -147,7 +148,7 @@ func (sr spannerReader) ReadNamespaceByName(ctx context.Context, nsName string) return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err) } - return ns, revisionFromTimestamp(updated), nil + return ns, revisions.NewForTime(updated), nil } func (sr spannerReader) ListAllNamespaces(ctx context.Context) ([]datastore.RevisionedNamespace, error) { @@ -210,7 +211,7 @@ func readAllNamespaces(iter *spanner.RowIterator, span trace.Span) ([]datastore. allNamespaces = append(allNamespaces, datastore.RevisionedNamespace{ Definition: ns, - LastWrittenRevision: revisionFromTimestamp(updated), + LastWrittenRevision: revisions.NewForTime(updated), }) return nil diff --git a/internal/datastore/spanner/revisions.go b/internal/datastore/spanner/revisions.go index 0c7a3ae5d7..965bb553af 100644 --- a/internal/datastore/spanner/revisions.go +++ b/internal/datastore/spanner/revisions.go @@ -6,19 +6,20 @@ import ( "time" "cloud.google.com/go/spanner" - "github.com/shopspring/decimal" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" ) -func (sd spannerDatastore) headRevisionInternal(ctx context.Context) (revision.Decimal, error) { +var ParseRevisionString = revisions.RevisionParser(revisions.Timestamp) + +func (sd spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) { now, err := sd.now(ctx) if err != nil { - return revision.NoRevision, fmt.Errorf(errRevision, err) + return datastore.NoRevision, fmt.Errorf(errRevision, err) } - return revisionFromTimestamp(now), nil + return revisions.NewForTime(now), nil } func (sd spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) { @@ -35,11 +36,3 @@ func (sd spannerDatastore) now(ctx context.Context) (time.Time, error) { return timestamp, nil } - -func revisionFromTimestamp(t time.Time) revision.Decimal { - return revision.NewFromDecimal(decimal.NewFromInt(t.UnixNano())) -} - -func timestampFromRevision(r revision.Decimal) time.Time { - return time.Unix(0, r.IntPart()) -} diff --git a/internal/datastore/spanner/spanner.go b/internal/datastore/spanner/spanner.go index 99990f6058..2442f3e136 100644 --- a/internal/datastore/spanner/spanner.go +++ b/internal/datastore/spanner/spanner.go @@ -23,12 +23,11 @@ import ( "google.golang.org/grpc/codes" "github.com/authzed/spicedb/internal/datastore/common" - "github.com/authzed/spicedb/internal/datastore/common/revisions" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/internal/datastore/spanner/migrations" log "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/options" - "github.com/authzed/spicedb/pkg/datastore/revision" core "github.com/authzed/spicedb/pkg/proto/core/v1" ) @@ -71,7 +70,7 @@ var ( type spannerDatastore struct { *revisions.RemoteClockRevisions - revision.DecimalDecoder + revisions.CommonDecoder client *spanner.Client config spannerOptions @@ -147,6 +146,9 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) ( config.followerReadDelay, config.revisionQuantization, ), + CommonDecoder: revisions.CommonDecoder{ + Kind: revisions.Timestamp, + }, client: client, config: config, database: database, @@ -188,10 +190,10 @@ func (t *traceableRTX) Query(ctx context.Context, statement spanner.Statement) * } func (sd spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datastore.Reader { - r := revisionRaw.(revision.Decimal) + r := revisionRaw.(revisions.TimestampRevision) txSource := func() readTX { - return &traceableRTX{delegate: sd.client.Single().WithTimestampBound(spanner.ReadTimestamp(timestampFromRevision(r)))} + return &traceableRTX{delegate: sd.client.Single().WithTimestampBound(spanner.ReadTimestamp(r.Time()))} } executor := common.QueryExecutor{Executor: queryExecutor(txSource)} return spannerReader{executor, txSource} @@ -237,7 +239,7 @@ func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserF return datastore.NoRevision, err } - return revisionFromTimestamp(ts), nil + return revisions.NewForTime(ts), nil } func (sd spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) { diff --git a/internal/datastore/spanner/watch.go b/internal/datastore/spanner/watch.go index ffcb18a278..6f09ceab13 100644 --- a/internal/datastore/spanner/watch.go +++ b/internal/datastore/spanner/watch.go @@ -16,8 +16,8 @@ import ( "google.golang.org/api/option" "github.com/authzed/spicedb/internal/datastore/common" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" core "github.com/authzed/spicedb/pkg/proto/core/v1" "github.com/authzed/spicedb/pkg/spiceerrors" ) @@ -101,7 +101,12 @@ func (sd spannerDatastore) watch( return } - afterRevision := afterRevisionRaw.(revision.Decimal) + afterRevision, ok := afterRevisionRaw.(revisions.TimestampRevision) + if !ok { + sendError(datastore.NewInvalidRevisionErr(afterRevisionRaw, datastore.CouldNotDetermineRevision)) + return + } + reader, err := changestreams.NewReaderWithConfig( ctx, project, @@ -109,7 +114,7 @@ func (sd spannerDatastore) watch( database, CombinedChangeStreamName, changestreams.Config{ - StartTimestamp: timestampFromRevision(afterRevision), + StartTimestamp: afterRevision.Time(), HeartbeatInterval: heartbeatInterval, SpannerClientOptions: []option.ClientOption{ option.WithCredentialsFile(sd.config.credentialsFilePath), @@ -132,10 +137,10 @@ func (sd spannerDatastore) watch( err = reader.Read(ctx, func(result *changestreams.ReadResult) error { // See: https://cloud.google.com/spanner/docs/change-streams/details for _, record := range result.ChangeRecords { - tracked := common.NewChanges(revision.DecimalKeyFunc, opts.Content) + tracked := common.NewChanges(revisions.TimestampIDKeyFunc, opts.Content) for _, dcr := range record.DataChangeRecords { - changeRevision := revisionFromTimestamp(dcr.CommitTimestamp) + changeRevision := revisions.NewForTime(dcr.CommitTimestamp) modType := dcr.ModType // options are INSERT, UPDATE, DELETE for _, mod := range dcr.Mods { @@ -273,7 +278,7 @@ func (sd spannerDatastore) watch( } if !tracked.IsEmpty() { - for _, revChange := range tracked.AsRevisionChanges(revision.DecimalKeyLessThanFunc) { + for _, revChange := range tracked.AsRevisionChanges(revisions.TimestampIDKeyLessThanFunc) { revChange := revChange if !sendChange(&revChange) { return datastore.NewWatchDisconnectedErr() @@ -284,7 +289,7 @@ func (sd spannerDatastore) watch( if opts.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints { for _, hbr := range record.HeartbeatRecords { if !sendChange(&datastore.RevisionChanges{ - Revision: revisionFromTimestamp(hbr.Timestamp), + Revision: revisions.NewForTime(hbr.Timestamp), IsCheckpoint: true, }) { return datastore.NewWatchDisconnectedErr() diff --git a/internal/dispatch/graph/check_test.go b/internal/dispatch/graph/check_test.go index fa147a254d..a25fe7469e 100644 --- a/internal/dispatch/graph/check_test.go +++ b/internal/dispatch/graph/check_test.go @@ -169,7 +169,6 @@ func TestMaxDepth(t *testing.T) { revision, err := common.UpdateTuplesInDatastore(ctx, ds, mutation) require.NoError(err) - require.True(revision.GreaterThan(datastore.NoRevision)) dispatch := NewLocalOnlyDispatcher(10) diff --git a/internal/dispatch/graph/expand_test.go b/internal/dispatch/graph/expand_test.go index 04b10aaa49..466e83f482 100644 --- a/internal/dispatch/graph/expand_test.go +++ b/internal/dispatch/graph/expand_test.go @@ -20,7 +20,6 @@ import ( expand "github.com/authzed/spicedb/internal/graph" datastoremw "github.com/authzed/spicedb/internal/middleware/datastore" "github.com/authzed/spicedb/internal/testfixtures" - "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/graph" core "github.com/authzed/spicedb/pkg/proto/core/v1" v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1" @@ -290,7 +289,6 @@ func TestMaxDepthExpand(t *testing.T) { revision, err := common.WriteTuples(ctx, ds, core.RelationTupleUpdate_CREATE, tpl) require.NoError(err) - require.True(revision.GreaterThan(datastore.NoRevision)) require.NoError(datastoremw.SetInContext(ctx, ds)) dispatch := NewLocalOnlyDispatcher(10) diff --git a/internal/dispatch/graph/lookupsubjects_test.go b/internal/dispatch/graph/lookupsubjects_test.go index c0ce9025f5..88cd8c4bb4 100644 --- a/internal/dispatch/graph/lookupsubjects_test.go +++ b/internal/dispatch/graph/lookupsubjects_test.go @@ -17,7 +17,6 @@ import ( datastoremw "github.com/authzed/spicedb/internal/middleware/datastore" "github.com/authzed/spicedb/internal/testfixtures" itestutil "github.com/authzed/spicedb/internal/testutil" - "github.com/authzed/spicedb/pkg/datastore" corev1 "github.com/authzed/spicedb/pkg/proto/core/v1" v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1" "github.com/authzed/spicedb/pkg/tuple" @@ -207,7 +206,6 @@ func TestLookupSubjectsMaxDepth(t *testing.T) { tpl := tuple.Parse("folder:oops#owner@folder:oops#owner") revision, err := common.WriteTuples(ctx, ds, corev1.RelationTupleUpdate_CREATE, tpl) require.NoError(err) - require.True(revision.GreaterThan(datastore.NoRevision)) dis := NewLocalOnlyDispatcher(10) stream := dispatch.NewCollectingDispatchStream[*v1.DispatchLookupSubjectsResponse](ctx) diff --git a/internal/graph/computed/computecheck_test.go b/internal/graph/computed/computecheck_test.go index b867206a02..36623db76b 100644 --- a/internal/graph/computed/computecheck_test.go +++ b/internal/graph/computed/computecheck_test.go @@ -14,7 +14,6 @@ import ( datastoremw "github.com/authzed/spicedb/internal/middleware/datastore" "github.com/authzed/spicedb/pkg/caveats/types" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" core "github.com/authzed/spicedb/pkg/proto/core/v1" v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1" "github.com/authzed/spicedb/pkg/schemadsl/compiler" @@ -867,7 +866,7 @@ func TestComputeCheckError(t *testing.T) { }, Subject: &core.ObjectAndRelation{}, CaveatContext: nil, - AtRevision: revision.NoRevision, + AtRevision: datastore.NoRevision, MaximumDepth: 50, DebugOption: computed.BasicDebuggingEnabled, }, diff --git a/internal/middleware/consistency/consistency_test.go b/internal/middleware/consistency/consistency_test.go index 236df1d9b7..58a2555246 100644 --- a/internal/middleware/consistency/consistency_test.go +++ b/internal/middleware/consistency/consistency_test.go @@ -6,21 +6,20 @@ import ( "testing" v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" - "github.com/shopspring/decimal" "github.com/stretchr/testify/require" "github.com/authzed/spicedb/internal/datastore/proxy/proxy_test" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/cursor" - "github.com/authzed/spicedb/pkg/datastore/revision" dispatch "github.com/authzed/spicedb/pkg/proto/dispatch/v1" "github.com/authzed/spicedb/pkg/zedtoken" ) var ( - zero = revision.NewFromDecimal(decimal.NewFromInt(0)) - optimized = revision.NewFromDecimal(decimal.NewFromInt(100)) - exact = revision.NewFromDecimal(decimal.NewFromInt(123)) - head = revision.NewFromDecimal(decimal.NewFromInt(145)) + zero = revisions.NewForTransactionID(0) + optimized = revisions.NewForTransactionID(100) + exact = revisions.NewForTransactionID(123) + head = revisions.NewForTransactionID(145) ) func TestAddRevisionToContextNoneSupplied(t *testing.T) { diff --git a/internal/services/v1/preconditions_test.go b/internal/services/v1/preconditions_test.go index 31690b00a3..243a9eab48 100644 --- a/internal/services/v1/preconditions_test.go +++ b/internal/services/v1/preconditions_test.go @@ -27,8 +27,7 @@ func TestPreconditions(t *testing.T) { uninitialized, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC) require.NoError(err) - ds, revision := testfixtures.StandardDatastoreWithData(uninitialized, require) - require.True(revision.GreaterThan(datastore.NoRevision)) + ds, _ := testfixtures.StandardDatastoreWithData(uninitialized, require) ctx := context.Background() _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { diff --git a/pkg/cursor/cursor_test.go b/pkg/cursor/cursor_test.go index 9f8f142cb1..9013fb51df 100644 --- a/pkg/cursor/cursor_test.go +++ b/pkg/cursor/cursor_test.go @@ -5,22 +5,22 @@ import ( "testing" v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" - "github.com/shopspring/decimal" "github.com/stretchr/testify/require" - "github.com/authzed/spicedb/pkg/datastore/revision" + "github.com/authzed/spicedb/internal/datastore/revisions" + "github.com/authzed/spicedb/pkg/datastore" dispatch "github.com/authzed/spicedb/pkg/proto/dispatch/v1" ) var ( - revision1 = decimal.NewFromInt(1) - revision2 = decimal.NewFromInt(2) + revision1 = revisions.NewForTransactionID(1) + revision2 = revisions.NewForTransactionID(2) ) func TestEncodeDecode(t *testing.T) { for _, tc := range []struct { name string - revision decimal.Decimal + revision datastore.Revision sections []string hash string }{ @@ -48,7 +48,7 @@ func TestEncodeDecode(t *testing.T) { require := require.New(t) encoded, err := EncodeFromDispatchCursor(&dispatch.Cursor{ Sections: tc.sections, - }, tc.hash, revision.NewFromDecimal(tc.revision)) + }, tc.hash, tc.revision) require.NoError(err) require.NotNil(encoded) @@ -58,10 +58,12 @@ func TestEncodeDecode(t *testing.T) { require.Equal(tc.sections, decoded.Sections) - decodedRev, err := DecodeToDispatchRevision(encoded, revision.DecimalDecoder{}) + decodedRev, err := DecodeToDispatchRevision(encoded, revisions.CommonDecoder{ + Kind: revisions.TransactionID, + }) require.NoError(err) require.NotNil(decodedRev) - require.Equal(revision.NewFromDecimal(tc.revision), decodedRev) + require.Equal(tc.revision, decodedRev) }) } } @@ -70,7 +72,7 @@ func TestDecode(t *testing.T) { for _, testCase := range []struct { name string token string - expectedRevision decimal.Decimal + expectedRevision datastore.Revision expectedSections []string expectedHash string expectError bool @@ -78,7 +80,7 @@ func TestDecode(t *testing.T) { { name: "invalid", token: "abc", - expectedRevision: decimal.Zero, + expectedRevision: datastore.NoRevision, expectedSections: []string{}, expectedHash: "", expectError: true, @@ -136,11 +138,13 @@ func TestDecode(t *testing.T) { decodedRev, err := DecodeToDispatchRevision(&v1.Cursor{ Token: testCase.token, - }, revision.DecimalDecoder{}) + }, revisions.CommonDecoder{ + Kind: revisions.TransactionID, + }) require.NoError(err) require.True( - revision.NewFromDecimal(testCase.expectedRevision).Equal(decodedRev), + testCase.expectedRevision.Equal(decodedRev), "%s != %s", testCase.expectedRevision, decodedRev, diff --git a/pkg/datastore/revision/decimal.go b/pkg/datastore/revision/decimal.go deleted file mode 100644 index d2704c2b2d..0000000000 --- a/pkg/datastore/revision/decimal.go +++ /dev/null @@ -1,75 +0,0 @@ -package revision - -import ( - "github.com/shopspring/decimal" - - "github.com/authzed/spicedb/pkg/datastore" -) - -type Decimal struct { - decimal.Decimal -} - -var NoRevision Decimal - -func NewFromDecimal(d decimal.Decimal) Decimal { - return Decimal{d} -} - -func (d Decimal) Equal(rhs datastore.Revision) bool { - if rhs == datastore.NoRevision { - return false - } - - rhsD := rhs.(Decimal) - - return d.Decimal.Equal(rhsD.Decimal) -} - -func (d Decimal) GreaterThan(rhs datastore.Revision) bool { - if rhs == datastore.NoRevision { - rhs = Decimal{decimal.Zero} - } - - rhsD := rhs.(Decimal) - - return d.Decimal.GreaterThan(rhsD.Decimal) -} - -func (d Decimal) LessThan(rhs datastore.Revision) bool { - if rhs == datastore.NoRevision { - rhs = Decimal{decimal.Zero} - } - - rhsD := rhs.(Decimal) - - return d.Decimal.LessThan(rhsD.Decimal) -} - -var _ datastore.Revision = Decimal{} - -type DecimalDecoder struct{} - -func (DecimalDecoder) RevisionFromString(s string) (datastore.Revision, error) { - parsed, err := decimal.NewFromString(s) - if err != nil { - return datastore.NoRevision, err - } - return Decimal{parsed}, nil -} - -// DecimalKeyFunc is used to convert a simple Decimal to an int64 for use in maps. -func DecimalKeyFunc(r Decimal) int64 { - return r.IntPart() -} - -// DecimalKeyLessThanFunc is used to compare keys created by the DecimalKeyFunc. -func DecimalKeyLessThanFunc(lhs, rhs int64) bool { - return lhs < rhs -} - -// ParseRevisionString parses a decimal revision string into a revision. -func ParseRevisionString(revisionStr string) (rev datastore.Revision, err error) { - dd := DecimalDecoder{} - return dd.RevisionFromString(revisionStr) -} diff --git a/pkg/datastore/revisionparsing/revisionparsing.go b/pkg/datastore/revisionparsing/revisionparsing.go index 38eb2bc66d..a6188a6720 100644 --- a/pkg/datastore/revisionparsing/revisionparsing.go +++ b/pkg/datastore/revisionparsing/revisionparsing.go @@ -5,20 +5,16 @@ import ( "github.com/authzed/spicedb/internal/datastore/memdb" "github.com/authzed/spicedb/internal/datastore/mysql" "github.com/authzed/spicedb/internal/datastore/postgres" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/internal/datastore/spanner" - "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" ) -// ParsingFunc defines a type representing parsing a revision string into a revision. -type ParsingFunc func(revisionStr string) (rev datastore.Revision, err error) - // ParseRevisionStringByDatastoreEngineID defines a map from datastore engine ID to its associated // revision parsing function. -var ParseRevisionStringByDatastoreEngineID = map[string]ParsingFunc{ - memdb.Engine: revision.ParseRevisionString, - crdb.Engine: revision.ParseRevisionString, +var ParseRevisionStringByDatastoreEngineID = map[string]revisions.ParsingFunc{ + memdb.Engine: memdb.ParseRevisionString, + crdb.Engine: crdb.ParseRevisionString, postgres.Engine: postgres.ParseRevisionString, - mysql.Engine: revision.ParseRevisionString, - spanner.Engine: revision.ParseRevisionString, + mysql.Engine: mysql.ParseRevisionString, + spanner.Engine: spanner.ParseRevisionString, } diff --git a/pkg/datastore/test/caveat.go b/pkg/datastore/test/caveat.go index 98babc3752..c50af406a6 100644 --- a/pkg/datastore/test/caveat.go +++ b/pkg/datastore/test/caveat.go @@ -34,7 +34,6 @@ func CaveatNotFoundTest(t *testing.T, tester DatastoreTester) { startRevision, err := ds.HeadRevision(ctx) require.NoError(err) - require.True(startRevision.GreaterThan(datastore.NoRevision)) _, _, err = ds.SnapshotReader(startRevision).ReadCaveatByName(ctx, "unknown") require.True(errors.As(err, &datastore.ErrCaveatNameNotFound{})) @@ -66,12 +65,11 @@ func WriteReadDeleteCaveatTest(t *testing.T, tester DatastoreTester) { // The caveat can be looked up by name cr := ds.SnapshotReader(rev) - cv, readRev, err := cr.ReadCaveatByName(ctx, coreCaveat.Name) + cv, _, err := cr.ReadCaveatByName(ctx, coreCaveat.Name) req.NoError(err) foundDiff := cmp.Diff(coreCaveat, cv, protocmp.Transform()) req.Empty(foundDiff) - req.True(readRev.GreaterThan(datastore.NoRevision)) // All caveats can be listed when no arg is provided // Manually check the caveat's contents. @@ -265,17 +263,15 @@ func CaveatSnapshotReadsTest(t *testing.T, tester DatastoreTester) { // check most recent revision cr := ds.SnapshotReader(newRev) - cv, fetchedRev, err := cr.ReadCaveatByName(ctx, coreCaveat.Name) + cv, _, err := cr.ReadCaveatByName(ctx, coreCaveat.Name) req.NoError(err) req.Equal(newExpression, cv.SerializedExpression) - req.True(fetchedRev.GreaterThan(datastore.NoRevision)) // check previous revision cr = ds.SnapshotReader(oldRev) - cv, fetchedRev, err = cr.ReadCaveatByName(ctx, coreCaveat.Name) + cv, _, err = cr.ReadCaveatByName(ctx, coreCaveat.Name) req.NoError(err) req.Equal(oldExpression, cv.SerializedExpression) - req.True(fetchedRev.GreaterThan(datastore.NoRevision)) } func CaveatedRelationshipWatchTest(t *testing.T, tester DatastoreTester) { diff --git a/pkg/datastore/test/namespace.go b/pkg/datastore/test/namespace.go index ca08bd6991..b3daacfe31 100644 --- a/pkg/datastore/test/namespace.go +++ b/pkg/datastore/test/namespace.go @@ -42,7 +42,6 @@ func NamespaceNotFoundTest(t *testing.T, tester DatastoreTester) { startRevision, err := ds.HeadRevision(ctx) require.NoError(err) - require.True(startRevision.GreaterThan(datastore.NoRevision)) _, _, err = ds.SnapshotReader(startRevision).ReadNamespaceByName(ctx, "unknown") require.True(errors.As(err, &datastore.ErrNamespaceNotFound{})) @@ -60,7 +59,6 @@ func NamespaceWriteTest(t *testing.T, tester DatastoreTester) { startRevision, err := ds.HeadRevision(ctx) require.NoError(err) - require.True(startRevision.GreaterThan(datastore.NoRevision)) nsDefs, err := ds.SnapshotReader(startRevision).ListAllNamespaces(ctx) require.NoError(err) diff --git a/pkg/datastore/test/revisions.go b/pkg/datastore/test/revisions.go index ec80eef406..bda389e35c 100644 --- a/pkg/datastore/test/revisions.go +++ b/pkg/datastore/test/revisions.go @@ -37,7 +37,6 @@ func RevisionQuantizationTest(t *testing.T, tester DatastoreTester) { ctx := context.Background() veryFirstRevision, err := ds.OptimizedRevision(ctx) require.NoError(err) - require.True(veryFirstRevision.GreaterThan(datastore.NoRevision)) postSetupRevision := setupDatastore(ds, require) require.True(postSetupRevision.GreaterThan(veryFirstRevision)) @@ -54,7 +53,6 @@ func RevisionQuantizationTest(t *testing.T, tester DatastoreTester) { // Get the new now revision nowRevision, err := ds.HeadRevision(ctx) require.NoError(err) - require.True(nowRevision.GreaterThan(datastore.NoRevision)) // Let the quantization window expire time.Sleep(tc.quantizationRange) diff --git a/pkg/datastore/test/watch.go b/pkg/datastore/test/watch.go index 50c18a4f20..b59b549638 100644 --- a/pkg/datastore/test/watch.go +++ b/pkg/datastore/test/watch.go @@ -201,7 +201,6 @@ func WatchCancelTest(t *testing.T, tester DatastoreTester) { protocmp.Transform(), ) require.Empty(foundDiff) - require.True(created.Revision.GreaterThan(datastore.NoRevision)) } else { errWait := time.NewTimer(waitForChangesTimeout) require.Zero(created) diff --git a/pkg/zedtoken/zedtoken.go b/pkg/zedtoken/zedtoken.go index 8eedfbc149..797b3e184b 100644 --- a/pkg/zedtoken/zedtoken.go +++ b/pkg/zedtoken/zedtoken.go @@ -7,10 +7,8 @@ import ( "fmt" v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" - "github.com/shopspring/decimal" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" zedtoken "github.com/authzed/spicedb/pkg/proto/impl/v1" ) @@ -87,7 +85,13 @@ func DecodeRevision(encoded *v1.ZedToken, ds revisionDecoder) (datastore.Revisio switch ver := decoded.VersionOneof.(type) { case *zedtoken.DecodedZedToken_DeprecatedV1Zookie: - return revision.NewFromDecimal(decimal.NewFromInt(int64(ver.DeprecatedV1Zookie.Revision))), nil + revString := fmt.Sprintf("%d", ver.DeprecatedV1Zookie.Revision) + parsed, err := ds.RevisionFromString(revString) + if err != nil { + return datastore.NoRevision, fmt.Errorf(errDecodeError, err) + } + return parsed, nil + case *zedtoken.DecodedZedToken_V1: parsed, err := ds.RevisionFromString(ver.V1.Revision) if err != nil { diff --git a/pkg/zedtoken/zedtoken_test.go b/pkg/zedtoken/zedtoken_test.go index 7d067b7dbe..7737d54320 100644 --- a/pkg/zedtoken/zedtoken_test.go +++ b/pkg/zedtoken/zedtoken_test.go @@ -8,22 +8,24 @@ import ( "github.com/shopspring/decimal" "github.com/stretchr/testify/require" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/revision" ) var encodeRevisionTests = []datastore.Revision{ - revision.NewFromDecimal(decimal.Zero), - revision.NewFromDecimal(decimal.NewFromInt(1)), - revision.NewFromDecimal(decimal.NewFromInt(2)), - revision.NewFromDecimal(decimal.NewFromInt(4)), - revision.NewFromDecimal(decimal.NewFromInt(8)), - revision.NewFromDecimal(decimal.NewFromInt(16)), - revision.NewFromDecimal(decimal.NewFromInt(128)), - revision.NewFromDecimal(decimal.NewFromInt(256)), - revision.NewFromDecimal(decimal.NewFromInt(1621538189028928000)), - revision.NewFromDecimal(decimal.New(12345, -2)), - revision.NewFromDecimal(decimal.New(0, -10)), + revisions.NewForTransactionID(1), + revisions.NewForTransactionID(2), + revisions.NewForTransactionID(4), + revisions.NewForTransactionID(8), + revisions.NewForTransactionID(16), + revisions.NewForTransactionID(128), + revisions.NewForTransactionID(256), + revisions.NewForTransactionID(1621538189028928000), +} + +var encodeHLCRevisionTests = []datastore.Revision{ + revisions.NewForHLC(decimal.New(12345, -2)), + revisions.NewForHLC(decimal.New(0, -10)), } func TestZedTokenEncode(t *testing.T) { @@ -34,7 +36,26 @@ func TestZedTokenEncode(t *testing.T) { encoded, err := NewFromRevision(rev) require.NoError(err) - decoded, err := DecodeRevision(encoded, revision.DecimalDecoder{}) + decoded, err := DecodeRevision(encoded, revisions.CommonDecoder{ + Kind: revisions.TransactionID, + }) + require.NoError(err) + require.True(rev.Equal(decoded)) + }) + } +} + +func TestZedTokenEncodeHLC(t *testing.T) { + for _, rev := range encodeHLCRevisionTests { + rev := rev + t.Run(rev.String(), func(t *testing.T) { + require := require.New(t) + encoded, err := NewFromRevision(rev) + require.NoError(err) + + decoded, err := DecodeRevision(encoded, revisions.CommonDecoder{ + Kind: revisions.HybridLogicalClock, + }) require.NoError(err) require.True(rev.Equal(decoded)) }) @@ -44,91 +65,126 @@ func TestZedTokenEncode(t *testing.T) { var decodeTests = []struct { format string token string - expectedRevision decimal.Decimal + expectedRevision datastore.Revision expectError bool }{ { format: "invalid", token: "abc", - expectedRevision: decimal.Zero, + expectedRevision: datastore.NoRevision, expectError: true, }, { format: "V1 Zookie", token: "CAESAA==", - expectedRevision: decimal.Zero, + expectedRevision: revisions.NewForTransactionID(0), expectError: false, }, { format: "V1 Zookie", token: "CAESAggB", - expectedRevision: decimal.NewFromInt(1), + expectedRevision: revisions.NewForTransactionID(1), expectError: false, }, { format: "V1 Zookie", token: "CAESAggC", - expectedRevision: decimal.NewFromInt(2), + expectedRevision: revisions.NewForTransactionID(2), expectError: false, }, { format: "V1 Zookie", token: "CAESAwiAAg==", - expectedRevision: decimal.NewFromInt(256), + expectedRevision: revisions.NewForTransactionID(256), expectError: false, }, { format: "V1 Zookie", token: "CAIaAwoBMA==", - expectedRevision: decimal.Zero, + expectedRevision: revisions.NewForTransactionID(0), expectError: false, }, { format: "V1 ZedToken", token: "CAIaAwoBMQ==", - expectedRevision: decimal.NewFromInt(1), + expectedRevision: revisions.NewForTransactionID(1), expectError: false, }, { format: "V1 ZedToken", token: "CAIaAwoBMg==", - expectedRevision: decimal.NewFromInt(2), + expectedRevision: revisions.NewForTransactionID(2), expectError: false, }, { format: "V1 ZedToken", token: "CAIaAwoBNA==", - expectedRevision: decimal.NewFromInt(4), + expectedRevision: revisions.NewForTransactionID(4), expectError: false, }, +} + +func TestDecode(t *testing.T) { + for _, testCase := range decodeTests { + testCase := testCase + testName := fmt.Sprintf("%s(%s)=>%s", testCase.format, testCase.token, testCase.expectedRevision) + t.Run(testName, func(t *testing.T) { + require := require.New(t) + + decoded, err := DecodeRevision(&v1.ZedToken{ + Token: testCase.token, + }, revisions.CommonDecoder{ + Kind: revisions.TransactionID, + }) + if testCase.expectError { + require.Error(err) + } else { + require.NoError(err) + require.True( + testCase.expectedRevision.Equal(decoded), + "%s != %s", + testCase.expectedRevision, + decoded, + ) + } + }) + } +} + +var hlcDecodeTests = []struct { + format string + token string + expectedRevision datastore.Revision + expectError bool +}{ { format: "V1 ZedToken", token: "CAIaFQoTMTYyMTUzODE4OTAyODkyODAwMA==", - expectedRevision: decimal.NewFromInt(1621538189028928000), + expectedRevision: revisions.NewForHLC(decimal.NewFromInt(1621538189028928000)), expectError: false, }, { format: "V1 ZedToken", token: "CAIaCAoGMTIzLjQ1", - expectedRevision: decimal.New(12345, -2), + expectedRevision: revisions.NewForHLC(decimal.New(12345, -2)), expectError: false, }, { format: "V1 ZedToken", token: "GiAKHjE2OTM1NDA5NDAzNzMwNDU3MjcuMDAwMDAwMDAwMQ==", - expectedRevision: (func() decimal.Decimal { + expectedRevision: (func() datastore.Revision { v, err := decimal.NewFromString("1693540940373045727.0000000001") if err != nil { panic(err) } - return v + return revisions.NewForHLC(v) })(), expectError: false, }, } -func TestDecode(t *testing.T) { - for _, testCase := range decodeTests { +func TestHLCDecode(t *testing.T) { + for _, testCase := range hlcDecodeTests { testCase := testCase testName := fmt.Sprintf("%s(%s)=>%s", testCase.format, testCase.token, testCase.expectedRevision) t.Run(testName, func(t *testing.T) { @@ -136,13 +192,15 @@ func TestDecode(t *testing.T) { decoded, err := DecodeRevision(&v1.ZedToken{ Token: testCase.token, - }, revision.DecimalDecoder{}) + }, revisions.CommonDecoder{ + Kind: revisions.HybridLogicalClock, + }) if testCase.expectError { require.Error(err) } else { require.NoError(err) require.True( - revision.NewFromDecimal(testCase.expectedRevision).Equal(decoded), + testCase.expectedRevision.Equal(decoded), "%s != %s", testCase.expectedRevision, decoded, From c79673decd04dc05e8b33c3b216c1f813ecb3c85 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Fri, 22 Dec 2023 12:01:43 -0500 Subject: [PATCH 2/4] Remove encoding.BinaryMarshaler from datastore.Revision as it appears to no longer be used --- internal/datastore/revisions/hlcrevision.go | 4 ---- internal/datastore/revisions/timestamprevision.go | 4 ---- internal/datastore/revisions/txidrevision.go | 4 ---- pkg/datastore/datastore.go | 7 ------- 4 files changed, 19 deletions(-) diff --git a/internal/datastore/revisions/hlcrevision.go b/internal/datastore/revisions/hlcrevision.go index d5cddd3fe1..54214cb975 100644 --- a/internal/datastore/revisions/hlcrevision.go +++ b/internal/datastore/revisions/hlcrevision.go @@ -68,10 +68,6 @@ func (hlc HLCRevision) LessThan(rhs datastore.Revision) bool { return hlc.decimal.LessThan(rhsD.decimal) } -func (HLCRevision) MarshalBinary() (data []byte, err error) { - panic("unimplemented") -} - func (hlc HLCRevision) String() string { return hlc.decimal.String() } diff --git a/internal/datastore/revisions/timestamprevision.go b/internal/datastore/revisions/timestamprevision.go index 5490b1bcc9..5c20e31b6e 100644 --- a/internal/datastore/revisions/timestamprevision.go +++ b/internal/datastore/revisions/timestamprevision.go @@ -43,10 +43,6 @@ func (ir TimestampRevision) LessThan(other datastore.Revision) bool { return int64(ir) < int64(other.(TimestampRevision)) } -func (TimestampRevision) MarshalBinary() (data []byte, err error) { - panic("unimplemented") -} - func (ir TimestampRevision) TimestampNanoSec() int64 { return int64(ir) } diff --git a/internal/datastore/revisions/txidrevision.go b/internal/datastore/revisions/txidrevision.go index e0098af9f0..1b12046271 100644 --- a/internal/datastore/revisions/txidrevision.go +++ b/internal/datastore/revisions/txidrevision.go @@ -37,10 +37,6 @@ func (ir TransactionIDRevision) LessThan(other datastore.Revision) bool { return uint64(ir) < uint64(other.(TransactionIDRevision)) } -func (TransactionIDRevision) MarshalBinary() (data []byte, err error) { - panic("unimplemented") -} - func (ir TransactionIDRevision) TransactionID() uint64 { return uint64(ir) } diff --git a/pkg/datastore/datastore.go b/pkg/datastore/datastore.go index f19b44ae86..f7426eeef4 100644 --- a/pkg/datastore/datastore.go +++ b/pkg/datastore/datastore.go @@ -2,7 +2,6 @@ package datastore import ( "context" - "encoding" "fmt" "sort" "strings" @@ -16,7 +15,6 @@ import ( "github.com/authzed/spicedb/pkg/datastore/options" core "github.com/authzed/spicedb/pkg/proto/core/v1" - "github.com/authzed/spicedb/pkg/spiceerrors" ) var Engines []string @@ -546,7 +544,6 @@ type RelationshipIterator interface { // each datastore implementation. type Revision interface { fmt.Stringer - encoding.BinaryMarshaler // Equal returns whether the revisions should be considered equal. Equal(Revision) bool @@ -576,10 +573,6 @@ func (nilRevision) String() string { return "nil" } -func (nilRevision) MarshalBinary() ([]byte, error) { - return nil, spiceerrors.MustBugf("the nil revision should never be serialized") -} - // NoRevision is a zero type for the revision that will make changing the // revision type in the future a bit easier if necessary. Implementations // should use any time they want to signal an empty/error revision. From 84a90d3750a83ecb9896fc8fcb954a81a6157131 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Fri, 22 Dec 2023 13:45:50 -0500 Subject: [PATCH 3/4] Switch to using CRDB's decimal library, which reduce memory --- go.mod | 1 + go.sum | 2 + internal/datastore/common/changes_test.go | 78 +++++++++++++ internal/datastore/revisions/hlcrevision.go | 98 +++++++++++----- .../datastore/revisions/hlcrevision_test.go | 107 ++++++++++++++++++ .../datastore/revisions/timestamprevision.go | 6 +- 6 files changed, 260 insertions(+), 32 deletions(-) create mode 100644 internal/datastore/revisions/hlcrevision_test.go diff --git a/go.mod b/go.mod index 18596ccae4..d0ddef34e0 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/cenkalti/backoff/v4 v4.2.1 github.com/cespare/xxhash/v2 v2.2.0 github.com/cloudspannerecosystem/spanner-change-streams-tail v0.3.1 + github.com/cockroachdb/apd v1.1.0 github.com/creasty/defaults v1.7.0 github.com/dalzilio/rudd v1.1.1-0.20230806153452-9e08a6ea8170 github.com/dlmiddlecote/sqlstats v1.0.2 diff --git a/go.sum b/go.sum index b899bd89de..ba9bcb4620 100644 --- a/go.sum +++ b/go.sum @@ -185,6 +185,8 @@ github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe/go.mod h1:6pvJx4me5XP github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 h1:/inchEIKaYC1Akx+H+gqO04wryn5h75LSazbRlnya1k= github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= +github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/containerd/console v1.0.3/go.mod h1:7LqA/THxQ86k76b8c/EMSiaJ3h1eZkMkXar0TQ1gf3U= github.com/containerd/continuity v0.3.0 h1:nisirsYROK15TAMVukJOUyGJjz4BNQJBVsNvAXZJ/eg= github.com/containerd/continuity v0.3.0/go.mod h1:wJEAIwKOm/pBZuBd0JmeTvnLquTB1Ag8espWhkykbPM= diff --git a/internal/datastore/common/changes_test.go b/internal/datastore/common/changes_test.go index 5ba2da2f5a..4fe4a652e3 100644 --- a/internal/datastore/common/changes_test.go +++ b/internal/datastore/common/changes_test.go @@ -404,6 +404,84 @@ func TestFilterAndRemoveRevisionChanges(t *testing.T) { require.True(t, ch.IsEmpty()) } +func TestHLCOrdering(t *testing.T) { + ctx := context.Background() + + ch := NewChanges(revisions.HLCKeyFunc, datastore.WatchRelationships|datastore.WatchSchema) + require.True(t, ch.IsEmpty()) + + rev1, err := revisions.HLCRevisionFromString("1.1") + require.NoError(t, err) + + rev0, err := revisions.HLCRevisionFromString("1.0") + require.NoError(t, err) + + err = ch.AddRelationshipChange(ctx, rev1, tuple.MustParse("document:foo#viewer@user:tom"), core.RelationTupleUpdate_DELETE) + require.NoError(t, err) + + err = ch.AddRelationshipChange(ctx, rev0, tuple.MustParse("document:foo#viewer@user:tom"), core.RelationTupleUpdate_TOUCH) + require.NoError(t, err) + + remaining := ch.AsRevisionChanges(revisions.HLCKeyLessThanFunc) + require.Equal(t, 2, len(remaining)) + + require.Equal(t, []datastore.RevisionChanges{ + { + Revision: rev0, + RelationshipChanges: []*core.RelationTupleUpdate{ + tuple.Touch(tuple.MustParse("document:foo#viewer@user:tom")), + }, + DeletedNamespaces: []string{}, + DeletedCaveats: []string{}, + ChangedDefinitions: []datastore.SchemaDefinition{}, + }, + { + Revision: rev1, + RelationshipChanges: []*core.RelationTupleUpdate{ + tuple.Delete(tuple.MustParse("document:foo#viewer@user:tom")), + }, + DeletedNamespaces: []string{}, + DeletedCaveats: []string{}, + ChangedDefinitions: []datastore.SchemaDefinition{}, + }, + }, remaining) +} + +func TestHLCSameRevision(t *testing.T) { + ctx := context.Background() + + ch := NewChanges(revisions.HLCKeyFunc, datastore.WatchRelationships|datastore.WatchSchema) + require.True(t, ch.IsEmpty()) + + rev0, err := revisions.HLCRevisionFromString("1.0") + require.NoError(t, err) + + rev0again, err := revisions.HLCRevisionFromString("1.0") + require.NoError(t, err) + + err = ch.AddRelationshipChange(ctx, rev0, tuple.MustParse("document:foo#viewer@user:tom"), core.RelationTupleUpdate_TOUCH) + require.NoError(t, err) + + err = ch.AddRelationshipChange(ctx, rev0again, tuple.MustParse("document:foo#viewer@user:sarah"), core.RelationTupleUpdate_TOUCH) + require.NoError(t, err) + + remaining := ch.AsRevisionChanges(revisions.HLCKeyLessThanFunc) + require.Equal(t, 1, len(remaining)) + + require.Equal(t, []datastore.RevisionChanges{ + { + Revision: rev0, + RelationshipChanges: []*core.RelationTupleUpdate{ + tuple.Touch(tuple.MustParse("document:foo#viewer@user:tom")), + tuple.Touch(tuple.MustParse("document:foo#viewer@user:sarah")), + }, + DeletedNamespaces: []string{}, + DeletedCaveats: []string{}, + ChangedDefinitions: []datastore.SchemaDefinition{}, + }, + }, remaining) +} + func TestCanonicalize(t *testing.T) { testCases := []struct { name string diff --git a/internal/datastore/revisions/hlcrevision.go b/internal/datastore/revisions/hlcrevision.go index 54214cb975..5a093250c8 100644 --- a/internal/datastore/revisions/hlcrevision.go +++ b/internal/datastore/revisions/hlcrevision.go @@ -3,96 +3,134 @@ package revisions import ( "time" + "github.com/cockroachdb/apd" "github.com/shopspring/decimal" "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/spiceerrors" ) +var zeroHLC = HLCRevision(*apd.New(0, 0)) + // HLCRevision is a revision that is a hybrid logical clock, stored as a decimal. -type HLCRevision struct { - decimal decimal.Decimal -} +type HLCRevision apd.Decimal // parseHLCRevisionString parses a string into a hybrid logical clock revision. func parseHLCRevisionString(revisionStr string) (datastore.Revision, error) { - parsed, err := decimal.NewFromString(revisionStr) + parsed, _, err := apd.NewFromString(revisionStr) if err != nil { return datastore.NoRevision, err } - return HLCRevision{parsed}, nil + if parsed == nil { + return datastore.NoRevision, spiceerrors.MustBugf("got nil parsed HLC") + } + return HLCRevision(*parsed), nil } // HLCRevisionFromString parses a string into a hybrid logical clock revision. func HLCRevisionFromString(revisionStr string) (HLCRevision, error) { - parsed, err := decimal.NewFromString(revisionStr) + parsed, _, err := apd.NewFromString(revisionStr) if err != nil { - return HLCRevision{decimal.Zero}, err + return zeroHLC, err } - return HLCRevision{parsed}, nil + if parsed == nil { + return zeroHLC, spiceerrors.MustBugf("got nil parsed HLC") + } + return HLCRevision(*parsed), nil } // NewForHLC creates a new revision for the given hybrid logical clock. func NewForHLC(decimal decimal.Decimal) HLCRevision { - return HLCRevision{decimal} + return HLCRevision(*apd.NewWithBigInt(decimal.Coefficient(), decimal.Exponent())) } // NewHLCForTime creates a new revision for the given time. func NewHLCForTime(time time.Time) HLCRevision { - return HLCRevision{decimal.NewFromInt(time.UnixNano())} + return HLCRevision(*apd.New(0, 0).SetInt64(time.UnixNano())) } func (hlc HLCRevision) Equal(rhs datastore.Revision) bool { if rhs == datastore.NoRevision { - return false + rhs = HLCRevision(*apd.New(0, 0)) } - rhsD := rhs.(HLCRevision) - return hlc.decimal.Equal(rhsD.decimal) + lhsD := apd.Decimal(hlc) + lhsDP := &lhsD + rhsD := apd.Decimal(rhs.(HLCRevision)) + return lhsDP.Cmp(&rhsD) == 0 } func (hlc HLCRevision) GreaterThan(rhs datastore.Revision) bool { if rhs == datastore.NoRevision { - rhs = HLCRevision{decimal.Zero} + rhs = HLCRevision(*apd.New(0, 0)) } - rhsD := rhs.(HLCRevision) - return hlc.decimal.GreaterThan(rhsD.decimal) + lhsD := apd.Decimal(hlc) + lhsDP := &lhsD + rhsD := apd.Decimal(rhs.(HLCRevision)) + return lhsDP.Cmp(&rhsD) == 1 } func (hlc HLCRevision) LessThan(rhs datastore.Revision) bool { if rhs == datastore.NoRevision { - rhs = HLCRevision{decimal.Zero} + return false } - rhsD := rhs.(HLCRevision) - return hlc.decimal.LessThan(rhsD.decimal) + lhsD := apd.Decimal(hlc) + lhsDP := &lhsD + rhsD := apd.Decimal(rhs.(HLCRevision)) + return lhsDP.Cmp(&rhsD) == -1 } func (hlc HLCRevision) String() string { - return hlc.decimal.String() + d := apd.Decimal(hlc) + dp := &d + return dp.String() } func (hlc HLCRevision) TimestampNanoSec() int64 { - return hlc.decimal.IntPart() + d := apd.Decimal(hlc) + dp := &d + c := apd.BaseContext + output := new(apd.Decimal) + _, _ = c.Floor(output, dp) + i, _ := output.Int64() + return i } func (hlc HLCRevision) InexactFloat64() float64 { - return float64(hlc.decimal.IntPart()) + d := apd.Decimal(hlc) + dp := &d + f, _ := dp.Float64() + return f } func (hlc HLCRevision) ConstructForTimestamp(timestamp int64) WithTimestampRevision { - return HLCRevision{decimal.NewFromInt(timestamp)} + return HLCRevision(*(apd.New(0, 0).SetInt64(timestamp))) } -var _ datastore.Revision = HLCRevision{} -var _ WithTimestampRevision = HLCRevision{} +var ( + _ datastore.Revision = HLCRevision{} + _ WithTimestampRevision = HLCRevision{} +) -// HLCKeyFunc is used to convert a simple HLC to an int64 for use in maps. -func HLCKeyFunc(r HLCRevision) int64 { - return r.TimestampNanoSec() +// HLCKeyFunc is used to convert a simple HLC for use in maps. +func HLCKeyFunc(r HLCRevision) string { + return r.String() } // HLCKeyLessThanFunc is used to compare keys created by the HLCKeyFunc. -func HLCKeyLessThanFunc(lhs, rhs int64) bool { - return lhs < rhs +func HLCKeyLessThanFunc(lhs, rhs string) bool { + // Return the HLCs as strings to ensure precise is maintained. + lp := mustParseDecimal(lhs) + rp := mustParseDecimal(rhs) + return lp.Cmp(rp) == -1 +} + +func mustParseDecimal(value string) *apd.Decimal { + parsed, _, err := apd.NewFromString(value) + if err != nil { + panic("could not parse decimal") + } + return parsed } diff --git a/internal/datastore/revisions/hlcrevision_test.go b/internal/datastore/revisions/hlcrevision_test.go new file mode 100644 index 0000000000..67cef24bb0 --- /dev/null +++ b/internal/datastore/revisions/hlcrevision_test.go @@ -0,0 +1,107 @@ +package revisions + +import ( + "testing" + "time" + + "github.com/shopspring/decimal" + "github.com/stretchr/testify/require" +) + +func TestNewForHLC(t *testing.T) { + tcs := []string{ + "1", + "2", + "42", + "1257894000000000000", + "-1", + "1.23", + "9223372036854775807.2", + "9223372036854775807.2345987348543", + "1703283409994227985.0000000004", + } + + for _, tc := range tcs { + t.Run(tc, func(t *testing.T) { + d, err := decimal.NewFromString(tc) + require.NoError(t, err) + + rev := NewForHLC(d) + require.Equal(t, tc, rev.String()) + }) + } +} + +func TestTimestampNanoSec(t *testing.T) { + tcs := map[string]int64{ + "1": 1, + "2": 2, + "42": 42, + "1257894000000000000": 1257894000000000000, + "-1": -1, + "1.23": 1, + "9223372036854775807.2": 9223372036854775807, + "9223372036854775807.2345987348543": 9223372036854775807, + "1703283409994227985.0000000004": 1703283409994227985, + } + + for tc, nano := range tcs { + t.Run(tc, func(t *testing.T) { + rev, err := HLCRevisionFromString(tc) + require.NoError(t, err) + + require.Equal(t, nano, rev.TimestampNanoSec()) + }) + } +} + +func TestNewHLCForTime(t *testing.T) { + time := time.Now() + rev := NewForTime(time) + require.Equal(t, time.UnixNano(), rev.TimestampNanoSec()) +} + +func TestHLCKeyLessThanFunc(t *testing.T) { + tcs := []struct { + left string + right string + isLessThan bool + }{ + { + "1", "2", true, + }, + { + "2", "1", false, + }, + { + "2", "2", false, + }, + { + "1", "1.1", true, + }, + { + "1.1", "1.1", false, + }, + { + "1.1", "1", false, + }, + { + "1703283409994227985.0000000004", "1703283409994227985.0000000005", true, + }, + } + + for _, tc := range tcs { + t.Run(tc.left+"-"+tc.right, func(t *testing.T) { + left, err := HLCRevisionFromString(tc.left) + require.NoError(t, err) + + right, err := HLCRevisionFromString(tc.right) + require.NoError(t, err) + + lk := HLCKeyFunc(left) + rk := HLCKeyFunc(right) + + require.Equal(t, tc.isLessThan, HLCKeyLessThanFunc(lk, rk)) + }) + } +} diff --git a/internal/datastore/revisions/timestamprevision.go b/internal/datastore/revisions/timestamprevision.go index 5c20e31b6e..f57307a5c7 100644 --- a/internal/datastore/revisions/timestamprevision.go +++ b/internal/datastore/revisions/timestamprevision.go @@ -63,8 +63,10 @@ func (ir TimestampRevision) ConstructForTimestamp(timestamp int64) WithTimestamp return TimestampRevision(timestamp) } -var _ datastore.Revision = TimestampRevision(0) -var _ WithTimestampRevision = TimestampRevision(0) +var ( + _ datastore.Revision = TimestampRevision(0) + _ WithTimestampRevision = TimestampRevision(0) +) // TimestampIDKeyFunc is used to create keys for timestamps. func TimestampIDKeyFunc(r TimestampRevision) int64 { From 1fc3e2b362fe6ae0b96d7561591da745c699c12a Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Fri, 22 Dec 2023 14:04:40 -0500 Subject: [PATCH 4/4] Update e2e test --- e2e/go.mod | 9 +++++++++ e2e/go.sum | 7 +++++++ e2e/newenemy/newenemy_test.go | 6 +++--- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/e2e/go.mod b/e2e/go.mod index 4eba004d20..67b447977c 100644 --- a/e2e/go.mod +++ b/e2e/go.mod @@ -20,13 +20,17 @@ require ( require ( github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230512164433-5d1fd1a340c9 // indirect github.com/authzed/cel-go v0.17.5 // indirect + github.com/benbjohnson/clock v1.3.5 // indirect github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d // indirect + github.com/cockroachdb/apd v1.1.0 // indirect github.com/creasty/defaults v1.7.0 // indirect github.com/dave/jennifer v1.6.1 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emirpasic/gods v1.18.1 // indirect github.com/envoyproxy/protoc-gen-validate v1.0.2 // indirect github.com/fatih/structtag v1.2.0 // indirect + github.com/go-logr/logr v1.3.0 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect @@ -34,14 +38,19 @@ require ( github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jzelinskie/stringz v0.0.2 // indirect + github.com/lib/pq v1.10.9 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/planetscale/vtprotobuf v0.5.1-0.20231212170721-e7d721933795 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/rs/zerolog v1.31.0 // indirect github.com/samber/lo v1.38.1 // indirect github.com/shopspring/decimal v1.3.1 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect + go.opentelemetry.io/otel v1.21.0 // indirect + go.opentelemetry.io/otel/metric v1.21.0 // indirect + go.opentelemetry.io/otel/trace v1.21.0 // indirect golang.org/x/crypto v0.17.0 // indirect golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect golang.org/x/mod v0.13.0 // indirect diff --git a/e2e/go.sum b/e2e/go.sum index 2914d05de5..9f7e004774 100644 --- a/e2e/go.sum +++ b/e2e/go.sum @@ -54,6 +54,8 @@ github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe h1:QQ3GSy+MqSHxm/d8nC github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 h1:/inchEIKaYC1Akx+H+gqO04wryn5h75LSazbRlnya1k= github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= +github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/creasty/defaults v1.7.0 h1:eNdqZvc5B509z18lD8yc212CAqJNvfT1Jq6L8WowdBA= github.com/creasty/defaults v1.7.0/go.mod h1:iGzKe6pbEHnpMPtfDXZEr0NVxWnPTjb1bbDy08fPzYM= @@ -91,6 +93,7 @@ github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBj github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -158,6 +161,8 @@ github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o= github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk= github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lthibault/jitterbug v2.0.0+incompatible h1:qouq51IKzlMx25+15jbxhC/d79YyTj0q6XFoptNqaUw= github.com/lthibault/jitterbug v2.0.0+incompatible/go.mod h1:2l7akWd27PScEs6YkjyUVj/8hKgNhbbQ3KiJgJtlf6o= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= @@ -174,6 +179,7 @@ github.com/ngrok/sqlmw v0.0.0-20220520173518-97c9c04efc79 h1:Dmx8g2747UTVPzSkmoh github.com/ngrok/sqlmw v0.0.0-20220520173518-97c9c04efc79/go.mod h1:E26fwEtRNigBfFfHDWsklmo0T7Ixbg0XXgck+Hq4O9k= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/planetscale/vtprotobuf v0.5.1-0.20231212170721-e7d721933795 h1:pH+U6pJP0BhxqQ4njBUjOg0++WMMvv3eByWzB+oATBY= github.com/planetscale/vtprotobuf v0.5.1-0.20231212170721-e7d721933795/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= @@ -212,6 +218,7 @@ github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8w github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= diff --git a/e2e/newenemy/newenemy_test.go b/e2e/newenemy/newenemy_test.go index 73d74b9357..f4bfd3389d 100644 --- a/e2e/newenemy/newenemy_test.go +++ b/e2e/newenemy/newenemy_test.go @@ -22,7 +22,7 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/grpc/metadata" - "github.com/authzed/spicedb/pkg/datastore/revision" + "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/zedtoken" "github.com/authzed/spicedb/e2e" @@ -396,8 +396,8 @@ func checkDataNoNewEnemy(ctx context.Context, t testing.TB, slowNodeID int, crdb ns2AllowlistLeader := getLeaderNodeForNamespace(ctx, crdb[2].Conn(), allowlists[i].Relationship.Subject.Object.ObjectType) r1leader, r2leader := getLeaderNode(ctx, crdb[2].Conn(), blockusers[i].Relationship), getLeaderNode(ctx, crdb[2].Conn(), allowlists[i].Relationship) - z1, _ := zedtoken.DecodeRevision(r1.WrittenAt, revision.DecimalDecoder{}) - z2, _ := zedtoken.DecodeRevision(r2.WrittenAt, revision.DecimalDecoder{}) + z1, _ := zedtoken.DecodeRevision(r1.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) + z2, _ := zedtoken.DecodeRevision(r2.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) t.Log(sleep, z1, z2, z1.GreaterThan(z2), r1leader, r2leader, ns1BlocklistLeader, ns1UserLeader, ns2ResourceLeader, ns2AllowlistLeader) if z1.GreaterThan(z2) {