diff --git a/go/vt/vtgate/balancer/balancer.go b/go/vt/vtgate/balancer/balancer.go index f830b44f8ed..80486695a5b 100644 --- a/go/vt/vtgate/balancer/balancer.go +++ b/go/vt/vtgate/balancer/balancer.go @@ -89,9 +89,6 @@ converge on the desired balanced query load. type TabletBalancer interface { // Randomly shuffle the tablets into an order for routing queries ShuffleTablets(target *querypb.Target, tablets []*discovery.TabletHealth) - - // Callback when the topology changes to invalidate any cached state - TopologyChanged() } func NewTabletBalancer(localCell string, vtGateCells []string) TabletBalancer { @@ -136,6 +133,9 @@ type targetAllocation struct { // Allocation routed to each tablet from the local cell used for ranking Allocation map[uint32]int + // Tablets that local cell does not route to + Unallocated map[uint32]struct{} + // Total allocation which is basically 1,000,000 / len(vtgatecells) TotalAllocation int } @@ -175,27 +175,18 @@ func (b *tabletBalancer) ShuffleTablets(target *querypb.Target, tablets []*disco } } -// TopologyChanged is a callback to indicate the topology changed and any cached -// allocations should be cleared -func (b *tabletBalancer) TopologyChanged() { - b.mu.Lock() - defer b.mu.Unlock() - - b.allocations = map[discovery.KeyspaceShardTabletType]*targetAllocation{} -} - // To stick with integer arithmetic, use 1,000,000 as the full load const ALLOCATION = 1000000 func (b *tabletBalancer) allocateFlows(allTablets []*discovery.TabletHealth) *targetAllocation { - - a := targetAllocation{} - // Initialization: Set up some data structures and derived values - a.Target = make(map[string]int) - a.Inflows = make(map[string]int) - a.Outflows = make(map[string]map[string]int) - a.Allocation = make(map[uint32]int) + a := targetAllocation{ + Target: map[string]int{}, + Inflows: map[string]int{}, + Outflows: map[string]map[string]int{}, + Allocation: map[uint32]int{}, + Unallocated: map[uint32]struct{}{}, + } flowPerVtgateCell := ALLOCATION / len(b.vtGateCells) flowPerTablet := ALLOCATION / len(allTablets) cellExistsWithNoTablets := false @@ -326,6 +317,8 @@ func (b *tabletBalancer) allocateFlows(allTablets []*discovery.TabletHealth) *ta if flow > 0 { a.Allocation[tablet.Tablet.Alias.Uid] = flow * flowPerTablet / a.Target[cell] a.TotalAllocation += flow * flowPerTablet / a.Target[cell] + } else { + a.Unallocated[tablet.Tablet.Alias.Uid] = struct{}{} } } @@ -334,15 +327,28 @@ func (b *tabletBalancer) allocateFlows(allTablets []*discovery.TabletHealth) *ta // getAllocation builds the allocation map if needed and returns a copy of the map func (b *tabletBalancer) getAllocation(target *querypb.Target, tablets []*discovery.TabletHealth) (map[uint32]int, int) { - b.mu.Lock() defer b.mu.Unlock() allocation, exists := b.allocations[discovery.KeyFromTarget(target)] - if !exists { - allocation = b.allocateFlows(tablets) - b.allocations[discovery.KeyFromTarget(target)] = allocation + if exists && (len(allocation.Allocation)+len(allocation.Unallocated)) == len(tablets) { + mismatch := false + for _, tablet := range tablets { + if _, ok := allocation.Allocation[tablet.Tablet.Alias.Uid]; !ok { + if _, ok := allocation.Unallocated[tablet.Tablet.Alias.Uid]; !ok { + mismatch = true + break + } + } + } + if !mismatch { + // No change in tablets for this target. Return computed allocation + return allocation.Allocation, allocation.TotalAllocation + } } + allocation = b.allocateFlows(tablets) + b.allocations[discovery.KeyFromTarget(target)] = allocation + return allocation.Allocation, allocation.TotalAllocation } diff --git a/go/vt/vtgate/balancer/balancer_test.go b/go/vt/vtgate/balancer/balancer_test.go index 318ee6b2776..1eb9e69fadf 100644 --- a/go/vt/vtgate/balancer/balancer_test.go +++ b/go/vt/vtgate/balancer/balancer_test.go @@ -350,13 +350,11 @@ func TestTopologyChanged(t *testing.T) { } } - // Run again with the full topology, but without triggering a topology change - // event to cause a reallocation - tablets2 := allTablets + // Run again with the full topology. Now traffic should go to cell b for i := 0; i < N; i++ { - b.ShuffleTablets(target, tablets2) + b.ShuffleTablets(target, allTablets) - allocation, totalAllocation := b.getAllocation(target, tablets2) + allocation, totalAllocation := b.getAllocation(target, allTablets) if totalAllocation != ALLOCATION/2 { t.Errorf("totalAllocation mismatch %s", b.print()) @@ -366,28 +364,8 @@ func TestTopologyChanged(t *testing.T) { t.Errorf("allocation mismatch %s, cell %s", b.print(), allTablets[0].Tablet.Alias.Cell) } - if tablets2[0].Tablet.Alias.Cell != "a" { - t.Errorf("shuffle promoted wrong tablet from cell %s", tablets2[0].Tablet.Alias.Cell) - } - } - - // Trigger toplogy changed event, now traffic should go to b - b.TopologyChanged() - for i := 0; i < N; i++ { - b.ShuffleTablets(target, tablets2) - - allocation, totalAllocation := b.getAllocation(target, tablets2) - - if totalAllocation != ALLOCATION/2 { - t.Errorf("totalAllocation mismatch %s", b.print()) - } - - if allocation[allTablets[0].Tablet.Alias.Uid] != ALLOCATION/4 { - t.Errorf("allocation mismatch %s, cell %s", b.print(), allTablets[0].Tablet.Alias.Cell) - } - - if tablets2[0].Tablet.Alias.Cell != "b" { - t.Errorf("shuffle promoted wrong tablet from cell %s", tablets2[0].Tablet.Alias.Cell) + if allTablets[0].Tablet.Alias.Cell != "b" { + t.Errorf("shuffle promoted wrong tablet from cell %s", allTablets[0].Tablet.Alias.Cell) } } } diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index 1c1efe47546..ce8c12019d5 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -191,19 +191,6 @@ func (gw *TabletGateway) setupBalancer(ctx context.Context) { log.Exitf("balancer_vtgate_cells is required for balanced mode") } gw.balancer = balancer.NewTabletBalancer(gw.localCell, balancerVtgateCells) - - // subscribe to healthcheck updates so that the balancer can reset its allocation - hcChan := gw.hc.Subscribe() - go func(ctx context.Context, c chan *discovery.TabletHealth, balancer balancer.TabletBalancer) { - for { - select { - case <-ctx.Done(): - return - case <-hcChan: - balancer.TopologyChanged() - } - } - }(ctx, hcChan, gw.balancer) } // QueryServiceByAlias satisfies the Gateway interface