Skip to content

Commit

Permalink
incorporate feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
miampf committed Sep 27, 2024
1 parent 19a6851 commit 4b7a266
Showing 1 changed file with 3 additions and 70 deletions.
73 changes: 3 additions & 70 deletions e2e/internal/kubeclient/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ func (c *Kubeclient) checkIfRunning(ctx context.Context, name string, namespace
if err != nil {
return false, err
}
toBeRunningPods := len(pods)
for _, pod := range pods {
// check if all containers in the pod are running
var containers []corev1.ContainerStatus
Expand All @@ -226,21 +225,14 @@ func (c *Kubeclient) checkIfRunning(ctx context.Context, name string, namespace
} else {
containers = pod.Status.ContainerStatuses
}
toBeRunningContainers := len(containers)

for _, container := range containers {
if container.State.Running != nil {
toBeRunningContainers--
if container.State.Running == nil {
return false, nil
}
}
if toBeRunningContainers == 0 {
toBeRunningPods--
}
}
if toBeRunningPods == 0 {
return true, nil
}
return false, nil
return true, nil
}

// WaitFor watches the given resource kind and blocks until the desired number of pods are
Expand Down Expand Up @@ -408,65 +400,6 @@ loop:
}
}

// WaitForLoadBalancer waits until the given service is configured with an external IP and returns it.
func (c *Kubeclient) WaitForLoadBalancer(ctx context.Context, namespace, name string) (string, error) {
watcher, err := c.Client.CoreV1().Services(namespace).Watch(ctx, metav1.ListOptions{FieldSelector: "metadata.name=" + name})
if err != nil {
return "", err
}
var ip string
var port int
loop:
for {
evt, ok := <-watcher.ResultChan()
if !ok {
if ctx.Err() == nil {
return "", fmt.Errorf("watcher for LoadBalancer %s/%s unexpectedly closed", namespace, name)
}
return "", fmt.Errorf("LoadBalancer %s/%s did not get a public IP before %w", namespace, name, ctx.Err())
}
switch evt.Type {
case watch.Added:
fallthrough
case watch.Modified:
svc, ok := evt.Object.(*corev1.Service)
if !ok {
return "", fmt.Errorf("watcher received unexpected type %T", evt.Object)
}
for _, ingress := range svc.Status.LoadBalancer.Ingress {
if ingress.IP != "" {
ip = ingress.IP
// TODO(burgerdev): deal with more than one port, and protocols other than TCP
port = int(svc.Spec.Ports[0].Port)
break loop
}
}
case watch.Deleted:
return "", fmt.Errorf("service %s/%s was deleted while waiting for it", namespace, name)
default:
c.log.Warn("ignoring unexpected watch event", "type", evt.Type, "object", evt.Object)
}
}

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

dialer := &net.Dialer{}
for {
select {
case <-ticker.C:
conn, err := dialer.DialContext(ctx, "tcp", net.JoinHostPort(ip, strconv.Itoa(port)))
if err == nil {
conn.Close()
return ip, nil
}
c.log.Info("probe failed", "namespace", namespace, "name", name, "error", err)
case <-ctx.Done():
return "", fmt.Errorf("LoadBalancer %s/%s never responded to probing before %w", namespace, name, ctx.Err())
}
}
}

func (c *Kubeclient) toJSON(a any) string {
s, err := json.Marshal(a)
if err != nil {
Expand Down

0 comments on commit 4b7a266

Please sign in to comment.