Merge branch 'main' into agent-runner-fix-computeUnitBounds-comment
sharnoff committed Sep 2, 2023
2 parents 38e5ada + a775918 commit 7c3f7a9
Showing 2 changed files with 164 additions and 45 deletions.
99 changes: 54 additions & 45 deletions pkg/agent/runner.go
@@ -1051,18 +1051,18 @@ const (
UpdatedVMInfo VMUpdateReason = "updated VM info"
)

-// atomicUpdateState holds some pre-validated data for (*Runner).updateVMResources, fetched
+// AtomicUpdateState holds some pre-validated data for (*Runner).updateVMResources, fetched
// atomically (i.e. all at once, while holding r.lock) with the (*Runner).atomicState method
//
// Because atomicState is able to return nil when there isn't yet enough information to update the
// VM's resources, some validation is already guaranteed by representing the data without pointers.
-type atomicUpdateState struct {
-computeUnit api.Resources
-metrics api.Metrics
-vm api.VmInfo
-lastApproved api.Resources
-requestedUpscale api.MoreResources
-config api.ScalingConfig
+type AtomicUpdateState struct {
+ComputeUnit api.Resources
+Metrics api.Metrics
+VM api.VmInfo
+LastApproved api.Resources
+RequestedUpscale api.MoreResources
+Config api.ScalingConfig
}

// updateVMResources is responsible for the high-level logic that orchestrates a single update to
@@ -1111,7 +1111,7 @@ func (r *Runner) updateVMResources(
return nil
}

-state, err := func() (*atomicUpdateState, error) {
+state, err := func() (*AtomicUpdateState, error) {
r.lock.Lock()
defer r.lock.Unlock()

@@ -1127,9 +1127,9 @@ }
}

// Calculate the current and desired state of the VM
-target = state.desiredVMState(true) // note: this sets the state value in the loop body
+target = state.DesiredVMState(true) // note: this sets the state value in the loop body

-current := state.vm.Using()
+current := state.VM.Using()
start = current

msg := "Target VM state is equal to current"
@@ -1151,7 +1151,7 @@

// note: r.atomicState already checks the validity of r.lastApproved - namely that it has no
// values less than r.vm.Using().
-capped = target.Min(state.lastApproved) // note: this sets the state value in the loop body
+capped = target.Min(state.LastApproved) // note: this sets the state value in the loop body

return state, nil
}()
@@ -1164,14 +1164,14 @@

// If there's an update that can be done immediately, do it! Typically, capped will
// represent the resources we'd like to downscale.
-if capped != state.vm.Using() {
+if capped != state.VM.Using() {
// If our downscale gets rejected, calculate a new target
rejectedDownscale := func() (newTarget api.Resources, _ error) {
-target = state.desiredVMState(false /* don't allow downscaling */)
-return target.Min(state.lastApproved), nil
+target = state.DesiredVMState(false /* don't allow downscaling */)
+return target.Min(state.LastApproved), nil
}

-nowUsing, err := r.doVMUpdate(ctx, logger, state.vm.Using(), capped, rejectedDownscale)
+nowUsing, err := r.doVMUpdate(ctx, logger, state.VM.Using(), capped, rejectedDownscale)
if err != nil {
return fmt.Errorf("Error doing VM update 1: %w", err)
} else if nowUsing == nil {
@@ -1186,7 +1186,7 @@ return nil
return nil
}

-state.vm.SetUsing(*nowUsing)
+state.VM.SetUsing(*nowUsing)
}

// Fetch the scheduler, to (a) inform it of the current state, and (b) request an
@@ -1222,7 +1222,7 @@ ProtoVersion: PluginProtocolVersion,
ProtoVersion: PluginProtocolVersion,
Pod: r.podName,
Resources: target,
-Metrics: &state.metrics, // FIXME: the metrics here *might* be a little out of date.
+Metrics: &state.Metrics, // FIXME: the metrics here *might* be a little out of date.
}
response, err := sched.DoRequest(ctx, logger, &request)
if err != nil {
@@ -1239,7 +1239,7 @@

// sched.DoRequest should have validated the permit, meaning that it's not less than the
// current resource usage.
-vmUsing := state.vm.Using()
+vmUsing := state.VM.Using()
if permit.HasFieldLessThan(vmUsing) {
panic(errors.New("invalid state: permit less than what's in use"))
} else if permit.HasFieldGreaterThan(target) {
@@ -1274,7 +1274,7 @@
// getStateForVMUpdate produces the atomicUpdateState for updateVMResources
//
// This method MUST be called while holding r.lock.
-func (r *Runner) getStateForVMUpdate(logger *zap.Logger, updateReason VMUpdateReason) *atomicUpdateState {
+func (r *Runner) getStateForVMUpdate(logger *zap.Logger, updateReason VMUpdateReason) *AtomicUpdateState {
if r.lastMetrics == nil {
if updateReason == UpdatedMetrics {
panic(errors.New("invalid state: metrics signalled but r.lastMetrics == nil"))
@@ -1313,19 +1313,19 @@ func (r *Runner) getStateForVMUpdate(logger *zap.Logger, updateReason VMUpdateRe
config = *r.vm.ScalingConfig
}

-return &atomicUpdateState{
-computeUnit: *r.computeUnit,
-metrics: *r.lastMetrics,
-vm: r.vm,
-lastApproved: *r.lastApproved,
-requestedUpscale: r.requestedUpscale,
-config: config,
+return &AtomicUpdateState{
+ComputeUnit: *r.computeUnit,
+Metrics: *r.lastMetrics,
+VM: r.vm,
+LastApproved: *r.lastApproved,
+RequestedUpscale: r.requestedUpscale,
+Config: config,
}
}

-// desiredVMState calculates what the resource allocation to the VM should be, given the metrics and
+// DesiredVMState calculates what the resource allocation to the VM should be, given the metrics and
// current state.
-func (s *atomicUpdateState) desiredVMState(allowDecrease bool) api.Resources {
+func (s *AtomicUpdateState) DesiredVMState(allowDecrease bool) api.Resources {
// There's some annoying edge cases that this function has to be able to handle properly. For
// the sake of completeness, they are:
//
@@ -1354,17 +1354,17 @@ func (s *atomicUpdateState) desiredVMState(allowDecrease bool) api.Resources {
// Goal compute unit is at the point where (CPUs) × (LoadAverageFractionTarget) == (load
// average),
// which we can get by dividing LA by LAFT, and then dividing by the number of CPUs per CU
-goalCPUs := float64(s.metrics.LoadAverage1Min) / s.config.LoadAverageFractionTarget
-cpuGoalCU := uint32(math.Round(goalCPUs / s.computeUnit.VCPU.AsFloat64()))
+goalCPUs := float64(s.Metrics.LoadAverage1Min) / s.Config.LoadAverageFractionTarget
+cpuGoalCU := uint32(math.Round(goalCPUs / s.ComputeUnit.VCPU.AsFloat64()))

// For Mem:
// Goal compute unit is at the point where (Mem) * (MemoryUsageFractionTarget) == (Mem Usage)
// We can get the desired memory allocation in bytes by dividing MU by MUFT, and then convert
// that to CUs
//
// NOTE: use uint64 for calculations on bytes as uint32 can overflow
-memGoalBytes := uint64(math.Round(float64(s.metrics.MemoryUsageBytes) / s.config.MemoryUsageFractionTarget))
-bytesPerCU := uint64(int64(s.computeUnit.Mem) * s.vm.Mem.SlotSize.Value())
+memGoalBytes := uint64(math.Round(float64(s.Metrics.MemoryUsageBytes) / s.Config.MemoryUsageFractionTarget))
+bytesPerCU := uint64(int64(s.ComputeUnit.Mem) * s.VM.Mem.SlotSize.Value())
memGoalCU := uint32(memGoalBytes / bytesPerCU)

goalCU := util.Max(cpuGoalCU, memGoalCU)
@@ -1379,20 +1379,29 @@ func (s *atomicUpdateState) desiredVMState(allowDecrease bool) api.Resources {
}

// resources for the desired "goal" compute units
-goal := s.computeUnit.Mul(uint16(goalCU))
+goal := s.ComputeUnit.Mul(uint16(goalCU))

// bound goal by the minimum and maximum resource amounts for the VM
-result := goal.Min(s.vm.Max()).Max(s.vm.Min())
+result := goal.Min(s.VM.Max()).Max(s.VM.Min())

+// If no decreases are allowed, then we *must* make sure that the VM's usage value has not
+// decreased, even if it's greater than the VM maximum.
+//
+// We can run into situations like this when VM scale-down on bounds change fails, so we end up
+// with a usage value greater than the maximum.
+if !allowDecrease {
+result = result.Max(s.VM.Using())
+}

// Check that the result is sound.
//
// With the current (naive) implementation, this is trivially ok. In future versions, it might
// not be so simple, so it's good to have this integrity check here.
-if result.HasFieldGreaterThan(s.vm.Max()) {
+if allowDecrease && result.HasFieldGreaterThan(s.VM.Max()) {
panic(fmt.Errorf(
"produced invalid desiredVMState: result has field greater than max. this = %+v", *s,
))
-} else if result.HasFieldLessThan(s.vm.Min()) {
+} else if result.HasFieldLessThan(s.VM.Min()) {
panic(fmt.Errorf(
"produced invalid desiredVMState: result has field less than min. this = %+v", *s,
))
@@ -1409,11 +1418,11 @@ func (s *atomicUpdateState) desiredVMState(allowDecrease bool) api.Resources {
// divide to a multiple of the Compute Unit, the upper and lower bounds will be different. This can
// happen when the Compute Unit is changed, or when the VM's maximum or minimum resource allocations
// has previously prevented it from being set to a multiple of the Compute Unit.
-func (s *atomicUpdateState) computeUnitsBounds() (uint32, uint32) {
+func (s *AtomicUpdateState) computeUnitsBounds() (uint32, uint32) {
// (x + M-1) / M is equivalent to ceil(x/M), as long as M != 0, which is already guaranteed by
// the checks on the computeUnit that the scheduler provides.
-minCPUUnits := (uint32(s.vm.Cpu.Use) + uint32(s.computeUnit.VCPU) - 1) / uint32(s.computeUnit.VCPU)
-minMemUnits := uint32((s.vm.Mem.Use + s.computeUnit.Mem - 1) / s.computeUnit.Mem)
+minCPUUnits := (uint32(s.VM.Cpu.Use) + uint32(s.ComputeUnit.VCPU) - 1) / uint32(s.ComputeUnit.VCPU)
+minMemUnits := uint32((s.VM.Mem.Use + s.ComputeUnit.Mem - 1) / s.ComputeUnit.Mem)

return util.Min(minCPUUnits, minMemUnits), util.Max(minCPUUnits, minMemUnits)
}
@@ -1425,16 +1434,16 @@ func (s *atomicUpdateState) computeUnitsBounds() (uint32, uint32) {
//
// This method does not respect any bounds on Compute Units placed by the VM's maximum or minimum
// resource allocation.
-func (s *atomicUpdateState) requiredCUForRequestedUpscaling() uint32 {
+func (s *AtomicUpdateState) requiredCUForRequestedUpscaling() uint32 {
var required uint32

// note: floor(x / M) + 1 gives the minimum integer value greater than x / M.

-if s.requestedUpscale.Cpu {
-required = util.Max(required, uint32(s.vm.Cpu.Use/s.computeUnit.VCPU)+1)
+if s.RequestedUpscale.Cpu {
+required = util.Max(required, uint32(s.VM.Cpu.Use/s.ComputeUnit.VCPU)+1)
}
-if s.requestedUpscale.Memory {
-required = util.Max(required, uint32(s.vm.Mem.Use/s.computeUnit.Mem)+1)
+if s.RequestedUpscale.Memory {
+required = util.Max(required, uint32(s.VM.Mem.Use/s.ComputeUnit.Mem)+1)
}

return required
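For reference, the goal-compute-unit arithmetic that DesiredVMState performs above can be sketched as a standalone Go program. This is a minimal illustration using plain numbers in place of the api types; the function name, parameters, and example values below are illustrative, not part of the diff:

package main

import (
	"fmt"
	"math"
)

// desiredCU is a stand-in for the goal-CU calculation in DesiredVMState.
// All parameter names here are illustrative.
func desiredCU(
	loadAvg, loadTarget float64, // LoadAverage1Min, LoadAverageFractionTarget
	memUsageBytes, memTarget float64, // MemoryUsageBytes, MemoryUsageFractionTarget
	vcpuPerCU float64, // e.g. 0.25 vCPU per compute unit
	bytesPerCU uint64, // e.g. 1 GiB of memory per compute unit
) uint32 {
	// CPU goal: the point where (CPUs) * (LoadAverageFractionTarget) == (load average).
	goalCPUs := loadAvg / loadTarget
	cpuGoalCU := uint32(math.Round(goalCPUs / vcpuPerCU))

	// Memory goal: the point where (Mem) * (MemoryUsageFractionTarget) == (memory usage).
	// Byte counts stay in uint64, as in the original, to avoid uint32 overflow.
	memGoalBytes := uint64(math.Round(memUsageBytes / memTarget))
	memGoalCU := uint32(memGoalBytes / bytesPerCU)

	// Scale to whichever resource needs more compute units.
	if cpuGoalCU > memGoalCU {
		return cpuGoalCU
	}
	return memGoalCU
}

func main() {
	// Same numbers as the BasicScaleup test case below: load average 0.30, both target
	// fractions 0.5, and a compute unit of 0.25 vCPU / 1 GiB.
	fmt.Println(desiredCU(0.30, 0.5, 0, 0.5, 0.25, 1<<30)) // 2
}

With those values this works out to 2 compute units — i.e. the expected 500 milli-vCPU and 2 memory slots in the test.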
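The integer rounding that computeUnitsBounds and requiredCUForRequestedUpscaling rely on is also easy to check in isolation. A small self-contained sketch, with illustrative helper names:

package main

import "fmt"

// ceilDiv returns ceil(x/m) for m != 0, via the (x + m - 1) / m identity that
// computeUnitsBounds uses to find how many whole compute units cover current usage.
func ceilDiv(x, m uint32) uint32 {
	return (x + m - 1) / m
}

// nextAbove returns floor(x/m) + 1, the smallest integer strictly greater than x/m,
// which requiredCUForRequestedUpscaling uses to guarantee at least one more compute
// unit than is currently in use.
func nextAbove(x, m uint32) uint32 {
	return x/m + 1
}

func main() {
	// 300 milli-vCPU in use with 250 milli-vCPU per compute unit:
	fmt.Println(ceilDiv(300, 250), nextAbove(300, 250)) // 2 2
	// exactly 500 milli-vCPU in use:
	fmt.Println(ceilDiv(500, 250), nextAbove(500, 250)) // 2 3
}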
110 changes: 110 additions & 0 deletions pkg/agent/runner_test.go
@@ -0,0 +1,110 @@
package agent_test

import (
"testing"

"k8s.io/apimachinery/pkg/api/resource"

"github.com/neondatabase/autoscaling/pkg/agent"
"github.com/neondatabase/autoscaling/pkg/api"
)

func Test_desiredVMState(t *testing.T) {
cases := []struct {
name string

// helpers for setting fields of atomicUpdateState:
metrics api.Metrics
vmUsing api.Resources
lastApproved api.Resources
requestedUpscale api.MoreResources

// expected output from (*atomicUpdateState).desiredVMState(allowDecrease)
expected api.Resources
allowDecrease bool
}{
{
name: "BasicScaleup",
metrics: api.Metrics{
LoadAverage1Min: 0.30,
LoadAverage5Min: 0.0, // unused
MemoryUsageBytes: 0.0,
},
vmUsing: api.Resources{VCPU: 250, Mem: 1},
lastApproved: api.Resources{VCPU: 0, Mem: 0}, // unused
requestedUpscale: api.MoreResources{Cpu: false, Memory: false},

expected: api.Resources{VCPU: 500, Mem: 2},
allowDecrease: true,
},
{
name: "MismatchedApprovedNoScaledown",
metrics: api.Metrics{
LoadAverage1Min: 0.0, // ordinarily would like to scale down
LoadAverage5Min: 0.0,
MemoryUsageBytes: 0.0,
},
vmUsing: api.Resources{VCPU: 250, Mem: 2},
lastApproved: api.Resources{VCPU: 250, Mem: 1},
requestedUpscale: api.MoreResources{Cpu: false, Memory: false},

// need to scale up because vmUsing is mismatched and otherwise we'd be scaling down.
expected: api.Resources{VCPU: 500, Mem: 2},
allowDecrease: false,
},
{
// ref https://github.com/neondatabase/autoscaling/issues/512
name: "MismatchedApprovedNoScaledownButVMAtMaximum",
metrics: api.Metrics{
LoadAverage1Min: 0.0, // ordinarily would like to scale down
LoadAverage5Min: 0.0,
MemoryUsageBytes: 0.0,
},
vmUsing: api.Resources{VCPU: 1000, Mem: 5}, // note: mem greater than maximum. It can happen when scaling bounds change
lastApproved: api.Resources{VCPU: 1000, Mem: 4},
requestedUpscale: api.MoreResources{Cpu: false, Memory: false},

expected: api.Resources{VCPU: 1000, Mem: 5},
allowDecrease: false,
},
}

for _, c := range cases {
state := agent.AtomicUpdateState{
ComputeUnit: api.Resources{VCPU: 250, Mem: 1},
Metrics: c.metrics,
LastApproved: c.lastApproved,
RequestedUpscale: c.requestedUpscale,
Config: api.ScalingConfig{
LoadAverageFractionTarget: 0.5,
MemoryUsageFractionTarget: 0.5,
},
VM: api.VmInfo{
Name: "test",
Namespace: "test",
Cpu: api.VmCpuInfo{
Min: 250,
Use: c.vmUsing.VCPU,
Max: 1000,
},
Mem: api.VmMemInfo{
SlotSize: resource.NewQuantity(1<<30 /* 1 Gi */, resource.BinarySI), // unused, doesn't actually matter.
Min: 1,
Use: c.vmUsing.Mem,
Max: 4,
},
// remaining fields are also unused:
ScalingConfig: nil,
AlwaysMigrate: false,
ScalingEnabled: true,
},
}

t.Run(c.name, func(t *testing.T) {
actual := state.DesiredVMState(c.allowDecrease)
if actual != c.expected {
t.Errorf("expected output %+v but got %+v", c.expected, actual)
}
})
}
}
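The last test case exercises the behaviour this merge brings in: the VM is already using more memory than its maximum (5 slots against a max of 4), and scale-down is disallowed. The snippet below isolates just the added guard (result = result.Max(s.VM.Using())) with those numbers; Resources is a stand-in struct rather than the real api.Resources, and the candidate value is illustrative:

package main

import "fmt"

// Resources mirrors the shape of api.Resources for this illustration only.
type Resources struct{ VCPU, Mem uint32 }

// Max returns the field-wise maximum of r and o.
func (r Resources) Max(o Resources) Resources {
	if o.VCPU > r.VCPU {
		r.VCPU = o.VCPU
	}
	if o.Mem > r.Mem {
		r.Mem = o.Mem
	}
	return r
}

func main() {
	// Values from the "MismatchedApprovedNoScaledownButVMAtMaximum" case: usage exceeds
	// the VM maximum after a bounds change.
	using := Resources{VCPU: 1000, Mem: 5}
	candidate := Resources{VCPU: 250, Mem: 1} // whatever the metrics-based goal worked out to

	allowDecrease := false
	result := candidate
	if !allowDecrease {
		// The added guard: never report a target below what the VM is already using.
		result = result.Max(using)
	}
	fmt.Printf("%+v\n", result) // {VCPU:1000 Mem:5}
}

Because the final over-maximum integrity check is now conditioned on allowDecrease, DesiredVMState returns {VCPU: 1000, Mem: 5} for this case instead of panicking, even though Mem exceeds the VM's maximum of 4.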
