From 89464eb10ab385685a7d9cf651fd9e6feab4d584 Mon Sep 17 00:00:00 2001 From: lostbean Date: Tue, 24 Sep 2024 11:35:08 -0300 Subject: [PATCH] update gateway to proxy to new gateway and igress access entries --- kardinal-cli/cmd/root.go | 10 +- kardinal-cli/deployment/deployment.go | 12 +- kardinal-cli/deployment/gateway.go | 177 +++++++++--------- .../deployment/kubernetes_client_factory.go | 19 +- 4 files changed, 113 insertions(+), 105 deletions(-) diff --git a/kardinal-cli/cmd/root.go b/kardinal-cli/cmd/root.go index ff6e82de..8f3103a8 100644 --- a/kardinal-cli/cmd/root.go +++ b/kardinal-cli/cmd/root.go @@ -395,19 +395,13 @@ var gatewayCmd = &cobra.Command{ log.Fatalf("List flow response is empty") } - var host string - - hostFlowIdMap := make(map[string]string) + hostFlowIdMap := make([]api_types.IngressAccessEntry, 0) for _, flow := range *resp.JSON200 { if slices.Contains(flowIds, flow.FlowId) { flowId := flow.FlowId if len(flow.AccessEntry) > 0 { - host = flow.AccessEntry[0].Hostname - if host == "" { - log.Fatalf("Couldn't find flow with id '%s'", flowId) - } - hostFlowIdMap[host] = flowId + hostFlowIdMap = append(hostFlowIdMap, flow.AccessEntry...) } else { log.Fatalf("Flow '%s' has no hosts", flowId) } diff --git a/kardinal-cli/deployment/deployment.go b/kardinal-cli/deployment/deployment.go index 5c9ee03a..33aa0403 100644 --- a/kardinal-cli/deployment/deployment.go +++ b/kardinal-cli/deployment/deployment.go @@ -193,7 +193,11 @@ type templateData struct { } func DeployKardinalManagerInCluster(ctx context.Context, clusterResourcesURL string, kontrolLocation string) error { - kubernetesClientObj, err := createKubernetesClient() + k8sConfig, err := getConfig() + if err != nil { + return stacktrace.Propagate(err, "An error occurred while creating the Kubernetes client") + } + kubernetesClientObj, err := createKubernetesClient(k8sConfig) if err != nil { return stacktrace.Propagate(err, "An error occurred while creating the Kubernetes client") } @@ -237,7 +241,11 @@ func DeployKardinalManagerInCluster(ctx context.Context, clusterResourcesURL str } func RemoveKardinalManagerFromCluster(ctx context.Context) error { - kubernetesClientObj, err := createKubernetesClient() + k8sConfig, err := getConfig() + if err != nil { + return stacktrace.Propagate(err, "An error occurred while creating the Kubernetes client") + } + kubernetesClientObj, err := createKubernetesClient(k8sConfig) if err != nil { return stacktrace.Propagate(err, "An error occurred while creating the Kubernetes client") } diff --git a/kardinal-cli/deployment/gateway.go b/kardinal-cli/deployment/gateway.go index c6e1cc10..0cb13d11 100644 --- a/kardinal-cli/deployment/gateway.go +++ b/kardinal-cli/deployment/gateway.go @@ -2,7 +2,6 @@ package deployment import ( "context" - "encoding/json" "fmt" "hash/fnv" "io" @@ -16,6 +15,7 @@ import ( "syscall" "time" + api_types "github.com/kurtosis-tech/kardinal/libs/cli-kontrol-api/api/golang/types" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -23,13 +23,11 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/portforward" "k8s.io/client-go/transport/spdy" + gatewayclientset "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned" ) const ( - namespace = "istio-system" - service = "istio-ingressgateway" - localPortForIstio = 61000 - istioGatewayPodPort = 8080 + localPortStartRange = 61000 proxyPortRangeStart = 59000 proxyPortRangeEnd = 60000 maxRetries = 10 @@ -66,61 +64,68 @@ func findAvailablePortInRange(host string, portsInUse *[]int) (int, error) { return port, nil } -func StartGateway(hostFlowIdMap map[string]string) error { - client, err := createKubernetesClient() +func StartGateway(hostFlowIdMap []api_types.IngressAccessEntry) error { + k8sConfig, err := getConfig() if err != nil { return fmt.Errorf("an error occurred while creating a kubernetes client:\n %v", err) } + client, err := createKubernetesClient(k8sConfig) + if err != nil { + return fmt.Errorf("an error occurred while creating a kubernetes client:\n %v", err) + } + gatewayClient, err := createGatewayApiClient(k8sConfig) + if err != nil { + return fmt.Errorf("an error occurred while creating a kubernetes client:\n %v", err) + } + + servers := make([]*http.Server, 0) + ports := make([]int, 0) + stopChan := make(chan struct{}, 1) + + for entryIx, entry := range hostFlowIdMap { + + localPort := int32(localPortStartRange + entryIx) + host := entry.Hostname - for host, flowId := range hostFlowIdMap { logrus.Printf("Starting gateway for host: %s", host) // Check for pods in the prod namespace - err = assertProdNamespaceReady(client.clientSet, flowId) + err = assertProdNamespaceReady(client.clientSet, entry.FlowId) if err != nil { return fmt.Errorf("failed to assert that prod namespace is ready: %v", err) } - // Check for the Envoy filter before proceeding - err = checkGatewayEnvoyFilter(client.clientSet, host) + // Find a pod for the service + pod, port, namespace, err := findPodForService(client.clientSet, gatewayClient, entry) if err != nil { - return err + // return fmt.Errorf("failed to find pod for service: %v", err) + logrus.Errorf("failed to find pod for service: %v", err) + continue } - } - // Find a pod for the service - pod, err := findPodForService(client.clientSet) - if err != nil { - return fmt.Errorf("failed to find pod for service: %v", err) - } - - // Start port forwarding - stopChan := make(chan struct{}, 1) - readyChan := make(chan struct{}) - go func() { - for { - err := portForwardPod(client.config, pod, stopChan, readyChan) - if err != nil { - logrus.Printf("Port forwarding failed: %v. Retrying in 5 seconds...", err) - time.Sleep(5 * time.Second) - continue + // Start port forwarding + readyChan := make(chan struct{}) + go func() { + for { + err := portForwardPod(client.config, namespace, pod, port, stopChan, readyChan, localPort) + if err != nil { + logrus.Printf("Port forwarding failed: %v. Retrying in 5 seconds...", err) + time.Sleep(5 * time.Second) + continue + } + break } - break - } - }() + }() - // Wait for port forwarding to be ready - <-readyChan + // Wait for port forwarding to be ready + <-readyChan - servers := make([]*http.Server, 0) - ports := make([]int, 0) - for host := range hostFlowIdMap { availablePort, err := findAvailablePortInRange(host, &ports) if err != nil { return fmt.Errorf("failed to find an available port: %v", err) } // Start proxy server on the available port - proxy := createProxy(host) + proxy := createProxy(host, localPort) server := &http.Server{ Addr: fmt.Sprintf(":%d", availablePort), Handler: proxy, @@ -217,73 +222,59 @@ func isPodReady(pod *corev1.Pod) bool { return true } -func findPodForService(client *kubernetes.Clientset) (string, error) { - svc, err := client.CoreV1().Services(namespace).Get(context.Background(), service, metav1.GetOptions{}) - if err != nil { - return "", fmt.Errorf("error getting service: %v", err) - } - +func findPodForService(client *kubernetes.Clientset, gwClient *gatewayclientset.Clientset, accessEntry api_types.IngressAccessEntry) (string, int32, string, error) { var labelSelectors []string - for key, value := range svc.Spec.Selector { - labelSelectors = append(labelSelectors, fmt.Sprintf("%s=%s", key, value)) - } - selector := strings.Join(labelSelectors, ",") - - pods, err := client.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector}) - if err != nil { - return "", fmt.Errorf("error listing pods: %v", err) - } - - if len(pods.Items) == 0 { - return "", fmt.Errorf("no pods found for service %s", service) - } - - podName := pods.Items[0].Name - return podName, nil -} - -func checkGatewayEnvoyFilter(client *kubernetes.Clientset, host string) error { - for retry := 0; retry < maxRetries; retry++ { - envoyFilterRaw, err := client.RESTClient(). - Get(). - AbsPath("/apis/networking.istio.io/v1alpha3/namespaces/istio-system/envoyfilters/kardinal-gateway-tracing"). - Do(context.Background()). - Raw() + var port int32 + var namespace string + if accessEntry.Type == "ingress" { + ingress, err := client.NetworkingV1().Ingresses(accessEntry.Namespace).Get(context.Background(), accessEntry.Service, metav1.GetOptions{}) if err != nil { - logrus.Printf("Error getting Envoy filter (attempt %d/%d): %v", retry+1, maxRetries, err) - time.Sleep(retryInterval) - continue + return "", -1, "", fmt.Errorf("error getting ingress: %v", err) } - var envoyFilter map[string]interface{} - err = json.Unmarshal(envoyFilterRaw, &envoyFilter) + ingressClassName := "nginx" + if ingress.Spec.IngressClassName != nil { + ingressClassName = *ingress.Spec.IngressClassName + } + ingressClass, err := client.NetworkingV1().IngressClasses().Get(context.Background(), ingressClassName, metav1.GetOptions{}) if err != nil { - logrus.Printf("Error unmarshaling Envoy filter (attempt %d/%d): %v", retry+1, maxRetries, err) - time.Sleep(retryInterval) - continue + return "", -1, "", fmt.Errorf("error getting ingress class: %v", err) } - luaCode, ok := envoyFilter["spec"].(map[string]interface{})["configPatches"].([]interface{})[0].(map[string]interface{})["patch"].(map[string]interface{})["value"].(map[string]interface{})["typed_config"].(map[string]interface{})["inlineCode"].(string) - if !ok { - logrus.Printf("Error getting Lua code from Envoy filter (attempt %d/%d)", retry+1, maxRetries) - time.Sleep(retryInterval) - continue + for key, value := range ingressClass.Labels { + labelSelectors = append(labelSelectors, fmt.Sprintf("%s=%s", key, value)) } + namespace = ingressClass.Labels["app.kubernetes.io/instance"] + port = 80 - if !strings.Contains(luaCode, host) { - logrus.Printf("Envoy filter 'kardinal-gateway-tracing' does not contain the expected host string: %s (attempt %d/%d)", host, retry+1, maxRetries) - time.Sleep(retryInterval) - continue + } else if accessEntry.Type == "gateway" { + gw, err := gwClient.GatewayV1().Gateways(accessEntry.Namespace).Get(context.Background(), accessEntry.Service, metav1.GetOptions{}) + if err != nil { + return "", 0, "", fmt.Errorf("error getting gateway: %v", err) } + port = int32(gw.Spec.Listeners[0].Port) + labelSelectors = append(labelSelectors, fmt.Sprintf("gateway.networking.k8s.io/gateway-name=%s", accessEntry.Service)) + namespace = accessEntry.Namespace - logrus.Printf("Envoy filter 'kardinal-gateway-tracing' found and contains the expected host string: %s", host) - return nil + } else { + return "", -1, "", fmt.Errorf("unkown access type: %s", accessEntry.Type) } - return fmt.Errorf("failed to find Envoy filter 'kardinal-gateway-tracing' containing the expected host string after %d attempts", maxRetries) + selector := strings.Join(labelSelectors, ",") + pods, err := client.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector}) + if err != nil { + return "", -1, "", fmt.Errorf("error listing pods: %v", err) + } + + if len(pods.Items) == 0 { + return "", -1, "", fmt.Errorf("no pods found for service %s", accessEntry.Service) + } + + podName := pods.Items[0].Name + return podName, port, namespace, nil } -func portForwardPod(config *rest.Config, podName string, stopChan <-chan struct{}, readyChan chan struct{}) error { +func portForwardPod(config *rest.Config, namespace string, podName string, port int32, stopChan <-chan struct{}, readyChan chan struct{}, localPort int32) error { roundTripper, upgrader, err := spdy.RoundTripperFor(config) if err != nil { return fmt.Errorf("failed to create round tripper: %v", err) @@ -299,7 +290,7 @@ func portForwardPod(config *rest.Config, podName string, stopChan <-chan struct{ dialer := spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, serverURL) - ports := []string{fmt.Sprintf("%d:%d", localPortForIstio, istioGatewayPodPort)} + ports := []string{fmt.Sprintf("%d:%d", localPort, port)} forwarder, err := portforward.New(dialer, ports, stopChan, readyChan, io.Discard, os.Stderr) if err != nil { return fmt.Errorf("failed to create port forwarder: %v", err) @@ -308,8 +299,8 @@ func portForwardPod(config *rest.Config, podName string, stopChan <-chan struct{ return forwarder.ForwardPorts() } -func createProxy(host string) *httputil.ReverseProxy { - target, _ := url.Parse(fmt.Sprintf("http://localhost:%d", localPortForIstio)) +func createProxy(host string, localPort int32) *httputil.ReverseProxy { + target, _ := url.Parse(fmt.Sprintf("http://localhost:%d", localPort)) proxy := httputil.NewSingleHostReverseProxy(target) originalDirector := proxy.Director diff --git a/kardinal-cli/deployment/kubernetes_client_factory.go b/kardinal-cli/deployment/kubernetes_client_factory.go index 1e099432..d3db7a79 100644 --- a/kardinal-cli/deployment/kubernetes_client_factory.go +++ b/kardinal-cli/deployment/kubernetes_client_factory.go @@ -1,6 +1,8 @@ package deployment import ( + "path/filepath" + "github.com/kurtosis-tech/stacktrace" "k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/dynamic" @@ -9,10 +11,10 @@ import ( "k8s.io/client-go/restmapper" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" - "path/filepath" + gatewayclientset "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned" ) -func createKubernetesClient() (*kubernetesClient, error) { +func getConfig() (*rest.Config, error) { var config *rest.Config // Load in-cluster configuration @@ -27,6 +29,10 @@ func createKubernetesClient() (*kubernetesClient, error) { } } + return config, nil +} + +func createKubernetesClient(config *rest.Config) (*kubernetesClient, error) { clientSet, err := kubernetes.NewForConfig(config) if err != nil { return nil, stacktrace.Propagate(err, "An error occurred while creating kubernetes client using config '%+v'", config) @@ -44,3 +50,12 @@ func createKubernetesClient() (*kubernetesClient, error) { return kubernetesClientObj, nil } + +func createGatewayApiClient(k8sConfig *rest.Config) (*gatewayclientset.Clientset, error) { + gwc, err := gatewayclientset.NewForConfig(k8sConfig) + if err != nil { + return nil, stacktrace.Propagate(err, "An error occurred creating IstIo client from k8s config: %v", k8sConfig) + } + + return gwc, nil +}