From 4af30233c7e84d64d16a506b873679312663a118 Mon Sep 17 00:00:00 2001 From: craig Date: Thu, 31 Oct 2024 12:24:06 +0000 Subject: [PATCH] update probe logic not to share memory Signed-off-by: craig rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED --- .../dnshealthcheckprobe_reconciler.go | 12 +- internal/controller/dnsrecord_controller.go | 12 +- internal/probes/worker.go | 151 ++++++++++-------- internal/probes/worker_test.go | 41 ++--- 4 files changed, 116 insertions(+), 100 deletions(-) diff --git a/internal/controller/dnshealthcheckprobe_reconciler.go b/internal/controller/dnshealthcheckprobe_reconciler.go index 5844c2a..fb03cf7 100644 --- a/internal/controller/dnshealthcheckprobe_reconciler.go +++ b/internal/controller/dnshealthcheckprobe_reconciler.go @@ -76,17 +76,17 @@ func (r *DNSProbeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c } return ctrl.Result{}, err } - return ctrl.Result{}, nil } if !controllerutil.ContainsFinalizer(dnsProbe, DNSHealthCheckFinalizer) { - controllerutil.AddFinalizer(dnsProbe, DNSHealthCheckFinalizer) - if err := r.Client.Update(ctx, dnsProbe); err != nil { - if apierrors.IsConflict(err) { - return ctrl.Result{Requeue: true}, nil + if controllerutil.AddFinalizer(dnsProbe, DNSHealthCheckFinalizer) { + if err := r.Client.Update(ctx, dnsProbe); err != nil { + if apierrors.IsConflict(err) { + return ctrl.Result{Requeue: true}, nil + } + return ctrl.Result{}, err } - return ctrl.Result{}, err } return ctrl.Result{}, nil } diff --git a/internal/controller/dnsrecord_controller.go b/internal/controller/dnsrecord_controller.go index 195a988..72119c8 100644 --- a/internal/controller/dnsrecord_controller.go +++ b/internal/controller/dnsrecord_controller.go @@ -223,6 +223,12 @@ func (r *DNSRecordReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return r.updateStatus(ctx, previous, dnsRecord, false, err) } + if probesEnabled { + if err = r.ReconcileHealthChecks(ctx, dnsRecord, allowInsecureCert); err != nil { + return ctrl.Result{}, err + } + } + // just check probes here and don't publish // Publish the record hadChanges, err := r.publishRecord(ctx, dnsRecord, dnsProvider) if err != nil { @@ -232,12 +238,6 @@ func (r *DNSRecordReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return r.updateStatus(ctx, previous, dnsRecord, hadChanges, err) } - if probesEnabled { - if err = r.ReconcileHealthChecks(ctx, dnsRecord, allowInsecureCert); err != nil { - return ctrl.Result{}, err - } - } - return r.updateStatus(ctx, previous, dnsRecord, hadChanges, nil) } diff --git a/internal/probes/worker.go b/internal/probes/worker.go index 07d13f3..474de8f 100644 --- a/internal/probes/worker.go +++ b/internal/probes/worker.go @@ -39,74 +39,44 @@ func (fn RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { } type Probe struct { - probeConfig *v1alpha1.DNSHealthCheckProbe Transport RoundTripperFunc probeHeaders v1alpha1.AdditionalHeaders } -func NewProbe(probeConfig *v1alpha1.DNSHealthCheckProbe, headers v1alpha1.AdditionalHeaders) *Probe { +func NewProbe(headers v1alpha1.AdditionalHeaders) *Probe { return &Probe{ - probeConfig: probeConfig, probeHeaders: headers, } } -// Start a worker in a separate gouroutine. Returns a cancel func to kill the worker. -// If a worker is nil, no routines will start and cancel could be ignored (still returning it to prevent panic) -func (w *Probe) Start(clientctx context.Context, k8sClient client.Client) context.CancelFunc { - ctx, cancel := context.WithCancel(clientctx) - logger := log.FromContext(ctx) - logger.Info("health: starting new worker for", "probe ", keyForProbe(w.probeConfig)) - metrics.ProbeCounter.WithLabelValues(w.probeConfig.Name, w.probeConfig.Namespace, w.probeConfig.Spec.Hostname).Inc() - go func() { - for range w.ExecuteProbe(ctx, w.probeConfig) { - // each time this is done it will send a signal - logger.V(1).Info("health: probe finished ", "updating status for probe", keyForProbe(w.probeConfig), "status", w.probeConfig.Status) - err := k8sClient.Status().Update(clientctx, w.probeConfig) - if err != nil { - logger.Error(err, "health: probe finished. error updating probe status", "probe", keyForProbe(w.probeConfig)) - } - } - logger.V(1).Info("health: stopped executing probe", "probe", keyForProbe(w.probeConfig)) - metrics.ProbeCounter.WithLabelValues(w.probeConfig.Name, w.probeConfig.Namespace, w.probeConfig.Spec.Hostname).Dec() - }() - return cancel -} - -func (w *Probe) ExecuteProbe(ctx context.Context, probe *v1alpha1.DNSHealthCheckProbe) <-chan struct{} { - sig := make(chan struct{}) - +// ExecuteProbe executes the health check request on a background routine at the correct interval. It returns a channel on which it will send each probe result +func (w *Probe) ExecuteProbe(ctx context.Context, probe *v1alpha1.DNSHealthCheckProbe) <-chan ProbeResult { + sig := make(chan ProbeResult) + localProbe := probe.DeepCopy() go func() { - logger := log.FromContext(ctx) + logger := log.FromContext(ctx).WithValues("health probe worker:", keyForProbe(localProbe)) for { - if sig == nil || probe == nil { + if sig == nil || localProbe == nil { logger.Error(fmt.Errorf("channel or probe nil "), "exiting probe") return } - timer := time.NewTimer(executeAt(probe)) + timer := time.NewTimer(executeAt(localProbe)) select { case <-ctx.Done(): - logger.V(1).Info("health: context shutdown. Stopping", "probe", keyForProbe(probe)) + logger.V(2).Info("health probe worker: context shutdown. Stopping") if sig != nil { timer.Stop() close(sig) sig = nil - logger.V(1).Info("health: context shutdown. time stopped and channel closed", "probe", keyForProbe(probe)) + logger.V(2).Info("health probe worker: context shutdown. time stopped and channel closed") } return case <-timer.C: - result := w.execute(ctx, probe) - probe.Status.ObservedGeneration = probe.Generation - if !result.Healthy { - probe.Status.ConsecutiveFailures++ - } else { - probe.Status.ConsecutiveFailures = 0 - } - probe.Status.Healthy = &result.Healthy - probe.Status.LastCheckedAt = result.CheckedAt - probe.Status.Reason = result.Reason - logger.V(1).Info("health: execution complete ", "probe", keyForProbe(probe), "result", result) - sig <- struct{}{} + logger.V(2).Info("health probe worker: executing") + result := w.execute(ctx, localProbe) + // as this routine is just executing the local config it only cares about when it should execute again + localProbe.Status.LastCheckedAt = result.CheckedAt + sig <- result } } }() @@ -125,42 +95,39 @@ func executeAt(probe *v1alpha1.DNSHealthCheckProbe) time.Duration { } func (w *Probe) execute(ctx context.Context, probe *v1alpha1.DNSHealthCheckProbe) ProbeResult { - logger := log.FromContext(ctx) - logger.V(1).Info("kinperforming health check") + logger := log.FromContext(ctx).WithValues("health probe worker:", keyForProbe(probe)) + logger.V(2).Info("performing health check") var ips []net.IP //if the address is a CNAME, check all IP Addresses that it resolves to - logger.V(1).Info("looking up address ", "address", probe.Spec.Address) + logger.V(2).Info("looking up address ", "address", probe.Spec.Address) ip := net.ParseIP(probe.Spec.Address) if ip == nil { IPAddr, err := net.LookupIP(probe.Spec.Address) if err != nil { - logger.V(1).Error(err, "error looking up address", "address", probe.Spec.Address) + logger.Error(err, "error looking up address", "address", probe.Spec.Address) return ProbeResult{CheckedAt: metav1.Now(), Healthy: false, Reason: err.Error()} } ips = append(ips, IPAddr...) } else { ips = append(ips, ip) } - + var result ProbeResult for _, ip = range ips { - result := w.performRequest(ctx, string(probe.Spec.Protocol), probe.Spec.Hostname, probe.Spec.Path, ip.String(), probe.Spec.Port, probe.Spec.AllowInsecureCertificate, w.probeHeaders) - // if any IP in a CNAME is healthy, it is a healthy CNAME + result = w.performRequest(ctx, string(probe.Spec.Protocol), probe.Spec.Hostname, probe.Spec.Path, ip.String(), probe.Spec.Port, probe.Spec.AllowInsecureCertificate, w.probeHeaders) + // return as any healthy IP is a good result (multiple can only really happen with a CNAME) if result.Healthy { return result } } - - // all IPs returned an unhealthy result - return ProbeResult{ - CheckedAt: metav1.Now(), - Healthy: false, - } + //TODO deal with multiple results for a CNAME better and don't just rely on last result + // all IPs returned an unhealthy. Result will be the last ProbeResult so return this to have some status set + return result } func (w *Probe) performRequest(ctx context.Context, protocol, host, path, ip string, port int, allowInsecure bool, headers v1alpha1.AdditionalHeaders) ProbeResult { - logger := log.FromContext(ctx) + logger := log.FromContext(ctx).WithValues("health probe worker:", "preforming request") probeClient := metrics.NewInstrumentedClient("probe", &http.Client{ Transport: TransportWithDNSResponse(map[string]string{host: ip}, allowInsecure), }) @@ -182,7 +149,7 @@ func (w *Probe) performRequest(ctx context.Context, protocol, host, path, ip str httpReq.Header.Add(h.Name, h.Value) } - logger.V(1).Info("health: probe executing against ", "url", httpReq.URL) + logger.V(2).Info("health: probe executing against ", "url", httpReq.URL) // Send the request res, err := probeClient.Do(httpReq) @@ -191,7 +158,7 @@ func (w *Probe) performRequest(ctx context.Context, protocol, host, path, ip str } else if err != nil { return ProbeResult{CheckedAt: metav1.Now(), Healthy: false, Reason: fmt.Sprintf("error: %s, response: %+v", err.Error(), res)} } - logger.V(1).Info("health: probe execution complete against ", "url", httpReq.URL, "status code", res.StatusCode) + logger.V(2).Info("health: probe execution complete against ", "url", httpReq.URL, "status code", res.StatusCode) if !slice.Contains[int](ExpectedResponses, func(i int) bool { return i == res.StatusCode }) { return ProbeResult{ CheckedAt: metav1.Now(), @@ -204,6 +171,7 @@ func (w *Probe) performRequest(ctx context.Context, protocol, host, path, ip str return ProbeResult{ CheckedAt: metav1.Now(), Healthy: true, + Status: res.StatusCode, } } @@ -245,36 +213,79 @@ func NewProbeManager() *ProbeManager { // StopProbeWorker stops the worker and removes it from the WorkerManager func (m *ProbeManager) StopProbeWorker(ctx context.Context, probeCR *v1alpha1.DNSHealthCheckProbe) { - logger := log.FromContext(ctx) + logger := log.FromContext(ctx).WithValues("health probe worker:", keyForProbe(probeCR)) if stop, ok := m.probes[keyForProbe(probeCR)]; ok { - logger.V(1).Info("health: Stopping existing worker", "probe", keyForProbe(probeCR)) + logger.V(2).Info("Stopping existing worker", "probe", keyForProbe(probeCR)) stop() delete(m.probes, keyForProbe(probeCR)) } } +// Start a worker in a separate gouroutine. Returns a cancel func to kill the worker. +// If a worker is nil, no routines will start and cancel could be ignored (still returning it to prevent panic) +func (w *Probe) Start(clientctx context.Context, k8sClient client.Client, probe *v1alpha1.DNSHealthCheckProbe) context.CancelFunc { + + ctx, cancel := context.WithCancel(clientctx) + logger := log.FromContext(ctx).WithValues("probe", keyForProbe(probe)) + logger.V(1).Info("health: starting new worker for probe") + + go func() { + metrics.ProbeCounter.WithLabelValues(probe.Name, probe.Namespace, probe.Spec.Hostname).Inc() + //each time the probe executes it will send a result on the channel returned by ExecuteProbe until the probe is cancelled. The probe can be cancelled by a new spec being created for the healthcheck or on shutdown + for probeResult := range w.ExecuteProbe(ctx, probe) { + freshProbe := &v1alpha1.DNSHealthCheckProbe{} + if err := k8sClient.Get(clientctx, client.ObjectKeyFromObject(probe), freshProbe); err != nil { + // if we hit an error here we cancel and return as it is an unusual state + logger.Error(err, "health: probe finished. error getting upto date probe. Cancelling") + cancel() + return + } + freshProbe.Status.ObservedGeneration = freshProbe.Generation + if !probeResult.Healthy { + freshProbe.Status.ConsecutiveFailures++ + } else { + freshProbe.Status.ConsecutiveFailures = 0 + } + freshProbe.Status.Healthy = &probeResult.Healthy + freshProbe.Status.LastCheckedAt = probeResult.CheckedAt + freshProbe.Status.Reason = probeResult.Reason + freshProbe.Status.Status = probeResult.Status + logger.V(1).Info("health: execution complete ", "result", probeResult) + + logger.V(2).Info("health: probe finished updating status for probe", "status", freshProbe) + err := k8sClient.Status().Update(clientctx, freshProbe) + if err != nil { + logger.Error(err, "health: probe finished. error updating probe status") + } + } + + logger.V(1).Info("health: stopped executing probe", "probe", keyForProbe(probe)) + metrics.ProbeCounter.WithLabelValues(probe.Name, probe.Namespace, probe.Spec.Hostname).Dec() + }() + return cancel +} + // EnsureProbeWorker ensures a new worker per generation of the probe. // New generation of probe - new worker. // If the generation has not changed, it will re-create a worker. If context is done (we are deleting) that worker will die immediately. func (m *ProbeManager) EnsureProbeWorker(ctx context.Context, k8sClient client.Client, probeCR *v1alpha1.DNSHealthCheckProbe, headers v1alpha1.AdditionalHeaders) { - logger := log.FromContext(ctx) + logger := log.FromContext(ctx).WithValues("health probe worker:", keyForProbe(probeCR)) logger.Info("ensure probe") - // if worker exists if stop, ok := m.probes[keyForProbe(probeCR)]; ok { // gen has not changed (spec has not changed) - nothing to do, // or first reconcile of the probe but worker already in place if probeCR.Status.ObservedGeneration == probeCR.Generation || probeCR.Status.ObservedGeneration == 0 { - logger.V(1).Info("health: probe worker exists for generation. Continuing", "probe", keyForProbe(probeCR)) + logger.V(2).Info("already exists for generation and status. Continuing", "probe", keyForProbe(probeCR)) return } - logger.V(1).Info("health: worker already exists. New generation of the probe: stopping existing worker", "probe", keyForProbe(probeCR)) + logger.V(2).Info("old worker exists. New generation of the probe found: stopping existing worker", "probe", keyForProbe(probeCR)) stop() } // Either worker does not exist, or gen changed and old worker got killed. Creating a new one. - logger.V(1).Info("health: starting fresh worker for", "generation", probeCR.Generation, "probe", keyForProbe(probeCR)) - probe := NewProbe(probeCR, headers) - m.probes[keyForProbe(probeCR)] = probe.Start(ctx, k8sClient) + logger.V(2).Info("health: starting fresh worker for", "generation", probeCR.Generation, "probe", keyForProbe(probeCR)) + probe := NewProbe(headers) + m.probes[keyForProbe(probeCR)] = probe.Start(ctx, k8sClient, probeCR) } func keyForProbe(probe *v1alpha1.DNSHealthCheckProbe) string { diff --git a/internal/probes/worker_test.go b/internal/probes/worker_test.go index 058583b..a577569 100644 --- a/internal/probes/worker_test.go +++ b/internal/probes/worker_test.go @@ -47,7 +47,7 @@ func TestWorker_ProbeSuccess(t *testing.T) { Transport func() probes.RoundTripperFunc ProbeConfig func() *v1alpha1.DNSHealthCheckProbe ProbeHeaders v1alpha1.AdditionalHeaders - Validate func(t *testing.T, probe *v1alpha1.DNSHealthCheckProbe, tt *testTransport, expectedCalls int) + Validate func(t *testing.T, results []probes.ProbeResult, tt *testTransport, expectedCalls int) Ctx context.Context ExpectedProbeCalls int }{ @@ -64,15 +64,17 @@ func TestWorker_ProbeSuccess(t *testing.T) { ProbeConfig: func() *v1alpha1.DNSHealthCheckProbe { return testProbe.DeepCopy() }, - Validate: func(t *testing.T, probe *v1alpha1.DNSHealthCheckProbe, tt *testTransport, expectedCalls int) { - if probe.Status.ConsecutiveFailures != 0 { - t.Fatalf("expected no failures but got %v", probe.Status.ConsecutiveFailures) + Validate: func(t *testing.T, results []probes.ProbeResult, tt *testTransport, expectedCalls int) { + if len(results) != expectedCalls { + t.Fatalf("expected %v results got %v", expectedCalls, len(results)) } - if probe.Status.Healthy == nil || *probe.Status.Healthy != true { - t.Fatalf("expected the probe to be healthy") + lastResult := results[expectedCalls-1] + // get the last probe result + if lastResult.Healthy == false { + t.Fatalf("expected the result of the probe to be healthy but it was not") } - if tt.calls != expectedCalls { - t.Fatalf("expected the number of health probe http calls to be %v but got %v", expectedCalls, tt.calls) + if lastResult.Status != 200 { + t.Fatalf("expected the result status to be 200 but got %v", lastResult.Status) } }, }, @@ -89,15 +91,16 @@ func TestWorker_ProbeSuccess(t *testing.T) { return testProbe.DeepCopy() }, ExpectedProbeCalls: 3, - Validate: func(t *testing.T, probe *v1alpha1.DNSHealthCheckProbe, tt *testTransport, expectedCalls int) { - if probe.Status.ConsecutiveFailures != expectedCalls { - t.Fatalf("expected %v failures but got %v", expectedCalls, probe.Status.ConsecutiveFailures) + Validate: func(t *testing.T, results []probes.ProbeResult, tt *testTransport, expectedCalls int) { + if len(results) != expectedCalls { + t.Fatalf("expected %v results got %v", expectedCalls, len(results)) } - if probe.Status.Healthy == nil || *probe.Status.Healthy != false { - t.Fatalf("expected the probe to be unhealthy") + lastResult := results[expectedCalls-1] + if lastResult.Healthy != false { + t.Fatalf("expected the result of the probe to be un-healthy but it was not") } - if tt.calls != expectedCalls { - t.Fatalf("expected the number of health probe http calls to be %v but got %v", expectedCalls, tt.calls) + if lastResult.Status != 503 { + t.Fatalf("expected result status to be 503 but got %v", lastResult.Status) } }, }, @@ -106,7 +109,7 @@ func TestWorker_ProbeSuccess(t *testing.T) { for _, test := range testCases { t.Run(test.Name, func(t *testing.T) { probeConfig := test.ProbeConfig() - w := probes.NewProbe(probeConfig, test.ProbeHeaders) + w := probes.NewProbe(test.ProbeHeaders) tTransport := &testTransport{} tTransport.f = test.Transport() w.Transport = tTransport.countedHTTP @@ -116,11 +119,13 @@ func TestWorker_ProbeSuccess(t *testing.T) { wait := sync.WaitGroup{} wait.Add(test.ExpectedProbeCalls) // read from our channel + executedProbes := []probes.ProbeResult{} go func() { calls := 0 - for range exChan { + for result := range exChan { calls++ t.Logf("channel read %v", calls) + executedProbes = append(executedProbes, result) wait.Done() } }() @@ -135,7 +140,7 @@ func TestWorker_ProbeSuccess(t *testing.T) { // cancel our context and wait for the probe to exit cancel() wait.Wait() - test.Validate(t, probeConfig, tTransport, test.ExpectedProbeCalls) + test.Validate(t, executedProbes, tTransport, test.ExpectedProbeCalls) }) } }