Skip to content
This repository has been archived by the owner on Jan 9, 2025. It is now read-only.

Commit

Permalink
update gateway to proxy to new gateway and igress access entries
Browse files Browse the repository at this point in the history
  • Loading branch information
lostbean committed Sep 24, 2024
1 parent d778ec8 commit 89464eb
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 105 deletions.
10 changes: 2 additions & 8 deletions kardinal-cli/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,19 +395,13 @@ var gatewayCmd = &cobra.Command{
log.Fatalf("List flow response is empty")
}

var host string

hostFlowIdMap := make(map[string]string)
hostFlowIdMap := make([]api_types.IngressAccessEntry, 0)

for _, flow := range *resp.JSON200 {
if slices.Contains(flowIds, flow.FlowId) {
flowId := flow.FlowId
if len(flow.AccessEntry) > 0 {
host = flow.AccessEntry[0].Hostname
if host == "" {
log.Fatalf("Couldn't find flow with id '%s'", flowId)
}
hostFlowIdMap[host] = flowId
hostFlowIdMap = append(hostFlowIdMap, flow.AccessEntry...)
} else {
log.Fatalf("Flow '%s' has no hosts", flowId)
}
Expand Down
12 changes: 10 additions & 2 deletions kardinal-cli/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ type templateData struct {
}

func DeployKardinalManagerInCluster(ctx context.Context, clusterResourcesURL string, kontrolLocation string) error {
kubernetesClientObj, err := createKubernetesClient()
k8sConfig, err := getConfig()
if err != nil {
return stacktrace.Propagate(err, "An error occurred while creating the Kubernetes client")
}
kubernetesClientObj, err := createKubernetesClient(k8sConfig)
if err != nil {
return stacktrace.Propagate(err, "An error occurred while creating the Kubernetes client")
}
Expand Down Expand Up @@ -237,7 +241,11 @@ func DeployKardinalManagerInCluster(ctx context.Context, clusterResourcesURL str
}

func RemoveKardinalManagerFromCluster(ctx context.Context) error {
kubernetesClientObj, err := createKubernetesClient()
k8sConfig, err := getConfig()
if err != nil {
return stacktrace.Propagate(err, "An error occurred while creating the Kubernetes client")
}
kubernetesClientObj, err := createKubernetesClient(k8sConfig)
if err != nil {
return stacktrace.Propagate(err, "An error occurred while creating the Kubernetes client")
}
Expand Down
177 changes: 84 additions & 93 deletions kardinal-cli/deployment/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package deployment

import (
"context"
"encoding/json"
"fmt"
"hash/fnv"
"io"
Expand All @@ -16,20 +15,19 @@ import (
"syscall"
"time"

api_types "github.com/kurtosis-tech/kardinal/libs/cli-kontrol-api/api/golang/types"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
gatewayclientset "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
)

const (
namespace = "istio-system"
service = "istio-ingressgateway"
localPortForIstio = 61000
istioGatewayPodPort = 8080
localPortStartRange = 61000
proxyPortRangeStart = 59000
proxyPortRangeEnd = 60000
maxRetries = 10
Expand Down Expand Up @@ -66,61 +64,68 @@ func findAvailablePortInRange(host string, portsInUse *[]int) (int, error) {
return port, nil
}

func StartGateway(hostFlowIdMap map[string]string) error {
client, err := createKubernetesClient()
func StartGateway(hostFlowIdMap []api_types.IngressAccessEntry) error {
k8sConfig, err := getConfig()
if err != nil {
return fmt.Errorf("an error occurred while creating a kubernetes client:\n %v", err)
}
client, err := createKubernetesClient(k8sConfig)
if err != nil {
return fmt.Errorf("an error occurred while creating a kubernetes client:\n %v", err)
}
gatewayClient, err := createGatewayApiClient(k8sConfig)
if err != nil {
return fmt.Errorf("an error occurred while creating a kubernetes client:\n %v", err)
}

servers := make([]*http.Server, 0)
ports := make([]int, 0)
stopChan := make(chan struct{}, 1)

for entryIx, entry := range hostFlowIdMap {

localPort := int32(localPortStartRange + entryIx)
host := entry.Hostname

for host, flowId := range hostFlowIdMap {
logrus.Printf("Starting gateway for host: %s", host)

// Check for pods in the prod namespace
err = assertProdNamespaceReady(client.clientSet, flowId)
err = assertProdNamespaceReady(client.clientSet, entry.FlowId)
if err != nil {
return fmt.Errorf("failed to assert that prod namespace is ready: %v", err)
}

// Check for the Envoy filter before proceeding
err = checkGatewayEnvoyFilter(client.clientSet, host)
// Find a pod for the service
pod, port, namespace, err := findPodForService(client.clientSet, gatewayClient, entry)
if err != nil {
return err
// return fmt.Errorf("failed to find pod for service: %v", err)
logrus.Errorf("failed to find pod for service: %v", err)
continue
}
}

// Find a pod for the service
pod, err := findPodForService(client.clientSet)
if err != nil {
return fmt.Errorf("failed to find pod for service: %v", err)
}

// Start port forwarding
stopChan := make(chan struct{}, 1)
readyChan := make(chan struct{})
go func() {
for {
err := portForwardPod(client.config, pod, stopChan, readyChan)
if err != nil {
logrus.Printf("Port forwarding failed: %v. Retrying in 5 seconds...", err)
time.Sleep(5 * time.Second)
continue
// Start port forwarding
readyChan := make(chan struct{})
go func() {
for {
err := portForwardPod(client.config, namespace, pod, port, stopChan, readyChan, localPort)
if err != nil {
logrus.Printf("Port forwarding failed: %v. Retrying in 5 seconds...", err)
time.Sleep(5 * time.Second)
continue
}
break
}
break
}
}()
}()

// Wait for port forwarding to be ready
<-readyChan
// Wait for port forwarding to be ready
<-readyChan

servers := make([]*http.Server, 0)
ports := make([]int, 0)
for host := range hostFlowIdMap {
availablePort, err := findAvailablePortInRange(host, &ports)
if err != nil {
return fmt.Errorf("failed to find an available port: %v", err)
}
// Start proxy server on the available port
proxy := createProxy(host)
proxy := createProxy(host, localPort)
server := &http.Server{
Addr: fmt.Sprintf(":%d", availablePort),
Handler: proxy,
Expand Down Expand Up @@ -217,73 +222,59 @@ func isPodReady(pod *corev1.Pod) bool {
return true
}

func findPodForService(client *kubernetes.Clientset) (string, error) {
svc, err := client.CoreV1().Services(namespace).Get(context.Background(), service, metav1.GetOptions{})
if err != nil {
return "", fmt.Errorf("error getting service: %v", err)
}

func findPodForService(client *kubernetes.Clientset, gwClient *gatewayclientset.Clientset, accessEntry api_types.IngressAccessEntry) (string, int32, string, error) {
var labelSelectors []string
for key, value := range svc.Spec.Selector {
labelSelectors = append(labelSelectors, fmt.Sprintf("%s=%s", key, value))
}
selector := strings.Join(labelSelectors, ",")

pods, err := client.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector})
if err != nil {
return "", fmt.Errorf("error listing pods: %v", err)
}

if len(pods.Items) == 0 {
return "", fmt.Errorf("no pods found for service %s", service)
}

podName := pods.Items[0].Name
return podName, nil
}

func checkGatewayEnvoyFilter(client *kubernetes.Clientset, host string) error {
for retry := 0; retry < maxRetries; retry++ {
envoyFilterRaw, err := client.RESTClient().
Get().
AbsPath("/apis/networking.istio.io/v1alpha3/namespaces/istio-system/envoyfilters/kardinal-gateway-tracing").
Do(context.Background()).
Raw()
var port int32
var namespace string
if accessEntry.Type == "ingress" {
ingress, err := client.NetworkingV1().Ingresses(accessEntry.Namespace).Get(context.Background(), accessEntry.Service, metav1.GetOptions{})
if err != nil {
logrus.Printf("Error getting Envoy filter (attempt %d/%d): %v", retry+1, maxRetries, err)
time.Sleep(retryInterval)
continue
return "", -1, "", fmt.Errorf("error getting ingress: %v", err)
}

var envoyFilter map[string]interface{}
err = json.Unmarshal(envoyFilterRaw, &envoyFilter)
ingressClassName := "nginx"
if ingress.Spec.IngressClassName != nil {
ingressClassName = *ingress.Spec.IngressClassName
}
ingressClass, err := client.NetworkingV1().IngressClasses().Get(context.Background(), ingressClassName, metav1.GetOptions{})
if err != nil {
logrus.Printf("Error unmarshaling Envoy filter (attempt %d/%d): %v", retry+1, maxRetries, err)
time.Sleep(retryInterval)
continue
return "", -1, "", fmt.Errorf("error getting ingress class: %v", err)
}

luaCode, ok := envoyFilter["spec"].(map[string]interface{})["configPatches"].([]interface{})[0].(map[string]interface{})["patch"].(map[string]interface{})["value"].(map[string]interface{})["typed_config"].(map[string]interface{})["inlineCode"].(string)
if !ok {
logrus.Printf("Error getting Lua code from Envoy filter (attempt %d/%d)", retry+1, maxRetries)
time.Sleep(retryInterval)
continue
for key, value := range ingressClass.Labels {
labelSelectors = append(labelSelectors, fmt.Sprintf("%s=%s", key, value))
}
namespace = ingressClass.Labels["app.kubernetes.io/instance"]
port = 80

if !strings.Contains(luaCode, host) {
logrus.Printf("Envoy filter 'kardinal-gateway-tracing' does not contain the expected host string: %s (attempt %d/%d)", host, retry+1, maxRetries)
time.Sleep(retryInterval)
continue
} else if accessEntry.Type == "gateway" {
gw, err := gwClient.GatewayV1().Gateways(accessEntry.Namespace).Get(context.Background(), accessEntry.Service, metav1.GetOptions{})
if err != nil {
return "", 0, "", fmt.Errorf("error getting gateway: %v", err)
}
port = int32(gw.Spec.Listeners[0].Port)
labelSelectors = append(labelSelectors, fmt.Sprintf("gateway.networking.k8s.io/gateway-name=%s", accessEntry.Service))
namespace = accessEntry.Namespace

logrus.Printf("Envoy filter 'kardinal-gateway-tracing' found and contains the expected host string: %s", host)
return nil
} else {
return "", -1, "", fmt.Errorf("unkown access type: %s", accessEntry.Type)
}

return fmt.Errorf("failed to find Envoy filter 'kardinal-gateway-tracing' containing the expected host string after %d attempts", maxRetries)
selector := strings.Join(labelSelectors, ",")
pods, err := client.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector})
if err != nil {
return "", -1, "", fmt.Errorf("error listing pods: %v", err)
}

if len(pods.Items) == 0 {
return "", -1, "", fmt.Errorf("no pods found for service %s", accessEntry.Service)
}

podName := pods.Items[0].Name
return podName, port, namespace, nil
}

func portForwardPod(config *rest.Config, podName string, stopChan <-chan struct{}, readyChan chan struct{}) error {
func portForwardPod(config *rest.Config, namespace string, podName string, port int32, stopChan <-chan struct{}, readyChan chan struct{}, localPort int32) error {
roundTripper, upgrader, err := spdy.RoundTripperFor(config)
if err != nil {
return fmt.Errorf("failed to create round tripper: %v", err)
Expand All @@ -299,7 +290,7 @@ func portForwardPod(config *rest.Config, podName string, stopChan <-chan struct{

dialer := spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, serverURL)

ports := []string{fmt.Sprintf("%d:%d", localPortForIstio, istioGatewayPodPort)}
ports := []string{fmt.Sprintf("%d:%d", localPort, port)}
forwarder, err := portforward.New(dialer, ports, stopChan, readyChan, io.Discard, os.Stderr)
if err != nil {
return fmt.Errorf("failed to create port forwarder: %v", err)
Expand All @@ -308,8 +299,8 @@ func portForwardPod(config *rest.Config, podName string, stopChan <-chan struct{
return forwarder.ForwardPorts()
}

func createProxy(host string) *httputil.ReverseProxy {
target, _ := url.Parse(fmt.Sprintf("http://localhost:%d", localPortForIstio))
func createProxy(host string, localPort int32) *httputil.ReverseProxy {
target, _ := url.Parse(fmt.Sprintf("http://localhost:%d", localPort))
proxy := httputil.NewSingleHostReverseProxy(target)

originalDirector := proxy.Director
Expand Down
19 changes: 17 additions & 2 deletions kardinal-cli/deployment/kubernetes_client_factory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package deployment

import (
"path/filepath"

"github.com/kurtosis-tech/stacktrace"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
Expand All @@ -9,10 +11,10 @@ import (
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"path/filepath"
gatewayclientset "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
)

func createKubernetesClient() (*kubernetesClient, error) {
func getConfig() (*rest.Config, error) {
var config *rest.Config

// Load in-cluster configuration
Expand All @@ -27,6 +29,10 @@ func createKubernetesClient() (*kubernetesClient, error) {
}
}

return config, nil
}

func createKubernetesClient(config *rest.Config) (*kubernetesClient, error) {
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred while creating kubernetes client using config '%+v'", config)
Expand All @@ -44,3 +50,12 @@ func createKubernetesClient() (*kubernetesClient, error) {

return kubernetesClientObj, nil
}

func createGatewayApiClient(k8sConfig *rest.Config) (*gatewayclientset.Clientset, error) {
gwc, err := gatewayclientset.NewForConfig(k8sConfig)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred creating IstIo client from k8s config: %v", k8sConfig)
}

return gwc, nil
}

0 comments on commit 89464eb

Please sign in to comment.