Skip to content

Commit

Permalink
neonvm: sysfs cpu state based scaling support
Browse files Browse the repository at this point in the history
introduce separate CPU scaling flow based on the vmSpec.cpuScalingMode

If vmSpec.cpuScalingMode is equal to `qmp_scaling` the logic of the
scaling is preserved as before:

- Scale, if required the amount of CPUs using qmp commands.
- If it is required to scale cgroups, call vm-runner /cpu_change endpoint

if vmSpec.cpuScalingMode is equal to `cpuSysfsStateScaling` all cpu
scaling requests go directly to the vm-runner /cpu_change, which in
that configuration goes to the neonvm-daemon to reconcile required
amount of online CPUs.

Value `cpuSysfsStateScaling` also modifies the qemu and the kernel
arguments to enable plug all CPUs but mark as online only first one.

Signed-off-by: Misha Sakhnov <[email protected]>
  • Loading branch information
mikhail-sakhnov committed Oct 17, 2024
1 parent 50b7180 commit 82a4827
Show file tree
Hide file tree
Showing 16 changed files with 461 additions and 154 deletions.
7 changes: 4 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,10 @@ test: fmt vet envtest ## Run tests.

.PHONY: build
build: fmt vet bin/vm-builder ## Build all neonvm binaries.
GOOS=linux go build -o bin/controller neonvm/main.go
GOOS=linux go build -o bin/vxlan-controller neonvm/tools/vxlan/controller/main.go
GOOS=linux go build -o bin/runner neonvm/runner/*.go
GOOS=linux go build -o bin/controller neonvm-controller/cmd/main.go
GOOS=linux go build -o bin/vxlan-controller neonvm-vxlan-controller/cmd/main.go
GOOS=linux go build -o bin/daemon neonvm-daemon/main.go
GOOS=linux go build -o bin/runner neonvm-runner/cmd/main.go

.PHONY: bin/vm-builder
bin/vm-builder: ## Build vm-builder binary.
Expand Down
6 changes: 3 additions & 3 deletions neonvm-controller/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func main() {
var concurrencyLimit int
var skipUpdateValidationFor map[types.NamespacedName]struct{}
var disableRunnerCgroup bool
var useOnlineOfflining bool
var useCpuSysfsStateScaling bool
var qemuDiskCacheSettings string
var defaultMemoryProvider vmv1.MemoryProvider
var memhpAutoMovableRatio string
Expand All @@ -106,7 +106,6 @@ func main() {
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
flag.BoolVar(&useOnlineOfflining, "use-online-offlining", false, "Use online offlining for CPU scaling")
flag.IntVar(&concurrencyLimit, "concurrency-limit", 1, "Maximum number of concurrent reconcile operations")
flag.Func(
"skip-update-validation-for",
Expand Down Expand Up @@ -134,6 +133,7 @@ func main() {
return nil
},
)
flag.BoolVar(&useCpuSysfsStateScaling, "use-cpu-sysfs-state-scaling", false, "Use sysfs cpu state scaling for CPU scaling")
flag.BoolVar(&disableRunnerCgroup, "disable-runner-cgroup", false, "Disable creation of a cgroup in neonvm-runner for fractional CPU limiting")
flag.StringVar(&qemuDiskCacheSettings, "qemu-disk-cache-settings", "cache=none", "Set neonvm-runner's QEMU disk cache settings")
flag.Func("default-memory-provider", "Set default memory provider to use for new VMs", defaultMemoryProvider.FlagFunc)
Expand Down Expand Up @@ -189,7 +189,7 @@ func main() {
reconcilerMetrics := controllers.MakeReconcilerMetrics()

rc := &controllers.ReconcilerConfig{
UseOnlineOfflining: useOnlineOfflining,
DisableRunnerCgroup: disableRunnerCgroup,
MaxConcurrentReconciles: concurrencyLimit,
SkipUpdateValidationFor: skipUpdateValidationFor,
QEMUDiskCacheSettings: qemuDiskCacheSettings,
Expand Down
1 change: 0 additions & 1 deletion neonvm-controller/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ spec:
- "--concurrency-limit=128"
- "--skip-update-validation-for="
- "--disable-runner-cgroup"
- "--use-online-offlining"
# See #775 and its links.
# * cache.writeback=on - don't set O_DSYNC (don't flush every write)
# * cache.direct=on - use O_DIRECT (don't abuse host's page cache!)
Expand Down
55 changes: 20 additions & 35 deletions neonvm-daemon/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ import (
"flag"
"fmt"
"io"
"math"
"net/http"
"os"
"strconv"
"sync"
"time"

"github.com/neondatabase/autoscaling/neonvm/daemon/pkg/cpuscaling"
"go.uber.org/zap"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/neonvm/cpuscaling"
)

func main() {
Expand All @@ -31,8 +32,9 @@ func main() {
defer logger.Sync() //nolint:errcheck // what are we gonna do, log something about it?

logger.Info("Starting neonvm-daemon", zap.String("addr", *addr))
cpuHotPlugController := &cpuscaling.CPUOnlineOffliner{}
cpuHotPlugController := &cpuscaling.CPUSysFsStateScaler{}
srv := cpuServer{
cpuOperationsMutex: &sync.Mutex{},
cpuSystemWideScaler: cpuHotPlugController,
logger: logger.Named("cpu-srv"),
}
Expand All @@ -43,47 +45,29 @@ type cpuServer struct {
// Protects CPU operations from concurrent access to prevent multiple ensureOnlineCPUs calls from running concurrently
// and ensure that status response is always actual
cpuOperationsMutex *sync.Mutex
cpuSystemWideScaler *cpuscaling.CPUOnlineOffliner
cpuSystemWideScaler *cpuscaling.CPUSysFsStateScaler
logger *zap.Logger
}

// milliCPU is a type that represents CPU in milli units
type milliCPU uint64

// milliCPUFromString converts a byte slice to milliCPU
func milliCPUFromString(s []byte) (milliCPU, error) {
cpu, err := strconv.ParseUint(string(s), 10, 32)
if err != nil {
return 0, err
}
return milliCPU(cpu), nil
}

// ToCPU converts milliCPU to CPU
func (m milliCPU) ToCPU() int {
cpu := float64(m) / 1000.0
// Use math.Ceil to round up to the next CPU.
return int(math.Ceil(cpu))
}

func (s *cpuServer) handleGetCPUStatus(w http.ResponseWriter) {
// should be replaced with cgroups milliseconds to be exposed instead of having CPU
s.cpuOperationsMutex.Lock()
defer s.cpuOperationsMutex.Unlock()
totalCPUs, err := s.cpuSystemWideScaler.GetTotalCPUsCount()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
activeCPUs, err := s.cpuSystemWideScaler.GetActiveCPUsCount()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Write([]byte(fmt.Sprintf("%d %d", activeCPUs, totalCPUs)))
w.WriteHeader(http.StatusOK)
if _, err := w.Write([]byte(fmt.Sprintf("%d", activeCPUs*1000))); err != nil {
s.logger.Error("could not write response", zap.Error(err))
}
}

func (s *cpuServer) handleSetCPUStatus(w http.ResponseWriter, r *http.Request) {
// TODO: should the call to this method be conditional, only if the statefs cpu scaling is enabled?
// on the other hand, currently this endpoint is called by runner only if the statefs scaling is enabled
// and it is a bit tricky to pass vmSpec here
s.cpuOperationsMutex.Lock()
defer s.cpuOperationsMutex.Unlock()
body, err := io.ReadAll(r.Body)
Expand All @@ -92,19 +76,20 @@ func (s *cpuServer) handleSetCPUStatus(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
return
}

milliCPU, err := milliCPUFromString(body)
updateInt, err := strconv.Atoi(string(body))
update := vmv1.MilliCPU(updateInt)
if err != nil {
s.logger.Error("could not parse request body as uint32", zap.Error(err))
s.logger.Error("could not unmarshal request body", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
if err := s.cpuSystemWideScaler.EnsureOnlineCPUs(milliCPU.ToCPU()); err != nil {
s.logger.Error("failed to ensure online CPUs", zap.Error(err))
s.logger.Info("Setting CPU status", zap.String("body", string(body)))
if err := s.cpuSystemWideScaler.EnsureOnlineCPUs(int(update.RoundedUp())); err != nil {
s.logger.Error("could not ensure online CPUs", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
w.WriteHeader(http.StatusOK)
}

func (s *cpuServer) run(addr string) {
Expand Down
74 changes: 58 additions & 16 deletions neonvm-runner/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,6 @@ type Config struct {
diskCacheSettings string
memoryProvider vmv1.MemoryProvider
autoMovableRatio string
useOnlineOfflining bool
}

func newConfig(logger *zap.Logger) *Config {
Expand All @@ -630,7 +629,6 @@ func newConfig(logger *zap.Logger) *Config {
diskCacheSettings: "cache=none",
memoryProvider: "", // Require that this is explicitly set. We'll check later.
autoMovableRatio: "", // Require that this is explicitly set IFF memoryProvider is VirtioMem. We'll check later.
useOnlineOfflining: false,
}
flag.StringVar(&cfg.vmSpecDump, "vmspec", cfg.vmSpecDump,
"Base64 encoded VirtualMachine json specification")
Expand All @@ -651,7 +649,6 @@ func newConfig(logger *zap.Logger) *Config {
flag.Func("memory-provider", "Set provider for memory hotplug", cfg.memoryProvider.FlagFunc)
flag.StringVar(&cfg.autoMovableRatio, "memhp-auto-movable-ratio",
cfg.autoMovableRatio, "Set value of kernel's memory_hotplug.auto_movable_ratio [virtio-mem only]")
flag.BoolVar(&cfg.useOnlineOfflining, "use-online-offlining", false, "Use online offlining for CPU scaling")
flag.Parse()

if cfg.memoryProvider == "" {
Expand Down Expand Up @@ -761,7 +758,7 @@ func run(logger *zap.Logger) error {

tg.Go("qemu-cmd", func(logger *zap.Logger) error {
var err error
qemuCmd, err = buildQEMUCmd(cfg, logger, vmSpec, &vmStatus, enableSSH, swapSize, hostname, cfg.useOnlineOfflining)
qemuCmd, err = buildQEMUCmd(cfg, logger, vmSpec, &vmStatus, enableSSH, swapSize, hostname)
return err
})

Expand Down Expand Up @@ -815,7 +812,6 @@ func buildQEMUCmd(
enableSSH bool,
swapSize *resource.Quantity,
hostname string,
useOnlineOfflining bool,
) ([]string, error) {
// prepare qemu command line
qemuCmd := []string{
Expand Down Expand Up @@ -893,21 +889,26 @@ func buildQEMUCmd(
logger.Warn("not using KVM acceleration")
}
qemuCmd = append(qemuCmd, "-cpu", "max")
if useOnlineOfflining {
// if we use online offlining we specify start cpus equal to max cpus

// cpu scaling details
maxCPUs := vmSpec.Guest.CPUs.Max.RoundedUp()
minCPUs := vmSpec.Guest.CPUs.Min.RoundedUp()

if vmSpec.CpuScalingMode != nil && *vmSpec.CpuScalingMode == vmv1.CpuScalingModeCpuSysfsState {
// if we use sysfs based scaling we specify start cpus equal to max cpus
qemuCmd = append(qemuCmd, "-smp", fmt.Sprintf(
"cpus=%d,maxcpus=%d,sockets=1,cores=%d,threads=1",
vmSpec.Guest.CPUs.Max.RoundedUp(),
vmSpec.Guest.CPUs.Max.RoundedUp(),
vmSpec.Guest.CPUs.Max.RoundedUp(),
maxCPUs,
maxCPUs,
maxCPUs,
))
} else {
// if we use hotplug we specify start cpus equal to min cpus and scale using udev rules for cpu plug events
qemuCmd = append(qemuCmd, "-smp", fmt.Sprintf(
"cpus=%d,maxcpus=%d,sockets=1,cores=%d,threads=1",
vmSpec.Guest.CPUs.Min.RoundedUp(),
vmSpec.Guest.CPUs.Max.RoundedUp(),
vmSpec.Guest.CPUs.Max.RoundedUp(),
minCPUs,
maxCPUs,
maxCPUs,
))
}

Expand Down Expand Up @@ -997,8 +998,8 @@ func makeKernelCmdline(cfg *Config, vmSpec *vmv1.VirtualMachineSpec, vmStatus *v
if cfg.appendKernelCmdline != "" {
cmdlineParts = append(cmdlineParts, cfg.appendKernelCmdline)
}
if cfg.useOnlineOfflining {
// if we use online offlining we need to specify the start cpus as min CPUs
if vmSpec.CpuScalingMode != nil && *vmSpec.CpuScalingMode == vmv1.CpuScalingModeCpuSysfsState {
// if we use sysfs based scaling we need to specify the start cpus as min CPUs
cmdlineParts = append(cmdlineParts, fmt.Sprintf("maxcpus=%d", vmSpec.Guest.CPUs.Min.RoundedUp()))
}

Expand Down Expand Up @@ -1047,8 +1048,12 @@ func runQEMU(
wg := sync.WaitGroup{}

wg.Add(1)
useCpuSysfsStateScaling := false
if vmSpec.CpuScalingMode != nil && *vmSpec.CpuScalingMode == vmv1.CpuScalingModeCpuSysfsState {
useCpuSysfsStateScaling = true
}
go terminateQemuOnSigterm(ctx, logger, &wg)
if !cfg.skipCgroupManagement || cfg.enableDummyCPUServer {
if !cfg.skipCgroupManagement || cfg.enableDummyCPUServer || useCpuSysfsStateScaling {
var callbacks cpuServerCallbacks

if cfg.enableDummyCPUServer {
Expand All @@ -1060,6 +1065,13 @@ func runQEMU(
return lo.ToPtr(vmv1.MilliCPU(lastValue.Load())), nil
},
set: func(logger *zap.Logger, cpu vmv1.MilliCPU) error {
if useCpuSysfsStateScaling {
err := setNeonvmDaemonCPU(cpu)
if err != nil {
logger.Error("setting CPU through NeonVM Daemon failed", zap.Any("cpu", cpu), zap.Error(err))
return err
}
}
lastValue.Store(uint32(cpu))
return nil
},
Expand Down Expand Up @@ -1830,3 +1842,33 @@ func overlayNetwork(iface string) (mac.MAC, error) {

return mac, nil
}

func setNeonvmDaemonCPU(cpu vmv1.MilliCPU) error {
_, vmIP, _, err := calcIPs(defaultNetworkCIDR)
if err != nil {
return fmt.Errorf("could not calculate VM IP address: %w", err)
}

ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
defer cancel()

url := fmt.Sprintf("http://%s:25183/cpu", vmIP)
body := bytes.NewReader([]byte(fmt.Sprintf("%d", uint32(cpu))))

req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, body)
if err != nil {
return fmt.Errorf("could not build request: %w", err)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("could not send request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
return fmt.Errorf("neonvm-daemon responded with status %d", resp.StatusCode)
}

return nil
}
5 changes: 0 additions & 5 deletions pkg/neonvm/controllers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@ import (
// ReconcilerConfig stores shared configuration for VirtualMachineReconciler and
// VirtualMachineMigrationReconciler.
type ReconcilerConfig struct {
// UseOnlineOfflining, if true, enables using online offlining for new VM runner pods instead of QMP cpu hotplugging.
//
// This is defined as a config option so we can do a gradual rollout of this change.
UseOnlineOfflining bool

// DisableRunnerCgroup, if true, disables running QEMU in a cgroup in new VM runner pods.
// Fractional CPU scaling will continue to *pretend* to work, but it will not do anything in
// practice.
Expand Down
Loading

0 comments on commit 82a4827

Please sign in to comment.