Skip to content

Commit

Permalink
Return previous allocation for add cmd
Browse files Browse the repository at this point in the history
This allows the cni to return a previous allocation
for a pod with the same podRef and interface name. This
is needed on networks with limited IPs.

Signed-off-by: Marcelo Guerrero <[email protected]>
  • Loading branch information
mlguerrero12 committed Jun 21, 2024
1 parent 45029aa commit dea1f70
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 25 deletions.
64 changes: 62 additions & 2 deletions cmd/whereabouts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,53 @@ var _ = Describe("Whereabouts operations", func() {
}()
})

It("returns a previously allocated IP", func() {
ipamNetworkName := ""
cniVersion := "0.3.1"

ipRange := "192.168.1.0/24"
ipGateway := "192.168.10.1"
expectedAddress := "192.168.1.1/24"

ipamConf := ipamConfig(podName, podNamespace, ipamNetworkName, ipRange, ipGateway, kubeConfigPath)
Expect(ipamConf.IPRanges).NotTo(BeEmpty())

wbClient := *kubernetes.NewKubernetesClient(
fake.NewSimpleClientset(
ipPool(ipamConf.IPRanges[0].Range, podNamespace, ipamNetworkName, []whereaboutstypes.IPReservation{
{PodRef: ipamConf.GetPodRef(), IfName: ifname, IP: net.ParseIP(expectedAddress)}, {PodRef: "test"}}...)),
fakek8sclient.NewSimpleClientset(),
0)

cniConf, err := newCNINetConf(cniVersion, ipamConf)
Expect(err).NotTo(HaveOccurred())

args := &skel.CmdArgs{
ContainerID: "dummy",
Netns: nspath,
IfName: ifname,
StdinData: cniConf,
Args: cniArgs(podNamespace, podName),
}
client := mutateK8sIPAM(args.ContainerID, ifname, ipamConf, wbClient)

// Allocate the IP
r, raw, err := testutils.CmdAddWithArgs(args, func() error {
return cmdAdd(client, cniVersion)
})
Expect(err).NotTo(HaveOccurred())
Expect(strings.Index(string(raw), "\"version\":")).Should(BeNumerically(">", 0))

result, err := current.GetResult(r)
Expect(err).NotTo(HaveOccurred())

ExpectWithOffset(1, *result.IPs[0]).To(Equal(
current.IPConfig{
Address: mustCIDR(expectedAddress),
Gateway: ipamConf.Gateway,
}))
})

It("allocates and releases addresses on ADD/DEL", func() {
ipRange := "192.168.1.0/24"
ipGateway := "192.168.10.1"
Expand Down Expand Up @@ -1429,19 +1476,32 @@ users:
`)
}

func ipPool(ipRange string, namespace string, networkName string) *v1alpha1.IPPool {
func ipPool(ipRange string, namespace string, networkName string, podReferences ...whereaboutstypes.IPReservation) *v1alpha1.IPPool {
return &v1alpha1.IPPool{
ObjectMeta: metav1.ObjectMeta{
Name: kubernetes.IPPoolName(kubernetes.PoolIdentifier{IpRange: ipRange, NetworkName: networkName}),
Namespace: namespace,
ResourceVersion: "1",
},
Spec: v1alpha1.IPPoolSpec{
Range: ipRange,
Range: ipRange,
Allocations: allocations(podReferences...),
},
}
}

func allocations(podReferences ...whereaboutstypes.IPReservation) map[string]v1alpha1.IPAllocation {
poolAllocations := map[string]v1alpha1.IPAllocation{}
for i, r := range podReferences {
poolAllocations[fmt.Sprintf("%d", i+1)] = v1alpha1.IPAllocation{
ContainerID: "",
PodRef: r.PodRef,
IfName: r.IfName,
}
}
return poolAllocations
}

func newCNINetConf(cniVersion string, ipamConfig *whereaboutstypes.IPAMConfig) ([]byte, error) {
netConf := whereaboutstypes.NetConfList{
CNIVersion: cniVersion,
Expand Down
126 changes: 126 additions & 0 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,132 @@ var _ = Describe("Whereabouts functionality", func() {
})
})
})

Context("reclaim previously allocated IP", func() {
const (
namespace = "default"
networkName = "recovernet"
rangeWithTwoIPs = "10.10.0.0/30"
replicaNumber = 1
)

var podName string
var secondaryIPs []string
var ifNames = []string{"net1", "net2"}

var tinyNetwork *nettypes.NetworkAttachmentDefinition
var originalAllocations []v1alpha1.IPAllocation
var originalClusterWideAllocations []*v1alpha1.OverlappingRangeIPReservation

BeforeEach(func() {
var err error

podName = fmt.Sprintf("%s-0", serviceName)

tinyNetwork, err = clientInfo.AddNetAttachDef(
macvlanNetworkWithWhereaboutsIPAMNetwork(networkName, namespace, rangeWithTwoIPs, []string{}, wbstorage.UnnamedNetwork, true))
Expect(err).NotTo(HaveOccurred())

// Request 2 interfaces.
_, err = clientInfo.ProvisionStatefulSet(statefulSetName, namespace, serviceName, replicaNumber, networkName, networkName)
Expect(err).NotTo(HaveOccurred())

By("getting pod info")
pod, err := clientInfo.Client.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())

By("verifying allocation")
for _, ifName := range ifNames {
secondaryIfaceIPs, err := retrievers.SecondaryIfaceIPValue(pod, ifName)
Expect(err).NotTo(HaveOccurred())

for _, ip := range secondaryIfaceIPs {
verifyAllocations(clientInfo, rangeWithTwoIPs, ip, namespace, podName, ifName)
}
secondaryIPs = append(secondaryIPs, secondaryIfaceIPs...)
}

By("saving initial allocations")
ipPool, err := clientInfo.WbClient.WhereaboutsV1alpha1().IPPools(ipPoolNamespace).Get(context.Background(), wbstorage.IPPoolName(wbstorage.PoolIdentifier{IpRange: rangeWithTwoIPs, NetworkName: wbstorage.UnnamedNetwork}), metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())

originalAllocations = allocationForPodRef(getPodRef(namespace, podName), *ipPool)
Expect(originalAllocations).To(HaveLen(2))

for _, ip := range secondaryIPs {
overlapping, err := clientInfo.WbClient.WhereaboutsV1alpha1().OverlappingRangeIPReservations(ipPoolNamespace).Get(context.Background(), wbstorage.NormalizeIP(net.ParseIP(ip), wbstorage.UnnamedNetwork), metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
originalClusterWideAllocations = append(originalClusterWideAllocations, overlapping)
}
})

AfterEach(func() {
Expect(clientInfo.DelNetAttachDef(tinyNetwork)).To(Succeed())
Expect(clientInfo.DeleteStatefulSet(namespace, serviceName, selector)).To(Succeed())
})

It("can reclaim the previously allocated IPs", func() {
By("checking that the IP allocation is removed when the pod is deleted")
Expect(clientInfo.ScaleStatefulSet(serviceName, namespace, -1)).To(Succeed())
verifyNoAllocationsForPodRef(clientInfo, rangeWithTwoIPs, namespace, podName, secondaryIPs)

By("adding previous allocations")
ipPool, err := clientInfo.WbClient.WhereaboutsV1alpha1().IPPools(ipPoolNamespace).Get(context.Background(), wbstorage.IPPoolName(wbstorage.PoolIdentifier{IpRange: rangeWithTwoIPs, NetworkName: wbstorage.UnnamedNetwork}), metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())

updatedPool := ipPool.DeepCopy()
for i, ip := range secondaryIPs {
firstIP, _, err := net.ParseCIDR(ipv4TestRange)
Expect(err).NotTo(HaveOccurred())
offset, err := iphelpers.IPGetOffset(net.ParseIP(ip), firstIP)
Expect(err).NotTo(HaveOccurred())

updatedPool.Spec.Allocations[fmt.Sprintf("%d", offset)] = originalAllocations[i]
}

_, err = clientInfo.WbClient.WhereaboutsV1alpha1().IPPools(ipPoolNamespace).Update(context.Background(), updatedPool, metav1.UpdateOptions{})
Expect(err).NotTo(HaveOccurred())

for _, allocation := range originalClusterWideAllocations {
allocation.ResourceVersion = ""
_, err := clientInfo.WbClient.WhereaboutsV1alpha1().OverlappingRangeIPReservations(ipPoolNamespace).Create(context.Background(), allocation, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
}

By("increasing replica count")
Expect(clientInfo.ScaleStatefulSet(serviceName, namespace, 1)).To(Succeed())
err = wbtestclient.WaitForStatefulSetCondition(context.Background(), clientInfo.Client, namespace, serviceName, replicaNumber, 1*time.Minute, wbtestclient.IsStatefulSetReadyPredicate)
Expect(err).NotTo(HaveOccurred())

By("getting pod info")
pod, err := clientInfo.Client.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())

By("verifying allocation")
for _, ifName := range ifNames {
secondaryIfaceIPs, err := retrievers.SecondaryIfaceIPValue(pod, ifName)
Expect(err).NotTo(HaveOccurred())

for _, ip := range secondaryIfaceIPs {
verifyAllocations(clientInfo, rangeWithTwoIPs, ip, namespace, podName, ifName)
}
secondaryIPs = append(secondaryIPs, secondaryIfaceIPs...)
}

By("comparing with previous allocations")
ipPool, err = clientInfo.WbClient.WhereaboutsV1alpha1().IPPools(ipPoolNamespace).Get(context.Background(), wbstorage.IPPoolName(wbstorage.PoolIdentifier{IpRange: rangeWithTwoIPs, NetworkName: wbstorage.UnnamedNetwork}), metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())

currentAllocation := allocationForPodRef(getPodRef(namespace, podName), *ipPool)
Expect(currentAllocation).To(HaveLen(2))

for i, allocation := range currentAllocation {
Expect(allocation.ContainerID).ToNot(Equal(originalAllocations[i].ContainerID))
Expect(allocation.IfName).To(Equal(originalAllocations[i].IfName))
Expect(allocation.PodRef).To(Equal(originalAllocations[i].PodRef))
}
})
})
})

Context("OverlappingRangeIPReservation", func() {
Expand Down
13 changes: 13 additions & 0 deletions pkg/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@ func AssignIP(ipamConf types.RangeConfiguration, reservelist []types.IPReservati
// Setup the basics here.
_, ipnet, _ := net.ParseCIDR(ipamConf.Range)

// Verify if podRef and ifName have already an allocation.
for i, r := range reservelist {
if r.PodRef == podRef && r.IfName == ifName {
logging.Debugf("IP already allocated for podRef: %q - ifName:%q - IP: %s", podRef, ifName, r.IP.String())
if r.ContainerID != containerID {
logging.Debugf("updating container ID: %q", containerID)
reservelist[i].ContainerID = containerID
}

return net.IPNet{IP: r.IP, Mask: ipnet.Mask}, reservelist, nil
}
}

newip, updatedreservelist, err := IterateForAssignment(*ipnet, ipamConf.RangeStart, ipamConf.RangeEnd, reservelist, ipamConf.OmitRanges, containerID, podRef, ifName)
if err != nil {
return net.IPNet{}, nil, err
Expand Down
51 changes: 29 additions & 22 deletions pkg/storage/kubernetes/ipam.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,27 +268,27 @@ func (i *KubernetesIPAM) GetOverlappingRangeStore() (storage.OverlappingRangeSto
}

// IsAllocatedInOverlappingRange checks for IP addresses to see if they're allocated cluster wide, for overlapping
// ranges.
func (c *KubernetesOverlappingRangeStore) IsAllocatedInOverlappingRange(ctx context.Context, ip net.IP,
networkName string) (bool, error) {
// ranges. First return value is true if the IP is allocated, second return value is true if the IP is allocated to the
// current podRef
func (c *KubernetesOverlappingRangeStore) GetOverlappingRangeIPReservation(ctx context.Context, ip net.IP,
podRef, networkName string) (*whereaboutsv1alpha1.OverlappingRangeIPReservation, error) {
normalizedIP := NormalizeIP(ip, networkName)

logging.Debugf("OverlappingRangewide allocation check; normalized IP: %q, IP: %q, networkName: %q",
logging.Debugf("Get overlappingRangewide allocation; normalized IP: %q, IP: %q, networkName: %q",
normalizedIP, ip, networkName)

_, err := c.client.WhereaboutsV1alpha1().OverlappingRangeIPReservations(c.namespace).Get(ctx, normalizedIP, metav1.GetOptions{})
r, err := c.client.WhereaboutsV1alpha1().OverlappingRangeIPReservations(c.namespace).Get(ctx, normalizedIP, metav1.GetOptions{})
if err != nil && errors.IsNotFound(err) {
// cluster ip reservation does not exist, this appears to be good news.
// logging.Debugf("IP %v is not reserved cluster wide, allowing.", ip)
return false, nil
return nil, nil
} else if err != nil {
logging.Errorf("k8s get OverlappingRangeIPReservation error: %s", err)
return false, fmt.Errorf("k8s get OverlappingRangeIPReservation error: %s", err)
return nil, fmt.Errorf("k8s get OverlappingRangeIPReservation error: %s", err)
}

logging.Debugf("Normalized IP is reserved; normalized IP: %q, IP: %q, networkName: %q",
normalizedIP, ip, networkName)
return true, nil
return r, nil
}

// UpdateOverlappingRangeAllocation updates clusterwide allocation for overlapping ranges.
Expand Down Expand Up @@ -484,6 +484,7 @@ func IPManagementKubernetesUpdate(ctx context.Context, mode int, ipam *Kubernete
// handle the ip add/del until successful
var overlappingrangeallocations []whereaboutstypes.IPReservation
var ipforoverlappingrangeupdate net.IP
skipOverlappingRangeUpdate := false
for _, ipRange := range ipamConf.IPRanges {
RETRYLOOP:
for j := 0; j < storage.DatastoreRetries; j++ {
Expand Down Expand Up @@ -523,18 +524,22 @@ func IPManagementKubernetesUpdate(ctx context.Context, mode int, ipam *Kubernete
// When it's allocated overlappingrange wide, we add it to a local reserved list
// And we try again.
if ipamConf.OverlappingRanges {
isAllocated, err := overlappingrangestore.IsAllocatedInOverlappingRange(requestCtx, newip.IP,
ipamConf.NetworkName)
overlappingRangeIPReservation, err := overlappingrangestore.GetOverlappingRangeIPReservation(requestCtx, newip.IP,
ipamConf.GetPodRef(), ipamConf.NetworkName)
if err != nil {
logging.Errorf("Error checking overlappingrange allocation: %v", err)
logging.Errorf("Error getting cluster wide IP allocation: %v", err)
return newips, err
}

if isAllocated {
logging.Debugf("Continuing loop, IP is already allocated (possibly from another range): %v", newip)
// We create "dummy" records here for evaluation, but, we need to filter those out later.
overlappingrangeallocations = append(overlappingrangeallocations, whereaboutstypes.IPReservation{IP: newip.IP, IsAllocated: true})
continue
if overlappingRangeIPReservation != nil {
if overlappingRangeIPReservation.Spec.PodRef != ipamConf.GetPodRef() {
logging.Debugf("Continuing loop, IP is already allocated (possibly from another range): %v", newip)
// We create "dummy" records here for evaluation, but, we need to filter those out later.
overlappingrangeallocations = append(overlappingrangeallocations, whereaboutstypes.IPReservation{IP: newip.IP, IsAllocated: true})
continue
}

skipOverlappingRangeUpdate = true
}

ipforoverlappingrangeupdate = newip.IP
Expand Down Expand Up @@ -574,11 +579,13 @@ func IPManagementKubernetesUpdate(ctx context.Context, mode int, ipam *Kubernete
}

if ipamConf.OverlappingRanges {
err = overlappingrangestore.UpdateOverlappingRangeAllocation(requestCtx, mode, ipforoverlappingrangeupdate,
ipamConf.GetPodRef(), ipam.IfName, ipamConf.NetworkName)
if err != nil {
logging.Errorf("Error performing UpdateOverlappingRangeAllocation: %v", err)
return newips, err
if !skipOverlappingRangeUpdate {
err = overlappingrangestore.UpdateOverlappingRangeAllocation(requestCtx, mode, ipforoverlappingrangeupdate,
ipamConf.GetPodRef(), ipam.IfName, ipamConf.NetworkName)
if err != nil {
logging.Errorf("Error performing UpdateOverlappingRangeAllocation: %v", err)
return newips, err
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package storage

import (
"context"
"github.com/k8snetworkplumbingwg/whereabouts/pkg/api/whereabouts.cni.cncf.io/v1alpha1"
"net"
"time"

Expand Down Expand Up @@ -33,7 +34,7 @@ type Store interface {

// OverlappingRangeStore is an interface for wrapping overlappingrange storage options
type OverlappingRangeStore interface {
IsAllocatedInOverlappingRange(ctx context.Context, ip net.IP, networkName string) (bool, error)
GetOverlappingRangeIPReservation(ctx context.Context, ip net.IP, podRef, networkName string) (*v1alpha1.OverlappingRangeIPReservation, error)
UpdateOverlappingRangeAllocation(ctx context.Context, mode int, ip net.IP, podRef, ifName, networkName string) error
}

Expand Down

0 comments on commit dea1f70

Please sign in to comment.