diff --git a/ci/manifests/interlink-config-local.yaml b/ci/manifests/interlink-config-local.yaml index 9cf22d99..9a043c24 100644 --- a/ci/manifests/interlink-config-local.yaml +++ b/ci/manifests/interlink-config-local.yaml @@ -8,7 +8,7 @@ #InterlinkAddress: "unix:///var/run/interlink.socket" InterlinkAddress: "http://0.0.0.0" InterlinkPort: "3000" -#SidecarURL: "http://plugin" +#sidecarURL: "http://plugin" SidecarURL: "http://0.0.0.0" SidecarPort: "4000" VerboseLogging: true diff --git a/ci/manifests/interlink-config.yaml b/ci/manifests/interlink-config.yaml index 8e462671..a9e9283d 100644 --- a/ci/manifests/interlink-config.yaml +++ b/ci/manifests/interlink-config.yaml @@ -9,7 +9,7 @@ InterlinkAddress: "http://0.0.0.0" InterlinkPort: "3000" SidecarURL: "http://plugin" -#SidecarURL: "http://0.0.0.0" +#sidecarURL: "http://0.0.0.0" SidecarPort: "4000" VerboseLogging: true ErrorsOnlyLogging: false diff --git a/ci/manifests/virtual-kubelet-config.yaml b/ci/manifests/virtual-kubelet-config.yaml index 9bfbdff2..12da4426 100644 --- a/ci/manifests/virtual-kubelet-config.yaml +++ b/ci/manifests/virtual-kubelet-config.yaml @@ -13,9 +13,10 @@ data: ServiceAccount: "virtual-kubelet" Namespace: interlink VKTokenFile: "" - CPU: "100" - Memory: "128Gi" - Pods: "100" + Resources: + CPU: "100" + Memory: "128Gi" + Pods: "100" HTTP: Insecure: true KubeletHTTP: diff --git a/cmd/installer/templates/values.yaml b/cmd/installer/templates/values.yaml index 978ea9cb..97409a21 100644 --- a/cmd/installer/templates/values.yaml +++ b/cmd/installer/templates/values.yaml @@ -5,9 +5,10 @@ interlink: port: {{.InterLinkPort}} virtualNode: - CPUs: {{.VKLimits.CPU}} - MemGiB: {{.VKLimits.Memory}} - Pods: {{.VKLimits.Pods}} + Resources: + CPU: {{.VKLimits.CPU}} + Memory: {{.VKLimits.Memory}} + Pods: {{.VKLimits.Pods}} HTTPProxies: HTTP: null HTTPs: null diff --git a/pkg/virtualkubelet/config.go b/pkg/virtualkubelet/config.go index 11345504..5270f397 100644 --- a/pkg/virtualkubelet/config.go +++ b/pkg/virtualkubelet/config.go @@ -2,27 +2,45 @@ package virtualkubelet // Config holds the whole configuration type Config struct { - InterlinkURL string `yaml:"InterlinkURL"` - Interlinkport string `yaml:"InterlinkPort"` - KubernetesAPIAddr string `yaml:"KubernetesApiAddr"` - KubernetesAPIPort string `yaml:"KubernetesApiPort"` - KubernetesAPICaCrt string `yaml:"KubernetesApiCaCrt"` - DisableProjectedVolumes bool `yaml:"DisableProjectedVolumes"` - VKConfigPath string `yaml:"VKConfigPath"` - VKTokenFile string `yaml:"VKTokenFile"` - ServiceAccount string `yaml:"ServiceAccount"` - Namespace string `yaml:"Namespace"` - PodIP string `yaml:"PodIP"` - VerboseLogging bool `yaml:"VerboseLogging"` - ErrorsOnlyLogging bool `yaml:"ErrorsOnlyLogging"` - HTTP HTTP `yaml:"HTTP"` - KubeletHTTP HTTP `yaml:"KubeletHTTP"` - CPU string `yaml:"CPU,omitempty"` - Memory string `yaml:"Memory,omitempty"` - Pods string `yaml:"Pods,omitempty"` - GPU string `yaml:"nvidia.com/gpu,omitempty"` + InterlinkURL string `yaml:"InterlinkURL"` + InterlinkPort string `yaml:"InterlinkPort"` + KubernetesAPIAddr string `yaml:"KubernetesApiAddr"` + KubernetesAPIPort string `yaml:"KubernetesApiPort"` + KubernetesAPICaCrt string `yaml:"KubernetesApiCaCrt"` + DisableProjectedVolumes bool `yaml:"DisableProjectedVolumes"` + VKConfigPath string `yaml:"VKConfigPath"` + VKTokenFile string `yaml:"VKTokenFile"` + ServiceAccount string `yaml:"ServiceAccount"` + Namespace string `yaml:"Namespace"` + PodIP string `yaml:"PodIP"` + VerboseLogging bool `yaml:"VerboseLogging"` + ErrorsOnlyLogging bool `yaml:"ErrorsOnlyLogging"` + HTTP HTTP `yaml:"HTTP"` + KubeletHTTP HTTP `yaml:"KubeletHTTP"` + Resources Resources `yaml:"Resources"` + NodeLabels []string `yaml:"NodeLabels"` + NodeTaints []TaintSpec `yaml:"NodeTaints"` } type HTTP struct { Insecure bool `yaml:"Insecure"` } + +type Resources struct { + CPU string `yaml:"CPU,omitempty"` + Memory string `yaml:"Memory,omitempty"` + Pods string `yaml:"Pods,omitempty"` + Accelerators []Accelerator `yaml:"Accelerators"` +} + +type Accelerator struct { + ResourceType string `yaml:"ResourceType"` + Model string `yaml:"Model"` + Available int `yaml:"Available"` +} + +type TaintSpec struct { + Key string `yaml:"Key"` + Value string `yaml:"Value"` + Effect string `yaml:"Effect"` +} diff --git a/pkg/virtualkubelet/execute.go b/pkg/virtualkubelet/execute.go index 82c07a30..e0e90dac 100644 --- a/pkg/virtualkubelet/execute.go +++ b/pkg/virtualkubelet/execute.go @@ -87,7 +87,7 @@ func getSidecarEndpoint(ctx context.Context, interLinkURL string, interLinkPort // PingInterLink pings the InterLink API and returns true if there's an answer. The second return value is given by the answer provided by the API. func PingInterLink(ctx context.Context, config Config) (bool, int, error) { tracer := otel.Tracer("interlink-service") - interLinkEndpoint := getSidecarEndpoint(ctx, config.InterlinkURL, config.Interlinkport) + interLinkEndpoint := getSidecarEndpoint(ctx, config.InterlinkURL, config.InterlinkPort) log.G(ctx).Info("Pinging: " + interLinkEndpoint + "/pinglink") retVal := -1 req, err := http.NewRequest(http.MethodPost, interLinkEndpoint+"/pinglink", nil) @@ -145,7 +145,7 @@ func updateCacheRequest(ctx context.Context, config Config, pod v1.Pod, token st return err } - interLinkEndpoint := getSidecarEndpoint(ctx, config.InterlinkURL, config.Interlinkport) + interLinkEndpoint := getSidecarEndpoint(ctx, config.InterlinkURL, config.InterlinkPort) reader := bytes.NewReader(bodyBytes) req, err := http.NewRequest(http.MethodPost, interLinkEndpoint+"/updateCache", reader) if err != nil { @@ -183,7 +183,7 @@ func updateCacheRequest(ctx context.Context, config Config, pod v1.Pod, token st // Returns the call response expressed in bytes and/or the first encountered error func createRequest(ctx context.Context, config Config, pod types.PodCreateRequests, token string) ([]byte, error) { tracer := otel.Tracer("interlink-service") - interLinkEndpoint := getSidecarEndpoint(ctx, config.InterlinkURL, config.Interlinkport) + interLinkEndpoint := getSidecarEndpoint(ctx, config.InterlinkURL, config.InterlinkPort) bodyBytes, err := json.Marshal(pod) if err != nil { @@ -232,7 +232,7 @@ func createRequest(ctx context.Context, config Config, pod types.PodCreateReques // deleteRequest performs a REST call to the InterLink API when a Pod is deleted from the VK. It Marshals the standard v1.Pod struct and sends it to InterLink. // Returns the call response expressed in bytes and/or the first encountered error func deleteRequest(ctx context.Context, config Config, pod *v1.Pod, token string) ([]byte, error) { - interLinkEndpoint := getSidecarEndpoint(ctx, config.InterlinkURL, config.Interlinkport) + interLinkEndpoint := getSidecarEndpoint(ctx, config.InterlinkURL, config.InterlinkPort) var returnValue []byte bodyBytes, err := json.Marshal(pod) if err != nil { @@ -288,7 +288,7 @@ func deleteRequest(ctx context.Context, config Config, pod *v1.Pod, token string func statusRequest(ctx context.Context, config Config, podsList []*v1.Pod, token string) ([]byte, error) { tracer := otel.Tracer("interlink-service") - interLinkEndpoint := getSidecarEndpoint(ctx, config.InterlinkURL, config.Interlinkport) + interLinkEndpoint := getSidecarEndpoint(ctx, config.InterlinkURL, config.InterlinkPort) bodyBytes, err := json.Marshal(podsList) if err != nil { @@ -349,7 +349,7 @@ func LogRetrieval( sessionContext string, ) (io.ReadCloser, error) { tracer := otel.Tracer("interlink-service") - interLinkEndpoint := getSidecarEndpoint(ctx, config.InterlinkURL, config.Interlinkport) + interLinkEndpoint := getSidecarEndpoint(ctx, config.InterlinkURL, config.InterlinkPort) token := "" diff --git a/pkg/virtualkubelet/virtualkubelet.go b/pkg/virtualkubelet/virtualkubelet.go index a239a0c5..b84fa310 100644 --- a/pkg/virtualkubelet/virtualkubelet.go +++ b/pkg/virtualkubelet/virtualkubelet.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "strconv" + "strings" "time" "gopkg.in/yaml.v2" @@ -33,11 +34,17 @@ const ( DefaultMemoryCapacity = "3000G" DefaultPodCapacity = "10000" DefaultGPUCapacity = "0" + DefaultFPGACapacity = "0" DefaultListenPort = 10250 NamespaceKey = "namespace" NameKey = "name" CREATE = 0 DELETE = 1 + nvidiaGPU = "nvidia.com/gpu" + amdGPU = "amd.com/gpu" + intelGPU = "intel.com/gpu" + xilinxFPGA = "xilinx.com/fpga" + intelFPGA = "intel.com/fpga" ) func TracerUpdate(ctx *context.Context, name string, pod *v1.Pod) { @@ -170,30 +177,75 @@ func NodeCondition(ready bool) []v1.NodeCondition { } func GetResources(config Config) v1.ResourceList { + gpuCount := map[string]int{} + fpgaCount := map[string]int{} + + for _, accelerator := range config.Resources.Accelerators { + switch accelerator.ResourceType { + case nvidiaGPU, amdGPU, intelGPU: + gpuCount[accelerator.ResourceType] += accelerator.Available + case xilinxFPGA, intelFPGA: + fpgaCount[accelerator.ResourceType] += accelerator.Available + } + } - return v1.ResourceList{ - "cpu": resource.MustParse(config.CPU), - "memory": resource.MustParse(config.Memory), - "pods": resource.MustParse(config.Pods), - "nvidia.com/gpu": resource.MustParse(config.GPU), + resourceList := v1.ResourceList{ + "cpu": resource.MustParse(config.Resources.CPU), + "memory": resource.MustParse(config.Resources.Memory), + "pods": resource.MustParse(config.Resources.Pods), } + for resourceType, count := range gpuCount { + if count > 0 { + resourceList[v1.ResourceName(resourceType)] = *resource.NewQuantity(int64(count), resource.DecimalSI) + } + } + + for resourceType, count := range fpgaCount { + if count > 0 { + resourceList[v1.ResourceName(resourceType)] = *resource.NewQuantity(int64(count), resource.DecimalSI) + } + } + + // log the resource list + for key, value := range resourceList { + log.G(context.Background()).Infof("Resource %s: %s", key, value.String()) + } + + return resourceList } func SetDefaultResource(config *Config) { - if config.CPU == "" { - config.CPU = DefaultCPUCapacity + if config.Resources.CPU == "" { + config.Resources.CPU = DefaultCPUCapacity } - if config.Memory == "" { - config.Memory = DefaultMemoryCapacity + if config.Resources.Memory == "" { + config.Resources.Memory = DefaultMemoryCapacity } - if config.Pods == "" { - config.Pods = DefaultPodCapacity - } - if config.GPU == "" { - config.GPU = DefaultGPUCapacity + if config.Resources.Pods == "" { + config.Resources.Pods = DefaultPodCapacity } + for i, accelerator := range config.Resources.Accelerators { + if accelerator.Available == 0 { + switch accelerator.ResourceType { + case nvidiaGPU, amdGPU, intelGPU: + defaultGPUCapacity, err := strconv.Atoi(DefaultGPUCapacity) + if err != nil { + log.G(context.Background()).Errorf("Invalid default GPU capacity: %v", err) + defaultGPUCapacity = 0 + } + config.Resources.Accelerators[i].Available = defaultGPUCapacity + case xilinxFPGA, intelFPGA: + defaultFPGACapacity, err := strconv.Atoi(DefaultFPGACapacity) + if err != nil { + log.G(context.Background()).Errorf("Invalid default FPGA capacity: %v", err) + defaultFPGACapacity = 0 + } + config.Resources.Accelerators[i].Available = defaultFPGACapacity + } + } + } } func buildKeyFromNames(namespace string, name string) (string, error) { @@ -251,19 +303,117 @@ func NewProviderConfig( "virtual-node.interlink/type": "virtual-kubelet", } + // Add custom labels from config + for _, label := range config.NodeLabels { + + parts := strings.SplitN(label, "=", 2) + if len(parts) == 2 { + lbls[parts[0]] = parts[1] + } else { + log.G(context.Background()).Warnf("Node label %q is not in the correct format. Should be key=value", label) + } + } + + for _, accelerator := range config.Resources.Accelerators { + switch strings.ToLower(accelerator.ResourceType) { + case "nvidia.com/gpu": + lbls["nvidia-gpu-type"] = accelerator.Model + case "xilinx.com/fpga": + lbls["xilinx-fpga-type"] = accelerator.Model + case "intel.com/fpga": + lbls["intel-fpga-type"] = accelerator.Model + default: + log.G(context.Background()).Warnf("Unhandled accelerator resource type: %q", accelerator.ResourceType) + } + } + + taints := []v1.Taint{ + { + Key: "virtual-node.interlink/no-schedule", + Value: strconv.FormatBool(false), + Effect: v1.TaintEffectNoSchedule, + }} + + for _, taint := range config.NodeTaints { + log.G(context.Background()).Infof("Adding taint key=%q value=%q effect=%q", taint.Key, taint.Value, taint.Effect) + + var effect v1.TaintEffect + + switch taint.Effect { + case "NoSchedule": + effect = v1.TaintEffectNoSchedule + case "PreferNoSchedule": + effect = v1.TaintEffectPreferNoSchedule + case "NoExecute": + effect = v1.TaintEffectNoExecute + default: + effect = v1.TaintEffectNoSchedule + log.G(context.Background()).Warnf("Unknown taint effect %q, defaulting to NoSchedule", taint.Effect) + } + + taints = append(taints, v1.Taint{ + Key: taint.Key, + Value: taint.Value, + Effect: effect, + }) + } + + // Add custom labels from config + for _, label := range config.NodeLabels { + + parts := strings.SplitN(label, "=", 2) + if len(parts) == 2 { + lbls[parts[0]] = parts[1] + } else { + log.G(context.Background()).Warnf("Node label %q is not in the correct format. Should be key=value", label) + } + } + + for _, accelerator := range config.Resources.Accelerators { + switch strings.ToLower(accelerator.ResourceType) { + case "nvidia.com/gpu": + lbls["nvidia-gpu-type"] = accelerator.Model + case "xilinx.com/fpga": + lbls["xilinx-fpga-type"] = accelerator.Model + case "intel.com/fpga": + lbls["intel-fpga-type"] = accelerator.Model + default: + log.G(context.Background()).Warnf("Unhandled accelerator resource type: %q", accelerator.ResourceType) + } + } + + for _, taint := range config.NodeTaints { + log.G(context.Background()).Infof("Adding taint key=%q value=%q effect=%q", taint.Key, taint.Value, taint.Effect) + + var effect v1.TaintEffect + + switch taint.Effect { + case "NoSchedule": + effect = v1.TaintEffectNoSchedule + case "PreferNoSchedule": + effect = v1.TaintEffectPreferNoSchedule + case "NoExecute": + effect = v1.TaintEffectNoExecute + default: + effect = v1.TaintEffectNoSchedule + log.G(context.Background()).Warnf("Unknown taint effect %q, defaulting to NoSchedule", taint.Effect) + } + + taints = append(taints, v1.Taint{ + Key: taint.Key, + Value: taint.Value, + Effect: effect, + }) + } + node := v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: nodeName, Labels: lbls, - //Annotations: cfg.ExtraAnnotations, }, Spec: v1.NodeSpec{ ProviderID: "external:///" + nodeName, - Taints: []v1.Taint{{ - Key: "virtual-node.interlink/no-schedule", - Value: strconv.FormatBool(true), - Effect: v1.TaintEffectNoSchedule, - }}, + Taints: taints, }, Status: v1.NodeStatus{ NodeInfo: v1.NodeSystemInfo{ @@ -341,18 +491,31 @@ func LoadConfig(ctx context.Context, providerConfig string) (config Config, err // config = configMap SetDefaultResource(&config) - if _, err = resource.ParseQuantity(config.CPU); err != nil { - return config, fmt.Errorf("invalid CPU value %v", config.CPU) + if _, err = resource.ParseQuantity(config.Resources.CPU); err != nil { + return config, fmt.Errorf("invalid CPU value %v", config.Resources.CPU) + } + if _, err = resource.ParseQuantity(config.Resources.Memory); err != nil { + return config, fmt.Errorf("invalid memory value %v", config.Resources.Memory) + } + if _, err = resource.ParseQuantity(config.Resources.Pods); err != nil { + return config, fmt.Errorf("invalid pods value %v", config.Resources.Pods) } - if _, err = resource.ParseQuantity(config.Memory); err != nil { - return config, fmt.Errorf("invalid memory value %v", config.Memory) + if _, err = resource.ParseQuantity(config.Resources.CPU); err != nil { + return config, fmt.Errorf("invalid CPU value %v", config.Resources.CPU) } - if _, err = resource.ParseQuantity(config.Pods); err != nil { - return config, fmt.Errorf("invalid pods value %v", config.Pods) + if _, err = resource.ParseQuantity(config.Resources.Memory); err != nil { + return config, fmt.Errorf("invalid memory value %v", config.Resources.Memory) } - if _, err = resource.ParseQuantity(config.GPU); err != nil { - return config, fmt.Errorf("invalid GPU value %v", config.GPU) + if _, err = resource.ParseQuantity(config.Resources.Pods); err != nil { + return config, fmt.Errorf("invalid pods value %v", config.Resources.Pods) } + for _, accelerator := range config.Resources.Accelerators { + quantity := resource.NewQuantity(int64(accelerator.Available), resource.DecimalSI) + if _, err = resource.ParseQuantity(quantity.String()); err != nil { + return config, fmt.Errorf("invalid value for accelerator %v (model: %v): %v", accelerator.ResourceType, accelerator.Model, err) + } + } + return config, nil }