Skip to content

Commit

Permalink
neonvmd: expose interface to control online/offline state of the CPUs
Browse files Browse the repository at this point in the history
Signed-off-by: Misha Sakhnov <[email protected]>
  • Loading branch information
mikhail-sakhnov committed Oct 14, 2024
1 parent 2f3873d commit 4a3f699
Show file tree
Hide file tree
Showing 2 changed files with 280 additions and 11 deletions.
95 changes: 84 additions & 11 deletions neonvm-daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ package main
import (
"flag"
"fmt"
"io"
"math"
"net/http"
"os"
"strconv"
"sync"
"time"

"github.com/neondatabase/autoscaling/neonvm/daemon/pkg/cpuscaling"
"go.uber.org/zap"
)

Expand All @@ -26,24 +31,92 @@ func main() {
defer logger.Sync() //nolint:errcheck // what are we gonna do, log something about it?

logger.Info("Starting neonvm-daemon", zap.String("addr", *addr))
cpuHotPlugController := &cpuscaling.CPUOnlineOffliner{}
srv := cpuServer{
cpuSystemWideScaler: cpuHotPlugController,
logger: logger.Named("cpu-srv"),
}
srv.run(*addr)
}

type cpuServer struct {
// Protects CPU operations from concurrent access to prevent multiple ensureOnlineCPUs calls from running concurrently
// and ensure that status response is always actual
cpuOperationsMutex *sync.Mutex
cpuSystemWideScaler *cpuscaling.CPUOnlineOffliner
logger *zap.Logger
}

// milliCPU is a type that represents CPU in milli units
type milliCPU uint64

// milliCPUFromString converts a byte slice to milliCPU
func milliCPUFromString(s []byte) (milliCPU, error) {
cpu, err := strconv.ParseUint(string(s), 10, 32)
if err != nil {
return 0, err
}
return milliCPU(cpu), nil
}

// ToCPU converts milliCPU to CPU
func (m milliCPU) ToCPU() int {
cpu := float64(m) / 1000.0
// Use math.Ceil to round up to the next CPU.
return int(math.Ceil(cpu))
}

srv := cpuServer{}
srv.run(logger, *addr)
func (s *cpuServer) handleGetCPUStatus(w http.ResponseWriter) {
s.cpuOperationsMutex.Lock()
defer s.cpuOperationsMutex.Unlock()
totalCPUs, err := s.cpuSystemWideScaler.GetTotalCPUsCount()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
activeCPUs, err := s.cpuSystemWideScaler.GetActiveCPUsCount()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Write([]byte(fmt.Sprintf("%d %d", activeCPUs, totalCPUs)))
w.WriteHeader(http.StatusOK)
}

type cpuServer struct{}
func (s *cpuServer) handleSetCPUStatus(w http.ResponseWriter, r *http.Request) {
s.cpuOperationsMutex.Lock()
defer s.cpuOperationsMutex.Unlock()
body, err := io.ReadAll(r.Body)
if err != nil {
s.logger.Error("could not read request body", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}

milliCPU, err := milliCPUFromString(body)
if err != nil {
s.logger.Error("could not parse request body as uint32", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
if err := s.cpuSystemWideScaler.EnsureOnlineCPUs(milliCPU.ToCPU()); err != nil {
s.logger.Error("failed to ensure online CPUs", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}

func (s *cpuServer) run(logger *zap.Logger, addr string) {
logger = logger.Named("cpu-srv")
func (s *cpuServer) run(addr string) {

mux := http.NewServeMux()
mux.HandleFunc("/cpu", func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet {
logger.Error("unimplemented!")
w.WriteHeader(http.StatusInternalServerError)
s.handleGetCPUStatus(w)
return
} else if r.Method == http.MethodPut {
logger.Error("unimplemented!")
w.WriteHeader(http.StatusInternalServerError)
s.handleSetCPUStatus(w, r)
return
} else {
// unknown method
w.WriteHeader(http.StatusNotFound)
Expand All @@ -61,7 +134,7 @@ func (s *cpuServer) run(logger *zap.Logger, addr string) {

err := server.ListenAndServe()
if err != nil {
logger.Fatal("CPU server exited with error", zap.Error(err))
s.logger.Fatal("CPU server exited with error", zap.Error(err))
}
logger.Info("CPU server exited without error")
s.logger.Info("CPU server exited without error")
}
196 changes: 196 additions & 0 deletions neonvm-daemon/pkg/cpuscaling/online_offline_cpu.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package cpuscaling

import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
)

// CPU directory path
const cpuPath = "/sys/devices/system/cpu/"

type CPUOnlineOffliner struct {
}

func (c *CPUOnlineOffliner) EnsureOnlineCPUs(X int) error {
cpus, err := getAllCPUs()
if err != nil {
return err
}

onlineCount, err := c.GetActiveCPUsCount()
if err != nil {
return err
}

if onlineCount < uint32(X) {
for _, cpu := range cpus {
if cpu == 0 {
// Skip CPU 0 as it is always online and can't be toggled
continue
}

online, err := isCPUOnline(cpu)
if err != nil {
return err
}

if !online {
// Mark CPU as online
err := setCPUOnline(cpu, true)
if err != nil {
return err
}
onlineCount++
}

// Stop when we reach the target count
if onlineCount == uint32(X) {
break
}
}
} else if onlineCount > uint32(X) {
// Remove CPUs if there are more than X online
for i := len(cpus) - 1; i >= 0; i-- {
cpu := cpus[i]
if cpu == 0 {
// Skip CPU 0 as it cannot be taken offline
continue
}

online, err := isCPUOnline(cpu)
if err != nil {
return err
}

if online {
// Mark CPU as offline
err := setCPUOnline(cpu, false)
if err != nil {
return err
}
onlineCount--
}

// Stop when we reach the target count
if onlineCount == uint32(X) {
break
}
}
}

if onlineCount != uint32(X) {
return fmt.Errorf("failed to ensure %d CPUs are online, current online CPUs: %d", X, onlineCount)
}

return nil
}

// GetActiveCPUsCount() returns the count of online CPUs.
func (c *CPUOnlineOffliner) GetActiveCPUsCount() (uint32, error) {
cpus, err := getAllCPUs()
if err != nil {
return 0, err
}

var onlineCount uint32
for _, cpu := range cpus {
online, err := isCPUOnline(cpu)
if err != nil {
return 0, err
}
if online {
onlineCount++
}
}

return onlineCount, nil
}

// GetTotalCPUsCount returns the total number of CPUs (online + offline).
func (c *CPUOnlineOffliner) GetTotalCPUsCount() (uint32, error) {
cpus, err := getAllCPUs()
if err != nil {
return 0, err
}

return uint32(len(cpus)), nil
}

// Helper functions

// getAllCPUs returns a list of all CPUs that are physically present in the system.
func getAllCPUs() ([]int, error) {
data, err := os.ReadFile(filepath.Join(cpuPath, "possible"))
if err != nil {
return nil, err
}

return parseCPURange(string(data))
}

// parseCPURange parses the CPU range string (e.g., "0-3") and returns a list of CPUs.
func parseCPURange(cpuRange string) ([]int, error) {
var cpus []int
cpuRange = strings.TrimSpace(cpuRange)
parts := strings.Split(cpuRange, "-")

if len(parts) == 1 {
// Single CPU case, e.g., "0"
cpu, err := strconv.Atoi(parts[0])
if err != nil {
return nil, err
}
cpus = append(cpus, cpu)
} else if len(parts) == 2 {
// Range case, e.g., "0-3"
start, err := strconv.Atoi(parts[0])
if err != nil {
return nil, err
}
end, err := strconv.Atoi(parts[1])
if err != nil {
return nil, err
}
for i := start; i <= end; i++ {
cpus = append(cpus, i)
}
}

return cpus, nil
}

// isCPUOnline checks if a given CPU is online.
func isCPUOnline(cpu int) (bool, error) {
data, err := os.ReadFile(filepath.Join(cpuPath, fmt.Sprintf("cpu%d/online", cpu)))
if os.IsNotExist(err) {
// If the file doesn't exist for CPU 0, assume it's online
if cpu == 0 {
return true, nil
}
return false, err
}
if err != nil {
return false, err
}

online := strings.TrimSpace(string(data))
return online == "1", nil
}

// setCPUOnline sets the given CPU as online (true) or offline (false).
func setCPUOnline(cpu int, online bool) error {
state := "0"
if online {
state = "1"
}

err := os.WriteFile(filepath.Join(cpuPath, fmt.Sprintf("cpu%d/online", cpu)), []byte(state), 0644)
if err != nil {
return fmt.Errorf("failed to set CPU %d online status: %v", cpu, err)
}

return nil
}

0 comments on commit 4a3f699

Please sign in to comment.