update probe logic not to share memory #288

Merged · 1 commit · Oct 31, 2024
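Summary of the change: probe workers no longer mutate the shared DNSHealthCheckProbe object across goroutines. ExecuteProbe works on a DeepCopy of the probe and reports each run as a ProbeResult on a channel, and Start fetches a fresh copy of the CR before writing status. A minimal sketch of the share-nothing pattern, with simplified placeholder types (the real code uses DNSHealthCheckProbe and client.Client):

```go
// A minimal sketch of the share-nothing worker pattern this PR adopts.
// Config and Result are placeholders; the real code deep-copies the CR
// and updates status via client.Client.
package main

import (
	"context"
	"fmt"
	"time"
)

type Config struct{ Hostname string }

type Result struct {
	Healthy   bool
	CheckedAt time.Time
}

// executeProbe owns a private copy of the config and reports results on
// a channel instead of mutating state shared with the caller.
func executeProbe(ctx context.Context, cfg Config) <-chan Result {
	local := cfg // value copy; the real code calls probe.DeepCopy()
	out := make(chan Result)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case <-time.After(100 * time.Millisecond):
				out <- Result{Healthy: local.Hostname != "", CheckedAt: time.Now()}
			}
		}
	}()
	return out
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	for res := range executeProbe(ctx, Config{Hostname: "example.com"}) {
		// The consumer (Start in the real code) re-fetches the CR here and
		// writes status, so no goroutine holds shared mutable memory.
		fmt.Printf("probe result: %+v\n", res)
	}
}
```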
12 changes: 6 additions & 6 deletions internal/controller/dnshealthcheckprobe_reconciler.go
@@ -76,17 +76,17 @@ func (r *DNSProbeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}

if !controllerutil.ContainsFinalizer(dnsProbe, DNSHealthCheckFinalizer) {
controllerutil.AddFinalizer(dnsProbe, DNSHealthCheckFinalizer)
if err := r.Client.Update(ctx, dnsProbe); err != nil {
if apierrors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
if controllerutil.AddFinalizer(dnsProbe, DNSHealthCheckFinalizer) {
if err := r.Client.Update(ctx, dnsProbe); err != nil {
if apierrors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, err
}
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
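Reviewer context: controllerutil.AddFinalizer returns true only when it actually appended the finalizer, which is why the separate ContainsFinalizer guard could be dropped. A minimal sketch of the idiom (obj is a placeholder):

```go
// AddFinalizer is a no-op returning false when the finalizer is already
// present, so Update is only issued when the object really changed.
if controllerutil.AddFinalizer(obj, DNSHealthCheckFinalizer) {
	if err := r.Client.Update(ctx, obj); err != nil {
		return ctrl.Result{}, err
	}
}
```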
12 changes: 6 additions & 6 deletions internal/controller/dnsrecord_controller.go
@@ -223,6 +223,12 @@ func (r *DNSRecordReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return r.updateStatus(ctx, previous, dnsRecord, false, err)
}

if probesEnabled {
if err = r.ReconcileHealthChecks(ctx, dnsRecord, allowInsecureCert); err != nil {
return ctrl.Result{}, err
}
}
// just check probes here and don't publish
// Publish the record
hadChanges, err := r.publishRecord(ctx, dnsRecord, dnsProvider)
if err != nil {
@@ -232,12 +238,6 @@ func (r *DNSRecordReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return r.updateStatus(ctx, previous, dnsRecord, hadChanges, err)
}

if probesEnabled {
if err = r.ReconcileHealthChecks(ctx, dnsRecord, allowInsecureCert); err != nil {
return ctrl.Result{}, err
}
}

return r.updateStatus(ctx, previous, dnsRecord, hadChanges, nil)
}

151 changes: 81 additions & 70 deletions internal/probes/worker.go
@@ -39,74 +39,44 @@ func (fn RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
}

type Probe struct {
probeConfig *v1alpha1.DNSHealthCheckProbe
Transport RoundTripperFunc
probeHeaders v1alpha1.AdditionalHeaders
}

func NewProbe(probeConfig *v1alpha1.DNSHealthCheckProbe, headers v1alpha1.AdditionalHeaders) *Probe {
func NewProbe(headers v1alpha1.AdditionalHeaders) *Probe {
return &Probe{
probeConfig: probeConfig,
probeHeaders: headers,
}
}

// Start a worker in a separate goroutine. Returns a cancel func to kill the worker.
// If the worker is nil, no routines will start and the cancel func can be ignored (it is still returned to prevent a panic)
func (w *Probe) Start(clientctx context.Context, k8sClient client.Client) context.CancelFunc {
ctx, cancel := context.WithCancel(clientctx)
logger := log.FromContext(ctx)
logger.Info("health: starting new worker for", "probe ", keyForProbe(w.probeConfig))
metrics.ProbeCounter.WithLabelValues(w.probeConfig.Name, w.probeConfig.Namespace, w.probeConfig.Spec.Hostname).Inc()
go func() {
for range w.ExecuteProbe(ctx, w.probeConfig) {
// each time this is done it will send a signal
logger.V(1).Info("health: probe finished ", "updating status for probe", keyForProbe(w.probeConfig), "status", w.probeConfig.Status)
err := k8sClient.Status().Update(clientctx, w.probeConfig)
if err != nil {
logger.Error(err, "health: probe finished. error updating probe status", "probe", keyForProbe(w.probeConfig))
}
}
logger.V(1).Info("health: stopped executing probe", "probe", keyForProbe(w.probeConfig))
metrics.ProbeCounter.WithLabelValues(w.probeConfig.Name, w.probeConfig.Namespace, w.probeConfig.Spec.Hostname).Dec()
}()
return cancel
}

func (w *Probe) ExecuteProbe(ctx context.Context, probe *v1alpha1.DNSHealthCheckProbe) <-chan struct{} {
sig := make(chan struct{})

// ExecuteProbe executes the health check request on a background routine at the correct interval. It returns a channel on which it will send each probe result
func (w *Probe) ExecuteProbe(ctx context.Context, probe *v1alpha1.DNSHealthCheckProbe) <-chan ProbeResult {
sig := make(chan ProbeResult)
localProbe := probe.DeepCopy()
go func() {
logger := log.FromContext(ctx)
logger := log.FromContext(ctx).WithValues("health probe worker:", keyForProbe(localProbe))
for {
if sig == nil || probe == nil {
if sig == nil || localProbe == nil {
logger.Error(fmt.Errorf("channel or probe nil "), "exiting probe")
return
}
timer := time.NewTimer(executeAt(probe))
timer := time.NewTimer(executeAt(localProbe))
select {
case <-ctx.Done():
logger.V(1).Info("health: context shutdown. Stopping", "probe", keyForProbe(probe))
logger.V(2).Info("health probe worker: context shutdown. Stopping")
if sig != nil {
timer.Stop()
close(sig)
sig = nil
logger.V(1).Info("health: context shutdown. time stopped and channel closed", "probe", keyForProbe(probe))
logger.V(2).Info("health probe worker: context shutdown. time stopped and channel closed")
}
return
case <-timer.C:
result := w.execute(ctx, probe)
probe.Status.ObservedGeneration = probe.Generation
if !result.Healthy {
probe.Status.ConsecutiveFailures++
} else {
probe.Status.ConsecutiveFailures = 0
}
probe.Status.Healthy = &result.Healthy
probe.Status.LastCheckedAt = result.CheckedAt
probe.Status.Reason = result.Reason
logger.V(1).Info("health: execution complete ", "probe", keyForProbe(probe), "result", result)
sig <- struct{}{}
logger.V(2).Info("health probe worker: executing")
result := w.execute(ctx, localProbe)
// as this routine is just executing the local config it only cares about when it should execute again
localProbe.Status.LastCheckedAt = result.CheckedAt
sig <- result
}
}
}()
@@ -125,42 +95,39 @@ func executeAt(probe *v1alpha1.DNSHealthCheckProbe) time.Duration {
}

func (w *Probe) execute(ctx context.Context, probe *v1alpha1.DNSHealthCheckProbe) ProbeResult {
logger := log.FromContext(ctx)
logger.V(1).Info("kinperforming health check")
logger := log.FromContext(ctx).WithValues("health probe worker:", keyForProbe(probe))
logger.V(2).Info("performing health check")
var ips []net.IP

//if the address is a CNAME, check all IP Addresses that it resolves to
logger.V(1).Info("looking up address ", "address", probe.Spec.Address)
logger.V(2).Info("looking up address ", "address", probe.Spec.Address)
ip := net.ParseIP(probe.Spec.Address)

if ip == nil {
IPAddr, err := net.LookupIP(probe.Spec.Address)
if err != nil {
logger.V(1).Error(err, "error looking up address", "address", probe.Spec.Address)
logger.Error(err, "error looking up address", "address", probe.Spec.Address)
return ProbeResult{CheckedAt: metav1.Now(), Healthy: false, Reason: err.Error()}
}
ips = append(ips, IPAddr...)
} else {
ips = append(ips, ip)
}

var result ProbeResult
for _, ip = range ips {
result := w.performRequest(ctx, string(probe.Spec.Protocol), probe.Spec.Hostname, probe.Spec.Path, ip.String(), probe.Spec.Port, probe.Spec.AllowInsecureCertificate, w.probeHeaders)
// if any IP in a CNAME is healthy, it is a healthy CNAME
result = w.performRequest(ctx, string(probe.Spec.Protocol), probe.Spec.Hostname, probe.Spec.Path, ip.String(), probe.Spec.Port, probe.Spec.AllowInsecureCertificate, w.probeHeaders)
// return as any healthy IP is a good result (multiple can only really happen with a CNAME)
if result.Healthy {
return result
}
}

// all IPs returned an unhealthy result
return ProbeResult{
CheckedAt: metav1.Now(),
Healthy: false,
}
//TODO deal with multiple results for a CNAME better and don't just rely on last result
// all IPs returned an unhealthy result. Result will be the last ProbeResult, so return it to have some status set
return result
}

func (w *Probe) performRequest(ctx context.Context, protocol, host, path, ip string, port int, allowInsecure bool, headers v1alpha1.AdditionalHeaders) ProbeResult {
logger := log.FromContext(ctx)
logger := log.FromContext(ctx).WithValues("health probe worker:", "performing request")
probeClient := metrics.NewInstrumentedClient("probe", &http.Client{
Transport: TransportWithDNSResponse(map[string]string{host: ip}, allowInsecure),
})
@@ -182,7 +149,7 @@ func (w *Probe) performRequest(ctx context.Context, protocol, host, path, ip str
httpReq.Header.Add(h.Name, h.Value)
}

logger.V(1).Info("health: probe executing against ", "url", httpReq.URL)
logger.V(2).Info("health: probe executing against ", "url", httpReq.URL)
Contributor
If I remember correctly, the V(2) level cannot be displayed with the way we implemented the logger. It is log.Info for info, log.V(1).Info for debug level, and log.Error() and log.V(1).Error for error level logs. By setting it to V(2) we are unable to display them 🤔
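Whether V(2) lines appear is a sink-configuration question rather than a call-site one. Assuming the operator wires controller-runtime's zap logger (its main.go is not part of this diff), a sketch of raising verbosity so V(2) logs are emitted; with BindFlags the same can be done at runtime via --zap-log-level=2:

```go
// Sketch: enabling V(2) output with controller-runtime's zap logger.
package main

import (
	"flag"

	"go.uber.org/zap/zapcore"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func main() {
	opts := zap.Options{
		// logr's V(n) maps to zap level -n, so -2 enables V(2) and below.
		Level: zapcore.Level(-2),
	}
	opts.BindFlags(flag.CommandLine) // adds --zap-log-level among others
	flag.Parse()
	ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
}
```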


// Send the request
res, err := probeClient.Do(httpReq)
@@ -191,7 +158,7 @@ func (w *Probe) performRequest(ctx context.Context, protocol, host, path, ip str
} else if err != nil {
return ProbeResult{CheckedAt: metav1.Now(), Healthy: false, Reason: fmt.Sprintf("error: %s, response: %+v", err.Error(), res)}
}
logger.V(1).Info("health: probe execution complete against ", "url", httpReq.URL, "status code", res.StatusCode)
logger.V(2).Info("health: probe execution complete against ", "url", httpReq.URL, "status code", res.StatusCode)
if !slice.Contains[int](ExpectedResponses, func(i int) bool { return i == res.StatusCode }) {
return ProbeResult{
CheckedAt: metav1.Now(),
@@ -204,6 +171,7 @@ func (w *Probe) performRequest(ctx context.Context, protocol, host, path, ip str
return ProbeResult{
CheckedAt: metav1.Now(),
Healthy: true,
Status: res.StatusCode,
}
}
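The Status field set above is new. Reconstructed from the fields this diff assigns (not copied from the repo), ProbeResult now looks roughly like:

```go
package probes

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

// Inferred shape of ProbeResult after this PR; the real struct lives in
// internal/probes and may carry more fields.
type ProbeResult struct {
	CheckedAt metav1.Time // when the check ran
	Healthy   bool        // true when a response had an expected status code
	Reason    string      // failure detail; empty on success
	Status    int         // HTTP status code of the response (new in this PR)
}
```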

@@ -245,36 +213,79 @@ func NewProbeManager() *ProbeManager {

// StopProbeWorker stops the worker and removes it from the WorkerManager
func (m *ProbeManager) StopProbeWorker(ctx context.Context, probeCR *v1alpha1.DNSHealthCheckProbe) {
logger := log.FromContext(ctx)
logger := log.FromContext(ctx).WithValues("health probe worker:", keyForProbe(probeCR))
if stop, ok := m.probes[keyForProbe(probeCR)]; ok {
logger.V(1).Info("health: Stopping existing worker", "probe", keyForProbe(probeCR))
logger.V(2).Info("Stopping existing worker", "probe", keyForProbe(probeCR))
stop()
delete(m.probes, keyForProbe(probeCR))
}
}

// Start a worker in a separate goroutine. Returns a cancel func to kill the worker.
// If the worker is nil, no routines will start and the cancel func can be ignored (it is still returned to prevent a panic)
func (w *Probe) Start(clientctx context.Context, k8sClient client.Client, probe *v1alpha1.DNSHealthCheckProbe) context.CancelFunc {

ctx, cancel := context.WithCancel(clientctx)
logger := log.FromContext(ctx).WithValues("probe", keyForProbe(probe))
logger.V(1).Info("health: starting new worker for probe")

go func() {
metrics.ProbeCounter.WithLabelValues(probe.Name, probe.Namespace, probe.Spec.Hostname).Inc()
// each time the probe executes it will send a result on the channel returned by ExecuteProbe until the probe is cancelled. The probe can be cancelled by a new spec being created for the health check or on shutdown
for probeResult := range w.ExecuteProbe(ctx, probe) {
freshProbe := &v1alpha1.DNSHealthCheckProbe{}
if err := k8sClient.Get(clientctx, client.ObjectKeyFromObject(probe), freshProbe); err != nil {
// if we hit an error here we cancel and return as it is an unusual state
logger.Error(err, "health: probe finished. error getting upto date probe. Cancelling")
cancel()
return
}
freshProbe.Status.ObservedGeneration = freshProbe.Generation
if !probeResult.Healthy {
freshProbe.Status.ConsecutiveFailures++
} else {
freshProbe.Status.ConsecutiveFailures = 0
}
freshProbe.Status.Healthy = &probeResult.Healthy
freshProbe.Status.LastCheckedAt = probeResult.CheckedAt
freshProbe.Status.Reason = probeResult.Reason
freshProbe.Status.Status = probeResult.Status
logger.V(1).Info("health: execution complete ", "result", probeResult)

logger.V(2).Info("health: probe finished updating status for probe", "status", freshProbe)
err := k8sClient.Status().Update(clientctx, freshProbe)
if err != nil {
logger.Error(err, "health: probe finished. error updating probe status")
}
}

logger.V(1).Info("health: stopped executing probe", "probe", keyForProbe(probe))
metrics.ProbeCounter.WithLabelValues(probe.Name, probe.Namespace, probe.Spec.Hostname).Dec()
}()
return cancel
}

// EnsureProbeWorker ensures a new worker per generation of the probe.
// New generation of probe - new worker.
// If the generation has not changed, it will re-create a worker. If context is done (we are deleting) that worker will die immediately.
func (m *ProbeManager) EnsureProbeWorker(ctx context.Context, k8sClient client.Client, probeCR *v1alpha1.DNSHealthCheckProbe, headers v1alpha1.AdditionalHeaders) {
logger := log.FromContext(ctx)
logger := log.FromContext(ctx).WithValues("health probe worker:", keyForProbe(probeCR))
logger.Info("ensure probe")

// if worker exists
if stop, ok := m.probes[keyForProbe(probeCR)]; ok {
// gen has not changed (spec has not changed) - nothing to do,
// or first reconcile of the probe but worker already in place
if probeCR.Status.ObservedGeneration == probeCR.Generation || probeCR.Status.ObservedGeneration == 0 {
logger.V(1).Info("health: probe worker exists for generation. Continuing", "probe", keyForProbe(probeCR))
logger.V(2).Info("already exists for generation and status. Continuing", "probe", keyForProbe(probeCR))
return
}
logger.V(1).Info("health: worker already exists. New generation of the probe: stopping existing worker", "probe", keyForProbe(probeCR))
logger.V(2).Info("old worker exists. New generation of the probe found: stopping existing worker", "probe", keyForProbe(probeCR))
stop()
}
// Either worker does not exist, or gen changed and old worker got killed. Creating a new one.
logger.V(1).Info("health: starting fresh worker for", "generation", probeCR.Generation, "probe", keyForProbe(probeCR))
probe := NewProbe(probeCR, headers)
m.probes[keyForProbe(probeCR)] = probe.Start(ctx, k8sClient)
logger.V(2).Info("health: starting fresh worker for", "generation", probeCR.Generation, "probe", keyForProbe(probeCR))
probe := NewProbe(headers)
m.probes[keyForProbe(probeCR)] = probe.Start(ctx, k8sClient, probeCR)
}

func keyForProbe(probe *v1alpha1.DNSHealthCheckProbe) string {
41 changes: 23 additions & 18 deletions internal/probes/worker_test.go
@@ -47,7 +47,7 @@ func TestWorker_ProbeSuccess(t *testing.T) {
Transport func() probes.RoundTripperFunc
ProbeConfig func() *v1alpha1.DNSHealthCheckProbe
ProbeHeaders v1alpha1.AdditionalHeaders
Validate func(t *testing.T, probe *v1alpha1.DNSHealthCheckProbe, tt *testTransport, expectedCalls int)
Validate func(t *testing.T, results []probes.ProbeResult, tt *testTransport, expectedCalls int)
Ctx context.Context
ExpectedProbeCalls int
}{
@@ -64,15 +64,17 @@
ProbeConfig: func() *v1alpha1.DNSHealthCheckProbe {
return testProbe.DeepCopy()
},
Validate: func(t *testing.T, probe *v1alpha1.DNSHealthCheckProbe, tt *testTransport, expectedCalls int) {
if probe.Status.ConsecutiveFailures != 0 {
t.Fatalf("expected no failures but got %v", probe.Status.ConsecutiveFailures)
Validate: func(t *testing.T, results []probes.ProbeResult, tt *testTransport, expectedCalls int) {
if len(results) != expectedCalls {
t.Fatalf("expected %v results got %v", expectedCalls, len(results))
}
if probe.Status.Healthy == nil || *probe.Status.Healthy != true {
t.Fatalf("expected the probe to be healthy")
lastResult := results[expectedCalls-1]
// get the last probe result
if lastResult.Healthy == false {
t.Fatalf("expected the result of the probe to be healthy but it was not")
}
if tt.calls != expectedCalls {
t.Fatalf("expected the number of health probe http calls to be %v but got %v", expectedCalls, tt.calls)
if lastResult.Status != 200 {
t.Fatalf("expected the result status to be 200 but got %v", lastResult.Status)
}
},
},
@@ -89,15 +91,16 @@
return testProbe.DeepCopy()
},
ExpectedProbeCalls: 3,
Validate: func(t *testing.T, probe *v1alpha1.DNSHealthCheckProbe, tt *testTransport, expectedCalls int) {
if probe.Status.ConsecutiveFailures != expectedCalls {
t.Fatalf("expected %v failures but got %v", expectedCalls, probe.Status.ConsecutiveFailures)
Validate: func(t *testing.T, results []probes.ProbeResult, tt *testTransport, expectedCalls int) {
if len(results) != expectedCalls {
t.Fatalf("expected %v results got %v", expectedCalls, len(results))
}
if probe.Status.Healthy == nil || *probe.Status.Healthy != false {
t.Fatalf("expected the probe to be unhealthy")
lastResult := results[expectedCalls-1]
if lastResult.Healthy != false {
t.Fatalf("expected the result of the probe to be un-healthy but it was not")
}
if tt.calls != expectedCalls {
t.Fatalf("expected the number of health probe http calls to be %v but got %v", expectedCalls, tt.calls)
if lastResult.Status != 503 {
t.Fatalf("expected result status to be 503 but got %v", lastResult.Status)
}
},
},
@@ -106,7 +109,7 @@
for _, test := range testCases {
t.Run(test.Name, func(t *testing.T) {
probeConfig := test.ProbeConfig()
w := probes.NewProbe(probeConfig, test.ProbeHeaders)
w := probes.NewProbe(test.ProbeHeaders)
tTransport := &testTransport{}
tTransport.f = test.Transport()
w.Transport = tTransport.countedHTTP
@@ -116,11 +119,13 @@ func TestWorker_ProbeSuccess(t *testing.T) {
wait := sync.WaitGroup{}
wait.Add(test.ExpectedProbeCalls)
// read from our channel
executedProbes := []probes.ProbeResult{}
go func() {
calls := 0
for range exChan {
for result := range exChan {
calls++
t.Logf("channel read %v", calls)
executedProbes = append(executedProbes, result)
wait.Done()
}
}()
@@ -135,7 +140,7 @@ func TestWorker_ProbeSuccess(t *testing.T) {
// cancel our context and wait for the probe to exit
cancel()
wait.Wait()
test.Validate(t, probeConfig, tTransport, test.ExpectedProbeCalls)
test.Validate(t, executedProbes, tTransport, test.ExpectedProbeCalls)
})
}
}