From 59c54b923d7886d07fbba26eaa2dd3acf6f21002 Mon Sep 17 00:00:00 2001 From: rongxin Date: Wed, 24 Aug 2022 17:11:08 +0800 Subject: [PATCH 1/6] refactor: translate upstream --- .../apisix/translation/apisix_route.go | 160 ------------------ .../apisix/translation/apisix_upstream.go | 20 --- .../ingress/translation/translator.go | 129 +++----------- pkg/providers/translation/service.go | 60 ++++++- 4 files changed, 83 insertions(+), 286 deletions(-) diff --git a/pkg/providers/apisix/translation/apisix_route.go b/pkg/providers/apisix/translation/apisix_route.go index f91ea1289e..644feec21c 100644 --- a/pkg/providers/apisix/translation/apisix_route.go +++ b/pkg/providers/apisix/translation/apisix_route.go @@ -21,7 +21,6 @@ import ( "strings" "go.uber.org/zap" - "k8s.io/apimachinery/pkg/util/intstr" "github.com/apache/apisix-ingress-controller/pkg/config" "github.com/apache/apisix-ingress-controller/pkg/id" @@ -914,165 +913,6 @@ func (t *translator) translateStreamRouteNotStrictlyV2(ctx *translation.Translat return nil } -func (t *translator) GetServiceClusterIPAndPort(backend *configv2.ApisixRouteHTTPBackend, ns string) (string, int32, error) { - svc, err := t.ServiceLister.Services(ns).Get(backend.ServiceName) - if err != nil { - return "", 0, err - } - svcPort := int32(-1) - if backend.ResolveGranularity == "service" && svc.Spec.ClusterIP == "" { - log.Errorw("ApisixRoute refers to a headless service but want to use the service level resolve granularity", - zap.Any("namespace", ns), - zap.Any("service", svc), - ) - return "", 0, errors.New("conflict headless service and backend resolve granularity") - } -loop: - for _, port := range svc.Spec.Ports { - switch backend.ServicePort.Type { - case intstr.Int: - if backend.ServicePort.IntVal == port.Port { - svcPort = port.Port - break loop - } - case intstr.String: - if backend.ServicePort.StrVal == port.Name { - svcPort = port.Port - break loop - } - } - } - if svcPort == -1 { - log.Errorw("ApisixRoute refers to non-existent Service port", - zap.String("namespace", ns), - zap.String("port", backend.ServicePort.String()), - ) - return "", 0, err - } - - return svc.Spec.ClusterIP, svcPort, nil -} - -// getStreamServiceClusterIPAndPortV2beta2 is for v2beta2 streamRoute -func (t *translator) getStreamServiceClusterIPAndPortV2beta2(backend configv2beta2.ApisixRouteStreamBackend, ns string) (string, int32, error) { - svc, err := t.ServiceLister.Services(ns).Get(backend.ServiceName) - if err != nil { - return "", 0, err - } - svcPort := int32(-1) - if backend.ResolveGranularity == "service" && svc.Spec.ClusterIP == "" { - log.Errorw("ApisixRoute refers to a headless service but want to use the service level resolve granularity", - zap.String("ApisixRoute namespace", ns), - zap.Any("service", svc), - ) - return "", 0, errors.New("conflict headless service and backend resolve granularity") - } -loop: - for _, port := range svc.Spec.Ports { - switch backend.ServicePort.Type { - case intstr.Int: - if backend.ServicePort.IntVal == port.Port { - svcPort = port.Port - break loop - } - case intstr.String: - if backend.ServicePort.StrVal == port.Name { - svcPort = port.Port - break loop - } - } - } - if svcPort == -1 { - log.Errorw("ApisixRoute refers to non-existent Service port", - zap.String("ApisixRoute namespace", ns), - zap.String("port", backend.ServicePort.String()), - ) - return "", 0, err - } - - return svc.Spec.ClusterIP, svcPort, nil -} - -// getStreamServiceClusterIPAndPortV2beta3 is for v2beta3 streamRoute -func (t *translator) getStreamServiceClusterIPAndPortV2beta3(backend configv2beta3.ApisixRouteStreamBackend, ns string) (string, int32, error) { - svc, err := t.ServiceLister.Services(ns).Get(backend.ServiceName) - if err != nil { - return "", 0, err - } - svcPort := int32(-1) - if backend.ResolveGranularity == "service" && svc.Spec.ClusterIP == "" { - log.Errorw("ApisixRoute refers to a headless service but want to use the service level resolve granularity", - zap.String("ApisixRoute namespace", ns), - zap.Any("service", svc), - ) - return "", 0, errors.New("conflict headless service and backend resolve granularity") - } -loop: - for _, port := range svc.Spec.Ports { - switch backend.ServicePort.Type { - case intstr.Int: - if backend.ServicePort.IntVal == port.Port { - svcPort = port.Port - break loop - } - case intstr.String: - if backend.ServicePort.StrVal == port.Name { - svcPort = port.Port - break loop - } - } - } - if svcPort == -1 { - log.Errorw("ApisixRoute refers to non-existent Service port", - zap.String("ApisixRoute namespace", ns), - zap.String("port", backend.ServicePort.String()), - ) - return "", 0, err - } - - return svc.Spec.ClusterIP, svcPort, nil -} - -// getStreamServiceClusterIPAndPortV2 is for v2 streamRoute -func (t *translator) getStreamServiceClusterIPAndPortV2(backend configv2.ApisixRouteStreamBackend, ns string) (string, int32, error) { - svc, err := t.ServiceLister.Services(ns).Get(backend.ServiceName) - if err != nil { - return "", 0, err - } - svcPort := int32(-1) - if backend.ResolveGranularity == "service" && svc.Spec.ClusterIP == "" { - log.Errorw("ApisixRoute refers to a headless service but want to use the service level resolve granularity", - zap.String("ApisixRoute namespace", ns), - zap.Any("service", svc), - ) - return "", 0, errors.New("conflict headless service and backend resolve granularity") - } -loop: - for _, port := range svc.Spec.Ports { - switch backend.ServicePort.Type { - case intstr.Int: - if backend.ServicePort.IntVal == port.Port { - svcPort = port.Port - break loop - } - case intstr.String: - if backend.ServicePort.StrVal == port.Name { - svcPort = port.Port - break loop - } - } - } - if svcPort == -1 { - log.Errorw("ApisixRoute refers to non-existent Service port", - zap.String("ApisixRoute namespace", ns), - zap.String("port", backend.ServicePort.String()), - ) - return "", 0, err - } - - return svc.Spec.ClusterIP, svcPort, nil -} - func (t *translator) TranslateOldRoute(ar kube.ApisixRoute) (*translation.TranslateContext, error) { switch ar.GroupVersion() { case config.ApisixV2: diff --git a/pkg/providers/apisix/translation/apisix_upstream.go b/pkg/providers/apisix/translation/apisix_upstream.go index ac1e901b5d..093f18b978 100644 --- a/pkg/providers/apisix/translation/apisix_upstream.go +++ b/pkg/providers/apisix/translation/apisix_upstream.go @@ -16,7 +16,6 @@ package translation import ( "github.com/apache/apisix-ingress-controller/pkg/id" - "github.com/apache/apisix-ingress-controller/pkg/providers/translation" apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) @@ -27,22 +26,3 @@ func (t *translator) translateUpstreamNotStrictly(namespace, svcName, subset str ups.ID = id.GenID(ups.Name) return ups, nil } - -func (t *translator) translateService(namespace, svcName, subset, svcResolveGranularity, svcClusterIP string, svcPort int32) (*apisixv1.Upstream, error) { - ups, err := t.TranslateService(namespace, svcName, subset, svcPort) - if err != nil { - return nil, err - } - if svcResolveGranularity == "service" { - ups.Nodes = apisixv1.UpstreamNodes{ - { - Host: svcClusterIP, - Port: int(svcPort), - Weight: translation.DefaultWeight, - }, - } - } - ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, subset, svcPort) - ups.ID = id.GenID(ups.Name) - return ups, nil -} diff --git a/pkg/providers/ingress/translation/translator.go b/pkg/providers/ingress/translation/translator.go index 48b2a87dfa..2892f093ec 100644 --- a/pkg/providers/ingress/translation/translator.go +++ b/pkg/providers/ingress/translation/translator.go @@ -79,18 +79,14 @@ func NewIngressTranslator(opts *TranslatorOptions, return t } -func (t *translator) TranslateIngress(ing kube.Ingress, args ...bool) (*translation.TranslateContext, error) { - var skipVerify = false - if len(args) != 0 { - skipVerify = args[0] - } +func (t *translator) TranslateIngress(ing kube.Ingress) (*translation.TranslateContext, error) { switch ing.GroupVersion() { case kube.IngressV1: - return t.translateIngressV1(ing.V1(), skipVerify) + return t.translateIngressV1(ing.V1()) case kube.IngressV1beta1: - return t.translateIngressV1beta1(ing.V1beta1(), skipVerify) + return t.translateIngressV1beta1(ing.V1beta1()) case kube.IngressExtensionsV1beta1: - return t.translateIngressExtensionsV1beta1(ing.ExtensionsV1beta1(), skipVerify) + return t.translateIngressExtensionsV1beta1(ing.ExtensionsV1beta1()) default: return nil, fmt.Errorf("translator: source group version not supported: %s", ing.GroupVersion()) } @@ -145,17 +141,13 @@ func (t *translator) translateIngressV1(ing *networkingv1.Ingress, skipVerify bo err error ) if pathRule.Backend.Service != nil { - if skipVerify { - ups = t.translateDefaultUpstreamFromIngressV1(ing.Namespace, pathRule.Backend.Service) - } else { - ups, err = t.translateUpstreamFromIngressV1(ing.Namespace, pathRule.Backend.Service) - if err != nil { - log.Errorw("failed to translate ingress backend to upstream", - zap.Error(err), - zap.Any("ingress", ing), - ) - return nil, err - } + ups, err = t.translateUpstreamFromIngressV1(ing.Namespace, pathRule.Backend.Service) + if err != nil { + log.Errorw("failed to translate ingress backend to upstream", + zap.Error(err), + zap.Any("ingress", ing), + ) + return nil, err } ctx.AddUpstream(ups) } @@ -220,7 +212,7 @@ func (t *translator) translateIngressV1(ing *networkingv1.Ingress, skipVerify bo return ctx, nil } -func (t *translator) translateIngressV1beta1(ing *networkingv1beta1.Ingress, skipVerify bool) (*translation.TranslateContext, error) { +func (t *translator) translateIngressV1beta1(ing *networkingv1beta1.Ingress) (*translation.TranslateContext, error) { ctx := translation.DefaultEmptyTranslateContext() plugins := t.TranslateAnnotations(ing.Annotations) annoExtractor := annotations.NewExtractor(ing.Annotations) @@ -265,17 +257,13 @@ func (t *translator) translateIngressV1beta1(ing *networkingv1beta1.Ingress, ski err error ) if pathRule.Backend.ServiceName != "" { - if skipVerify { - ups = t.translateDefaultUpstreamFromIngressV1beta1(ing.Namespace, pathRule.Backend.ServiceName, pathRule.Backend.ServicePort) - } else { - ups, err = t.translateUpstreamFromIngressV1beta1(ing.Namespace, pathRule.Backend.ServiceName, pathRule.Backend.ServicePort) - if err != nil { - log.Errorw("failed to translate ingress backend to upstream", - zap.Error(err), - zap.Any("ingress", ing), - ) - return nil, err - } + ups, err = t.translateUpstreamFromIngressV1beta1(ing.Namespace, pathRule.Backend.ServiceName, pathRule.Backend.ServicePort) + if err != nil { + log.Errorw("failed to translate ingress backend to upstream", + zap.Error(err), + zap.Any("ingress", ing), + ) + return nil, err } ctx.AddUpstream(ups) } @@ -363,36 +351,6 @@ func (t *translator) translateDefaultUpstreamFromIngressV1(namespace string, bac ups.ID = id.GenID(ups.Name) return ups } -func (t *translator) translateUpstreamFromIngressV1(namespace string, backend *networkingv1.IngressServiceBackend) (*apisixv1.Upstream, error) { - var svcPort int32 - if backend.Port.Name != "" { - svc, err := t.ServiceLister.Services(namespace).Get(backend.Name) - if err != nil { - return nil, err - } - for _, port := range svc.Spec.Ports { - if port.Name == backend.Port.Name { - svcPort = port.Port - break - } - } - if svcPort == 0 { - return nil, &translation.TranslateError{ - Field: "service", - Reason: "port not found", - } - } - } else { - svcPort = backend.Port.Number - } - ups, err := t.TranslateService(namespace, backend.Name, "", svcPort) - if err != nil { - return nil, err - } - ups.Name = apisixv1.ComposeUpstreamName(namespace, backend.Name, "", svcPort) - ups.ID = id.GenID(ups.Name) - return ups, nil -} func (t *translator) translateIngressExtensionsV1beta1(ing *extensionsv1beta1.Ingress, skipVerify bool) (*translation.TranslateContext, error) { ctx := translation.DefaultEmptyTranslateContext() @@ -410,17 +368,13 @@ func (t *translator) translateIngressExtensionsV1beta1(ing *extensionsv1beta1.In ) if pathRule.Backend.ServiceName != "" { // Structure here is same to ingress.extensions/v1beta1, so just use this method. - if skipVerify { - ups = t.translateDefaultUpstreamFromIngressV1beta1(ing.Namespace, pathRule.Backend.ServiceName, pathRule.Backend.ServicePort) - } else { - ups, err = t.translateUpstreamFromIngressV1beta1(ing.Namespace, pathRule.Backend.ServiceName, pathRule.Backend.ServicePort) - if err != nil { - log.Errorw("failed to translate ingress backend to upstream", - zap.Error(err), - zap.Any("ingress", ing), - ) - return nil, err - } + ups, err = t.translateUpstreamFromIngressV1beta1(ing.Namespace, pathRule.Backend.ServiceName, pathRule.Backend.ServicePort) + if err != nil { + log.Errorw("failed to translate ingress backend to upstream", + zap.Error(err), + zap.Any("ingress", ing), + ) + return nil, err } ctx.AddUpstream(ups) } @@ -509,37 +463,6 @@ func (t *translator) translateDefaultUpstreamFromIngressV1beta1(namespace string return ups } -func (t *translator) translateUpstreamFromIngressV1beta1(namespace string, svcName string, svcPort intstr.IntOrString) (*apisixv1.Upstream, error) { - var portNumber int32 - if svcPort.Type == intstr.String { - svc, err := t.ServiceLister.Services(namespace).Get(svcName) - if err != nil { - return nil, err - } - for _, port := range svc.Spec.Ports { - if port.Name == svcPort.StrVal { - portNumber = port.Port - break - } - } - if portNumber == 0 { - return nil, &translation.TranslateError{ - Field: "service", - Reason: "port not found", - } - } - } else { - portNumber = svcPort.IntVal - } - ups, err := t.TranslateService(namespace, svcName, "", portNumber) - if err != nil { - return nil, err - } - ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, "", portNumber) - ups.ID = id.GenID(ups.Name) - return ups, nil -} - func (t *translator) TranslateOldIngress(ing kube.Ingress) (*translation.TranslateContext, error) { switch ing.GroupVersion() { case kube.IngressV1: diff --git a/pkg/providers/translation/service.go b/pkg/providers/translation/service.go index 196e7c45d6..ab87a8f7ca 100644 --- a/pkg/providers/translation/service.go +++ b/pkg/providers/translation/service.go @@ -23,6 +23,7 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/intstr" "github.com/apache/apisix-ingress-controller/pkg/config" "github.com/apache/apisix-ingress-controller/pkg/kube" @@ -50,7 +51,7 @@ func (t *translator) TranslateService(namespace, name, subset string, port int32 } } -func (t *translator) translateUpstreamV2(ep *kube.Endpoint, namespace, name, subset string, port int32) (*apisixv1.Upstream, error) { +func (t *translator) translateUpstreamV2(ep *kube.Endpoint, namespace, name, subset string, intstr.IntOrString) (*apisixv1.Upstream, error) { au, err := t.ApisixUpstreamLister.V2(namespace, name) ups := apisixv1.NewDefaultUpstream() if err != nil { @@ -102,7 +103,7 @@ func (t *translator) translateUpstreamV2(ep *kube.Endpoint, namespace, name, sub return ups, nil } -func (t *translator) translateUpstreamV2beta3(ep *kube.Endpoint, namespace, name, subset string, port int32) (*apisixv1.Upstream, error) { +func (t *translator) translateUpstreamV2beta3(ep *kube.Endpoint, namespace, name, subset string, port intstr.IntOrString) (*apisixv1.Upstream, error) { au, err := t.ApisixUpstreamLister.V2beta3(namespace, name) ups := apisixv1.NewDefaultUpstream() if err != nil { @@ -145,7 +146,7 @@ func (t *translator) translateUpstreamV2beta3(ep *kube.Endpoint, namespace, name } } // Filter nodes by subset. - nodes, err := t.TranslateEndpoint(*ep, port, labels) + nodes, err := t.TranslateEndpoints(namespace, name, port, labels) if err != nil { return nil, err } @@ -218,6 +219,59 @@ func (t *translator) TranslateEndpoint(endpoint kube.Endpoint, port int32, label return nodes, nil } +func (t *translator) TranslateEndpoints(namespace, name string, port intstr.IntOrString, labels types.Labels) (apisixv1.UpstreamNodes, error) { + svc, err := t.ServiceLister.Services(namespace).Get(name) + if err != nil { + return nil, &TranslateError{ + Field: "service", + Reason: err.Error(), + } + } + var svcPort *corev1.ServicePort + if port.Type == intstr.String { + for _, exposePort := range svc.Spec.Ports { + if exposePort.Name == port.StrVal { + svcPort = &exposePort + break + } + } + } else { + for _, exposePort := range svc.Spec.Ports { + if exposePort.Port == port.IntVal { + svcPort = &exposePort + break + } + } + } + if svcPort == nil { + return nil, &TranslateError{ + Field: "service", + Reason: "port not found", + } + } + nodes := make(apisixv1.UpstreamNodes, 0) + endpoint, err := t.EndpointLister.GetEndpoint(namespace, name) + if err != nil { + return nodes, nil + } + + // As nodes is not optional, here we create an empty slice, + // not a nil slice. + for _, hostport := range endpoint.Endpoints(svcPort) { + nodes = append(nodes, apisixv1.UpstreamNode{ + Host: hostport.Host, + Port: hostport.Port, + // FIXME Custom node weight + Weight: DefaultWeight, + }) + } + if labels != nil { + nodes = t.filterNodesByLabels(nodes, labels, namespace) + return nodes, nil + } + return nodes, nil +} + func (t *translator) filterNodesByLabels(nodes apisixv1.UpstreamNodes, labels types.Labels, namespace string) apisixv1.UpstreamNodes { if labels == nil { return nodes From 4559b34b6c3b27570ac340c582826b08f9cb1661 Mon Sep 17 00:00:00 2001 From: rongxin Date: Mon, 29 Aug 2022 11:29:48 +0800 Subject: [PATCH 2/6] refactor TranslateEndpoint --- .../apisix/translation/apisix_plugin.go | 6 +- .../apisix/translation/apisix_plugin_test.go | 2 +- .../apisix/translation/apisix_route.go | 153 ++++++++--------- .../gateway/translation/gateway_httproute.go | 3 +- .../gateway/translation/gateway_tlsroute.go | 3 +- .../ingress/translation/translator.go | 41 ++--- pkg/providers/k8s/endpoint/base.go | 5 +- pkg/providers/translation/translator.go | 14 +- pkg/providers/translation/translator_test.go | 12 +- .../translation/{service.go => upstream.go} | 161 +++++++++--------- 10 files changed, 190 insertions(+), 210 deletions(-) rename pkg/providers/translation/{service.go => upstream.go} (65%) diff --git a/pkg/providers/apisix/translation/apisix_plugin.go b/pkg/providers/apisix/translation/apisix_plugin.go index 4c726ede55..2a62ae70d1 100644 --- a/pkg/providers/apisix/translation/apisix_plugin.go +++ b/pkg/providers/apisix/translation/apisix_plugin.go @@ -46,11 +46,7 @@ func (t *translator) translateTrafficSplitPlugin(ctx *translation.TranslateConte ) for _, backend := range backends { - svcClusterIP, svcPort, err := t.GetServiceClusterIPAndPort(&backend, ns) - if err != nil { - return nil, err - } - ups, err := t.translateService(ns, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) + ups, err := t.TranslateUpstream(ns, backend.ServiceName, backend.Subset, backend.ResolveGranularity, backend.ServicePort) if err != nil { return nil, err } diff --git a/pkg/providers/apisix/translation/apisix_plugin_test.go b/pkg/providers/apisix/translation/apisix_plugin_test.go index 2c0b224cf6..3e904c13d9 100644 --- a/pkg/providers/apisix/translation/apisix_plugin_test.go +++ b/pkg/providers/apisix/translation/apisix_plugin_test.go @@ -546,7 +546,7 @@ func TestTranslateTrafficSplitPluginBadCases(t *testing.T) { cfg, err = tr.translateTrafficSplitPlugin(ctx, ar1.Namespace, 30, backends) assert.Nil(t, cfg) assert.NotNil(t, err) - assert.Equal(t, "service.spec.ports: port not defined", err.Error()) + assert.Contains(t, err.Error(), "service.Spec.Ports: port.Name not defined") backends[1].ServicePort.StrVal = "port2" backends[1].ResolveGranularity = "service" diff --git a/pkg/providers/apisix/translation/apisix_route.go b/pkg/providers/apisix/translation/apisix_route.go index 644feec21c..4da7846294 100644 --- a/pkg/providers/apisix/translation/apisix_route.go +++ b/pkg/providers/apisix/translation/apisix_route.go @@ -21,6 +21,7 @@ import ( "strings" "go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/intstr" "github.com/apache/apisix-ingress-controller/pkg/config" "github.com/apache/apisix-ingress-controller/pkg/id" @@ -119,16 +120,6 @@ func (t *translator) translateHTTPRouteV2beta2(ctx *translation.TranslateContext backend := backends[0] backends = backends[1:] - svcClusterIP, svcPort, err := t.GetServiceClusterIPAndPort(&backend, ar.Namespace) - if err != nil { - log.Errorw("failed to get service port in backend", - zap.Any("backend", backend), - zap.Any("apisix_route", ar), - zap.Error(err), - ) - return err - } - pluginMap := make(apisixv1.Plugins) // add route plugins for _, plugin := range part.Plugins { @@ -155,6 +146,7 @@ func (t *translator) translateHTTPRouteV2beta2(ctx *translation.TranslateContext } var exprs [][]apisixv1.StringOrSlice + var err error if part.Match.NginxVars != nil { exprs, err = t.TranslateRouteMatchExprs(part.Match.NginxVars) if err != nil { @@ -174,7 +166,6 @@ func (t *translator) translateHTTPRouteV2beta2(ctx *translation.TranslateContext return err } - upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, svcPort) route := apisixv1.NewDefaultRoute() route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name) route.ID = id.GenID(route.Name) @@ -184,7 +175,6 @@ func (t *translator) translateHTTPRouteV2beta2(ctx *translation.TranslateContext route.Hosts = part.Match.Hosts route.Uris = part.Match.Paths route.Methods = part.Match.Methods - route.UpstreamId = id.GenID(upstreamName) route.EnableWebsocket = part.Websocket route.Plugins = pluginMap @@ -205,14 +195,15 @@ func (t *translator) translateHTTPRouteV2beta2(ctx *translation.TranslateContext } route.Plugins["traffic-split"] = plugin } - ctx.AddRoute(route) - if !ctx.CheckUpstreamExist(upstreamName) { - ups, err := t.translateService(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) - if err != nil { - return err - } + ups, err := t.TranslateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, backend.ServicePort) + if err != nil { + return err + } + if !ctx.CheckUpstreamExist(ups.Name) { ctx.AddUpstream(ups) } + route.UpstreamId = ups.ID + ctx.AddRoute(route) } return nil } @@ -230,16 +221,6 @@ func (t *translator) translateHTTPRouteV2beta3(ctx *translation.TranslateContext backend := backends[0] backends = backends[1:] - svcClusterIP, svcPort, err := t.GetServiceClusterIPAndPort(&backend, ar.Namespace) - if err != nil { - log.Errorw("failed to get service port in backend", - zap.Any("backend", backend), - zap.Any("apisix_route", ar), - zap.Error(err), - ) - return err - } - var timeout *apisixv1.UpstreamTimeout if part.Timeout != nil { timeout = &apisixv1.UpstreamTimeout{ @@ -289,6 +270,7 @@ func (t *translator) translateHTTPRouteV2beta3(ctx *translation.TranslateContext } var exprs [][]apisixv1.StringOrSlice + var err error if part.Match.NginxVars != nil { exprs, err = t.TranslateRouteMatchExprs(part.Match.NginxVars) if err != nil { @@ -308,7 +290,6 @@ func (t *translator) translateHTTPRouteV2beta3(ctx *translation.TranslateContext return err } - upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, svcPort) route := apisixv1.NewDefaultRoute() route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name) route.ID = id.GenID(route.Name) @@ -318,7 +299,6 @@ func (t *translator) translateHTTPRouteV2beta3(ctx *translation.TranslateContext route.Hosts = part.Match.Hosts route.Uris = part.Match.Paths route.Methods = part.Match.Methods - route.UpstreamId = id.GenID(upstreamName) route.EnableWebsocket = part.Websocket route.Plugins = pluginMap route.Timeout = timeout @@ -341,14 +321,15 @@ func (t *translator) translateHTTPRouteV2beta3(ctx *translation.TranslateContext } route.Plugins["traffic-split"] = plugin } - ctx.AddRoute(route) - if !ctx.CheckUpstreamExist(upstreamName) { - ups, err := t.translateService(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) - if err != nil { - return err - } + ups, err := t.TranslateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, backend.ServicePort) + if err != nil { + return err + } + if !ctx.CheckUpstreamExist(ups.Name) { ctx.AddUpstream(ups) } + route.UpstreamId = ups.ID + ctx.AddRoute(route) } return nil } @@ -366,16 +347,6 @@ func (t *translator) translateHTTPRouteV2(ctx *translation.TranslateContext, ar backend := backends[0] backends = backends[1:] - svcClusterIP, svcPort, err := t.GetServiceClusterIPAndPort(&backend, ar.Namespace) - if err != nil { - log.Errorw("failed to get service port in backend", - zap.Any("backend", backend), - zap.Any("apisix_route", ar), - zap.Error(err), - ) - return err - } - var timeout *apisixv1.UpstreamTimeout if part.Timeout != nil { timeout = &apisixv1.UpstreamTimeout{ @@ -425,6 +396,7 @@ func (t *translator) translateHTTPRouteV2(ctx *translation.TranslateContext, ar } var exprs [][]apisixv1.StringOrSlice + var err error if part.Match.NginxVars != nil { exprs, err = t.TranslateRouteMatchExprs(part.Match.NginxVars) if err != nil { @@ -444,7 +416,6 @@ func (t *translator) translateHTTPRouteV2(ctx *translation.TranslateContext, ar return err } - upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, svcPort) route := apisixv1.NewDefaultRoute() route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name) route.ID = id.GenID(route.Name) @@ -454,7 +425,6 @@ func (t *translator) translateHTTPRouteV2(ctx *translation.TranslateContext, ar route.Hosts = part.Match.Hosts route.Uris = part.Match.Paths route.Methods = part.Match.Methods - route.UpstreamId = id.GenID(upstreamName) route.EnableWebsocket = part.Websocket route.Plugins = pluginMap route.Timeout = timeout @@ -477,14 +447,15 @@ func (t *translator) translateHTTPRouteV2(ctx *translation.TranslateContext, ar } route.Plugins["traffic-split"] = plugin } - ctx.AddRoute(route) - if !ctx.CheckUpstreamExist(upstreamName) { - ups, err := t.translateService(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) - if err != nil { - return err - } + ups, err := t.TranslateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, backend.ServicePort) + if err != nil { + return err + } + if !ctx.CheckUpstreamExist(ups.Name) { ctx.AddUpstream(ups) } + route.UpstreamId = ups.ID + ctx.AddRoute(route) } return nil } @@ -738,20 +709,12 @@ func (t *translator) translateStreamRouteV2beta2(ctx *translation.TranslateConte } ruleNameMap[part.Name] = struct{}{} backend := part.Backend - svcClusterIP, svcPort, err := t.getStreamServiceClusterIPAndPortV2beta2(backend, ar.Namespace) - if err != nil { - log.Errorw("failed to get service port in backend", - zap.Any("backend", backend), - zap.Any("apisix_route", ar), - zap.Error(err), - ) - return err - } + sr := apisixv1.NewDefaultStreamRoute() name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name) sr.ID = id.GenID(name) sr.ServerPort = part.Match.IngressPort - ups, err := t.translateService(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) + ups, err := t.TranslateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, backend.ServicePort) if err != nil { return err } @@ -773,20 +736,11 @@ func (t *translator) translateStreamRouteV2beta3(ctx *translation.TranslateConte } ruleNameMap[part.Name] = struct{}{} backend := part.Backend - svcClusterIP, svcPort, err := t.getStreamServiceClusterIPAndPortV2beta3(backend, ar.Namespace) - if err != nil { - log.Errorw("failed to get service port in backend", - zap.Any("backend", backend), - zap.Any("apisix_route", ar), - zap.Error(err), - ) - return err - } sr := apisixv1.NewDefaultStreamRoute() name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name) sr.ID = id.GenID(name) sr.ServerPort = part.Match.IngressPort - ups, err := t.translateService(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) + ups, err := t.TranslateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, backend.ServicePort) if err != nil { return err } @@ -808,15 +762,6 @@ func (t *translator) translateStreamRouteV2(ctx *translation.TranslateContext, a } ruleNameMap[part.Name] = struct{}{} backend := part.Backend - svcClusterIP, svcPort, err := t.getStreamServiceClusterIPAndPortV2(backend, ar.Namespace) - if err != nil { - log.Errorw("failed to get service port in backend", - zap.Any("backend", backend), - zap.Any("apisix_route", ar), - zap.Error(err), - ) - return err - } // add stream route plugins pluginMap := make(apisixv1.Plugins) @@ -835,7 +780,7 @@ func (t *translator) translateStreamRouteV2(ctx *translation.TranslateContext, a name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name) sr.ID = id.GenID(name) sr.ServerPort = part.Match.IngressPort - ups, err := t.translateService(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) + ups, err := t.TranslateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, backend.ServicePort) if err != nil { return err } @@ -999,3 +944,43 @@ func (t *translator) translateOldRouteV2beta3(ar *configv2beta3.ApisixRoute) (*t } return oldCtx, nil } + +// getStreamServiceClusterIPAndPortV2beta2 is for v2beta2 streamRoute +func (t *translator) getStreamServiceClusterIPAndPortV2beta2(backend configv2beta2.ApisixRouteStreamBackend, ns string) (string, int32, error) { + svc, err := t.ServiceLister.Services(ns).Get(backend.ServiceName) + if err != nil { + return "", 0, err + } + svcPort := int32(-1) + if backend.ResolveGranularity == "service" && svc.Spec.ClusterIP == "" { + log.Errorw("ApisixRoute refers to a headless service but want to use the service level resolve granularity", + zap.String("ApisixRoute namespace", ns), + zap.Any("service", svc), + ) + return "", 0, errors.New("conflict headless service and backend resolve granularity") + } +loop: + for _, port := range svc.Spec.Ports { + switch backend.ServicePort.Type { + case intstr.Int: + if backend.ServicePort.IntVal == port.Port { + svcPort = port.Port + break loop + } + case intstr.String: + if backend.ServicePort.StrVal == port.Name { + svcPort = port.Port + break loop + } + } + } + if svcPort == -1 { + log.Errorw("ApisixRoute refers to non-existent Service port", + zap.String("ApisixRoute namespace", ns), + zap.String("port", backend.ServicePort.String()), + ) + return "", 0, err + } + + return svc.Spec.ClusterIP, svcPort, nil +} diff --git a/pkg/providers/gateway/translation/gateway_httproute.go b/pkg/providers/gateway/translation/gateway_httproute.go index e1eaa657cf..7c357b4478 100644 --- a/pkg/providers/gateway/translation/gateway_httproute.go +++ b/pkg/providers/gateway/translation/gateway_httproute.go @@ -23,6 +23,7 @@ import ( "github.com/pkg/errors" "go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/intstr" gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" "github.com/apache/apisix-ingress-controller/pkg/id" @@ -92,7 +93,7 @@ func (t *translator) TranslateGatewayHTTPRouteV1Alpha2(httpRoute *gatewayv1alpha continue } - ups, err := t.KubeTranslator.TranslateService(ns, string(backend.Name), "", int32(*backend.Port)) + ups, err := t.KubeTranslator.TranslateUpstream(ns, string(backend.Name), "", "", intstr.FromInt(int(*backend.Port))) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to translate Rules[%v].BackendRefs[%v]", i, j)) } diff --git a/pkg/providers/gateway/translation/gateway_tlsroute.go b/pkg/providers/gateway/translation/gateway_tlsroute.go index 138cc40c2e..d2cef7b0f9 100644 --- a/pkg/providers/gateway/translation/gateway_tlsroute.go +++ b/pkg/providers/gateway/translation/gateway_tlsroute.go @@ -23,6 +23,7 @@ import ( "github.com/pkg/errors" "go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/intstr" gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" "github.com/apache/apisix-ingress-controller/pkg/id" @@ -86,7 +87,7 @@ func (t *translator) TranslateGatewayTLSRouteV1Alpha2(tlsRoute *gatewayv1alpha2. continue } - ups, err := t.KubeTranslator.TranslateService(ns, string(backend.Name), "", int32(*backend.Port)) + ups, err := t.KubeTranslator.TranslateUpstream(ns, string(backend.Name), "", "", intstr.FromInt(int(*backend.Port))) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to translate Rules[%v].BackendRefs[%v]", i, j)) } diff --git a/pkg/providers/ingress/translation/translator.go b/pkg/providers/ingress/translation/translator.go index 2892f093ec..29f989dc59 100644 --- a/pkg/providers/ingress/translation/translator.go +++ b/pkg/providers/ingress/translation/translator.go @@ -62,7 +62,7 @@ type IngressTranslator interface { // TranslateIngress composes a couple of APISIX Routes and upstreams according // to the given Ingress resource. // For old objects, you cannot use TranslateIngress to build. Because it needs to parse the latest service, which will cause data inconsistency. - TranslateIngress(ing kube.Ingress, args ...bool) (*translation.TranslateContext, error) + TranslateIngress(ing kube.Ingress) (*translation.TranslateContext, error) // TranslateOldIngress get route objects from cache // Build upstream and plugin_config through route TranslateOldIngress(kube.Ingress) (*translation.TranslateContext, error) @@ -96,7 +96,7 @@ const ( _regexPriority = 100 ) -func (t *translator) translateIngressV1(ing *networkingv1.Ingress, skipVerify bool) (*translation.TranslateContext, error) { +func (t *translator) translateIngressV1(ing *networkingv1.Ingress) (*translation.TranslateContext, error) { ctx := translation.DefaultEmptyTranslateContext() plugins := t.TranslateAnnotations(ing.Annotations) annoExtractor := annotations.NewExtractor(ing.Annotations) @@ -141,7 +141,13 @@ func (t *translator) translateIngressV1(ing *networkingv1.Ingress, skipVerify bo err error ) if pathRule.Backend.Service != nil { - ups, err = t.translateUpstreamFromIngressV1(ing.Namespace, pathRule.Backend.Service) + var port intstr.IntOrString + if pathRule.Backend.Service.Name != "" { + port = intstr.FromString(pathRule.Backend.Service.Port.Name) + } else { + port = intstr.FromInt(int(pathRule.Backend.Service.Port.Number)) + } + ups, err = t.TranslateUpstream(ing.Namespace, pathRule.Backend.Service.Name, "", "", port) if err != nil { log.Errorw("failed to translate ingress backend to upstream", zap.Error(err), @@ -257,7 +263,7 @@ func (t *translator) translateIngressV1beta1(ing *networkingv1beta1.Ingress) (*t err error ) if pathRule.Backend.ServiceName != "" { - ups, err = t.translateUpstreamFromIngressV1beta1(ing.Namespace, pathRule.Backend.ServiceName, pathRule.Backend.ServicePort) + ups, err = t.TranslateUpstream(ing.Namespace, pathRule.Backend.ServiceName, "", "", pathRule.Backend.ServicePort) if err != nil { log.Errorw("failed to translate ingress backend to upstream", zap.Error(err), @@ -352,7 +358,7 @@ func (t *translator) translateDefaultUpstreamFromIngressV1(namespace string, bac return ups } -func (t *translator) translateIngressExtensionsV1beta1(ing *extensionsv1beta1.Ingress, skipVerify bool) (*translation.TranslateContext, error) { +func (t *translator) translateIngressExtensionsV1beta1(ing *extensionsv1beta1.Ingress) (*translation.TranslateContext, error) { ctx := translation.DefaultEmptyTranslateContext() plugins := t.TranslateAnnotations(ing.Annotations) annoExtractor := annotations.NewExtractor(ing.Annotations) @@ -368,7 +374,7 @@ func (t *translator) translateIngressExtensionsV1beta1(ing *extensionsv1beta1.In ) if pathRule.Backend.ServiceName != "" { // Structure here is same to ingress.extensions/v1beta1, so just use this method. - ups, err = t.translateUpstreamFromIngressV1beta1(ing.Namespace, pathRule.Backend.ServiceName, pathRule.Backend.ServicePort) + ups, err = t.TranslateUpstream(ing.Namespace, pathRule.Backend.ServiceName, "", "", pathRule.Backend.ServicePort) if err != nil { log.Errorw("failed to translate ingress backend to upstream", zap.Error(err), @@ -440,29 +446,6 @@ func (t *translator) translateIngressExtensionsV1beta1(ing *extensionsv1beta1.In return ctx, nil } -func (t *translator) translateDefaultUpstreamFromIngressV1beta1(namespace string, svcName string, svcPort intstr.IntOrString) *apisixv1.Upstream { - var portNumber int32 - if svcPort.Type == intstr.String { - svc, err := t.ServiceLister.Services(namespace).Get(svcName) - if err != nil { - portNumber = 0 - } else { - for _, port := range svc.Spec.Ports { - if port.Name == svcPort.StrVal { - portNumber = port.Port - break - } - } - } - } else { - portNumber = svcPort.IntVal - } - ups := apisixv1.NewDefaultUpstream() - ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, "", portNumber) - ups.ID = id.GenID(ups.Name) - return ups -} - func (t *translator) TranslateOldIngress(ing kube.Ingress) (*translation.TranslateContext, error) { switch ing.GroupVersion() { case kube.IngressV1: diff --git a/pkg/providers/k8s/endpoint/base.go b/pkg/providers/k8s/endpoint/base.go index ac67aa870a..41ac212d37 100644 --- a/pkg/providers/k8s/endpoint/base.go +++ b/pkg/providers/k8s/endpoint/base.go @@ -23,6 +23,7 @@ import ( "go.uber.org/zap" k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/intstr" listerscorev1 "k8s.io/client-go/listers/core/v1" "github.com/apache/apisix-ingress-controller/pkg/config" @@ -79,7 +80,7 @@ func (c *baseEndpointController) syncEndpoint(ctx context.Context, ep kube.Endpo clusters := c.APISIX.ListClusters() for _, port := range svc.Spec.Ports { for _, subset := range subsets { - nodes, err := c.translator.TranslateEndpoint(ep, port.Port, subset.Labels) + nodes, err := c.translator.TranslateEndpoint(ep, intstr.FromInt(int(port.Port)), subset.Labels) if err != nil { log.Errorw("failed to translate upstream nodes", zap.Error(err), @@ -110,7 +111,7 @@ func (c *baseEndpointController) syncEndpoint(ctx context.Context, ep kube.Endpo clusters := c.APISIX.ListClusters() for _, port := range svc.Spec.Ports { for _, subset := range subsets { - nodes, err := c.translator.TranslateEndpoint(ep, port.Port, subset.Labels) + nodes, err := c.translator.TranslateEndpoint(ep, intstr.FromInt(int(port.Port)), subset.Labels) if err != nil { log.Errorw("failed to translate upstream nodes", zap.Error(err), diff --git a/pkg/providers/translation/translator.go b/pkg/providers/translation/translator.go index 13ee36b26c..464c6a536e 100644 --- a/pkg/providers/translation/translator.go +++ b/pkg/providers/translation/translator.go @@ -17,6 +17,8 @@ package translation import ( "fmt" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" listerscorev1 "k8s.io/client-go/listers/core/v1" "github.com/apache/apisix-ingress-controller/pkg/kube" @@ -47,6 +49,13 @@ type Translator interface { // TranslateUpstreamConfigV2 translates ApisixUpstreamConfig (part of ApisixUpstream) // to APISIX Upstream, it doesn't fill the the Upstream metadata and nodes. TranslateUpstreamConfigV2(*configv2.ApisixUpstreamConfig) (*apisixv1.Upstream, error) + // TranslateService translate Service resources to APISIX Upstream nodes (cluster ip) + // upstream nodes. + TranslateService(*corev1.Service, intstr.IntOrString) (apisixv1.UpstreamNodes, error) + // TranslateEndpoint translate Endpoints resources to APISIX Upstream nodes (pod ip) + // according to the give port. Extra labels can be passed to filter the ultimate + // upstream nodes. + TranslateEndpoint(kube.Endpoint, intstr.IntOrString, types.Labels) (apisixv1.UpstreamNodes, error) // TranslateUpstream composes an upstream according to the // given namespace, name (searching Service/Endpoints) and port (filtering Endpoints). // The returned Upstream doesn't have metadata info. @@ -56,11 +65,12 @@ type Translator interface { // matching the subset labels (defined in ApisixUpstream) will be selected. // When the subset is not found, the node list will be empty. When the subset is empty, // all pods IP will be filled. - TranslateService(string, string, string, int32) (*apisixv1.Upstream, error) + // Resolvegranularity is used to resolve the granularity of the service. It supports service/endpoint. + TranslateUpstream(namespace string, name string, subset string, resolveGranularity string, port intstr.IntOrString) (*apisixv1.Upstream, error) // TranslateUpstreamNodes translate Endpoints resources to APISIX Upstream nodes // according to the give port. Extra labels can be passed to filter the ultimate // upstream nodes. - TranslateEndpoint(kube.Endpoint, int32, types.Labels) (apisixv1.UpstreamNodes, error) + TranslateUpstreamNodes(namespace string, name string, resolveGranularity string, port intstr.IntOrString, labels types.Labels) (apisixv1.UpstreamNodes, error) } // TranslatorOptions contains options to help Translator diff --git a/pkg/providers/translation/translator_test.go b/pkg/providers/translation/translator_test.go index 73ecf974f2..b6d207e140 100644 --- a/pkg/providers/translation/translator_test.go +++ b/pkg/providers/translation/translator_test.go @@ -244,14 +244,14 @@ func TestTranslateUpstreamNodes(t *testing.T) { }} <-processCh - nodes, err := tr.TranslateEndpoint(kube.NewEndpoint(endpoints), 10080, nil) + nodes, err := tr.TranslateEndpoint(kube.NewEndpoint(endpoints), intstr.FromInt(10080), nil) assert.Nil(t, nodes) assert.Equal(t, &TranslateError{ Field: "service.spec.ports", Reason: "port not defined", }, err) - nodes, err = tr.TranslateEndpoint(kube.NewEndpoint(endpoints), 80, nil) + nodes, err = tr.TranslateEndpoint(kube.NewEndpoint(endpoints), intstr.FromInt(80), nil) assert.Nil(t, err) assert.Equal(t, apisixv1.UpstreamNodes{ { @@ -266,7 +266,7 @@ func TestTranslateUpstreamNodes(t *testing.T) { }, }, nodes) - nodes, err = tr.TranslateEndpoint(kube.NewEndpoint(endpoints), 443, nil) + nodes, err = tr.TranslateEndpoint(kube.NewEndpoint(endpoints), intstr.FromInt(443), nil) assert.Nil(t, err) assert.Equal(t, apisixv1.UpstreamNodes{ { @@ -373,14 +373,14 @@ func TestTranslateUpstreamNodesWithEndpointSlices(t *testing.T) { }} <-processCh - nodes, err := tr.TranslateEndpoint(kube.NewEndpointWithSlice(ep), 10080, nil) + nodes, err := tr.TranslateEndpoint(kube.NewEndpointWithSlice(ep), intstr.FromInt(10080), nil) assert.Nil(t, nodes) assert.Equal(t, err, &TranslateError{ Field: "service.spec.ports", Reason: "port not defined", }) - nodes, err = tr.TranslateEndpoint(kube.NewEndpointWithSlice(ep), 80, nil) + nodes, err = tr.TranslateEndpoint(kube.NewEndpointWithSlice(ep), intstr.FromInt(80), nil) assert.Nil(t, err) assert.Equal(t, apisixv1.UpstreamNodes{ { @@ -395,7 +395,7 @@ func TestTranslateUpstreamNodesWithEndpointSlices(t *testing.T) { }, }, nodes) - nodes, err = tr.TranslateEndpoint(kube.NewEndpointWithSlice(ep), 443, nil) + nodes, err = tr.TranslateEndpoint(kube.NewEndpointWithSlice(ep), intstr.FromInt(443), nil) assert.Nil(t, err) assert.Equal(t, apisixv1.UpstreamNodes{ { diff --git a/pkg/providers/translation/service.go b/pkg/providers/translation/upstream.go similarity index 65% rename from pkg/providers/translation/service.go rename to pkg/providers/translation/upstream.go index ab87a8f7ca..673c5413f2 100644 --- a/pkg/providers/translation/service.go +++ b/pkg/providers/translation/upstream.go @@ -18,6 +18,7 @@ package translation import ( + "errors" "fmt" "go.uber.org/zap" @@ -26,34 +27,39 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "github.com/apache/apisix-ingress-controller/pkg/config" + "github.com/apache/apisix-ingress-controller/pkg/id" "github.com/apache/apisix-ingress-controller/pkg/kube" "github.com/apache/apisix-ingress-controller/pkg/log" "github.com/apache/apisix-ingress-controller/pkg/types" apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) -func (t *translator) TranslateService(namespace, name, subset string, port int32) (*apisixv1.Upstream, error) { - endpoint, err := t.EndpointLister.GetEndpoint(namespace, name) +func (t *translator) TranslateUpstream(namespace, name, subset, resolveGranularity string, port intstr.IntOrString) (*apisixv1.Upstream, error) { + svc, err := t.ServiceLister.Services(namespace).Get(name) if err != nil { - return nil, &TranslateError{ - Field: "endpoints", - Reason: err.Error(), - } + return nil, err } - + portInt32, err := t.parseServicePort(svc, port) + if err != nil { + return nil, err + } + port = intstr.FromInt(int(portInt32)) switch t.APIVersion { case config.ApisixV2beta3: - return t.translateUpstreamV2beta3(&endpoint, namespace, name, subset, port) + return t.translateUpstreamV2beta3(namespace, name, subset, port, resolveGranularity) case config.ApisixV2: - return t.translateUpstreamV2(&endpoint, namespace, name, subset, port) + return t.translateUpstreamV2(namespace, name, subset, port, resolveGranularity) default: panic(fmt.Errorf("unsupported ApisixUpstream version %v", t.APIVersion)) } } -func (t *translator) translateUpstreamV2(ep *kube.Endpoint, namespace, name, subset string, intstr.IntOrString) (*apisixv1.Upstream, error) { - au, err := t.ApisixUpstreamLister.V2(namespace, name) +func (t *translator) translateUpstreamV2(namespace, name, subset string, port intstr.IntOrString, resolveGranularity string) (*apisixv1.Upstream, error) { ups := apisixv1.NewDefaultUpstream() + ups.Name = apisixv1.ComposeUpstreamName(namespace, name, subset, port.IntVal) + ups.ID = id.GenID(ups.Name) + + au, err := t.ApisixUpstreamLister.V2(namespace, name) if err != nil { if k8serrors.IsNotFound(err) { // If subset in ApisixRoute is not empty but the ApisixUpstream resource not found, @@ -79,7 +85,7 @@ func (t *translator) translateUpstreamV2(ep *kube.Endpoint, namespace, name, sub } } // Filter nodes by subset. - nodes, err := t.TranslateEndpoint(*ep, port, labels) + nodes, err := t.TranslateUpstreamNodes(namespace, name, resolveGranularity, port, labels) if err != nil { return nil, err } @@ -90,7 +96,7 @@ func (t *translator) translateUpstreamV2(ep *kube.Endpoint, namespace, name, sub upsCfg := &au.V2().Spec.ApisixUpstreamConfig for _, pls := range au.V2().Spec.PortLevelSettings { - if pls.Port == port { + if pls.Port == port.IntVal { upsCfg = &pls.ApisixUpstreamConfig break } @@ -100,12 +106,17 @@ func (t *translator) translateUpstreamV2(ep *kube.Endpoint, namespace, name, sub return nil, err } ups.Nodes = nodes + ups.Name = apisixv1.ComposeUpstreamName(namespace, name, subset, port.IntVal) + ups.ID = id.GenID(ups.Name) return ups, nil } -func (t *translator) translateUpstreamV2beta3(ep *kube.Endpoint, namespace, name, subset string, port intstr.IntOrString) (*apisixv1.Upstream, error) { - au, err := t.ApisixUpstreamLister.V2beta3(namespace, name) +func (t *translator) translateUpstreamV2beta3(namespace, name, subset string, port intstr.IntOrString, resolveGranularity string) (*apisixv1.Upstream, error) { ups := apisixv1.NewDefaultUpstream() + ups.Name = apisixv1.ComposeUpstreamName(namespace, name, subset, port.IntVal) + ups.ID = id.GenID(ups.Name) + + au, err := t.ApisixUpstreamLister.V2beta3(namespace, name) if err != nil { if k8serrors.IsNotFound(err) { // If subset in ApisixRoute is not empty but the ApisixUpstream resource not found, @@ -121,21 +132,7 @@ func (t *translator) translateUpstreamV2beta3(ep *kube.Endpoint, namespace, name } } } - if err != nil { - if k8serrors.IsNotFound(err) { - // If subset in ApisixRoute is not empty but the ApisixUpstream resource not found, - // just set an empty node list. - if subset != "" { - ups.Nodes = apisixv1.UpstreamNodes{} - return ups, nil - } - } else { - return nil, &TranslateError{ - Field: "ApisixUpstream", - Reason: err.Error(), - } - } - } + var labels types.Labels if subset != "" { for _, ss := range au.V2beta3().Spec.Subsets { @@ -146,7 +143,7 @@ func (t *translator) translateUpstreamV2beta3(ep *kube.Endpoint, namespace, name } } // Filter nodes by subset. - nodes, err := t.TranslateEndpoints(namespace, name, port, labels) + nodes, err := t.TranslateUpstreamNodes(namespace, name, resolveGranularity, port, labels) if err != nil { return nil, err } @@ -157,7 +154,7 @@ func (t *translator) translateUpstreamV2beta3(ep *kube.Endpoint, namespace, name upsCfg := &au.V2beta3().Spec.ApisixUpstreamConfig for _, pls := range au.V2beta3().Spec.PortLevelSettings { - if pls.Port == port { + if pls.Port == port.IntVal { upsCfg = &pls.ApisixUpstreamConfig break } @@ -170,7 +167,7 @@ func (t *translator) translateUpstreamV2beta3(ep *kube.Endpoint, namespace, name return ups, nil } -func (t *translator) TranslateEndpoint(endpoint kube.Endpoint, port int32, labels types.Labels) (apisixv1.UpstreamNodes, error) { +func (t *translator) TranslateEndpoint(endpoint kube.Endpoint, port intstr.IntOrString, labels types.Labels) (apisixv1.UpstreamNodes, error) { namespace, err := endpoint.Namespace() if err != nil { log.Errorw("failed to get endpoint namespace", @@ -190,7 +187,7 @@ func (t *translator) TranslateEndpoint(endpoint kube.Endpoint, port int32, label var svcPort *corev1.ServicePort for _, exposePort := range svc.Spec.Ports { - if exposePort.Port == port { + if exposePort.Port == port.IntVal { svcPort = &exposePort break } @@ -219,57 +216,42 @@ func (t *translator) TranslateEndpoint(endpoint kube.Endpoint, port int32, label return nodes, nil } -func (t *translator) TranslateEndpoints(namespace, name string, port intstr.IntOrString, labels types.Labels) (apisixv1.UpstreamNodes, error) { - svc, err := t.ServiceLister.Services(namespace).Get(name) - if err != nil { - return nil, &TranslateError{ - Field: "service", - Reason: err.Error(), - } +func (t *translator) TranslateService(svc *corev1.Service, port intstr.IntOrString) (apisixv1.UpstreamNodes, error) { + if svc == nil { + return nil, errors.New("service should not be empty") } - var svcPort *corev1.ServicePort - if port.Type == intstr.String { - for _, exposePort := range svc.Spec.Ports { - if exposePort.Name == port.StrVal { - svcPort = &exposePort - break - } - } - } else { - for _, exposePort := range svc.Spec.Ports { - if exposePort.Port == port.IntVal { - svcPort = &exposePort - break - } - } + if svc.Spec.ClusterIP == "" { + return nil, errors.New("conflict headless service and backend resolve granularity") } - if svcPort == nil { - return nil, &TranslateError{ - Field: "service", - Reason: "port not found", - } - } - nodes := make(apisixv1.UpstreamNodes, 0) - endpoint, err := t.EndpointLister.GetEndpoint(namespace, name) + svcPort, err := t.parseServicePort(svc, port) if err != nil { - return nodes, nil + return nil, err } - - // As nodes is not optional, here we create an empty slice, - // not a nil slice. - for _, hostport := range endpoint.Endpoints(svcPort) { - nodes = append(nodes, apisixv1.UpstreamNode{ - Host: hostport.Host, - Port: hostport.Port, - // FIXME Custom node weight + return apisixv1.UpstreamNodes{ + { + Host: svc.Spec.ClusterIP, + Port: int(svcPort), Weight: DefaultWeight, - }) - } - if labels != nil { - nodes = t.filterNodesByLabels(nodes, labels, namespace) - return nodes, nil + }, + }, nil +} + +func (t *translator) TranslateUpstreamNodes(namespace, name, resolveGranularity string, port intstr.IntOrString, labels types.Labels) (apisixv1.UpstreamNodes, error) { + nodes := make(apisixv1.UpstreamNodes, 0) + switch resolveGranularity { + case "service": + svc, err := t.ServiceLister.Services(namespace).Get(name) + if err != nil { + return nil, err + } + return t.TranslateService(svc, port) + default: + ep, err := t.EndpointLister.GetEndpoint(namespace, name) + if err != nil { + return nodes, nil + } + return t.TranslateEndpoint(ep, port, labels) } - return nodes, nil } func (t *translator) filterNodesByLabels(nodes apisixv1.UpstreamNodes, labels types.Labels, namespace string) apisixv1.UpstreamNodes { @@ -301,3 +283,24 @@ func (t *translator) filterNodesByLabels(nodes apisixv1.UpstreamNodes, labels ty } return filteredNodes } + +func (t *translator) parseServicePort(svc *corev1.Service, port intstr.IntOrString) (int32, error) { + if svc == nil { + return 0, fmt.Errorf("service does not exist") + } + if port.Type == intstr.String { + for _, p := range svc.Spec.Ports { + if p.Name == port.StrVal { + return p.Port, nil + } + } + return 0, fmt.Errorf("service.Spec.Ports: port.Name not defined, port.Name: %s", port.StrVal) + } + for _, p := range svc.Spec.Ports { + if p.Port == port.IntVal { + return p.Port, nil + } + } + return 0, fmt.Errorf("service.Spec.Ports: port.Port not defined, port.Port: %d", port.IntVal) + +} From 57f604c13b9d7272479955332864beb01776ebc3 Mon Sep 17 00:00:00 2001 From: rongxin Date: Tue, 30 Aug 2022 10:00:16 +0800 Subject: [PATCH 3/6] golangci-lint --- .../apisix/translation/apisix_route.go | 41 ------------------- .../ingress/translation/translator.go | 24 ----------- pkg/providers/translation/translator_test.go | 8 ++-- pkg/providers/translation/upstream.go | 35 +++++++--------- 4 files changed, 19 insertions(+), 89 deletions(-) diff --git a/pkg/providers/apisix/translation/apisix_route.go b/pkg/providers/apisix/translation/apisix_route.go index 4da7846294..854cffc558 100644 --- a/pkg/providers/apisix/translation/apisix_route.go +++ b/pkg/providers/apisix/translation/apisix_route.go @@ -21,7 +21,6 @@ import ( "strings" "go.uber.org/zap" - "k8s.io/apimachinery/pkg/util/intstr" "github.com/apache/apisix-ingress-controller/pkg/config" "github.com/apache/apisix-ingress-controller/pkg/id" @@ -944,43 +943,3 @@ func (t *translator) translateOldRouteV2beta3(ar *configv2beta3.ApisixRoute) (*t } return oldCtx, nil } - -// getStreamServiceClusterIPAndPortV2beta2 is for v2beta2 streamRoute -func (t *translator) getStreamServiceClusterIPAndPortV2beta2(backend configv2beta2.ApisixRouteStreamBackend, ns string) (string, int32, error) { - svc, err := t.ServiceLister.Services(ns).Get(backend.ServiceName) - if err != nil { - return "", 0, err - } - svcPort := int32(-1) - if backend.ResolveGranularity == "service" && svc.Spec.ClusterIP == "" { - log.Errorw("ApisixRoute refers to a headless service but want to use the service level resolve granularity", - zap.String("ApisixRoute namespace", ns), - zap.Any("service", svc), - ) - return "", 0, errors.New("conflict headless service and backend resolve granularity") - } -loop: - for _, port := range svc.Spec.Ports { - switch backend.ServicePort.Type { - case intstr.Int: - if backend.ServicePort.IntVal == port.Port { - svcPort = port.Port - break loop - } - case intstr.String: - if backend.ServicePort.StrVal == port.Name { - svcPort = port.Port - break loop - } - } - } - if svcPort == -1 { - log.Errorw("ApisixRoute refers to non-existent Service port", - zap.String("ApisixRoute namespace", ns), - zap.String("port", backend.ServicePort.String()), - ) - return "", 0, err - } - - return svc.Spec.ClusterIP, svcPort, nil -} diff --git a/pkg/providers/ingress/translation/translator.go b/pkg/providers/ingress/translation/translator.go index 29f989dc59..772e068d84 100644 --- a/pkg/providers/ingress/translation/translator.go +++ b/pkg/providers/ingress/translation/translator.go @@ -334,30 +334,6 @@ func (t *translator) translateIngressV1beta1(ing *networkingv1beta1.Ingress) (*t return ctx, nil } -func (t *translator) translateDefaultUpstreamFromIngressV1(namespace string, backend *networkingv1.IngressServiceBackend) *apisixv1.Upstream { - var portNumber int32 - if backend.Port.Name != "" { - svc, err := t.ServiceLister.Services(namespace).Get(backend.Name) - if err != nil { - portNumber = 0 - } else { - for _, port := range svc.Spec.Ports { - if port.Name == backend.Port.Name { - portNumber = port.Port - break - } - } - } - - } else { - portNumber = backend.Port.Number - } - ups := apisixv1.NewDefaultUpstream() - ups.Name = apisixv1.ComposeUpstreamName(namespace, backend.Name, "", portNumber) - ups.ID = id.GenID(ups.Name) - return ups -} - func (t *translator) translateIngressExtensionsV1beta1(ing *extensionsv1beta1.Ingress) (*translation.TranslateContext, error) { ctx := translation.DefaultEmptyTranslateContext() plugins := t.TranslateAnnotations(ing.Annotations) diff --git a/pkg/providers/translation/translator_test.go b/pkg/providers/translation/translator_test.go index b6d207e140..04a65f8c4e 100644 --- a/pkg/providers/translation/translator_test.go +++ b/pkg/providers/translation/translator_test.go @@ -247,8 +247,8 @@ func TestTranslateUpstreamNodes(t *testing.T) { nodes, err := tr.TranslateEndpoint(kube.NewEndpoint(endpoints), intstr.FromInt(10080), nil) assert.Nil(t, nodes) assert.Equal(t, &TranslateError{ - Field: "service.spec.ports", - Reason: "port not defined", + Field: "service.Spec.Ports", + Reason: "service.Spec.Ports: port.Port not defined, port.Port: 10080", }, err) nodes, err = tr.TranslateEndpoint(kube.NewEndpoint(endpoints), intstr.FromInt(80), nil) @@ -376,8 +376,8 @@ func TestTranslateUpstreamNodesWithEndpointSlices(t *testing.T) { nodes, err := tr.TranslateEndpoint(kube.NewEndpointWithSlice(ep), intstr.FromInt(10080), nil) assert.Nil(t, nodes) assert.Equal(t, err, &TranslateError{ - Field: "service.spec.ports", - Reason: "port not defined", + Field: "service.Spec.Ports", + Reason: "service.Spec.Ports: port.Port not defined, port.Port: 10080", }) nodes, err = tr.TranslateEndpoint(kube.NewEndpointWithSlice(ep), intstr.FromInt(80), nil) diff --git a/pkg/providers/translation/upstream.go b/pkg/providers/translation/upstream.go index 673c5413f2..0e191bbabe 100644 --- a/pkg/providers/translation/upstream.go +++ b/pkg/providers/translation/upstream.go @@ -39,11 +39,11 @@ func (t *translator) TranslateUpstream(namespace, name, subset, resolveGranulari if err != nil { return nil, err } - portInt32, err := t.parseServicePort(svc, port) + svcPort, err := t.parseServicePort(svc, port) if err != nil { return nil, err } - port = intstr.FromInt(int(portInt32)) + port = intstr.FromInt(int(svcPort.Port)) switch t.APIVersion { case config.ApisixV2beta3: return t.translateUpstreamV2beta3(namespace, name, subset, port, resolveGranularity) @@ -164,6 +164,8 @@ func (t *translator) translateUpstreamV2beta3(namespace, name, subset string, po return nil, err } ups.Nodes = nodes + ups.Name = apisixv1.ComposeUpstreamName(namespace, name, subset, port.IntVal) + ups.ID = id.GenID(ups.Name) return ups, nil } @@ -184,18 +186,11 @@ func (t *translator) TranslateEndpoint(endpoint kube.Endpoint, port intstr.IntOr Reason: err.Error(), } } - - var svcPort *corev1.ServicePort - for _, exposePort := range svc.Spec.Ports { - if exposePort.Port == port.IntVal { - svcPort = &exposePort - break - } - } - if svcPort == nil { + svcPort, err := t.parseServicePort(svc, port) + if err != nil { return nil, &TranslateError{ - Field: "service.spec.ports", - Reason: "port not defined", + Field: "service.Spec.Ports", + Reason: err.Error(), } } // As nodes is not optional, here we create an empty slice, @@ -230,7 +225,7 @@ func (t *translator) TranslateService(svc *corev1.Service, port intstr.IntOrStri return apisixv1.UpstreamNodes{ { Host: svc.Spec.ClusterIP, - Port: int(svcPort), + Port: int(svcPort.Port), Weight: DefaultWeight, }, }, nil @@ -284,23 +279,23 @@ func (t *translator) filterNodesByLabels(nodes apisixv1.UpstreamNodes, labels ty return filteredNodes } -func (t *translator) parseServicePort(svc *corev1.Service, port intstr.IntOrString) (int32, error) { +func (t *translator) parseServicePort(svc *corev1.Service, port intstr.IntOrString) (*corev1.ServicePort, error) { if svc == nil { - return 0, fmt.Errorf("service does not exist") + return nil, fmt.Errorf("service does not exist") } if port.Type == intstr.String { for _, p := range svc.Spec.Ports { if p.Name == port.StrVal { - return p.Port, nil + return &p, nil } } - return 0, fmt.Errorf("service.Spec.Ports: port.Name not defined, port.Name: %s", port.StrVal) + return nil, fmt.Errorf("service.Spec.Ports: port.Name not defined, port.Name: %s", port.StrVal) } for _, p := range svc.Spec.Ports { if p.Port == port.IntVal { - return p.Port, nil + return &p, nil } } - return 0, fmt.Errorf("service.Spec.Ports: port.Port not defined, port.Port: %d", port.IntVal) + return nil, fmt.Errorf("service.Spec.Ports: port.Port not defined, port.Port: %d", port.IntVal) } From f385ccd9e9cb34dc3fb5954bb649c73aefbfcbe3 Mon Sep 17 00:00:00 2001 From: rongxin Date: Wed, 31 Aug 2022 09:15:59 +0800 Subject: [PATCH 4/6] update ingress tanslate upstream --- pkg/providers/ingress/translation/translator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/providers/ingress/translation/translator.go b/pkg/providers/ingress/translation/translator.go index 772e068d84..b8da0f9192 100644 --- a/pkg/providers/ingress/translation/translator.go +++ b/pkg/providers/ingress/translation/translator.go @@ -142,7 +142,7 @@ func (t *translator) translateIngressV1(ing *networkingv1.Ingress) (*translation ) if pathRule.Backend.Service != nil { var port intstr.IntOrString - if pathRule.Backend.Service.Name != "" { + if pathRule.Backend.Service.Port.Name != "" { port = intstr.FromString(pathRule.Backend.Service.Port.Name) } else { port = intstr.FromInt(int(pathRule.Backend.Service.Port.Number)) From 94f1b0dd921606bf7c376b17e15d46c024c03756 Mon Sep 17 00:00:00 2001 From: rongxin Date: Tue, 13 Sep 2022 10:32:41 +0800 Subject: [PATCH 5/6] resolve comment --- pkg/providers/apisix/apisix_upstream.go | 7 ++----- .../apisix/translation/apisix_plugin_test.go | 2 +- pkg/providers/translation/apisix_upstream.go | 15 +++++++-------- pkg/providers/translation/translator.go | 13 ++++++------- pkg/providers/translation/translator_test.go | 15 ++++++++------- pkg/providers/translation/upstream.go | 19 +++++++------------ 6 files changed, 31 insertions(+), 40 deletions(-) diff --git a/pkg/providers/apisix/apisix_upstream.go b/pkg/providers/apisix/apisix_upstream.go index b30c3430e0..85126031c3 100644 --- a/pkg/providers/apisix/apisix_upstream.go +++ b/pkg/providers/apisix/apisix_upstream.go @@ -278,15 +278,14 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration()) return err } - var newUps *apisixv1.Upstream + newUps := apisixv1.NewDefaultUpstream() if au.Spec != nil && ev.Type != types.EventDelete { cfg, ok := portLevelSettings[port.Port] if !ok { cfg = &au.Spec.ApisixUpstreamConfig } // FIXME Same ApisixUpstreamConfig might be translated multiple times. - newUps, err = c.translator.TranslateUpstreamConfigV2(cfg) - if err != nil { + if err = c.translator.TranslateUpstreamConfigV2(cfg, newUps); err != nil { log.Errorw("ApisixUpstream conversion cannot be completed, or the format is incorrect", zap.Any("object", au), zap.Error(err), @@ -295,8 +294,6 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration()) return err } - } else { - newUps = apisixv1.NewDefaultUpstream() } newUps.Metadata = ups.Metadata diff --git a/pkg/providers/apisix/translation/apisix_plugin_test.go b/pkg/providers/apisix/translation/apisix_plugin_test.go index b0f1f11f86..2e24c4d62c 100644 --- a/pkg/providers/apisix/translation/apisix_plugin_test.go +++ b/pkg/providers/apisix/translation/apisix_plugin_test.go @@ -546,7 +546,7 @@ func TestTranslateTrafficSplitPluginBadCases(t *testing.T) { cfg, err = tr.translateTrafficSplitPlugin(ctx, ar1.Namespace, 30, backends) assert.Nil(t, cfg) assert.NotNil(t, err) - assert.Contains(t, err.Error(), "service.Spec.Ports: port.Name not defined") + assert.Contains(t, err.Error(), "service.Spec.Ports: port port-not-found not found") backends[1].ServicePort.StrVal = "port2" backends[1].ResolveGranularity = "service" diff --git a/pkg/providers/translation/apisix_upstream.go b/pkg/providers/translation/apisix_upstream.go index 352f0eea20..acc74c13f0 100644 --- a/pkg/providers/translation/apisix_upstream.go +++ b/pkg/providers/translation/apisix_upstream.go @@ -42,24 +42,23 @@ func (t *translator) TranslateUpstreamConfigV2beta3(au *configv2beta3.ApisixUpst return ups, nil } -func (t *translator) TranslateUpstreamConfigV2(au *configv2.ApisixUpstreamConfig) (*apisixv1.Upstream, error) { - ups := apisixv1.NewDefaultUpstream() +func (t *translator) TranslateUpstreamConfigV2(au *configv2.ApisixUpstreamConfig, ups *apisixv1.Upstream) error { if err := t.translateUpstreamScheme(au.Scheme, ups); err != nil { - return nil, err + return err } if err := t.translateUpstreamLoadBalancerV2(au.LoadBalancer, ups); err != nil { - return nil, err + return err } if err := t.translateUpstreamHealthCheckV2(au.HealthCheck, ups); err != nil { - return nil, err + return err } if err := t.translateUpstreamRetriesAndTimeoutV2(au.Retries, au.Timeout, ups); err != nil { - return nil, err + return err } if err := t.translateClientTLSV2(au.TLSSecret, ups); err != nil { - return nil, err + return err } - return ups, nil + return nil } func (t *translator) translateUpstreamRetriesAndTimeoutV2beta3(retries *int, timeout *configv2beta3.UpstreamTimeout, ups *apisixv1.Upstream) error { diff --git a/pkg/providers/translation/translator.go b/pkg/providers/translation/translator.go index 464c6a536e..38154da61a 100644 --- a/pkg/providers/translation/translator.go +++ b/pkg/providers/translation/translator.go @@ -48,12 +48,11 @@ type Translator interface { TranslateUpstreamConfigV2beta3(*configv2beta3.ApisixUpstreamConfig) (*apisixv1.Upstream, error) // TranslateUpstreamConfigV2 translates ApisixUpstreamConfig (part of ApisixUpstream) // to APISIX Upstream, it doesn't fill the the Upstream metadata and nodes. - TranslateUpstreamConfigV2(*configv2.ApisixUpstreamConfig) (*apisixv1.Upstream, error) - // TranslateService translate Service resources to APISIX Upstream nodes (cluster ip) - // upstream nodes. + TranslateUpstreamConfigV2(*configv2.ApisixUpstreamConfig, *apisixv1.Upstream) error + // TranslateService translates the K8s Service to APISIX Upstream nodes (cluster ip) TranslateService(*corev1.Service, intstr.IntOrString) (apisixv1.UpstreamNodes, error) // TranslateEndpoint translate Endpoints resources to APISIX Upstream nodes (pod ip) - // according to the give port. Extra labels can be passed to filter the ultimate + // according to the given port. Extra labels can be passed to filter the ultimate // upstream nodes. TranslateEndpoint(kube.Endpoint, intstr.IntOrString, types.Labels) (apisixv1.UpstreamNodes, error) // TranslateUpstream composes an upstream according to the @@ -65,10 +64,10 @@ type Translator interface { // matching the subset labels (defined in ApisixUpstream) will be selected. // When the subset is not found, the node list will be empty. When the subset is empty, // all pods IP will be filled. - // Resolvegranularity is used to resolve the granularity of the service. It supports service/endpoint. + // ResolveGranularity determines the granularity of the generated nodes of upstream. It supports service/endpoint. TranslateUpstream(namespace string, name string, subset string, resolveGranularity string, port intstr.IntOrString) (*apisixv1.Upstream, error) - // TranslateUpstreamNodes translate Endpoints resources to APISIX Upstream nodes - // according to the give port. Extra labels can be passed to filter the ultimate + // TranslateUpstreamNodes translates the K8s Endpoint to APISIX Upstream nodes + // according to the given port. Extra labels can be passed to filter the ultimate // upstream nodes. TranslateUpstreamNodes(namespace string, name string, resolveGranularity string, port intstr.IntOrString, labels types.Labels) (apisixv1.UpstreamNodes, error) } diff --git a/pkg/providers/translation/translator_test.go b/pkg/providers/translation/translator_test.go index 04a65f8c4e..d2b65910cb 100644 --- a/pkg/providers/translation/translator_test.go +++ b/pkg/providers/translation/translator_test.go @@ -107,7 +107,8 @@ func TestTranslateUpstreamConfigV2(t *testing.T) { Scheme: apisixv1.SchemeGRPC, } - ups, err := tr.TranslateUpstreamConfigV2(au) + ups := apisixv1.NewDefaultUpstream() + err := tr.TranslateUpstreamConfigV2(au, ups) assert.Nil(t, err, "checking upstream config translating") assert.Equal(t, apisixv1.LbRoundRobin, ups.Type) assert.Equal(t, apisixv1.SchemeGRPC, ups.Scheme) @@ -120,7 +121,7 @@ func TestTranslateUpstreamConfigV2(t *testing.T) { }, Scheme: apisixv1.SchemeHTTP, } - ups, err = tr.TranslateUpstreamConfigV2(au) + err = tr.TranslateUpstreamConfigV2(au, ups) assert.Nil(t, err, "checking upstream config translating") assert.Equal(t, apisixv1.LbConsistentHash, ups.Type) assert.Equal(t, "user-agent", ups.Key) @@ -135,7 +136,7 @@ func TestTranslateUpstreamConfigV2(t *testing.T) { }, Scheme: "dns", } - _, err = tr.TranslateUpstreamConfigV2(au) + err = tr.TranslateUpstreamConfigV2(au, ups) assert.Error(t, err, &TranslateError{ Field: "scheme", Reason: "invalid value", @@ -146,7 +147,7 @@ func TestTranslateUpstreamConfigV2(t *testing.T) { Type: "hash", }, } - _, err = tr.TranslateUpstreamConfigV2(au) + err = tr.TranslateUpstreamConfigV2(au, ups) assert.Error(t, err, &TranslateError{ Field: "loadbalancer.type", Reason: "invalid value", @@ -158,7 +159,7 @@ func TestTranslateUpstreamConfigV2(t *testing.T) { HashOn: "arg", }, } - _, err = tr.TranslateUpstreamConfigV2(au) + err = tr.TranslateUpstreamConfigV2(au, ups) assert.Error(t, err, &TranslateError{ Field: "loadbalancer.hashOn", Reason: "invalid value", @@ -248,7 +249,7 @@ func TestTranslateUpstreamNodes(t *testing.T) { assert.Nil(t, nodes) assert.Equal(t, &TranslateError{ Field: "service.Spec.Ports", - Reason: "service.Spec.Ports: port.Port not defined, port.Port: 10080", + Reason: "service.Spec.Ports: port 10080 not found", }, err) nodes, err = tr.TranslateEndpoint(kube.NewEndpoint(endpoints), intstr.FromInt(80), nil) @@ -377,7 +378,7 @@ func TestTranslateUpstreamNodesWithEndpointSlices(t *testing.T) { assert.Nil(t, nodes) assert.Equal(t, err, &TranslateError{ Field: "service.Spec.Ports", - Reason: "service.Spec.Ports: port.Port not defined, port.Port: 10080", + Reason: "service.Spec.Ports: port 10080 not found", }) nodes, err = tr.TranslateEndpoint(kube.NewEndpointWithSlice(ep), intstr.FromInt(80), nil) diff --git a/pkg/providers/translation/upstream.go b/pkg/providers/translation/upstream.go index 0e191bbabe..c5d810fe4d 100644 --- a/pkg/providers/translation/upstream.go +++ b/pkg/providers/translation/upstream.go @@ -56,7 +56,7 @@ func (t *translator) TranslateUpstream(namespace, name, subset, resolveGranulari func (t *translator) translateUpstreamV2(namespace, name, subset string, port intstr.IntOrString, resolveGranularity string) (*apisixv1.Upstream, error) { ups := apisixv1.NewDefaultUpstream() - ups.Name = apisixv1.ComposeUpstreamName(namespace, name, subset, port.IntVal) + ups.Name = apisixv1.ComposeUpstreamName(namespace, name, subset, port.IntVal, resolveGranularity) ups.ID = id.GenID(ups.Name) au, err := t.ApisixUpstreamLister.V2(namespace, name) @@ -85,12 +85,11 @@ func (t *translator) translateUpstreamV2(namespace, name, subset string, port in } } // Filter nodes by subset. - nodes, err := t.TranslateUpstreamNodes(namespace, name, resolveGranularity, port, labels) + ups.Nodes, err = t.TranslateUpstreamNodes(namespace, name, resolveGranularity, port, labels) if err != nil { return nil, err } if au == nil || au.V2().Spec == nil { - ups.Nodes = nodes return ups, nil } @@ -101,19 +100,15 @@ func (t *translator) translateUpstreamV2(namespace, name, subset string, port in break } } - ups, err = t.TranslateUpstreamConfigV2(upsCfg) - if err != nil { + if err := t.TranslateUpstreamConfigV2(upsCfg, ups); err != nil { return nil, err } - ups.Nodes = nodes - ups.Name = apisixv1.ComposeUpstreamName(namespace, name, subset, port.IntVal) - ups.ID = id.GenID(ups.Name) return ups, nil } func (t *translator) translateUpstreamV2beta3(namespace, name, subset string, port intstr.IntOrString, resolveGranularity string) (*apisixv1.Upstream, error) { ups := apisixv1.NewDefaultUpstream() - ups.Name = apisixv1.ComposeUpstreamName(namespace, name, subset, port.IntVal) + ups.Name = apisixv1.ComposeUpstreamName(namespace, name, subset, port.IntVal, resolveGranularity) ups.ID = id.GenID(ups.Name) au, err := t.ApisixUpstreamLister.V2beta3(namespace, name) @@ -164,7 +159,7 @@ func (t *translator) translateUpstreamV2beta3(namespace, name, subset string, po return nil, err } ups.Nodes = nodes - ups.Name = apisixv1.ComposeUpstreamName(namespace, name, subset, port.IntVal) + ups.Name = apisixv1.ComposeUpstreamName(namespace, name, subset, port.IntVal, resolveGranularity) ups.ID = id.GenID(ups.Name) return ups, nil } @@ -289,13 +284,13 @@ func (t *translator) parseServicePort(svc *corev1.Service, port intstr.IntOrStri return &p, nil } } - return nil, fmt.Errorf("service.Spec.Ports: port.Name not defined, port.Name: %s", port.StrVal) + return nil, fmt.Errorf("service.Spec.Ports: port %s not found", port.StrVal) } for _, p := range svc.Spec.Ports { if p.Port == port.IntVal { return &p, nil } } - return nil, fmt.Errorf("service.Spec.Ports: port.Port not defined, port.Port: %d", port.IntVal) + return nil, fmt.Errorf("service.Spec.Ports: port %d not found", port.IntVal) } From 0e08fb6ae1d128e988882a0263dd35bc1bcd3cc6 Mon Sep 17 00:00:00 2001 From: rongxin Date: Tue, 13 Sep 2022 10:38:37 +0800 Subject: [PATCH 6/6] update TranslateEndpoint --- pkg/providers/translation/translator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/providers/translation/translator.go b/pkg/providers/translation/translator.go index 38154da61a..553bcf38f1 100644 --- a/pkg/providers/translation/translator.go +++ b/pkg/providers/translation/translator.go @@ -51,7 +51,7 @@ type Translator interface { TranslateUpstreamConfigV2(*configv2.ApisixUpstreamConfig, *apisixv1.Upstream) error // TranslateService translates the K8s Service to APISIX Upstream nodes (cluster ip) TranslateService(*corev1.Service, intstr.IntOrString) (apisixv1.UpstreamNodes, error) - // TranslateEndpoint translate Endpoints resources to APISIX Upstream nodes (pod ip) + // TranslateEndpoint translates the K8s Endpoint to APISIX Upstream nodes (pod ip) // according to the given port. Extra labels can be passed to filter the ultimate // upstream nodes. TranslateEndpoint(kube.Endpoint, intstr.IntOrString, types.Labels) (apisixv1.UpstreamNodes, error)