Skip to content

Commit

Permalink
Fix subscription based ai ratelimit db flow
Browse files Browse the repository at this point in the history
  • Loading branch information
Tharsanan1 committed Oct 24, 2024
1 parent 5037857 commit 8dad7ea
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 101 deletions.
43 changes: 18 additions & 25 deletions common-controller/internal/cache/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ import (

// RatelimitDataStore is a cache for rate limit policies.
type RatelimitDataStore struct {
resolveRatelimitStore map[types.NamespacedName][]dpv1alpha1.ResolveRateLimitAPIPolicy
resolveSubscriptionRatelimitStore map[types.NamespacedName]dpv1alpha3.ResolveSubscriptionRatelimitPolicy
customRatelimitStore map[types.NamespacedName]*dpv1alpha1.CustomRateLimitPolicyDef
mu sync.Mutex
aiRatelimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec
subscriptionEnabledAIRatelimitPolicies map[types.NamespacedName]struct{}
resolveRatelimitStore map[types.NamespacedName][]dpv1alpha1.ResolveRateLimitAPIPolicy
resolveSubscriptionRatelimitStore map[types.NamespacedName]dpv1alpha3.ResolveSubscriptionRatelimitPolicy
customRatelimitStore map[types.NamespacedName]*dpv1alpha1.CustomRateLimitPolicyDef
mu sync.Mutex
aiRatelimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec
subscriptionBasedAIRatelimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec
}

// CreateNewOperatorDataStore creates a new RatelimitDataStore.
Expand Down Expand Up @@ -103,24 +103,16 @@ func (ods *RatelimitDataStore) AddorUpdateAIRatelimitToStore(rateLimit types.Nam
ods.aiRatelimitPolicySpecs[rateLimit] = &aiRatelimitSpec
}

// MarkAIRatelimitAsSubscriptionEnabled add an entry to specify an AI RatelimitPolicy is associated with a subscription
func (ods *RatelimitDataStore) MarkAIRatelimitAsSubscriptionEnabled(nn types.NamespacedName) {
ods.mu.Lock()
defer ods.mu.Unlock()
if ods.subscriptionEnabledAIRatelimitPolicies == nil {
ods.subscriptionEnabledAIRatelimitPolicies = make(map[types.NamespacedName]struct{})
}
ods.subscriptionEnabledAIRatelimitPolicies[nn] = struct{}{}
}

// MarkAIRatelimitAsSubscriptionDisabled deletes the entry which was added to specify an AI RatelimitPolicy is associated with a subscription
func (ods *RatelimitDataStore) MarkAIRatelimitAsSubscriptionDisabled(nn types.NamespacedName) {
// AddorUpdateSubscriptionBasedAIRatelimitToStore adds a new ratelimit to the RatelimitDataStore.
func (ods *RatelimitDataStore) AddorUpdateSubscriptionBasedAIRatelimitToStore(rateLimit types.NamespacedName,
aiRatelimitSpec dpv1alpha3.AIRateLimitPolicySpec) {
ods.mu.Lock()
defer ods.mu.Unlock()
if ods.subscriptionEnabledAIRatelimitPolicies == nil {
return
logger.Infof("Adding/Updating AI ratelimit spec to cache")
if ods.subscriptionBasedAIRatelimitPolicySpecs == nil {
ods.subscriptionBasedAIRatelimitPolicySpecs = make(map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec)
}
delete(ods.subscriptionEnabledAIRatelimitPolicies, nn)
ods.subscriptionBasedAIRatelimitPolicySpecs[rateLimit] = &aiRatelimitSpec
}

// GetResolveRatelimitPolicy get cached ratelimit
Expand Down Expand Up @@ -148,11 +140,12 @@ func (ods *RatelimitDataStore) GetAIRatelimitPolicySpecs() map[types.NamespacedN
return ods.aiRatelimitPolicySpecs
}

// GetSubscriptionEnabledAIRatelimitPolicies gets all the AIRatelimitPolicy stored in ods
func (ods *RatelimitDataStore) GetSubscriptionEnabledAIRatelimitPolicies() map[types.NamespacedName]struct{} {
return ods.subscriptionEnabledAIRatelimitPolicies
// GetSubscriptionBasedAIRatelimitPolicySpecs gets all the AIRatelimitPolicy stored in ods
func (ods *RatelimitDataStore) GetSubscriptionBasedAIRatelimitPolicySpecs() map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec {
return ods.subscriptionBasedAIRatelimitPolicySpecs
}


// DeleteResolveRatelimitPolicy delete from ratelimit cache
func (ods *RatelimitDataStore) DeleteResolveRatelimitPolicy(rateLimit types.NamespacedName) {
ods.mu.Lock()
Expand Down Expand Up @@ -182,7 +175,7 @@ func (ods *RatelimitDataStore) DeleteSubscriptionBasedAIRatelimitPolicySpec(subs
ods.mu.Lock()
defer ods.mu.Unlock()
logger.Debug("Deleting AI ratelimit from cache")
delete(ods.aiRatelimitPolicySpecs, subscription)
delete(ods.subscriptionBasedAIRatelimitPolicySpecs, subscription)
}

// NamespacedName generates namespaced name for Kubernetes objects
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

xds "github.com/wso2/apk/common-controller/internal/xds"
cpv1alpha3 "github.com/wso2/apk/common-go-libs/apis/cp/v1alpha3"
dpv1alpha3 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha3"
)
Expand All @@ -61,11 +60,10 @@ const (
)

// NewSubscriptionController creates a new Subscription controller instance.
func NewSubscriptionController(mgr manager.Manager, subscriptionStore *cache.SubscriptionDataStore, ratelimitStore *cache.RatelimitDataStore) error {
func NewSubscriptionController(mgr manager.Manager, subscriptionStore *cache.SubscriptionDataStore) error {
r := &SubscriptionReconciler{
client: mgr.GetClient(),
ods: subscriptionStore,
rlODS: ratelimitStore,
}
ctx := context.Background()
conf := config.ReadConfigs()
Expand Down Expand Up @@ -136,20 +134,6 @@ func (subscriptionReconciler *SubscriptionReconciler) Reconcile(ctx context.Cont
}
}
} else {
if subscription.Spec.RatelimitRef.Name != "" {
nn := types.NamespacedName{
Namespace: subscription.Namespace,
Name: subscription.Spec.RatelimitRef.Name,
}
var airl dpv1alpha3.AIRateLimitPolicy
if err := subscriptionReconciler.client.Get(ctx, nn, &airl); err == nil {
subscriptionReconciler.rlODS.AddorUpdateAIRatelimitToStore(nn, airl.Spec)
subscriptionReconciler.rlODS.MarkAIRatelimitAsSubscriptionEnabled(nn)
xds.UpdateRateLimitXDSCacheForAubscriptionBasedAIRatelimitPolicies(subscriptionReconciler.rlODS.GetSubscriptionEnabledAIRatelimitPolicies(), subscriptionReconciler.rlODS.GetAIRatelimitPolicySpecs())
conf := config.ReadConfigs()
xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label)
}
}
sendSubUpdates(subscription)
utils.SendAddSubscriptionEvent(subscription)
subscriptionReconciler.ods.AddorUpdateSubscriptionToStore(subscriptionKey, subscription.Spec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package dp

import (
"context"
"strings"

"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -101,7 +102,9 @@ func (r *AIRateLimitPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Re
loggers.LoggerAPKOperator.Errorf("Error retrieving AIRatelimit")
// It could be deletion event. So lets try to delete the related entried from the ods and update xds
r.ods.DeleteAIRatelimitPolicySpec(ratelimitKey)
r.ods.DeleteSubscriptionBasedAIRatelimitPolicySpec(ratelimitKey)
xds.UpdateRateLimitXDSCacheForAIRatelimitPolicies(r.ods.GetAIRatelimitPolicySpecs())
xds.UpdateRateLimitXDSCacheForSubscriptionBasedAIRatelimitPolicies(r.ods.GetSubscriptionBasedAIRatelimitPolicySpecs())
xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label)
} else {
loggers.LoggerAPKOperator.Infof("ratelimits found")
Expand All @@ -112,9 +115,15 @@ func (r *AIRateLimitPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Re
r.ods.AddorUpdateAIRatelimitToStore(ratelimitKey, ratelimitPolicy.Spec)
xds.UpdateRateLimitXDSCacheForAIRatelimitPolicies(r.ods.GetAIRatelimitPolicySpecs())
xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label)
} else if strings.EqualFold(string(ratelimitPolicy.Spec.TargetRef.Kind), "Subscription") {
r.ods.AddorUpdateSubscriptionBasedAIRatelimitToStore(ratelimitKey, ratelimitPolicy.Spec)
xds.UpdateRateLimitXDSCacheForSubscriptionBasedAIRatelimitPolicies(r.ods.GetSubscriptionBasedAIRatelimitPolicySpecs())
xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label)
} else {
r.ods.DeleteAIRatelimitPolicySpec(ratelimitKey)
r.ods.DeleteSubscriptionBasedAIRatelimitPolicySpec(ratelimitKey)
xds.UpdateRateLimitXDSCacheForAIRatelimitPolicies(r.ods.GetAIRatelimitPolicySpecs())
xds.UpdateRateLimitXDSCacheForSubscriptionBasedAIRatelimitPolicies(r.ods.GetSubscriptionBasedAIRatelimitPolicySpecs())
xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label)
}
}
Expand Down
2 changes: 1 addition & 1 deletion common-controller/internal/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func InitOperator(metricsConfig config.Metrics) {
loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error3115, logging.MAJOR,
"Error creating Application controller, error: %v", err))
}
if err := cpcontrollers.NewSubscriptionController(mgr, subscriptionStore, ratelimitStore); err != nil {
if err := cpcontrollers.NewSubscriptionController(mgr, subscriptionStore); err != nil {
loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error3116, logging.MAJOR,
"Error creating Subscription controller, error: %v", err))
}
Expand Down
109 changes: 54 additions & 55 deletions common-controller/internal/xds/ratelimiter_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,72 +333,71 @@ func (r *rateLimitPolicyCache) AddCustomRateLimitPolicies(customRateLimitPolicy
}

// ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache process the specs and update the cache
func (r *rateLimitPolicyCache) ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache(subscriptionEnabledAIRatelimitPolicies map[types.NamespacedName]struct{}, aiRatelimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec) {
func (r *rateLimitPolicyCache) ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache(aiRatelimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec) {
aiRlDescriptors := make([]*rls_config.RateLimitDescriptor, 0)
for namespacedNameRl := range subscriptionEnabledAIRatelimitPolicies {
if airl, exists := aiRatelimitPolicySpecs[namespacedNameRl]; exists {
if airl.Override.TokenCount != nil {
// Add descriptor for RequestTokenCount
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAIRequestTokenCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.TokenCount.Unit),
RequestsPerUnit: uint32(airl.Override.TokenCount.RequestTokenCount),
},
for namespacedNameRl, airl := range aiRatelimitPolicySpecs {
if airl.Override.TokenCount != nil {
// Add descriptor for RequestTokenCount
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAIRequestTokenCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.TokenCount.Unit),
RequestsPerUnit: uint32(airl.Override.TokenCount.RequestTokenCount),
},
},
})
// Add descriptor for ResponseTokenCount
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAIResponseTokenCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.TokenCount.Unit),
RequestsPerUnit: uint32(airl.Override.TokenCount.ResponseTokenCount),
},
},
})
// Add descriptor for ResponseTokenCount
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAIResponseTokenCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.TokenCount.Unit),
RequestsPerUnit: uint32(airl.Override.TokenCount.ResponseTokenCount),
},
},
})
// Add descriptor for TotalTokenCount
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAITotalTokenCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.TokenCount.Unit),
RequestsPerUnit: uint32(airl.Override.TokenCount.TotalTokenCount),
},
},
})
// Add descriptor for TotalTokenCount
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAITotalTokenCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.TokenCount.Unit),
RequestsPerUnit: uint32(airl.Override.TokenCount.TotalTokenCount),
},
},
})
}
// Add descriptor for RequestCount
if airl.Override.RequestCount != nil {
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAIRequestCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.RequestCount.Unit),
RequestsPerUnit: uint32(airl.Override.RequestCount.RequestsPerUnit),
},
},
})
}
// Add descriptor for RequestCount
if airl.Override.RequestCount != nil {
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAIRequestCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.RequestCount.Unit),
RequestsPerUnit: uint32(airl.Override.RequestCount.RequestsPerUnit),
},
},
})
}
},
})
}
}

r.subscriptionBasedAIRatelimitDescriptors = aiRlDescriptors
}

Expand Down
6 changes: 3 additions & 3 deletions common-controller/internal/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,9 @@ func UpdateRateLimitXDSCacheForAIRatelimitPolicies(aiRatelimitPolicySpecs map[ap
rlsPolicyCache.ProcessAIRatelimitPolicySpecsAndUpdateCache(aiRatelimitPolicySpecs)
}

// UpdateRateLimitXDSCacheForAubscriptionBasedAIRatelimitPolicies updates the xDS cache of the RateLimiter for AI ratelimit policies.
func UpdateRateLimitXDSCacheForAubscriptionBasedAIRatelimitPolicies(subscriptionEnabledAIRatelimitPolicies map[apimachiner_types.NamespacedName]struct{}, aiRatelimitPolicySpecs map[apimachiner_types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec) {
rlsPolicyCache.ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache(subscriptionEnabledAIRatelimitPolicies, aiRatelimitPolicySpecs)
// UpdateRateLimitXDSCacheForSubscriptionBasedAIRatelimitPolicies updates the xDS cache of the RateLimiter for AI ratelimit policies.
func UpdateRateLimitXDSCacheForSubscriptionBasedAIRatelimitPolicies(aiRatelimitPolicySpecs map[apimachiner_types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec) {
rlsPolicyCache.ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache(aiRatelimitPolicySpecs)
}

// DeleteAPILevelRateLimitPolicies delete the ratelimit xds cache
Expand Down
4 changes: 4 additions & 0 deletions test/cucumber-tests/CRs/artifacts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1342,6 +1342,10 @@ spec:
requestCount:
requestsPerUnit: 6000
unit: Minute
targetRef:
kind: Subscription
name: llm-backend-header
group: gateway.networking.k8s.io
---
apiVersion: dp.wso2.com/v1alpha3
kind: APIPolicy
Expand Down

0 comments on commit 8dad7ea

Please sign in to comment.