WIP on always resolving NodeID to the same SQL instance
yuzefovich committed Nov 20, 2024
1 parent d993d31 commit 7c81ac8
Showing 2 changed files with 44 additions and 21 deletions.
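The gist of the change: makeInstanceResolver now keeps a per-planning `cached` map from NodeID to SQLInstanceID and consults it before any other resolution logic, so that within a single physical plan the same node always resolves to the same SQL instance. Below is a minimal standalone sketch of that memoization shape, assuming the cache is filled wherever the resolver settles on an instance (the store itself falls outside the expanded hunks below, and the types here are simplified stand-ins, not the planner's real signatures):

```go
package main

import "fmt"

type NodeID int
type SQLInstanceID int

// makeCachedResolver wraps a resolution function so each NodeID is resolved
// at most once; repeated calls replay the remembered instance. This mirrors
// the `cached` map the diff adds in makeInstanceResolver.
func makeCachedResolver(resolve func(NodeID) SQLInstanceID) func(NodeID) (SQLInstanceID, bool) {
	cached := make(map[NodeID]SQLInstanceID)
	return func(n NodeID) (SQLInstanceID, bool) {
		if inst, ok := cached[n]; ok {
			return inst, true // the diff's "previously-cached" outcome
		}
		inst := resolve(n)
		cached[n] = inst
		return inst, false
	}
}

func main() {
	calls := 0
	resolver := makeCachedResolver(func(n NodeID) SQLInstanceID {
		calls++                 // counts how often the underlying logic runs
		return SQLInstanceID(n) // stand-in for the locality-aware pick
	})
	resolver(1)
	inst, hit := resolver(1)
	fmt.Println(inst, hit, calls) // 1 true 1: the second call never re-resolves
}
```

Cache hits get their own SpanPartitionReason_PREVIOUSLY_CACHED reason, which keeps the partitionSpanDecisions accounting below exhaustive.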
39 changes: 31 additions & 8 deletions pkg/sql/distsql_physical_planner.go
@@ -823,7 +823,7 @@ const (
type spanPartitionState struct {
// partitionSpanDecisions is a mapping from a SpanPartitionReason to the number of
// times we have picked an instance for that reason.
- partitionSpanDecisions [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int
+ partitionSpanDecisions [NumSpanPartitionReason]int

// partitionSpans is a mapping from a SQLInstanceID to the number of
// partition spans that have been assigned to that node.
@@ -1187,6 +1187,11 @@ const (
// eligible but overloaded with other partitions. In this case we pick a
// random instance apart from the gateway.
SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED
+ // SpanPartitionReason_PREVIOUSLY_CACHED is reported whenever we've already
+ // resolved the provided NodeID.
+ SpanPartitionReason_PREVIOUSLY_CACHED
+
+ NumSpanPartitionReason
)

func (r SpanPartitionReason) String() string {
@@ -1215,6 +1220,8 @@ func (r SpanPartitionReason) String() string {
return "gossip-target-healthy"
case SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED:
return "locality-filtered-random-gateway-overloaded"
+ case SpanPartitionReason_PREVIOUSLY_CACHED:
+ return "previously-cached"
default:
return "unknown"
}
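Both files also retire the hand-maintained bound `SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1` in favor of a `NumSpanPartitionReason` sentinel: the usual Go idiom where a trailing constant in the iota block doubles as the count of enum values, so arrays indexed by the enum stay correctly sized as new reasons are added. A small self-contained illustration with hypothetical Reason names:

```go
package main

import "fmt"

type Reason int

const (
	ReasonA Reason = iota
	ReasonB
	// Add new reasons above this line; NumReason is always the value count.
	NumReason
)

func main() {
	// Arrays sized by the sentinel never go stale, and indexed literals work
	// exactly like the tests' [NumSpanPartitionReason]int{...} literals below.
	decisions := [NumReason]int{ReasonB: 1}
	decisions[ReasonA]++
	fmt.Println(len(decisions), decisions) // 2 [1 1]
}
```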
@@ -1765,20 +1772,35 @@ func (dsp *DistSQLPlanner) makeInstanceResolver(
log.VEventf(ctx, 2, "healthy SQL instances available for distributed planning: %v", instances)
}

+ var filtered bool
+ // filterUnhealthyInstances on the first call updates the 'instances' slice
+ // to contain only healthy SQL instances. Every subsequent call is a no-op.
filterUnhealthyInstances := func() []sqlinstance.InstanceInfo {
+ if filtered {
+ return instances
+ }
// Instances that have gone down might still be present in the sql_instances
// cache. Therefore, we filter out these unhealthy nodes by dialing them.
- healthy, unhealthy := dsp.filterUnhealthyInstances(instances, planCtx.nodeStatuses)
+ var unhealthy []sqlinstance.InstanceInfo
+ instances, unhealthy = dsp.filterUnhealthyInstances(instances, planCtx.nodeStatuses)
if len(unhealthy) != 0 && log.ExpensiveLogEnabled(ctx, 2) {
log.Eventf(ctx, "not planning on unhealthy instances : %v", unhealthy)
}
- return healthy
+ filtered = true
+ return instances
}

+ // cached tracks which NodeIDs have already been resolved.
+ cached := make(map[roachpb.NodeID]base.SQLInstanceID)
+
// If we were able to determine the locality information for at least some
// instances, use the locality-aware resolver.
if instancesHaveLocality {
resolver := func(nodeID roachpb.NodeID) (base.SQLInstanceID, SpanPartitionReason) {
+ if instance, ok := cached[nodeID]; ok {
+ return instance, SpanPartitionReason_PREVIOUSLY_CACHED
+ }
instances = filterUnhealthyInstances()
if len(instances) == 0 {
log.Eventf(ctx, "no healthy sql instances available for planning, using the gateway")
@@ -1804,9 +1826,7 @@
}
}

- // TODO(dt): Pre-compute / cache this result, e.g. in the instance reader.
- if closest, _ := ClosestInstances(instances,
- nodeDesc.Locality); len(closest) > 0 {
+ if closest, _ := ClosestInstances(instances, nodeDesc.Locality); len(closest) > 0 {
return closest[rng.Intn(len(closest))], SpanPartitionReason_CLOSEST_LOCALITY_MATCH
}

@@ -1843,8 +1863,11 @@ func (dsp *DistSQLPlanner) makeInstanceResolver(
instances[i], instances[j] = instances[j], instances[i]
})
var i int
- resolver := func(roachpb.NodeID) (base.SQLInstanceID, SpanPartitionReason) {
- instances := filterUnhealthyInstances()
+ resolver := func(nodeID roachpb.NodeID) (base.SQLInstanceID, SpanPartitionReason) {
+ if instance, ok := cached[nodeID]; ok {
+ return instance, SpanPartitionReason_PREVIOUSLY_CACHED
+ }
+ instances = filterUnhealthyInstances()
if len(instances) == 0 {
log.Eventf(ctx, "no healthy sql instances available for planning, only using the gateway")
return dsp.gatewaySQLInstanceID, SpanPartitionReason_GATEWAY_NO_HEALTHY_INSTANCES
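The reworked filterUnhealthyInstances closure above is a lazy, run-once filter: the first resolver call pays the cost of dialing instances, flips a latch, and every later call returns the already-pruned slice unchanged. A sketch of that latch pattern under simplified assumptions (a Healthy field stands in for the real dial-based health check, and the expensive logging is elided):

```go
package main

import "fmt"

type Instance struct {
	ID      int
	Healthy bool
}

// lazyHealthFilter returns a closure that prunes unhealthy instances on its
// first call only; afterwards it is a no-op, mirroring the `filtered` bool
// latch in the diff.
func lazyHealthFilter(instances []Instance) func() []Instance {
	filtered := false
	return func() []Instance {
		if filtered {
			return instances
		}
		var healthy []Instance
		for _, in := range instances {
			if in.Healthy { // hypothetical stand-in for dialing the node
				healthy = append(healthy, in)
			}
		}
		instances = healthy
		filtered = true
		return instances
	}
}

func main() {
	f := lazyHealthFilter([]Instance{{1, true}, {2, false}, {3, true}})
	fmt.Println(f()) // [{1 true} {3 true}] - the filter runs exactly once
	fmt.Println(f()) // the same pruned slice, no second pass
}
```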
26 changes: 13 additions & 13 deletions pkg/sql/distsql_physical_planner_test.go
@@ -962,7 +962,7 @@ func TestPartitionSpans(t *testing.T) {
2: 1,
3: 1,
},
- partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+ partitionSpanDecisions: [NumSpanPartitionReason]int{
SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 4,
},
totalPartitionSpans: 4,
@@ -995,7 +995,7 @@ func TestPartitionSpans(t *testing.T) {
2: 1,
3: 1,
},
- partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+ partitionSpanDecisions: [NumSpanPartitionReason]int{
SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 4,
},
totalPartitionSpans: 4,
@@ -1026,7 +1026,7 @@ func TestPartitionSpans(t *testing.T) {
1: 3,
3: 1,
},
- partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+ partitionSpanDecisions: [NumSpanPartitionReason]int{
SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 3,
SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY: 1,
},
@@ -1058,7 +1058,7 @@ func TestPartitionSpans(t *testing.T) {
1: 3,
2: 1,
},
- partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+ partitionSpanDecisions: [NumSpanPartitionReason]int{
SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 3,
SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY: 1,
},
@@ -1090,7 +1090,7 @@ func TestPartitionSpans(t *testing.T) {
2: 3,
3: 1,
},
- partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+ partitionSpanDecisions: [NumSpanPartitionReason]int{
SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 2,
SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY: 2,
},
@@ -1122,7 +1122,7 @@ func TestPartitionSpans(t *testing.T) {
2: 1,
3: 3,
},
- partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+ partitionSpanDecisions: [NumSpanPartitionReason]int{
SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 2,
SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY: 2,
},
@@ -1153,7 +1153,7 @@ func TestPartitionSpans(t *testing.T) {
1: 2,
2: 1,
},
- partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+ partitionSpanDecisions: [NumSpanPartitionReason]int{
SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 3,
},
totalPartitionSpans: 3,
@@ -1190,7 +1190,7 @@ func TestPartitionSpans(t *testing.T) {
1: 9,
2: 1,
},
- partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+ partitionSpanDecisions: [NumSpanPartitionReason]int{
SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 10,
},
totalPartitionSpans: 10,
@@ -1218,7 +1218,7 @@ func TestPartitionSpans(t *testing.T) {
partitionSpans: map[base.SQLInstanceID]int{
1: 2,
},
- partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+ partitionSpanDecisions: [NumSpanPartitionReason]int{
SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 2,
},
totalPartitionSpans: 2,
@@ -1253,7 +1253,7 @@ func TestPartitionSpans(t *testing.T) {
1: 2,
2: 2,
},
- partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+ partitionSpanDecisions: [NumSpanPartitionReason]int{
SpanPartitionReason_TARGET_HEALTHY: 3,
SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED: 1,
},
@@ -1286,7 +1286,7 @@ func TestPartitionSpans(t *testing.T) {
2: 3,
4: 1,
},
- partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+ partitionSpanDecisions: [NumSpanPartitionReason]int{
SpanPartitionReason_TARGET_HEALTHY: 1,
SpanPartitionReason_CLOSEST_LOCALITY_MATCH: 3,
},
@@ -1316,7 +1316,7 @@ func TestPartitionSpans(t *testing.T) {
6: 2,
7: 2,
},
- partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+ partitionSpanDecisions: [NumSpanPartitionReason]int{
SpanPartitionReason_GATEWAY_NO_LOCALITY_MATCH: 2,
SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED: 2,
},
@@ -1347,7 +1347,7 @@ func TestPartitionSpans(t *testing.T) {
partitionSpans: map[base.SQLInstanceID]int{
7: 4,
},
- partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{
+ partitionSpanDecisions: [NumSpanPartitionReason]int{
SpanPartitionReason_LOCALITY_FILTERED_RANDOM: 4,
},
totalPartitionSpans: 4,
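So far the test changes only track the array-bound rename, but the property the commit title promises - a NodeID always resolving to the same SQL instance - is easy to pin down in a standalone check. A hedged sketch with a random pick standing in for the planner's instance selection (hypothetical types, not the repo's test harness):

```go
package main

import (
	"fmt"
	"math/rand"
)

type NodeID int
type SQLInstanceID int

func main() {
	// Without the cache a random pick is free to change between calls; the
	// map in front of it is what pins each NodeID to a single instance.
	cached := make(map[NodeID]SQLInstanceID)
	resolve := func(n NodeID) SQLInstanceID {
		if inst, ok := cached[n]; ok {
			return inst
		}
		inst := SQLInstanceID(rand.Intn(3) + 1) // hypothetical random pick
		cached[n] = inst
		return inst
	}

	first := resolve(7)
	for i := 0; i < 100; i++ {
		if got := resolve(7); got != first {
			panic("NodeID 7 resolved to a different instance")
		}
	}
	fmt.Println("NodeID 7 consistently resolves to instance", first)
}
```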
