From 2da7e7ecba5c5c892d0406d5d6ab14807e185f06 Mon Sep 17 00:00:00 2001 From: David Ortiz Date: Wed, 13 Jan 2021 19:17:21 +0100 Subject: [PATCH] controllers: add first version of rate limit controller --- controllers/ratelimit_controller.go | 152 +++++++++++++++++++- controllers/ratelimit_controller_test.go | 169 +++++++++++++++++++++++ controllers/suite_test.go | 27 ++++ main.go | 12 +- pkg/helpers/helpers.go | 10 ++ 5 files changed, 360 insertions(+), 10 deletions(-) create mode 100644 controllers/ratelimit_controller_test.go create mode 100644 pkg/helpers/helpers.go diff --git a/controllers/ratelimit_controller.go b/controllers/ratelimit_controller.go index 4a500ba5..f0c6634f 100644 --- a/controllers/ratelimit_controller.go +++ b/controllers/ratelimit_controller.go @@ -18,20 +18,56 @@ package controllers import ( "context" - + "github.com/3scale/limitador-operator/pkg/helpers" + "github.com/3scale/limitador-operator/pkg/limitador" "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + "net/url" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "strconv" limitadorv1alpha1 "github.com/3scale/limitador-operator/api/v1alpha1" ) +const rateLimitFinalizer = "finalizer.ratelimit.limitador.3scale.net" + +// Assumes that there's only one Limitador per namespace. We might want to +// change this in the future. +type LimitadorServiceDiscovery interface { + URL(namespace string) (*url.URL, error) +} + +type defaultLimitadorServiceDiscovery struct{} + +func (LimitadorServiceDiscovery *defaultLimitadorServiceDiscovery) URL(namespace string) (*url.URL, error) { + serviceUrl := "http://" + limitador.ServiceName + "." + namespace + ".svc.cluster.local:" + + strconv.Itoa(limitador.ServiceHTTPPort) + + return url.Parse(serviceUrl) +} + // RateLimitReconciler reconciles a RateLimit object type RateLimitReconciler struct { client.Client - Log logr.Logger - Scheme *runtime.Scheme + Log logr.Logger + Scheme *runtime.Scheme + limitadorDiscovery LimitadorServiceDiscovery +} + +func NewRateLimitReconciler(kubeClient client.Client, logger logr.Logger, scheme *runtime.Scheme) RateLimitReconciler { + limitadorServiceDiscovery := defaultLimitadorServiceDiscovery{} + + return RateLimitReconciler{ + Client: kubeClient, + Log: logger, + Scheme: scheme, + limitadorDiscovery: &limitadorServiceDiscovery, + } } // +kubebuilder:rbac:groups=limitador.3scale.net,resources=ratelimits,verbs=get;list;watch;create;update;patch;delete @@ -39,9 +75,44 @@ type RateLimitReconciler struct { func (r *RateLimitReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { _ = context.Background() - _ = r.Log.WithValues("ratelimit", req.NamespacedName) + reqLogger := r.Log.WithValues("ratelimit", req.NamespacedName) + + limit := &limitadorv1alpha1.RateLimit{} + if err := r.Get(context.TODO(), req.NamespacedName, limit); err != nil { + if errors.IsNotFound(err) { + return ctrl.Result{}, nil + } + + reqLogger.Error(err, "Failed to get RateLimit object.") + return ctrl.Result{}, err + } + + isLimitMarkedToBeDeleted := limit.GetDeletionTimestamp() != nil + if isLimitMarkedToBeDeleted { + if helpers.Contains(limit.GetFinalizers(), rateLimitFinalizer) { + if err := r.finalizeRateLimit(limit); err != nil { + return ctrl.Result{}, err + } + + // Remove finalizer. Once all finalizers have been removed, the + // object will be deleted. + controllerutil.RemoveFinalizer(limit, rateLimitFinalizer) + if err := r.Update(context.TODO(), limit); err != nil { + return ctrl.Result{}, err + } + } + + return ctrl.Result{}, nil + } + + if err := r.ensureFinalizerIsAdded(limit, reqLogger); err != nil { + return ctrl.Result{}, err + } - // your logic here + if err := r.createLimitInLimitador(limit); err != nil { + reqLogger.Error(err, "Failed to create rate limit in Limitador.") + return ctrl.Result{}, err + } return ctrl.Result{}, nil } @@ -49,5 +120,76 @@ func (r *RateLimitReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { func (r *RateLimitReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&limitadorv1alpha1.RateLimit{}). + WithEventFilter(r.updateLimitPredicate()). Complete(r) } + +// This should be temporary. This is not how a filter should be used. However, +// with the current Limitador API, when updating a limit, we need both the +// current and the previous version. After updating the Limitador API to work +// with IDs, this won't be needed. +func (r *RateLimitReconciler) updateLimitPredicate() predicate.Predicate { + return predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + oldVersion := e.ObjectOld.(*limitadorv1alpha1.RateLimit) + newVersion := e.ObjectNew.(*limitadorv1alpha1.RateLimit) + + if oldVersion.ObjectMeta.Generation == newVersion.ObjectMeta.Generation { + return false + } + + // The namespace should be the same in the old and the new version, + // so we can use either. + limitadorUrl, err := r.limitadorDiscovery.URL(newVersion.Namespace) + if err != nil { + return false + } + + limitadorClient := limitador.NewClient(*limitadorUrl) + + // Try to create the new version even if the old one can't be + // deleted. This might leave in Limitador limits that should no + // longer be there. As this function should only be temporary this + // should be fine for a first version of the controller. + _ = limitadorClient.DeleteLimit(&oldVersion.Spec) + + return true + }, + } +} + +func (r *RateLimitReconciler) createLimitInLimitador(limit *limitadorv1alpha1.RateLimit) error { + limitadorUrl, err := r.limitadorDiscovery.URL(limit.Namespace) + if err != nil { + return err + } + + limitadorClient := limitador.NewClient(*limitadorUrl) + return limitadorClient.CreateLimit(&limit.Spec) +} + +func (r *RateLimitReconciler) ensureFinalizerIsAdded(limit *limitadorv1alpha1.RateLimit, reqLogger logr.Logger) error { + numberOfFinalizers := len(limit.GetFinalizers()) + controllerutil.AddFinalizer(limit, rateLimitFinalizer) + if numberOfFinalizers == len(limit.GetFinalizers()) { + // The finalizer was already there, no need to update + return nil + } + + if err := r.Update(context.TODO(), limit); err != nil { + reqLogger.Error(err, "Failed to update the rate limit with finalizer") + return err + } + + return nil +} + +func (r *RateLimitReconciler) finalizeRateLimit(rateLimit *limitadorv1alpha1.RateLimit) error { + limitadorUrl, err := r.limitadorDiscovery.URL(rateLimit.Namespace) + if err != nil { + return err + } + + limitadorClient := limitador.NewClient(*limitadorUrl) + return limitadorClient.DeleteLimit(&rateLimit.Spec) +} diff --git a/controllers/ratelimit_controller_test.go b/controllers/ratelimit_controller_test.go new file mode 100644 index 00000000..b50c26d7 --- /dev/null +++ b/controllers/ratelimit_controller_test.go @@ -0,0 +1,169 @@ +package controllers + +import ( + "context" + limitadorv1alpha1 "github.com/3scale/limitador-operator/api/v1alpha1" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/onsi/gomega/ghttp" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/util/uuid" + "sigs.k8s.io/controller-runtime/pkg/client" + "sync" + "time" +) + +var _ = Describe("RateLimit controller", func() { + const ( + timeout = time.Second * 10 + interval = time.Millisecond * 250 + ) + + // Used to generate a different limit on every test so they don't collide. + var newRateLimit = func() limitadorv1alpha1.RateLimit { + // The name can't start with a number. + name := "a" + string(uuid.NewUUID()) + + return limitadorv1alpha1.RateLimit{ + TypeMeta: metav1.TypeMeta{ + Kind: "RateLimit", + APIVersion: "limitador.3scale.net/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + }, + Spec: limitadorv1alpha1.RateLimitSpec{ + Conditions: []string{"req.method == GET"}, + MaxValue: 10, + Namespace: "test-namespace", + Seconds: 60, + Variables: []string{"user_id"}, + }, + } + } + + // The next couple of functions are useful to verify that an HTTP request is + // made after a call to the kubernetesClient. + // The functions are wrappers for k8sClient.Create and k8sClient.Delete, so + // the signature is the same. + // We know that after creating, deleting, etc. a RateLimit CR, an HTTP + // request is made to create, delete, etc. the limit in Limitador. These + // functions are useful for waiting until the state is synchronized. + + // Wraps a function with the same signature as k8sClient.Create and waits + // for an HTTP request. + var runCreateAndWaitHTTPReq = func(f func(ctx context.Context, + object runtime.Object, + opts ...client.CreateOption, + ) error) func(ctx context.Context, object runtime.Object, opts ...client.CreateOption) error { + return func(ctx context.Context, object runtime.Object, opts ...client.CreateOption) error { + reqsAtStart := len(mockedHTTPServer.ReceivedRequests()) + + err := f(ctx, object, opts...) + if err != nil { + return err + } + + Eventually(func() bool { + return len(mockedHTTPServer.ReceivedRequests()) > reqsAtStart + }, timeout, interval).Should(BeTrue()) + + return nil + } + } + + // Wraps a function with the same signature as k8sClient.Delete and waits + // for an HTTP request. + var runDeleteAndWaitHTTPReq = func(f func(ctx context.Context, + object runtime.Object, + opts ...client.DeleteOption, + ) error) func(ctx context.Context, object runtime.Object, opts ...client.DeleteOption) error { + return func(ctx context.Context, object runtime.Object, opts ...client.DeleteOption) error { + reqsAtStart := len(mockedHTTPServer.ReceivedRequests()) + + err := f(ctx, object, opts...) + if err != nil { + return err + } + + Eventually(func() bool { + return len(mockedHTTPServer.ReceivedRequests()) > reqsAtStart + }, timeout, interval).Should(BeTrue()) + + return nil + } + } + + var addHandlerForLimitCreation = func(limitSpecJson string) { + mockedHTTPServer.AppendHandlers( + ghttp.CombineHandlers( + ghttp.VerifyRequest("POST", "/limits"), + ghttp.VerifyJSON(limitSpecJson), + ), + ) + } + + var addHandlerForLimitDeletion = func(limitSpecJson string) { + mockedHTTPServer.AppendHandlers( + ghttp.CombineHandlers( + ghttp.VerifyRequest("DELETE", "/limits"), + ghttp.VerifyJSON(limitSpecJson), + ), + ) + } + + // These tests make HTTP requests to the same mocked server. Running them in + // parallel makes it difficult to reason about them. + var sequentialTestLock sync.Mutex + + BeforeEach(func() { + sequentialTestLock.Lock() + defer sequentialTestLock.Unlock() + mockedHTTPServer.Reset() + }) + + Context("Creating a new RateLimit object", func() { + testLimit := newRateLimit() + testLimitSpecJson, _ := json.Marshal(testLimit.Spec) + + BeforeEach(func() { + addHandlerForLimitCreation(string(testLimitSpecJson)) + }) + + AfterEach(func() { + Expect(runDeleteAndWaitHTTPReq(k8sClient.Delete)( + context.TODO(), &testLimit, + )).Should(Succeed()) + }) + + It("Should create a limit in Limitador", func() { + Expect(runCreateAndWaitHTTPReq(k8sClient.Create)( + context.TODO(), &testLimit, + )).Should(Succeed()) + }) + }) + + Context("Deleting a RateLimit object", func() { + testLimit := newRateLimit() + testLimitSpecJson, _ := json.Marshal(testLimit.Spec) + + BeforeEach(func() { + addHandlerForLimitCreation(string(testLimitSpecJson)) + + Expect(runCreateAndWaitHTTPReq(k8sClient.Create)( + context.TODO(), &testLimit, + )).Should(Succeed()) + + addHandlerForLimitDeletion(string(testLimitSpecJson)) + }) + + It("Should delete the limit in Limitador", func() { + Expect(runDeleteAndWaitHTTPReq(k8sClient.Delete)( + context.TODO(), &testLimit, + )).Should(Succeed()) + }) + }) +}) diff --git a/controllers/suite_test.go b/controllers/suite_test.go index 798e624d..9ed1d7a5 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -19,8 +19,10 @@ package controllers import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/onsi/gomega/ghttp" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" + "net/url" "path/filepath" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -40,6 +42,7 @@ import ( var cfg *rest.Config var k8sClient client.Client var testEnv *envtest.Environment +var mockedHTTPServer *ghttp.Server func TestAPIs(t *testing.T) { RegisterFailHandler(Fail) @@ -49,6 +52,15 @@ func TestAPIs(t *testing.T) { []Reporter{printer.NewlineReporter{}}) } +// In the tests, this just points to our mocked HTTP server +type TestLimitadorServiceDiscovery struct { + url url.URL +} + +func (sd *TestLimitadorServiceDiscovery) URL(_ string) (*url.URL, error) { + return &sd.url, nil +} + var _ = BeforeSuite(func(done Done) { logf.SetLogger(zap.LoggerTo(GinkgoWriter, true)) @@ -82,6 +94,21 @@ var _ = BeforeSuite(func(done Done) { }).SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred()) + mockedHTTPServer = ghttp.NewServer() + mockedHTTPServerURL, err := url.Parse(mockedHTTPServer.URL()) + Expect(err).ToNot(HaveOccurred()) + + // Set this to true so we don't have to specify all the requests, including + // the ones for example done for cleanup in AfterEach() functions. + mockedHTTPServer.SetAllowUnhandledRequests(true) + + err = (&RateLimitReconciler{ + Client: k8sManager.GetClient(), + Log: ctrl.Log.WithName("limitador"), + limitadorDiscovery: &TestLimitadorServiceDiscovery{url: *mockedHTTPServerURL}, + }).SetupWithManager(k8sManager) + Expect(err).ToNot(HaveOccurred()) + go func() { err = k8sManager.Start(ctrl.SetupSignalHandler()) Expect(err).ToNot(HaveOccurred()) diff --git a/main.go b/main.go index c5f0978e..4b17e504 100644 --- a/main.go +++ b/main.go @@ -67,14 +67,16 @@ func main() { os.Exit(1) } - if err = (&controllers.RateLimitReconciler{ - Client: mgr.GetClient(), - Log: ctrl.Log.WithName("controllers").WithName("RateLimit"), - Scheme: mgr.GetScheme(), - }).SetupWithManager(mgr); err != nil { + rateLimitReconciler := controllers.NewRateLimitReconciler( + mgr.GetClient(), + ctrl.Log.WithName("controllers").WithName("RateLimit"), + mgr.GetScheme(), + ) + if err = rateLimitReconciler.SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "RateLimit") os.Exit(1) } + if err = (&controllers.LimitadorReconciler{ Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("Limitador"), diff --git a/pkg/helpers/helpers.go b/pkg/helpers/helpers.go new file mode 100644 index 00000000..9fdb8aa6 --- /dev/null +++ b/pkg/helpers/helpers.go @@ -0,0 +1,10 @@ +package helpers + +func Contains(list []string, s string) bool { + for _, v := range list { + if v == s { + return true + } + } + return false +}