Skip to content

Commit

Permalink
Create exec context which starts and stops a pod
Browse files Browse the repository at this point in the history
  • Loading branch information
nocturnalastro committed Nov 24, 2023
1 parent c6843c9 commit b5f6519
Showing 1 changed file with 216 additions and 0 deletions.
216 changes: 216 additions & 0 deletions pkg/clients/exec_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,25 @@ package clients
import (
"bytes"
"context"
"errors"
"fmt"
"strings"
"time"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubectl/pkg/scheme"
)

const (
startTimeout = 5 * time.Second
deletionTimeout = 10 * time.Minute
)

type ExecContext interface {
ExecCommand([]string) (string, string, error)
ExecCommandStdIn([]string, bytes.Buffer) (string, string, error)
Expand Down Expand Up @@ -155,3 +164,210 @@ func (c *ContainerExecContext) ExecCommand(command []string) (stdout, stderr str
func (c *ContainerExecContext) ExecCommandStdIn(command []string, buffIn bytes.Buffer) (stdout, stderr string, err error) {
return c.execCommand(command, &buffIn)
}

// ContainerExecContext encapsulates the context in which a command is run; the namespace, pod, and container.
type ContainerCreationExecContext struct {
*ContainerExecContext
labels map[string]string
pod *corev1.Pod
containerSecurityContext *corev1.SecurityContext
containerImage string
command []string
volumes []*Volume
hostNetwork bool
}

type Volume struct {
VolumeSource corev1.VolumeSource
Name string
MountPath string
}

func (c *ContainerCreationExecContext) CreatePod() error {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: c.podName,
Namespace: c.namespace,
Labels: c.labels,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: c.containerName,
Image: c.containerImage,
ImagePullPolicy: corev1.PullIfNotPresent,
},
},
HostNetwork: c.hostNetwork,
},
}
if len(c.command) > 0 {
pod.Spec.Containers[0].Command = c.command
}
if c.containerSecurityContext != nil {
pod.Spec.Containers[0].SecurityContext = c.containerSecurityContext
}
if len(c.volumes) > 0 {
volumes := make([]corev1.Volume, 0)
volumeMounts := make([]corev1.VolumeMount, 0)

for _, v := range c.volumes {
volumes = append(volumes, corev1.Volume{Name: v.Name, VolumeSource: v.VolumeSource})
pod.Spec.Volumes = volumes
volumeMounts = append(volumeMounts, corev1.VolumeMount{Name: v.Name, MountPath: v.MountPath})
pod.Spec.Containers[0].VolumeMounts = volumeMounts
}
}

pod, err := c.clientset.K8sClient.CoreV1().Pods(pod.Namespace).Create(
context.TODO(),
pod,
metav1.CreateOptions{},
)
c.pod = pod
if err != nil {
return fmt.Errorf("failed to create pod: %w", err)
}
return nil
}

func (c *ContainerCreationExecContext) listPods(options *metav1.ListOptions) (*corev1.PodList, error) {
pods, err := c.clientset.K8sClient.CoreV1().Pods(c.pod.Namespace).List(
context.TODO(),
*options,
)
if err != nil {
return pods, fmt.Errorf("failed to find pods: %s", err.Error())
}
return pods, nil
}

func (c *ContainerCreationExecContext) refeshPod() error {
pods, err := c.listPods(&metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", c.podName).String(),
ResourceVersion: c.pod.ResourceVersion,
})
if err != nil {
return err
}
if len(pods.Items) == 0 || len(pods.Items) > 1 {
// I don't think k8s allows more than one pod with the same name
return errors.New("found multiple pods with the same name")
}
c.pod = &pods.Items[0]

return nil
}

func (c *ContainerCreationExecContext) IsPodRunning() (bool, error) {
err := c.refeshPod()
if err != nil {
return false, err
}
if c.pod.Status.Phase == corev1.PodRunning {
return true, nil
}
return false, nil
}

func (c *ContainerCreationExecContext) WaitForPodToStart() error {
start := time.Now()
for time.Since(start) <= startTimeout {
running, err := c.IsPodRunning()
if err != nil {
return err
}
if running {
return nil
}
}
return errors.New("timed out waiting for pod to start")
}

func (c *ContainerCreationExecContext) CreatePodAndWaitForStart() error {
var err error
running := false
if c.pod != nil {
running, err = c.IsPodRunning()
if err != nil {
return err
}
}
if !running {
err := c.CreatePod()
if err != nil {
return err
}
}
return c.WaitForPodToStart()
}

func (c *ContainerCreationExecContext) DeletePod() error {
deletePolicy := metav1.DeletePropagationForeground
err := c.clientset.K8sClient.CoreV1().Pods(c.pod.Namespace).Delete(
context.TODO(),
c.pod.Name,
metav1.DeleteOptions{
PropagationPolicy: &deletePolicy,
})
if err != nil {
return fmt.Errorf("failed to delete pod: %w", err)
}
return nil
}

func (c *ContainerCreationExecContext) WaitForPodToDelete() error {
start := time.Now()
for time.Since(start) <= deletionTimeout {
pods, err := c.listPods(&metav1.ListOptions{})
if err != nil {
return err
}
found := false
for _, pod := range pods.Items { //nolint:gocritic // This isn't my object I can't use a pointer
if pod.Name == c.podName {
found = true
}
}
if !found {
return nil
}
}
return errors.New("pod has not terminated within the timeout")
}

func (c *ContainerCreationExecContext) DeletePodAndWait() error {
err := c.DeletePod()
if err != nil {
return err
}
return c.WaitForPodToDelete()
}

func NewContainerCreationExecContext(
clientset *Clientset,
namespace, podName, containerName, containerImage string,
labels map[string]string,
command []string,
containerSecurityContext *corev1.SecurityContext,
hostNetwork bool,
volumes []*Volume,
) *ContainerCreationExecContext {
ctx := ContainerExecContext{
namespace: namespace,
podNamePrefix: podName,
podName: podName,
containerName: containerName,
clientset: clientset,
}

return &ContainerCreationExecContext{
ContainerExecContext: &ctx,
containerImage: containerImage,
labels: labels,
command: command,
containerSecurityContext: containerSecurityContext,
hostNetwork: hostNetwork,
volumes: volumes,
}
}

0 comments on commit b5f6519

Please sign in to comment.