Skip to content

Commit

Permalink
e2e: add port-forwarding utility
Browse files Browse the repository at this point in the history
  • Loading branch information
burgerdev committed Feb 29, 2024
1 parent f51edc1 commit f8eb399
Showing 1 changed file with 74 additions and 0 deletions.
74 changes: 74 additions & 0 deletions e2e/internal/kubeclient/portforward.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package kubeclient

import (
"context"
"fmt"
"net/http"

"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
)

// PortForwardPod starts a port forward to the selected pod.
//
// On success, the function returns a TCP address that clients can connect to and a function to
// cancel the port forwarding.
func (k *Kubeclient) PortForwardPod(ctx context.Context, namespace, podName, remotePort string) (string, func(), error) {
// This channel sends a stop request to the portforwarding goroutine.
stopCh := make(chan struct{}, 1)
// The portforwarding goroutine closes this channel when it's ready.
readyCh := make(chan struct{})
// Any error returned by the background port-forwarder is sent to this channel.
errorCh := make(chan error)

// Ports are forwarded by upgrading this POST request to a SPDY connection.
req := k.client.CoreV1().RESTClient().Post().
Resource("pods").
Namespace(namespace).
Name(podName).
SubResource("portforward")

transport, upgrader, err := spdy.RoundTripperFor(k.config)
if err != nil {
return "", nil, fmt.Errorf("creating round tripper: %w", err)
}

dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, req.URL())

fw, err := portforward.NewOnAddresses(
dialer,
[]string{"localhost"},
[]string{fmt.Sprintf("0:%s", remotePort)},
stopCh, readyCh,
nil, nil,
)
if err != nil {
return "", nil, fmt.Errorf("creating portforwarder: %w", err)
}

go func() {
if err := fw.ForwardPorts(); err != nil {
errorCh <- err
}
}()

select {
case <-readyCh:
ports, err := fw.GetPorts()
if err != nil {
close(stopCh)
return "", nil, fmt.Errorf("getting ports: %w", err)
}
cleanUp := func() {
close(stopCh)
}
return fmt.Sprintf("localhost:%d", ports[0].Local), cleanUp, nil

case <-ctx.Done():
close(stopCh)
return "", nil, fmt.Errorf("waiting for port forward to be ready: %w", ctx.Err())
case err := <-errorCh:
close(stopCh)
return "", nil, fmt.Errorf("background port-forwarding routine failed: %w", err)
}
}

0 comments on commit f8eb399

Please sign in to comment.