Skip to content

Commit

Permalink
Change CRDB driver to use new method for getting transaction timestamp
Browse files Browse the repository at this point in the history
The existing call disables most optimizations on write transactions, but was necessary for legacy reasons. Following this change, any CRDB version 23 (or later) will use the newer, better call

Also changes the stats system to use the CRDB stats instead of a custom table
  • Loading branch information
josephschorr committed Feb 28, 2024
1 parent 66a871c commit 03ee3f8
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 81 deletions.
46 changes: 30 additions & 16 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ const (
errUnableToInstantiate = "unable to instantiate datastore"
errRevision = "unable to find revision: %w"

querySelectNow = "SELECT cluster_logical_timestamp()"
queryShowZoneConfig = "SHOW ZONE CONFIGURATION FOR RANGE default;"
querySelectNow = "SELECT cluster_logical_timestamp()"
queryTransactionNowPreV23 = "SELECT cluster_logical_timestamp()"
queryTransactionNow = "SHOW COMMIT TIMESTAMP"
queryShowZoneConfig = "SHOW ZONE CONFIGURATION FOR RANGE default;"

livingTupleConstraint = "pk_relation_tuple"
)
Expand Down Expand Up @@ -122,6 +124,12 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
changefeedQuery = queryChangefeedPreV22
}

transactionNowQuery := queryTransactionNow
if version.Major < 23 {
log.Info().Object("version", version).Msg("using transaction now query for CRDB version < 23")
transactionNowQuery = queryTransactionNowPreV23
}

clusterTTLNanos, err := readClusterTTLNanos(initCtx, initPool)
if err != nil {
return nil, fmt.Errorf("unable to read cluster gc window: %w", err)
Expand Down Expand Up @@ -172,8 +180,9 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
writeOverlapKeyer: keyer,
overlapKeyInit: keySetInit,
disableStats: config.disableStats,
beginChangefeedQuery: changefeedQuery,
transactionNowQuery: transactionNowQuery,
analyzeBeforeStatistics: config.analyzeBeforeStatistics,
}
ds.RemoteClockRevisions.SetNowFunc(ds.headRevisionInternal)

Expand Down Expand Up @@ -254,9 +263,10 @@ type crdbDatastore struct {
watchBufferWriteTimeout time.Duration
writeOverlapKeyer overlapKeyer
overlapKeyInit func(ctx context.Context) keySet
disableStats bool
analyzeBeforeStatistics bool

beginChangefeedQuery string
transactionNowQuery string

featureGroup singleflight.Group[string, *datastore.Features]

Expand Down Expand Up @@ -321,21 +331,11 @@ func (cds *crdbDatastore) ReadWriteTx(
}
}

if cds.disableStats {
var err error
commitTimestamp, err = readCRDBNow(ctx, querier)
if err != nil {
return fmt.Errorf("error getting commit timestamp: %w", err)
}
return nil
}

var err error
commitTimestamp, err = updateCounter(ctx, tx, rwt.relCountChange)
commitTimestamp, err = cds.readTransactionCommitRev(ctx, querier)
if err != nil {
return fmt.Errorf("error updating relationship counter: %w", err)
return fmt.Errorf("error getting commit timestamp: %w", err)
}

return nil
})
if err != nil {
Expand Down Expand Up @@ -467,6 +467,20 @@ func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, er
return &features, nil
}

func (cds *crdbDatastore) readTransactionCommitRev(ctx context.Context, reader pgxcommon.DBFuncQuerier) (datastore.Revision, error) {
ctx, span := tracer.Start(ctx, "readTransactionCommitRev")
defer span.End()

var hlcNow decimal.Decimal
if err := reader.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error {
return row.Scan(&hlcNow)
}, cds.transactionNowQuery); err != nil {
return datastore.NoRevision, fmt.Errorf("unable to read timestamp: %w", err)
}

return revisions.NewForHLC(hlcNow)
}

func readCRDBNow(ctx context.Context, reader pgxcommon.DBFuncQuerier) (datastore.Revision, error) {
ctx, span := tracer.Start(ctx, "readCRDBNow")
defer span.End()
Expand Down
8 changes: 7 additions & 1 deletion internal/datastore/crdb/crdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func TestCRDBDatastore(t *testing.T) {
RevisionQuantization(revisionQuantization),
WatchBufferLength(watchBufferLength),
OverlapStrategy(overlapStrategyPrefix),
DebugAnalyzeBeforeStatistics(),
)
require.NoError(t, err)
return ds
Expand Down Expand Up @@ -84,6 +85,7 @@ func TestCRDBDatastoreWithFollowerReads(t *testing.T) {
GCWindow(gcWindow),
RevisionQuantization(quantization),
FollowerReadDelay(followerReadDelay),
DebugAnalyzeBeforeStatistics(),
)
require.NoError(err)
return ds
Expand Down Expand Up @@ -134,15 +136,19 @@ func TestWatchFeatureDetection(t *testing.T) {
require.NoError(t, err)
},
expectEnabled: false,
expectMessage: "Range feeds must be enabled in CockroachDB and the user must have permission to create them in order to enable the Watch API: ERROR: user unprivileged does not have CHANGEFEED privilege on relation relation_tuple (SQLSTATE 42501)",
expectMessage: "(SQLSTATE 42501)",
},
{
name: "rangefeeds enabled, user has permission",
postInit: func(ctx context.Context, adminConn *pgx.Conn) {
_, err = adminConn.Exec(ctx, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`)
require.NoError(t, err)

_, err = adminConn.Exec(ctx, fmt.Sprintf(`GRANT CHANGEFEED ON TABLE testspicedb.%s TO unprivileged;`, tableTuple))
require.NoError(t, err)

_, err = adminConn.Exec(ctx, fmt.Sprintf(`GRANT SELECT ON TABLE testspicedb.%s TO unprivileged;`, tableTuple))
require.NoError(t, err)
},
expectEnabled: true,
},
Expand Down
17 changes: 10 additions & 7 deletions internal/datastore/crdb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type crdbOptions struct {
maxRetries uint8
overlapStrategy string
overlapKey string
disableStats bool
enableConnectionBalancing bool
analyzeBeforeStatistics bool

enablePrometheusStats bool
}
Expand Down Expand Up @@ -65,7 +65,6 @@ func generateConfig(options []Option) (crdbOptions, error) {
maxRetries: defaultMaxRetries,
overlapKey: defaultOverlapKey,
overlapStrategy: defaultOverlapStrategy,
disableStats: false,
enablePrometheusStats: defaultEnablePrometheusStats,
enableConnectionBalancing: defaultEnableConnectionBalancing,
connectRate: defaultConnectRate,
Expand Down Expand Up @@ -283,11 +282,6 @@ func OverlapKey(key string) Option {
return func(po *crdbOptions) { po.overlapKey = key }
}

// DisableStats disables recording counts to the stats table
func DisableStats(disable bool) Option {
return func(po *crdbOptions) { po.disableStats = disable }
}

// WithEnablePrometheusStats marks whether Prometheus metrics provided by the Postgres
// clients being used by the datastore are enabled.
//
Expand All @@ -303,3 +297,12 @@ func WithEnablePrometheusStats(enablePrometheusStats bool) Option {
func WithEnableConnectionBalancing(connectionBalancing bool) Option {
return func(po *crdbOptions) { po.enableConnectionBalancing = connectionBalancing }
}

// DebugAnalyzeBeforeStatistics signals to the Statistics method that it should
// run Analyze on the database before returning statistics. This should only be
// used for debug and testing.
//
// Disabled by default.
func DebugAnalyzeBeforeStatistics() Option {
return func(po *crdbOptions) { po.analyzeBeforeStatistics = true }
}
132 changes: 78 additions & 54 deletions internal/datastore/crdb/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,24 @@ package crdb
import (
"context"
"fmt"
"math/rand"
"time"
"slices"

"github.com/Masterminds/squirrel"
"github.com/jackc/pgx/v5"
"github.com/shopspring/decimal"
"github.com/rs/zerolog/log"

pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
"github.com/authzed/spicedb/internal/datastore/revisions"
"github.com/authzed/spicedb/pkg/datastore"
)

const (
tableMetadata = "metadata"
colUniqueID = "unique_id"

tableCounters = "relationship_estimate_counters"
colID = "id"
colCount = "count"
)

var (
queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata)
queryRelationshipEstimate = fmt.Sprintf("SELECT COALESCE(SUM(%s), 0) FROM %s AS OF SYSTEM TIME follower_read_timestamp()", colCount, tableCounters)

upsertCounterQuery = psql.Insert(tableCounters).Columns(
colID,
colCount,
).Suffix(fmt.Sprintf("ON CONFLICT (%[1]s) DO UPDATE SET %[2]s = %[3]s.%[2]s + EXCLUDED.%[2]s RETURNING cluster_logical_timestamp()", colID, colCount, tableCounters))

rng = rand.NewSource(time.Now().UnixNano())

uniqueID string
queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata)
uniqueID string
)

func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
Expand All @@ -52,14 +37,6 @@ func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, erro
}

var nsDefs []datastore.RevisionedNamespace
var relCount int64

if err := cds.readPool.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error {
return row.Scan(&relCount)
}, queryRelationshipEstimate); err != nil {
return datastore.Stats{}, fmt.Errorf("unable to read relationship count: %w", err)
}

if err := cds.readPool.BeginTxFunc(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly}, func(tx pgx.Tx) error {
_, err := tx.Exec(ctx, "SET TRANSACTION AS OF SYSTEM TIME follower_read_timestamp()")
if err != nil {
Expand All @@ -76,37 +53,84 @@ func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, erro
return datastore.Stats{}, err
}

// NOTE: this is a stop-gap solution to prevent panics in telemetry collection
if relCount < 0 {
relCount = 0
}

return datastore.Stats{
UniqueID: uniqueID,
EstimatedRelationshipCount: uint64(relCount),
ObjectTypeStatistics: datastore.ComputeObjectTypeStats(nsDefs),
}, nil
}
if cds.analyzeBeforeStatistics {
if err := cds.readPool.BeginTxFunc(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly}, func(tx pgx.Tx) error {
if _, err := tx.Exec(ctx, "ANALYZE "+tableTuple); err != nil {
return fmt.Errorf("unable to analyze tuple table: %w", err)
}

func updateCounter(ctx context.Context, tx pgx.Tx, change int64) (datastore.Revision, error) {
counterID := make([]byte, 2)
// nolint:gosec
// G404 use of non cryptographically secure random number generator is not concern here,
// as this is only used to randomly distributed the counters across multiple rows and reduce write row contention
_, err := rand.New(rng).Read(counterID)
if err != nil {
return datastore.NoRevision, fmt.Errorf("unable to select random counter: %w", err)
return nil
}); err != nil {
return datastore.Stats{}, err
}
}

sql, args, err := upsertCounterQuery.Values(counterID, change).ToSql()
if err != nil {
return datastore.NoRevision, fmt.Errorf("unable to prepare upsert counter sql: %w", err)
}
var estimatedRelCount uint64
if err := cds.readPool.QueryFunc(ctx, func(ctx context.Context, rows pgx.Rows) error {
hasRows := false

for rows.Next() {
hasRows = true
values, err := rows.Values()
if err != nil {
log.Warn().Err(err).Msg("unable to read statistics")
return nil
}

// Ensure this the correct row.
isCorrectRow := false
for index, fd := range rows.FieldDescriptions() {
if fd.Name != "column_names" {
continue
}

columnNames, ok := values[index].([]any)
if !ok {
log.Warn().Msg("unable to read column names")
return nil
}

if slices.Contains(columnNames, "namespace") &&
slices.Contains(columnNames, "object_id") &&
slices.Contains(columnNames, "relation") &&
slices.Contains(columnNames, "userset_namespace") &&
slices.Contains(columnNames, "userset_object_id") &&
slices.Contains(columnNames, "userset_relation") {
isCorrectRow = true
break
}
}

if !isCorrectRow {
continue
}

// Read the estimated relationship count.
for index, fd := range rows.FieldDescriptions() {
if fd.Name != "row_count" {
continue
}

rowCount, ok := values[index].(int64)
if !ok {
log.Warn().Msg("unable to read row count")
return nil
}

estimatedRelCount = uint64(rowCount)
return nil
}
}

var timestamp decimal.Decimal
if err := tx.QueryRow(ctx, sql, args...).Scan(&timestamp); err != nil {
return datastore.NoRevision, fmt.Errorf("unable to executed upsert counter query: %w", err)
log.Warn().Bool("has-rows", hasRows).Msg("unable to find row count in statistics query result")
return nil
}, "SHOW STATISTICS FOR TABLE relation_tuple;"); err != nil {
return datastore.Stats{}, fmt.Errorf("unable to query unique estimated row count: %w", err)
}

return revisions.NewForHLC(timestamp)
return datastore.Stats{
UniqueID: uniqueID,
EstimatedRelationshipCount: estimatedRelCount,
ObjectTypeStatistics: datastore.ComputeObjectTypeStats(nsDefs),
}, nil
}
2 changes: 1 addition & 1 deletion internal/testserver/datastore/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

const (
CRDBTestVersionTag = "v22.2.0"
CRDBTestVersionTag = "v23.1.16"

enableRangefeeds = `SET CLUSTER SETTING kv.rangefeed.enabled = true;`
)
Expand Down
1 change: 0 additions & 1 deletion pkg/cmd/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,6 @@ func newCRDBDatastore(ctx context.Context, opts Config) (datastore.Datastore, er
crdb.OverlapStrategy(opts.OverlapStrategy),
crdb.WatchBufferLength(opts.WatchBufferLength),
crdb.WatchBufferWriteTimeout(opts.WatchBufferWriteTimeout),
crdb.DisableStats(opts.DisableStats),
crdb.WithEnablePrometheusStats(opts.EnableDatastoreMetrics),
crdb.WithEnableConnectionBalancing(opts.EnableConnectionBalancing),
crdb.ConnectRate(opts.ConnectRate),
Expand Down
1 change: 1 addition & 0 deletions pkg/datastore/test/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories)
t.Run("TestCreateDeleteTouchTest", func(t *testing.T) { CreateDeleteTouchTest(t, tester) })
t.Run("TestCreateTouchDeleteTouchTest", func(t *testing.T) { CreateTouchDeleteTouchTest(t, tester) })
t.Run("TestTouchAlreadyExistingCaveated", func(t *testing.T) { TouchAlreadyExistingCaveatedTest(t, tester) })
t.Run("TestBulkDeleteRelationships", func(t *testing.T) { BulkDeleteRelationshipsTest(t, tester) })

t.Run("TestMultipleReadsInRWT", func(t *testing.T) { MultipleReadsInRWTTest(t, tester) })
t.Run("TestConcurrentWriteSerialization", func(t *testing.T) { ConcurrentWriteSerializationTest(t, tester) })
Expand Down
2 changes: 1 addition & 1 deletion pkg/datastore/test/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func StatsTest(t *testing.T, tester DatastoreTester) {

if stats.EstimatedRelationshipCount == uint64(0) && retryCount > 0 {
// Sleep for a bit to get the stats table to update.
time.Sleep(50 * time.Millisecond)
time.Sleep(500 * time.Millisecond)
continue
}

Expand Down
Loading

0 comments on commit 03ee3f8

Please sign in to comment.