diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go index 63b83a532d..b74423fd7d 100644 --- a/internal/datastore/crdb/crdb.go +++ b/internal/datastore/crdb/crdb.go @@ -166,13 +166,14 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas config.followerReadDelay, config.revisionQuantization, ), - CommonDecoder: revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}, - dburl: url, - watchBufferLength: config.watchBufferLength, - writeOverlapKeyer: keyer, - overlapKeyInit: keySetInit, - disableStats: config.disableStats, - beginChangefeedQuery: changefeedQuery, + CommonDecoder: revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}, + dburl: url, + watchBufferLength: config.watchBufferLength, + watchBufferWriteTimeout: config.watchBufferWriteTimeout, + writeOverlapKeyer: keyer, + overlapKeyInit: keySetInit, + disableStats: config.disableStats, + beginChangefeedQuery: changefeedQuery, } ds.RemoteClockRevisions.SetNowFunc(ds.headRevisionInternal) @@ -247,12 +248,13 @@ type crdbDatastore struct { *revisions.RemoteClockRevisions revisions.CommonDecoder - dburl string - readPool, writePool *pool.RetryPool - watchBufferLength uint16 - writeOverlapKeyer overlapKeyer - overlapKeyInit func(ctx context.Context) keySet - disableStats bool + dburl string + readPool, writePool *pool.RetryPool + watchBufferLength uint16 + watchBufferWriteTimeout time.Duration + writeOverlapKeyer overlapKeyer + overlapKeyInit func(ctx context.Context) keySet + disableStats bool beginChangefeedQuery string diff --git a/internal/datastore/crdb/options.go b/internal/datastore/crdb/options.go index d285d43393..905861b27f 100644 --- a/internal/datastore/crdb/options.go +++ b/internal/datastore/crdb/options.go @@ -12,6 +12,7 @@ type crdbOptions struct { connectRate time.Duration watchBufferLength uint16 + watchBufferWriteTimeout time.Duration revisionQuantization time.Duration followerReadDelay time.Duration maxRevisionStalenessPercent float64 @@ -37,6 +38,7 @@ const ( defaultFollowerReadDelay = 0 * time.Second defaultMaxRevisionStalenessPercent = 0.1 defaultWatchBufferLength = 128 + defaultWatchBufferWriteTimeout = 1 * time.Second defaultSplitSize = 1024 defaultMaxRetries = 5 @@ -56,6 +58,7 @@ func generateConfig(options []Option) (crdbOptions, error) { computed := crdbOptions{ gcWindow: 24 * time.Hour, watchBufferLength: defaultWatchBufferLength, + watchBufferWriteTimeout: defaultWatchBufferWriteTimeout, revisionQuantization: defaultRevisionQuantization, followerReadDelay: defaultFollowerReadDelay, maxRevisionStalenessPercent: defaultMaxRevisionStalenessPercent, @@ -216,6 +219,12 @@ func WatchBufferLength(watchBufferLength uint16) Option { return func(po *crdbOptions) { po.watchBufferLength = watchBufferLength } } +// WatchBufferWriteTimeout is the maximum timeout for writing to the watch buffer, +// after which the caller to the watch will be disconnected. +func WatchBufferWriteTimeout(watchBufferWriteTimeout time.Duration) Option { + return func(po *crdbOptions) { po.watchBufferWriteTimeout = watchBufferWriteTimeout } +} + // RevisionQuantization is the time bucket size to which advertised revisions // will be rounded. // diff --git a/internal/datastore/crdb/watch.go b/internal/datastore/crdb/watch.go index 5d3e2c8125..0cd8749daa 100644 --- a/internal/datastore/crdb/watch.go +++ b/internal/datastore/crdb/watch.go @@ -54,7 +54,12 @@ type changeDetails struct { } func (cds *crdbDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { - updates := make(chan *datastore.RevisionChanges, cds.watchBufferLength) + watchBufferLength := options.WatchBufferLength + if watchBufferLength <= 0 { + watchBufferLength = cds.watchBufferLength + } + + updates := make(chan *datastore.RevisionChanges, watchBufferLength) errs := make(chan error, 1) features, err := cds.Features(ctx) @@ -137,12 +142,28 @@ func (cds *crdbDatastore) watch( errs <- err } + watchBufferWriteTimeout := opts.WatchBufferWriteTimeout + if watchBufferWriteTimeout <= 0 { + watchBufferWriteTimeout = cds.watchBufferWriteTimeout + } + sendChange := func(change *datastore.RevisionChanges) bool { select { case updates <- change: return true default: + // If we cannot immediately write, setup the timer and try again. + } + + timer := time.NewTimer(watchBufferWriteTimeout) + defer timer.Stop() + + select { + case updates <- change: + return true + + case <-timer.C: errs <- datastore.NewWatchDisconnectedErr() return false } diff --git a/internal/datastore/memdb/memdb.go b/internal/datastore/memdb/memdb.go index 7f31390e18..9199d0bb5b 100644 --- a/internal/datastore/memdb/memdb.go +++ b/internal/datastore/memdb/memdb.go @@ -68,10 +68,11 @@ func NewMemdbDatastore( }, }, - negativeGCWindow: gcWindow.Nanoseconds() * -1, - quantizationPeriod: revisionQuantization.Nanoseconds(), - watchBufferLength: watchBufferLength, - uniqueID: uniqueID, + negativeGCWindow: gcWindow.Nanoseconds() * -1, + quantizationPeriod: revisionQuantization.Nanoseconds(), + watchBufferLength: watchBufferLength, + watchBufferWriteTimeout: 100 * time.Millisecond, + uniqueID: uniqueID, }, nil } @@ -83,10 +84,11 @@ type memdbDatastore struct { revisions []snapshot activeWriteTxn *memdb.Txn - negativeGCWindow int64 - quantizationPeriod int64 - watchBufferLength uint16 - uniqueID string + negativeGCWindow int64 + quantizationPeriod int64 + watchBufferLength uint16 + watchBufferWriteTimeout time.Duration + uniqueID string } type snapshot struct { diff --git a/internal/datastore/memdb/watch.go b/internal/datastore/memdb/watch.go index 262fbadf6f..1f63e2d5c7 100644 --- a/internal/datastore/memdb/watch.go +++ b/internal/datastore/memdb/watch.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "time" "github.com/hashicorp/go-memdb" @@ -14,7 +15,12 @@ import ( const errWatchError = "watch error: %w" func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { - updates := make(chan *datastore.RevisionChanges, mdb.watchBufferLength) + watchBufferLength := options.WatchBufferLength + if watchBufferLength == 0 { + watchBufferLength = mdb.watchBufferLength + } + + updates := make(chan *datastore.RevisionChanges, watchBufferLength) errs := make(chan error, 1) if options.Content&datastore.WatchSchema == datastore.WatchSchema { @@ -22,6 +28,33 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt return updates, errs } + watchBufferWriteTimeout := options.WatchBufferWriteTimeout + if watchBufferWriteTimeout == 0 { + watchBufferWriteTimeout = mdb.watchBufferWriteTimeout + } + + sendChange := func(change *datastore.RevisionChanges) bool { + select { + case updates <- change: + return true + + default: + // If we cannot immediately write, setup the timer and try again. + } + + timer := time.NewTimer(watchBufferWriteTimeout) + defer timer.Stop() + + select { + case updates <- change: + return true + + case <-timer.C: + errs <- datastore.NewWatchDisconnectedErr() + return false + } + } + go func() { defer close(updates) defer close(errs) @@ -40,10 +73,7 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt // Write the staged updates to the channel for _, changeToWrite := range stagedUpdates { - select { - case updates <- changeToWrite: - default: - errs <- datastore.NewWatchDisconnectedErr() + if !sendChange(changeToWrite) { return } } diff --git a/internal/datastore/mysql/datastore.go b/internal/datastore/mysql/datastore.go index 650c27673c..714a61c5a1 100644 --- a/internal/datastore/mysql/datastore.go +++ b/internal/datastore/mysql/datastore.go @@ -197,24 +197,25 @@ func newMySQLDatastore(ctx context.Context, uri string, options ...Option) (*Dat ) store := &Datastore{ - db: db, - driver: driver, - url: uri, - revisionQuantization: config.revisionQuantization, - gcWindow: config.gcWindow, - gcInterval: config.gcInterval, - gcTimeout: config.gcMaxOperationTime, - gcCtx: gcCtx, - cancelGc: cancelGc, - watchBufferLength: config.watchBufferLength, - optimizedRevisionQuery: revisionQuery, - validTransactionQuery: validTransactionQuery, - createTxn: createTxn, - createBaseTxn: createBaseTxn, - QueryBuilder: queryBuilder, - readTxOptions: &sql.TxOptions{Isolation: sql.LevelSerializable, ReadOnly: true}, - maxRetries: config.maxRetries, - analyzeBeforeStats: config.analyzeBeforeStats, + db: db, + driver: driver, + url: uri, + revisionQuantization: config.revisionQuantization, + gcWindow: config.gcWindow, + gcInterval: config.gcInterval, + gcTimeout: config.gcMaxOperationTime, + gcCtx: gcCtx, + cancelGc: cancelGc, + watchBufferLength: config.watchBufferLength, + watchBufferWriteTimeout: config.watchBufferWriteTimeout, + optimizedRevisionQuery: revisionQuery, + validTransactionQuery: validTransactionQuery, + createTxn: createTxn, + createBaseTxn: createBaseTxn, + QueryBuilder: queryBuilder, + readTxOptions: &sql.TxOptions{Isolation: sql.LevelSerializable, ReadOnly: true}, + maxRetries: config.maxRetries, + analyzeBeforeStats: config.analyzeBeforeStats, CachedOptimizedRevisions: revisions.NewCachedOptimizedRevisions( maxRevisionStaleness, ), @@ -430,12 +431,13 @@ type Datastore struct { url string analyzeBeforeStats bool - revisionQuantization time.Duration - gcWindow time.Duration - gcInterval time.Duration - gcTimeout time.Duration - watchBufferLength uint16 - maxRetries uint8 + revisionQuantization time.Duration + gcWindow time.Duration + gcInterval time.Duration + gcTimeout time.Duration + watchBufferLength uint16 + watchBufferWriteTimeout time.Duration + maxRetries uint8 optimizedRevisionQuery string validTransactionQuery string diff --git a/internal/datastore/mysql/options.go b/internal/datastore/mysql/options.go index 0ae0ac51c2..e75d070b8a 100644 --- a/internal/datastore/mysql/options.go +++ b/internal/datastore/mysql/options.go @@ -15,6 +15,7 @@ const ( defaultConnMaxIdleTime = 30 * time.Minute defaultConnMaxLifetime = 30 * time.Minute defaultWatchBufferLength = 128 + defaultWatchBufferWriteTimeout = 1 * time.Second defaultQuantization = 5 * time.Second defaultMaxRevisionStalenessPercent = 0.1 defaultEnablePrometheusStats = false @@ -29,6 +30,7 @@ type mysqlOptions struct { gcMaxOperationTime time.Duration maxRevisionStalenessPercent float64 watchBufferLength uint16 + watchBufferWriteTimeout time.Duration tablePrefix string enablePrometheusStats bool maxOpenConns int @@ -50,6 +52,7 @@ func generateConfig(options []Option) (mysqlOptions, error) { gcInterval: defaultGarbageCollectionInterval, gcMaxOperationTime: defaultGarbageCollectionMaxOperationTime, watchBufferLength: defaultWatchBufferLength, + watchBufferWriteTimeout: defaultWatchBufferWriteTimeout, maxOpenConns: defaultMaxOpenConns, connMaxIdleTime: defaultConnMaxIdleTime, connMaxLifetime: defaultConnMaxLifetime, @@ -86,6 +89,12 @@ func WatchBufferLength(watchBufferLength uint16) Option { } } +// WatchBufferWriteTimeout is the maximum timeout for writing to the watch buffer, +// after which the caller to the watch will be disconnected. +func WatchBufferWriteTimeout(watchBufferWriteTimeout time.Duration) Option { + return func(mo *mysqlOptions) { mo.watchBufferWriteTimeout = watchBufferWriteTimeout } +} + // RevisionQuantization is the time bucket size to which advertised // revisions will be rounded. // diff --git a/internal/datastore/mysql/watch.go b/internal/datastore/mysql/watch.go index d9a8a93a55..e33d2a6791 100644 --- a/internal/datastore/mysql/watch.go +++ b/internal/datastore/mysql/watch.go @@ -21,7 +21,12 @@ const ( // // All events following afterRevision will be sent to the caller. func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { - updates := make(chan *datastore.RevisionChanges, mds.watchBufferLength) + watchBufferLength := options.WatchBufferLength + if watchBufferLength <= 0 { + watchBufferLength = mds.watchBufferLength + } + + updates := make(chan *datastore.RevisionChanges, watchBufferLength) errs := make(chan error, 1) if options.Content&datastore.WatchSchema == datastore.WatchSchema { @@ -35,6 +40,33 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi return updates, errs } + watchBufferWriteTimeout := options.WatchBufferWriteTimeout + if watchBufferWriteTimeout <= 0 { + watchBufferWriteTimeout = mds.watchBufferWriteTimeout + } + + sendChange := func(change *datastore.RevisionChanges) bool { + select { + case updates <- change: + return true + + default: + // If we cannot immediately write, setup the timer and try again. + } + + timer := time.NewTimer(watchBufferWriteTimeout) + defer timer.Stop() + + select { + case updates <- change: + return true + + case <-timer.C: + errs <- datastore.NewWatchDisconnectedErr() + return false + } + } + go func() { defer close(updates) defer close(errs) @@ -56,11 +88,7 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi // Write the staged updates to the channel for _, changeToWrite := range stagedUpdates { changeToWrite := changeToWrite - - select { - case updates <- &changeToWrite: - default: - errs <- datastore.NewWatchDisconnectedErr() + if !sendChange(&changeToWrite) { return } } diff --git a/internal/datastore/postgres/options.go b/internal/datastore/postgres/options.go index c3edbb9122..b10e6dd231 100644 --- a/internal/datastore/postgres/options.go +++ b/internal/datastore/postgres/options.go @@ -12,12 +12,13 @@ type postgresOptions struct { maxRevisionStalenessPercent float64 - watchBufferLength uint16 - revisionQuantization time.Duration - gcWindow time.Duration - gcInterval time.Duration - gcMaxOperationTime time.Duration - maxRetries uint8 + watchBufferLength uint16 + watchBufferWriteTimeout time.Duration + revisionQuantization time.Duration + gcWindow time.Duration + gcInterval time.Duration + gcMaxOperationTime time.Duration + maxRetries uint8 enablePrometheusStats bool analyzeBeforeStatistics bool @@ -48,6 +49,7 @@ const ( errQuantizationTooLarge = "revision quantization interval (%s) must be less than GC window (%s)" defaultWatchBufferLength = 128 + defaultWatchBufferWriteTimeout = 1 * time.Second defaultGarbageCollectionWindow = 24 * time.Hour defaultGarbageCollectionInterval = time.Minute * 3 defaultGarbageCollectionMaxOperationTime = time.Minute @@ -68,6 +70,7 @@ func generateConfig(options []Option) (postgresOptions, error) { gcInterval: defaultGarbageCollectionInterval, gcMaxOperationTime: defaultGarbageCollectionMaxOperationTime, watchBufferLength: defaultWatchBufferLength, + watchBufferWriteTimeout: defaultWatchBufferWriteTimeout, revisionQuantization: defaultQuantization, maxRevisionStalenessPercent: defaultMaxRevisionStalenessPercent, enablePrometheusStats: defaultEnablePrometheusStats, @@ -228,6 +231,12 @@ func WatchBufferLength(watchBufferLength uint16) Option { return func(po *postgresOptions) { po.watchBufferLength = watchBufferLength } } +// WatchBufferWriteTimeout is the maximum timeout for writing to the watch buffer, +// after which the caller to the watch will be disconnected. +func WatchBufferWriteTimeout(watchBufferWriteTimeout time.Duration) Option { + return func(po *postgresOptions) { po.watchBufferWriteTimeout = watchBufferWriteTimeout } +} + // RevisionQuantization is the time bucket size to which advertised // revisions will be rounded. // diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go index 036418537b..b3b2b59af0 100644 --- a/internal/datastore/postgres/postgres.go +++ b/internal/datastore/postgres/postgres.go @@ -248,6 +248,7 @@ func newPostgresDatastore( readPool: pgxcommon.MustNewInterceptorPooler(readPool, config.queryInterceptor), writePool: pgxcommon.MustNewInterceptorPooler(writePool, config.queryInterceptor), watchBufferLength: config.watchBufferLength, + watchBufferWriteTimeout: config.watchBufferWriteTimeout, optimizedRevisionQuery: revisionQuery, validTransactionQuery: validTransactionQuery, gcWindow: config.gcWindow, @@ -288,6 +289,7 @@ type pgDatastore struct { dburl string readPool, writePool pgxcommon.ConnPooler watchBufferLength uint16 + watchBufferWriteTimeout time.Duration optimizedRevisionQuery string validTransactionQuery string gcWindow time.Duration diff --git a/internal/datastore/postgres/watch.go b/internal/datastore/postgres/watch.go index 976c9b92cb..d113a49c7b 100644 --- a/internal/datastore/postgres/watch.go +++ b/internal/datastore/postgres/watch.go @@ -67,7 +67,12 @@ func (pgd *pgDatastore) Watch( afterRevisionRaw datastore.Revision, options datastore.WatchOptions, ) (<-chan *datastore.RevisionChanges, <-chan error) { - updates := make(chan *datastore.RevisionChanges, pgd.watchBufferLength) + watchBufferLength := options.WatchBufferLength + if watchBufferLength <= 0 { + watchBufferLength = pgd.watchBufferLength + } + + updates := make(chan *datastore.RevisionChanges, watchBufferLength) errs := make(chan error, 1) if !pgd.watchEnabled { @@ -81,6 +86,33 @@ func (pgd *pgDatastore) Watch( watchSleep = minimumWatchSleep } + watchBufferWriteTimeout := options.WatchBufferWriteTimeout + if watchBufferWriteTimeout <= 0 { + watchBufferWriteTimeout = pgd.watchBufferWriteTimeout + } + + sendChange := func(change *datastore.RevisionChanges) bool { + select { + case updates <- change: + return true + + default: + // If we cannot immediately write, setup the timer and try again. + } + + timer := time.NewTimer(watchBufferWriteTimeout) + defer timer.Stop() + + select { + case updates <- change: + return true + + case <-timer.C: + errs <- datastore.NewWatchDisconnectedErr() + return false + } + } + go func() { defer close(updates) defer close(errs) @@ -113,12 +145,7 @@ func (pgd *pgDatastore) Watch( for _, changeToWrite := range changesToWrite { changeToWrite := changeToWrite - - select { - case updates <- &changeToWrite: - // Nothing to do here, we've already written to the channel. - default: - errs <- datastore.NewWatchDisconnectedErr() + if !sendChange(&changeToWrite) { return } } @@ -137,14 +164,10 @@ func (pgd *pgDatastore) Watch( // move revisions forward outside of changes, these could be necessary if the caller is // watching only a *subset* of changes. if options.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints { - select { - case updates <- &datastore.RevisionChanges{ + if !sendChange(&datastore.RevisionChanges{ Revision: currentTxn, IsCheckpoint: true, - }: - // Nothing to do here, we've already written to the channel. - default: - errs <- datastore.NewWatchDisconnectedErr() + }) { return } } diff --git a/internal/datastore/spanner/options.go b/internal/datastore/spanner/options.go index 5376579e6b..4e645448ae 100644 --- a/internal/datastore/spanner/options.go +++ b/internal/datastore/spanner/options.go @@ -9,6 +9,7 @@ import ( type spannerOptions struct { watchBufferLength uint16 + watchBufferWriteTimeout time.Duration revisionQuantization time.Duration followerReadDelay time.Duration maxRevisionStalenessPercent float64 @@ -39,6 +40,7 @@ const ( defaultFollowerReadDelay = 0 * time.Second defaultMaxRevisionStalenessPercent = 0.1 defaultWatchBufferLength = 128 + defaultWatchBufferWriteTimeout = 1 * time.Second defaultDisableStats = false maxRevisionQuantization = 24 * time.Hour ) @@ -53,6 +55,7 @@ func generateConfig(options []Option) (spannerOptions, error) { defaultNumberConnections := max(1, math.Round(float64(runtime.GOMAXPROCS(0)))) computed := spannerOptions{ watchBufferLength: defaultWatchBufferLength, + watchBufferWriteTimeout: defaultWatchBufferWriteTimeout, revisionQuantization: defaultRevisionQuantization, followerReadDelay: defaultFollowerReadDelay, maxRevisionStalenessPercent: defaultMaxRevisionStalenessPercent, @@ -95,6 +98,12 @@ func WatchBufferLength(watchBufferLength uint16) Option { } } +// WatchBufferWriteTimeout is the maximum timeout for writing to the watch buffer, +// after which the caller to the watch will be disconnected. +func WatchBufferWriteTimeout(watchBufferWriteTimeout time.Duration) Option { + return func(so *spannerOptions) { so.watchBufferWriteTimeout = watchBufferWriteTimeout } +} + // RevisionQuantization is the time bucket size to which advertised revisions // will be rounded. // diff --git a/internal/datastore/spanner/spanner.go b/internal/datastore/spanner/spanner.go index 2442f3e136..bbc1933e3b 100644 --- a/internal/datastore/spanner/spanner.go +++ b/internal/datastore/spanner/spanner.go @@ -72,6 +72,9 @@ type spannerDatastore struct { *revisions.RemoteClockRevisions revisions.CommonDecoder + watchBufferLength uint16 + watchBufferWriteTimeout time.Duration + client *spanner.Client config spannerOptions database string @@ -149,9 +152,11 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) ( CommonDecoder: revisions.CommonDecoder{ Kind: revisions.Timestamp, }, - client: client, - config: config, - database: database, + client: client, + config: config, + database: database, + watchBufferWriteTimeout: config.watchBufferWriteTimeout, + watchBufferLength: config.watchBufferLength, } ds.RemoteClockRevisions.SetNowFunc(ds.headRevisionInternal) diff --git a/internal/datastore/spanner/watch.go b/internal/datastore/spanner/watch.go index 6f09ceab13..5988db5970 100644 --- a/internal/datastore/spanner/watch.go +++ b/internal/datastore/spanner/watch.go @@ -52,7 +52,12 @@ func parseDatabaseName(db string) (project, instance, database string, err error } func (sd spannerDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, opts datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { - updates := make(chan *datastore.RevisionChanges, 10) + watchBufferLength := opts.WatchBufferLength + if watchBufferLength <= 0 { + watchBufferLength = sd.watchBufferLength + } + + updates := make(chan *datastore.RevisionChanges, watchBufferLength) errs := make(chan error, 1) go sd.watch(ctx, afterRevision, opts, updates, errs) @@ -85,12 +90,29 @@ func (sd spannerDatastore) watch( errs <- err } + watchBufferWriteTimeout := opts.WatchBufferWriteTimeout + if watchBufferWriteTimeout <= 0 { + watchBufferWriteTimeout = sd.watchBufferWriteTimeout + } + sendChange := func(change *datastore.RevisionChanges) bool { select { case updates <- change: return true default: + // If we cannot immediately write, setup the timer and try again. + } + + timer := time.NewTimer(watchBufferWriteTimeout) + defer timer.Stop() + + select { + case updates <- change: + return true + + case <-timer.C: + errs <- datastore.NewWatchDisconnectedErr() return false } } diff --git a/pkg/cmd/datastore/datastore.go b/pkg/cmd/datastore/datastore.go index 64140ec42c..d2391bfa2c 100644 --- a/pkg/cmd/datastore/datastore.go +++ b/pkg/cmd/datastore/datastore.go @@ -141,7 +141,8 @@ type Config struct { TablePrefix string `debugmap:"visible"` // Internal - WatchBufferLength uint16 `debugmap:"visible"` + WatchBufferLength uint16 `debugmap:"visible"` + WatchBufferWriteTimeout time.Duration `debugmap:"visible"` // Migrations MigrationPhase string `debugmap:"visible"` @@ -212,7 +213,8 @@ func RegisterDatastoreFlagsWithPrefix(flagSet *pflag.FlagSet, prefix string, opt flagSet.Uint64Var(&opts.SpannerMaxSessions, flagName("datastore-spanner-max-sessions"), 400, "maximum number of sessions across all Spanner gRPC connections the client can have at a given time") flagSet.StringVar(&opts.TablePrefix, flagName("datastore-mysql-table-prefix"), "", "prefix to add to the name of all SpiceDB database tables") flagSet.StringVar(&opts.MigrationPhase, flagName("datastore-migration-phase"), "", "datastore-specific flag that should be used to signal to a datastore which phase of a multi-step migration it is in") - flagSet.Uint16Var(&opts.WatchBufferLength, flagName("datastore-watch-buffer-length"), 1024, "how many events the watch buffer should queue before forcefully disconnecting reader") + flagSet.Uint16Var(&opts.WatchBufferLength, flagName("datastore-watch-buffer-length"), 1024, "how large the watch buffer should be before blocking") + flagSet.DurationVar(&opts.WatchBufferWriteTimeout, flagName("datastore-watch-buffer-write-timeout"), 1*time.Second, "how long the watch buffer should queue before forcefully disconnecting the reader") // disabling stats is only for tests flagSet.BoolVar(&opts.DisableStats, flagName("datastore-disable-stats"), false, "disable recording relationship counts to the stats table") @@ -252,6 +254,7 @@ func DefaultDatastoreConfig() *Config { GCInterval: 3 * time.Minute, GCMaxOperationTime: 1 * time.Minute, WatchBufferLength: 1024, + WatchBufferWriteTimeout: 1 * time.Second, EnableDatastoreMetrics: true, DisableStats: false, BootstrapFiles: []string{}, @@ -379,6 +382,7 @@ func newCRDBDatastore(ctx context.Context, opts Config) (datastore.Datastore, er crdb.OverlapKey(opts.OverlapKey), crdb.OverlapStrategy(opts.OverlapStrategy), crdb.WatchBufferLength(opts.WatchBufferLength), + crdb.WatchBufferWriteTimeout(opts.WatchBufferWriteTimeout), crdb.DisableStats(opts.DisableStats), crdb.WithEnablePrometheusStats(opts.EnableDatastoreMetrics), crdb.WithEnableConnectionBalancing(opts.EnableConnectionBalancing), @@ -408,6 +412,7 @@ func newPostgresDatastore(ctx context.Context, opts Config) (datastore.Datastore postgres.GCMaxOperationTime(opts.GCMaxOperationTime), postgres.EnableTracing(), postgres.WatchBufferLength(opts.WatchBufferLength), + postgres.WatchBufferWriteTimeout(opts.WatchBufferWriteTimeout), postgres.WithEnablePrometheusStats(opts.EnableDatastoreMetrics), postgres.MaxRetries(uint8(opts.MaxRetries)), postgres.MigrationPhase(opts.MigrationPhase), @@ -424,6 +429,7 @@ func newSpannerDatastore(ctx context.Context, opts Config) (datastore.Datastore, spanner.MaxRevisionStalenessPercent(opts.MaxRevisionStalenessPercent), spanner.CredentialsFile(opts.SpannerCredentialsFile), spanner.WatchBufferLength(opts.WatchBufferLength), + spanner.WatchBufferWriteTimeout(opts.WatchBufferWriteTimeout), spanner.EmulatorHost(opts.SpannerEmulatorHost), spanner.DisableStats(opts.DisableStats), spanner.ReadConnsMaxOpen(opts.ReadConnPool.MaxOpenConns), @@ -448,6 +454,7 @@ func newMySQLDatastore(ctx context.Context, opts Config) (datastore.Datastore, e mysql.MaxRevisionStalenessPercent(opts.MaxRevisionStalenessPercent), mysql.TablePrefix(opts.TablePrefix), mysql.WatchBufferLength(opts.WatchBufferLength), + mysql.WatchBufferWriteTimeout(opts.WatchBufferWriteTimeout), mysql.WithEnablePrometheusStats(opts.EnableDatastoreMetrics), mysql.MaxRetries(uint8(opts.MaxRetries)), mysql.OverrideLockWaitTimeout(1), diff --git a/pkg/cmd/datastore/zz_generated.options.go b/pkg/cmd/datastore/zz_generated.options.go index 99e2791317..fbb25752f3 100644 --- a/pkg/cmd/datastore/zz_generated.options.go +++ b/pkg/cmd/datastore/zz_generated.options.go @@ -64,6 +64,7 @@ func (c *Config) ToOption() ConfigOption { to.SpannerMaxSessions = c.SpannerMaxSessions to.TablePrefix = c.TablePrefix to.WatchBufferLength = c.WatchBufferLength + to.WatchBufferWriteTimeout = c.WatchBufferWriteTimeout to.MigrationPhase = c.MigrationPhase } } @@ -104,6 +105,7 @@ func (c Config) DebugMap() map[string]any { debugMap["SpannerMaxSessions"] = helpers.DebugValue(c.SpannerMaxSessions, false) debugMap["TablePrefix"] = helpers.DebugValue(c.TablePrefix, false) debugMap["WatchBufferLength"] = helpers.DebugValue(c.WatchBufferLength, false) + debugMap["WatchBufferWriteTimeout"] = helpers.DebugValue(c.WatchBufferWriteTimeout, false) debugMap["MigrationPhase"] = helpers.DebugValue(c.MigrationPhase, false) return debugMap } @@ -369,6 +371,13 @@ func WithWatchBufferLength(watchBufferLength uint16) ConfigOption { } } +// WithWatchBufferWriteTimeout returns an option that can set WatchBufferWriteTimeout on a Config +func WithWatchBufferWriteTimeout(watchBufferWriteTimeout time.Duration) ConfigOption { + return func(c *Config) { + c.WatchBufferWriteTimeout = watchBufferWriteTimeout + } +} + // WithMigrationPhase returns an option that can set MigrationPhase on a Config func WithMigrationPhase(migrationPhase string) ConfigOption { return func(c *Config) { diff --git a/pkg/datastore/datastore.go b/pkg/datastore/datastore.go index f7426eeef4..3a98c05b66 100644 --- a/pkg/datastore/datastore.go +++ b/pkg/datastore/datastore.go @@ -342,6 +342,14 @@ type WatchOptions struct { // If given the zero value, the datastore's default will be used. If smaller // than the datastore's minimum, the minimum will be used. CheckpointInterval time.Duration + + // WatchBufferLength is the length of the buffer for the watch channel. If + // given the zero value, the datastore's default will be used. + WatchBufferLength uint16 + + // WatchBufferWriteTimeout is the timeout for writing to the watch channel. + // If given the zero value, the datastore's default will be used. + WatchBufferWriteTimeout time.Duration } // WatchJustRelationships returns watch options for just relationships. diff --git a/pkg/datastore/test/watch.go b/pkg/datastore/test/watch.go index dde08ecc58..2d02ab6609 100644 --- a/pkg/datastore/test/watch.go +++ b/pkg/datastore/test/watch.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "sort" - "strconv" "testing" "time" @@ -29,25 +28,34 @@ const waitForChangesTimeout = 5 * time.Second func WatchTest(t *testing.T, tester DatastoreTester) { testCases := []struct { numTuples int + bufferTimeout time.Duration expectFallBehind bool }{ { numTuples: 1, + bufferTimeout: 1 * time.Minute, expectFallBehind: false, }, { numTuples: 2, + bufferTimeout: 1 * time.Minute, expectFallBehind: false, }, { numTuples: 256, + bufferTimeout: 1 * time.Minute, + expectFallBehind: false, + }, + { + numTuples: 256, + bufferTimeout: 1 * time.Nanosecond, expectFallBehind: true, }, } for _, tc := range testCases { tc := tc - t.Run(strconv.Itoa(tc.numTuples), func(t *testing.T) { + t.Run(fmt.Sprintf("%d-%v", tc.numTuples, tc.expectFallBehind), func(t *testing.T) { require := require.New(t) ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 16) @@ -61,7 +69,12 @@ func WatchTest(t *testing.T, tester DatastoreTester) { lowestRevision, err := ds.HeadRevision(ctx) require.NoError(err) - changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchJustRelationships()) + opts := datastore.WatchOptions{ + Content: datastore.WatchRelationships, + WatchBufferLength: 128, + WatchBufferWriteTimeout: tc.bufferTimeout, + } + changes, errchan := ds.Watch(ctx, lowestRevision, opts) require.Zero(len(errchan)) var testUpdates [][]*core.RelationTupleUpdate @@ -115,7 +128,7 @@ func WatchTest(t *testing.T, tester DatastoreTester) { verifyUpdates(require, testUpdates, changes, errchan, tc.expectFallBehind) // Test the catch-up case - changes, errchan = ds.Watch(ctx, lowestRevision, datastore.WatchJustRelationships()) + changes, errchan = ds.Watch(ctx, lowestRevision, opts) verifyUpdates(require, testUpdates, changes, errchan, tc.expectFallBehind) }) }