From ef6b8e46d2307765c18f148bf93647b9dec358b8 Mon Sep 17 00:00:00 2001 From: "jicheng.sk" Date: Thu, 14 Dec 2023 17:11:12 +0800 Subject: [PATCH] support tcpCheck in podProbe --- apis/apps/v1alpha1/node_pod_probe_types.go | 2 + .../bases/apps.kruise.io_nodepodprobes.yaml | 4 + .../pod_probe_marker_controller.go | 22 +- .../podprobemarker_event_handler.go | 11 +- pkg/daemon/podprobe/pod_probe_controller.go | 22 +- .../podprobe/pod_probe_controller_test.go | 87 ++++++-- pkg/daemon/podprobe/prober.go | 48 ++++- pkg/daemon/podprobe/worker.go | 23 ++- .../validating/probe_create_update_handler.go | 35 +++- test/e2e/apps/podprobemarker.go | 190 +++++++++++++++++- test/e2e/framework/pod_probe_marker_util.go | 50 ++++- vendor/k8s.io/kubernetes/pkg/probe/tcp/tcp.go | 63 ++++++ vendor/modules.txt | 1 + 13 files changed, 487 insertions(+), 71 deletions(-) create mode 100644 vendor/k8s.io/kubernetes/pkg/probe/tcp/tcp.go diff --git a/apis/apps/v1alpha1/node_pod_probe_types.go b/apis/apps/v1alpha1/node_pod_probe_types.go index 9de5a80eae..772bc5f5f2 100644 --- a/apis/apps/v1alpha1/node_pod_probe_types.go +++ b/apis/apps/v1alpha1/node_pod_probe_types.go @@ -32,6 +32,8 @@ type PodProbe struct { Namespace string `json:"namespace"` // pod uid UID string `json:"uid"` + // pod ip + IP string `json:"IP"` // Custom container probe, supports Exec, Tcp, and returns the result to Pod yaml Probes []ContainerProbe `json:"probes,omitempty"` } diff --git a/config/crd/bases/apps.kruise.io_nodepodprobes.yaml b/config/crd/bases/apps.kruise.io_nodepodprobes.yaml index c9e7a23356..65b2269e4a 100644 --- a/config/crd/bases/apps.kruise.io_nodepodprobes.yaml +++ b/config/crd/bases/apps.kruise.io_nodepodprobes.yaml @@ -38,6 +38,9 @@ spec: podProbes: items: properties: + IP: + description: pod ip + type: string name: description: pod name type: string @@ -221,6 +224,7 @@ spec: description: pod uid type: string required: + - IP - name - namespace - uid diff --git 
a/pkg/controller/podprobemarker/pod_probe_marker_controller.go b/pkg/controller/podprobemarker/pod_probe_marker_controller.go index 3ff4f38524..a08cb15b5f 100644 --- a/pkg/controller/podprobemarker/pod_probe_marker_controller.go +++ b/pkg/controller/podprobemarker/pod_probe_marker_controller.go @@ -23,14 +23,6 @@ import ( "reflect" "strings" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - "github.com/openkruise/kruise/pkg/features" - "github.com/openkruise/kruise/pkg/util" - utilclient "github.com/openkruise/kruise/pkg/util/client" - "github.com/openkruise/kruise/pkg/util/controllerfinder" - utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" - utilfeature "github.com/openkruise/kruise/pkg/util/feature" - "github.com/openkruise/kruise/pkg/util/ratelimiter" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" @@ -44,6 +36,15 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/features" + "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" + "github.com/openkruise/kruise/pkg/util/controllerfinder" + utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" + utilfeature "github.com/openkruise/kruise/pkg/util/feature" + "github.com/openkruise/kruise/pkg/util/ratelimiter" ) func init() { @@ -234,13 +235,16 @@ func (r *ReconcilePodProbeMarker) updateNodePodProbes(ppm *appsv1alpha1.PodProbe exist = true for j := range ppm.Spec.Probes { probe := ppm.Spec.Probes[j] + if podProbe.IP == "" { + podProbe.IP = pod.Status.PodIP + } setPodContainerProbes(podProbe, probe, ppm.Name) } break } } if !exist { - podProbe := appsv1alpha1.PodProbe{Name: pod.Name, Namespace: pod.Namespace, UID: string(pod.UID)} + podProbe := appsv1alpha1.PodProbe{Name: pod.Name, 
Namespace: pod.Namespace, UID: string(pod.UID), IP: pod.Status.PodIP} for j := range ppm.Spec.Probes { probe := ppm.Spec.Probes[j] podProbe.Probes = append(podProbe.Probes, appsv1alpha1.ContainerProbe{ diff --git a/pkg/controller/podprobemarker/podprobemarker_event_handler.go b/pkg/controller/podprobemarker/podprobemarker_event_handler.go index 7afdfa5898..0056311f1b 100644 --- a/pkg/controller/podprobemarker/podprobemarker_event_handler.go +++ b/pkg/controller/podprobemarker/podprobemarker_event_handler.go @@ -19,9 +19,6 @@ package podprobemarker import ( "context" - appsalphav1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - "github.com/openkruise/kruise/pkg/util" - utilclient "github.com/openkruise/kruise/pkg/util/client" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" @@ -33,6 +30,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" + + appsalphav1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" ) var _ handler.EventHandler = &enqueueRequestForPodProbeMarker{} @@ -95,8 +96,8 @@ func (p *enqueueRequestForPod) Update(evt event.UpdateEvent, q workqueue.RateLim if newInitialCondition == nil { return } - if kubecontroller.IsPodActive(new) && (oldInitialCondition == nil || oldInitialCondition.Status == corev1.ConditionFalse) && - newInitialCondition.Status == corev1.ConditionTrue { + if kubecontroller.IsPodActive(new) && (((oldInitialCondition == nil || oldInitialCondition.Status == corev1.ConditionFalse) && + newInitialCondition.Status == corev1.ConditionTrue) || (old.Status.PodIP != new.Status.PodIP)) { ppms, err := p.getPodProbeMarkerForPod(new) if err != nil { klog.Errorf("List PodProbeMarker fialed: %s", err.Error()) diff --git a/pkg/daemon/podprobe/pod_probe_controller.go 
b/pkg/daemon/podprobe/pod_probe_controller.go index 8e92700dad..8ff687cc60 100644 --- a/pkg/daemon/podprobe/pod_probe_controller.go +++ b/pkg/daemon/podprobe/pod_probe_controller.go @@ -26,15 +26,6 @@ import ( "sync" "time" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - "github.com/openkruise/kruise/pkg/client" - kruiseclient "github.com/openkruise/kruise/pkg/client/clientset/versioned" - clientalpha1 "github.com/openkruise/kruise/pkg/client/clientset/versioned/typed/apps/v1alpha1" - listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1" - daemonruntime "github.com/openkruise/kruise/pkg/daemon/criruntime" - daemonoptions "github.com/openkruise/kruise/pkg/daemon/options" - "github.com/openkruise/kruise/pkg/daemon/util" - commonutil "github.com/openkruise/kruise/pkg/util" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -51,6 +42,16 @@ import ( "k8s.io/gengo/examples/set-gen/sets" "k8s.io/klog/v2" kubelettypes "k8s.io/kubernetes/pkg/kubelet/types" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/client" + kruiseclient "github.com/openkruise/kruise/pkg/client/clientset/versioned" + clientalpha1 "github.com/openkruise/kruise/pkg/client/clientset/versioned/typed/apps/v1alpha1" + listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1" + daemonruntime "github.com/openkruise/kruise/pkg/daemon/criruntime" + daemonoptions "github.com/openkruise/kruise/pkg/daemon/options" + "github.com/openkruise/kruise/pkg/daemon/util" + commonutil "github.com/openkruise/kruise/pkg/util" ) const ( @@ -63,6 +64,7 @@ type probeKey struct { podNs string podName string podUID string + podIP string containerName string probeName string } @@ -249,7 +251,7 @@ func (c *Controller) sync() error { c.workerLock.Lock() validWorkers := map[probeKey]struct{}{} for _, podProbe := range npp.Spec.PodProbes { - key := 
probeKey{podNs: podProbe.Namespace, podName: podProbe.Name, podUID: podProbe.UID} + key := probeKey{podNs: podProbe.Namespace, podName: podProbe.Name, podUID: podProbe.UID, podIP: podProbe.IP} for i := range podProbe.Probes { probe := podProbe.Probes[i] key.containerName = probe.ContainerName diff --git a/pkg/daemon/podprobe/pod_probe_controller_test.go b/pkg/daemon/podprobe/pod_probe_controller_test.go index 1daf0672ed..f013d0a5f1 100644 --- a/pkg/daemon/podprobe/pod_probe_controller_test.go +++ b/pkg/daemon/podprobe/pod_probe_controller_test.go @@ -22,16 +22,18 @@ import ( "testing" "time" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - "github.com/openkruise/kruise/pkg/client/clientset/versioned/fake" - listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1" - commonutil "github.com/openkruise/kruise/pkg/util" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/client/clientset/versioned/fake" + listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1" + commonutil "github.com/openkruise/kruise/pkg/util" ) var ( @@ -44,6 +46,7 @@ var ( { Name: "pod-0", UID: "pod-0-uid", + IP: "1.1.1.1", Probes: []appsv1alpha1.ContainerProbe{ { Name: "ppm-1#healthy", @@ -64,6 +67,7 @@ var ( { Name: "pod-1", UID: "pod-1-uid", + IP: "2.2.2.2", Probes: []appsv1alpha1.ContainerProbe{ { Name: "ppm-1#healthy", @@ -81,6 +85,27 @@ var ( }, }, }, + { + Name: "pod-2", + UID: "pod-2-uid", + IP: "3.3.3.3", + Probes: []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-1#tcpCheck", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: 
intstr.IntOrString{Type: intstr.Int, IntVal: int32(8000)}, + Host: "3.3.3.3", + }, + }, + }, + }, + }, + }, + }, }, }, } @@ -96,7 +121,7 @@ func TestUpdateNodePodProbeStatus(t *testing.T) { { name: "test1, update pod probe status", getUpdate: func() Update { - return Update{Key: probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"}, State: appsv1alpha1.ProbeSucceeded} + return Update{Key: probeKey{"", "pod-1", "pod-1-uid", "2.2.2.2", "main", "ppm-1#healthy"}, State: appsv1alpha1.ProbeSucceeded} }, getNodePodProbe: func() *appsv1alpha1.NodePodProbe { demo := demoNodePodProbe.DeepCopy() @@ -144,10 +169,11 @@ func TestUpdateNodePodProbeStatus(t *testing.T) { return obj }, }, + { name: "test2, update pod probe status", getUpdate: func() Update { - return Update{Key: probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"}, State: appsv1alpha1.ProbeSucceeded} + return Update{Key: probeKey{"", "pod-1", "pod-1-uid", "2.2.2.2", "main", "ppm-1#healthy"}, State: appsv1alpha1.ProbeSucceeded} }, getNodePodProbe: func() *appsv1alpha1.NodePodProbe { demo := demoNodePodProbe.DeepCopy() @@ -227,10 +253,11 @@ func TestUpdateNodePodProbeStatus(t *testing.T) { return obj }, }, + { name: "test3, update pod probe status", getUpdate: func() Update { - return Update{Key: probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"}, State: appsv1alpha1.ProbeSucceeded} + return Update{Key: probeKey{"", "pod-1", "pod-1-uid", "2.2.2.2", "main", "ppm-1#healthy"}, State: appsv1alpha1.ProbeSucceeded} }, getNodePodProbe: func() *appsv1alpha1.NodePodProbe { demo := demoNodePodProbe.DeepCopy() @@ -286,6 +313,7 @@ func TestUpdateNodePodProbeStatus(t *testing.T) { return obj }, }, + { name: "test4, update pod probe status", getUpdate: func() Update { @@ -412,7 +440,7 @@ func TestSyncNodePodProbe(t *testing.T) { }, setWorkers: func(c *Controller) { c.workers = map[probeKey]*worker{} - key1 := probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#check"} + key1 := probeKey{"", "pod-1", 
"pod-1-uid", "2.2.2.2", "main", "ppm-1#check"} c.workers[key1] = newWorker(c, key1, &appsv1alpha1.ContainerProbeSpec{ Probe: corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ @@ -423,7 +451,7 @@ func TestSyncNodePodProbe(t *testing.T) { }, }) go c.workers[key1].run() - key2 := probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"} + key2 := probeKey{"", "pod-2", "pod-2-uid", "3.3.3.3", "main", "ppm-1#tcpCheck"} c.workers[key2] = newWorker(c, key2, &appsv1alpha1.ContainerProbeSpec{ Probe: corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ @@ -437,8 +465,8 @@ func TestSyncNodePodProbe(t *testing.T) { }, expectWorkers: func(c *Controller) map[probeKey]*worker { expect := map[probeKey]*worker{} - key := probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"} - expect[key] = newWorker(c, key, &appsv1alpha1.ContainerProbeSpec{ + key1 := probeKey{"", "pod-1", "pod-1-uid", "2.2.2.2", "main", "ppm-1#healthy"} + expect[key1] = newWorker(c, key1, &appsv1alpha1.ContainerProbeSpec{ Probe: corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ Exec: &corev1.ExecAction{ @@ -448,6 +476,17 @@ func TestSyncNodePodProbe(t *testing.T) { InitialDelaySeconds: 100, }, }) + key2 := probeKey{"", "pod-2", "pod-2-uid", "3.3.3.3", "main", "ppm-1#tcpCheck"} + expect[key2] = newWorker(c, key2, &appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.Int, IntVal: int32(8000)}, + Host: "3.3.3.3", + }, + }, + }, + }) return expect }, }, @@ -476,7 +515,19 @@ func TestSyncNodePodProbe(t *testing.T) { }, expectWorkers: func(c *Controller) map[probeKey]*worker { expect := map[probeKey]*worker{} - key1 := probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"} + key0 := probeKey{"", "pod-0", "pod-0-uid", "1.1.1.1", "main", "ppm-1#healthy"} + expect[key0] = newWorker(c, key0, &appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: 
&corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + InitialDelaySeconds: 100, + }, + }) + + key1 := probeKey{"", "pod-1", "pod-1-uid", "2.2.2.2", "main", "ppm-1#healthy"} expect[key1] = newWorker(c, key1, &appsv1alpha1.ContainerProbeSpec{ Probe: corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ @@ -487,7 +538,8 @@ func TestSyncNodePodProbe(t *testing.T) { InitialDelaySeconds: 100, }, }) - key2 := probeKey{"", "pod-1", "pod-1-uid", "nginx", "ppm-1#check"} + + key2 := probeKey{"", "pod-1", "pod-1-uid", "2.2.2.2", "nginx", "ppm-1#check"} expect[key2] = newWorker(c, key2, &appsv1alpha1.ContainerProbeSpec{ Probe: corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ @@ -498,15 +550,16 @@ func TestSyncNodePodProbe(t *testing.T) { InitialDelaySeconds: 100, }, }) - key3 := probeKey{"", "pod-0", "pod-0-uid", "main", "ppm-1#healthy"} + + key3 := probeKey{"", "pod-2", "pod-2-uid", "3.3.3.3", "main", "ppm-1#tcpCheck"} expect[key3] = newWorker(c, key3, &appsv1alpha1.ContainerProbeSpec{ Probe: corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ - Exec: &corev1.ExecAction{ - Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.Int, IntVal: int32(8000)}, + Host: "3.3.3.3", }, }, - InitialDelaySeconds: 100, }, }) return expect diff --git a/pkg/daemon/podprobe/prober.go b/pkg/daemon/podprobe/prober.go index 5b2e5c9ef3..6db12c6da1 100644 --- a/pkg/daemon/podprobe/prober.go +++ b/pkg/daemon/podprobe/prober.go @@ -22,13 +22,16 @@ import ( "io" "time" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "k8s.io/apimachinery/pkg/util/intstr" criapi "k8s.io/cri-api/pkg/apis" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/probe" execprobe "k8s.io/kubernetes/pkg/probe/exec" + tcpprobe "k8s.io/kubernetes/pkg/probe/tcp" "k8s.io/utils/exec" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" ) const maxProbeMessageLength = 
1024 @@ -36,6 +39,7 @@ const maxProbeMessageLength = 1024 // Prober helps to check the probe(exec, http, tcp) of a container. type prober struct { exec execprobe.Prober + tcp tcpprobe.Prober runtimeService criapi.RuntimeService } @@ -44,13 +48,14 @@ type prober struct { func newProber(runtimeService criapi.RuntimeService) *prober { return &prober{ exec: execprobe.New(), + tcp: tcpprobe.New(), runtimeService: runtimeService, } } // probe probes the container. -func (pb *prober) probe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeapi.ContainerStatus, containerID string) (appsv1alpha1.ProbeState, string, error) { - result, msg, err := pb.runProbe(p, container, containerID) +func (pb *prober) probe(p *appsv1alpha1.ContainerProbeSpec, probeKey probeKey, containerRuntimeStatus *runtimeapi.ContainerStatus, containerID string) (appsv1alpha1.ProbeState, string, error) { + result, msg, err := pb.runProbe(p, probeKey, containerRuntimeStatus, containerID) if bytes.Count([]byte(msg), nil)-1 > maxProbeMessageLength { msg = msg[:maxProbeMessageLength] } @@ -60,19 +65,32 @@ func (pb *prober) probe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeap return appsv1alpha1.ProbeSucceeded, msg, nil } -func (pb *prober) runProbe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeapi.ContainerStatus, containerID string) (probe.Result, string, error) { +func (pb *prober) runProbe(p *appsv1alpha1.ContainerProbeSpec, probeKey probeKey, containerRuntimeStatus *runtimeapi.ContainerStatus, containerID string) (probe.Result, string, error) { timeSecond := p.TimeoutSeconds if timeSecond <= 0 { timeSecond = 1 } timeout := time.Duration(timeSecond) * time.Second // current only support exec - // todo: http, tcp + // todo: http if p.Exec != nil { return pb.exec.Probe(pb.newExecInContainer(containerID, p.Exec.Command, timeout)) } - klog.InfoS("Failed to find probe builder for container", "containerName", container.Metadata.Name) - return probe.Unknown, "", fmt.Errorf("missing probe 
handler for %s", container.Metadata.Name) + // support tcp socket probe handler + if p.TCPSocket != nil { + port, err := extractPort(p.TCPSocket.Port) + if err != nil { + return probe.Unknown, "", err + } + host := p.TCPSocket.Host + if host == "" { + host = probeKey.podIP + } + klog.InfoS("TCP-Probe Host", "host", host, "port", port, "timeout", timeout) + return pb.tcp.Probe(host, port, timeout) + } + klog.InfoS("Failed to find probe builder for container", "containerName", containerRuntimeStatus.Metadata.Name) + return probe.Unknown, "", fmt.Errorf("missing probe handler for %s", containerRuntimeStatus.Metadata.Name) } type execInContainer struct { @@ -92,6 +110,22 @@ func (pb *prober) newExecInContainer(containerID string, cmd []string, timeout t }} } +func extractPort(param intstr.IntOrString) (int, error) { + port := -1 + switch param.Type { + case intstr.Int: + port = param.IntValue() + case intstr.String: + fallthrough + default: + return port, fmt.Errorf("intOrString had no kind: %+v", param) + } + if port > 0 && port < 65536 { + return port, nil + } + return port, fmt.Errorf("invalid port number: %v", port) +} + func (eic *execInContainer) Run() error { return nil } diff --git a/pkg/daemon/podprobe/worker.go b/pkg/daemon/podprobe/worker.go index 7b9556d1bc..c7c5a04141 100644 --- a/pkg/daemon/podprobe/worker.go +++ b/pkg/daemon/podprobe/worker.go @@ -22,11 +22,12 @@ import ( "reflect" "time" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - "github.com/openkruise/kruise/pkg/util" "k8s.io/apimachinery/pkg/util/runtime" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" "k8s.io/klog/v2" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/util" ) // worker handles the periodic probing of its assigned container. Each worker has a go-routine @@ -37,7 +38,7 @@ type worker struct { // Channel for stopping the probe. 
stopCh chan struct{} - // pod uid, container name, probe name + // pod uid, container name, probe name, ip key probeKey // Describes the probe configuration @@ -119,21 +120,21 @@ func (w *worker) doProbe() (keepGoing bool) { defer func() { recover() }() // Actually eat panics (HandleCrash takes care of logging) defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true }) - container, _ := w.probeController.fetchLatestPodContainer(w.key.podUID, w.key.containerName) - if container == nil { + containerRuntimeStatus, _ := w.probeController.fetchLatestPodContainer(w.key.podUID, w.key.containerName) + if containerRuntimeStatus == nil { klog.V(5).Infof("Pod(%s/%s) container(%s) Not Found", w.key.podNs, w.key.podName, w.key.containerName) return true } - if w.containerID != container.Id { + if w.containerID != containerRuntimeStatus.Id { if w.containerID != "" { w.probeController.result.remove(w.containerID) } - klog.V(5).Infof("Pod(%s/%s) container(%s) Id changed(%s -> %s)", w.key.podNs, w.key.podName, w.key.containerName, w.containerID, container.Id) - w.containerID = container.Id + klog.V(5).Infof("Pod(%s/%s) container(%s) Id changed(%s -> %s)", w.key.podNs, w.key.podName, w.key.containerName, w.containerID, containerRuntimeStatus.Id) + w.containerID = containerRuntimeStatus.Id w.probeController.result.set(w.containerID, w.key, w.initialValue, "") } - if container.State != runtimeapi.ContainerState_CONTAINER_RUNNING { + if containerRuntimeStatus.State != runtimeapi.ContainerState_CONTAINER_RUNNING { klog.V(5).Infof("Pod(%s/%s) Non-running container(%s) probed", w.key.podNs, w.key.podName, w.key.containerName) w.probeController.result.set(w.containerID, w.key, appsv1alpha1.ProbeFailed, fmt.Sprintf("Container(%s) is Non-running", w.key.containerName)) } @@ -143,7 +144,7 @@ func (w *worker) doProbe() (keepGoing bool) { if initialDelay < 1 { initialDelay = 1 } - curDelay := int32(time.Since(time.Unix(0, container.StartedAt)).Seconds()) + curDelay := 
int32(time.Since(time.Unix(0, containerRuntimeStatus.StartedAt)).Seconds()) if curDelay < initialDelay { klog.V(5).Infof("Pod(%s:%s) container(%s) probe(%s) initialDelay(%d), but curDelay(%d)", w.key.podNs, w.key.podName, w.key.containerName, w.key.probeName, initialDelay, curDelay) @@ -152,7 +153,7 @@ func (w *worker) doProbe() (keepGoing bool) { // the full container environment here, OR we must make a call to the CRI in order to get those environment // values from the running container. - result, msg, err := w.probeController.prober.probe(w.spec, container, w.containerID) + result, msg, err := w.probeController.prober.probe(w.spec, w.key, containerRuntimeStatus, w.containerID) if err != nil { klog.Errorf("Pod(%s/%s) do container(%s) probe(%s) spec(%s) failed: %s", w.key.podNs, w.key.podName, w.key.containerName, w.key.probeName, util.DumpJSON(w.spec), err.Error()) diff --git a/pkg/webhook/podprobemarker/validating/probe_create_update_handler.go b/pkg/webhook/podprobemarker/validating/probe_create_update_handler.go index b552046871..d7832a47f2 100644 --- a/pkg/webhook/podprobemarker/validating/probe_create_update_handler.go +++ b/pkg/webhook/podprobemarker/validating/probe_create_update_handler.go @@ -24,11 +24,11 @@ import ( "regexp" "strings" + "k8s.io/apimachinery/pkg/util/intstr" + "github.com/openkruise/kruise/pkg/features" utilfeature "github.com/openkruise/kruise/pkg/util/feature" - "github.com/openkruise/kruise/apis/apps/pub" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" admissionv1 "k8s.io/api/admission/v1" corev1 "k8s.io/api/core/v1" genericvalidation "k8s.io/apimachinery/pkg/api/validation" @@ -41,6 +41,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/runtime/inject" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + + "github.com/openkruise/kruise/apis/apps/pub" + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" ) const ( @@ -205,9 +208,13 @@ func 
validateHandler(handler *corev1.ProbeHandler, fldPath *field.Path) field.Er numHandlers++ allErrors = append(allErrors, validateExecAction(handler.Exec, fldPath.Child("exec"))...) } - if handler.HTTPGet != nil || handler.TCPSocket != nil { + if handler.HTTPGet != nil { + numHandlers++ + allErrors = append(allErrors, field.Forbidden(fldPath.Child("probe"), "currently does not support http probe")) + } + if handler.TCPSocket != nil { numHandlers++ - allErrors = append(allErrors, field.Forbidden(fldPath.Child("probe"), "current only support exec probe")) + allErrors = append(allErrors, validateTCPSocketAction(handler.TCPSocket, fldPath.Child("tcpSocket"))...) } if numHandlers == 0 { allErrors = append(allErrors, field.Required(fldPath, "must specify a handler type")) @@ -223,6 +230,26 @@ func validateExecAction(exec *corev1.ExecAction, fldPath *field.Path) field.Erro return allErrors } +func validateTCPSocketAction(tcp *corev1.TCPSocketAction, fldPath *field.Path) field.ErrorList { + return ValidatePortNumOrName(tcp.Port, fldPath.Child("port")) +} + +func ValidatePortNumOrName(port intstr.IntOrString, fldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + if port.Type == intstr.Int { + for _, msg := range validationutil.IsValidPortNum(port.IntValue()) { + allErrs = append(allErrs, field.Invalid(fldPath, port.IntValue(), msg)) + } + } else if port.Type == intstr.String { + for _, msg := range validationutil.IsValidPortName(port.StrVal) { + allErrs = append(allErrs, field.Invalid(fldPath, port.StrVal, msg)) + } + } else { + allErrs = append(allErrs, field.InternalError(fldPath, fmt.Errorf("unknown type: %v", port.Type))) + } + return allErrs +} + func validateProbeMarkerPolicy(policy *appsv1alpha1.ProbeMarkerPolicy, fldPath *field.Path) field.ErrorList { allErrors := field.ErrorList{} if policy.State != appsv1alpha1.ProbeSucceeded && policy.State != appsv1alpha1.ProbeFailed { diff --git a/test/e2e/apps/podprobemarker.go index 
ba60eb2a46..7cafa1ef16 100644 --- a/test/e2e/apps/podprobemarker.go +++ b/test/e2e/apps/podprobemarker.go @@ -23,18 +23,20 @@ import ( "github.com/onsi/ginkgo" "github.com/onsi/gomega" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" - "github.com/openkruise/kruise/pkg/controller/podprobemarker" - "github.com/openkruise/kruise/pkg/util" - "github.com/openkruise/kruise/test/e2e/framework" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/sets" clientset "k8s.io/client-go/kubernetes" utilpointer "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" + "github.com/openkruise/kruise/pkg/controller/podprobemarker" + "github.com/openkruise/kruise/pkg/util" + "github.com/openkruise/kruise/test/e2e/framework" ) var _ = SIGDescribe("PodProbeMarker", func() { @@ -274,4 +276,182 @@ var _ = SIGDescribe("PodProbeMarker", func() { } }) }) + + framework.KruiseDescribe("PodProbeMarker with tcpCheck functionality", func() { + ginkgo.AfterEach(func() { + if ginkgo.CurrentGinkgoTestDescription().Failed { + framework.DumpDebugInfo(c, ns) + } + }) + + ginkgo.It("pod probe marker tcpCheck test", func() { + nodeList, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + nodeLen := len(nodeList.Items) + if nodeLen == 0 { + ginkgo.By("pod probe markers list nodeList is zero") + return + } + nppList, err := kc.AppsV1alpha1().NodePodProbes().List(context.TODO(), metav1.ListOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(nppList.Items).To(gomega.HaveLen(nodeLen)) + + // create statefulset + sts := 
tester.NewBaseStatefulSet(ns, randStr) + // For heterogeneous scenarios such as edge clusters, deploy a Pod on each Node to verify that the functionality works + sts.Spec.Template.Spec.TopologySpreadConstraints = []v1.TopologySpreadConstraint{ + { + LabelSelector: sts.Spec.Selector, + MaxSkew: 1, + TopologyKey: "kubernetes.io/hostname", + WhenUnsatisfiable: v1.ScheduleAnyway, + }, + } + sts.Spec.Replicas = utilpointer.Int32Ptr(int32(nodeLen)) + ginkgo.By(fmt.Sprintf("Create statefulset(%s/%s)", sts.Namespace, sts.Name)) + tester.CreateStatefulSet(sts) + + // create pod probe marker + ppmList := tester.NewPodProbeMarkerForTcpCheck(ns, randStr) + ppm1 := &ppmList[0] + _, err = kc.AppsV1alpha1().PodProbeMarkers(ns).Create(context.TODO(), ppm1, metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + time.Sleep(time.Second * 10) + + // check finalizer + ppm1, err = kc.AppsV1alpha1().PodProbeMarkers(ns).Get(context.TODO(), ppm1.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(controllerutil.ContainsFinalizer(ppm1, podprobemarker.PodProbeMarkerFinalizer)).To(gomega.BeTrue()) + + pods, err := tester.ListActivePods(ns) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(pods).To(gomega.HaveLen(int(*sts.Spec.Replicas))) + validPods := sets.NewString() + for _, pod := range pods { + validPods.Insert(string(pod.UID)) + npp, err := kc.AppsV1alpha1().NodePodProbes().Get(context.TODO(), pod.Spec.NodeName, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + var podProbe *appsv1alpha1.PodProbe + for i := range npp.Spec.PodProbes { + obj := &npp.Spec.PodProbes[i] + if obj.UID == string(pod.UID) { + podProbe = obj + break + } + } + gomega.Expect(podProbe).NotTo(gomega.BeNil()) + gomega.Expect(pod.Labels["nginx"]).To(gomega.Equal("healthy")) + condition := util.GetCondition(pod, "game.kruise.io/healthy") + gomega.Expect(condition).NotTo(gomega.BeNil()) + 
gomega.Expect(string(condition.Status)).To(gomega.Equal(string(v1.ConditionTrue))) + condition = util.GetCondition(pod, "game.kruise.io/check") + gomega.Expect(condition).To(gomega.BeNil()) + } + nppList, err = kc.AppsV1alpha1().NodePodProbes().List(context.TODO(), metav1.ListOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + for _, npp := range nppList.Items { + for _, podProbe := range npp.Spec.PodProbes { + gomega.Expect(validPods.Has(podProbe.UID)).To(gomega.BeTrue()) + } + } + + // update failed probe, tcp port check from 80 ---> 8081 + ppm1, err = kc.AppsV1alpha1().PodProbeMarkers(ns).Get(context.TODO(), ppm1.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + ppm1.Spec.Probes[0].Probe.TCPSocket = &v1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.Int, IntVal: int32(8081)}, + } + _, err = kc.AppsV1alpha1().PodProbeMarkers(ns).Update(context.TODO(), ppm1, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + time.Sleep(time.Second * 60) + pods, err = tester.ListActivePods(ns) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(pods).To(gomega.HaveLen(int(*sts.Spec.Replicas))) + for _, pod := range pods { + // healthy probe + gomega.Expect(pod.Labels["nginx"]).To(gomega.Equal("")) + condition := util.GetCondition(pod, "game.kruise.io/healthy") + gomega.Expect(condition).NotTo(gomega.BeNil()) + gomega.Expect(string(condition.Status)).To(gomega.Equal(string(v1.ConditionFalse))) + } + + // update success probe + ppm1, err = kc.AppsV1alpha1().PodProbeMarkers(ns).Get(context.TODO(), ppm1.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + ppm1.Spec.Probes[0].Probe.TCPSocket = &v1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.Int, IntVal: int32(80)}, + } + _, err = kc.AppsV1alpha1().PodProbeMarkers(ns).Update(context.TODO(), ppm1, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // scale down + sts, err = 
kc.AppsV1beta1().StatefulSets(ns).Get(context.TODO(), sts.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + sts.Spec.Replicas = utilpointer.Int32Ptr(1) + _, err = kc.AppsV1beta1().StatefulSets(ns).Update(context.TODO(), sts, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + time.Sleep(time.Second * 60) + + pods, err = tester.ListActivePods(ns) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(len(pods)).To(gomega.Equal(1)) + validPods = sets.NewString() + for _, pod := range pods { + validPods.Insert(string(pod.UID)) + // healthy probe + gomega.Expect(pod.Labels["nginx"]).To(gomega.Equal("healthy")) + condition := util.GetCondition(pod, "game.kruise.io/healthy") + gomega.Expect(condition).NotTo(gomega.BeNil()) + gomega.Expect(string(condition.Status)).To(gomega.Equal(string(v1.ConditionTrue))) + } + nppList, err = kc.AppsV1alpha1().NodePodProbes().List(context.TODO(), metav1.ListOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + for _, npp := range nppList.Items { + for _, podProbe := range npp.Spec.PodProbes { + gomega.Expect(validPods.Has(podProbe.UID)).To(gomega.BeTrue()) + } + } + + // scale up + sts, err = kc.AppsV1beta1().StatefulSets(ns).Get(context.TODO(), sts.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + sts.Spec.Replicas = utilpointer.Int32Ptr(int32(nodeLen)) + _, err = kc.AppsV1beta1().StatefulSets(ns).Update(context.TODO(), sts, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + tester.WaitForStatefulSetRunning(sts) + time.Sleep(time.Second * 100) + + pods, err = tester.ListActivePods(ns) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(len(pods)).To(gomega.Equal(nodeLen)) + validPods = sets.NewString() + for _, pod := range pods { + validPods.Insert(string(pod.UID)) + // healthy probe + gomega.Expect(pod.Labels["nginx"]).To(gomega.Equal("healthy")) + condition := util.GetCondition(pod, 
"game.kruise.io/healthy") + gomega.Expect(condition).NotTo(gomega.BeNil()) + gomega.Expect(string(condition.Status)).To(gomega.Equal(string(v1.ConditionTrue))) + } + nppList, err = kc.AppsV1alpha1().NodePodProbes().List(context.TODO(), metav1.ListOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + for _, npp := range nppList.Items { + for _, podProbe := range npp.Spec.PodProbes { + gomega.Expect(validPods.Has(podProbe.UID)).To(gomega.BeTrue()) + } + } + + // delete podProbeMarker + for _, ppm := range ppmList { + err = kc.AppsV1alpha1().PodProbeMarkers(ns).Delete(context.TODO(), ppm.Name, metav1.DeleteOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + } + time.Sleep(time.Second * 3) + nppList, err = kc.AppsV1alpha1().NodePodProbes().List(context.TODO(), metav1.ListOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + for _, npp := range nppList.Items { + gomega.Expect(npp.Spec.PodProbes).To(gomega.HaveLen(0)) + } + }) + }) }) diff --git a/test/e2e/framework/pod_probe_marker_util.go b/test/e2e/framework/pod_probe_marker_util.go index 37e5a5c8c6..80739b712b 100644 --- a/test/e2e/framework/pod_probe_marker_util.go +++ b/test/e2e/framework/pod_probe_marker_util.go @@ -22,16 +22,18 @@ import ( "time" "github.com/onsi/gomega" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" - kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" apps "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" kubecontroller "k8s.io/kubernetes/pkg/controller" utilpointer "k8s.io/utils/pointer" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" + kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" ) type 
PodProbeMarkerTester struct { @@ -132,6 +134,48 @@ func (s *PodProbeMarkerTester) NewPodProbeMarker(ns, randStr string) []appsv1alp return []appsv1alpha1.PodProbeMarker{nginx, main} } +func (s *PodProbeMarkerTester) NewPodProbeMarkerForTcpCheck(ns, randStr string) []appsv1alpha1.PodProbeMarker { + nginx := appsv1alpha1.PodProbeMarker{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ppm-nginx", + Namespace: ns, + }, + Spec: appsv1alpha1.PodProbeMarkerSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": fmt.Sprintf("probe-%s", randStr), + }, + }, + Probes: []appsv1alpha1.PodContainerProbe{ + { + Name: "healthy", + ContainerName: "nginx", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.Int, IntVal: int32(80)}, + }, + }, + }, + }, + PodConditionType: "game.kruise.io/healthy", + MarkerPolicy: []appsv1alpha1.ProbeMarkerPolicy{ + { + State: appsv1alpha1.ProbeSucceeded, + Labels: map[string]string{ + "nginx": "healthy", + }, + }, + }, + }, + }, + }, + } + + return []appsv1alpha1.PodProbeMarker{nginx} +} + func (s *PodProbeMarkerTester) NewBaseStatefulSet(namespace, randStr string) *appsv1beta1.StatefulSet { return &appsv1beta1.StatefulSet{ TypeMeta: metav1.TypeMeta{ diff --git a/vendor/k8s.io/kubernetes/pkg/probe/tcp/tcp.go b/vendor/k8s.io/kubernetes/pkg/probe/tcp/tcp.go new file mode 100644 index 0000000000..7ce1450470 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/probe/tcp/tcp.go @@ -0,0 +1,63 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tcp + +import ( + "net" + "strconv" + "time" + + "k8s.io/kubernetes/pkg/probe" + + "k8s.io/klog/v2" +) + +// New creates Prober. +func New() Prober { + return tcpProber{} +} + +// Prober is an interface that defines the Probe function for doing TCP readiness/liveness checks. +type Prober interface { + Probe(host string, port int, timeout time.Duration) (probe.Result, string, error) +} + +type tcpProber struct{} + +// Probe checks that a TCP connection to the address can be opened. +func (pr tcpProber) Probe(host string, port int, timeout time.Duration) (probe.Result, string, error) { + return DoTCPProbe(net.JoinHostPort(host, strconv.Itoa(port)), timeout) +} + +// DoTCPProbe checks that a TCP socket to the address can be opened. +// If the socket can be opened, it returns Success +// If the socket fails to open, it returns Failure. +// This is exported because some other packages may want to do direct TCP probes. +func DoTCPProbe(addr string, timeout time.Duration) (probe.Result, string, error) { + d := probe.ProbeDialer() + d.Timeout = timeout + conn, err := d.Dial("tcp", addr) + if err != nil { + // Convert errors to failures to handle timeouts. 
+ return probe.Failure, err.Error(), nil + } + err = conn.Close() + if err != nil { + klog.Errorf("Unexpected error closing TCP probe socket: %v (%#v)", err, err) + } + return probe.Success, "", nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index fb5d78e7c6..56614bc91c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1369,6 +1369,7 @@ k8s.io/kubernetes/pkg/kubelet/util/format k8s.io/kubernetes/pkg/kubelet/util/ioutils k8s.io/kubernetes/pkg/probe k8s.io/kubernetes/pkg/probe/exec +k8s.io/kubernetes/pkg/probe/tcp k8s.io/kubernetes/pkg/proxy/util k8s.io/kubernetes/pkg/scheduler k8s.io/kubernetes/pkg/scheduler/apis/config