diff --git a/pkg/clients/exec_command.go b/pkg/clients/exec_command.go index 8532cc13..788df89c 100644 --- a/pkg/clients/exec_command.go +++ b/pkg/clients/exec_command.go @@ -5,16 +5,25 @@ package clients import ( "bytes" "context" + "errors" "fmt" "strings" + "time" log "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" k8sErrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/client-go/tools/remotecommand" "k8s.io/kubectl/pkg/scheme" ) +const ( + startTimeout = 5 * time.Second + deletionTimeout = 10 * time.Minute +) + type ExecContext interface { ExecCommand([]string) (string, string, error) ExecCommandStdIn([]string, bytes.Buffer) (string, string, error) @@ -155,3 +164,210 @@ func (c *ContainerExecContext) ExecCommand(command []string) (stdout, stderr str func (c *ContainerExecContext) ExecCommandStdIn(command []string, buffIn bytes.Buffer) (stdout, stderr string, err error) { return c.execCommand(command, &buffIn) } + +// ContainerExecContext encapsulates the context in which a command is run; the namespace, pod, and container. +type ContainerCreationExecContext struct { + *ContainerExecContext + labels map[string]string + pod *corev1.Pod + containerSecurityContext *corev1.SecurityContext + containerImage string + command []string + volumes []*Volume + hostNetwork bool +} + +type Volume struct { + VolumeSource corev1.VolumeSource + Name string + MountPath string +} + +func (c *ContainerCreationExecContext) CreatePod() error { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.podName, + Namespace: c.namespace, + Labels: c.labels, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: c.containerName, + Image: c.containerImage, + ImagePullPolicy: corev1.PullIfNotPresent, + }, + }, + HostNetwork: c.hostNetwork, + }, + } + if len(c.command) > 0 { + pod.Spec.Containers[0].Command = c.command + } + if c.containerSecurityContext != nil { + pod.Spec.Containers[0].SecurityContext = c.containerSecurityContext + } + if len(c.volumes) > 0 { + volumes := make([]corev1.Volume, 0) + volumeMounts := make([]corev1.VolumeMount, 0) + + for _, v := range c.volumes { + volumes = append(volumes, corev1.Volume{Name: v.Name, VolumeSource: v.VolumeSource}) + pod.Spec.Volumes = volumes + volumeMounts = append(volumeMounts, corev1.VolumeMount{Name: v.Name, MountPath: v.MountPath}) + pod.Spec.Containers[0].VolumeMounts = volumeMounts + } + } + + pod, err := c.clientset.K8sClient.CoreV1().Pods(pod.Namespace).Create( + context.TODO(), + pod, + metav1.CreateOptions{}, + ) + c.pod = pod + if err != nil { + return fmt.Errorf("failed to create pod: %w", err) + } + return nil +} + +func (c *ContainerCreationExecContext) listPods(options *metav1.ListOptions) (*corev1.PodList, error) { + pods, err := c.clientset.K8sClient.CoreV1().Pods(c.pod.Namespace).List( + context.TODO(), + *options, + ) + if err != nil { + return pods, fmt.Errorf("failed to find pods: %s", err.Error()) + } + return pods, nil +} + +func (c *ContainerCreationExecContext) refeshPod() error { + pods, err := c.listPods(&metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("metadata.name", c.podName).String(), + ResourceVersion: c.pod.ResourceVersion, + }) + if err != nil { + return err + } + if len(pods.Items) == 0 || len(pods.Items) > 1 { + // I don't think k8s allows more than one pod with the same name + return errors.New("found multiple pods with the same name") + } + c.pod = &pods.Items[0] + + return nil +} + +func (c *ContainerCreationExecContext) IsPodRunning() (bool, error) { + err := c.refeshPod() + if err != nil { + return false, err + } + if c.pod.Status.Phase == corev1.PodRunning { + return true, nil + } + return false, nil +} + +func (c *ContainerCreationExecContext) WaitForPodToStart() error { + start := time.Now() + for time.Since(start) <= startTimeout { + running, err := c.IsPodRunning() + if err != nil { + return err + } + if running { + return nil + } + } + return errors.New("timed out waiting for pod to start") +} + +func (c *ContainerCreationExecContext) CreatePodAndWaitForStart() error { + var err error + running := false + if c.pod != nil { + running, err = c.IsPodRunning() + if err != nil { + return err + } + } + if !running { + err := c.CreatePod() + if err != nil { + return err + } + } + return c.WaitForPodToStart() +} + +func (c *ContainerCreationExecContext) DeletePod() error { + deletePolicy := metav1.DeletePropagationForeground + err := c.clientset.K8sClient.CoreV1().Pods(c.pod.Namespace).Delete( + context.TODO(), + c.pod.Name, + metav1.DeleteOptions{ + PropagationPolicy: &deletePolicy, + }) + if err != nil { + return fmt.Errorf("failed to delete pod: %w", err) + } + return nil +} + +func (c *ContainerCreationExecContext) WaitForPodToDelete() error { + start := time.Now() + for time.Since(start) <= deletionTimeout { + pods, err := c.listPods(&metav1.ListOptions{}) + if err != nil { + return err + } + found := false + for _, pod := range pods.Items { //nolint:gocritic // This isn't my object I can't use a pointer + if pod.Name == c.podName { + found = true + } + } + if !found { + return nil + } + } + return errors.New("pod has not terminated within the timeout") +} + +func (c *ContainerCreationExecContext) DeletePodAndWait() error { + err := c.DeletePod() + if err != nil { + return err + } + return c.WaitForPodToDelete() +} + +func NewContainerCreationExecContext( + clientset *Clientset, + namespace, podName, containerName, containerImage string, + labels map[string]string, + command []string, + containerSecurityContext *corev1.SecurityContext, + hostNetwork bool, + volumes []*Volume, +) *ContainerCreationExecContext { + ctx := ContainerExecContext{ + namespace: namespace, + podNamePrefix: podName, + podName: podName, + containerName: containerName, + clientset: clientset, + } + + return &ContainerCreationExecContext{ + ContainerExecContext: &ctx, + containerImage: containerImage, + labels: labels, + command: command, + containerSecurityContext: containerSecurityContext, + hostNetwork: hostNetwork, + volumes: volumes, + } +}