From 29d9f255e062499be8a82a7cf49c5e5c6a979d55 Mon Sep 17 00:00:00 2001 From: Surax98 Date: Wed, 20 Dec 2023 10:07:44 +0000 Subject: [PATCH 1/9] updated eamples --- examples/interlink-htcondor/vk/deployment.yaml | 11 ++++++++++- examples/interlink-htcondor/vk/kustomization.yaml | 5 ++++- examples/interlink-slurm/README.md | 2 ++ examples/interlink-slurm/vk/deployment.yaml | 13 +++++++++++-- examples/interlink-slurm/vk/kustomization.yaml | 5 ++++- main.go | 1 + 6 files changed, 32 insertions(+), 5 deletions(-) diff --git a/examples/interlink-htcondor/vk/deployment.yaml b/examples/interlink-htcondor/vk/deployment.yaml index 41df7e35..648febd2 100644 --- a/examples/interlink-htcondor/vk/deployment.yaml +++ b/examples/interlink-htcondor/vk/deployment.yaml @@ -48,6 +48,8 @@ spec: value: "/opt/interlink/token" - name: CONFIGPATH value: "/etc/interlink/vk-cfg.json" + - name: KUBECONFIG + value: /etc/interlink/kubeconfig.yaml volumeMounts: - name: config mountPath: /etc/interlink/InterLinkConfig.yaml @@ -55,6 +57,9 @@ spec: - name: config-json mountPath: /etc/interlink/vk-cfg.json subPath: vk-cfg.json + - name: kubeconfig + mountPath: "/etc/interlink/kubeconfig.yaml" + subPath: kubeconfig.yaml - name: token mountPath: /opt/interlink resources: @@ -100,6 +105,10 @@ spec: - name: config-json configMap: # Provide the name of the ConfigMap you want to mount. - name: test-vk-config + name: vk-config-json + - name: kubeconfig + configMap: + # Provide the name of the ConfigMap you want to mount. + name: vk-kubeconfig - name: token emptyDir: {} diff --git a/examples/interlink-htcondor/vk/kustomization.yaml b/examples/interlink-htcondor/vk/kustomization.yaml index a30024c2..c7cb9ebb 100644 --- a/examples/interlink-htcondor/vk/kustomization.yaml +++ b/examples/interlink-htcondor/vk/kustomization.yaml @@ -5,9 +5,12 @@ resources: - ./service-account.yaml configMapGenerator: - - name: test-vk-config + - name: vk-config-json files: - vk-cfg.json=vk-cfg.json - name: vk-config files: - InterLinkConfig.yaml=InterLinkConfig.yaml + - name: vk-kubeconfig + files: + - kubeconfig.yaml=kubeconfig.yaml diff --git a/examples/interlink-slurm/README.md b/examples/interlink-slurm/README.md index eeae9c65..51efc34a 100644 --- a/examples/interlink-slurm/README.md +++ b/examples/interlink-slurm/README.md @@ -42,8 +42,10 @@ mkdir -p interlink/config cp $PATH_TO_KUBECONFIG interlink/config/kubeconfig.yaml sed 's/certificate-authority:.*/certificate-authority-data: '$CA_DATA'/g' $PATH_TO_KUBECONFIG | sed 's/client-certificate:.*/client-certificate-data: '$CERT_DATA'/g' - | sed 's/client-key:.*/client-key-data: '$KEY_DATA'/g' - > interlink/config/kubeconfig.yaml +sed 's/certificate-authority:.*/certificate-authority-data: '$CA_DATA'/g' $PATH_TO_KUBECONFIG | sed 's/client-certificate:.*/client-certificate-data: '$CERT_DATA'/g' - | sed 's/client-key:.*/client-key-data: '$KEY_DATA'/g' - > vk/kubeconfig.yaml chmod 777 interlink/config/kubeconfig.yaml +chmod 777 vk/kubeconfig.yaml ``` Then you need to provide the interLink IP address that should be reachable from the kubernetes pods. In case of this demo setup, that address __is the address of your machine__ diff --git a/examples/interlink-slurm/vk/deployment.yaml b/examples/interlink-slurm/vk/deployment.yaml index c16e19e8..f6f6e4a2 100644 --- a/examples/interlink-slurm/vk/deployment.yaml +++ b/examples/interlink-slurm/vk/deployment.yaml @@ -49,6 +49,8 @@ spec: value: "/opt/interlink/token" - name: CONFIGPATH value: "/etc/interlink/vk-cfg.json" + - name: KUBECONFIG + value: "/etc/interlink/kubeconfig.yaml" volumeMounts: - name: config mountPath: /etc/interlink/InterLinkConfig.yaml @@ -58,6 +60,9 @@ spec: subPath: vk-cfg.json - name: token mountPath: /opt/interlink + - name: kubeconfig + mountPath: "/etc/interlink/kubeconfig.yaml" + subPath: kubeconfig.yaml resources: limits: cpu: 500m @@ -101,6 +106,10 @@ spec: - name: config-json configMap: # Provide the name of the ConfigMap you want to mount. - name: test-vk-config + name: vk-config-json + - name: kubeconfig + configMap: + # Provide the name of the ConfigMap you want to mount. + name: vk-kubeconfig - name: token - emptyDir: {} + emptyDir: {} \ No newline at end of file diff --git a/examples/interlink-slurm/vk/kustomization.yaml b/examples/interlink-slurm/vk/kustomization.yaml index a30024c2..d06fd1e7 100644 --- a/examples/interlink-slurm/vk/kustomization.yaml +++ b/examples/interlink-slurm/vk/kustomization.yaml @@ -5,9 +5,12 @@ resources: - ./service-account.yaml configMapGenerator: - - name: test-vk-config + - name: vk-config-json files: - vk-cfg.json=vk-cfg.json - name: vk-config files: - InterLinkConfig.yaml=InterLinkConfig.yaml + - name: vk-kubeconfig + files: + - kubeconfig.yaml=kubeconfig.yaml \ No newline at end of file diff --git a/main.go b/main.go index e8a2f681..e4132305 100644 --- a/main.go +++ b/main.go @@ -223,6 +223,7 @@ func main() { log.G(ctx).Fatal(err) } } else { + log.G(ctx).Debug("Loading Kubeconfig from " + os.Getenv("KUBECONFIG")) clientCfg, err := clientcmd.NewClientConfigFromBytes(kubecfgFile) if err != nil { log.G(ctx).Fatal(err) From 40af4b03d5368a5194296994d4266f6d9d591329 Mon Sep 17 00:00:00 2001 From: Surax98 Date: Wed, 20 Dec 2023 10:19:26 +0000 Subject: [PATCH 2/9] fixed interlink Ping --- examples/interlink-slurm/README.md | 1 + pkg/interlink/ping.go | 11 ----------- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/examples/interlink-slurm/README.md b/examples/interlink-slurm/README.md index 51efc34a..85fb0626 100644 --- a/examples/interlink-slurm/README.md +++ b/examples/interlink-slurm/README.md @@ -41,6 +41,7 @@ export KEY_DATA=$(cat $HOME/.minikube/profiles/minikube/client.key | base64 -w0) mkdir -p interlink/config cp $PATH_TO_KUBECONFIG interlink/config/kubeconfig.yaml +cp $PATH_TO_KUBECONFIG vk/kubeconfig.yaml sed 's/certificate-authority:.*/certificate-authority-data: '$CA_DATA'/g' $PATH_TO_KUBECONFIG | sed 's/client-certificate:.*/client-certificate-data: '$CERT_DATA'/g' - | sed 's/client-key:.*/client-key-data: '$KEY_DATA'/g' - > interlink/config/kubeconfig.yaml sed 's/certificate-authority:.*/certificate-authority-data: '$CA_DATA'/g' $PATH_TO_KUBECONFIG | sed 's/client-certificate:.*/client-certificate-data: '$CERT_DATA'/g' - | sed 's/client-key:.*/client-key-data: '$KEY_DATA'/g' - > vk/kubeconfig.yaml diff --git a/pkg/interlink/ping.go b/pkg/interlink/ping.go index 6656b27d..54ac3e08 100644 --- a/pkg/interlink/ping.go +++ b/pkg/interlink/ping.go @@ -2,22 +2,11 @@ package interlink import ( "net/http" - "os" "github.com/containerd/containerd/log" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/clientcmd" ) func Ping(w http.ResponseWriter, r *http.Request) { log.G(Ctx).Info("InterLink: received Ping call") - kubeconfig, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG")) - if err != nil { - log.G(Ctx).Error("Unable to create a valid clientset config") - } - Clientset, err = kubernetes.NewForConfig(kubeconfig) - if err != nil { - log.G(Ctx).Error("Unable to set up a clientset") - } w.WriteHeader(http.StatusOK) } From fecec9c6b3607695ffd4da439dc5262ff7ee944b Mon Sep 17 00:00:00 2001 From: Surax98 Date: Wed, 20 Dec 2023 11:50:04 +0000 Subject: [PATCH 3/9] kubeconfig is not sent if KUBECONFIG env is set on the interlink host --- main.go | 7 +++- pkg/common/func.go | 89 +++++++++++++++++++++++++--------------- pkg/interlink/kubeCFG.go | 84 +++++++++++++++++++------------------ pkg/interlink/ping.go | 9 ++++ 4 files changed, 113 insertions(+), 76 deletions(-) diff --git a/main.go b/main.go index e4132305..100d964b 100644 --- a/main.go +++ b/main.go @@ -241,19 +241,22 @@ func main() { ILbindNow := false ILbindOld := false + retValue := -1 for { - err, ILbindNow = commonIL.PingInterLink(ctx) + err, ILbindNow, retValue = commonIL.PingInterLink(ctx) if err != nil { log.G(ctx).Error(err) } - if ILbindNow == true && ILbindOld == false { + if ILbindNow == true && ILbindOld == false && retValue == 1 { err = commonIL.NewServiceAccount() if err != nil { log.G(ctx).Fatal(err) } + } else if ILbindNow == true && ILbindOld == false && retValue == 0 { + commonIL.CreateClientsetFrom(ctx, "") } ILbindOld = ILbindNow diff --git a/pkg/common/func.go b/pkg/common/func.go index 7cb35c8e..13202163 100644 --- a/pkg/common/func.go +++ b/pkg/common/func.go @@ -10,6 +10,7 @@ import ( "io" "net/http" "os" + "strconv" "time" "k8s.io/client-go/kubernetes" @@ -205,64 +206,84 @@ func NewServiceAccount() error { os.Remove(path + "getSAConfig.sh") os.Remove(path + "kubeconfig-sa") + err = CreateClientsetFrom(context.Background(), sa) + + return nil +} + +func PingInterLink(ctx context.Context) (error, bool, int) { + log.G(ctx).Info("Pinging: " + InterLinkConfigInst.Interlinkurl + ":" + InterLinkConfigInst.Interlinkport + "/ping") + retVal := -1 + req, err := http.NewRequest(http.MethodPost, InterLinkConfigInst.Interlinkurl+":"+InterLinkConfigInst.Interlinkport+"/ping", nil) + + if err != nil { + log.G(ctx).Error(err) + } + + token, err := os.ReadFile(InterLinkConfigInst.VKTokenFile) // just pass the file name + if err != nil { + log.G(ctx).Error(err) + return err, false, retVal + } + req.Header.Add("Authorization", "Bearer "+string(token)) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err, false, retVal + } + + if resp.StatusCode == http.StatusOK { + retBytes, err := io.ReadAll(resp.Body) + if err != nil { + log.G(ctx).Error(err) + return err, false, retVal + } + retVal, err = strconv.Atoi(string(retBytes)) + if err != nil { + log.G(ctx).Error(err) + return err, false, retVal + } + return nil, true, retVal + } else { + log.G(ctx).Error("Error " + err.Error() + " " + fmt.Sprint(resp.StatusCode)) + return nil, false, retVal + } +} + +func CreateClientsetFrom(ctx context.Context, body string) error { + counter := 0 for { var returnValue, _ = json.Marshal("Error") - reader := bytes.NewReader([]byte(sa)) + reader := bytes.NewReader([]byte(body)) req, err := http.NewRequest(http.MethodPost, InterLinkConfigInst.Interlinkurl+":"+InterLinkConfigInst.Interlinkport+"/setKubeCFG", reader) if err != nil { - log.G(context.Background()).Error(err) + log.G(ctx).Error(err) } token, err := os.ReadFile(InterLinkConfigInst.VKTokenFile) // just pass the file name if err != nil { - log.G(context.Background()).Error(err) + log.G(ctx).Error(err) return err } req.Header.Add("Authorization", "Bearer "+string(token)) resp, err := http.DefaultClient.Do(req) if err != nil { - log.G(context.Background()).Error(err) + log.G(ctx).Error(err) + counter++ + if counter > 5 { + return errors.New("Timeout occured trying to set a kubeconfig") + } time.Sleep(5 * time.Second) continue } else { - returnValue, _ = io.ReadAll(resp.Body) } if resp.StatusCode == http.StatusOK { break } else { - log.G(context.Background()).Error("Error " + err.Error() + " " + string(returnValue)) + log.G(ctx).Error("Error " + err.Error() + " " + string(returnValue)) } } - return nil } - -func PingInterLink(ctx context.Context) (error, bool) { - log.G(ctx).Info("Pinging: " + InterLinkConfigInst.Interlinkurl + ":" + InterLinkConfigInst.Interlinkport + "/ping") - req, err := http.NewRequest(http.MethodPost, InterLinkConfigInst.Interlinkurl+":"+InterLinkConfigInst.Interlinkport+"/ping", nil) - - if err != nil { - log.G(ctx).Error(err) - } - - token, err := os.ReadFile(InterLinkConfigInst.VKTokenFile) // just pass the file name - if err != nil { - log.G(ctx).Error(err) - return err, false - } - req.Header.Add("Authorization", "Bearer "+string(token)) - resp, err := http.DefaultClient.Do(req) - if err != nil { - return err, false - } - - if resp.StatusCode == http.StatusOK { - return nil, true - } else { - log.G(ctx).Error("Error " + err.Error() + " " + fmt.Sprint(resp.StatusCode)) - return nil, false - } -} diff --git a/pkg/interlink/kubeCFG.go b/pkg/interlink/kubeCFG.go index 6a2ec048..44d4941e 100644 --- a/pkg/interlink/kubeCFG.go +++ b/pkg/interlink/kubeCFG.go @@ -23,46 +23,48 @@ func SetKubeCFGHandler(w http.ResponseWriter, r *http.Request) { log.G(Ctx).Fatal(err) } - log.G(Ctx).Debug("- Creating folder to save KubeConfig") - err = os.MkdirAll(path, os.ModePerm) - if err != nil { - statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte(err.Error())) - log.G(Ctx).Fatal(err) - } else { - log.G(Ctx).Debug("-- Created folder") - } - log.G(Ctx).Debug("- Creating the actual KubeConfig file") - config, err := os.Create(path + "config") - if err != nil { - statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte(err.Error())) - log.G(Ctx).Fatal(err) - } else { - log.G(Ctx).Debug("-- Created file") - } - log.G(Ctx).Debug("- Writing configuration to file") - _, err = config.Write([]byte(bodyBytes)) - if err != nil { - statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte(err.Error())) - log.G(Ctx).Fatal(err) - } else { - log.G(Ctx).Info("-- Written configuration") - } - defer config.Close() - log.G(Ctx).Debug("- Setting KUBECONFIG env") - err = os.Setenv("KUBECONFIG", path+"config") - if err != nil { - statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte(err.Error())) - log.G(Ctx).Fatal(err) - } else { - log.G(Ctx).Info("-- Set KUBECONFIG to " + path + "config") + if string(bodyBytes) != "" { + log.G(Ctx).Debug("- Creating folder to save KubeConfig") + err = os.MkdirAll(path, os.ModePerm) + if err != nil { + statusCode = http.StatusInternalServerError + w.WriteHeader(statusCode) + w.Write([]byte(err.Error())) + log.G(Ctx).Fatal(err) + } else { + log.G(Ctx).Debug("-- Created folder") + } + log.G(Ctx).Debug("- Creating the actual KubeConfig file") + config, err := os.Create(path + "config") + if err != nil { + statusCode = http.StatusInternalServerError + w.WriteHeader(statusCode) + w.Write([]byte(err.Error())) + log.G(Ctx).Fatal(err) + } else { + log.G(Ctx).Debug("-- Created file") + } + log.G(Ctx).Debug("- Writing configuration to file") + _, err = config.Write([]byte(bodyBytes)) + if err != nil { + statusCode = http.StatusInternalServerError + w.WriteHeader(statusCode) + w.Write([]byte(err.Error())) + log.G(Ctx).Fatal(err) + } else { + log.G(Ctx).Info("-- Written configuration") + } + defer config.Close() + log.G(Ctx).Debug("- Setting KUBECONFIG env") + err = os.Setenv("KUBECONFIG", path+"config") + if err != nil { + statusCode = http.StatusInternalServerError + w.WriteHeader(statusCode) + w.Write([]byte(err.Error())) + log.G(Ctx).Fatal(err) + } else { + log.G(Ctx).Info("-- Set KUBECONFIG to " + path + "config") + } } kubeconfig, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG")) @@ -78,6 +80,8 @@ func SetKubeCFGHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(statusCode) w.Write([]byte(err.Error())) log.G(Ctx).Fatal("Unable to set up a clientset") + } else { + log.G(Ctx).Debug("Clientset properly instantiated") } w.WriteHeader(statusCode) diff --git a/pkg/interlink/ping.go b/pkg/interlink/ping.go index 54ac3e08..2e5255b4 100644 --- a/pkg/interlink/ping.go +++ b/pkg/interlink/ping.go @@ -2,6 +2,7 @@ package interlink import ( "net/http" + "os" "github.com/containerd/containerd/log" ) @@ -9,4 +10,12 @@ import ( func Ping(w http.ResponseWriter, r *http.Request) { log.G(Ctx).Info("InterLink: received Ping call") w.WriteHeader(http.StatusOK) + + // 0 = KUBECONFIG already set + // 1 = KUBECONFIG not set + if os.Getenv("KUBECONFIG") != "" { + w.Write([]byte("0")) + } else { + w.Write([]byte("1")) + } } From 0aced7578c31c320bcacb2f1bd6b3bf87f79d941 Mon Sep 17 00:00:00 2001 From: Surax98 Date: Thu, 21 Dec 2023 08:34:19 +0000 Subject: [PATCH 4/9] moved function from common to main, since it's specifically related to the generation of a kubeconfig --- main.go | 2 +- pkg/common/func.go | 84 ----------------------------------- pkg/virtualkubelet/execute.go | 82 ++++++++++++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 85 deletions(-) diff --git a/main.go b/main.go index 100d964b..9ddcbf05 100644 --- a/main.go +++ b/main.go @@ -251,7 +251,7 @@ func main() { } if ILbindNow == true && ILbindOld == false && retValue == 1 { - err = commonIL.NewServiceAccount() + err = virtualkubelet.NewServiceAccount() if err != nil { log.G(ctx).Fatal(err) } diff --git a/pkg/common/func.go b/pkg/common/func.go index 13202163..3f32ae65 100644 --- a/pkg/common/func.go +++ b/pkg/common/func.go @@ -14,11 +14,8 @@ import ( "time" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/clientcmd" "github.com/containerd/containerd/log" - - exec "github.com/alexellis/go-execute/pkg/v1" "gopkg.in/yaml.v2" ) @@ -130,87 +127,6 @@ func NewInterLinkConfig() { } } -func NewServiceAccount() error { - - var sa string - var script string - path := InterLinkConfigInst.DataRootFolder + ".kube/" - - err := os.MkdirAll(path, os.ModePerm) - if err != nil { - log.G(context.Background()).Error(err) - return err - } - f, err := os.Create(path + "getSAConfig.sh") - if err != nil { - log.G(context.Background()).Error(err) - return err - } - - defer f.Close() - - script = "#!" + InterLinkConfigInst.BashPath + "\n" + - "SERVICE_ACCOUNT_NAME=" + InterLinkConfigInst.ServiceAccount + "\n" + - "CONTEXT=$(kubectl config current-context)\n" + - "NAMESPACE=" + InterLinkConfigInst.Namespace + "\n" + - "NEW_CONTEXT=" + InterLinkConfigInst.Namespace + "\n" + - "KUBECONFIG_FILE=\"" + path + "kubeconfig-sa\"\n" + - "SECRET_NAME=$(kubectl get secret -l kubernetes.io/service-account.name=${SERVICE_ACCOUNT_NAME} --namespace ${NAMESPACE} --context ${CONTEXT} -o jsonpath='{.items[0].metadata.name}')\n" + - "TOKEN_DATA=$(kubectl get secret ${SECRET_NAME} --context ${CONTEXT} --namespace ${NAMESPACE} -o jsonpath='{.data.token}')\n" + - "TOKEN=$(echo ${TOKEN_DATA} | base64 -d)\n" + - "kubectl config view --raw > ${KUBECONFIG_FILE}.full.tmp\n" + - "kubectl --kubeconfig ${KUBECONFIG_FILE}.full.tmp config use-context ${CONTEXT}\n" + - "kubectl --kubeconfig ${KUBECONFIG_FILE}.full.tmp config view --flatten --minify > ${KUBECONFIG_FILE}.tmp\n" + - "kubectl config --kubeconfig ${KUBECONFIG_FILE}.tmp rename-context ${CONTEXT} ${NEW_CONTEXT}\n" + - "kubectl config --kubeconfig ${KUBECONFIG_FILE}.tmp set-credentials ${CONTEXT}-${NAMESPACE}-token-user --token ${TOKEN}\n" + - "kubectl config --kubeconfig ${KUBECONFIG_FILE}.tmp set-context ${NEW_CONTEXT} --user ${CONTEXT}-${NAMESPACE}-token-user\n" + - "kubectl config --kubeconfig ${KUBECONFIG_FILE}.tmp set-context ${NEW_CONTEXT} --namespace ${NAMESPACE}\n" + - "kubectl config --kubeconfig ${KUBECONFIG_FILE}.tmp view --flatten --minify > ${KUBECONFIG_FILE}\n" + - "rm ${KUBECONFIG_FILE}.full.tmp\n" + - "rm ${KUBECONFIG_FILE}.tmp" - - _, err = f.Write([]byte(script)) - - if err != nil { - log.G(context.Background()).Error(err) - return err - } - - //executing the script to actually retrieve a valid service account - cmd := []string{path + "getSAConfig.sh"} - shell := exec.ExecTask{ - Command: "sh", - Args: cmd, - Shell: true, - } - execResult, _ := shell.Execute() - if execResult.Stderr != "" { - log.G(context.Background()).Error("Stderr: " + execResult.Stderr + "\nStdout: " + execResult.Stdout) - return errors.New(execResult.Stderr) - } - - //checking if the config is valid - _, err = clientcmd.LoadFromFile(path + "kubeconfig-sa") - if err != nil { - log.G(context.Background()).Error(err) - return err - } - - config, err := os.ReadFile(path + "kubeconfig-sa") - if err != nil { - log.G(context.Background()).Error(err) - return err - } - - sa = string(config) - os.Remove(path + "getSAConfig.sh") - os.Remove(path + "kubeconfig-sa") - - err = CreateClientsetFrom(context.Background(), sa) - - return nil -} - func PingInterLink(ctx context.Context) (error, bool, int) { log.G(ctx).Info("Pinging: " + InterLinkConfigInst.Interlinkurl + ":" + InterLinkConfigInst.Interlinkport + "/ping") retVal := -1 diff --git a/pkg/virtualkubelet/execute.go b/pkg/virtualkubelet/execute.go index e7cdb313..d3d129e6 100644 --- a/pkg/virtualkubelet/execute.go +++ b/pkg/virtualkubelet/execute.go @@ -11,6 +11,7 @@ import ( "strconv" "time" + exec "github.com/alexellis/go-execute/pkg/v1" commonIL "github.com/intertwin-eu/interlink/pkg/common" "github.com/containerd/containerd/log" @@ -22,6 +23,87 @@ import ( var ClientSet *kubernetes.Clientset +func NewServiceAccount() error { + + var sa string + var script string + path := commonIL.InterLinkConfigInst.DataRootFolder + ".kube/" + + err := os.MkdirAll(path, os.ModePerm) + if err != nil { + log.G(context.Background()).Error(err) + return err + } + f, err := os.Create(path + "getSAConfig.sh") + if err != nil { + log.G(context.Background()).Error(err) + return err + } + + defer f.Close() + + script = "#!" + commonIL.InterLinkConfigInst.BashPath + "\n" + + "SERVICE_ACCOUNT_NAME=" + commonIL.InterLinkConfigInst.ServiceAccount + "\n" + + "CONTEXT=$(kubectl config current-context)\n" + + "NAMESPACE=" + commonIL.InterLinkConfigInst.Namespace + "\n" + + "NEW_CONTEXT=" + commonIL.InterLinkConfigInst.Namespace + "\n" + + "KUBECONFIG_FILE=\"" + path + "kubeconfig-sa\"\n" + + "SECRET_NAME=$(kubectl get secret -l kubernetes.io/service-account.name=${SERVICE_ACCOUNT_NAME} --namespace ${NAMESPACE} --context ${CONTEXT} -o jsonpath='{.items[0].metadata.name}')\n" + + "TOKEN_DATA=$(kubectl get secret ${SECRET_NAME} --context ${CONTEXT} --namespace ${NAMESPACE} -o jsonpath='{.data.token}')\n" + + "TOKEN=$(echo ${TOKEN_DATA} | base64 -d)\n" + + "kubectl config view --raw > ${KUBECONFIG_FILE}.full.tmp\n" + + "kubectl --kubeconfig ${KUBECONFIG_FILE}.full.tmp config use-context ${CONTEXT}\n" + + "kubectl --kubeconfig ${KUBECONFIG_FILE}.full.tmp config view --flatten --minify > ${KUBECONFIG_FILE}.tmp\n" + + "kubectl config --kubeconfig ${KUBECONFIG_FILE}.tmp rename-context ${CONTEXT} ${NEW_CONTEXT}\n" + + "kubectl config --kubeconfig ${KUBECONFIG_FILE}.tmp set-credentials ${CONTEXT}-${NAMESPACE}-token-user --token ${TOKEN}\n" + + "kubectl config --kubeconfig ${KUBECONFIG_FILE}.tmp set-context ${NEW_CONTEXT} --user ${CONTEXT}-${NAMESPACE}-token-user\n" + + "kubectl config --kubeconfig ${KUBECONFIG_FILE}.tmp set-context ${NEW_CONTEXT} --namespace ${NAMESPACE}\n" + + "kubectl config --kubeconfig ${KUBECONFIG_FILE}.tmp view --flatten --minify > ${KUBECONFIG_FILE}\n" + + "rm ${KUBECONFIG_FILE}.full.tmp\n" + + "rm ${KUBECONFIG_FILE}.tmp" + + _, err = f.Write([]byte(script)) + + if err != nil { + log.G(context.Background()).Error(err) + return err + } + + //executing the script to actually retrieve a valid service account + cmd := []string{path + "getSAConfig.sh"} + shell := exec.ExecTask{ + Command: "sh", + Args: cmd, + Shell: true, + } + execResult, _ := shell.Execute() + if execResult.Stderr != "" { + log.G(context.Background()).Error("Stderr: " + execResult.Stderr + "\nStdout: " + execResult.Stdout) + return errors.New(execResult.Stderr) + } + + //checking if the config is valid + _, err = clientcmd.LoadFromFile(path + "kubeconfig-sa") + if err != nil { + log.G(context.Background()).Error(err) + return err + } + + config, err := os.ReadFile(path + "kubeconfig-sa") + if err != nil { + log.G(context.Background()).Error(err) + return err + } + + sa = string(config) + os.Remove(path + "getSAConfig.sh") + os.Remove(path + "kubeconfig-sa") + + err = commonIL.CreateClientsetFrom(context.Background(), sa) + + return nil +} + func createRequest(pods []*v1.Pod, token string) ([]byte, error) { var returnValue, _ = json.Marshal(commonIL.PodStatus{}) From 466253fdf8cad32dd38c18f80857efb6b29f9e2e Mon Sep 17 00:00:00 2001 From: Surax98 Date: Thu, 21 Dec 2023 10:22:58 +0000 Subject: [PATCH 5/9] improved itwinctl script --- docs/itwinctl.sh | 99 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 81 insertions(+), 18 deletions(-) diff --git a/docs/itwinctl.sh b/docs/itwinctl.sh index a19a1487..23d338b8 100755 --- a/docs/itwinctl.sh +++ b/docs/itwinctl.sh @@ -2,7 +2,7 @@ #export INTERLINKCONFIGPATH="$PWD/kustomizations/InterLinkConfig.yaml" -VERSION="${VERSION:-0.0.1-pre6}" +VERSION="${VERSION:-0.0.4-pre5}" SIDECAR="${SIDECAR:-slurm}" @@ -31,14 +31,12 @@ OIDC_ISSUER="${OIDC_ISSUER:-https://dodas-iam.cloud.cnaf.infn.it/}" AUTHORIZED_GROUPS="${AUTHORIZED_GROUPS:-intw}" AUTHORIZED_AUD="${AUTHORIZED_AUD:-intertw-vk}" API_HTTP_PORT="${API_HTTP_PORT:-8080}" -API_HTTPS_PORT="${API_HTTPS_PORT:-443}" -export HOSTCERT="${HOSTCERT:-/etc/hostcert.pem}" -export HOSTKEY="${HOSTKEY:-/etc/hostkey.pem}" -export INTERLINKPORT="${INTERLINKPORT:-3000}" +API_HTTPS_PORT="${API_HTTPS_PORT:-30443}" +export HOSTCERT="${HOSTCERT:-/home/ciangottinid/EasyRSA-3.1.5/pki/issued/intertwin.crt}" +export HOSTKEY="${HOSTKEY:-/home/ciangottinid/EasyRSA-3.1.5/pki/private/InterTwin.key}" +export INTERLINKPORT="${INTERLINKPORT:-30444}" export INTERLINKURL="${INTERLINKURL:-http://0.0.0.0}" -export INTERLINKPORT="${INTERLINKPORT:-3000}" -export INTERLINKURL="${INTERLINKURL:-http://0.0.0.0}" -export INTERLINKCONFIGPATH="${INTERLINKCONFIGPATH:-$HOME/.config/interlink/InterLinkConfig.yaml}" +export INTERLINKCONFIGPATH="${INTERLINKCONFIGPATH:-$HOME/InterLinkConfig.yaml}" export SBATCHPATH="${SBATCHPATH:-/usr/bin/sbatch}" export SCANCELPATH="${SCANCELPATH:-/usr/bin/scancel}" @@ -46,13 +44,42 @@ export SCANCELPATH="${SCANCELPATH:-/usr/bin/scancel}" install () { mkdir -p $HOME/.local/interlink/logs || exit 1 mkdir -p $HOME/.local/interlink/bin || exit 1 - mkdir -p $HOME/.config/interlink/ || exit 1 + mkdir -p $HOME/.local/interlink/config || exit 1 # download interlinkpath in $HOME/.config/interlink/InterLinkConfig.yaml - curl -o $HOME/.config/interlink/InterLinkConfig.yaml https://raw.githubusercontent.com/intertwin-eu/interLink/main/kustomizations/InterLinkConfig.yaml - - ## Download binaries to $HOME/.local/interlink/bin - curl -L -o interlink.tar.gz https://github.com/intertwin-eu/interLink/releases/download/${VERSION}/interLink_$(uname -s)_$(uname -m).tar.gz \ - && tar -xzvf interlink.tar.gz -C $HOME/.local/interlink/bin/ + if test -f $HOME/.local/interlink/config/InterLinkConfig.yaml; then + echo -e "The InterLink config already exists. Skipping its downloading\n" + else + { + { + curl --fail -o $HOME/.local/interlink/config/InterLinkConfig.yaml https://raw.githubusercontent.com/interTwin-eu/interLink/main/examples/interlink-slurm/vk/InterLinkConfig.yaml + } || { + echo "Error downloading InterLink config, exiting..." + exit 1 + } + } + fi + + ## Download binaries to $HOME/.local/interlink/ + echo "curl --fail -L -o interlink.tar.gz https://github.com/intertwin-eu/interLink/releases/download/${VERSION}/interLink_$(uname -s)_$(uname -m).tar.gz \ + && tar -xzvf interlink.tar.gz -C $HOME/.local/interlink/bin/" + + { + { + export INTERLINKCONFIGPATH=$HOME/interlink/config/InterLinkConfig.yaml + curl --fail -L -o interlink.tar.gz https://github.com/intertwin-eu/interLink/releases/download/${VERSION}/interLink_$(uname -s)_$(uname -m).tar.gz + } || { + echo "Error downloading InterLink binaries, exiting..." + exit 1 + } + } && { + { + tar -xzvf interlink.tar.gz -C $HOME/.local/interlink/bin/ + } || { + echo "Error extracting InterLink binaries, exiting..." + rm interlink.tar.gz + exit 1 + } + } rm interlink.tar.gz ## Download oauth2 proxy @@ -62,8 +89,23 @@ install () { ;; Linux) echo "https://github.com/oauth2-proxy/oauth2-proxy/releases/download/v7.4.0/oauth2-proxy-v7.4.0.${OS_LOWER}-$OSARCH.tar.gz" - curl -L -o oauth2-proxy-v7.4.0.$OS_LOWER-$OSARCH.tar.gz https://github.com/oauth2-proxy/oauth2-proxy/releases/download/v7.4.0/oauth2-proxy-v7.4.0.${OS_LOWER}-$OSARCH.tar.gz - tar -xzvf oauth2-proxy-v7.4.0.$OS_LOWER-$OSARCH.tar.gz -C $HOME/.local/interlink/bin/ + { + { + curl --fail -L -o oauth2-proxy-v7.4.0.$OS_LOWER-$OSARCH.tar.gz https://github.com/oauth2-proxy/oauth2-proxy/releases/download/v7.4.0/oauth2-proxy-v7.4.0.${OS_LOWER}-$OSARCH.tar.gz + } || { + echo "Error downloading OAuth binaries, exiting..." + exit 1 + } + } && { + { + tar -xzvf oauth2-proxy-v7.4.0.$OS_LOWER-$OSARCH.tar.gz -C $HOME/.local/interlink/bin/ + } || { + echo "Error extracting OAuth binaries, exiting..." + rm oauth2-proxy-v7.4.0.$OS_LOWER-$OSARCH.tar.gz + exit 1 + } + } + rm oauth2-proxy-v7.4.0.$OS_LOWER-$OSARCH.tar.gz ;; esac @@ -103,11 +145,15 @@ start () { case "$SIDECAR" in slurm) - $HOME/.local/interlink/bin/interlink-sidecar-slurm &> $HOME/.local/interlink/logs/sd.log & + SHARED_FS=true $HOME/.local/interlink/bin/interlink-sidecar-slurm &> $HOME/.local/interlink/logs/slurm-sidecar.log & echo $! > $HOME/.local/interlink/sd.pid ;; docker) - $HOME/.local/interlink/bin/interlink-sidecar-docker &> $HOME/.local/interlink/logs/sd.log & + $HOME/.local/interlink/bin/interlink-sidecar-docker &> $HOME/.local/interlink/logs/docker-sidecar.log & + echo $! > $HOME/.local/interlink/sd.pid + ;; + htcondor) + $HOME/.local/interlink/bin/interlink-sidecar-htcondor &> $HOME/.local/interlink/logs/htcondor-sidecar.log & echo $! > $HOME/.local/interlink/sd.pid ;; esac @@ -119,6 +165,15 @@ stop () { kill $(cat $HOME/.local/interlink/sd.pid) } +help () { + echo -e "\n\ninstall: Downloads InterLink and OAuth binaries, as well as InterLink configuration. Files are stored in $HOME/.local/interlink\n\n" + echo -e "uninstall: Delete the $HOME/.local/interlink folder, removing all downloaded files\n\n" + echo -e "start: Starts the OAuth proxy, the InterLink API and a Sidecar by the ENV SIDECAR. Actually, valid values for SIDECAR are docker, slurm and htcondor\n\n" + echo -e "stop: Kills all the previously started processes\n\n" + echo -e "restart: Kills all started processes and start them again\n\n" + echo -e "help: Shows this command list" +} + case "$1" in install) install @@ -135,4 +190,12 @@ case "$1" in ;; uninstall) rm -r $HOME/.local/interlink + ;; + help) + help + ;; + *) + echo -e "You need to specify one of the following commands:" + help + ;; esac From 041d7b6398c15ce641b96a8585432937ff7d3efe Mon Sep 17 00:00:00 2001 From: Surax98 Date: Mon, 8 Jan 2024 10:19:06 +0000 Subject: [PATCH 6/9] fixed cfgmaps and secrets retrieving. Also updated Slurm storing paths to be more parametric. Minors here and there --- main.go | 22 +-- pkg/interlink/create.go | 2 +- pkg/interlink/delete.go | 66 +++---- pkg/interlink/func.go | 48 +++-- pkg/sidecars/slurm/Create.go | 20 +- pkg/sidecars/slurm/Delete.go | 28 +-- pkg/sidecars/slurm/GetLogs.go | 7 +- pkg/sidecars/slurm/Status.go | 17 +- pkg/sidecars/slurm/aux.go | 346 ++++++++++++++++------------------ pkg/virtualkubelet/execute.go | 8 +- 10 files changed, 282 insertions(+), 282 deletions(-) diff --git a/main.go b/main.go index 9ddcbf05..d447fa7a 100644 --- a/main.go +++ b/main.go @@ -239,27 +239,27 @@ func main() { nodeProvider, err := virtualkubelet.NewProvider(cfg.ConfigPath, cfg.NodeName, cfg.OperatingSystem, cfg.InternalIP, cfg.DaemonPort, ctx) go func() { - ILbindNow := false - ILbindOld := false + ILbind := false retValue := -1 + counter := 0 for { - err, ILbindNow, retValue = commonIL.PingInterLink(ctx) + err, ILbind, retValue = commonIL.PingInterLink(ctx) if err != nil { log.G(ctx).Error(err) } - if ILbindNow == true && ILbindOld == false && retValue == 1 { - err = virtualkubelet.NewServiceAccount() - if err != nil { - log.G(ctx).Fatal(err) - } - } else if ILbindNow == true && ILbindOld == false && retValue == 0 { - commonIL.CreateClientsetFrom(ctx, "") + if !ILbind && retValue == 1 { + counter++ + } else if ILbind && retValue == 0 { + counter = 0 + } + + if counter > 10 { + log.G(ctx).Fatal("Unable to communicate with the InterLink API, exiting...") } - ILbindOld = ILbindNow time.Sleep(time.Second * 10) } diff --git a/pkg/interlink/create.go b/pkg/interlink/create.go index cba136e1..cf2fc42c 100644 --- a/pkg/interlink/create.go +++ b/pkg/interlink/create.go @@ -40,7 +40,7 @@ func CreateHandler(w http.ResponseWriter, r *http.Request) { if retrieved_data != nil { bodyBytes, err = json.Marshal(retrieved_data) - log.G(Ctx).Info(string(bodyBytes)) + log.G(Ctx).Debug(string(bodyBytes)) reader := bytes.NewReader(bodyBytes) req, err = http.NewRequest(http.MethodPost, commonIL.InterLinkConfigInst.Sidecarurl+":"+commonIL.InterLinkConfigInst.Sidecarport+"/create", reader) diff --git a/pkg/interlink/delete.go b/pkg/interlink/delete.go index 562ff07f..76f6b347 100644 --- a/pkg/interlink/delete.go +++ b/pkg/interlink/delete.go @@ -24,43 +24,45 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) { } var req *http.Request - var pods []*v1.Pod + var pod *v1.Pod reader := bytes.NewReader(bodyBytes) - json.Unmarshal(bodyBytes, &pods) + err = json.Unmarshal(bodyBytes, &pod) - for _, pod := range pods { - check := true //the following loop is used to add a pod to the list of to be deleted pods. this is to avoid multiple calls - if check { - req, err = http.NewRequest(http.MethodPost, commonIL.InterLinkConfigInst.Sidecarurl+":"+commonIL.InterLinkConfigInst.Sidecarport+"/delete", reader) + if err != nil { + statusCode = http.StatusInternalServerError + w.WriteHeader(statusCode) + log.G(Ctx).Fatal(err) + } - log.G(Ctx).Info("InterLink: forwarding Delete call to sidecar") - resp, err := http.DefaultClient.Do(req) - if err != nil { - statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - log.G(Ctx).Error(err) - return - } + req, err = http.NewRequest(http.MethodPost, commonIL.InterLinkConfigInst.Sidecarurl+":"+commonIL.InterLinkConfigInst.Sidecarport+"/delete", reader) - returnValue, _ := io.ReadAll(resp.Body) - statusCode = resp.StatusCode + log.G(Ctx).Info("InterLink: forwarding Delete call to sidecar") + resp, err := http.DefaultClient.Do(req) + if err != nil { + statusCode = http.StatusInternalServerError + w.WriteHeader(statusCode) + log.G(Ctx).Error(err) + return + } + + returnValue, _ := io.ReadAll(resp.Body) + statusCode = resp.StatusCode - if statusCode != http.StatusOK { - w.WriteHeader(http.StatusInternalServerError) - } else { - w.WriteHeader(http.StatusOK) - } - log.G(Ctx).Debug("InterLink: " + string(returnValue)) - var returnJson []commonIL.PodStatus - returnJson = append(returnJson, commonIL.PodStatus{PodName: pod.Name, PodUID: string(pod.UID), PodNamespace: pod.Namespace}) + if statusCode != http.StatusOK { + w.WriteHeader(http.StatusInternalServerError) + } else { + w.WriteHeader(http.StatusOK) + } + log.G(Ctx).Debug("InterLink: " + string(returnValue)) + var returnJson []commonIL.PodStatus + returnJson = append(returnJson, commonIL.PodStatus{PodName: pod.Name, PodUID: string(pod.UID), PodNamespace: pod.Namespace}) - bodyBytes, err = json.Marshal(returnJson) - if err != nil { - log.G(Ctx).Error(err) - w.Write([]byte{}) - } else { - w.Write(bodyBytes) - } - } + bodyBytes, err = json.Marshal(returnJson) + if err != nil { + log.G(Ctx).Error(err) + w.Write([]byte{}) + } else { + w.Write(bodyBytes) } + } diff --git a/pkg/interlink/func.go b/pkg/interlink/func.go index f7797bca..cb637aaa 100644 --- a/pkg/interlink/func.go +++ b/pkg/interlink/func.go @@ -9,11 +9,12 @@ import ( ) func getData(pod commonIL.PodCreateRequests) (commonIL.RetrievedPodData, error) { + log.G(Ctx).Debug(pod.ConfigMaps) var retrieved_data commonIL.RetrievedPodData retrieved_data.Pod = pod.Pod for _, container := range pod.Pod.Spec.Containers { log.G(Ctx).Info("- Retrieving Secrets and ConfigMaps for the Docker Sidecar. Container: " + container.Name) - + log.G(Ctx).Debug(container.VolumeMounts) data, err := retrieve_data(container, pod) if err != nil { log.G(Ctx).Error(err) @@ -30,31 +31,38 @@ func retrieve_data(container v1.Container, pod commonIL.PodCreateRequests) (comm for _, mount_var := range container.VolumeMounts { log.G(Ctx).Debug("-- Retrieving data for mountpoint " + mount_var.Name) - //var podVolumeSpec *v1.VolumeSource - for _, cfgMap := range pod.ConfigMaps { - if cfgMap.Name == mount_var.Name { - retrieved_data.Name = container.Name - retrieved_data.ConfigMaps = append(retrieved_data.ConfigMaps, cfgMap) - } - } + for _, vol := range pod.Pod.Spec.Volumes { + if vol.Name == mount_var.Name { + if vol.ConfigMap != nil { - for _, scrt := range pod.ConfigMaps { - if scrt.Name == mount_var.Name { - retrieved_data.Name = container.Name - retrieved_data.ConfigMaps = append(retrieved_data.ConfigMaps, scrt) - } - } + log.G(Ctx).Info("--- Retrieving ConfigMap " + vol.ConfigMap.Name) + retrieved_data.Name = container.Name + for _, cfgMap := range pod.ConfigMaps { + if cfgMap.Name == vol.ConfigMap.Name { + retrieved_data.Name = container.Name + retrieved_data.ConfigMaps = append(retrieved_data.ConfigMaps, cfgMap) + } + } - for _, vol := range pod.Pod.Spec.Volumes { + } else if vol.Secret != nil { - if vol.EmptyDir != nil { - edPath := filepath.Join(commonIL.InterLinkConfigInst.DataRootFolder, pod.Pod.Namespace+"-"+string(pod.Pod.UID)+"/"+"emptyDirs/"+vol.Name) + log.G(Ctx).Info("--- Retrieving Secret " + vol.Secret.SecretName) + retrieved_data.Name = container.Name + for _, secret := range pod.Secrets { + if secret.Name == vol.Secret.SecretName { + retrieved_data.Name = container.Name + retrieved_data.Secrets = append(retrieved_data.Secrets, secret) + } + } - retrieved_data.Name = container.Name - retrieved_data.EmptyDirs = append(retrieved_data.EmptyDirs, edPath) + } else if vol.EmptyDir != nil { + edPath := filepath.Join(commonIL.InterLinkConfigInst.DataRootFolder, pod.Pod.Namespace+"-"+string(pod.Pod.UID)+"/"+"emptyDirs/"+vol.Name) + + retrieved_data.Name = container.Name + retrieved_data.EmptyDirs = append(retrieved_data.EmptyDirs, edPath) + } } } - } return retrieved_data, nil } diff --git a/pkg/sidecars/slurm/Create.go b/pkg/sidecars/slurm/Create.go index 26c0a947..275f8ef0 100644 --- a/pkg/sidecars/slurm/Create.go +++ b/pkg/sidecars/slurm/Create.go @@ -40,6 +40,7 @@ func SubmitHandler(w http.ResponseWriter, r *http.Request) { for _, data := range req { containers := data.Pod.Spec.Containers metadata := data.Pod.ObjectMeta + filesPath := commonIL.InterLinkConfigInst.DataRootFolder + data.Pod.Namespace + "-" + string(data.Pod.UID) var singularity_command_pod []SingularityCommand @@ -54,13 +55,14 @@ func SubmitHandler(w http.ResponseWriter, r *http.Request) { envs := prepare_envs(container) image := "" - mounts, err := prepare_mounts(container, req) + mounts, err := prepare_mounts(filesPath, container, req) + log.G(Ctx).Debug(mounts) if err != nil { statusCode = http.StatusInternalServerError w.WriteHeader(statusCode) w.Write([]byte("Error prepairing mounts. Check Slurm Sidecar's logs")) log.G(Ctx).Error(err) - os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + string(data.Pod.UID)) + os.RemoveAll(filesPath) return } @@ -72,7 +74,7 @@ func SubmitHandler(w http.ResponseWriter, r *http.Request) { log.G(Ctx).Info("- image-uri annotation not specified for path in remote filesystem") } } else { - image = "docker://" + container.Image + image = container.Image } log.G(Ctx).Debug("-- Appending all commands together...") @@ -85,13 +87,13 @@ func SubmitHandler(w http.ResponseWriter, r *http.Request) { singularity_command_pod = append(singularity_command_pod, SingularityCommand{command: singularity_command, containerName: container.Name}) } - path, err := produce_slurm_script(string(data.Pod.UID), metadata, singularity_command_pod) + path, err := produce_slurm_script(filesPath, data.Pod.Namespace, string(data.Pod.UID), metadata, singularity_command_pod) if err != nil { statusCode = http.StatusInternalServerError w.WriteHeader(statusCode) w.Write([]byte("Error producing Slurm script. Check Slurm Sidecar's logs")) log.G(Ctx).Error(err) - os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + string(data.Pod.UID)) + os.RemoveAll(filesPath) return } out, err := slurm_batch_submit(path) @@ -100,18 +102,18 @@ func SubmitHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(statusCode) w.Write([]byte("Error submitting Slurm script. Check Slurm Sidecar's logs")) log.G(Ctx).Error(err) - os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + string(data.Pod.UID)) + os.RemoveAll(filesPath) return } log.G(Ctx).Info(out) - err = handle_jid(string(data.Pod.UID), out, data.Pod) + err = handle_jid(string(data.Pod.UID), out, data.Pod, filesPath) if err != nil { statusCode = http.StatusInternalServerError w.WriteHeader(statusCode) w.Write([]byte("Error handling JID. Check Slurm Sidecar's logs")) log.G(Ctx).Error(err) - os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + string(data.Pod.UID)) - err = delete_container(string(data.Pod.UID)) + os.RemoveAll(filesPath) + err = delete_container(string(data.Pod.UID), filesPath) return } } diff --git a/pkg/sidecars/slurm/Delete.go b/pkg/sidecars/slurm/Delete.go index e25a86a1..bbbb957d 100644 --- a/pkg/sidecars/slurm/Delete.go +++ b/pkg/sidecars/slurm/Delete.go @@ -24,8 +24,8 @@ func StopHandler(w http.ResponseWriter, r *http.Request) { return } - var req []*v1.Pod - err = json.Unmarshal(bodyBytes, &req) + var pod *v1.Pod + err = json.Unmarshal(bodyBytes, &pod) if err != nil { statusCode = http.StatusInternalServerError w.WriteHeader(statusCode) @@ -34,18 +34,18 @@ func StopHandler(w http.ResponseWriter, r *http.Request) { return } - for _, pod := range req { - err = delete_container(string(pod.UID)) - if err != nil { - statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte("Error deleting containers. Check Slurm Sidecar's logs")) - log.G(Ctx).Error(err) - return - } - if os.Getenv("SHARED_FS") != "true" { - err = os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + pod.Namespace + "-" + string(pod.UID)) - } + filesPath := commonIL.InterLinkConfigInst.DataRootFolder + pod.Namespace + "-" + string(pod.UID) + + err = delete_container(string(pod.UID), filesPath) + if err != nil { + statusCode = http.StatusInternalServerError + w.WriteHeader(statusCode) + w.Write([]byte("Error deleting containers. Check Slurm Sidecar's logs")) + log.G(Ctx).Error(err) + return + } + if os.Getenv("SHARED_FS") != "true" { + err = os.RemoveAll(filesPath) } w.WriteHeader(statusCode) diff --git a/pkg/sidecars/slurm/GetLogs.go b/pkg/sidecars/slurm/GetLogs.go index 3e956958..3c63e13f 100644 --- a/pkg/sidecars/slurm/GetLogs.go +++ b/pkg/sidecars/slurm/GetLogs.go @@ -37,6 +37,7 @@ func GetLogsHandler(w http.ResponseWriter, r *http.Request) { return } + path := commonIL.InterLinkConfigInst.DataRootFolder + req.Namespace + "-" + req.PodUID var output []byte if req.Opts.Timestamps { log.G(Ctx).Error(errors.New("Not Implemented")) @@ -44,11 +45,11 @@ func GetLogsHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(statusCode) return } else { - log.G(Ctx).Info("Reading " + commonIL.InterLinkConfigInst.DataRootFolder + req.PodUID + "/" + req.ContainerName + ".out") - output, err = os.ReadFile(commonIL.InterLinkConfigInst.DataRootFolder + req.PodUID + "/" + req.ContainerName + ".out") + log.G(Ctx).Info("Reading " + path + "/" + req.ContainerName + ".out") + output, err = os.ReadFile(path + "/" + req.ContainerName + ".out") if err != nil { log.G(Ctx).Info("Failed to read container logs, falling back to job log.") - output, err = os.ReadFile(commonIL.InterLinkConfigInst.DataRootFolder + req.PodUID + "/" + "job.out") + output, err = os.ReadFile(path + "/" + "job.out") if err != nil { statusCode = http.StatusInternalServerError w.WriteHeader(statusCode) diff --git a/pkg/sidecars/slurm/Status.go b/pkg/sidecars/slurm/Status.go index 696f7b8a..6759fe0e 100644 --- a/pkg/sidecars/slurm/Status.go +++ b/pkg/sidecars/slurm/Status.go @@ -62,6 +62,7 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { } for _, pod := range req { + path := commonIL.InterLinkConfigInst.DataRootFolder + pod.Namespace + "-" + string(pod.UID) for i, jid := range JIDs { if jid.PodUID == string(pod.UID) { cmd := []string{"--noheader", "-a", "-j " + jid.JID} @@ -73,8 +74,8 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { execReturn, _ := shell.Execute() timeNow = time.Now() - log.G(Ctx).Info("ERR: ", execReturn.Stderr) if execReturn.Stderr != "" { + log.G(Ctx).Info("ERR: ", execReturn.Stderr) containerStatuses := []v1.ContainerStatus{} for _, ct := range pod.Spec.Containers { log.G(Ctx).Info("Getting exit status from " + commonIL.InterLinkConfigInst.DataRootFolder + string(pod.UID) + "/" + ct.Name + ".status") @@ -132,7 +133,7 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { case "CD": if jid.EndTime.IsZero() { JIDs[i].EndTime = timeNow - f, err := os.Create(commonIL.InterLinkConfigInst.DataRootFolder + string(pod.UID) + "/FinishedAt.time") + f, err := os.Create(path + "/FinishedAt.time") if err != nil { statusCode = http.StatusInternalServerError w.WriteHeader(statusCode) @@ -147,7 +148,7 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { case "CG": if jid.StartTime.IsZero() { JIDs[i].StartTime = timeNow - f, err := os.Create(commonIL.InterLinkConfigInst.DataRootFolder + string(pod.UID) + "/StartedAt.time") + f, err := os.Create(path + "/StartedAt.time") if err != nil { statusCode = http.StatusInternalServerError w.WriteHeader(statusCode) @@ -162,7 +163,7 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { case "F": if jid.EndTime.IsZero() { JIDs[i].EndTime = timeNow - f, err := os.Create(commonIL.InterLinkConfigInst.DataRootFolder + string(pod.UID) + "/FinishedAt.time") + f, err := os.Create(path + "/FinishedAt.time") if err != nil { statusCode = http.StatusInternalServerError w.WriteHeader(statusCode) @@ -180,7 +181,7 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { case "PR": if jid.EndTime.IsZero() { JIDs[i].EndTime = timeNow - f, err := os.Create(commonIL.InterLinkConfigInst.DataRootFolder + string(pod.UID) + "/FinishedAt.time") + f, err := os.Create(path + "/FinishedAt.time") if err != nil { statusCode = http.StatusInternalServerError w.WriteHeader(statusCode) @@ -195,7 +196,7 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { case "R": if jid.StartTime.IsZero() { JIDs[i].StartTime = timeNow - f, err := os.Create(commonIL.InterLinkConfigInst.DataRootFolder + string(pod.UID) + "/StartedAt.time") + f, err := os.Create(path + "/StartedAt.time") if err != nil { statusCode = http.StatusInternalServerError w.WriteHeader(statusCode) @@ -213,7 +214,7 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { case "ST": if jid.EndTime.IsZero() { JIDs[i].EndTime = timeNow - f, err := os.Create(commonIL.InterLinkConfigInst.DataRootFolder + string(pod.UID) + "/FinishedAt.time") + f, err := os.Create(path + "/FinishedAt.time") if err != nil { statusCode = http.StatusInternalServerError w.WriteHeader(statusCode) @@ -228,7 +229,7 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { default: if jid.EndTime.IsZero() { JIDs[i].EndTime = timeNow - f, err := os.Create(commonIL.InterLinkConfigInst.DataRootFolder + string(pod.UID) + "/FinishedAt.time") + f, err := os.Create(path + "/FinishedAt.time") if err != nil { statusCode = http.StatusInternalServerError w.WriteHeader(statusCode) diff --git a/pkg/sidecars/slurm/aux.go b/pkg/sidecars/slurm/aux.go index 49f0f986..f1ac58e8 100644 --- a/pkg/sidecars/slurm/aux.go +++ b/pkg/sidecars/slurm/aux.go @@ -152,25 +152,25 @@ func prepare_envs(container v1.Container) []string { } } -func prepare_mounts(container v1.Container, data []commonIL.RetrievedPodData) ([]string, error) { +func prepare_mounts(workingPath string, container v1.Container, data []commonIL.RetrievedPodData) ([]string, error) { log.G(Ctx).Info("-- Preparing mountpoints for " + container.Name) mount := make([]string, 1) mount = append(mount, "--bind") mount_data := "" for _, podData := range data { - err := os.MkdirAll(commonIL.InterLinkConfigInst.DataRootFolder+string(podData.Pod.UID), os.ModePerm) + err := os.MkdirAll(workingPath, os.ModePerm) if err != nil { log.G(Ctx).Error(err) return nil, err } else { - log.G(Ctx).Info("-- Created directory " + commonIL.InterLinkConfigInst.DataRootFolder + string(podData.Pod.UID)) + log.G(Ctx).Info("-- Created directory " + workingPath) } for _, cont := range podData.Containers { for _, cfgMap := range cont.ConfigMaps { if container.Name == cont.Name { - configMapsPaths, envs, err := mountData(container, podData.Pod, cfgMap) + configMapsPaths, envs, err := mountData(workingPath, container, podData.Pod, cfgMap) if err != nil { log.G(Ctx).Error(err) return nil, err @@ -182,17 +182,15 @@ func prepare_mounts(container v1.Container, data []commonIL.RetrievedPodData) ([ splitDirs := strings.Split(dirs[0], "/") dir := filepath.Join(splitDirs[:len(splitDirs)-1]...) prefix += "\nmkdir -p " + dir + " && touch " + dirs[0] + " && echo $" + envs[i] + " > " + dirs[0] - mount_data += dir - } else { - mount_data += path } + mount_data += path } } } for _, secret := range cont.Secrets { if container.Name == cont.Name { - secretsPaths, envs, err := mountData(container, podData.Pod, secret) + secretsPaths, envs, err := mountData(workingPath, container, podData.Pod, secret) if err != nil { log.G(Ctx).Error(err) return nil, err @@ -203,17 +201,15 @@ func prepare_mounts(container v1.Container, data []commonIL.RetrievedPodData) ([ splitDirs := strings.Split(dirs[0], "/") dir := filepath.Join(splitDirs[:len(splitDirs)-1]...) prefix += "\nmkdir -p " + dir + " && touch " + dirs[0] + " && echo $" + envs[i] + " > " + dirs[0] - mount_data += dir - } else { - mount_data += path } + mount_data += path } } } for _, emptyDir := range cont.EmptyDirs { if container.Name == cont.Name { - paths, _, err := mountData(container, podData.Pod, emptyDir) + paths, _, err := mountData(workingPath, container, podData.Pod, emptyDir) if err != nil { log.G(Ctx).Error(err) return nil, err @@ -238,29 +234,23 @@ func prepare_mounts(container v1.Container, data []commonIL.RetrievedPodData) ([ return append(mount, mount_data), nil } -func produce_slurm_script(podUID string, metadata metav1.ObjectMeta, commands []SingularityCommand) (string, error) { +func produce_slurm_script(path string, podNamespace string, podUID string, metadata metav1.ObjectMeta, commands []SingularityCommand) (string, error) { log.G(Ctx).Info("-- Creating file for the Slurm script") - err := os.MkdirAll(commonIL.InterLinkConfigInst.DataRootFolder+podUID, os.ModePerm) + err := os.MkdirAll(path, os.ModePerm) if err != nil { log.G(Ctx).Error(err) return "", err } else { - log.G(Ctx).Info("-- Created directory " + commonIL.InterLinkConfigInst.DataRootFolder + podUID) + log.G(Ctx).Info("-- Created directory " + path) } - path := commonIL.InterLinkConfigInst.DataRootFolder + podUID + "/job.sh" postfix := "" - err = os.RemoveAll(path) - if err != nil { - log.G(Ctx).Error(err) - return "", err - } - f, err := os.Create(path) + f, err := os.Create(path + "/job.sh") if err != nil { log.G(Ctx).Error(err) return "", err } - err = os.Chmod(path, 0774) + err = os.Chmod(path+"/job.sh", 0774) if err != nil { log.G(Ctx).Error(err) return "", err @@ -268,10 +258,10 @@ func produce_slurm_script(podUID string, metadata metav1.ObjectMeta, commands [] defer f.Close() if err != nil { - log.G(Ctx).Error("Unable to create file " + path) + log.G(Ctx).Error("Unable to create file " + path + "/job.sh") return "", err } else { - log.G(Ctx).Debug("--- Created file " + path) + log.G(Ctx).Debug("--- Created file " + path + "/job.sh") } var sbatch_flags_from_argo []string @@ -323,7 +313,7 @@ func produce_slurm_script(podUID string, metadata metav1.ObjectMeta, commands [] sbatch_macros := "#!" + commonIL.InterLinkConfigInst.BashPath + "\n#SBATCH --job-name=" + podUID + - "\n#SBATCH --output=" + commonIL.InterLinkConfigInst.DataRootFolder + podUID + "/" + "job.out" + + "\n#SBATCH --output=" + path + "/job.out" + sbatch_flags_as_string + "\n" + prefix + @@ -337,8 +327,8 @@ func produce_slurm_script(podUID string, metadata metav1.ObjectMeta, commands [] for _, singularityCommand := range commands { stringToBeWritten += "\n" + strings.Join(singularityCommand.command[:], " ") + - " &> " + commonIL.InterLinkConfigInst.DataRootFolder + podUID + "/" + singularityCommand.containerName + ".out; " + - "echo $? > " + commonIL.InterLinkConfigInst.DataRootFolder + podUID + "/" + singularityCommand.containerName + ".status &" + " &> " + path + "/" + singularityCommand.containerName + ".out; " + + "echo $? > " + path + "/" + singularityCommand.containerName + ".status &" } stringToBeWritten += "\n" + postfix @@ -352,7 +342,7 @@ func produce_slurm_script(podUID string, metadata metav1.ObjectMeta, commands [] log.G(Ctx).Debug("---- Written file") } - return path, nil + return f.Name(), nil } func slurm_batch_submit(path string) (string, error) { @@ -380,12 +370,10 @@ func slurm_batch_submit(path string) (string, error) { return string(execReturn.Stdout), nil } -func handle_jid(podUID string, output string, pod v1.Pod) error { - //Submitted batch job 8017236 - //Submitted batch job 60920 +func handle_jid(podUID string, output string, pod v1.Pod, path string) error { r := regexp.MustCompile(`Submitted batch job (?P\d+)`) jid := r.FindStringSubmatch(output) - f, err := os.Create(commonIL.InterLinkConfigInst.DataRootFolder + podUID + "/JobID.jid") + f, err := os.Create(path + "/JobID.jid") if err != nil { log.G(Ctx).Error("Can't create jid_file") return err @@ -418,7 +406,7 @@ func removeJID(jidToBeRemoved string) { } } -func delete_container(podUID string) error { +func delete_container(podUID string, path string) error { log.G(Ctx).Info("- Deleting Job for pod " + podUID) for _, jid := range JIDs { if jid.PodUID == podUID { @@ -429,7 +417,7 @@ func delete_container(podUID string) error { } else { log.G(Ctx).Info("- Deleted Job ", jid.JID) } - os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + podUID) + os.RemoveAll(path) removeJID(jid.JID) return nil } @@ -438,13 +426,7 @@ func delete_container(podUID string) error { return nil } -func mountData(container v1.Container, pod v1.Pod, data interface{}) ([]string, []string, error) { - wd, err := os.Getwd() - if err != nil { - log.G(Ctx).Error(err) - return nil, nil, err - } - +func mountData(path string, container v1.Container, pod v1.Pod, data interface{}) ([]string, []string, error) { if commonIL.InterLinkConfigInst.ExportPodData { for _, mountSpec := range container.VolumeMounts { var podVolumeSpec *v1.VolumeSource @@ -452,185 +434,185 @@ func mountData(container v1.Container, pod v1.Pod, data interface{}) ([]string, for _, vol := range pod.Spec.Volumes { if vol.Name == mountSpec.Name { podVolumeSpec = &vol.VolumeSource - } - - switch mount := data.(type) { - case v1.ConfigMap: - configMaps := make(map[string]string) - var configMapNamePaths []string - var envs []string - err := os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + pod.Namespace + "-" + string(pod.UID) + "/" + "configMaps/" + vol.Name) + switch mount := data.(type) { + case v1.ConfigMap: + configMaps := make(map[string]string) + var configMapNamePaths []string + var envs []string - if err != nil { - log.G(Ctx).Error("Unable to delete root folder") - return nil, nil, err - } + err := os.RemoveAll(path + "/configMaps/" + vol.Name) - if podVolumeSpec != nil && podVolumeSpec.ConfigMap != nil { - log.G(Ctx).Info("--- Mounting ConfigMap " + podVolumeSpec.ConfigMap.Name) - mode := os.FileMode(*podVolumeSpec.ConfigMap.DefaultMode) - podConfigMapDir := filepath.Join(commonIL.InterLinkConfigInst.DataRootFolder, pod.Namespace+"-"+string(pod.UID)+"/", "configMaps/", vol.Name) - - if mount.Data != nil { - for key := range mount.Data { - configMaps[key] = mount.Data[key] - path := filepath.Join(podConfigMapDir, key) - path += (":" + mountSpec.MountPath + "/" + key + ",") - configMapNamePaths = append(configMapNamePaths, path) - - if os.Getenv("SHARED_FS") != "true" { - env := string(container.Name) + "_CFG_" + key - log.G(Ctx).Debug("---- Setting env " + env + " to mount the file later") - os.Setenv(env, mount.Data[key]) - envs = append(envs, env) - } - } + if err != nil { + log.G(Ctx).Error("Unable to delete root folder") + return nil, nil, err } - if os.Getenv("SHARED_FS") == "true" { - log.G(Ctx).Info("--- Shared FS enabled, files will be directly created before the job submission") - cmd := []string{"-p " + podConfigMapDir} - shell := exec2.ExecTask{ - Command: "mkdir", - Args: cmd, - Shell: true, + if podVolumeSpec != nil && podVolumeSpec.ConfigMap != nil { + log.G(Ctx).Info("--- Mounting ConfigMap " + podVolumeSpec.ConfigMap.Name) + mode := os.FileMode(*podVolumeSpec.ConfigMap.DefaultMode) + podConfigMapDir := filepath.Join(path+"/", "configMaps/", vol.Name) + + if mount.Data != nil { + for key := range mount.Data { + configMaps[key] = mount.Data[key] + fullPath := filepath.Join(podConfigMapDir, key) + fullPath += (":" + mountSpec.MountPath + "/" + key + ",") + configMapNamePaths = append(configMapNamePaths, fullPath) + + if os.Getenv("SHARED_FS") != "true" { + env := string(container.Name) + "_CFG_" + key + log.G(Ctx).Debug("---- Setting env " + env + " to mount the file later") + os.Setenv(env, mount.Data[key]) + envs = append(envs, env) + } + } } - execReturn, err := shell.Execute() + if os.Getenv("SHARED_FS") == "true" { + log.G(Ctx).Info("--- Shared FS enabled, files will be directly created before the job submission") + cmd := []string{"-p " + podConfigMapDir} + shell := exec2.ExecTask{ + Command: "mkdir", + Args: cmd, + Shell: true, + } - if err != nil { - log.G(Ctx).Error(err) - return nil, nil, err - } else if execReturn.Stderr != "" { - log.G(Ctx).Error(execReturn.Stderr) - return nil, nil, errors.New(execReturn.Stderr) - } else { - log.G(Ctx).Debug("--- Created folder " + podConfigMapDir) - } + execReturn, err := shell.Execute() - log.G(Ctx).Debug("--- Writing ConfigMaps files") - for k, v := range configMaps { - // TODO: Ensure that these files are deleted in failure cases - fullPath := filepath.Join(podConfigMapDir, k) - err = os.WriteFile(fullPath, []byte(v), mode) if err != nil { - log.G(Ctx).Errorf("Could not write ConfigMap file %s", fullPath) - os.RemoveAll(fullPath) + log.G(Ctx).Error(err) + return nil, nil, err + } else if execReturn.Stderr != "" { + log.G(Ctx).Error(execReturn.Stderr) + return nil, nil, errors.New(execReturn.Stderr) + } else { + log.G(Ctx).Debug("--- Created folder " + podConfigMapDir) + } + + log.G(Ctx).Debug("--- Writing ConfigMaps files") + for k, v := range configMaps { + // TODO: Ensure that these files are deleted in failure cases + fullPath := filepath.Join(podConfigMapDir, k) + err = os.WriteFile(fullPath, []byte(v), mode) if err != nil { - log.G(Ctx).Error("Unable to remove file " + fullPath) + log.G(Ctx).Errorf("Could not write ConfigMap file %s", fullPath) + os.RemoveAll(fullPath) + if err != nil { + log.G(Ctx).Error("Unable to remove file " + fullPath) + return nil, nil, err + } return nil, nil, err + } else { + log.G(Ctx).Debug("Written ConfigMap file " + fullPath) } - return nil, nil, err - } else { - log.G(Ctx).Debug("Written ConfigMap file " + fullPath) } } + return configMapNamePaths, envs, nil } - return configMapNamePaths, envs, nil - } - case v1.Secret: - secrets := make(map[string][]byte) - var secretNamePaths []string - var envs []string + case v1.Secret: + secrets := make(map[string][]byte) + var secretNamePaths []string + var envs []string - err := os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + pod.Namespace + "-" + string(pod.UID) + "/" + "secrets/" + vol.Name) + err := os.RemoveAll(path + "/secrets/" + vol.Name) - if err != nil { - log.G(Ctx).Error("Unable to delete root folder") - return nil, nil, err - } + if err != nil { + log.G(Ctx).Error("Unable to delete root folder") + return nil, nil, err + } - if podVolumeSpec != nil && podVolumeSpec.Secret != nil { - log.G(Ctx).Info("--- Mounting Secret " + podVolumeSpec.Secret.SecretName) - mode := os.FileMode(*podVolumeSpec.Secret.DefaultMode) - podSecretDir := filepath.Join(commonIL.InterLinkConfigInst.DataRootFolder, pod.Namespace+"-"+string(pod.UID)+"/", "secrets/", vol.Name) - - if mount.Data != nil { - for key := range mount.Data { - secrets[key] = mount.Data[key] - path := filepath.Join(podSecretDir, key) - path += (":" + mountSpec.MountPath + "/" + key + ",") - secretNamePaths = append(secretNamePaths, path) - - if os.Getenv("SHARED_FS") != "true" { - env := string(container.Name) + "_SECRET_" + key - log.G(Ctx).Debug("---- Setting env " + env + " to mount the file later") - os.Setenv(env, string(mount.Data[key])) - envs = append(envs, env) + if podVolumeSpec != nil && podVolumeSpec.Secret != nil { + log.G(Ctx).Info("--- Mounting Secret " + podVolumeSpec.Secret.SecretName) + mode := os.FileMode(*podVolumeSpec.Secret.DefaultMode) + podSecretDir := filepath.Join(path+"/", "secrets/", vol.Name) + + if mount.Data != nil { + for key := range mount.Data { + secrets[key] = mount.Data[key] + fullPath := filepath.Join(podSecretDir, key) + fullPath += (":" + mountSpec.MountPath + "/" + key + ",") + secretNamePaths = append(secretNamePaths, fullPath) + + if os.Getenv("SHARED_FS") != "true" { + env := string(container.Name) + "_SECRET_" + key + log.G(Ctx).Debug("---- Setting env " + env + " to mount the file later") + os.Setenv(env, string(mount.Data[key])) + envs = append(envs, env) + } + } + } + + if os.Getenv("SHARED_FS") == "true" { + log.G(Ctx).Info("--- Shared FS enabled, files will be directly created before the job submission") + cmd := []string{"-p " + podSecretDir} + shell := exec2.ExecTask{ + Command: "mkdir", + Args: cmd, + Shell: true, + } + + execReturn, err := shell.Execute() + if strings.Compare(execReturn.Stdout, "") != 0 { + log.G(Ctx).Error(err) + return nil, nil, err + } + if execReturn.Stderr != "" { + log.G(Ctx).Error(execReturn.Stderr) + return nil, nil, errors.New(execReturn.Stderr) + } else { + log.G(Ctx).Debug("--- Created folder " + podSecretDir) + } + + log.G(Ctx).Debug("--- Writing Secret files") + for k, v := range secrets { + // TODO: Ensure that these files are deleted in failure cases + fullPath := filepath.Join(podSecretDir, k) + os.WriteFile(fullPath, v, mode) + if err != nil { + log.G(Ctx).Errorf("Could not write Secret file %s", fullPath) + err = os.RemoveAll(fullPath) + if err != nil { + log.G(Ctx).Error("Unable to remove file " + fullPath) + return nil, nil, err + } + return nil, nil, err + } else { + log.G(Ctx).Debug("--- Written Secret file " + fullPath) + } } } + return secretNamePaths, envs, nil } - if os.Getenv("SHARED_FS") == "true" { - log.G(Ctx).Info("--- Shared FS enabled, files will be directly created before the job submission") - cmd := []string{"-p " + podSecretDir} + case string: + if podVolumeSpec != nil && podVolumeSpec.EmptyDir != nil { + var edPath string + edPath = filepath.Join(path + "/" + "emptyDirs/" + vol.Name) + log.G(Ctx).Info("-- Creating EmptyDir in " + edPath) + cmd := []string{"-p " + edPath} shell := exec2.ExecTask{ Command: "mkdir", Args: cmd, Shell: true, } - execReturn, err := shell.Execute() - if strings.Compare(execReturn.Stdout, "") != 0 { + _, err := shell.Execute() + if err != nil { log.G(Ctx).Error(err) return nil, nil, err - } - if execReturn.Stderr != "" { - log.G(Ctx).Error(execReturn.Stderr) - return nil, nil, errors.New(execReturn.Stderr) } else { - log.G(Ctx).Debug("--- Created folder " + podSecretDir) - } - - log.G(Ctx).Debug("--- Writing Secret files") - for k, v := range secrets { - // TODO: Ensure that these files are deleted in failure cases - fullPath := filepath.Join(podSecretDir, k) - os.WriteFile(fullPath, v, mode) - if err != nil { - log.G(Ctx).Errorf("Could not write Secret file %s", fullPath) - err = os.RemoveAll(fullPath) - if err != nil { - log.G(Ctx).Error("Unable to remove file " + fullPath) - return nil, nil, err - } - return nil, nil, err - } else { - log.G(Ctx).Debug("--- Written Secret file " + fullPath) - } + log.G(Ctx).Debug("-- Created EmptyDir in " + edPath) } - } - return secretNamePaths, envs, nil - } - case string: - if podVolumeSpec != nil && podVolumeSpec.EmptyDir != nil { - var edPath string - edPath = filepath.Join(wd+"/"+commonIL.InterLinkConfigInst.DataRootFolder, pod.Namespace+"-"+string(pod.UID)+"/"+"emptyDirs/"+vol.Name) - log.G(Ctx).Info("-- Creating EmptyDir in " + edPath) - cmd := []string{"-p " + edPath} - shell := exec2.ExecTask{ - Command: "mkdir", - Args: cmd, - Shell: true, + edPath += (":" + mountSpec.MountPath + "/" + mountSpec.Name + ",") + return []string{edPath}, nil, nil } - - _, err := shell.Execute() - if err != nil { - log.G(Ctx).Error(err) - return nil, nil, err - } else { - log.G(Ctx).Debug("-- Created EmptyDir in " + edPath) - } - - edPath += (":" + mountSpec.MountPath + "/" + mountSpec.Name + ",") - return []string{edPath}, nil, nil } } } } } - return nil, nil, err + return nil, nil, nil } diff --git a/pkg/virtualkubelet/execute.go b/pkg/virtualkubelet/execute.go index 200589eb..5d9b9f9f 100644 --- a/pkg/virtualkubelet/execute.go +++ b/pkg/virtualkubelet/execute.go @@ -296,14 +296,18 @@ func RemoteExecution(p *VirtualKubeletProvider, ctx context.Context, mode int8, cfgmap, err := ClientSet.CoreV1().ConfigMaps(pod.Namespace).Get(ctx, volume.ConfigMap.Name, metav1.GetOptions{}) if err != nil { log.G(ctx).Warning("Unable to find ConfigMap " + volume.ConfigMap.Name + " for pod " + pod.Name + ". Waiting for it to be initialized") + break + } else { + req.ConfigMaps = append(req.ConfigMaps, *cfgmap) } - req.ConfigMaps = append(req.ConfigMaps, *cfgmap) } else if volume.Secret != nil { scrt, err := ClientSet.CoreV1().Secrets(pod.Namespace).Get(ctx, volume.Secret.SecretName, metav1.GetOptions{}) if err != nil { log.G(ctx).Warning("Unable to find Secret " + volume.Secret.SecretName + " for pod " + pod.Name + ". Waiting for it to be initialized") + break + } else { + req.Secrets = append(req.Secrets, *scrt) } - req.Secrets = append(req.Secrets, *scrt) } } From a7d76579148955dfb1533d6d098e2413fceeab80 Mon Sep 17 00:00:00 2001 From: Surax98 Date: Mon, 8 Jan 2024 16:17:17 +0000 Subject: [PATCH 7/9] removed unused functions --- pkg/common/func.go | 43 ------------------ pkg/virtualkubelet/execute.go | 82 ----------------------------------- 2 files changed, 125 deletions(-) diff --git a/pkg/common/func.go b/pkg/common/func.go index 3f32ae65..ccf656c2 100644 --- a/pkg/common/func.go +++ b/pkg/common/func.go @@ -1,17 +1,13 @@ package common import ( - "bytes" "context" - "encoding/json" - "errors" "flag" "fmt" "io" "net/http" "os" "strconv" - "time" "k8s.io/client-go/kubernetes" @@ -164,42 +160,3 @@ func PingInterLink(ctx context.Context) (error, bool, int) { return nil, false, retVal } } - -func CreateClientsetFrom(ctx context.Context, body string) error { - counter := 0 - for { - var returnValue, _ = json.Marshal("Error") - reader := bytes.NewReader([]byte(body)) - req, err := http.NewRequest(http.MethodPost, InterLinkConfigInst.Interlinkurl+":"+InterLinkConfigInst.Interlinkport+"/setKubeCFG", reader) - - if err != nil { - log.G(ctx).Error(err) - } - - token, err := os.ReadFile(InterLinkConfigInst.VKTokenFile) // just pass the file name - if err != nil { - log.G(ctx).Error(err) - return err - } - req.Header.Add("Authorization", "Bearer "+string(token)) - resp, err := http.DefaultClient.Do(req) - if err != nil { - log.G(ctx).Error(err) - counter++ - if counter > 5 { - return errors.New("Timeout occured trying to set a kubeconfig") - } - time.Sleep(5 * time.Second) - continue - } else { - returnValue, _ = io.ReadAll(resp.Body) - } - - if resp.StatusCode == http.StatusOK { - break - } else { - log.G(ctx).Error("Error " + err.Error() + " " + string(returnValue)) - } - } - return nil -} diff --git a/pkg/virtualkubelet/execute.go b/pkg/virtualkubelet/execute.go index 5d9b9f9f..7b3629fa 100644 --- a/pkg/virtualkubelet/execute.go +++ b/pkg/virtualkubelet/execute.go @@ -11,7 +11,6 @@ import ( "strconv" "time" - exec "github.com/alexellis/go-execute/pkg/v1" commonIL "github.com/intertwin-eu/interlink/pkg/common" "github.com/containerd/containerd/log" @@ -23,87 +22,6 @@ import ( var ClientSet *kubernetes.Clientset -func NewServiceAccount() error { - - var sa string - var script string - path := commonIL.InterLinkConfigInst.DataRootFolder + ".kube/" - - err := os.MkdirAll(path, os.ModePerm) - if err != nil { - log.G(context.Background()).Error(err) - return err - } - f, err := os.Create(path + "getSAConfig.sh") - if err != nil { - log.G(context.Background()).Error(err) - return err - } - - defer f.Close() - - script = "#!" + commonIL.InterLinkConfigInst.BashPath + "\n" + - "SERVICE_ACCOUNT_NAME=" + commonIL.InterLinkConfigInst.ServiceAccount + "\n" + - "CONTEXT=$(kubectl config current-context)\n" + - "NAMESPACE=" + commonIL.InterLinkConfigInst.Namespace + "\n" + - "NEW_CONTEXT=" + commonIL.InterLinkConfigInst.Namespace + "\n" + - "KUBECONFIG_FILE=\"" + path + "kubeconfig-sa\"\n" + - "SECRET_NAME=$(kubectl get secret -l kubernetes.io/service-account.name=${SERVICE_ACCOUNT_NAME} --namespace ${NAMESPACE} --context ${CONTEXT} -o jsonpath='{.items[0].metadata.name}')\n" + - "TOKEN_DATA=$(kubectl get secret ${SECRET_NAME} --context ${CONTEXT} --namespace ${NAMESPACE} -o jsonpath='{.data.token}')\n" + - "TOKEN=$(echo ${TOKEN_DATA} | base64 -d)\n" + - "kubectl config view --raw > ${KUBECONFIG_FILE}.full.tmp\n" + - "kubectl --kubeconfig ${KUBECONFIG_FILE}.full.tmp config use-context ${CONTEXT}\n" + - "kubectl --kubeconfig ${KUBECONFIG_FILE}.full.tmp config view --flatten --minify > ${KUBECONFIG_FILE}.tmp\n" + - "kubectl config --kubeconfig ${KUBECONFIG_FILE}.tmp rename-context ${CONTEXT} ${NEW_CONTEXT}\n" + - "kubectl config --kubeconfig ${KUBECONFIG_FILE}.tmp set-credentials ${CONTEXT}-${NAMESPACE}-token-user --token ${TOKEN}\n" + - "kubectl config --kubeconfig ${KUBECONFIG_FILE}.tmp set-context ${NEW_CONTEXT} --user ${CONTEXT}-${NAMESPACE}-token-user\n" + - "kubectl config --kubeconfig ${KUBECONFIG_FILE}.tmp set-context ${NEW_CONTEXT} --namespace ${NAMESPACE}\n" + - "kubectl config --kubeconfig ${KUBECONFIG_FILE}.tmp view --flatten --minify > ${KUBECONFIG_FILE}\n" + - "rm ${KUBECONFIG_FILE}.full.tmp\n" + - "rm ${KUBECONFIG_FILE}.tmp" - - _, err = f.Write([]byte(script)) - - if err != nil { - log.G(context.Background()).Error(err) - return err - } - - //executing the script to actually retrieve a valid service account - cmd := []string{path + "getSAConfig.sh"} - shell := exec.ExecTask{ - Command: "sh", - Args: cmd, - Shell: true, - } - execResult, _ := shell.Execute() - if execResult.Stderr != "" { - log.G(context.Background()).Error("Stderr: " + execResult.Stderr + "\nStdout: " + execResult.Stdout) - return errors.New(execResult.Stderr) - } - - //checking if the config is valid - _, err = clientcmd.LoadFromFile(path + "kubeconfig-sa") - if err != nil { - log.G(context.Background()).Error(err) - return err - } - - config, err := os.ReadFile(path + "kubeconfig-sa") - if err != nil { - log.G(context.Background()).Error(err) - return err - } - - sa = string(config) - os.Remove(path + "getSAConfig.sh") - os.Remove(path + "kubeconfig-sa") - - err = commonIL.CreateClientsetFrom(context.Background(), sa) - - return nil -} - func createRequest(pod commonIL.PodCreateRequests, token string) ([]byte, error) { var returnValue, _ = json.Marshal(commonIL.PodStatus{}) From fb8eb36df0d8669c84338427ad1df06cd88e4fa5 Mon Sep 17 00:00:00 2001 From: Surax98 Date: Thu, 11 Jan 2024 08:17:22 +0000 Subject: [PATCH 8/9] quick patch for multiple get status --- pkg/sidecars/slurm/Delete.go | 2 +- pkg/sidecars/slurm/Status.go | 8 +++++--- pkg/sidecars/slurm/aux.go | 15 +++++++++------ pkg/virtualkubelet/execute.go | 4 +++- 4 files changed, 18 insertions(+), 11 deletions(-) diff --git a/pkg/sidecars/slurm/Delete.go b/pkg/sidecars/slurm/Delete.go index bbbb957d..6f01020b 100644 --- a/pkg/sidecars/slurm/Delete.go +++ b/pkg/sidecars/slurm/Delete.go @@ -36,7 +36,7 @@ func StopHandler(w http.ResponseWriter, r *http.Request) { filesPath := commonIL.InterLinkConfigInst.DataRootFolder + pod.Namespace + "-" + string(pod.UID) - err = delete_container(string(pod.UID), filesPath) + err = delete_container(string(pod.UID), filesPath+"/"+pod.Namespace) if err != nil { statusCode = http.StatusInternalServerError w.WriteHeader(statusCode) diff --git a/pkg/sidecars/slurm/Status.go b/pkg/sidecars/slurm/Status.go index 6759fe0e..5c5507fc 100644 --- a/pkg/sidecars/slurm/Status.go +++ b/pkg/sidecars/slurm/Status.go @@ -74,12 +74,14 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { execReturn, _ := shell.Execute() timeNow = time.Now() + //log.G(Ctx).Info("Pod: " + jid.PodUID + " | JID: " + jid.JID) + if execReturn.Stderr != "" { - log.G(Ctx).Info("ERR: ", execReturn.Stderr) + log.G(Ctx).Error("ERR: ", execReturn.Stderr) containerStatuses := []v1.ContainerStatus{} for _, ct := range pod.Spec.Containers { - log.G(Ctx).Info("Getting exit status from " + commonIL.InterLinkConfigInst.DataRootFolder + string(pod.UID) + "/" + ct.Name + ".status") - file, err := os.Open(commonIL.InterLinkConfigInst.DataRootFolder + string(pod.UID) + "/" + ct.Name + ".status") + log.G(Ctx).Info("Getting exit status from " + path + "/" + ct.Name + ".status") + file, err := os.Open(path + "/" + ct.Name + ".status") if err != nil { statusCode = http.StatusInternalServerError w.WriteHeader(statusCode) diff --git a/pkg/sidecars/slurm/aux.go b/pkg/sidecars/slurm/aux.go index f1ac58e8..69f2745a 100644 --- a/pkg/sidecars/slurm/aux.go +++ b/pkg/sidecars/slurm/aux.go @@ -389,7 +389,7 @@ func handle_jid(podUID string, output string, pod v1.Pod, path string) error { return nil } -func removeJID(jidToBeRemoved string) { +func removeJID(jidToBeRemoved string) error { for i, jid := range JIDs { if jid.JID == jidToBeRemoved { if len(JIDs) == 1 { @@ -400,10 +400,11 @@ func removeJID(jidToBeRemoved string) { JIDs = JIDs[:i] } else { JIDs = append(JIDs[:i-1], JIDs[i+1:]...) - return } + return nil } } + return errors.New("Unable to delete JID " + jidToBeRemoved + ". Maybe it already has been deleted?") } func delete_container(podUID string, path string) error { @@ -417,12 +418,14 @@ func delete_container(podUID string, path string) error { } else { log.G(Ctx).Info("- Deleted Job ", jid.JID) } - os.RemoveAll(path) - removeJID(jid.JID) - return nil + os.RemoveAll(path + "/" + podUID) + err = removeJID(jid.JID) + if err != nil { + log.G(Ctx).Warning(err) + } + return err } } - log.G(Ctx).Error("JID for deletion does not exist anymore") return nil } diff --git a/pkg/virtualkubelet/execute.go b/pkg/virtualkubelet/execute.go index 7b3629fa..a4f0ae13 100644 --- a/pkg/virtualkubelet/execute.go +++ b/pkg/virtualkubelet/execute.go @@ -265,7 +265,9 @@ func checkPodsStatus(p *VirtualKubeletProvider, ctx context.Context, token strin var PodsList []*v1.Pod for _, pod := range p.pods { - PodsList = append(PodsList, pod) + if pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning { + PodsList = append(PodsList, pod) + } } //log.G(ctx).Debug(p.pods) //commented out because it's too verbose. uncomment to see all registered pods From bf8ac13cdb9bf91748e72852261985f54b2fb7dc Mon Sep 17 00:00:00 2001 From: Surax98 Date: Tue, 16 Jan 2024 08:22:51 +0000 Subject: [PATCH 9/9] interlink now queries sidecars only for running/pending pods --- pkg/interlink/delete.go | 1 + pkg/interlink/func.go | 36 +++++++++++++++++++++++++ pkg/interlink/status.go | 49 ++++++++++++++++++++++++++++++++++- pkg/sidecars/slurm/Delete.go | 1 + pkg/sidecars/slurm/Status.go | 5 ++++ pkg/virtualkubelet/execute.go | 4 +-- 6 files changed, 92 insertions(+), 4 deletions(-) diff --git a/pkg/interlink/delete.go b/pkg/interlink/delete.go index 76f6b347..9a9a47c7 100644 --- a/pkg/interlink/delete.go +++ b/pkg/interlink/delete.go @@ -34,6 +34,7 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) { log.G(Ctx).Fatal(err) } + deleteCachedStatus(string(pod.UID)) req, err = http.NewRequest(http.MethodPost, commonIL.InterLinkConfigInst.Sidecarurl+":"+commonIL.InterLinkConfigInst.Sidecarport+"/delete", reader) log.G(Ctx).Info("InterLink: forwarding Delete call to sidecar") diff --git a/pkg/interlink/func.go b/pkg/interlink/func.go index cb637aaa..2ef1dc5b 100644 --- a/pkg/interlink/func.go +++ b/pkg/interlink/func.go @@ -8,6 +8,8 @@ import ( v1 "k8s.io/api/core/v1" ) +var PodStatuses []commonIL.PodStatus + func getData(pod commonIL.PodCreateRequests) (commonIL.RetrievedPodData, error) { log.G(Ctx).Debug(pod.ConfigMaps) var retrieved_data commonIL.RetrievedPodData @@ -66,3 +68,37 @@ func retrieve_data(container v1.Container, pod commonIL.PodCreateRequests) (comm } return retrieved_data, nil } + +func updateStatuses(statuses []commonIL.PodStatus) { + for _, podStatus := range statuses { + updated := false + for i, podStatus2 := range PodStatuses { + if podStatus.PodUID == podStatus2.PodUID { + PodStatuses[i] = podStatus + updated = true + break + } + } + if !updated { + PodStatuses = append(PodStatuses, podStatus) + } + } +} + +func deleteCachedStatus(uid string) { + for i, status := range PodStatuses { + if status.PodUID == uid { + PodStatuses = append(PodStatuses[:i], PodStatuses[i+1:]...) + return + } + } +} + +func checkIfCached(uid string) bool { + for _, podStatus := range PodStatuses { + if podStatus.PodUID == uid { + return true + } + } + return false +} diff --git a/pkg/interlink/status.go b/pkg/interlink/status.go index 7a5c7f6c..91d47dd0 100644 --- a/pkg/interlink/status.go +++ b/pkg/interlink/status.go @@ -2,22 +2,46 @@ package interlink import ( "bytes" + "encoding/json" "io" "net/http" "strconv" "github.com/containerd/containerd/log" commonIL "github.com/intertwin-eu/interlink/pkg/common" + v1 "k8s.io/api/core/v1" ) func StatusHandler(w http.ResponseWriter, r *http.Request) { statusCode := http.StatusOK + var pods []*v1.Pod log.G(Ctx).Info("InterLink: received GetStatus call") + bodyBytes, err := io.ReadAll(r.Body) if err != nil { log.G(Ctx).Fatal(err) } + err = json.Unmarshal(bodyBytes, &pods) + if err != nil { + log.G(Ctx).Error(err) + } + + var podsToBeChecked []*v1.Pod + var returnedStatuses []commonIL.PodStatus + + for _, pod := range pods { + cached := checkIfCached(string(pod.UID)) + if pod.Status.Phase == v1.PodRunning || pod.Status.Phase == v1.PodPending || !cached { + podsToBeChecked = append(podsToBeChecked, pod) + } + } + + bodyBytes, err = json.Marshal(podsToBeChecked) + if err != nil { + log.G(Ctx).Fatal(err) + } + reader := bytes.NewReader(bodyBytes) req, err := http.NewRequest(http.MethodGet, commonIL.InterLinkConfigInst.Sidecarurl+":"+commonIL.InterLinkConfigInst.Sidecarport+"/status", reader) if err != nil { @@ -38,7 +62,30 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { statusCode = http.StatusInternalServerError } - returnValue, _ := io.ReadAll(resp.Body) + bodyBytes, err = io.ReadAll(resp.Body) + if err != nil { + statusCode = http.StatusInternalServerError + w.WriteHeader(statusCode) + log.G(Ctx).Error(err) + return + } + err = json.Unmarshal(bodyBytes, &returnedStatuses) + if err != nil { + statusCode = http.StatusInternalServerError + w.WriteHeader(statusCode) + log.G(Ctx).Error(err) + return + } + + updateStatuses(returnedStatuses) + + returnValue, err := json.Marshal(PodStatuses) + if err != nil { + statusCode = http.StatusInternalServerError + w.WriteHeader(statusCode) + log.G(Ctx).Error(err) + return + } log.G(Ctx).Debug("InterLink: status " + string(returnValue)) w.WriteHeader(statusCode) diff --git a/pkg/sidecars/slurm/Delete.go b/pkg/sidecars/slurm/Delete.go index 6f01020b..b3df923f 100644 --- a/pkg/sidecars/slurm/Delete.go +++ b/pkg/sidecars/slurm/Delete.go @@ -52,6 +52,7 @@ func StopHandler(w http.ResponseWriter, r *http.Request) { if statusCode != http.StatusOK { w.Write([]byte("Some errors occurred deleting containers. Check Slurm Sidecar's logs")) } else { + w.Write([]byte("All containers for submitted Pods have been deleted")) } } diff --git a/pkg/sidecars/slurm/Status.go b/pkg/sidecars/slurm/Status.go index 5c5507fc..9f888442 100644 --- a/pkg/sidecars/slurm/Status.go +++ b/pkg/sidecars/slurm/Status.go @@ -62,6 +62,11 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { } for _, pod := range req { + if pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning { + + } else { + + } path := commonIL.InterLinkConfigInst.DataRootFolder + pod.Namespace + "-" + string(pod.UID) for i, jid := range JIDs { if jid.PodUID == string(pod.UID) { diff --git a/pkg/virtualkubelet/execute.go b/pkg/virtualkubelet/execute.go index a4f0ae13..7b3629fa 100644 --- a/pkg/virtualkubelet/execute.go +++ b/pkg/virtualkubelet/execute.go @@ -265,9 +265,7 @@ func checkPodsStatus(p *VirtualKubeletProvider, ctx context.Context, token strin var PodsList []*v1.Pod for _, pod := range p.pods { - if pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning { - PodsList = append(PodsList, pod) - } + PodsList = append(PodsList, pod) } //log.G(ctx).Debug(p.pods) //commented out because it's too verbose. uncomment to see all registered pods