Skip to content

Commit

Permalink
Merge pull request #1707 from josephschorr/watch-buffer-timeout
Browse files Browse the repository at this point in the history
Add timeout to watch buffers
  • Loading branch information
josephschorr authored Jan 9, 2024
2 parents d25d8a1 + 3d8c1aa commit dae2369
Show file tree
Hide file tree
Showing 18 changed files with 296 additions and 86 deletions.
28 changes: 15 additions & 13 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,14 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
config.followerReadDelay,
config.revisionQuantization,
),
CommonDecoder: revisions.CommonDecoder{Kind: revisions.HybridLogicalClock},
dburl: url,
watchBufferLength: config.watchBufferLength,
writeOverlapKeyer: keyer,
overlapKeyInit: keySetInit,
disableStats: config.disableStats,
beginChangefeedQuery: changefeedQuery,
CommonDecoder: revisions.CommonDecoder{Kind: revisions.HybridLogicalClock},
dburl: url,
watchBufferLength: config.watchBufferLength,
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
writeOverlapKeyer: keyer,
overlapKeyInit: keySetInit,
disableStats: config.disableStats,
beginChangefeedQuery: changefeedQuery,
}
ds.RemoteClockRevisions.SetNowFunc(ds.headRevisionInternal)

Expand Down Expand Up @@ -247,12 +248,13 @@ type crdbDatastore struct {
*revisions.RemoteClockRevisions
revisions.CommonDecoder

dburl string
readPool, writePool *pool.RetryPool
watchBufferLength uint16
writeOverlapKeyer overlapKeyer
overlapKeyInit func(ctx context.Context) keySet
disableStats bool
dburl string
readPool, writePool *pool.RetryPool
watchBufferLength uint16
watchBufferWriteTimeout time.Duration
writeOverlapKeyer overlapKeyer
overlapKeyInit func(ctx context.Context) keySet
disableStats bool

beginChangefeedQuery string

Expand Down
9 changes: 9 additions & 0 deletions internal/datastore/crdb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type crdbOptions struct {
connectRate time.Duration

watchBufferLength uint16
watchBufferWriteTimeout time.Duration
revisionQuantization time.Duration
followerReadDelay time.Duration
maxRevisionStalenessPercent float64
Expand All @@ -37,6 +38,7 @@ const (
defaultFollowerReadDelay = 0 * time.Second
defaultMaxRevisionStalenessPercent = 0.1
defaultWatchBufferLength = 128
defaultWatchBufferWriteTimeout = 1 * time.Second
defaultSplitSize = 1024

defaultMaxRetries = 5
Expand All @@ -56,6 +58,7 @@ func generateConfig(options []Option) (crdbOptions, error) {
computed := crdbOptions{
gcWindow: 24 * time.Hour,
watchBufferLength: defaultWatchBufferLength,
watchBufferWriteTimeout: defaultWatchBufferWriteTimeout,
revisionQuantization: defaultRevisionQuantization,
followerReadDelay: defaultFollowerReadDelay,
maxRevisionStalenessPercent: defaultMaxRevisionStalenessPercent,
Expand Down Expand Up @@ -216,6 +219,12 @@ func WatchBufferLength(watchBufferLength uint16) Option {
return func(po *crdbOptions) { po.watchBufferLength = watchBufferLength }
}

// WatchBufferWriteTimeout is the maximum timeout for writing to the watch buffer,
// after which the caller to the watch will be disconnected.
func WatchBufferWriteTimeout(watchBufferWriteTimeout time.Duration) Option {
return func(po *crdbOptions) { po.watchBufferWriteTimeout = watchBufferWriteTimeout }
}

// RevisionQuantization is the time bucket size to which advertised revisions
// will be rounded.
//
Expand Down
23 changes: 22 additions & 1 deletion internal/datastore/crdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ type changeDetails struct {
}

func (cds *crdbDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
updates := make(chan *datastore.RevisionChanges, cds.watchBufferLength)
watchBufferLength := options.WatchBufferLength
if watchBufferLength <= 0 {
watchBufferLength = cds.watchBufferLength
}

updates := make(chan *datastore.RevisionChanges, watchBufferLength)
errs := make(chan error, 1)

features, err := cds.Features(ctx)
Expand Down Expand Up @@ -137,12 +142,28 @@ func (cds *crdbDatastore) watch(
errs <- err
}

watchBufferWriteTimeout := opts.WatchBufferWriteTimeout
if watchBufferWriteTimeout <= 0 {
watchBufferWriteTimeout = cds.watchBufferWriteTimeout
}

sendChange := func(change *datastore.RevisionChanges) bool {
select {
case updates <- change:
return true

default:
// If we cannot immediately write, setup the timer and try again.
}

timer := time.NewTimer(watchBufferWriteTimeout)
defer timer.Stop()

select {
case updates <- change:
return true

case <-timer.C:
errs <- datastore.NewWatchDisconnectedErr()
return false
}
Expand Down
18 changes: 10 additions & 8 deletions internal/datastore/memdb/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ func NewMemdbDatastore(
},
},

negativeGCWindow: gcWindow.Nanoseconds() * -1,
quantizationPeriod: revisionQuantization.Nanoseconds(),
watchBufferLength: watchBufferLength,
uniqueID: uniqueID,
negativeGCWindow: gcWindow.Nanoseconds() * -1,
quantizationPeriod: revisionQuantization.Nanoseconds(),
watchBufferLength: watchBufferLength,
watchBufferWriteTimeout: 100 * time.Millisecond,
uniqueID: uniqueID,
}, nil
}

Expand All @@ -83,10 +84,11 @@ type memdbDatastore struct {
revisions []snapshot
activeWriteTxn *memdb.Txn

negativeGCWindow int64
quantizationPeriod int64
watchBufferLength uint16
uniqueID string
negativeGCWindow int64
quantizationPeriod int64
watchBufferLength uint16
watchBufferWriteTimeout time.Duration
uniqueID string
}

type snapshot struct {
Expand Down
40 changes: 35 additions & 5 deletions internal/datastore/memdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/hashicorp/go-memdb"

Expand All @@ -14,14 +15,46 @@ import (
const errWatchError = "watch error: %w"

func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
updates := make(chan *datastore.RevisionChanges, mdb.watchBufferLength)
watchBufferLength := options.WatchBufferLength
if watchBufferLength == 0 {
watchBufferLength = mdb.watchBufferLength
}

updates := make(chan *datastore.RevisionChanges, watchBufferLength)
errs := make(chan error, 1)

if options.Content&datastore.WatchSchema == datastore.WatchSchema {
errs <- errors.New("schema watch unsupported in MemDB")
return updates, errs
}

watchBufferWriteTimeout := options.WatchBufferWriteTimeout
if watchBufferWriteTimeout == 0 {
watchBufferWriteTimeout = mdb.watchBufferWriteTimeout
}

sendChange := func(change *datastore.RevisionChanges) bool {
select {
case updates <- change:
return true

default:
// If we cannot immediately write, setup the timer and try again.
}

timer := time.NewTimer(watchBufferWriteTimeout)
defer timer.Stop()

select {
case updates <- change:
return true

case <-timer.C:
errs <- datastore.NewWatchDisconnectedErr()
return false
}
}

go func() {
defer close(updates)
defer close(errs)
Expand All @@ -40,10 +73,7 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt

// Write the staged updates to the channel
for _, changeToWrite := range stagedUpdates {
select {
case updates <- changeToWrite:
default:
errs <- datastore.NewWatchDisconnectedErr()
if !sendChange(changeToWrite) {
return
}
}
Expand Down
50 changes: 26 additions & 24 deletions internal/datastore/mysql/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,24 +197,25 @@ func newMySQLDatastore(ctx context.Context, uri string, options ...Option) (*Dat
)

store := &Datastore{
db: db,
driver: driver,
url: uri,
revisionQuantization: config.revisionQuantization,
gcWindow: config.gcWindow,
gcInterval: config.gcInterval,
gcTimeout: config.gcMaxOperationTime,
gcCtx: gcCtx,
cancelGc: cancelGc,
watchBufferLength: config.watchBufferLength,
optimizedRevisionQuery: revisionQuery,
validTransactionQuery: validTransactionQuery,
createTxn: createTxn,
createBaseTxn: createBaseTxn,
QueryBuilder: queryBuilder,
readTxOptions: &sql.TxOptions{Isolation: sql.LevelSerializable, ReadOnly: true},
maxRetries: config.maxRetries,
analyzeBeforeStats: config.analyzeBeforeStats,
db: db,
driver: driver,
url: uri,
revisionQuantization: config.revisionQuantization,
gcWindow: config.gcWindow,
gcInterval: config.gcInterval,
gcTimeout: config.gcMaxOperationTime,
gcCtx: gcCtx,
cancelGc: cancelGc,
watchBufferLength: config.watchBufferLength,
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
optimizedRevisionQuery: revisionQuery,
validTransactionQuery: validTransactionQuery,
createTxn: createTxn,
createBaseTxn: createBaseTxn,
QueryBuilder: queryBuilder,
readTxOptions: &sql.TxOptions{Isolation: sql.LevelSerializable, ReadOnly: true},
maxRetries: config.maxRetries,
analyzeBeforeStats: config.analyzeBeforeStats,
CachedOptimizedRevisions: revisions.NewCachedOptimizedRevisions(
maxRevisionStaleness,
),
Expand Down Expand Up @@ -430,12 +431,13 @@ type Datastore struct {
url string
analyzeBeforeStats bool

revisionQuantization time.Duration
gcWindow time.Duration
gcInterval time.Duration
gcTimeout time.Duration
watchBufferLength uint16
maxRetries uint8
revisionQuantization time.Duration
gcWindow time.Duration
gcInterval time.Duration
gcTimeout time.Duration
watchBufferLength uint16
watchBufferWriteTimeout time.Duration
maxRetries uint8

optimizedRevisionQuery string
validTransactionQuery string
Expand Down
9 changes: 9 additions & 0 deletions internal/datastore/mysql/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
defaultConnMaxIdleTime = 30 * time.Minute
defaultConnMaxLifetime = 30 * time.Minute
defaultWatchBufferLength = 128
defaultWatchBufferWriteTimeout = 1 * time.Second
defaultQuantization = 5 * time.Second
defaultMaxRevisionStalenessPercent = 0.1
defaultEnablePrometheusStats = false
Expand All @@ -29,6 +30,7 @@ type mysqlOptions struct {
gcMaxOperationTime time.Duration
maxRevisionStalenessPercent float64
watchBufferLength uint16
watchBufferWriteTimeout time.Duration
tablePrefix string
enablePrometheusStats bool
maxOpenConns int
Expand All @@ -50,6 +52,7 @@ func generateConfig(options []Option) (mysqlOptions, error) {
gcInterval: defaultGarbageCollectionInterval,
gcMaxOperationTime: defaultGarbageCollectionMaxOperationTime,
watchBufferLength: defaultWatchBufferLength,
watchBufferWriteTimeout: defaultWatchBufferWriteTimeout,
maxOpenConns: defaultMaxOpenConns,
connMaxIdleTime: defaultConnMaxIdleTime,
connMaxLifetime: defaultConnMaxLifetime,
Expand Down Expand Up @@ -86,6 +89,12 @@ func WatchBufferLength(watchBufferLength uint16) Option {
}
}

// WatchBufferWriteTimeout is the maximum timeout for writing to the watch buffer,
// after which the caller to the watch will be disconnected.
func WatchBufferWriteTimeout(watchBufferWriteTimeout time.Duration) Option {
return func(mo *mysqlOptions) { mo.watchBufferWriteTimeout = watchBufferWriteTimeout }
}

// RevisionQuantization is the time bucket size to which advertised
// revisions will be rounded.
//
Expand Down
40 changes: 34 additions & 6 deletions internal/datastore/mysql/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ const (
//
// All events following afterRevision will be sent to the caller.
func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
updates := make(chan *datastore.RevisionChanges, mds.watchBufferLength)
watchBufferLength := options.WatchBufferLength
if watchBufferLength <= 0 {
watchBufferLength = mds.watchBufferLength
}

updates := make(chan *datastore.RevisionChanges, watchBufferLength)
errs := make(chan error, 1)

if options.Content&datastore.WatchSchema == datastore.WatchSchema {
Expand All @@ -35,6 +40,33 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi
return updates, errs
}

watchBufferWriteTimeout := options.WatchBufferWriteTimeout
if watchBufferWriteTimeout <= 0 {
watchBufferWriteTimeout = mds.watchBufferWriteTimeout
}

sendChange := func(change *datastore.RevisionChanges) bool {
select {
case updates <- change:
return true

default:
// If we cannot immediately write, setup the timer and try again.
}

timer := time.NewTimer(watchBufferWriteTimeout)
defer timer.Stop()

select {
case updates <- change:
return true

case <-timer.C:
errs <- datastore.NewWatchDisconnectedErr()
return false
}
}

go func() {
defer close(updates)
defer close(errs)
Expand All @@ -56,11 +88,7 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi
// Write the staged updates to the channel
for _, changeToWrite := range stagedUpdates {
changeToWrite := changeToWrite

select {
case updates <- &changeToWrite:
default:
errs <- datastore.NewWatchDisconnectedErr()
if !sendChange(&changeToWrite) {
return
}
}
Expand Down
Loading

0 comments on commit dae2369

Please sign in to comment.