Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

e2e: retry failed port-forward #718

Merged
merged 2 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 34 additions & 62 deletions e2e/internal/contrasttest/contrasttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/edgelesssys/contrast/internal/kuberesource"
"github.com/edgelesssys/contrast/node-installer/platforms"
ksync "github.com/katexochen/sync/api/client"
"github.com/spf13/cobra"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -180,60 +181,22 @@ func (ct *ContrastTest) Apply(t *testing.T) {
func (ct *ContrastTest) Set(t *testing.T) {
require := require.New(t)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()

require.NoError(ct.Kubeclient.WaitFor(ctx, kubeclient.StatefulSet{}, ct.Namespace, "coordinator"))

coordinator, cancelPortForward, err := ct.Kubeclient.PortForwardPod(ctx, ct.Namespace, "port-forwarder-coordinator", "1313")
require.NoError(err)
defer cancelPortForward()

args := append(ct.commonArgs(),
"--coordinator-policy-hash", ct.coordinatorPolicyHash,
"--coordinator", coordinator,
path.Join(ct.WorkDir, "resources.yaml"))

set := cmd.NewSetCmd()
set.Flags().String("workspace-dir", "", "") // Make set aware of root flags
set.SetArgs(args)
set.SetOut(io.Discard)
errBuf := &bytes.Buffer{}
set.SetErr(errBuf)

require.NoError(set.Execute(), "could not set manifest at coordinator: %s", errBuf)
require.NoError(ct.runAgainstCoordinator(ctx, cmd.NewSetCmd(), path.Join(ct.WorkDir, "resources.yaml")))
}

// RunVerify runs the contrast verify subcommand.
func (ct *ContrastTest) RunVerify() error {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()

if err := ct.Kubeclient.WaitFor(ctx, kubeclient.StatefulSet{}, ct.Namespace, "coordinator"); err != nil {
return fmt.Errorf("waiting for coordinator: %w", err)
}

coordinator, cancelPortForward, err := ct.Kubeclient.PortForwardPod(ctx, ct.Namespace, "port-forwarder-coordinator", "1313")
if err != nil {
if err := ct.runAgainstCoordinator(ctx, cmd.NewVerifyCmd()); err != nil {
return err
}
defer cancelPortForward()

verify := cmd.NewVerifyCmd()
verify.Flags().String("workspace-dir", "", "") // Make verify aware of root flags
verify.SetArgs(append(
ct.commonArgs(),
"--coordinator-policy-hash", ct.coordinatorPolicyHash,
"--coordinator", coordinator,
))
verify.SetOut(io.Discard)
errBuf := &bytes.Buffer{}
verify.SetErr(errBuf)

if err := verify.Execute(); err != nil {
return fmt.Errorf("running verify failed: %w\n%s", err, errBuf)
}

var err error
ct.meshCACertPEM, err = os.ReadFile(path.Join(ct.WorkDir, "mesh-ca.pem"))
if err != nil {
return fmt.Errorf("no mesh ca cert: %w", err)
Expand All @@ -254,27 +217,10 @@ func (ct *ContrastTest) Verify(t *testing.T) {
func (ct *ContrastTest) Recover(t *testing.T) {
require := require.New(t)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()

require.NoError(ct.Kubeclient.WaitFor(ctx, kubeclient.StatefulSet{}, ct.Namespace, "coordinator"))

coordinator, cancelPortForward, err := ct.Kubeclient.PortForwardPod(ctx, ct.Namespace, "port-forwarder-coordinator", "1313")
require.NoError(err)
defer cancelPortForward()

args := append(ct.commonArgs(),
"--coordinator-policy-hash", ct.coordinatorPolicyHash,
"--coordinator", coordinator)

set := cmd.NewRecoverCmd()
set.Flags().String("workspace-dir", "", "") // Make set aware of root flags
set.SetArgs(args)
set.SetOut(io.Discard)
errBuf := &bytes.Buffer{}
set.SetErr(errBuf)

require.NoError(set.Execute(), "could not recover coordinator: %s", errBuf)
require.NoError(ct.runAgainstCoordinator(ctx, cmd.NewRecoverCmd()))
}

// MeshCACert returns a CertPool that contains the coordinator mesh CA cert.
Expand Down Expand Up @@ -317,6 +263,32 @@ func (ct *ContrastTest) installRuntime(t *testing.T) {
require.NoError(ct.Kubeclient.WaitFor(ctx, kubeclient.DaemonSet{}, ct.Namespace, "contrast-node-installer"))
}

// runAgainstCoordinator forwards the coordinator port and executes the command against it.
func (ct *ContrastTest) runAgainstCoordinator(ctx context.Context, cmd *cobra.Command, args ...string) error {
if err := ct.Kubeclient.WaitFor(ctx, kubeclient.StatefulSet{}, ct.Namespace, "coordinator"); err != nil {
return fmt.Errorf("waiting for coordinator: %w", err)
}

// Make the subcommand aware of the persistent flag.
// Do it outside the closure because declaring a flag twice panics.
cmd.Flags().String("workspace-dir", "", "")

return ct.Kubeclient.WithForwardedPort(ctx, ct.Namespace, "port-forwarder-coordinator", "1313", func(addr string) error {
commonArgs := append(ct.commonArgs(),
"--coordinator-policy-hash", ct.coordinatorPolicyHash,
"--coordinator", addr)
cmd.SetArgs(append(commonArgs, args...))
cmd.SetOut(io.Discard)
errBuf := &bytes.Buffer{}
cmd.SetErr(errBuf)

if err := cmd.Execute(); err != nil {
return fmt.Errorf("running %q: %s", cmd.Use, errBuf)
}
return nil
})
}

func makeNamespace(t *testing.T) string {
buf := make([]byte, 4)
re := regexp.MustCompile("[a-z0-9-]+")
Expand Down
57 changes: 46 additions & 11 deletions e2e/internal/kubeclient/portforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,56 @@ package kubeclient

import (
"context"
"errors"
"fmt"
"io"
"net/http"

"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
)

// PortForwardPod starts a port forward to the selected pod.
// WithForwardedPort opens a local port, forwards it to the given pod and invokes the func with the local address.
//
// On success, the function returns a TCP address that clients can connect to and a function to
// cancel the port forwarding.
func (k *Kubeclient) PortForwardPod(ctx context.Context, namespace, podName, remotePort string) (string, func(), error) {
// If the func fails and port-forwarding had an error, too, the func is retried up to two times.
func (k *Kubeclient) WithForwardedPort(ctx context.Context, namespace, podName, remotePort string, f func(addr string) error) error {
var funcErr error
for i := range 3 {
log := k.log.With("attempt", i, "namespace", namespace, "pod", podName, "port", remotePort)

addr, cancel, errorCh, err := k.portForwardPod(ctx, namespace, podName, remotePort)
if err != nil {
log.Error("Could not forward port", "error", err)
funcErr = err
continue
}
log.Info("forwarded port", "addr", addr)
funcErr = f(addr)
cancel()
if funcErr == nil {
return nil
}
log.Error("port-forwarded func failed", "error", err)
select {
case err := <-errorCh:
log.Error("Encountered port forwarding error", "error", err)
continue
default:
if errors.Is(funcErr, io.EOF) {
log.Info("io.EOF during port-forwarding triggered retry")
continue
}
log.Info("no port-forwarding error")
return funcErr
}
}
return funcErr
}

func (k *Kubeclient) portForwardPod(ctx context.Context, namespace, podName, remotePort string) (string, func(), <-chan error, error) {
// We can only forward to the pod once it's ready.
if err := k.WaitForPod(ctx, namespace, podName); err != nil {
return "", nil, fmt.Errorf("waiting for pod %s: %w", podName, err)
return "", nil, nil, fmt.Errorf("waiting for pod %s: %w", podName, err)
}

// This channel sends a stop request to the portforwarding goroutine.
Expand All @@ -38,7 +73,7 @@ func (k *Kubeclient) PortForwardPod(ctx context.Context, namespace, podName, rem

transport, upgrader, err := spdy.RoundTripperFor(k.config)
if err != nil {
return "", nil, fmt.Errorf("creating round tripper: %w", err)
return "", nil, nil, fmt.Errorf("creating round tripper: %w", err)
}

dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, req.URL())
Expand All @@ -51,7 +86,7 @@ func (k *Kubeclient) PortForwardPod(ctx context.Context, namespace, podName, rem
nil, nil,
)
if err != nil {
return "", nil, fmt.Errorf("creating portforwarder: %w", err)
return "", nil, nil, fmt.Errorf("creating portforwarder: %w", err)
}

go func() {
Expand All @@ -65,18 +100,18 @@ func (k *Kubeclient) PortForwardPod(ctx context.Context, namespace, podName, rem
ports, err := fw.GetPorts()
if err != nil {
close(stopCh)
return "", nil, fmt.Errorf("getting ports: %w", err)
return "", nil, nil, fmt.Errorf("getting ports: %w", err)
}
cleanUp := func() {
close(stopCh)
}
return fmt.Sprintf("localhost:%d", ports[0].Local), cleanUp, nil
return fmt.Sprintf("localhost:%d", ports[0].Local), cleanUp, errorCh, nil

case <-ctx.Done():
close(stopCh)
return "", nil, fmt.Errorf("waiting for port forward to be ready: %w", ctx.Err())
return "", nil, nil, fmt.Errorf("waiting for port forward to be ready: %w", ctx.Err())
case err := <-errorCh:
close(stopCh)
return "", nil, fmt.Errorf("background port-forwarding routine failed: %w", err)
return "", nil, nil, fmt.Errorf("background port-forwarding routine failed: %w", err)
}
}
16 changes: 8 additions & 8 deletions e2e/openssl/openssl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,14 @@ func TestOpenSSL(t *testing.T) {

require.NoError(ct.Kubeclient.WaitFor(ctx, kubeclient.Deployment{}, ct.Namespace, opensslFrontend))

addr, cancelPortForward, err := ct.Kubeclient.PortForwardPod(ctx, ct.Namespace, "port-forwarder-openssl-frontend", "443")
require.NoError(err)
defer cancelPortForward()

dialer := &tls.Dialer{Config: &tls.Config{RootCAs: pool}}
conn, err := dialer.DialContext(ctx, "tcp", addr)
require.NoError(err)
conn.Close()
require.NoError(ct.Kubeclient.WithForwardedPort(ctx, ct.Namespace, "port-forwarder-openssl-frontend", "443", func(addr string) error {
dialer := &tls.Dialer{Config: &tls.Config{RootCAs: pool}}
conn, err := dialer.DialContext(ctx, "tcp", addr)
if err == nil {
conn.Close()
}
return err
}))
})
}

Expand Down
28 changes: 16 additions & 12 deletions e2e/servicemesh/servicemesh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/edgelesssys/contrast/e2e/internal/contrasttest"
"github.com/edgelesssys/contrast/e2e/internal/kubeclient"
"github.com/edgelesssys/contrast/internal/kuberesource"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -71,18 +72,21 @@ func TestIngressEgress(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

web, cancelPortForward, err := ct.Kubeclient.PortForwardPod(ctx, ct.Namespace, "port-forwarder-web-svc", "443")
require.NoError(err)
t.Cleanup(cancelPortForward)

tlsConf := &tls.Config{RootCAs: pool}
hc := &http.Client{Transport: &http.Transport{TLSClientConfig: tlsConf}}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("https://%s/", web), http.NoBody)
require.NoError(err)
resp, err := hc.Do(req)
require.NoError(err)
defer resp.Body.Close()
require.Equal(http.StatusOK, resp.StatusCode)
require.NoError(ct.Kubeclient.WithForwardedPort(ctx, ct.Namespace, "port-forwarder-web-svc", "443", func(addr string) error {
tlsConf := &tls.Config{RootCAs: pool}
hc := &http.Client{Transport: &http.Transport{TLSClientConfig: tlsConf}}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("https://%s/", addr), http.NoBody)
if !assert.NoError(t, err) {
return nil
}
resp, err := hc.Do(req)
if err != nil {
return err
}
resp.Body.Close()
assert.Equal(t, http.StatusOK, resp.StatusCode)
return nil
}))
})
}

Expand Down
6 changes: 6 additions & 0 deletions internal/kuberesource/parts.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ func (p *PortForwarderConfig) WithListenPort(port int32) *PortForwarderConfig {
).
WithEnv(
NewEnvVar("LISTEN_PORT", strconv.Itoa(int(port))),
).
WithStartupProbe(Probe().
WithInitialDelaySeconds(1).
WithPeriodSeconds(1).
WithTCPSocket(TCPSocketAction().
WithPort(intstr.FromInt32(port))),
)
return p
}
Expand Down