From 2df460e1e2167de789384209344f7d81ad917b1f Mon Sep 17 00:00:00 2001 From: Valdas Rakutis <36191877+UndeadRat22@users.noreply.github.com> Date: Tue, 21 Mar 2023 19:28:47 +0200 Subject: [PATCH] Fix: add a close channel to handle shutdowns gracefully (#111) --- .../controller/informers/podmetrics.go | 53 +++++++++------ .../controller/informers/podmetrics_test.go | 64 +++++++++++++++++++ 2 files changed, 99 insertions(+), 18 deletions(-) create mode 100644 internal/services/controller/informers/podmetrics_test.go diff --git a/internal/services/controller/informers/podmetrics.go b/internal/services/controller/informers/podmetrics.go index 52901e3d..0e4c1f8d 100644 --- a/internal/services/controller/informers/podmetrics.go +++ b/internal/services/controller/informers/podmetrics.go @@ -16,15 +16,19 @@ import ( "k8s.io/utils/clock" ) +const fetchInterval = 30 * time.Second + type PodMetricsInformer interface { Informer() cache.SharedIndexInformer } type metricsWatch struct { - resultChan chan watch.Event - client versioned.Interface - log logrus.FieldLogger - listOptions metav1.ListOptions + resultChan chan watch.Event + closeChan chan struct{} + client versioned.Interface + log logrus.FieldLogger + listOptions metav1.ListOptions + fetchInterval time.Duration } func NewMetricsWatch( @@ -33,22 +37,31 @@ func NewMetricsWatch( client versioned.Interface, listOptions metav1.ListOptions, ) watch.Interface { - metrics := &metricsWatch{ - resultChan: make(chan watch.Event), - log: log, - client: client, - listOptions: withDefaultTimeout(listOptions), - } + metrics := newMetricsWatch(log, client, listOptions, fetchInterval) go metrics.Start(ctx) return metrics } +func newMetricsWatch(log logrus.FieldLogger, + client versioned.Interface, + listOptions metav1.ListOptions, + fetchInterval time.Duration) *metricsWatch { + return &metricsWatch{ + closeChan: make(chan struct{}, 1), + resultChan: make(chan watch.Event), + log: log, + client: client, + listOptions: withDefaultTimeout(listOptions), + fetchInterval: fetchInterval, + } +} + func (m *metricsWatch) Start(ctx context.Context) { m.log.Infof("Starting pod metrics polling") - const fetchInterval = 30 * time.Second - backoff := wait.NewExponentialBackoffManager(fetchInterval, 5*time.Minute, fetchInterval, 2, 0.2, clock.RealClock{}) + + backoff := wait.NewExponentialBackoffManager(m.fetchInterval, 5*time.Minute, m.fetchInterval, 2, 0.2, clock.RealClock{}) wait.BackoffUntil(func() { result, err := m.client.MetricsV1beta1(). PodMetricses(""). @@ -66,17 +79,21 @@ func (m *metricsWatch) Start(ctx context.Context) { select { case <-ctx.Done(): return - case m.resultChan <- watch.Event{ - Type: watch.Modified, - Object: &metrics, - }: + case <-m.closeChan: + close(m.resultChan) + return + default: + m.resultChan <- watch.Event{ + Type: watch.Modified, + Object: &metrics, + } } } - }, backoff, true, ctx.Done()) + }, backoff, true, m.closeChan) } func (m *metricsWatch) Stop() { - close(m.resultChan) + m.closeChan <- struct{}{} } func (m *metricsWatch) ResultChan() <-chan watch.Event { diff --git a/internal/services/controller/informers/podmetrics_test.go b/internal/services/controller/informers/podmetrics_test.go new file mode 100644 index 00000000..2b2ef43d --- /dev/null +++ b/internal/services/controller/informers/podmetrics_test.go @@ -0,0 +1,64 @@ +package informers + +import ( + "context" + "testing" + "time" + + "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + k8stesting "k8s.io/client-go/testing" + "k8s.io/metrics/pkg/apis/metrics/v1beta1" + fake_metrics "k8s.io/metrics/pkg/client/clientset/versioned/fake" +) + +func Test_metricsWatch(t *testing.T) { + t.Run("should not panic when metrics watch is started after Stop is called", func(t *testing.T) { + ctx := context.Background() + log := logrus.New() + + metrics := &v1beta1.PodMetricsList{Items: []v1beta1.PodMetrics{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "p1", + Namespace: "", + }, + Containers: []v1beta1.ContainerMetrics{ + { + Name: "container1", + Usage: v1.ResourceList{ + "memory": resource.MustParse("10Gi"), + "cpu": resource.MustParse("10"), + }, + }, + }, + }, + }} + + metricsClient := fake_metrics.NewSimpleClientset() + metricsClient.PrependReactor("*", "*", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, metrics, nil + }) + + w := newMetricsWatch(log, metricsClient, metav1.ListOptions{}, time.Second) + + // Manually stop the watch before starting the watch. + w.Stop() + go w.Start(ctx) + + read: + for { + select { + case <-w.ResultChan(): + continue + default: + break read + } + } + + <-time.After(time.Second * 2) + }) +}