From ba1d81c2f3ac95cd73916e519c9134535354adc2 Mon Sep 17 00:00:00 2001 From: Priya Bibra Date: Mon, 25 Sep 2023 14:33:06 -0700 Subject: [PATCH] revert custom local cell preference and apply upstream 12282 Signed-off-by: Priya Bibra --- .../vreplication/vreplication_test.go | 1 + go/vt/discovery/tablet_picker.go | 224 ++++-- go/vt/discovery/tablet_picker_test.go | 690 ++++++++---------- go/vt/proto/vtgate/vtgate.pb.go | 149 ++-- go/vt/proto/vtgate/vtgate_vtproto.pb.go | 86 +++ go/vt/vtgate/vstream_manager.go | 50 +- go/vt/vtgate/vstream_manager_test.go | 70 -- .../tabletmanager/vdiff/table_differ.go | 9 +- .../tabletmanager/vreplication/controller.go | 2 +- .../vreplication/controller_test.go | 22 +- go/vt/wrangler/traffic_switcher.go | 2 +- go/vt/wrangler/traffic_switcher_env_test.go | 4 +- go/vt/wrangler/vdiff.go | 4 +- proto/vtgate.proto | 2 + tools/rowlog/rowlog.go | 4 +- 15 files changed, 689 insertions(+), 630 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index a3a88a7fc7f..bb9cd29a611 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -287,6 +287,7 @@ func testVStreamCellFlag(t *testing.T) { flags := &vtgatepb.VStreamFlags{} if tc.cells != "" { flags.Cells = tc.cells + flags.CellPreference = "onlyspecified" } ctx2, cancel := context.WithTimeout(ctx, 30*time.Second) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index 3435ac5ea7e..a24637a6edc 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -41,12 +41,40 @@ import ( "vitess.io/vitess/go/vt/vterrors" ) +type TabletPickerCellPreference int + +const ( + // PreferLocalWithAlias gives preference to the local cell first, then specified cells, if any. + // This is the default when no other option is provided. + TabletPickerCellPreference_PreferLocalWithAlias TabletPickerCellPreference = iota + // OnlySpecified only picks tablets from the list of cells given. + TabletPickerCellPreference_OnlySpecified +) + +type TabletPickerTabletOrder int + +const ( + // All provided tablet types are given equal priority. This is the default. + TabletPickerTabletOrder_Any TabletPickerTabletOrder = iota + // Provided tablet types are expected to be prioritized in the given order. + TabletPickerTabletOrder_InOrder +) + var ( tabletPickerRetryDelay = 30 * time.Second muTabletPickerRetryDelay sync.Mutex globalTPStats *tabletPickerStats inOrderHint = "in_order:" - localPreferenceHint = "local:" + + tabletPickerCellPreferenceMap = map[string]TabletPickerCellPreference{ + "preferlocalwithalias": TabletPickerCellPreference_PreferLocalWithAlias, + "onlyspecified": TabletPickerCellPreference_OnlySpecified, + } + + tabletPickerTabletOrderMap = map[string]TabletPickerTabletOrder{ + "any": TabletPickerTabletOrder_Any, + "inorder": TabletPickerTabletOrder_InOrder, + } ) // GetTabletPickerRetryDelay synchronizes changes to tabletPickerRetryDelay. Used in tests only at the moment @@ -63,19 +91,63 @@ func SetTabletPickerRetryDelay(delay time.Duration) { tabletPickerRetryDelay = delay } +type TabletPickerOptions struct { + CellPreference string + TabletOrder string +} + +func parseTabletPickerCellPreferenceString(str string) (TabletPickerCellPreference, error) { + // return default if blank + if str == "" { + return TabletPickerCellPreference_PreferLocalWithAlias, nil + } + + if c, ok := tabletPickerCellPreferenceMap[strings.ToLower(str)]; ok { + return c, nil + } + + return -1, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid cell preference: %v", str) +} + +func parseTabletPickerTabletOrderString(str string) (TabletPickerTabletOrder, error) { + // return default if blank + if str == "" { + return TabletPickerTabletOrder_Any, nil + } + + if o, ok := tabletPickerTabletOrderMap[strings.ToLower(str)]; ok { + return o, nil + } + + return -1, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid tablet order type: %v", str) +} + +type localCellInfo struct { + localCell string + cellsInAlias map[string]string +} + // TabletPicker gives a simplified API for picking tablets. type TabletPicker struct { - ts *topo.Server - cells []string - keyspace string - shard string - tabletTypes []topodatapb.TabletType - inOrder bool - localPreference string + ts *topo.Server + cells []string + keyspace string + shard string + tabletTypes []topodatapb.TabletType + inOrder bool + cellPref TabletPickerCellPreference + localCellInfo localCellInfo } // NewTabletPicker returns a TabletPicker. -func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTypesStr string) (*TabletPicker, error) { +func NewTabletPicker( + ctx context.Context, + ts *topo.Server, + cells []string, + localCell, keyspace, shard, tabletTypesStr string, + options TabletPickerOptions, +) (*TabletPicker, error) { + // Keep inOrder parsing here for backward compatability until TabletPickerTabletOrder is fully adopted. tabletTypes, inOrder, err := ParseTabletTypesAndOrder(tabletTypesStr) if err != nil { return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "failed to parse list of tablet types: %v", tabletTypesStr) @@ -95,37 +167,100 @@ func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTyp fmt.Sprintf("Missing required field(s) for tablet picker: %s", strings.Join(missingFields, ", "))) } - localPreference := "" - if strings.HasPrefix(cells[0], localPreferenceHint) { - localPreference = cells[0][len(localPreferenceHint):] - cells = cells[1:] - // Add the local cell to the list of cells - // This may result in the local cell appearing twice if it already exists as part of an alias, - // but cells will get deduped during tablet selection. See GetMatchingTablets() -> tp.dedupeCells() - cells = append(cells, localPreference) + // Resolve tablet picker options + cellPref, err := parseTabletPickerCellPreferenceString(options.CellPreference) + if err != nil { + return nil, err + } + + // For backward compatibility only parse the options for tablet ordering + // if the in_order hint wasn't already specified. Otherwise it could be overridden. + // We can remove this check once the in_order hint is deprecated. + if !inOrder { + order, err := parseTabletPickerTabletOrderString(options.TabletOrder) + if err != nil { + return nil, err + } + switch order { + case TabletPickerTabletOrder_Any: + inOrder = false + case TabletPickerTabletOrder_InOrder: + inOrder = true + } + } + + aliasCellMap := make(map[string]string) + if cellPref == TabletPickerCellPreference_PreferLocalWithAlias { + if localCell == "" { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot have local cell preference without local cell") + } + + // Add local cell to the list of cells for tablet picking. + // This will be de-duped later if the local cell already exists in the original list - see: dedupeCells() + cells = append(cells, localCell) + aliasName := topo.GetAliasByCell(ctx, ts, localCell) + + // If an alias exists + if aliasName != localCell { + alias, err := ts.GetCellsAlias(ctx, aliasName, false) + if err != nil { + return nil, vterrors.Wrap(err, "error fetching local cell alias") + } + + // Add the aliasName to the list of cells for tablet picking. + cells = append(cells, aliasName) + + // Create a map of the cells in the alias to make lookup faster later when we're giving preference to these. + // see prioritizeTablets() + for _, c := range alias.Cells { + aliasCellMap[c] = c + } + } } return &TabletPicker{ - ts: ts, - cells: cells, - keyspace: keyspace, - shard: shard, - tabletTypes: tabletTypes, - inOrder: inOrder, - localPreference: localPreference, + ts: ts, + cells: dedupeCells(cells), + localCellInfo: localCellInfo{localCell: localCell, cellsInAlias: aliasCellMap}, + keyspace: keyspace, + shard: shard, + tabletTypes: tabletTypes, + inOrder: inOrder, + cellPref: cellPref, }, nil } -func (tp *TabletPicker) prioritizeTablets(candidates []*topo.TabletInfo) (sameCell, allOthers []*topo.TabletInfo) { +// dedupeCells is used to remove duplicates in the cell list in case it is passed in +// and exists in the local cell's alias. Can happen if CellPreference is PreferLocalWithAlias. +func dedupeCells(cells []string) []string { + keys := make(map[string]bool) + dedupedCells := []string{} + + for _, c := range cells { + if _, value := keys[c]; !value { + keys[c] = true + dedupedCells = append(dedupedCells, c) + } + } + return dedupedCells +} + +// prioritizeTablets orders the candidate pool of tablets based on CellPreference. +// If CellPreference is PreferLocalWithAlias then tablets in the local cell will be prioritized for selection, +// followed by the tablets within the local cell's alias, and finally any others specified by the client. +// If CellPreference is OnlySpecified, then tablets will only be selected randomly from the cells specified by the client. +func (tp *TabletPicker) prioritizeTablets(candidates []*topo.TabletInfo) (sameCell, sameAlias, allOthers []*topo.TabletInfo) { for _, c := range candidates { - if c.Alias.Cell == tp.localPreference { + if c.Alias.Cell == tp.localCellInfo.localCell { sameCell = append(sameCell, c) + } else if _, ok := tp.localCellInfo.cellsInAlias[c.Alias.Cell]; ok { + sameAlias = append(sameAlias, c) } else { allOthers = append(allOthers, c) } } - return sameCell, allOthers + return sameCell, sameAlias, allOthers } func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo.TabletInfo { @@ -146,8 +281,8 @@ func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo } // PickForStreaming picks an available tablet -// All tablets that belong to tp.cells are evaluated and one is -// chosen at random, unless local preference is given +// Selection is based on CellPreference. +// See prioritizeTablets for prioritization logic. func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) { rand.Seed(time.Now().UnixNano()) // keep trying at intervals (tabletPickerRetryDelay) until a tablet is found @@ -159,25 +294,28 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table default: } candidates := tp.GetMatchingTablets(ctx) - // we'd like to prioritize same cell tablets - if tp.localPreference != "" { - sameCellCandidates, allOtherCandidates := tp.prioritizeTablets(candidates) + if tp.cellPref == TabletPickerCellPreference_PreferLocalWithAlias { + sameCellCandidates, sameAliasCandidates, allOtherCandidates := tp.prioritizeTablets(candidates) - // order same cell and all others by tablet type separately - // combine with same cell in front if tp.inOrder { sameCellCandidates = tp.orderByTabletType(sameCellCandidates) + sameAliasCandidates = tp.orderByTabletType(sameAliasCandidates) allOtherCandidates = tp.orderByTabletType(allOtherCandidates) } else { // Randomize candidates rand.Shuffle(len(sameCellCandidates), func(i, j int) { sameCellCandidates[i], sameCellCandidates[j] = sameCellCandidates[j], sameCellCandidates[i] }) + rand.Shuffle(len(sameAliasCandidates), func(i, j int) { + sameAliasCandidates[i], sameAliasCandidates[j] = sameAliasCandidates[j], sameAliasCandidates[i] + }) rand.Shuffle(len(allOtherCandidates), func(i, j int) { allOtherCandidates[i], allOtherCandidates[j] = allOtherCandidates[j], allOtherCandidates[i] }) } - candidates = append(sameCellCandidates, allOtherCandidates...) + + candidates = append(sameCellCandidates, sameAliasCandidates...) + candidates = append(candidates, allOtherCandidates...) } else if tp.inOrder { candidates = tp.orderByTabletType(candidates) } else { @@ -255,9 +393,6 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn actualCells = append(actualCells, cell) } } - // Just in case a cell was passed in addition to its alias. - // Can happen if localPreference is not "". See NewTabletPicker - actualCells = tp.dedupeCells(actualCells) for _, cell := range actualCells { shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) @@ -301,19 +436,6 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn return tablets } -func (tp *TabletPicker) dedupeCells(cells []string) []string { - keys := make(map[string]bool) - dedupedCells := []string{} - - for _, c := range cells { - if _, value := keys[c]; !value { - keys[c] = true - dedupedCells = append(dedupedCells, c) - } - } - return dedupedCells -} - func init() { // TODO(sougou): consolidate this call to be once per process. rand.Seed(time.Now().UnixNano()) diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index c81f1303ec4..88368c02a60 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -1,12 +1,9 @@ /* Copyright 2019 The Vitess Authors. - Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -31,169 +28,6 @@ import ( "vitess.io/vitess/go/vt/topo/memorytopo" ) -func TestPickSimple(t *testing.T) { - te := newPickerTestEnv(t, []string{"cell"}) - want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(t, te, want) - - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") - require.NoError(t, err) - - tablet, err := tp.PickForStreaming(context.Background()) - require.NoError(t, err) - assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) -} - -func TestPickFromTwoHealthy(t *testing.T) { - te := newPickerTestEnv(t, []string{"cell"}) - want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(t, te, want1) - want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, "cell", true, true) - defer deleteTablet(t, te, want2) - - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica,rdonly") - require.NoError(t, err) - - // In 20 attempts, both tablet types must be picked at least once. - var picked1, picked2 bool - for i := 0; i < 20; i++ { - tablet, err := tp.PickForStreaming(context.Background()) - require.NoError(t, err) - if proto.Equal(tablet, want1) { - picked1 = true - } - if proto.Equal(tablet, want2) { - picked2 = true - } - } - assert.True(t, picked1) - assert.True(t, picked2) -} - -func TestPickInOrder1(t *testing.T) { - te := newPickerTestEnv(t, []string{"cell"}) - want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(t, te, want1) - want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, "cell", true, true) - defer deleteTablet(t, te, want2) - - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "in_order:replica,rdonly") - require.NoError(t, err) - - // In 20 attempts, we always pick the first healthy tablet in order - var picked1, picked2 bool - for i := 0; i < 20; i++ { - tablet, err := tp.PickForStreaming(context.Background()) - require.NoError(t, err) - if proto.Equal(tablet, want1) { - picked1 = true - } - if proto.Equal(tablet, want2) { - picked2 = true - } - } - assert.True(t, picked1) - assert.False(t, picked2) -} - -func TestPickInOrder2(t *testing.T) { - te := newPickerTestEnv(t, []string{"cell"}) - want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(t, te, want1) - want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, "cell", true, true) - defer deleteTablet(t, te, want2) - - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "in_order:rdonly,replica") - require.NoError(t, err) - - // In 20 attempts, we always pick the first healthy tablet in order - var picked1, picked2 bool - for i := 0; i < 20; i++ { - tablet, err := tp.PickForStreaming(context.Background()) - require.NoError(t, err) - if proto.Equal(tablet, want1) { - picked1 = true - } - if proto.Equal(tablet, want2) { - picked2 = true - } - } - assert.False(t, picked1) - assert.True(t, picked2) -} - -func TestPickInOrderMultipleInGroup(t *testing.T) { - te := newPickerTestEnv(t, []string{"cell"}) - want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(t, te, want1) - want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, "cell", true, true) - defer deleteTablet(t, te, want2) - want3 := addTablet(te, 102, topodatapb.TabletType_RDONLY, "cell", true, true) - defer deleteTablet(t, te, want3) - want4 := addTablet(te, 103, topodatapb.TabletType_RDONLY, "cell", true, true) - defer deleteTablet(t, te, want4) - - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "in_order:rdonly,replica") - require.NoError(t, err) - - // In 40 attempts, we pick each of the three RDONLY, but never the REPLICA - var picked1, picked2, picked3, picked4 bool - for i := 0; i < 40; i++ { - tablet, err := tp.PickForStreaming(context.Background()) - require.NoError(t, err) - if proto.Equal(tablet, want1) { - picked1 = true - } - if proto.Equal(tablet, want2) { - picked2 = true - } - if proto.Equal(tablet, want3) { - picked3 = true - } - if proto.Equal(tablet, want4) { - picked4 = true - } - } - assert.False(t, picked1) - assert.True(t, picked2) - assert.True(t, picked3) - assert.True(t, picked4) -} - -func TestPickRespectsTabletType(t *testing.T) { - te := newPickerTestEnv(t, []string{"cell"}) - want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(t, te, want) - dont := addTablet(te, 101, topodatapb.TabletType_PRIMARY, "cell", true, true) - defer deleteTablet(t, te, dont) - - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica,rdonly") - require.NoError(t, err) - - // In 20 attempts, primary tablet must be never picked - for i := 0; i < 20; i++ { - tablet, err := tp.PickForStreaming(context.Background()) - require.NoError(t, err) - require.NotNil(t, tablet) - require.True(t, proto.Equal(tablet, want), "picked wrong tablet type") - } -} - -func TestPickMultiCell(t *testing.T) { - te := newPickerTestEnv(t, []string{"cell", "otherCell"}) - want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(t, te, want) - - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") - require.NoError(t, err) - - ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) - defer cancel() - tablet, err := tp.PickForStreaming(ctx) - require.NoError(t, err) - assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) -} - func TestPickPrimary(t *testing.T) { te := newPickerTestEnv(t, []string{"cell", "otherCell"}) want := addTablet(te, 100, topodatapb.TabletType_PRIMARY, "cell", true, true) @@ -206,7 +40,7 @@ func TestPickPrimary(t *testing.T) { }) require.NoError(t, err) - tp, err := NewTabletPicker(te.topoServ, []string{"otherCell"}, te.keyspace, te.shard, "primary") + tp, err := NewTabletPicker(context.Background(), te.topoServ, []string{"otherCell"}, "cell", te.keyspace, te.shard, "primary", TabletPickerOptions{}) require.NoError(t, err) ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond) @@ -216,67 +50,278 @@ func TestPickPrimary(t *testing.T) { assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) } -func TestPickFromOtherCell(t *testing.T) { - te := newPickerTestEnv(t, []string{"cell", "otherCell"}) - want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "otherCell", true, true) - defer deleteTablet(t, te, want) +func TestPickLocalPreferences(t *testing.T) { + type tablet struct { + id uint32 + typ topodatapb.TabletType + cell string + } - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") - require.NoError(t, err) + type testCase struct { + name string - ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) - defer cancel() - tablet, err := tp.PickForStreaming(ctx) - require.NoError(t, err) - assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) -} + //inputs + tablets []tablet + envCells []string + inCells []string + localCell string + inTabletTypes string + options TabletPickerOptions -func TestDontPickFromOtherCell(t *testing.T) { - te := newPickerTestEnv(t, []string{"cell", "otherCell"}) - want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(t, te, want1) - want2 := addTablet(te, 101, topodatapb.TabletType_REPLICA, "otherCell", true, true) - defer deleteTablet(t, te, want2) + //expected + tpCells []string + wantTablets []uint32 + } - tp, err := NewTabletPicker(te.topoServ, []string{"cell"}, te.keyspace, te.shard, "replica") - require.NoError(t, err) + tcases := []testCase{ + { + name: "pick simple", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + }, + envCells: []string{"cell"}, + inCells: []string{"cell"}, + localCell: "cell", + inTabletTypes: "replica", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "cella"}, + wantTablets: []uint32{100}, + }, { + name: "pick from two healthy", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_RDONLY, "cell"}, + }, + envCells: []string{"cell"}, + inCells: []string{"cell"}, + localCell: "cell", + inTabletTypes: "replica,rdonly", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "cella"}, + wantTablets: []uint32{100, 101}, + }, { + name: "pick in order replica", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_RDONLY, "cell"}, + }, + envCells: []string{"cell"}, + inCells: []string{"cell"}, + localCell: "cell", + inTabletTypes: "in_order:replica,rdonly", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "cella"}, + wantTablets: []uint32{100}, + }, { + name: "pick in order rdonly", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_RDONLY, "cell"}, + }, + envCells: []string{"cell"}, + inCells: []string{"cell"}, + localCell: "cell", + inTabletTypes: "in_order:rdonly,replica", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "cella"}, + wantTablets: []uint32{101}, + }, { + name: "pick in order multiple in group", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_RDONLY, "cell"}, + {102, topodatapb.TabletType_RDONLY, "cell"}, + {103, topodatapb.TabletType_RDONLY, "cell"}, + }, + envCells: []string{"cell"}, + inCells: []string{"cell"}, + localCell: "cell", + inTabletTypes: "in_order:rdonly,replica", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "cella"}, + wantTablets: []uint32{101, 102, 103}, + }, { + // Same test as above, except the in order preference is passed via the new TabletPickerOptions param. + // This will replace the above test when we deprecate the "in_order" hint in the tabletTypeStr + name: "pick in order multiple in group with new picker option", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_RDONLY, "cell"}, + {102, topodatapb.TabletType_RDONLY, "cell"}, + {103, topodatapb.TabletType_RDONLY, "cell"}, + }, + envCells: []string{"cell"}, + inCells: []string{"cell"}, + localCell: "cell", + inTabletTypes: "rdonly,replica", + options: TabletPickerOptions{TabletOrder: "InOrder"}, + tpCells: []string{"cell", "cella"}, + wantTablets: []uint32{101, 102, 103}, + }, { + name: "picker respects tablet type", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_PRIMARY, "cell"}, + }, + envCells: []string{"cell"}, + inCells: []string{"cell"}, + localCell: "cell", + inTabletTypes: "replica,rdonly", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "cella"}, + wantTablets: []uint32{100}, + }, { + name: "pick multi cell", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + }, + envCells: []string{"cell", "otherCell"}, + inCells: []string{"cell", "otherCell"}, + localCell: "cell", + inTabletTypes: "replica", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "otherCell", "cella"}, + wantTablets: []uint32{100}, + }, { + name: "pick from other cell", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "otherCell"}, + }, + envCells: []string{"cell", "otherCell"}, + inCells: []string{"cell", "otherCell"}, + localCell: "cell", + inTabletTypes: "replica", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "otherCell", "cella"}, + wantTablets: []uint32{100}, + }, { + name: "don't pick from other cell", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_REPLICA, "otherCell"}, + }, + envCells: []string{"cell", "otherCell"}, + inCells: []string{"cell"}, + localCell: "cell", + inTabletTypes: "replica", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "cella"}, + wantTablets: []uint32{100}, + }, { + name: "multi cell two tablets, local preference default", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_REPLICA, "otherCell"}, + }, + envCells: []string{"cell", "otherCell"}, + inCells: []string{"cell", "otherCell"}, + localCell: "cell", + inTabletTypes: "replica", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "otherCell", "cella"}, + wantTablets: []uint32{100}, + }, { + name: "multi cell two tablets, only specified cells", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_REPLICA, "otherCell"}, + }, + envCells: []string{"cell", "otherCell"}, + inCells: []string{"cell", "otherCell"}, + localCell: "cell", + inTabletTypes: "replica", + options: TabletPickerOptions{CellPreference: "OnlySpecified"}, + tpCells: []string{"cell", "otherCell"}, + wantTablets: []uint32{100, 101}, + }, { + name: "multi cell two tablet types, local preference default", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_RDONLY, "otherCell"}, + }, + envCells: []string{"cell", "otherCell"}, + inCells: []string{"cell", "otherCell"}, + localCell: "cell", + inTabletTypes: "replica,rdonly", + options: TabletPickerOptions{}, + tpCells: []string{"cell", "otherCell", "cella"}, + wantTablets: []uint32{100}, + }, { + name: "multi cell two tablet types, only specified cells", + tablets: []tablet{ + {100, topodatapb.TabletType_REPLICA, "cell"}, + {101, topodatapb.TabletType_RDONLY, "otherCell"}, + }, + envCells: []string{"cell", "otherCell"}, + inCells: []string{"cell", "otherCell"}, + localCell: "cell", + inTabletTypes: "replica,rdonly", + options: TabletPickerOptions{CellPreference: "OnlySpecified"}, + tpCells: []string{"cell", "otherCell"}, + wantTablets: []uint32{100, 101}, + }, + } - ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) - defer cancel() + ctx := context.Background() + for _, tcase := range tcases { + t.Run(tcase.name, func(t *testing.T) { + te := newPickerTestEnv(t, tcase.envCells) + var testTablets []*topodatapb.Tablet + for _, tab := range tcase.tablets { + testTablets = append(testTablets, addTablet(te, int(tab.id), tab.typ, tab.cell, true, true)) + } + defer func() { + for _, tab := range testTablets { + deleteTablet(t, te, tab) + } + }() + tp, err := NewTabletPicker(context.Background(), te.topoServ, tcase.inCells, tcase.localCell, te.keyspace, te.shard, tcase.inTabletTypes, tcase.options) + require.NoError(t, err) + require.Equal(t, tp.localCellInfo.localCell, tcase.localCell) + require.ElementsMatch(t, tp.cells, tcase.tpCells) - // In 20 attempts, only want1 must be picked because TabletPicker.cells = "cell" - var picked1, picked2 bool - for i := 0; i < 20; i++ { - tablet, err := tp.PickForStreaming(ctx) - require.NoError(t, err) - if proto.Equal(tablet, want1) { - picked1 = true - } - if proto.Equal(tablet, want2) { - picked2 = true - } + var selectedTablets []uint32 + selectedTabletMap := make(map[uint32]bool) + for i := 0; i < 40; i++ { + tab, err := tp.PickForStreaming(ctx) + require.NoError(t, err) + selectedTabletMap[tab.Alias.Uid] = true + } + for uid := range selectedTabletMap { + selectedTablets = append(selectedTablets, uid) + } + require.ElementsMatch(t, selectedTablets, tcase.wantTablets) + }) } - assert.True(t, picked1) - assert.False(t, picked2) } -func TestPickMultiCellTwoTablets(t *testing.T) { +func TestPickCellPreferenceLocalCell(t *testing.T) { + // test env puts all cells into an alias called "cella" te := newPickerTestEnv(t, []string{"cell", "otherCell"}) want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) defer deleteTablet(t, te, want1) - want2 := addTablet(te, 101, topodatapb.TabletType_REPLICA, "otherCell", true, true) - defer deleteTablet(t, te, want2) - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") + // Local cell preference is default + tp, err := NewTabletPicker(context.Background(), te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) require.NoError(t, err) - ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) - defer cancel() + ctx1, cancel1 := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel1() + tablet, err := tp.PickForStreaming(ctx1) + require.NoError(t, err) + assert.True(t, proto.Equal(want1, tablet), "Pick: %v, want %v", tablet, want1) + + // create a tablet in the other cell + want2 := addTablet(te, 101, topodatapb.TabletType_REPLICA, "otherCell", true, true) + defer deleteTablet(t, te, want2) + + ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel2() - // In 20 attempts, both tablet types must be picked at least once. + // In 20 attempts, only tablet in "cell" will be picked because we give local cell priority by default var picked1, picked2 bool for i := 0; i < 20; i++ { - tablet, err := tp.PickForStreaming(ctx) + tablet, err := tp.PickForStreaming(ctx2) require.NoError(t, err) if proto.Equal(tablet, want1) { picked1 = true @@ -286,45 +331,32 @@ func TestPickMultiCellTwoTablets(t *testing.T) { } } assert.True(t, picked1) - assert.True(t, picked2) + assert.False(t, picked2) } -func TestPickMultiCellTwoTabletTypes(t *testing.T) { +func TestPickCellPreferenceLocalAlias(t *testing.T) { + // test env puts all cells into an alias called "cella" te := newPickerTestEnv(t, []string{"cell", "otherCell"}) - want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(t, te, want1) - want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, "otherCell", true, true) - defer deleteTablet(t, te, want2) - - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica,rdonly") + tp, err := NewTabletPicker(context.Background(), te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) require.NoError(t, err) + // create a tablet in the other cell, it should be picked + want := addTablet(te, 101, topodatapb.TabletType_REPLICA, "otherCell", true, true) + defer deleteTablet(t, te, want) ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() - - // In 20 attempts, both tablet types must be picked at least once. - var picked1, picked2 bool - for i := 0; i < 20; i++ { - tablet, err := tp.PickForStreaming(ctx) - require.NoError(t, err) - if proto.Equal(tablet, want1) { - picked1 = true - } - if proto.Equal(tablet, want2) { - picked2 = true - } - } - assert.True(t, picked1) - assert.True(t, picked2) + tablet, err := tp.PickForStreaming(ctx) + require.NoError(t, err) + assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) } -func TestPickUsingCellAlias(t *testing.T) { +func TestPickUsingCellAliasOnlySpecified(t *testing.T) { // test env puts all cells into an alias called "cella" te := newPickerTestEnv(t, []string{"cell", "otherCell"}) want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) defer deleteTablet(t, te, want1) - tp, err := NewTabletPicker(te.topoServ, []string{"cella"}, te.keyspace, te.shard, "replica") + tp, err := NewTabletPicker(context.Background(), te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}) require.NoError(t, err) ctx1, cancel1 := context.WithTimeout(context.Background(), 200*time.Millisecond) @@ -348,7 +380,8 @@ func TestPickUsingCellAlias(t *testing.T) { ctx3, cancel3 := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel3() - // In 20 attempts, both tablet types must be picked at least once. + // In 20 attempts each of the tablets should get picked at least once. + // Local cell is not given preference var picked1, picked2 bool for i := 0; i < 20; i++ { tablet, err := tp.PickForStreaming(ctx3) @@ -364,159 +397,9 @@ func TestPickUsingCellAlias(t *testing.T) { assert.True(t, picked2) } -func TestPickLocalPreferences(t *testing.T) { - type tablet struct { - id uint32 - typ topodatapb.TabletType - cell string - } - - type testCase struct { - name string - - //inputs - tablets []tablet - inCells []string - inTabletTypes string - - //expected - tpLocalPreference string - tpCells []string - wantTablets []uint32 - } - - tcases := []testCase{ - { - name: "local preference", - tablets: []tablet{ - {101, topodatapb.TabletType_REPLICA, "cell1"}, - {102, topodatapb.TabletType_REPLICA, "cell2"}, - {103, topodatapb.TabletType_REPLICA, "cell2"}, - }, - inCells: []string{"local:cell2", "cell1"}, - inTabletTypes: "replica", - tpLocalPreference: "cell2", - tpCells: []string{"cell1", "cell2"}, - wantTablets: []uint32{102, 103}, - }, - { - name: "local preference with cell alias", - tablets: []tablet{ - {101, topodatapb.TabletType_REPLICA, "cell1"}, - {102, topodatapb.TabletType_REPLICA, "cell2"}, - }, - inCells: []string{"local:cell2", "cella"}, - inTabletTypes: "replica", - tpLocalPreference: "cell2", - tpCells: []string{"cella", "cell2"}, - wantTablets: []uint32{102}, - }, - { - name: "local preference with tablet type ordering, replica", - tablets: []tablet{ - {101, topodatapb.TabletType_REPLICA, "cell1"}, - {102, topodatapb.TabletType_REPLICA, "cell1"}, - {103, topodatapb.TabletType_PRIMARY, "cell2"}, - {104, topodatapb.TabletType_REPLICA, "cell2"}, - }, - inCells: []string{"local:cell2", "cella"}, - inTabletTypes: "in_order:replica,primary", - tpLocalPreference: "cell2", - tpCells: []string{"cella", "cell2"}, - wantTablets: []uint32{104}, - }, - { - name: "no local preference with tablet type ordering, primary", - tablets: []tablet{ - {101, topodatapb.TabletType_REPLICA, "cell1"}, - {102, topodatapb.TabletType_PRIMARY, "cell1"}, - {103, topodatapb.TabletType_REPLICA, "cell2"}, - {104, topodatapb.TabletType_REPLICA, "cell2"}, - }, - inCells: []string{"cell2", "cella"}, - inTabletTypes: "in_order:primary,replica", - tpLocalPreference: "", - tpCells: []string{"cella", "cell2"}, - wantTablets: []uint32{102}, - }, - { - name: "local preference with tablet type ordering, primary in local", - tablets: []tablet{ - {101, topodatapb.TabletType_REPLICA, "cell1"}, - {102, topodatapb.TabletType_REPLICA, "cell1"}, - {103, topodatapb.TabletType_PRIMARY, "cell2"}, - {104, topodatapb.TabletType_REPLICA, "cell2"}, - }, - inCells: []string{"local:cell2", "cella"}, - inTabletTypes: "in_order:primary,replica", - tpLocalPreference: "cell2", - tpCells: []string{"cella", "cell2"}, - wantTablets: []uint32{103}, - }, - { - name: "local preference with tablet type ordering, primary not local", - tablets: []tablet{ - {101, topodatapb.TabletType_PRIMARY, "cell1"}, - {102, topodatapb.TabletType_REPLICA, "cell1"}, - {103, topodatapb.TabletType_REPLICA, "cell2"}, - {104, topodatapb.TabletType_REPLICA, "cell2"}, - }, - inCells: []string{"local:cell2", "cella"}, - inTabletTypes: "in_order:primary,replica", - tpLocalPreference: "cell2", - tpCells: []string{"cella", "cell2"}, - wantTablets: []uint32{103, 104}, // replicas are picked because primary is not in the local cell/cell alias - }, - { - name: "local preference with tablet type ordering, primary in local's alias", - tablets: []tablet{ - {101, topodatapb.TabletType_PRIMARY, "cell1"}, - {102, topodatapb.TabletType_REPLICA, "cell1"}, - }, - inCells: []string{"local:cell2", "cella"}, - inTabletTypes: "in_order:primary,replica", - tpLocalPreference: "cell2", - tpCells: []string{"cella", "cell2"}, - wantTablets: []uint32{101}, // primary found since there are no tablets in cell/cell alias - }, - } - - ctx := context.Background() - for _, tcase := range tcases { - t.Run(tcase.name, func(t *testing.T) { - cells := []string{"cell1", "cell2"} - te := newPickerTestEnv(t, cells) - var testTablets []*topodatapb.Tablet - for _, tab := range tcase.tablets { - testTablets = append(testTablets, addTablet(te, int(tab.id), tab.typ, tab.cell, true, true)) - } - defer func() { - for _, tab := range testTablets { - deleteTablet(t, te, tab) - } - }() - tp, err := NewTabletPicker(te.topoServ, tcase.inCells, te.keyspace, te.shard, tcase.inTabletTypes) - require.NoError(t, err) - require.Equal(t, tp.localPreference, tcase.tpLocalPreference) - require.ElementsMatch(t, tp.cells, tcase.tpCells) - var selectedTablets []uint32 - selectedTabletMap := make(map[uint32]bool) - for i := 0; i < 20; i++ { - tab, err := tp.PickForStreaming(ctx) - require.NoError(t, err) - selectedTabletMap[tab.Alias.Uid] = true - } - for uid := range selectedTabletMap { - selectedTablets = append(selectedTablets, uid) - } - require.ElementsMatch(t, selectedTablets, tcase.wantTablets) - }) - } -} - func TestTabletAppearsDuringSleep(t *testing.T) { te := newPickerTestEnv(t, []string{"cell"}) - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") + tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) require.NoError(t, err) delay := GetTabletPickerRetryDelay() @@ -542,12 +425,12 @@ func TestTabletAppearsDuringSleep(t *testing.T) { assert.True(t, proto.Equal(want, got), "Pick: %v, want %v", got, want) } -func TestPickError(t *testing.T) { +func TestPickErrorLocalPreferenceDefault(t *testing.T) { te := newPickerTestEnv(t, []string{"cell"}) - _, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "badtype") + _, err := NewTabletPicker(context.Background(), te.topoServ, te.cells, "cell", te.keyspace, te.shard, "badtype", TabletPickerOptions{}) assert.EqualError(t, err, "failed to parse list of tablet types: badtype") - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") + tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) require.NoError(t, err) delay := GetTabletPickerRetryDelay() defer func() { @@ -566,6 +449,33 @@ func TestPickError(t *testing.T) { defer cancel() _, err = tp.PickForStreaming(ctx) require.EqualError(t, err, "context has expired") + // if local preference is selected, tp cells include's the local cell's alias + require.Greater(t, globalTPStats.noTabletFoundError.Counts()["cell_cella.ks.0.replica"], int64(0)) +} + +func TestPickErrorOnlySpecified(t *testing.T) { + te := newPickerTestEnv(t, []string{"cell"}) + + tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}) + require.NoError(t, err) + delay := GetTabletPickerRetryDelay() + defer func() { + SetTabletPickerRetryDelay(delay) + }() + SetTabletPickerRetryDelay(11 * time.Millisecond) + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer cancel() + // no tablets + _, err = tp.PickForStreaming(ctx) + require.EqualError(t, err, "context has expired") + // no tablets of the correct type + defer deleteTablet(t, te, addTablet(te, 200, topodatapb.TabletType_RDONLY, "cell", true, true)) + ctx, cancel = context.WithTimeout(context.Background(), 20*time.Millisecond) + defer cancel() + _, err = tp.PickForStreaming(ctx) + require.EqualError(t, err, "context has expired") + require.Greater(t, globalTPStats.noTabletFoundError.Counts()["cell.ks.0.replica"], int64(0)) } diff --git a/go/vt/proto/vtgate/vtgate.pb.go b/go/vt/proto/vtgate/vtgate.pb.go index e83f441723a..f629717fab0 100644 --- a/go/vt/proto/vtgate/vtgate.pb.go +++ b/go/vt/proto/vtgate/vtgate.pb.go @@ -1074,7 +1074,9 @@ type VStreamFlags struct { StopOnReshard bool `protobuf:"varint,3,opt,name=stop_on_reshard,json=stopOnReshard,proto3" json:"stop_on_reshard,omitempty"` // if specified, these cells (comma-separated) are used to pick source tablets from. // defaults to the cell of the vtgate serving the VStream API. - Cells string `protobuf:"bytes,4,opt,name=cells,proto3" json:"cells,omitempty"` + Cells string `protobuf:"bytes,4,opt,name=cells,proto3" json:"cells,omitempty"` + CellPreference string `protobuf:"bytes,5,opt,name=cell_preference,json=cellPreference,proto3" json:"cell_preference,omitempty"` + TabletOrder string `protobuf:"bytes,6,opt,name=tablet_order,json=tabletOrder,proto3" json:"tablet_order,omitempty"` } func (x *VStreamFlags) Reset() { @@ -1137,6 +1139,20 @@ func (x *VStreamFlags) GetCells() string { return "" } +func (x *VStreamFlags) GetCellPreference() string { + if x != nil { + return x.CellPreference + } + return "" +} + +func (x *VStreamFlags) GetTabletOrder() string { + if x != nil { + return x.TabletOrder + } + return "" +} + // VStreamRequest is the payload for VStream. type VStreamRequest struct { state protoimpl.MessageState @@ -1797,7 +1813,7 @@ var file_vtgate_proto_rawDesc = []byte{ 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x74, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x74, 0x69, 0x64, 0x22, 0x1c, 0x0a, 0x1a, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xa0, 0x01, 0x0a, 0x0c, 0x56, 0x53, 0x74, 0x72, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xec, 0x01, 0x0a, 0x0c, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x46, 0x6c, 0x61, 0x67, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x6d, 0x69, 0x6e, 0x69, 0x6d, 0x69, 0x7a, 0x65, 0x5f, 0x73, 0x6b, 0x65, 0x77, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x6d, 0x69, 0x6e, 0x69, 0x6d, 0x69, 0x7a, 0x65, 0x53, 0x6b, 0x65, 0x77, 0x12, 0x2d, 0x0a, @@ -1807,68 +1823,73 @@ var file_vtgate_proto_rawDesc = []byte{ 0x73, 0x74, 0x6f, 0x70, 0x5f, 0x6f, 0x6e, 0x5f, 0x72, 0x65, 0x73, 0x68, 0x61, 0x72, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0d, 0x73, 0x74, 0x6f, 0x70, 0x4f, 0x6e, 0x52, 0x65, 0x73, 0x68, 0x61, 0x72, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x65, 0x6c, 0x6c, 0x73, 0x18, 0x04, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x05, 0x63, 0x65, 0x6c, 0x6c, 0x73, 0x22, 0xf6, 0x01, 0x0a, 0x0e, 0x56, - 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, - 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, - 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x35, 0x0a, 0x0b, 0x74, - 0x61, 0x62, 0x6c, 0x65, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, - 0x32, 0x14, 0x2e, 0x74, 0x6f, 0x70, 0x6f, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x54, 0x61, 0x62, 0x6c, - 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0a, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x54, 0x79, - 0x70, 0x65, 0x12, 0x27, 0x0a, 0x05, 0x76, 0x67, 0x74, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x11, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, - 0x47, 0x74, 0x69, 0x64, 0x52, 0x05, 0x76, 0x67, 0x74, 0x69, 0x64, 0x12, 0x2a, 0x0a, 0x06, 0x66, - 0x69, 0x6c, 0x74, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x62, 0x69, - 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x52, - 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x2a, 0x0a, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, - 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, - 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x46, 0x6c, 0x61, 0x67, 0x73, 0x52, 0x05, 0x66, 0x6c, - 0x61, 0x67, 0x73, 0x22, 0x3d, 0x0a, 0x0f, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, - 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, - 0x61, 0x74, 0x61, 0x2e, 0x56, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, - 0x74, 0x73, 0x22, 0x92, 0x01, 0x0a, 0x0e, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, - 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, - 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, - 0x72, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, - 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x27, - 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, - 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x42, 0x6f, 0x75, 0x6e, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, - 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x22, 0x89, 0x01, 0x0a, 0x0f, 0x50, 0x72, 0x65, 0x70, - 0x61, 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x65, - 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, - 0x70, 0x63, 0x2e, 0x52, 0x50, 0x43, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, - 0x6f, 0x72, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, - 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x24, 0x0a, - 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, - 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x06, 0x66, 0x69, 0x65, - 0x6c, 0x64, 0x73, 0x22, 0x6e, 0x0a, 0x13, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, - 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, - 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, - 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, - 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, - 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, - 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, - 0x69, 0x6f, 0x6e, 0x22, 0x3d, 0x0a, 0x14, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, - 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x65, - 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, - 0x70, 0x63, 0x2e, 0x52, 0x50, 0x43, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, - 0x6f, 0x72, 0x2a, 0x44, 0x0a, 0x0f, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, - 0x6e, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, - 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x53, 0x49, 0x4e, 0x47, 0x4c, 0x45, - 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x10, 0x02, 0x12, 0x09, 0x0a, - 0x05, 0x54, 0x57, 0x4f, 0x50, 0x43, 0x10, 0x03, 0x2a, 0x3c, 0x0a, 0x0b, 0x43, 0x6f, 0x6d, 0x6d, - 0x69, 0x74, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x0a, 0x0a, 0x06, 0x4e, 0x4f, 0x52, 0x4d, 0x41, - 0x4c, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x50, 0x52, 0x45, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, - 0x50, 0x4f, 0x53, 0x54, 0x10, 0x02, 0x12, 0x0e, 0x0a, 0x0a, 0x41, 0x55, 0x54, 0x4f, 0x43, 0x4f, - 0x4d, 0x4d, 0x49, 0x54, 0x10, 0x03, 0x42, 0x36, 0x0a, 0x0f, 0x69, 0x6f, 0x2e, 0x76, 0x69, 0x74, - 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x23, 0x76, 0x69, 0x74, 0x65, 0x73, - 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x76, - 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x01, 0x28, 0x09, 0x52, 0x05, 0x63, 0x65, 0x6c, 0x6c, 0x73, 0x12, 0x27, 0x0a, 0x0f, 0x63, 0x65, + 0x6c, 0x6c, 0x5f, 0x70, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x05, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0e, 0x63, 0x65, 0x6c, 0x6c, 0x50, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, + 0x6e, 0x63, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x5f, 0x6f, 0x72, + 0x64, 0x65, 0x72, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, + 0x74, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x22, 0xf6, 0x01, 0x0a, 0x0e, 0x56, 0x53, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, + 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, + 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, + 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x35, 0x0a, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, + 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x14, 0x2e, 0x74, + 0x6f, 0x70, 0x6f, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x54, 0x79, + 0x70, 0x65, 0x52, 0x0a, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x27, + 0x0a, 0x05, 0x76, 0x67, 0x74, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, + 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x47, 0x74, 0x69, 0x64, + 0x52, 0x05, 0x76, 0x67, 0x74, 0x69, 0x64, 0x12, 0x2a, 0x0a, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, + 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, + 0x64, 0x61, 0x74, 0x61, 0x2e, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x52, 0x06, 0x66, 0x69, 0x6c, + 0x74, 0x65, 0x72, 0x12, 0x2a, 0x0a, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x18, 0x05, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x56, 0x53, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x46, 0x6c, 0x61, 0x67, 0x73, 0x52, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x22, + 0x3d, 0x0a, 0x0f, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, + 0x56, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x92, + 0x01, 0x0a, 0x0e, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, + 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, + 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, + 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x27, 0x0a, 0x05, 0x71, 0x75, + 0x65, 0x72, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x71, 0x75, 0x65, 0x72, + 0x79, 0x2e, 0x42, 0x6f, 0x75, 0x6e, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x05, 0x71, 0x75, + 0x65, 0x72, 0x79, 0x22, 0x89, 0x01, 0x0a, 0x0f, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x52, + 0x50, 0x43, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x29, + 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, + 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x24, 0x0a, 0x06, 0x66, 0x69, 0x65, + 0x6c, 0x64, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x71, 0x75, 0x65, 0x72, + 0x79, 0x2e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x22, + 0x6e, 0x0a, 0x13, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, + 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, + 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, + 0x65, 0x72, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, + 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x22, + 0x3d, 0x0a, 0x14, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x52, + 0x50, 0x43, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x2a, 0x44, + 0x0a, 0x0f, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x6f, 0x64, + 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, + 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x53, 0x49, 0x4e, 0x47, 0x4c, 0x45, 0x10, 0x01, 0x12, 0x09, + 0x0a, 0x05, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x10, 0x02, 0x12, 0x09, 0x0a, 0x05, 0x54, 0x57, 0x4f, + 0x50, 0x43, 0x10, 0x03, 0x2a, 0x3c, 0x0a, 0x0b, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x4f, 0x72, + 0x64, 0x65, 0x72, 0x12, 0x0a, 0x0a, 0x06, 0x4e, 0x4f, 0x52, 0x4d, 0x41, 0x4c, 0x10, 0x00, 0x12, + 0x07, 0x0a, 0x03, 0x50, 0x52, 0x45, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x50, 0x4f, 0x53, 0x54, + 0x10, 0x02, 0x12, 0x0e, 0x0a, 0x0a, 0x41, 0x55, 0x54, 0x4f, 0x43, 0x4f, 0x4d, 0x4d, 0x49, 0x54, + 0x10, 0x03, 0x42, 0x36, 0x0a, 0x0f, 0x69, 0x6f, 0x2e, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x23, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, + 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2f, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( diff --git a/go/vt/proto/vtgate/vtgate_vtproto.pb.go b/go/vt/proto/vtgate/vtgate_vtproto.pb.go index 121ec1ccd65..545478ed8ad 100644 --- a/go/vt/proto/vtgate/vtgate_vtproto.pb.go +++ b/go/vt/proto/vtgate/vtgate_vtproto.pb.go @@ -976,6 +976,20 @@ func (m *VStreamFlags) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if len(m.TabletOrder) > 0 { + i -= len(m.TabletOrder) + copy(dAtA[i:], m.TabletOrder) + i = encodeVarint(dAtA, i, uint64(len(m.TabletOrder))) + i-- + dAtA[i] = 0x32 + } + if len(m.CellPreference) > 0 { + i -= len(m.CellPreference) + copy(dAtA[i:], m.CellPreference) + i = encodeVarint(dAtA, i, uint64(len(m.CellPreference))) + i-- + dAtA[i] = 0x2a + } if len(m.Cells) > 0 { i -= len(m.Cells) copy(dAtA[i:], m.Cells) @@ -1766,6 +1780,14 @@ func (m *VStreamFlags) SizeVT() (n int) { if l > 0 { n += 1 + l + sov(uint64(l)) } + l = len(m.CellPreference) + if l > 0 { + n += 1 + l + sov(uint64(l)) + } + l = len(m.TabletOrder) + if l > 0 { + n += 1 + l + sov(uint64(l)) + } if m.unknownFields != nil { n += len(m.unknownFields) } @@ -4604,6 +4626,70 @@ func (m *VStreamFlags) UnmarshalVT(dAtA []byte) error { } m.Cells = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CellPreference", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.CellPreference = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TabletOrder", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.TabletOrder = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skip(dAtA[iNdEx:]) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 3fb5e0ac9b4..a9e0f651e6a 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -116,9 +116,10 @@ type vstream struct { vsm *vstreamManager - eventCh chan []*binlogdatapb.VEvent - heartbeatInterval uint32 - ts *topo.Server + eventCh chan []*binlogdatapb.VEvent + heartbeatInterval uint32 + ts *topo.Server + tabletPickerOptions discovery.TabletPickerOptions } type journalEvent struct { @@ -176,6 +177,10 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta heartbeatInterval: flags.GetHeartbeatInterval(), ts: ts, copyCompletedShard: make(map[string]struct{}), + tabletPickerOptions: discovery.TabletPickerOptions{ + CellPreference: flags.GetCellPreference(), + TabletOrder: flags.GetTabletOrder(), + }, } return vs.stream(ctx) } @@ -455,46 +460,19 @@ func (vs *vstream) alignStreams(ctx context.Context, event *binlogdatapb.VEvent, } } -// getCells determines the availability zones to select tablets from -// 2 scenarios: -// -// 1. No cells specified by the client via the gRPC request: -// Tablets from the local cell AND the cell alias that the VTGate's local cell belongs to will be selected. -// The local cell of the VTGate will take precendence over any other cell in the alias. -// -// 2. Cells are specified by the client via the gRPC request -// These cells will take precendence over the default cell and its alias and only tablets belonging to -// the specified cells will be selected. -// If the "local" tag is passed in as an option in the list of optCells, -// the local cell of the VTGate will take precedence over any other cell specified. -func (vs *vstream) getCells(ctx context.Context) []string { +func (vs *vstream) getCells() []string { var cells []string if vs.optCells != "" { - for i, cell := range strings.Split(strings.TrimSpace(vs.optCells), ",") { - // if the local tag is passed in, we must give local cell priority - // during tablet selection. Append the VTGate's local cell to the list of cells - if i == 0 && cell == "local" { - cells = append(cells, fmt.Sprintf("local:%s", vs.vsm.cell)) - continue - } + for _, cell := range strings.Split(strings.TrimSpace(vs.optCells), ",") { cells = append(cells, strings.TrimSpace(cell)) } } - // if no cell override provided in gRPC request, perform cell alias fallback if len(cells) == 0 { - log.Info("[VSTREAM MANAGER] no cells specified by client, falling back to alias...") - // append the alias this cell belongs to, otherwise appends the vtgate's cell - alias := topo.GetAliasByCell(ctx, vs.ts, vs.vsm.cell) - // an alias was actually found - if alias != vs.vsm.cell { - // send in the vtgate's cell for local cell preference - cells = append(cells, fmt.Sprintf("local:%s", vs.vsm.cell)) - } - cells = append(cells, alias) + // use the vtgate's cell by default + cells = append(cells, vs.vsm.cell) } - log.Infof("[VSTREAM MANAGER] cells to pick from %v", cells) return cells } @@ -519,8 +497,8 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha var eventss [][]*binlogdatapb.VEvent var err error - cells := vs.getCells(ctx) - tp, err := discovery.NewTabletPicker(vs.ts, cells, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()) + cells := vs.getCells() + tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions) if err != nil { log.Errorf(err.Error()) return err diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index c9fe153c8b8..754c604c5f5 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -275,76 +275,6 @@ func TestVStreamChunks(t *testing.T) { assert.Equal(t, int32(100), ddlCount.Get()) } -func TestVStreamManagerGetCells(t *testing.T) { - type testArgs struct { - name string - optCells string - cellAlias bool - resultCells []string - } - - tcases := []*testArgs{ - {"default-local", "", false, []string{"aa"}}, - {"default-local-cell-alias", "", true, []string{"local:aa", "region1"}}, - {"with-opt-cells", "local,bb,cc", true, []string{"local:aa", "bb", "cc"}}, - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - cell := "aa" - - ks := "TestVStream" - _ = createSandbox(ks) - hc := discovery.NewFakeHealthCheck(nil) - st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) - vsm := newTestVStreamManager(hc, st, "aa", true) - ts, _ := st.GetTopoServer() - - for _, tcase := range tcases { - t.Run(tcase.name, func(t *testing.T) { - vs := &vstream{ - vgtid: nil, - tabletType: topodatapb.TabletType_PRIMARY, - optCells: tcase.optCells, - filter: nil, - send: nil, - resolver: nil, - journaler: nil, - minimizeSkew: false, - stopOnReshard: true, - skewTimeoutSeconds: 0, - timestamps: nil, - vsm: vsm, - eventCh: nil, - heartbeatInterval: 0, - ts: ts, - } - - if tcase.cellAlias { - cellsAlias := &topodatapb.CellsAlias{ - Cells: []string{"aa", "bb"}, - } - - assert.Nil(t, ts.CreateCellsAlias(context.Background(), "region1", cellsAlias), "failed to create cell alias") - defer cleanupGetCellTests(t, ts, "region1") - } - - got := vs.getCells(ctx) - assert.Equal(t, len(got), len(tcase.resultCells)) - assert.Equal(t, got, tcase.resultCells) - }) - } -} - -func cleanupGetCellTests(t *testing.T, ts *topo.Server, region string) { - if region != "" { - if err := ts.DeleteCellsAlias(context.Background(), region); err != nil { - t.Logf("DeleteCellsAlias(%s) failed: %v", region, err) - } - } -} - func TestVStreamMulti(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index 251f2332eb7..8389c169908 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -196,7 +196,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context, cell, tabletTypes stri defer wg.Done() err1 = td.forEachSource(func(source *migrationSource) error { // TODO: handle external sources to support Mount+Migrate - tablet, err := pickTablet(ctx, ct.ts, cell, ct.sourceKeyspace, source.shard, tabletTypes) + tablet, err := pickTablet(ctx, ct.ts, cell, ct.vde.thisTablet.Alias.Cell, ct.sourceKeyspace, source.shard, tabletTypes) if err != nil { return err } @@ -208,8 +208,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context, cell, tabletTypes stri wg.Add(1) go func() { defer wg.Done() - tablet, err2 := pickTablet(ctx, ct.ts, td.wd.opts.PickerOptions.TargetCell, ct.vde.thisTablet.Keyspace, - ct.vde.thisTablet.Shard, td.wd.opts.PickerOptions.TabletTypes) + tablet, err2 := pickTablet(ctx, ct.ts, td.wd.opts.PickerOptions.TargetCell, ct.vde.thisTablet.Alias.Cell, ct.vde.thisTablet.Keyspace, ct.vde.thisTablet.Shard, td.wd.opts.PickerOptions.TabletTypes) if err2 != nil { return } @@ -226,8 +225,8 @@ func (td *tableDiffer) selectTablets(ctx context.Context, cell, tabletTypes stri return err2 } -func pickTablet(ctx context.Context, ts *topo.Server, cell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) { - tp, err := discovery.NewTabletPicker(ts, []string{cell}, keyspace, shard, tabletTypes) +func pickTablet(ctx context.Context, ts *topo.Server, cell, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) { + tp, err := discovery.NewTabletPicker(ctx, ts, []string{cell}, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{}) if err != nil { return nil, err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 6b9a2c4ddc8..f96fe5ba19d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -134,7 +134,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor return nil, err } } - tp, err := discovery.NewTabletPicker(sourceTopo, cells, ct.source.Keyspace, ct.source.Shard, tabletTypesStr) + tp, err := discovery.NewTabletPicker(ctx, sourceTopo, cells, ct.vre.cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr, discovery.TabletPickerOptions{}) if err != nil { return nil, err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go index 51051d9b964..3ecf99c97ee 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go @@ -86,8 +86,9 @@ func TestControllerKeyRange(t *testing.T) { dbClientFactory := func() binlogplayer.DBClient { return dbClient } mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + vre := NewTestEngine(nil, wantTablet.GetAlias().Cell, mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) - ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, nil) + ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, vre) if err != nil { t.Fatal(err) } @@ -147,8 +148,9 @@ func TestControllerTables(t *testing.T) { }, }, } + vre := NewTestEngine(nil, wantTablet.GetAlias().Cell, mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) - ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, nil) + ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, vre) if err != nil { t.Fatal(err) } @@ -215,8 +217,9 @@ func TestControllerOverrides(t *testing.T) { dbClientFactory := func() binlogplayer.DBClient { return dbClient } mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + vre := NewTestEngine(nil, wantTablet.GetAlias().Cell, mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) - ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil, nil) + ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil, vre) if err != nil { t.Fatal(err) } @@ -230,7 +233,8 @@ func TestControllerOverrides(t *testing.T) { } func TestControllerCanceledContext(t *testing.T) { - defer deleteTablet(addTablet(100)) + wantTablet := addTablet(100) + defer deleteTablet(wantTablet) params := map[string]string{ "id": "1", @@ -240,7 +244,9 @@ func TestControllerCanceledContext(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - ct, err := newController(ctx, params, nil, nil, env.TopoServ, env.Cells[0], "rdonly", nil, nil) + vre := NewTestEngine(nil, wantTablet.GetAlias().Cell, nil, nil, nil, "", nil) + + ct, err := newController(ctx, params, nil, nil, env.TopoServ, env.Cells[0], "rdonly", nil, vre) if err != nil { t.Fatal(err) } @@ -283,8 +289,9 @@ func TestControllerRetry(t *testing.T) { dbClient.ExpectRequest("commit", nil, nil) dbClientFactory := func() binlogplayer.DBClient { return dbClient } mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + vre := NewTestEngine(nil, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) - ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil, nil) + ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil, vre) if err != nil { t.Fatal(err) } @@ -339,8 +346,9 @@ func TestControllerStopPosition(t *testing.T) { dbClientFactory := func() binlogplayer.DBClient { return dbClient } mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + vre := NewTestEngine(nil, wantTablet.GetAlias().Cell, mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) - ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, nil) + ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, vre) if err != nil { t.Fatal(err) } diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index af4eb8d7bc4..e12ca6f394c 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -407,7 +407,7 @@ func (wr *Wrangler) areTabletsAvailableToStreamFrom(ctx context.Context, ts *tra if cells == nil { cells = append(cells, shard.PrimaryAlias.Cell) } - tp, err := discovery.NewTabletPicker(wr.ts, cells, keyspace, shard.ShardName(), tabletTypes) + tp, err := discovery.NewTabletPicker(ctx, wr.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypes, discovery.TabletPickerOptions{}) if err != nil { allErrors.RecordError(err) return diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index d01dbd9a927..995442bbd17 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -433,7 +433,7 @@ func (tme *testMigraterEnv) createDBClients(ctx context.Context, t *testing.T) { tme.dbSourceClients = append(tme.dbSourceClients, dbclient) dbClientFactory := func() binlogplayer.DBClient { return dbclient } // Replace existing engine with a new one - primary.TM.VREngine = vreplication.NewTestEngine(tme.ts, "", primary.FakeMysqlDaemon, dbClientFactory, dbClientFactory, dbclient.DBName(), nil) + primary.TM.VREngine = vreplication.NewTestEngine(tme.ts, primary.Tablet.GetAlias().GetCell(), primary.FakeMysqlDaemon, dbClientFactory, dbClientFactory, dbclient.DBName(), nil) primary.TM.VREngine.Open(ctx) } for _, primary := range tme.targetPrimaries { @@ -442,7 +442,7 @@ func (tme *testMigraterEnv) createDBClients(ctx context.Context, t *testing.T) { tme.dbTargetClients = append(tme.dbTargetClients, dbclient) dbClientFactory := func() binlogplayer.DBClient { return dbclient } // Replace existing engine with a new one - primary.TM.VREngine = vreplication.NewTestEngine(tme.ts, "", primary.FakeMysqlDaemon, dbClientFactory, dbClientFactory, dbclient.DBName(), nil) + primary.TM.VREngine = vreplication.NewTestEngine(tme.ts, primary.Tablet.GetAlias().GetCell(), primary.FakeMysqlDaemon, dbClientFactory, dbClientFactory, dbclient.DBName(), nil) primary.TM.VREngine.Open(ctx) } tme.allDBClients = append(tme.dbSourceClients, tme.dbTargetClients...) diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index 4e791e4408d..cae9f0b83a2 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -687,7 +687,7 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error { if ts.ExternalTopo() != nil { sourceTopo = ts.ExternalTopo() } - tp, err := discovery.NewTabletPicker(sourceTopo, []string{df.sourceCell}, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr) + tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}) if err != nil { return err } @@ -705,7 +705,7 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error { go func() { defer wg.Done() err2 = df.forAll(df.targets, func(shard string, target *shardStreamer) error { - tp, err := discovery.NewTabletPicker(df.ts.TopoServer(), []string{df.targetCell}, df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr) + tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell, df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}) if err != nil { return err } diff --git a/proto/vtgate.proto b/proto/vtgate.proto index d42e7f451db..8f79d419862 100644 --- a/proto/vtgate.proto +++ b/proto/vtgate.proto @@ -282,6 +282,8 @@ message VStreamFlags { // if specified, these cells (comma-separated) are used to pick source tablets from. // defaults to the cell of the vtgate serving the VStream API. string cells = 4; + string cell_preference = 5; + string tablet_order = 6; } // VStreamRequest is the payload for VStream. diff --git a/tools/rowlog/rowlog.go b/tools/rowlog/rowlog.go index 61ae6f500a5..e9f328c9c8a 100644 --- a/tools/rowlog/rowlog.go +++ b/tools/rowlog/rowlog.go @@ -372,7 +372,9 @@ func getFlavor(ctx context.Context, server, keyspace string) string { } func getTablet(ctx context.Context, ts *topo.Server, cells []string, keyspace string) string { - picker, err := discovery.NewTabletPicker(ts, cells, keyspace, "0", "primary") + picker, err := discovery.NewTabletPicker(ctx, ts, cells, "", keyspace, "0", "primary", discovery.TabletPickerOptions{ + CellPreference: "OnlySpecified", + }) if err != nil { return "" }