Skip to content

Commit

Permalink
update solution
Browse files Browse the repository at this point in the history
Signed-off-by: Jun Wang <[email protected]>
  • Loading branch information
Jun Wang committed Dec 15, 2024
1 parent 45192d2 commit e5cdf1a
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 5 deletions.
11 changes: 11 additions & 0 deletions changelog/22.0/22.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
- **[Minor Changes](#minor-changes)**
- **[VTTablet Flags](#flags-vttablet)**

- **[Minor Changes](#minor-changes)**
- **[VTTablet](#vttablet)**
- [VTTablet: Query Consolidation Waiter Cap](#vttablet-consolidator-query-waiter-cap)

## <a id="major-changes"/>Major Changes</a>

Expand Down Expand Up @@ -67,3 +70,11 @@ To upgrade to the newer version of the configuration file, first switch to using
- `twopc_abandon_age` flag now supports values in the time.Duration format (e.g., 1s, 2m, 1h).
While the flag will continue to accept float values (interpreted as seconds) for backward compatibility,
**float inputs are deprecated** and will be removed in a future release.

## <a id="minor-changes"/>Minor Changes

### <a id="vttablet"/>VTTablet

#### <a id="vttablet-consolidator-query-waiter-cap"/>--consolidator-query-waiter-cap flag

A new CLI flag `--consolidator-query-waiter-cap` to set the maximum number of clients allowed to wait on the consolidator. The default value is set to 0 for unlimited wait. Users can adjust this value based on the performance of VTTablet to avoid excessive memory usage and the risk of being OOMKilled, particularly in Kubernetes deployments.
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Flags:
--config-path strings Paths to search for config files in. (default [{{ .Workdir }}])
--config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s)
--config-type string Config file type (omit to infer config type from file extension).
--consolidator-query-waiter-cap int Configure the maximum number of clients allowed to wait on the consolidator.
--consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152)
--consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. (default 134217728)
--consul_auth_static_file string JSON File to read the topos/tokens from.
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ Flags:
--config-path strings Paths to search for config files in. (default [{{ .Workdir }}])
--config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s)
--config-type string Config file type (omit to infer config type from file extension).
--consolidator-query-waiter-cap int Configure the maximum number of clients allowed to wait on the consolidator.
--consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152)
--consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. (default 134217728)
--consul_auth_static_file string JSON File to read the topos/tokens from.
Expand Down
10 changes: 9 additions & 1 deletion go/sync2/consolidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type PendingResult interface {
SetResult(*sqltypes.Result)
Result() *sqltypes.Result
Wait()
AddWaiterCounter(int64) *int64
}

type consolidator struct {
Expand Down Expand Up @@ -77,6 +78,7 @@ func (co *consolidator) Create(query string) (PendingResult, bool) {
defer co.mu.Unlock()
var r *pendingResult
if r, ok := co.queries[query]; ok {
r.AddWaiterCounter(1)
return r, false
}
r = &pendingResult{consolidator: co, query: query}
Expand Down Expand Up @@ -122,17 +124,23 @@ func (rs *pendingResult) Wait() {
rs.executing.RLock()
}

func (rs *pendingResult) AddWaiterCounter(c int64) *int64 {
atomic.AddInt64(rs.consolidator.totalWaiterCount, c)
return rs.consolidator.totalWaiterCount
}

// ConsolidatorCache is a thread-safe object used for counting how often recent
// queries have been consolidated.
// It is also used by the txserializer package to count how often transactions
// have been queued and had to wait because they targeted the same row (range).
type ConsolidatorCache struct {
*cache.LRUCache[*ccount]
totalWaiterCount *int64
}

// NewConsolidatorCache creates a new cache with the given capacity.
func NewConsolidatorCache(capacity int64) *ConsolidatorCache {
return &ConsolidatorCache{cache.NewLRUCache[*ccount](capacity)}
return &ConsolidatorCache{cache.NewLRUCache[*ccount](capacity), new(int64)}
}

// Record increments the count for "query" by 1.
Expand Down
31 changes: 31 additions & 0 deletions go/sync2/consolidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,42 @@ package sync2

import (
"reflect"
"sync"
"testing"

"vitess.io/vitess/go/sqltypes"
)

func TestAddWaiterCount(t *testing.T) {
con := NewConsolidator()
sql := "select * from SomeTable"
pr, _ := con.Create(sql)
var wgAdd sync.WaitGroup
var wgSub sync.WaitGroup

var concurrent = 1000

for i := 0; i < concurrent; i++ {
wgAdd.Add(1)
wgSub.Add(1)
go func() {
defer wgAdd.Done()
pr.AddWaiterCounter(1)
}()
go func() {
defer wgSub.Done()
pr.AddWaiterCounter(-1)
}()
}

wgAdd.Wait()
wgSub.Wait()

if *pr.AddWaiterCounter(0) != 0 {
t.Fatalf("Expect 0 totalWaiterCount but got: %v", *pr.AddWaiterCounter(0))
}
}

func TestConsolidator(t *testing.T) {
con := NewConsolidator()
sql := "select * from SomeTable"
Expand Down
5 changes: 5 additions & 0 deletions go/sync2/fake_consolidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,8 @@ func (fr *FakePendingResult) SetResult(result *sqltypes.Result) {
func (fr *FakePendingResult) Wait() {
fr.WaitCalls++
}

// AddWaiterCounter is currently a no-op.
func (fr *FakePendingResult) AddWaiterCounter(int64) *int64 {
return new(int64)
}
12 changes: 8 additions & 4 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,10 +716,14 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) {
q.SetErr(err)
}
} else {
qre.logStats.QuerySources |= tabletenv.QuerySourceConsolidator
startTime := time.Now()
q.Wait()
qre.tsv.stats.WaitTimings.Record("Consolidations", startTime)
waiterCap := qre.tsv.config.ConsolidatorQueryWaiterCap
if waiterCap == 0 || *q.AddWaiterCounter(0) <= waiterCap {
qre.logStats.QuerySources |= tabletenv.QuerySourceConsolidator
startTime := time.Now()
q.Wait()
qre.tsv.stats.WaitTimings.Record("Consolidations", startTime)
}
q.AddWaiterCounter(-1)
}
if q.Err() != nil {
return nil, q.Err()
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) {
fs.Int64Var(&currentConfig.ConsolidatorStreamQuerySize, "consolidator-stream-query-size", defaultConfig.ConsolidatorStreamQuerySize, "Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator.")
fs.Int64Var(&currentConfig.ConsolidatorStreamTotalSize, "consolidator-stream-total-size", defaultConfig.ConsolidatorStreamTotalSize, "Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator.")

fs.Int64Var(&currentConfig.ConsolidatorQueryWaiterCap, "consolidator-query-waiter-cap", 0, "Configure the maximum number of clients allowed to wait on the consolidator.")
fs.DurationVar(&healthCheckInterval, "health_check_interval", defaultConfig.Healthcheck.Interval, "Interval between health checks")
fs.DurationVar(&degradedThreshold, "degraded_threshold", defaultConfig.Healthcheck.DegradedThreshold, "replication lag after which a replica is considered degraded")
fs.DurationVar(&unhealthyThreshold, "unhealthy_threshold", defaultConfig.Healthcheck.UnhealthyThreshold, "replication lag after which a replica is considered unhealthy")
Expand Down Expand Up @@ -320,6 +321,7 @@ type TabletConfig struct {
StreamBufferSize int `json:"streamBufferSize,omitempty"`
ConsolidatorStreamTotalSize int64 `json:"consolidatorStreamTotalSize,omitempty"`
ConsolidatorStreamQuerySize int64 `json:"consolidatorStreamQuerySize,omitempty"`
ConsolidatorQueryWaiterCap int64 `json:"consolidatorMaxQueryWait,omitempty"`
QueryCacheMemory int64 `json:"queryCacheMemory,omitempty"`
QueryCacheDoorkeeper bool `json:"queryCacheDoorkeeper,omitempty"`
SchemaReloadInterval time.Duration `json:"schemaReloadIntervalSeconds,omitempty"`
Expand Down

0 comments on commit e5cdf1a

Please sign in to comment.