Skip to content

Commit

Permalink
Update tabletpicker to support cell pref and tablet order options (vi…
Browse files Browse the repository at this point in the history
…tessio#12282)

* update tabletpicker to support cell pref and tablet order options

Signed-off-by: Priya Bibra <[email protected]>

* add tablet picker options in vstream flags

Signed-off-by: Priya Bibra <[email protected]>

* add tablet picker tests

Signed-off-by: Priya Bibra <[email protected]>

* add summary docs

Signed-off-by: Priya Bibra <[email protected]>

* update proto

Signed-off-by: Priya Bibra <[email protected]>
Signed-off-by: 'Priya Bibra' <[email protected]>

* fix vreplication test

Signed-off-by: Priya Bibra <[email protected]>
Signed-off-by: 'Priya Bibra' <[email protected]>

* fix doc typos, add function comments, update error handling

Signed-off-by: Priya Bibra <[email protected]>
Signed-off-by: 'Priya Bibra' <[email protected]>

* fix tests and comments

Signed-off-by: Priya Bibra <[email protected]>

* fix local cell ref

Signed-off-by: Priya Bibra <[email protected]>

* update proto

Signed-off-by: Priya Bibra <[email protected]>

* add log line to debug test

Signed-off-by: Priya Bibra <[email protected]>

* add cell to vre test def

Signed-off-by: Priya Bibra <[email protected]>

* define vrepl engine for controller tests

Signed-off-by: Priya Bibra <[email protected]>
Signed-off-by: 'Priya Bibra' <[email protected]>

---------

Signed-off-by: Priya Bibra <[email protected]>
Signed-off-by: pbibra <[email protected]>
Signed-off-by: 'Priya Bibra' <[email protected]>
  • Loading branch information
pbibra authored and timvaillancourt committed May 29, 2024
1 parent 31e9e72 commit 8c45856
Show file tree
Hide file tree
Showing 14 changed files with 736 additions and 350 deletions.
1 change: 1 addition & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ func testVStreamCellFlag(t *testing.T) {
flags := &vtgatepb.VStreamFlags{}
if tc.cells != "" {
flags.Cells = tc.cells
flags.CellPreference = "onlyspecified"
}

ctx2, cancel := context.WithTimeout(ctx, 30*time.Second)
Expand Down
244 changes: 217 additions & 27 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,40 @@ import (
"vitess.io/vitess/go/vt/vterrors"
)

type TabletPickerCellPreference int

const (
// PreferLocalWithAlias gives preference to the local cell first, then specified cells, if any.
// This is the default when no other option is provided.
TabletPickerCellPreference_PreferLocalWithAlias TabletPickerCellPreference = iota
// OnlySpecified only picks tablets from the list of cells given.
TabletPickerCellPreference_OnlySpecified
)

type TabletPickerTabletOrder int

const (
// All provided tablet types are given equal priority. This is the default.
TabletPickerTabletOrder_Any TabletPickerTabletOrder = iota
// Provided tablet types are expected to be prioritized in the given order.
TabletPickerTabletOrder_InOrder
)

var (
tabletPickerRetryDelay = 30 * time.Second
muTabletPickerRetryDelay sync.Mutex
globalTPStats *tabletPickerStats
inOrderHint = "in_order:"

tabletPickerCellPreferenceMap = map[string]TabletPickerCellPreference{
"preferlocalwithalias": TabletPickerCellPreference_PreferLocalWithAlias,
"onlyspecified": TabletPickerCellPreference_OnlySpecified,
}

tabletPickerTabletOrderMap = map[string]TabletPickerTabletOrder{
"any": TabletPickerTabletOrder_Any,
"inorder": TabletPickerTabletOrder_InOrder,
}
)

// GetTabletPickerRetryDelay synchronizes changes to tabletPickerRetryDelay. Used in tests only at the moment
Expand All @@ -62,18 +91,63 @@ func SetTabletPickerRetryDelay(delay time.Duration) {
tabletPickerRetryDelay = delay
}

type TabletPickerOptions struct {
CellPreference string
TabletOrder string
}

func parseTabletPickerCellPreferenceString(str string) (TabletPickerCellPreference, error) {
// return default if blank
if str == "" {
return TabletPickerCellPreference_PreferLocalWithAlias, nil
}

if c, ok := tabletPickerCellPreferenceMap[strings.ToLower(str)]; ok {
return c, nil
}

return -1, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid cell preference: %v", str)
}

func parseTabletPickerTabletOrderString(str string) (TabletPickerTabletOrder, error) {
// return default if blank
if str == "" {
return TabletPickerTabletOrder_Any, nil
}

if o, ok := tabletPickerTabletOrderMap[strings.ToLower(str)]; ok {
return o, nil
}

return -1, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid tablet order type: %v", str)
}

type localCellInfo struct {
localCell string
cellsInAlias map[string]string
}

// TabletPicker gives a simplified API for picking tablets.
type TabletPicker struct {
ts *topo.Server
cells []string
keyspace string
shard string
tabletTypes []topodatapb.TabletType
inOrder bool
ts *topo.Server
cells []string
keyspace string
shard string
tabletTypes []topodatapb.TabletType
inOrder bool
cellPref TabletPickerCellPreference
localCellInfo localCellInfo
}

// NewTabletPicker returns a TabletPicker.
func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTypesStr string) (*TabletPicker, error) {
func NewTabletPicker(
ctx context.Context,
ts *topo.Server,
cells []string,
localCell, keyspace, shard, tabletTypesStr string,
options TabletPickerOptions,
) (*TabletPicker, error) {
// Keep inOrder parsing here for backward compatability until TabletPickerTabletOrder is fully adopted.
tabletTypes, inOrder, err := ParseTabletTypesAndOrder(tabletTypesStr)
if err != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "failed to parse list of tablet types: %v", tabletTypesStr)
Expand All @@ -92,19 +166,123 @@ func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTyp
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
fmt.Sprintf("Missing required field(s) for tablet picker: %s", strings.Join(missingFields, ", ")))
}

// Resolve tablet picker options
cellPref, err := parseTabletPickerCellPreferenceString(options.CellPreference)
if err != nil {
return nil, err
}

// For backward compatibility only parse the options for tablet ordering
// if the in_order hint wasn't already specified. Otherwise it could be overridden.
// We can remove this check once the in_order hint is deprecated.
if !inOrder {
order, err := parseTabletPickerTabletOrderString(options.TabletOrder)
if err != nil {
return nil, err
}
switch order {
case TabletPickerTabletOrder_Any:
inOrder = false
case TabletPickerTabletOrder_InOrder:
inOrder = true
}
}

aliasCellMap := make(map[string]string)
if cellPref == TabletPickerCellPreference_PreferLocalWithAlias {
if localCell == "" {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot have local cell preference without local cell")
}

// Add local cell to the list of cells for tablet picking.
// This will be de-duped later if the local cell already exists in the original list - see: dedupeCells()
cells = append(cells, localCell)
aliasName := topo.GetAliasByCell(ctx, ts, localCell)

// If an alias exists
if aliasName != localCell {
alias, err := ts.GetCellsAlias(ctx, aliasName, false)
if err != nil {
return nil, vterrors.Wrap(err, "error fetching local cell alias")
}

// Add the aliasName to the list of cells for tablet picking.
cells = append(cells, aliasName)

// Create a map of the cells in the alias to make lookup faster later when we're giving preference to these.
// see prioritizeTablets()
for _, c := range alias.Cells {
aliasCellMap[c] = c
}
}
}

return &TabletPicker{
ts: ts,
cells: cells,
keyspace: keyspace,
shard: shard,
tabletTypes: tabletTypes,
inOrder: inOrder,
ts: ts,
cells: dedupeCells(cells),
localCellInfo: localCellInfo{localCell: localCell, cellsInAlias: aliasCellMap},
keyspace: keyspace,
shard: shard,
tabletTypes: tabletTypes,
inOrder: inOrder,
cellPref: cellPref,
}, nil
}

// dedupeCells is used to remove duplicates in the cell list in case it is passed in
// and exists in the local cell's alias. Can happen if CellPreference is PreferLocalWithAlias.
func dedupeCells(cells []string) []string {
keys := make(map[string]bool)
dedupedCells := []string{}

for _, c := range cells {
if _, value := keys[c]; !value {
keys[c] = true
dedupedCells = append(dedupedCells, c)
}
}
return dedupedCells
}

// prioritizeTablets orders the candidate pool of tablets based on CellPreference.
// If CellPreference is PreferLocalWithAlias then tablets in the local cell will be prioritized for selection,
// followed by the tablets within the local cell's alias, and finally any others specified by the client.
// If CellPreference is OnlySpecified, then tablets will only be selected randomly from the cells specified by the client.
func (tp *TabletPicker) prioritizeTablets(candidates []*topo.TabletInfo) (sameCell, sameAlias, allOthers []*topo.TabletInfo) {
for _, c := range candidates {
if c.Alias.Cell == tp.localCellInfo.localCell {
sameCell = append(sameCell, c)
} else if _, ok := tp.localCellInfo.cellsInAlias[c.Alias.Cell]; ok {
sameAlias = append(sameAlias, c)
} else {
allOthers = append(allOthers, c)
}
}

return sameCell, sameAlias, allOthers
}

func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo.TabletInfo {
// Sort candidates slice such that tablets appear in same tablet type order as in tp.tabletTypes
orderMap := map[topodatapb.TabletType]int{}
for i, t := range tp.tabletTypes {
orderMap[t] = i
}
sort.Slice(candidates, func(i, j int) bool {
if orderMap[candidates[i].Type] == orderMap[candidates[j].Type] {
// identical tablet types: randomize order of tablets for this type
return rand.Intn(2) == 0 // 50% chance
}
return orderMap[candidates[i].Type] < orderMap[candidates[j].Type]
})

return candidates
}

// PickForStreaming picks an available tablet.
// All tablets that belong to tp.cells are evaluated and one is
// chosen at random.
// Selection is based on CellPreference.
// See prioritizeTablets for prioritization logic.
func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) {
rand.Seed(time.Now().UnixNano())
// keep trying at intervals (tabletPickerRetryDelay) until a tablet is found
Expand All @@ -116,19 +294,30 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
default:
}
candidates := tp.GetMatchingTablets(ctx)
if tp.inOrder {
// Sort candidates slice such that tablets appear in same tablet type order as in tp.tabletTypes
orderMap := map[topodatapb.TabletType]int{}
for i, t := range tp.tabletTypes {
orderMap[t] = i
if tp.cellPref == TabletPickerCellPreference_PreferLocalWithAlias {
sameCellCandidates, sameAliasCandidates, allOtherCandidates := tp.prioritizeTablets(candidates)

if tp.inOrder {
sameCellCandidates = tp.orderByTabletType(sameCellCandidates)
sameAliasCandidates = tp.orderByTabletType(sameAliasCandidates)
allOtherCandidates = tp.orderByTabletType(allOtherCandidates)
} else {
// Randomize candidates
rand.Shuffle(len(sameCellCandidates), func(i, j int) {
sameCellCandidates[i], sameCellCandidates[j] = sameCellCandidates[j], sameCellCandidates[i]
})
rand.Shuffle(len(sameAliasCandidates), func(i, j int) {
sameAliasCandidates[i], sameAliasCandidates[j] = sameAliasCandidates[j], sameAliasCandidates[i]
})
rand.Shuffle(len(allOtherCandidates), func(i, j int) {
allOtherCandidates[i], allOtherCandidates[j] = allOtherCandidates[j], allOtherCandidates[i]
})
}
sort.Slice(candidates, func(i, j int) bool {
if orderMap[candidates[i].Type] == orderMap[candidates[j].Type] {
// identical tablet types: randomize order of tablets for this type
return rand.Intn(2) == 0 // 50% chance
}
return orderMap[candidates[i].Type] < orderMap[candidates[j].Type]
})

candidates = append(sameCellCandidates, sameAliasCandidates...)
candidates = append(candidates, allOtherCandidates...)
} else if tp.inOrder {
candidates = tp.orderByTabletType(candidates)
} else {
// Randomize candidates
rand.Shuffle(len(candidates), func(i, j int) {
Expand Down Expand Up @@ -204,6 +393,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
actualCells = append(actualCells, cell)
}
}

for _, cell := range actualCells {
shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
Expand Down
Loading

0 comments on commit 8c45856

Please sign in to comment.