Skip to content

Commit

Permalink
ip reconciler: add a control loop
Browse files Browse the repository at this point in the history
Listen to pod deletion, and for every deleted pod, assure their IPs
are gone.

The rough algorithm goes like this:
  - for every network-status in the pod's annotations:
    - read associated net-attach-def from the k8s API
    - extract the range from the net-attach-def
    - find the corresponding IP pool
    - look for allocations belonging to the deleted pod
    - delete them using `IPManagement(..., types.Deallocate, ...)`

Signed-off-by: Miguel Duarte Barroso <[email protected]>
  • Loading branch information
maiqueb committed Feb 4, 2022
1 parent 5e6a51e commit 7dc4dab
Show file tree
Hide file tree
Showing 9 changed files with 396 additions and 6 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.13
FROM golang:1.17
ADD . /usr/src/whereabouts
RUN mkdir -p $GOPATH/src/github.com/k8snetworkplumbingwg/whereabouts
WORKDIR $GOPATH/src/github.com/k8snetworkplumbingwg/whereabouts
Expand All @@ -8,6 +8,7 @@ RUN ./hack/build-go.sh
FROM alpine:latest
LABEL org.opencontainers.image.source https://github.com/k8snetworkplumbingwg/whereabouts
COPY --from=0 /go/src/github.com/k8snetworkplumbingwg/whereabouts/bin/whereabouts .
COPY --from=0 /go/src/github.com/k8snetworkplumbingwg/whereabouts/bin/ip-control-loop .
COPY --from=0 /go/src/github.com/k8snetworkplumbingwg/whereabouts/bin/ip-reconciler .
COPY script/install-cni.sh .
CMD ["/install-cni.sh"]
93 changes: 93 additions & 0 deletions cmd/controlloop/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package main

import (
"flag"
"fmt"
"os"
"os/signal"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"

nadclient "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/typed/k8s.cni.cncf.io/v1"

"github.com/k8snetworkplumbingwg/whereabouts/pkg/logging"
"github.com/k8snetworkplumbingwg/whereabouts/pkg/reconciler/controlloop"
)

const (
allNamespaces = ""
controllerName = "pod-ip-reconciler"
)

const (
couldNotCreateController = 1
)

const (
defaultLogLevel = "debug"
)

func main() {
logLevel := flag.String("log-level", defaultLogLevel, "Specify the pod controller application logging level")
if logLevel != nil && *logLevel != defaultLogLevel {
logging.SetLogLevel(*logLevel)
}

stopChan := make(chan struct{})
defer close(stopChan)
handleSignals(stopChan, os.Interrupt)

networkController, err := newPodController(stopChan)
if err != nil {
_ = logging.Errorf("could not create the pod networks controller: %v", err)
os.Exit(couldNotCreateController)
}

networkController.Start(stopChan)
<-stopChan
}

func handleSignals(stopChannel chan struct{}, signals ...os.Signal) {
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, signals...)
go func() {
<-signalChannel
stopChannel <- struct{}{}
}()
}

func newPodController(stopChannel chan struct{}) (*controlloop.PodController, error) {
cfg, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("failed to implicitly generate the kubeconfig: %w", err)
}

k8sClientSet, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, fmt.Errorf("failed to create the Kubernetes client: %w", err)
}

nadK8sClientSet, err := nadclient.NewForConfig(cfg)
if err != nil {
return nil, err
}

eventBroadcaster := newEventBroadcaster(k8sClientSet)
return controlloop.NewPodController(k8sClientSet, nadK8sClientSet, eventBroadcaster, newEventRecorder(eventBroadcaster), stopChannel)
}

func newEventBroadcaster(k8sClientset kubernetes.Interface) record.EventBroadcaster {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(logging.Verbosef)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: k8sClientset.CoreV1().Events(allNamespaces)})
return eventBroadcaster
}

func newEventRecorder(broadcaster record.EventBroadcaster) record.EventRecorder {
return broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName})
}
12 changes: 12 additions & 0 deletions doc/crds/daemonset-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ rules:
- pods
verbs:
- list
- watch
- apiGroups: ["k8s.cni.cncf.io"]
resources:
- network-attachment-definitions
verbs:
- get
---
apiVersion: apps/v1
kind: DaemonSet
Expand Down Expand Up @@ -78,6 +84,12 @@ spec:
effect: NoSchedule
containers:
- name: whereabouts
command: [ "/bin/sh" ]
args:
- -c
- >
SLEEP=false /install-cni.sh &&
/ip-control-loop -log-level debug
image: ghcr.io/k8snetworkplumbingwg/whereabouts:latest-amd64
env:
- name: WHEREABOUTS_NAMESPACE
Expand Down
1 change: 1 addition & 0 deletions hack/build-go.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@ GLDFLAGS="${GLDFLAGS} ${VERSION_LDFLAGS}"

CGO_ENABLED=0 GOOS=${GOOS} GOARCH=${GOARCH} ${GO} build ${GOFLAGS} -ldflags "${GLDFLAGS}" -o bin/${cmd} cmd/${cmd}.go
CGO_ENABLED=0 GOOS=${GOOS} GOARCH=${GOARCH} ${GO} build ${GOFLAGS} -ldflags "${GLDFLAGS}" -o bin/ip-reconciler cmd/reconciler/*.go
CGO_ENABLED=0 GOOS=${GOOS} GOARCH=${GOARCH} ${GO} build ${GOFLAGS} -ldflags "${GLDFLAGS}" -o bin/ip-control-loop cmd/controlloop/*.go
3 changes: 2 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func canonicalizeIP(ip *net.IP) error {
// LoadIPAMConfig creates IPAMConfig using json encoded configuration provided
// as `bytes`. At the moment values provided in envArgs are ignored so there
// is no possibility to overload the json configuration using envArgs
func LoadIPAMConfig(bytes []byte, envArgs string) (*types.IPAMConfig, string, error) {
func LoadIPAMConfig(bytes []byte, envArgs string, extraConfigPaths ...string) (*types.IPAMConfig, string, error) {

// We first load up what we already have, before we start reading a file...
n := types.Net{
Expand All @@ -57,6 +57,7 @@ func LoadIPAMConfig(bytes []byte, envArgs string) (*types.IPAMConfig, string, er

// Once we have our basics, let's look for our (optional) configuration file
confdirs := []string{"/etc/kubernetes/cni/net.d/whereabouts.d/whereabouts.conf", "/etc/cni/net.d/whereabouts.d/whereabouts.conf"}
confdirs = append(confdirs, extraConfigPaths...)
// We prefix the optional configuration path (so we look there first)
if n.IPAM.ConfigurationPath != "" {
confdirs = append([]string{n.IPAM.ConfigurationPath}, confdirs...)
Expand Down
160 changes: 160 additions & 0 deletions pkg/reconciler/controlloop/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package controlloop

import (
"context"
"encoding/json"
"fmt"
"os"
"strings"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

nadv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
nadclient "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/typed/k8s.cni.cncf.io/v1"

whereaboutsv1alpha1 "github.com/k8snetworkplumbingwg/whereabouts/pkg/api/v1alpha1"
"github.com/k8snetworkplumbingwg/whereabouts/pkg/config"
"github.com/k8snetworkplumbingwg/whereabouts/pkg/logging"
wbclient "github.com/k8snetworkplumbingwg/whereabouts/pkg/storage/kubernetes"
"github.com/k8snetworkplumbingwg/whereabouts/pkg/types"
)

type handler struct {
nadClientSet nadclient.K8sCniCncfIoV1Interface
}

func (h *handler) deletePodHandler(obj interface{}) {
logging.Verbosef("handler called")
oldPod := obj.(*v1.Pod)
if oldPod == nil {
_ = logging.Errorf("pod deleted but could not unmarshall into struct: %v", obj)
return
}

podNamespace := oldPod.GetNamespace()
podName := oldPod.GetName()
logging.Verbosef("pod [%s] deleted", podID(podNamespace, podName))

ifaceStatuses, err := podNetworkStatus(oldPod)
if err != nil {
logging.Errorf("failed to access the network status for pod [%s/%s]: %v", podName, podNamespace, err)
return
}

k8sClient, err := wbclient.NewClient()
if err != nil {
logging.Errorf("failed to instantiate the Kubernetes client: %+v", err)
return
}

for _, ifaceStatus := range ifaceStatuses {
if ifaceStatus.Default {
logging.Verbosef("skipped net-attach-def for default network")
continue
}
nad, err := h.ifaceNetAttachDef(ifaceStatus)
if err != nil {
logging.Errorf("failed to get network-attachment-definition for iface %s: %+v", ifaceStatus.Name, err)
return
}

logging.Verbosef("the NAD's config: %s", nad.Spec)
ipamConfig, err := ipamConfiguration(nad, podNamespace, podName, ifaceStatus.Name)
if err != nil {
logging.Errorf("failed to create an IPAM configuration for the pod %s iface %s: %+v", podID(podNamespace, podName), ifaceStatus.Name, err)
return
}

pool, err := ipPool(k8sClient, ipamConfig.Range)
if err != nil {
logging.Errorf("failed to get the IPPool data: %+v", err)
return
}

logging.Verbosef("pool's [%s]", pool.Spec.Range)
for _, allocation := range pool.Spec.Allocations {
if err := removeStaleIPAllocation(allocation, podNamespace, podName, ipamConfig); err != nil {
logging.Errorf("failed to remove the allocation %v: %v", allocation, err)
return
}
}
}
}

func (h *handler) ifaceNetAttachDef(ifaceStatus nadv1.NetworkStatus) (*nadv1.NetworkAttachmentDefinition, error) {
const (
namespaceIndex = 0
nameIndex = 1
)

logging.Debugf("pod's network status: %+v", ifaceStatus)
ifaceInfo := strings.Split(ifaceStatus.Name, "/")
if len(ifaceInfo) < 2 {
return nil, fmt.Errorf("pod %s name does not feature namespace/pod name syntax", ifaceStatus.Name)
}

netNamespaceName := ifaceInfo[namespaceIndex]
netName := ifaceInfo[nameIndex]
nad, err := h.nadClientSet.NetworkAttachmentDefinitions(netNamespaceName).Get(context.TODO(), netName, metav1.GetOptions{})
if err != nil {
return nil, err
}
return nad, nil
}

func podNetworkStatus(pod *v1.Pod) ([]nadv1.NetworkStatus, error) {
var ifaceStatuses []nadv1.NetworkStatus
networkStatus, found := pod.Annotations[nadv1.NetworkStatusAnnot]
if found {
if err := json.Unmarshal([]byte(networkStatus), &ifaceStatuses); err != nil {
return nil, err
}
}
return ifaceStatuses, nil
}

func ipamConfiguration(nad *nadv1.NetworkAttachmentDefinition, podNamespace string, podName string, ifaceName string) (*types.IPAMConfig, error) {
const mounterWhereaboutsConfigFilePath = "/host/etc/cni/net.d/whereabouts.d/whereabouts.conf"

ipamConfig, _, err := config.LoadIPAMConfig([]byte(nad.Spec.Config), "", mounterWhereaboutsConfigFilePath)
if err != nil {
return nil, err
}
ipamConfig.PodName = podName
ipamConfig.PodNamespace = podNamespace
ipamConfig.Kubernetes.KubeConfigPath = "/host" + ipamConfig.Kubernetes.KubeConfigPath // must use the mount path

return ipamConfig, nil
}

func ipPool(k8sClient *wbclient.Client, cidr string) (*whereaboutsv1alpha1.IPPool, error) {
pool, err := k8sClient.IPPool(
context.TODO(),
ipPoolsNamespace(),
wbclient.NormalizeRange(cidr))
if err != nil {
return nil, err
}
return pool, nil
}

func removeStaleIPAllocation(allocation whereaboutsv1alpha1.IPAllocation, podNamespace string, podName string, ipamConfig *types.IPAMConfig) error {
logging.Verbosef("allocation|| %+v", allocation)
if allocation.PodRef == podID(podNamespace, podName) {
logging.Verbosef("SHOULD DELETE %+v", allocation)

if _, err := wbclient.IPManagement(context.TODO(), types.Deallocate, *ipamConfig, allocation.ContainerID, podID(podNamespace, podName)); err != nil {
return err
}
}
return nil
}

func ipPoolsNamespace() string {
if wbNamespace, found := os.LookupEnv("WHEREABOUTS_NAMESPACE"); found {
return wbNamespace
}
const wbDefaultNamespace = "kube-system"
return wbDefaultNamespace
}
Loading

0 comments on commit 7dc4dab

Please sign in to comment.