diff --git a/go/vt/vtgate/balancer/balancer.go b/go/vt/vtgate/balancer/balancer.go new file mode 100644 index 00000000000..f830b44f8ed --- /dev/null +++ b/go/vt/vtgate/balancer/balancer.go @@ -0,0 +1,348 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package balancer + +import ( + "encoding/json" + "fmt" + "math/rand" + "sync" + + "vitess.io/vitess/go/vt/discovery" + querypb "vitess.io/vitess/go/vt/proto/query" +) + +/* + +The tabletBalancer probabalistically orders the list of available tablets into +a ranked order of preference in order to satisfy two high-level goals: + +1. Balance the load across the available replicas +2. Prefer a replica in the same cell as the vtgate if possible + +In some topologies this is trivial to accomplish by simply preferring tablets in the +local cell, assuming there are a proportional number of local tablets in each cell to +satisfy the inbound traffic to the vtgates in that cell. + +However, for topologies with a relatively small number of tablets in each cell, a simple +affinity algorithm does not effectively balance the load. + +As a simple example: + + Given three cells with vtgates, four replicas spread into those cells, where each vtgate + receives an equal query share. If each routes only to its local cell, the tablets will be + unbalanced since two of them receive 1/3 of the queries, but the two replicas in the same + cell will only receive 1/6 of the queries. + + Cell A: 1/3 --> vtgate --> 1/3 => vttablet + + Cell B: 1/3 --> vtgate --> 1/3 => vttablet + + Cell C: 1/3 --> vtgate --> 1/6 => vttablet + \-> 1/6 => vttablet + +Other topologies that can cause similar pathologies include cases where there may be cells +containing replicas but no local vtgates, and/or cells that have only vtgates but no replicas. + +For these topologies, the tabletBalancer proportionally assigns the output flow to each tablet, +preferring the local cell where possible, but only as long as the global query balance is +maintained. + +To accomplish this goal, the balancer is given: + +* The list of cells that receive inbound traffic to vtgates +* The local cell where the vtgate exists +* The set of tablets and their cells (learned from discovery) + +The model assumes there is an equal probablility of a query coming from each vtgate cell, i.e. +traffic is effectively load balanced between the cells with vtgates. + +Given that information, the balancer builds a simple model to determine how much query load +would go to each tablet if vtgate only routed to its local cell. Then if any tablets are +unbalanced, it shifts the desired allocation away from the local cell preference in order to +even out the query load. + +Based on this global model, the vtgate then probabalistically picks a destination for each +query to be sent and uses these weights to order the available tablets accordingly. + +Assuming each vtgate is configured with and discovers the same information about the topology, +and the input flow is balanced across the vtgate cells (as mentioned above), then each vtgate +should come the the same conclusion about the global flows, and cooperatively should +converge on the desired balanced query load. + +*/ + +type TabletBalancer interface { + // Randomly shuffle the tablets into an order for routing queries + ShuffleTablets(target *querypb.Target, tablets []*discovery.TabletHealth) + + // Callback when the topology changes to invalidate any cached state + TopologyChanged() +} + +func NewTabletBalancer(localCell string, vtGateCells []string) TabletBalancer { + return &tabletBalancer{ + localCell: localCell, + vtGateCells: vtGateCells, + allocations: map[discovery.KeyspaceShardTabletType]*targetAllocation{}, + } +} + +type tabletBalancer struct { + // + // Configuration + // + + // The local cell for the vtgate + localCell string + + // The set of cells that have vtgates + vtGateCells []string + + // mu protects the allocation map + mu sync.Mutex + + // + // Allocations for balanced mode, calculated once per target and invalidated + // whenever the topology changes. + // + allocations map[discovery.KeyspaceShardTabletType]*targetAllocation +} + +type targetAllocation struct { + // Target flow per cell based on the number of tablets discovered in the cell + Target map[string]int // json:target + + // Input flows allocated for each cell + Inflows map[string]int + + // Output flows from each vtgate cell to each target cell + Outflows map[string]map[string]int + + // Allocation routed to each tablet from the local cell used for ranking + Allocation map[uint32]int + + // Total allocation which is basically 1,000,000 / len(vtgatecells) + TotalAllocation int +} + +func (b *tabletBalancer) print() string { + allocations, _ := json.Marshal(&b.allocations) + return fmt.Sprintf("LocalCell: %s, VtGateCells: %s, allocations: %s", + b.localCell, b.vtGateCells, string(allocations)) +} + +// ShuffleTablets is the main entry point to the balancer. +// +// It shuffles the tablets into a preference list for routing a given query. +// However, since all tablets should be healthy, the query will almost always go +// to the first tablet in the list, so the balancer ranking algoritm randomly +// shuffles the list to break ties, then chooses a weighted random selection +// based on the balance alorithm to promote to the first in the set. +func (b *tabletBalancer) ShuffleTablets(target *querypb.Target, tablets []*discovery.TabletHealth) { + + numTablets := len(tablets) + + allocationMap, totalAllocation := b.getAllocation(target, tablets) + + rand.Shuffle(numTablets, func(i, j int) { tablets[i], tablets[j] = tablets[j], tablets[i] }) + + // Do another O(n) seek through the list to effect the weighted sample by picking + // a random point in the allocation space and seeking forward in the list of (randomized) + // tablets to that point, promoting the tablet to the head of the list. + r := rand.Intn(totalAllocation) + for i := 0; i < numTablets; i++ { + flow := allocationMap[tablets[i].Tablet.Alias.Uid] + if r < flow { + tablets[0], tablets[i] = tablets[i], tablets[0] + break + } + r -= flow + } +} + +// TopologyChanged is a callback to indicate the topology changed and any cached +// allocations should be cleared +func (b *tabletBalancer) TopologyChanged() { + b.mu.Lock() + defer b.mu.Unlock() + + b.allocations = map[discovery.KeyspaceShardTabletType]*targetAllocation{} +} + +// To stick with integer arithmetic, use 1,000,000 as the full load +const ALLOCATION = 1000000 + +func (b *tabletBalancer) allocateFlows(allTablets []*discovery.TabletHealth) *targetAllocation { + + a := targetAllocation{} + + // Initialization: Set up some data structures and derived values + a.Target = make(map[string]int) + a.Inflows = make(map[string]int) + a.Outflows = make(map[string]map[string]int) + a.Allocation = make(map[uint32]int) + flowPerVtgateCell := ALLOCATION / len(b.vtGateCells) + flowPerTablet := ALLOCATION / len(allTablets) + cellExistsWithNoTablets := false + + for _, th := range allTablets { + a.Target[th.Tablet.Alias.Cell] += flowPerTablet + } + + // + // First pass: Allocate vtgate flow to the local cell where the vtgate exists + // and along the way figure out if there are any vtgates with no local tablets. + // + for _, cell := range b.vtGateCells { + outflow := map[string]int{} + target := a.Target[cell] + + if target > 0 { + a.Inflows[cell] += flowPerVtgateCell + outflow[cell] = flowPerVtgateCell + } else { + cellExistsWithNoTablets = true + } + + a.Outflows[cell] = outflow + } + + // + // Figure out if there is a shortfall + // + underAllocated := make(map[string]int) + unbalancedFlow := 0 + for cell, allocation := range a.Target { + if a.Inflows[cell] < allocation { + underAllocated[cell] = allocation - a.Inflows[cell] + unbalancedFlow += underAllocated[cell] + } + } + + // + // Second pass: if there are any vtgates with no local tablets, allocate the underallocated amount + // proportionally to all cells that may need it + // + if cellExistsWithNoTablets { + for _, vtgateCell := range b.vtGateCells { + target := a.Target[vtgateCell] + if target != 0 { + continue + } + + for underAllocatedCell, underAllocatedFlow := range underAllocated { + allocation := flowPerVtgateCell * underAllocatedFlow / unbalancedFlow + a.Inflows[underAllocatedCell] += allocation + a.Outflows[vtgateCell][underAllocatedCell] += allocation + } + } + + // Recompute underallocated after these flows were assigned + unbalancedFlow = 0 + underAllocated = make(map[string]int) + for cell, allocation := range a.Target { + if a.Inflows[cell] < allocation { + underAllocated[cell] = allocation - a.Inflows[cell] + unbalancedFlow += underAllocated[cell] + } + } + } + + // + // Third pass: Shift remaining imbalance if any cell is over/under allocated after + // assigning local cell traffic and distributing load from cells without tablets. + // + if /* fudge for integer arithmetic */ unbalancedFlow > 10 { + + // cells which are overallocated + overAllocated := make(map[string]int) + for cell, allocation := range a.Target { + if a.Inflows[cell] > allocation { + overAllocated[cell] = a.Inflows[cell] - allocation + } + } + + // fmt.Printf("outflows %v over %v under %v\n", a.Outflows, overAllocated, underAllocated) + + // + // For each overallocated cell, proportionally shift flow from targets that are overallocated + // to targets that are underallocated. + // + // Note this is an O(N^3) loop, but only over the cells which need adjustment. + // + for _, vtgateCell := range b.vtGateCells { + for underAllocatedCell, underAllocatedFlow := range underAllocated { + for overAllocatedCell, overAllocatedFlow := range overAllocated { + + currentFlow := a.Outflows[vtgateCell][overAllocatedCell] + if currentFlow == 0 { + continue + } + + // Shift a proportional fraction of the amount that the cell is currently allocated weighted + // by the fraction that this vtgate cell is already sending to the overallocated cell, and the + // fraction that the new target is underallocated + // + // Note that the operator order matters -- multiplications need to occur before divisions + // to avoid truncating the integer values. + shiftFlow := overAllocatedFlow * currentFlow * underAllocatedFlow / a.Inflows[overAllocatedCell] / unbalancedFlow + + //fmt.Printf("shift %d %s %s -> %s (over %d current %d in %d under %d unbalanced %d) \n", shiftFlow, vtgateCell, overAllocatedCell, underAllocatedCell, + // overAllocatedFlow, currentFlow, a.Inflows[overAllocatedCell], underAllocatedFlow, unbalancedFlow) + + a.Outflows[vtgateCell][overAllocatedCell] -= shiftFlow + a.Inflows[overAllocatedCell] -= shiftFlow + + a.Inflows[underAllocatedCell] += shiftFlow + a.Outflows[vtgateCell][underAllocatedCell] += shiftFlow + } + } + } + } + + // + // Finally, once the cell flows are all adjusted, figure out the local allocation to each + // tablet in the target cells + // + outflow := a.Outflows[b.localCell] + for _, tablet := range allTablets { + cell := tablet.Tablet.Alias.Cell + flow := outflow[cell] + if flow > 0 { + a.Allocation[tablet.Tablet.Alias.Uid] = flow * flowPerTablet / a.Target[cell] + a.TotalAllocation += flow * flowPerTablet / a.Target[cell] + } + } + + return &a +} + +// getAllocation builds the allocation map if needed and returns a copy of the map +func (b *tabletBalancer) getAllocation(target *querypb.Target, tablets []*discovery.TabletHealth) (map[uint32]int, int) { + + b.mu.Lock() + defer b.mu.Unlock() + + allocation, exists := b.allocations[discovery.KeyFromTarget(target)] + if !exists { + allocation = b.allocateFlows(tablets) + b.allocations[discovery.KeyFromTarget(target)] = allocation + } + + return allocation.Allocation, allocation.TotalAllocation +} diff --git a/go/vt/vtgate/balancer/balancer_test.go b/go/vt/vtgate/balancer/balancer_test.go new file mode 100644 index 00000000000..318ee6b2776 --- /dev/null +++ b/go/vt/vtgate/balancer/balancer_test.go @@ -0,0 +1,393 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package balancer + +import ( + "strconv" + "testing" + + "vitess.io/vitess/go/vt/discovery" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo" +) + +var nextTestTabletUID int + +func createTestTablet(cell string) *discovery.TabletHealth { + nextTestTabletUID++ + tablet := topo.NewTablet(uint32(nextTestTabletUID), cell, strconv.Itoa(nextTestTabletUID)) + tablet.PortMap["vt"] = 1 + tablet.PortMap["grpc"] = 2 + tablet.Keyspace = "k" + tablet.Shard = "s" + + return &discovery.TabletHealth{ + Tablet: tablet, + Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, + Serving: false, + Stats: nil, + PrimaryTermStartTime: 0, + } +} + +// allow 2% fuzz +const FUZZ = 2 + +func fuzzyEquals(a, b int) bool { + diff := a - b + if diff < 0 { + diff = -diff + } + return diff < a*FUZZ/100 +} + +func TestAllocateFlows(t *testing.T) { + cases := []struct { + test string + tablets []*discovery.TabletHealth + vtgateCells []string + }{ + { + "balanced one tablet per cell", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("d"), + }, + []string{"a", "b", "c", "d"}, + }, + { + "balanced multiple tablets per cell", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("d"), + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("d"), + }, + []string{"a", "b", "c", "d"}, + }, + { + "vtgate in cell with no tablets", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("d"), + }, + []string{"a", "b", "c", "d", "e"}, + }, + { + "vtgates in multiple cells with no tablets", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("d"), + }, + []string{"a", "b", "c", "d", "e", "f", "g"}, + }, + { + "imbalanced multiple tablets in one cell", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + }, + []string{"a", "b", "c"}, + }, + { + "imbalanced multiple tablets in multiple cells", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("d"), + createTestTablet("d"), + createTestTablet("d"), + createTestTablet("d"), + }, + []string{"a", "b", "c", "d"}, + }, + { + "heavy imbalance", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("c"), + }, + []string{"a", "b", "c", "d"}, + }, + } + + target := &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA} + + for _, c := range cases { + t.Logf("\n\nTest Case: %s\n\n", c.test) + + tablets := c.tablets + vtGateCells := c.vtgateCells + + tabletsByCell := make(map[string][]*discovery.TabletHealth) + for _, tablet := range tablets { + cell := tablet.Tablet.Alias.Cell + tabletsByCell[cell] = append(tabletsByCell[cell], tablet) + } + + allocationPerTablet := make(map[uint32]int) + expectedPerTablet := ALLOCATION / len(tablets) + + expectedPerCell := make(map[string]int) + for cell := range tabletsByCell { + expectedPerCell[cell] = ALLOCATION / len(tablets) * len(tabletsByCell[cell]) + } + + // Run the balancer over each vtgate cell + for _, localCell := range vtGateCells { + b := NewTabletBalancer(localCell, vtGateCells).(*tabletBalancer) + a := b.allocateFlows(tablets) + b.allocations[discovery.KeyFromTarget(target)] = a + + t.Logf("Target Flows %v, Balancer: %s XXX %d %v \n", expectedPerCell, b.print(), len(b.allocations), b.allocations) + + // Accumulate all the output per tablet cell + outflowPerCell := make(map[string]int) + for _, outflow := range a.Outflows { + for tabletCell, flow := range outflow { + if flow < 0 { + t.Errorf("balancer %v negative outflow", b.print()) + } + outflowPerCell[tabletCell] += flow + } + } + + // Check in / out flow to each tablet cell + for cell := range tabletsByCell { + expectedForCell := expectedPerCell[cell] + + if !fuzzyEquals(a.Inflows[cell], expectedForCell) || !fuzzyEquals(outflowPerCell[cell], expectedForCell) { + t.Errorf("Balancer {%s} ExpectedPerCell {%v} did not allocate correct flow to cell %s: expected %d, inflow %d outflow %d", + b.print(), expectedPerCell, cell, expectedForCell, a.Inflows[cell], outflowPerCell[cell]) + } + } + + // Accumulate the allocations for all runs to compare what the system does as a whole + // when routing from all vtgate cells + for uid, flow := range a.Allocation { + allocationPerTablet[uid] += flow + } + } + + // Check that the allocations all add up + for _, tablet := range tablets { + uid := tablet.Tablet.Alias.Uid + + allocation := allocationPerTablet[uid] + if !fuzzyEquals(allocation, expectedPerTablet) { + t.Errorf("did not allocate full allocation to tablet %d: expected %d got %d", + uid, expectedPerTablet, allocation) + } + } + } +} + +func TestBalancedShuffle(t *testing.T) { + cases := []struct { + test string + tablets []*discovery.TabletHealth + vtgateCells []string + }{ + { + "simple balanced", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("d"), + }, + + []string{"a", "b", "c", "d"}, + }, + { + "simple unbalanced", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("d"), + }, + + []string{"a", "b", "c", "d"}, + }, + { + "mixed unbalanced", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("c"), + }, + + []string{"a", "b", "c", "d"}, + }, + } + + target := &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA} + for _, c := range cases { + t.Logf("\n\nTest Case: %s\n\n", c.test) + + tablets := c.tablets + vtGateCells := c.vtgateCells + + // test unbalanced distribution + + routed := make(map[uint32]int) + + expectedPerCell := make(map[string]int) + for _, tablet := range tablets { + cell := tablet.Tablet.Alias.Cell + expectedPerCell[cell] += ALLOCATION / len(tablets) + } + + // Run the algorithm a bunch of times to get a random enough sample + N := 1000000 + for _, localCell := range vtGateCells { + b := NewTabletBalancer(localCell, vtGateCells).(*tabletBalancer) + + for i := 0; i < N/len(vtGateCells); i++ { + b.ShuffleTablets(target, tablets) + if i == 0 { + t.Logf("Target Flows %v, Balancer: %s\n", expectedPerCell, b.print()) + t.Logf(b.print()) + } + + routed[tablets[0].Tablet.Alias.Uid]++ + } + } + + expected := N / len(tablets) + delta := make(map[uint32]int) + for _, tablet := range tablets { + got := routed[tablet.Tablet.Alias.Uid] + delta[tablet.Tablet.Alias.Uid] = got - expected + if !fuzzyEquals(got, expected) { + t.Errorf("routing to tablet %d got %d expected %d", tablet.Tablet.Alias.Uid, got, expected) + } + } + t.Logf("Expected %d per tablet, Routed %v, Delta %v, Max delta %d", N/len(tablets), routed, delta, expected*FUZZ/100) + } +} + +func TestTopologyChanged(t *testing.T) { + allTablets := []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("b"), + } + target := &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA} + + b := NewTabletBalancer("b", []string{"a", "b"}).(*tabletBalancer) + + N := 1 + + // initially create a slice of tablets with just the two in cell a + tablets := allTablets + tablets = tablets[0:2] + + for i := 0; i < N; i++ { + b.ShuffleTablets(target, tablets) + allocation, totalAllocation := b.getAllocation(target, tablets) + + if totalAllocation != ALLOCATION/2 { + t.Errorf("totalAllocation mismatch %s", b.print()) + } + + if allocation[allTablets[0].Tablet.Alias.Uid] != ALLOCATION/4 { + t.Errorf("allocation mismatch %s, cell %s", b.print(), allTablets[0].Tablet.Alias.Cell) + } + + if tablets[0].Tablet.Alias.Cell != "a" { + t.Errorf("shuffle promoted wrong tablet from cell %s", tablets[0].Tablet.Alias.Cell) + } + } + + // Run again with the full topology, but without triggering a topology change + // event to cause a reallocation + tablets2 := allTablets + for i := 0; i < N; i++ { + b.ShuffleTablets(target, tablets2) + + allocation, totalAllocation := b.getAllocation(target, tablets2) + + if totalAllocation != ALLOCATION/2 { + t.Errorf("totalAllocation mismatch %s", b.print()) + } + + if allocation[allTablets[0].Tablet.Alias.Uid] != ALLOCATION/4 { + t.Errorf("allocation mismatch %s, cell %s", b.print(), allTablets[0].Tablet.Alias.Cell) + } + + if tablets2[0].Tablet.Alias.Cell != "a" { + t.Errorf("shuffle promoted wrong tablet from cell %s", tablets2[0].Tablet.Alias.Cell) + } + } + + // Trigger toplogy changed event, now traffic should go to b + b.TopologyChanged() + for i := 0; i < N; i++ { + b.ShuffleTablets(target, tablets2) + + allocation, totalAllocation := b.getAllocation(target, tablets2) + + if totalAllocation != ALLOCATION/2 { + t.Errorf("totalAllocation mismatch %s", b.print()) + } + + if allocation[allTablets[0].Tablet.Alias.Uid] != ALLOCATION/4 { + t.Errorf("allocation mismatch %s, cell %s", b.print(), allTablets[0].Tablet.Alias.Cell) + } + + if tablets2[0].Tablet.Alias.Cell != "b" { + t.Errorf("shuffle promoted wrong tablet from cell %s", tablets2[0].Tablet.Alias.Cell) + } + } +} diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index 3bbdf3c5939..1c1efe47546 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -35,6 +35,7 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/balancer" "vitess.io/vitess/go/vt/vtgate/buffer" "vitess.io/vitess/go/vt/vttablet/queryservice" @@ -53,6 +54,11 @@ var ( // retryCount is the number of times a query will be retried on error retryCount = 2 routeReplicaToRdonly bool + + // configuration flags for the tablet balancer + balancerEnabled bool + balancerVtgateCells []string + balancerKeyspaces []string ) func init() { @@ -62,6 +68,9 @@ func init() { fs.DurationVar(&initialTabletTimeout, "gateway_initial_tablet_timeout", 30*time.Second, "At startup, the tabletGateway will wait up to this duration to get at least one tablet per keyspace/shard/tablet type") fs.IntVar(&retryCount, "retry-count", 2, "retry count") fs.BoolVar(&routeReplicaToRdonly, "gateway_route_replica_to_rdonly", false, "route REPLICA queries to RDONLY tablets as well as REPLICA tablets") + fs.BoolVar(&balancerEnabled, "balancer_enabled", false, "Whether to enable the tablet balancer to evenly spread query load") + fs.StringSliceVar(&balancerVtgateCells, "balancer_vtgate_cells", []string{}, "When in balanced mode, a comma-separated list of cells that contain vtgates (required)") + fs.StringSliceVar(&balancerKeyspaces, "balancer_keyspaces", []string{}, "When in balanced mode, a comma-separated list of keyspaces for which to use the balancer (optional)") }) } @@ -84,6 +93,9 @@ type TabletGateway struct { // buffer, if enabled, buffers requests during a detected PRIMARY failover. buffer *buffer.Buffer + + // balancer used for routing to tablets + balancer balancer.TabletBalancer } func createHealthCheck(ctx context.Context, retryDelay, timeout time.Duration, ts *topo.Server, cell, cellsToWatch string) discovery.HealthCheck { @@ -112,6 +124,9 @@ func NewTabletGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtop statusAggregators: make(map[string]*TabletStatusAggregator), } gw.setupBuffering(ctx) + if balancerEnabled { + gw.setupBalancer(ctx) + } gw.QueryService = queryservice.Wrap(nil, gw.withRetry) return gw } @@ -171,6 +186,26 @@ func (gw *TabletGateway) setupBuffering(ctx context.Context) { } } +func (gw *TabletGateway) setupBalancer(ctx context.Context) { + if len(balancerVtgateCells) == 0 { + log.Exitf("balancer_vtgate_cells is required for balanced mode") + } + gw.balancer = balancer.NewTabletBalancer(gw.localCell, balancerVtgateCells) + + // subscribe to healthcheck updates so that the balancer can reset its allocation + hcChan := gw.hc.Subscribe() + go func(ctx context.Context, c chan *discovery.TabletHealth, balancer balancer.TabletBalancer) { + for { + select { + case <-ctx.Done(): + return + case <-hcChan: + balancer.TopologyChanged() + } + } + }(ctx, hcChan, gw.balancer) +} + // QueryServiceByAlias satisfies the Gateway interface func (gw *TabletGateway) QueryServiceByAlias(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error) { return gw.hc.TabletConnection(alias, target) @@ -323,7 +358,27 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, err = vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "no healthy tablet available for '%s'", target.String()) break } - gw.shuffleTablets(gw.localCell, tablets) + + // Determine whether or not to use the balancer or the standard affinity-based shuffle + useBalancer := false + if balancerEnabled { + if len(balancerKeyspaces) != 0 { + for _, keyspace := range balancerKeyspaces { + if keyspace == target.Keyspace { + useBalancer = true + break + } + } + } else { + useBalancer = true + } + } + + if useBalancer { + gw.balancer.ShuffleTablets(target, tablets) + } else { + gw.shuffleTablets(gw.localCell, tablets) + } var th *discovery.TabletHealth // skip tablets we tried before