From 8dad7eaa52cbc015a20fc19a3f6a9bbd36650a31 Mon Sep 17 00:00:00 2001 From: Tharsanan1 Date: Thu, 24 Oct 2024 13:57:50 +0530 Subject: [PATCH] Fix subscription based ai ratelimit db flow --- common-controller/internal/cache/datastore.go | 43 +++---- .../controllers/cp/subscription_controller.go | 18 +-- .../dp/airatelimitpolicy_controller.go | 9 ++ .../internal/operator/operator.go | 2 +- .../internal/xds/ratelimiter_cache.go | 109 +++++++++--------- common-controller/internal/xds/server.go | 6 +- test/cucumber-tests/CRs/artifacts.yaml | 4 + 7 files changed, 90 insertions(+), 101 deletions(-) diff --git a/common-controller/internal/cache/datastore.go b/common-controller/internal/cache/datastore.go index eb3b00ff1..6ee9b1ecc 100644 --- a/common-controller/internal/cache/datastore.go +++ b/common-controller/internal/cache/datastore.go @@ -29,12 +29,12 @@ import ( // RatelimitDataStore is a cache for rate limit policies. type RatelimitDataStore struct { - resolveRatelimitStore map[types.NamespacedName][]dpv1alpha1.ResolveRateLimitAPIPolicy - resolveSubscriptionRatelimitStore map[types.NamespacedName]dpv1alpha3.ResolveSubscriptionRatelimitPolicy - customRatelimitStore map[types.NamespacedName]*dpv1alpha1.CustomRateLimitPolicyDef - mu sync.Mutex - aiRatelimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec - subscriptionEnabledAIRatelimitPolicies map[types.NamespacedName]struct{} + resolveRatelimitStore map[types.NamespacedName][]dpv1alpha1.ResolveRateLimitAPIPolicy + resolveSubscriptionRatelimitStore map[types.NamespacedName]dpv1alpha3.ResolveSubscriptionRatelimitPolicy + customRatelimitStore map[types.NamespacedName]*dpv1alpha1.CustomRateLimitPolicyDef + mu sync.Mutex + aiRatelimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec + subscriptionBasedAIRatelimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec } // CreateNewOperatorDataStore creates a new RatelimitDataStore. @@ -103,24 +103,16 @@ func (ods *RatelimitDataStore) AddorUpdateAIRatelimitToStore(rateLimit types.Nam ods.aiRatelimitPolicySpecs[rateLimit] = &aiRatelimitSpec } -// MarkAIRatelimitAsSubscriptionEnabled add an entry to specify an AI RatelimitPolicy is associated with a subscription -func (ods *RatelimitDataStore) MarkAIRatelimitAsSubscriptionEnabled(nn types.NamespacedName) { - ods.mu.Lock() - defer ods.mu.Unlock() - if ods.subscriptionEnabledAIRatelimitPolicies == nil { - ods.subscriptionEnabledAIRatelimitPolicies = make(map[types.NamespacedName]struct{}) - } - ods.subscriptionEnabledAIRatelimitPolicies[nn] = struct{}{} -} - -// MarkAIRatelimitAsSubscriptionDisabled deletes the entry which was added to specify an AI RatelimitPolicy is associated with a subscription -func (ods *RatelimitDataStore) MarkAIRatelimitAsSubscriptionDisabled(nn types.NamespacedName) { +// AddorUpdateSubscriptionBasedAIRatelimitToStore adds a new ratelimit to the RatelimitDataStore. +func (ods *RatelimitDataStore) AddorUpdateSubscriptionBasedAIRatelimitToStore(rateLimit types.NamespacedName, + aiRatelimitSpec dpv1alpha3.AIRateLimitPolicySpec) { ods.mu.Lock() defer ods.mu.Unlock() - if ods.subscriptionEnabledAIRatelimitPolicies == nil { - return + logger.Infof("Adding/Updating AI ratelimit spec to cache") + if ods.subscriptionBasedAIRatelimitPolicySpecs == nil { + ods.subscriptionBasedAIRatelimitPolicySpecs = make(map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec) } - delete(ods.subscriptionEnabledAIRatelimitPolicies, nn) + ods.subscriptionBasedAIRatelimitPolicySpecs[rateLimit] = &aiRatelimitSpec } // GetResolveRatelimitPolicy get cached ratelimit @@ -148,11 +140,12 @@ func (ods *RatelimitDataStore) GetAIRatelimitPolicySpecs() map[types.NamespacedN return ods.aiRatelimitPolicySpecs } -// GetSubscriptionEnabledAIRatelimitPolicies gets all the AIRatelimitPolicy stored in ods -func (ods *RatelimitDataStore) GetSubscriptionEnabledAIRatelimitPolicies() map[types.NamespacedName]struct{} { - return ods.subscriptionEnabledAIRatelimitPolicies +// GetSubscriptionBasedAIRatelimitPolicySpecs gets all the AIRatelimitPolicy stored in ods +func (ods *RatelimitDataStore) GetSubscriptionBasedAIRatelimitPolicySpecs() map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec { + return ods.subscriptionBasedAIRatelimitPolicySpecs } + // DeleteResolveRatelimitPolicy delete from ratelimit cache func (ods *RatelimitDataStore) DeleteResolveRatelimitPolicy(rateLimit types.NamespacedName) { ods.mu.Lock() @@ -182,7 +175,7 @@ func (ods *RatelimitDataStore) DeleteSubscriptionBasedAIRatelimitPolicySpec(subs ods.mu.Lock() defer ods.mu.Unlock() logger.Debug("Deleting AI ratelimit from cache") - delete(ods.aiRatelimitPolicySpecs, subscription) + delete(ods.subscriptionBasedAIRatelimitPolicySpecs, subscription) } // NamespacedName generates namespaced name for Kubernetes objects diff --git a/common-controller/internal/operator/controllers/cp/subscription_controller.go b/common-controller/internal/operator/controllers/cp/subscription_controller.go index 53d0491d9..783636198 100644 --- a/common-controller/internal/operator/controllers/cp/subscription_controller.go +++ b/common-controller/internal/operator/controllers/cp/subscription_controller.go @@ -42,7 +42,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" - xds "github.com/wso2/apk/common-controller/internal/xds" cpv1alpha3 "github.com/wso2/apk/common-go-libs/apis/cp/v1alpha3" dpv1alpha3 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha3" ) @@ -61,11 +60,10 @@ const ( ) // NewSubscriptionController creates a new Subscription controller instance. -func NewSubscriptionController(mgr manager.Manager, subscriptionStore *cache.SubscriptionDataStore, ratelimitStore *cache.RatelimitDataStore) error { +func NewSubscriptionController(mgr manager.Manager, subscriptionStore *cache.SubscriptionDataStore) error { r := &SubscriptionReconciler{ client: mgr.GetClient(), ods: subscriptionStore, - rlODS: ratelimitStore, } ctx := context.Background() conf := config.ReadConfigs() @@ -136,20 +134,6 @@ func (subscriptionReconciler *SubscriptionReconciler) Reconcile(ctx context.Cont } } } else { - if subscription.Spec.RatelimitRef.Name != "" { - nn := types.NamespacedName{ - Namespace: subscription.Namespace, - Name: subscription.Spec.RatelimitRef.Name, - } - var airl dpv1alpha3.AIRateLimitPolicy - if err := subscriptionReconciler.client.Get(ctx, nn, &airl); err == nil { - subscriptionReconciler.rlODS.AddorUpdateAIRatelimitToStore(nn, airl.Spec) - subscriptionReconciler.rlODS.MarkAIRatelimitAsSubscriptionEnabled(nn) - xds.UpdateRateLimitXDSCacheForAubscriptionBasedAIRatelimitPolicies(subscriptionReconciler.rlODS.GetSubscriptionEnabledAIRatelimitPolicies(), subscriptionReconciler.rlODS.GetAIRatelimitPolicySpecs()) - conf := config.ReadConfigs() - xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label) - } - } sendSubUpdates(subscription) utils.SendAddSubscriptionEvent(subscription) subscriptionReconciler.ods.AddorUpdateSubscriptionToStore(subscriptionKey, subscription.Spec) diff --git a/common-controller/internal/operator/controllers/dp/airatelimitpolicy_controller.go b/common-controller/internal/operator/controllers/dp/airatelimitpolicy_controller.go index 3cfc23466..4df42aa06 100644 --- a/common-controller/internal/operator/controllers/dp/airatelimitpolicy_controller.go +++ b/common-controller/internal/operator/controllers/dp/airatelimitpolicy_controller.go @@ -19,6 +19,7 @@ package dp import ( "context" + "strings" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" @@ -101,7 +102,9 @@ func (r *AIRateLimitPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Re loggers.LoggerAPKOperator.Errorf("Error retrieving AIRatelimit") // It could be deletion event. So lets try to delete the related entried from the ods and update xds r.ods.DeleteAIRatelimitPolicySpec(ratelimitKey) + r.ods.DeleteSubscriptionBasedAIRatelimitPolicySpec(ratelimitKey) xds.UpdateRateLimitXDSCacheForAIRatelimitPolicies(r.ods.GetAIRatelimitPolicySpecs()) + xds.UpdateRateLimitXDSCacheForSubscriptionBasedAIRatelimitPolicies(r.ods.GetSubscriptionBasedAIRatelimitPolicySpecs()) xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label) } else { loggers.LoggerAPKOperator.Infof("ratelimits found") @@ -112,9 +115,15 @@ func (r *AIRateLimitPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Re r.ods.AddorUpdateAIRatelimitToStore(ratelimitKey, ratelimitPolicy.Spec) xds.UpdateRateLimitXDSCacheForAIRatelimitPolicies(r.ods.GetAIRatelimitPolicySpecs()) xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label) + } else if strings.EqualFold(string(ratelimitPolicy.Spec.TargetRef.Kind), "Subscription") { + r.ods.AddorUpdateSubscriptionBasedAIRatelimitToStore(ratelimitKey, ratelimitPolicy.Spec) + xds.UpdateRateLimitXDSCacheForSubscriptionBasedAIRatelimitPolicies(r.ods.GetSubscriptionBasedAIRatelimitPolicySpecs()) + xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label) } else { r.ods.DeleteAIRatelimitPolicySpec(ratelimitKey) + r.ods.DeleteSubscriptionBasedAIRatelimitPolicySpec(ratelimitKey) xds.UpdateRateLimitXDSCacheForAIRatelimitPolicies(r.ods.GetAIRatelimitPolicySpecs()) + xds.UpdateRateLimitXDSCacheForSubscriptionBasedAIRatelimitPolicies(r.ods.GetSubscriptionBasedAIRatelimitPolicySpecs()) xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label) } } diff --git a/common-controller/internal/operator/operator.go b/common-controller/internal/operator/operator.go index 77684918c..6950f8771 100644 --- a/common-controller/internal/operator/operator.go +++ b/common-controller/internal/operator/operator.go @@ -186,7 +186,7 @@ func InitOperator(metricsConfig config.Metrics) { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error3115, logging.MAJOR, "Error creating Application controller, error: %v", err)) } - if err := cpcontrollers.NewSubscriptionController(mgr, subscriptionStore, ratelimitStore); err != nil { + if err := cpcontrollers.NewSubscriptionController(mgr, subscriptionStore); err != nil { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error3116, logging.MAJOR, "Error creating Subscription controller, error: %v", err)) } diff --git a/common-controller/internal/xds/ratelimiter_cache.go b/common-controller/internal/xds/ratelimiter_cache.go index 0b0d0c251..e8be423ab 100644 --- a/common-controller/internal/xds/ratelimiter_cache.go +++ b/common-controller/internal/xds/ratelimiter_cache.go @@ -333,72 +333,71 @@ func (r *rateLimitPolicyCache) AddCustomRateLimitPolicies(customRateLimitPolicy } // ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache process the specs and update the cache -func (r *rateLimitPolicyCache) ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache(subscriptionEnabledAIRatelimitPolicies map[types.NamespacedName]struct{}, aiRatelimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec) { +func (r *rateLimitPolicyCache) ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache(aiRatelimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec) { aiRlDescriptors := make([]*rls_config.RateLimitDescriptor, 0) - for namespacedNameRl := range subscriptionEnabledAIRatelimitPolicies { - if airl, exists := aiRatelimitPolicySpecs[namespacedNameRl]; exists { - if airl.Override.TokenCount != nil { - // Add descriptor for RequestTokenCount - aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{ - Key: DescriptorKeyForSubscriptionBasedAIRequestTokenCount, - Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl), - Descriptors: []*rls_config.RateLimitDescriptor{ - { - Key: DescriptorKeyForSubscription, - RateLimit: &rls_config.RateLimitPolicy{ - Unit: getRateLimitUnit(airl.Override.TokenCount.Unit), - RequestsPerUnit: uint32(airl.Override.TokenCount.RequestTokenCount), - }, + for namespacedNameRl, airl := range aiRatelimitPolicySpecs { + if airl.Override.TokenCount != nil { + // Add descriptor for RequestTokenCount + aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{ + Key: DescriptorKeyForSubscriptionBasedAIRequestTokenCount, + Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl), + Descriptors: []*rls_config.RateLimitDescriptor{ + { + Key: DescriptorKeyForSubscription, + RateLimit: &rls_config.RateLimitPolicy{ + Unit: getRateLimitUnit(airl.Override.TokenCount.Unit), + RequestsPerUnit: uint32(airl.Override.TokenCount.RequestTokenCount), }, }, - }) - // Add descriptor for ResponseTokenCount - aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{ - Key: DescriptorKeyForSubscriptionBasedAIResponseTokenCount, - Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl), - Descriptors: []*rls_config.RateLimitDescriptor{ - { - Key: DescriptorKeyForSubscription, - RateLimit: &rls_config.RateLimitPolicy{ - Unit: getRateLimitUnit(airl.Override.TokenCount.Unit), - RequestsPerUnit: uint32(airl.Override.TokenCount.ResponseTokenCount), - }, + }, + }) + // Add descriptor for ResponseTokenCount + aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{ + Key: DescriptorKeyForSubscriptionBasedAIResponseTokenCount, + Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl), + Descriptors: []*rls_config.RateLimitDescriptor{ + { + Key: DescriptorKeyForSubscription, + RateLimit: &rls_config.RateLimitPolicy{ + Unit: getRateLimitUnit(airl.Override.TokenCount.Unit), + RequestsPerUnit: uint32(airl.Override.TokenCount.ResponseTokenCount), }, }, - }) - // Add descriptor for TotalTokenCount - aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{ - Key: DescriptorKeyForSubscriptionBasedAITotalTokenCount, - Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl), - Descriptors: []*rls_config.RateLimitDescriptor{ - { - Key: DescriptorKeyForSubscription, - RateLimit: &rls_config.RateLimitPolicy{ - Unit: getRateLimitUnit(airl.Override.TokenCount.Unit), - RequestsPerUnit: uint32(airl.Override.TokenCount.TotalTokenCount), - }, + }, + }) + // Add descriptor for TotalTokenCount + aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{ + Key: DescriptorKeyForSubscriptionBasedAITotalTokenCount, + Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl), + Descriptors: []*rls_config.RateLimitDescriptor{ + { + Key: DescriptorKeyForSubscription, + RateLimit: &rls_config.RateLimitPolicy{ + Unit: getRateLimitUnit(airl.Override.TokenCount.Unit), + RequestsPerUnit: uint32(airl.Override.TokenCount.TotalTokenCount), }, }, - }) - } - // Add descriptor for RequestCount - if airl.Override.RequestCount != nil { - aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{ - Key: DescriptorKeyForSubscriptionBasedAIRequestCount, - Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl), - Descriptors: []*rls_config.RateLimitDescriptor{ - { - Key: DescriptorKeyForSubscription, - RateLimit: &rls_config.RateLimitPolicy{ - Unit: getRateLimitUnit(airl.Override.RequestCount.Unit), - RequestsPerUnit: uint32(airl.Override.RequestCount.RequestsPerUnit), - }, + }, + }) + } + // Add descriptor for RequestCount + if airl.Override.RequestCount != nil { + aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{ + Key: DescriptorKeyForSubscriptionBasedAIRequestCount, + Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl), + Descriptors: []*rls_config.RateLimitDescriptor{ + { + Key: DescriptorKeyForSubscription, + RateLimit: &rls_config.RateLimitPolicy{ + Unit: getRateLimitUnit(airl.Override.RequestCount.Unit), + RequestsPerUnit: uint32(airl.Override.RequestCount.RequestsPerUnit), }, }, - }) - } + }, + }) } } + r.subscriptionBasedAIRatelimitDescriptors = aiRlDescriptors } diff --git a/common-controller/internal/xds/server.go b/common-controller/internal/xds/server.go index ff8d8375d..fca590a81 100644 --- a/common-controller/internal/xds/server.go +++ b/common-controller/internal/xds/server.go @@ -169,9 +169,9 @@ func UpdateRateLimitXDSCacheForAIRatelimitPolicies(aiRatelimitPolicySpecs map[ap rlsPolicyCache.ProcessAIRatelimitPolicySpecsAndUpdateCache(aiRatelimitPolicySpecs) } -// UpdateRateLimitXDSCacheForAubscriptionBasedAIRatelimitPolicies updates the xDS cache of the RateLimiter for AI ratelimit policies. -func UpdateRateLimitXDSCacheForAubscriptionBasedAIRatelimitPolicies(subscriptionEnabledAIRatelimitPolicies map[apimachiner_types.NamespacedName]struct{}, aiRatelimitPolicySpecs map[apimachiner_types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec) { - rlsPolicyCache.ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache(subscriptionEnabledAIRatelimitPolicies, aiRatelimitPolicySpecs) +// UpdateRateLimitXDSCacheForSubscriptionBasedAIRatelimitPolicies updates the xDS cache of the RateLimiter for AI ratelimit policies. +func UpdateRateLimitXDSCacheForSubscriptionBasedAIRatelimitPolicies(aiRatelimitPolicySpecs map[apimachiner_types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec) { + rlsPolicyCache.ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache(aiRatelimitPolicySpecs) } // DeleteAPILevelRateLimitPolicies delete the ratelimit xds cache diff --git a/test/cucumber-tests/CRs/artifacts.yaml b/test/cucumber-tests/CRs/artifacts.yaml index f05a42ca1..dfe9c4b48 100644 --- a/test/cucumber-tests/CRs/artifacts.yaml +++ b/test/cucumber-tests/CRs/artifacts.yaml @@ -1342,6 +1342,10 @@ spec: requestCount: requestsPerUnit: 6000 unit: Minute + targetRef: + kind: Subscription + name: llm-backend-header + group: gateway.networking.k8s.io --- apiVersion: dp.wso2.com/v1alpha3 kind: APIPolicy