diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go
index f296a01bd9ed..a533de9bb686 100644
--- a/pkg/sql/distsql_physical_planner.go
+++ b/pkg/sql/distsql_physical_planner.go
@@ -823,7 +823,7 @@ const (
 type spanPartitionState struct {
 	// partitionSpanDecisions is a mapping from a SpanPartitionReason to the number of
 	// times we have picked an instance for that reason.
-	partitionSpanDecisions [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int
+	partitionSpanDecisions [NumSpanPartitionReason]int
 
 	// partitionSpans is a mapping from a SQLInstanceID to the number of
 	// partition spans that have been assigned to that node.
@@ -1187,6 +1187,11 @@ const (
 	// eligible but overloaded with other partitions. In this case we pick a
 	// random instance apart from the gateway.
 	SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED
+	// SpanPartitionReason_PREVIOUSLY_CACHED is reported whenever we've already
+	// resolved the provided NodeID.
+	SpanPartitionReason_PREVIOUSLY_CACHED
+
+	NumSpanPartitionReason
 )
 
 func (r SpanPartitionReason) String() string {
@@ -1215,6 +1220,8 @@ func (r SpanPartitionReason) String() string {
 		return "gossip-target-healthy"
 	case SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED:
 		return "locality-filtered-random-gateway-overloaded"
+	case SpanPartitionReason_PREVIOUSLY_CACHED:
+		return "previously-cached"
 	default:
 		return "unknown"
 	}
@@ -1765,20 +1772,35 @@ func (dsp *DistSQLPlanner) makeInstanceResolver(
 		log.VEventf(ctx, 2, "healthy SQL instances available for distributed planning: %v", instances)
 	}
 
+	var filtered bool
+	// filterUnhealthyInstances on the first call updates 'instances' slice to
+	// contain only healthy SQL instances. On the second and all consecutive
+	// calls it is a no-op.
 	filterUnhealthyInstances := func() []sqlinstance.InstanceInfo {
+		if filtered {
+			return instances
+		}
 		// Instances that have gone down might still be present in the sql_instances
 		// cache. Therefore, we filter out these unhealthy nodes by dialing them.
-		healthy, unhealthy := dsp.filterUnhealthyInstances(instances, planCtx.nodeStatuses)
+		var unhealthy []sqlinstance.InstanceInfo
+		instances, unhealthy = dsp.filterUnhealthyInstances(instances, planCtx.nodeStatuses)
 		if len(unhealthy) != 0 && log.ExpensiveLogEnabled(ctx, 2) {
 			log.Eventf(ctx, "not planning on unhealthy instances : %v", unhealthy)
 		}
-		return healthy
+		filtered = true
+		return instances
 	}
 
+	// cached tracks which NodeID have already been resolved.
+	cached := make(map[roachpb.NodeID]base.SQLInstanceID)
+
 	// If we were able to determine the locality information for at least some
 	// instances, use the locality-aware resolver.
 	if instancesHaveLocality {
 		resolver := func(nodeID roachpb.NodeID) (base.SQLInstanceID, SpanPartitionReason) {
+			if instance, ok := cached[nodeID]; ok {
+				return instance, SpanPartitionReason_PREVIOUSLY_CACHED
+			}
 			instances = filterUnhealthyInstances()
 			if len(instances) == 0 {
 				log.Eventf(ctx, "no healthy sql instances available for planning, using the gateway")
@@ -1804,9 +1826,7 @@ func (dsp *DistSQLPlanner) makeInstanceResolver(
 				}
 			}
 
-			// TODO(dt): Pre-compute / cache this result, e.g. in the instance reader.
-			if closest, _ := ClosestInstances(instances,
-				nodeDesc.Locality); len(closest) > 0 {
+			if closest, _ := ClosestInstances(instances, nodeDesc.Locality); len(closest) > 0 {
 				return closest[rng.Intn(len(closest))], SpanPartitionReason_CLOSEST_LOCALITY_MATCH
 			}
 
@@ -1843,8 +1863,11 @@ func (dsp *DistSQLPlanner) makeInstanceResolver(
 		instances[i], instances[j] = instances[j], instances[i]
 	})
 	var i int
-	resolver := func(roachpb.NodeID) (base.SQLInstanceID, SpanPartitionReason) {
-		instances := filterUnhealthyInstances()
+	resolver := func(nodeID roachpb.NodeID) (base.SQLInstanceID, SpanPartitionReason) {
+		if instance, ok := cached[nodeID]; ok {
+			return instance, SpanPartitionReason_PREVIOUSLY_CACHED
+		}
+		instances = filterUnhealthyInstances()
 		if len(instances) == 0 {
 			log.Eventf(ctx, "no healthy sql instances available for planning, only using the gateway")
 			return dsp.gatewaySQLInstanceID, SpanPartitionReason_GATEWAY_NO_HEALTHY_INSTANCES
diff --git a/pkg/sql/distsql_physical_planner_test.go b/pkg/sql/distsql_physical_planner_test.go
index f7455998bd54..d78a776bdc04 100644
--- a/pkg/sql/distsql_physical_planner_test.go
+++ b/pkg/sql/distsql_physical_planner_test.go
@@ -962,7 +962,7 @@ func TestPartitionSpans(t *testing.T) {
 				2: 1,
 				3: 1,
 			},
-			partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+			partitionSpanDecisions: [NumSpanPartitionReason]int{
 				SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 4,
 			},
 			totalPartitionSpans: 4,
@@ -995,7 +995,7 @@ func TestPartitionSpans(t *testing.T) {
 				2: 1,
 				3: 1,
 			},
-			partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+			partitionSpanDecisions: [NumSpanPartitionReason]int{
 				SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 4,
 			},
 			totalPartitionSpans: 4,
@@ -1026,7 +1026,7 @@ func TestPartitionSpans(t *testing.T) {
 				1: 3,
 				3: 1,
 			},
-			partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+			partitionSpanDecisions: [NumSpanPartitionReason]int{
 				SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 3,
 				SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY: 1,
 			},
@@ -1058,7 +1058,7 @@ func TestPartitionSpans(t *testing.T) {
 				1: 3,
 				2: 1,
 			},
-			partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+			partitionSpanDecisions: [NumSpanPartitionReason]int{
 				SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 3,
 				SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY: 1,
 			},
@@ -1090,7 +1090,7 @@ func TestPartitionSpans(t *testing.T) {
 				2: 3,
 				3: 1,
 			},
-			partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+			partitionSpanDecisions: [NumSpanPartitionReason]int{
 				SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 2,
 				SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY: 2,
 			},
@@ -1122,7 +1122,7 @@ func TestPartitionSpans(t *testing.T) {
 				2: 1,
 				3: 3,
 			},
-			partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+			partitionSpanDecisions: [NumSpanPartitionReason]int{
 				SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 2,
 				SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY: 2,
 			},
@@ -1153,7 +1153,7 @@ func TestPartitionSpans(t *testing.T) {
 				1: 2,
 				2: 1,
 			},
-			partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+			partitionSpanDecisions: [NumSpanPartitionReason]int{
 				SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 3,
 			},
 			totalPartitionSpans: 3,
@@ -1190,7 +1190,7 @@ func TestPartitionSpans(t *testing.T) {
 				1: 9,
 				2: 1,
 			},
-			partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+			partitionSpanDecisions: [NumSpanPartitionReason]int{
 				SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 10,
 			},
 			totalPartitionSpans: 10,
@@ -1218,7 +1218,7 @@ func TestPartitionSpans(t *testing.T) {
 			partitionSpans: map[base.SQLInstanceID]int{
 				1: 2,
 			},
-			partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+			partitionSpanDecisions: [NumSpanPartitionReason]int{
 				SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 2,
 			},
 			totalPartitionSpans: 2,
@@ -1253,7 +1253,7 @@ func TestPartitionSpans(t *testing.T) {
 				1: 2,
 				2: 2,
 			},
-			partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+			partitionSpanDecisions: [NumSpanPartitionReason]int{
 				SpanPartitionReason_TARGET_HEALTHY: 3,
 				SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED: 1,
 			},
@@ -1286,7 +1286,7 @@ func TestPartitionSpans(t *testing.T) {
 				2: 3,
 				4: 1,
 			},
-			partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+			partitionSpanDecisions: [NumSpanPartitionReason]int{
 				SpanPartitionReason_TARGET_HEALTHY: 1,
 				SpanPartitionReason_CLOSEST_LOCALITY_MATCH: 3,
 			},
@@ -1316,7 +1316,7 @@ func TestPartitionSpans(t *testing.T) {
 				6: 2,
 				7: 2,
 			},
-			partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+			partitionSpanDecisions: [NumSpanPartitionReason]int{
 				SpanPartitionReason_GATEWAY_NO_LOCALITY_MATCH: 2,
 				SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED: 2,
 			},
@@ -1347,7 +1347,7 @@ func TestPartitionSpans(t *testing.T) {
 			partitionSpans: map[base.SQLInstanceID]int{
 				7: 4,
 			},
-			partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+			partitionSpanDecisions: [NumSpanPartitionReason]int{
 				SpanPartitionReason_LOCALITY_FILTERED_RANDOM: 4,
 			},
 			totalPartitionSpans: 4,
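Reviewer note, not part of the patch: the core of the change above is memoizing NodeID-to-instance resolution for the lifetime of one planning pass, in both the locality-aware resolver and the round-robin fallback, with a new SpanPartitionReason_PREVIOUSLY_CACHED reason reported on cache hits. Below is a minimal, self-contained Go sketch of that memoization pattern under stated assumptions: the names newCachingResolver, resolveSlow, and reasonCached are hypothetical stand-ins for the patch's cached map and SpanPartitionReason_PREVIOUSLY_CACHED, not CockroachDB APIs.

	package main

	import "fmt"

	type nodeID int
	type instanceID int
	type reason string

	const (
		reasonResolved reason = "resolved"
		reasonCached   reason = "previously-cached"
	)

	// newCachingResolver wraps a slow resolver with a per-pass cache,
	// mirroring how the patched makeInstanceResolver consults `cached`
	// before doing any health filtering or locality matching.
	func newCachingResolver(resolveSlow func(nodeID) instanceID) func(nodeID) (instanceID, reason) {
		cached := make(map[nodeID]instanceID)
		return func(n nodeID) (instanceID, reason) {
			if inst, ok := cached[n]; ok {
				return inst, reasonCached
			}
			inst := resolveSlow(n)
			cached[n] = inst
			return inst, reasonResolved
		}
	}

	func main() {
		slowCalls := 0
		resolve := newCachingResolver(func(n nodeID) instanceID {
			slowCalls++ // stands in for health filtering + locality matching
			return instanceID(n)
		})
		for _, n := range []nodeID{1, 2, 1, 1, 2} {
			inst, why := resolve(n)
			fmt.Printf("node %d -> instance %d (%s)\n", n, inst, why)
		}
		fmt.Println("slow resolutions:", slowCalls) // prints 2, not 5
	}

Because the patch allocates `cached` inside makeInstanceResolver, entries live only as long as a single resolver closure, so a cached instance choice cannot leak across planning passes; the same scoping keeps the one-shot `filtered` flag sound, since the healthy-instance set is filtered at most once per pass.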