Skip to content

Commit

Permalink
Fix: add a close channel to handle shutdowns gracefully (#111)
Browse files Browse the repository at this point in the history
  • Loading branch information
UndeadRat22 authored Mar 21, 2023
1 parent 2dca1ee commit 2df460e
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 18 deletions.
53 changes: 35 additions & 18 deletions internal/services/controller/informers/podmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@ import (
"k8s.io/utils/clock"
)

const fetchInterval = 30 * time.Second

type PodMetricsInformer interface {
Informer() cache.SharedIndexInformer
}

type metricsWatch struct {
resultChan chan watch.Event
client versioned.Interface
log logrus.FieldLogger
listOptions metav1.ListOptions
resultChan chan watch.Event
closeChan chan struct{}
client versioned.Interface
log logrus.FieldLogger
listOptions metav1.ListOptions
fetchInterval time.Duration
}

func NewMetricsWatch(
Expand All @@ -33,22 +37,31 @@ func NewMetricsWatch(
client versioned.Interface,
listOptions metav1.ListOptions,
) watch.Interface {
metrics := &metricsWatch{
resultChan: make(chan watch.Event),
log: log,
client: client,
listOptions: withDefaultTimeout(listOptions),
}
metrics := newMetricsWatch(log, client, listOptions, fetchInterval)

go metrics.Start(ctx)

return metrics
}

func newMetricsWatch(log logrus.FieldLogger,
client versioned.Interface,
listOptions metav1.ListOptions,
fetchInterval time.Duration) *metricsWatch {
return &metricsWatch{
closeChan: make(chan struct{}, 1),
resultChan: make(chan watch.Event),
log: log,
client: client,
listOptions: withDefaultTimeout(listOptions),
fetchInterval: fetchInterval,
}
}

func (m *metricsWatch) Start(ctx context.Context) {
m.log.Infof("Starting pod metrics polling")
const fetchInterval = 30 * time.Second
backoff := wait.NewExponentialBackoffManager(fetchInterval, 5*time.Minute, fetchInterval, 2, 0.2, clock.RealClock{})

backoff := wait.NewExponentialBackoffManager(m.fetchInterval, 5*time.Minute, m.fetchInterval, 2, 0.2, clock.RealClock{})
wait.BackoffUntil(func() {
result, err := m.client.MetricsV1beta1().
PodMetricses("").
Expand All @@ -66,17 +79,21 @@ func (m *metricsWatch) Start(ctx context.Context) {
select {
case <-ctx.Done():
return
case m.resultChan <- watch.Event{
Type: watch.Modified,
Object: &metrics,
}:
case <-m.closeChan:
close(m.resultChan)
return
default:
m.resultChan <- watch.Event{
Type: watch.Modified,
Object: &metrics,
}
}
}
}, backoff, true, ctx.Done())
}, backoff, true, m.closeChan)
}

func (m *metricsWatch) Stop() {
close(m.resultChan)
m.closeChan <- struct{}{}
}

func (m *metricsWatch) ResultChan() <-chan watch.Event {
Expand Down
64 changes: 64 additions & 0 deletions internal/services/controller/informers/podmetrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package informers

import (
"context"
"testing"
"time"

"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
k8stesting "k8s.io/client-go/testing"
"k8s.io/metrics/pkg/apis/metrics/v1beta1"
fake_metrics "k8s.io/metrics/pkg/client/clientset/versioned/fake"
)

func Test_metricsWatch(t *testing.T) {
t.Run("should not panic when metrics watch is started after Stop is called", func(t *testing.T) {
ctx := context.Background()
log := logrus.New()

metrics := &v1beta1.PodMetricsList{Items: []v1beta1.PodMetrics{
{
ObjectMeta: metav1.ObjectMeta{
Name: "p1",
Namespace: "",
},
Containers: []v1beta1.ContainerMetrics{
{
Name: "container1",
Usage: v1.ResourceList{
"memory": resource.MustParse("10Gi"),
"cpu": resource.MustParse("10"),
},
},
},
},
}}

metricsClient := fake_metrics.NewSimpleClientset()
metricsClient.PrependReactor("*", "*", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, metrics, nil
})

w := newMetricsWatch(log, metricsClient, metav1.ListOptions{}, time.Second)

// Manually stop the watch before starting the watch.
w.Stop()
go w.Start(ctx)

read:
for {
select {
case <-w.ResultChan():
continue
default:
break read
}
}

<-time.After(time.Second * 2)
})
}

0 comments on commit 2df460e

Please sign in to comment.