Merge pull request #90 from fedepaol/fix/upstreambugs
OCPBUGS-2373: Upstream Align
openshift-merge-robot authored Oct 25, 2022
2 parents cb8e478 + 1d9d8c1 commit 388ac00
Showing 5 changed files with 135 additions and 83 deletions.
67 changes: 0 additions & 67 deletions controller/controller_test.go
@@ -1016,70 +1016,3 @@ func TestControllerDualStackConfig(t *testing.T) {
         t.Fatalf("SetPools that deletes the config was accepted")
     }
 }
-
-func TestControllerSvcAddressSharing(t *testing.T) {
-    k := &testK8S{t: t}
-    c := &controller{
-        ips:    allocator.New(),
-        client: k,
-    }
-    l := log.NewNopLogger()
-
-    // Set a config with some IPs. Still no allocation, not synced.
-    pool := map[string]*config.Pool{
-        "default": {
-            AutoAssign: true,
-            CIDR:       []*net.IPNet{ipnet("4.5.6.0/24")},
-        },
-    }
-    if c.SetPools(l, pool) == controllers.SyncStateError {
-        t.Fatalf("SetPools failed")
-    }
-
-    // Configure svc1
-    svc1 := &v1.Service{
-        Spec: v1.ServiceSpec{
-            Type:                  "LoadBalancer",
-            ClusterIPs:            []string{"4.5.6.1"},
-            ExternalTrafficPolicy: "Local",
-            Ports: []v1.ServicePort{
-                {
-                    Port:     3000,
-                    Protocol: v1.ProtocolTCP,
-                },
-            },
-        },
-    }
-
-    if c.SetBalancer(l, "test1", svc1, epslices.EpsOrSlices{}) == controllers.SyncStateError {
-        t.Fatalf("SetBalancer failed")
-    }
-    gotSvc1 := k.gotService(svc1)
-    if len(gotSvc1.Status.LoadBalancer.Ingress) == 0 || gotSvc1.Status.LoadBalancer.Ingress[0].IP != "4.5.6.0" {
-        t.Fatal("svc1 didn't get an IP")
-    }
-    k.reset()
-    // Configure svc2 with IP sharing with svc1 and different port/protocol/ETP
-    svc2 := &v1.Service{
-        ObjectMeta: metav1.ObjectMeta{
-            Annotations: map[string]string{
-                "metallb.universe.tf/allow-shared-ip": "share",
-            },
-        },
-        Spec: v1.ServiceSpec{
-            Type:                  "LoadBalancer",
-            ClusterIPs:            []string{"4.5.6.1"},
-            ExternalTrafficPolicy: "Cluster",
-            Ports: []v1.ServicePort{
-                {
-                    Port:     1000,
-                    Protocol: v1.ProtocolUDP,
-                },
-            },
-        },
-        Status: statusAssigned([]string{"4.5.6.0"}),
-    }
-    if c.SetBalancer(l, "test2", svc2, epslices.EpsOrSlices{}) != controllers.SyncStateError {
-        t.Fatalf("SetBalancer did not fail")
-    }
-}
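The deleted test pinned the behavior this commit removes: SetBalancer returning SyncStateError when a second service tried to reuse an address with an incompatible sharing key. Under the upstream-aligned flow (see the service.go hunk below), that conflict now clears the stale assignment and falls through to a fresh allocation. A hedged sketch of what the final assertion could look like if the test were kept, reusing the deleted test's own variables (c, l, svc2); the asserted outcome is an assumption based on the new convergeBalancer flow, not part of this commit:

// Hypothetical replacement assertion: a sharing conflict should now
// converge (possibly onto a different IP) rather than surface as a
// sync error.
if c.SetBalancer(l, "test2", svc2, epslices.EpsOrSlices{}) == controllers.SyncStateError {
    t.Fatalf("SetBalancer errored on a sharing conflict; expected it to reallocate instead")
}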
13 changes: 10 additions & 3 deletions controller/main.go
@@ -70,12 +70,20 @@ func (c *controller) SetBalancer(l log.Logger, name string, svcRo *v1.Service, _ epslices.EpsOrSlices) controllers.SyncState {
     // copy makes the code much easier to follow, and we have a GC for
     // a reason.
     svc := svcRo.DeepCopy()
+    successRes := controllers.SyncStateSuccess
+    wasAllocated := c.isServiceAllocated(name)
     if !c.convergeBalancer(l, name, svc) {
         return controllers.SyncStateError
     }
+    if wasAllocated && !c.isServiceAllocated(name) { // the service was allocated before convergeBalancer ran and is not now, so convergeBalancer deallocated it.
+        // If the service was deallocated, it may have left room
+        // for another one, so we reprocess.
+        level.Info(l).Log("event", "serviceUpdated", "msg", "removed loadbalancer from service, services will be reprocessed")
+        successRes = controllers.SyncStateReprocessAll
+    }
     if reflect.DeepEqual(svcRo, svc) {
         level.Debug(l).Log("event", "noChange", "msg", "service converged, no change")
-        return controllers.SyncStateSuccess
+        return successRes
     }
 
     if !reflect.DeepEqual(svcRo.Status, svc.Status) {
@@ -88,8 +96,7 @@ }
         }
     }
     level.Info(l).Log("event", "serviceUpdated", "msg", "updated service object")
-
-    return controllers.SyncStateSuccess
+    return successRes
 }
 
 func (c *controller) deleteBalancer(l log.Logger, name string) {
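The new SyncStateReprocessAll result matters to whoever drives SetBalancer: freeing an address can unblock a different pending service, so retrying only the current service is not enough. A minimal caller-side sketch of reacting to the three possible results; l, name, svc, and eps stand for the caller's values, and requeue/requeueAll are hypothetical helpers, not MetalLB functions:

switch c.SetBalancer(l, name, svc, eps) {
case controllers.SyncStateError:
    requeue(name) // hypothetical: retry only this service
case controllers.SyncStateReprocessAll:
    requeueAll() // hypothetical: the freed IP may now fit another service
case controllers.SyncStateSuccess:
    // converged, nothing more to do
}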
15 changes: 5 additions & 10 deletions controller/service.go
@@ -15,7 +15,6 @@
 package main
 
 import (
-    "errors"
     "fmt"
     "net"
     "reflect"
@@ -26,7 +25,6 @@ import (
     "github.com/go-kit/log/level"
     v1 "k8s.io/api/core/v1"
 
-    "go.universe.tf/metallb/internal/allocator"
     "go.universe.tf/metallb/internal/allocator/k8salloc"
     "go.universe.tf/metallb/internal/ipfamily"
 )
@@ -41,7 +39,7 @@ func (c *controller) convergeBalancer(l log.Logger, key string, svc *v1.Service) bool {
     var err error
     // Not a LoadBalancer, early exit. It might have been a balancer
     // in the past, so we still need to clear LB state.
-    if svc.Spec.Type != "LoadBalancer" {
+    if svc.Spec.Type != v1.ServiceTypeLoadBalancer {
         level.Debug(l).Log("event", "clearAssignment", "reason", "notLoadBalancer", "msg", "not a LoadBalancer")
         c.clearServiceState(key, svc)
         // Early return, we explicitly do *not* want to reallocate
@@ -98,13 +96,6 @@ func (c *controller) convergeBalancer(l log.Logger, key string, svc *v1.Service) bool {
     if err = c.ips.Assign(key, lbIPs, k8salloc.Ports(svc), k8salloc.SharingKey(svc), k8salloc.BackendKey(svc)); err != nil {
         level.Info(l).Log("event", "clearAssignment", "error", err, "msg", "current IP not allowed by config, clearing")
         c.clearServiceState(key, svc)
-        // Check if we cannot assign the IP because services were sharing it via the
-        // "allow-shared-ip" annotation and one of them changed; in that case we fail
-        // instead of allocating a new service IP.
-        if errors.Is(err, allocator.ErrCannotShareKey) {
-            c.client.Errorf(svc, "svcCannotShareKey", "current IP not allowed by config:%s", err)
-            return false
-        }
         lbIPs = []net.IP{}
     }
 
@@ -220,6 +211,10 @@ func (c *controller) allocateIPs(key string, svc *v1.Service) ([]net.IP, error) {
     return c.ips.Allocate(key, serviceIPFamily, k8salloc.Ports(svc), k8salloc.SharingKey(svc), k8salloc.BackendKey(svc))
 }
 
+func (c *controller) isServiceAllocated(key string) bool {
+    return c.ips.Pool(key) != ""
+}
+
 func getDesiredLbIPs(svc *v1.Service) ([]net.IP, ipfamily.Family, error) {
     var desiredLbIPs []net.IP
     desiredLbIPsStr := svc.Annotations[annotationLoadBalancerIPs]
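The new isServiceAllocated helper leans on the allocator's existing Pool lookup: an allocated service key maps to the name of the pool its address came from, and an empty name means no allocation. A standalone sketch of the same check, assuming the module path shown in this file's imports:

package main

import (
    "fmt"

    "go.universe.tf/metallb/internal/allocator"
)

// isAllocated mirrors controller.isServiceAllocated: Pool returns the
// owning pool's name for an allocated service key, or "" otherwise.
func isAllocated(a *allocator.Allocator, key string) bool {
    return a.Pool(key) != ""
}

func main() {
    a := allocator.New()
    fmt.Println(isAllocated(a, "default/my-svc")) // false: nothing allocated yet
}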
119 changes: 119 additions & 0 deletions e2etest/l2tests/assignment.go
@@ -0,0 +1,119 @@
+// SPDX-License-Identifier:Apache-2.0
+
+package l2tests
+
+import (
+    "context"
+    "fmt"
+    "time"
+
+    "github.com/onsi/ginkgo"
+    "github.com/onsi/ginkgo/extensions/table"
+    "github.com/onsi/gomega"
+    metallbv1beta1 "go.universe.tf/metallb/api/v1beta1"
+    "go.universe.tf/metallb/e2etest/pkg/config"
+    "go.universe.tf/metallb/e2etest/pkg/k8s"
+    "go.universe.tf/metallb/e2etest/pkg/service"
+    internalconfig "go.universe.tf/metallb/internal/config"
+
+    v1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    clientset "k8s.io/client-go/kubernetes"
+    "k8s.io/kubernetes/test/e2e/framework"
+    e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
+    admissionapi "k8s.io/pod-security-admission/api"
+)
+
+var _ = ginkgo.Describe("IP Assignment", func() {
+    var cs clientset.Interface
+
+    var f *framework.Framework
+    ginkgo.AfterEach(func() {
+        if ginkgo.CurrentGinkgoTestDescription().Failed {
+            k8s.DumpInfo(Reporter, ginkgo.CurrentGinkgoTestDescription().TestText)
+        }
+
+        // Clean previous configuration.
+        err := ConfigUpdater.Clean()
+        framework.ExpectNoError(err)
+    })
+
+    f = framework.NewDefaultFramework("assignment")
+    f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged
+
+    ginkgo.BeforeEach(func() {
+        cs = f.ClientSet
+
+        ginkgo.By("Clearing any previous configuration")
+        err := ConfigUpdater.Clean()
+        framework.ExpectNoError(err)
+    })
+
+    ginkgo.Context("IPV4 Assignment", func() {
+        table.DescribeTable("should remove the ip from a service and assign it to a free one when", func(modify func(svc *v1.Service) error) {
+            ip, err := config.GetIPFromRangeByIndex(IPV4ServiceRange, 0)
+            framework.ExpectNoError(err)
+
+            resources := internalconfig.ClusterResources{
+                Pools: []metallbv1beta1.IPAddressPool{
+                    {
+                        ObjectMeta: metav1.ObjectMeta{
+                            Name: "singleip-pool",
+                        },
+                        Spec: metallbv1beta1.IPAddressPoolSpec{
+                            Addresses: []string{
+                                fmt.Sprintf("%s/32", ip),
+                            },
+                        },
+                    },
+                },
+            }
+            err = ConfigUpdater.Update(resources)
+            framework.ExpectNoError(err)
+
+            jig := e2eservice.NewTestJig(cs, f.Namespace.Name, "singleip")
+            svc, err := jig.CreateLoadBalancerService(10*time.Second, service.TrafficPolicyCluster)
+            framework.ExpectNoError(err)
+
+            ginkgo.By("Creating another service")
+            svc1, err := jig.CreateTCPService(func(svc *v1.Service) {
+                svc.Spec.Type = v1.ServiceTypeLoadBalancer
+                svc.Name = "singleip1"
+            })
+            framework.ExpectNoError(err)
+            gomega.Consistently(func() int {
+                s, err := cs.CoreV1().Services(svc1.Namespace).Get(context.Background(), svc1.Name, metav1.GetOptions{})
+                framework.ExpectNoError(err)
+                return len(s.Status.LoadBalancer.Ingress)
+            }, 5*time.Second, 1*time.Second).Should(gomega.BeZero())
+
+            err = modify(svc)
+            framework.ExpectNoError(err)
+
+            ginkgo.By("Changing the service type so the ip is free to be used again")
+            framework.ExpectNoError(err)
+
+            ginkgo.By("Checking the second service gets the ip assigned")
+
+            gomega.Eventually(func() string {
+                s, err := cs.CoreV1().Services(svc1.Namespace).Get(context.Background(), svc1.Name, metav1.GetOptions{})
+                framework.ExpectNoError(err)
+                if len(s.Status.LoadBalancer.Ingress) == 0 {
+                    return ""
+                }
+                return s.Status.LoadBalancer.Ingress[0].IP
+            }, time.Minute, 1*time.Second).Should(gomega.Equal(ip))
+        },
+            table.Entry("changing the service type to clusterIP",
+                func(svc *v1.Service) error {
+                    svc.Spec.Type = v1.ServiceTypeClusterIP
+                    _, err := cs.CoreV1().Services(svc.Namespace).Update(context.Background(), svc, metav1.UpdateOptions{})
+                    return err
+                }),
+            table.Entry("deleting the service",
+                func(svc *v1.Service) error {
+                    err := cs.CoreV1().Services(svc.Namespace).Delete(context.Background(), svc.Name, metav1.DeleteOptions{})
+                    return err
+                }))
+    })
+})
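The new e2e test drives one scenario body with two ways of freeing the single /32 address: demoting the first service to ClusterIP or deleting it outright, then asserting the pending service picks the address up. Extending coverage is just a matter of adding entries; a hypothetical third entry (not in this commit) that frees the address by switching the service to NodePort would slot in the same way:

table.Entry("changing the service type to NodePort",
    func(svc *v1.Service) error {
        svc.Spec.Type = v1.ServiceTypeNodePort
        _, err := cs.CoreV1().Services(svc.Namespace).Update(context.Background(), svc, metav1.UpdateOptions{})
        return err
    }),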
4 changes: 1 addition & 3 deletions internal/allocator/allocator.go
@@ -62,8 +62,6 @@ func New() *Allocator {
     }
 }
 
-var ErrCannotShareKey = errors.New("services can't share key")
-
 // SetPools updates the set of address pools that the allocator owns.
 func (a *Allocator) SetPools(pools map[string]*config.Pool) error {
     // All the fancy sharing stuff only influences how new allocations
@@ -419,7 +417,7 @@ func (a *Allocator) checkSharing(svc string, ip string, ports []Port, sk *key) error {
             }
         }
         if len(otherSvcs) > 0 {
-            return fmt.Errorf("can't change sharing key for %q, address also in use by %s: %w", svc, strings.Join(otherSvcs, ","), ErrCannotShareKey)
+            return fmt.Errorf("can't change sharing key for %q, address also in use by %s", svc, strings.Join(otherSvcs, ","))
         }
     }
 
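Dropping the %w verb along with the ErrCannotShareKey sentinel is what makes the controller-side errors.Is check removed above impossible: without wrapping, no sentinel survives in the returned error. A self-contained sketch of that Go mechanic, using a stand-in sentinel rather than the removed MetalLB variable:

package main

import (
    "errors"
    "fmt"
)

var errCannotShare = errors.New("services can't share key") // stand-in for the removed sentinel

func main() {
    wrapped := fmt.Errorf("can't change sharing key for %q: %w", "ns/svc", errCannotShare)
    plain := fmt.Errorf("can't change sharing key for %q", "ns/svc")

    fmt.Println(errors.Is(wrapped, errCannotShare)) // true: %w keeps the error chain
    fmt.Println(errors.Is(plain, errCannotShare))   // false: plain formatting breaks it
}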
