Skip to content

Commit

Permalink
Merge pull request #2617 from Tharsanan1/redis
Browse files Browse the repository at this point in the history
Support password only redis connection
  • Loading branch information
CrowleyRajapakse authored Oct 24, 2024
2 parents e32dd7a + 8dad7ea commit 3a82a64
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 128 deletions.
43 changes: 18 additions & 25 deletions common-controller/internal/cache/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ import (

// RatelimitDataStore is a cache for rate limit policies.
type RatelimitDataStore struct {
resolveRatelimitStore map[types.NamespacedName][]dpv1alpha1.ResolveRateLimitAPIPolicy
resolveSubscriptionRatelimitStore map[types.NamespacedName]dpv1alpha3.ResolveSubscriptionRatelimitPolicy
customRatelimitStore map[types.NamespacedName]*dpv1alpha1.CustomRateLimitPolicyDef
mu sync.Mutex
aiRatelimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec
subscriptionEnabledAIRatelimitPolicies map[types.NamespacedName]struct{}
resolveRatelimitStore map[types.NamespacedName][]dpv1alpha1.ResolveRateLimitAPIPolicy
resolveSubscriptionRatelimitStore map[types.NamespacedName]dpv1alpha3.ResolveSubscriptionRatelimitPolicy
customRatelimitStore map[types.NamespacedName]*dpv1alpha1.CustomRateLimitPolicyDef
mu sync.Mutex
aiRatelimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec
subscriptionBasedAIRatelimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec
}

// CreateNewOperatorDataStore creates a new RatelimitDataStore.
Expand Down Expand Up @@ -103,24 +103,16 @@ func (ods *RatelimitDataStore) AddorUpdateAIRatelimitToStore(rateLimit types.Nam
ods.aiRatelimitPolicySpecs[rateLimit] = &aiRatelimitSpec
}

// MarkAIRatelimitAsSubscriptionEnabled add an entry to specify an AI RatelimitPolicy is associated with a subscription
func (ods *RatelimitDataStore) MarkAIRatelimitAsSubscriptionEnabled(nn types.NamespacedName) {
ods.mu.Lock()
defer ods.mu.Unlock()
if ods.subscriptionEnabledAIRatelimitPolicies == nil {
ods.subscriptionEnabledAIRatelimitPolicies = make(map[types.NamespacedName]struct{})
}
ods.subscriptionEnabledAIRatelimitPolicies[nn] = struct{}{}
}

// MarkAIRatelimitAsSubscriptionDisabled deletes the entry which was added to specify an AI RatelimitPolicy is associated with a subscription
func (ods *RatelimitDataStore) MarkAIRatelimitAsSubscriptionDisabled(nn types.NamespacedName) {
// AddorUpdateSubscriptionBasedAIRatelimitToStore adds a new ratelimit to the RatelimitDataStore.
func (ods *RatelimitDataStore) AddorUpdateSubscriptionBasedAIRatelimitToStore(rateLimit types.NamespacedName,
aiRatelimitSpec dpv1alpha3.AIRateLimitPolicySpec) {
ods.mu.Lock()
defer ods.mu.Unlock()
if ods.subscriptionEnabledAIRatelimitPolicies == nil {
return
logger.Infof("Adding/Updating AI ratelimit spec to cache")
if ods.subscriptionBasedAIRatelimitPolicySpecs == nil {
ods.subscriptionBasedAIRatelimitPolicySpecs = make(map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec)
}
delete(ods.subscriptionEnabledAIRatelimitPolicies, nn)
ods.subscriptionBasedAIRatelimitPolicySpecs[rateLimit] = &aiRatelimitSpec
}

// GetResolveRatelimitPolicy get cached ratelimit
Expand Down Expand Up @@ -148,11 +140,12 @@ func (ods *RatelimitDataStore) GetAIRatelimitPolicySpecs() map[types.NamespacedN
return ods.aiRatelimitPolicySpecs
}

// GetSubscriptionEnabledAIRatelimitPolicies gets all the AIRatelimitPolicy stored in ods
func (ods *RatelimitDataStore) GetSubscriptionEnabledAIRatelimitPolicies() map[types.NamespacedName]struct{} {
return ods.subscriptionEnabledAIRatelimitPolicies
// GetSubscriptionBasedAIRatelimitPolicySpecs gets all the AIRatelimitPolicy stored in ods
func (ods *RatelimitDataStore) GetSubscriptionBasedAIRatelimitPolicySpecs() map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec {
return ods.subscriptionBasedAIRatelimitPolicySpecs
}


// DeleteResolveRatelimitPolicy delete from ratelimit cache
func (ods *RatelimitDataStore) DeleteResolveRatelimitPolicy(rateLimit types.NamespacedName) {
ods.mu.Lock()
Expand Down Expand Up @@ -182,7 +175,7 @@ func (ods *RatelimitDataStore) DeleteSubscriptionBasedAIRatelimitPolicySpec(subs
ods.mu.Lock()
defer ods.mu.Unlock()
logger.Debug("Deleting AI ratelimit from cache")
delete(ods.aiRatelimitPolicySpecs, subscription)
delete(ods.subscriptionBasedAIRatelimitPolicySpecs, subscription)
}

// NamespacedName generates namespaced name for Kubernetes objects
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

xds "github.com/wso2/apk/common-controller/internal/xds"
cpv1alpha3 "github.com/wso2/apk/common-go-libs/apis/cp/v1alpha3"
dpv1alpha3 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha3"
)
Expand All @@ -61,11 +60,10 @@ const (
)

// NewSubscriptionController creates a new Subscription controller instance.
func NewSubscriptionController(mgr manager.Manager, subscriptionStore *cache.SubscriptionDataStore, ratelimitStore *cache.RatelimitDataStore) error {
func NewSubscriptionController(mgr manager.Manager, subscriptionStore *cache.SubscriptionDataStore) error {
r := &SubscriptionReconciler{
client: mgr.GetClient(),
ods: subscriptionStore,
rlODS: ratelimitStore,
}
ctx := context.Background()
conf := config.ReadConfigs()
Expand Down Expand Up @@ -136,20 +134,6 @@ func (subscriptionReconciler *SubscriptionReconciler) Reconcile(ctx context.Cont
}
}
} else {
if subscription.Spec.RatelimitRef.Name != "" {
nn := types.NamespacedName{
Namespace: subscription.Namespace,
Name: subscription.Spec.RatelimitRef.Name,
}
var airl dpv1alpha3.AIRateLimitPolicy
if err := subscriptionReconciler.client.Get(ctx, nn, &airl); err == nil {
subscriptionReconciler.rlODS.AddorUpdateAIRatelimitToStore(nn, airl.Spec)
subscriptionReconciler.rlODS.MarkAIRatelimitAsSubscriptionEnabled(nn)
xds.UpdateRateLimitXDSCacheForAubscriptionBasedAIRatelimitPolicies(subscriptionReconciler.rlODS.GetSubscriptionEnabledAIRatelimitPolicies(), subscriptionReconciler.rlODS.GetAIRatelimitPolicySpecs())
conf := config.ReadConfigs()
xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label)
}
}
sendSubUpdates(subscription)
utils.SendAddSubscriptionEvent(subscription)
subscriptionReconciler.ods.AddorUpdateSubscriptionToStore(subscriptionKey, subscription.Spec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package dp

import (
"context"
"strings"

"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -101,7 +102,9 @@ func (r *AIRateLimitPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Re
loggers.LoggerAPKOperator.Errorf("Error retrieving AIRatelimit")
// It could be deletion event. So lets try to delete the related entried from the ods and update xds
r.ods.DeleteAIRatelimitPolicySpec(ratelimitKey)
r.ods.DeleteSubscriptionBasedAIRatelimitPolicySpec(ratelimitKey)
xds.UpdateRateLimitXDSCacheForAIRatelimitPolicies(r.ods.GetAIRatelimitPolicySpecs())
xds.UpdateRateLimitXDSCacheForSubscriptionBasedAIRatelimitPolicies(r.ods.GetSubscriptionBasedAIRatelimitPolicySpecs())
xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label)
} else {
loggers.LoggerAPKOperator.Infof("ratelimits found")
Expand All @@ -112,9 +115,15 @@ func (r *AIRateLimitPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Re
r.ods.AddorUpdateAIRatelimitToStore(ratelimitKey, ratelimitPolicy.Spec)
xds.UpdateRateLimitXDSCacheForAIRatelimitPolicies(r.ods.GetAIRatelimitPolicySpecs())
xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label)
} else if strings.EqualFold(string(ratelimitPolicy.Spec.TargetRef.Kind), "Subscription") {
r.ods.AddorUpdateSubscriptionBasedAIRatelimitToStore(ratelimitKey, ratelimitPolicy.Spec)
xds.UpdateRateLimitXDSCacheForSubscriptionBasedAIRatelimitPolicies(r.ods.GetSubscriptionBasedAIRatelimitPolicySpecs())
xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label)
} else {
r.ods.DeleteAIRatelimitPolicySpec(ratelimitKey)
r.ods.DeleteSubscriptionBasedAIRatelimitPolicySpec(ratelimitKey)
xds.UpdateRateLimitXDSCacheForAIRatelimitPolicies(r.ods.GetAIRatelimitPolicySpecs())
xds.UpdateRateLimitXDSCacheForSubscriptionBasedAIRatelimitPolicies(r.ods.GetSubscriptionBasedAIRatelimitPolicySpecs())
xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label)
}
}
Expand Down
2 changes: 1 addition & 1 deletion common-controller/internal/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func InitOperator(metricsConfig config.Metrics) {
loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error3115, logging.MAJOR,
"Error creating Application controller, error: %v", err))
}
if err := cpcontrollers.NewSubscriptionController(mgr, subscriptionStore, ratelimitStore); err != nil {
if err := cpcontrollers.NewSubscriptionController(mgr, subscriptionStore); err != nil {
loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error3116, logging.MAJOR,
"Error creating Subscription controller, error: %v", err))
}
Expand Down
38 changes: 28 additions & 10 deletions common-controller/internal/web/notify_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,30 @@ func initRedisClient() error {
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

rdb = redis.NewClient(&redis.Options{
Addr: redisAddr,
Username: redisUsername,
Password: redisPassword,
options := &redis.Options{
Addr: redisAddr,
Password: redisPassword,
TLSConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
InsecureSkipVerify: true,
},
})
}
if redisUsername != "" {
options.Username = redisUsername
}
rdb = redis.NewClient(options)
} else {
rdb = redis.NewClient(&redis.Options{
Addr: redisAddr,
Username: redisUsername,
options := &redis.Options{
Addr: redisAddr,
Password: redisPassword,
})
}
// Only set Username if it's not empty
if redisUsername != "" {
options.Username = redisUsername
}
rdb = redis.NewClient(options)
}
return nil;
}
Expand Down Expand Up @@ -175,7 +183,17 @@ func storeTokenInRedis(token string, expiry int64) error {
key := generateKey(token)
err := rdb.Do(context.Background(), "set", key, expiry, "EXAT", expiry).Err()
if err != nil {
return err
loggers.LoggerAPI.Warnf("Error occured while trying to set key with expiry. Error: %+v. \n Trying to use SET and EXPIREAT command...", err)
err = rdb.Do(context.Background(), "set", key, expiry).Err()
if err != nil {
loggers.LoggerAPI.Errorf("Error occured while setting the key. Error %+v", err)
return err
}
err = rdb.Do(context.Background(), "expireat", key, expiry).Err()
if err != nil {
loggers.LoggerAPI.Errorf("Error occured while setting the expiry. Error %+v", err)
return err
}
}
publishValue := fmt.Sprintf("%s%s%d", token, tokenExpiryDivider, expiry)
err = rdb.Do(context.Background(), "publish", redisRevokedTokenChannel, publishValue).Err()
Expand Down
109 changes: 54 additions & 55 deletions common-controller/internal/xds/ratelimiter_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,72 +333,71 @@ func (r *rateLimitPolicyCache) AddCustomRateLimitPolicies(customRateLimitPolicy
}

// ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache process the specs and update the cache
func (r *rateLimitPolicyCache) ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache(subscriptionEnabledAIRatelimitPolicies map[types.NamespacedName]struct{}, aiRatelimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec) {
func (r *rateLimitPolicyCache) ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache(aiRatelimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec) {
aiRlDescriptors := make([]*rls_config.RateLimitDescriptor, 0)
for namespacedNameRl := range subscriptionEnabledAIRatelimitPolicies {
if airl, exists := aiRatelimitPolicySpecs[namespacedNameRl]; exists {
if airl.Override.TokenCount != nil {
// Add descriptor for RequestTokenCount
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAIRequestTokenCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.TokenCount.Unit),
RequestsPerUnit: uint32(airl.Override.TokenCount.RequestTokenCount),
},
for namespacedNameRl, airl := range aiRatelimitPolicySpecs {
if airl.Override.TokenCount != nil {
// Add descriptor for RequestTokenCount
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAIRequestTokenCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.TokenCount.Unit),
RequestsPerUnit: uint32(airl.Override.TokenCount.RequestTokenCount),
},
},
})
// Add descriptor for ResponseTokenCount
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAIResponseTokenCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.TokenCount.Unit),
RequestsPerUnit: uint32(airl.Override.TokenCount.ResponseTokenCount),
},
},
})
// Add descriptor for ResponseTokenCount
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAIResponseTokenCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.TokenCount.Unit),
RequestsPerUnit: uint32(airl.Override.TokenCount.ResponseTokenCount),
},
},
})
// Add descriptor for TotalTokenCount
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAITotalTokenCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.TokenCount.Unit),
RequestsPerUnit: uint32(airl.Override.TokenCount.TotalTokenCount),
},
},
})
// Add descriptor for TotalTokenCount
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAITotalTokenCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.TokenCount.Unit),
RequestsPerUnit: uint32(airl.Override.TokenCount.TotalTokenCount),
},
},
})
}
// Add descriptor for RequestCount
if airl.Override.RequestCount != nil {
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAIRequestCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.RequestCount.Unit),
RequestsPerUnit: uint32(airl.Override.RequestCount.RequestsPerUnit),
},
},
})
}
// Add descriptor for RequestCount
if airl.Override.RequestCount != nil {
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAIRequestCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.RequestCount.Unit),
RequestsPerUnit: uint32(airl.Override.RequestCount.RequestsPerUnit),
},
},
})
}
},
})
}
}

r.subscriptionBasedAIRatelimitDescriptors = aiRlDescriptors
}

Expand Down
6 changes: 3 additions & 3 deletions common-controller/internal/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,9 @@ func UpdateRateLimitXDSCacheForAIRatelimitPolicies(aiRatelimitPolicySpecs map[ap
rlsPolicyCache.ProcessAIRatelimitPolicySpecsAndUpdateCache(aiRatelimitPolicySpecs)
}

// UpdateRateLimitXDSCacheForAubscriptionBasedAIRatelimitPolicies updates the xDS cache of the RateLimiter for AI ratelimit policies.
func UpdateRateLimitXDSCacheForAubscriptionBasedAIRatelimitPolicies(subscriptionEnabledAIRatelimitPolicies map[apimachiner_types.NamespacedName]struct{}, aiRatelimitPolicySpecs map[apimachiner_types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec) {
rlsPolicyCache.ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache(subscriptionEnabledAIRatelimitPolicies, aiRatelimitPolicySpecs)
// UpdateRateLimitXDSCacheForSubscriptionBasedAIRatelimitPolicies updates the xDS cache of the RateLimiter for AI ratelimit policies.
func UpdateRateLimitXDSCacheForSubscriptionBasedAIRatelimitPolicies(aiRatelimitPolicySpecs map[apimachiner_types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec) {
rlsPolicyCache.ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache(aiRatelimitPolicySpecs)
}

// DeleteAPILevelRateLimitPolicies delete the ratelimit xds cache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class EnvVarConfig {
public static final String DEFAULT_XDS_MAX_RETRIES = Integer.toString(Constants.MAX_XDS_RETRIES);
public static final String DEFAULT_XDS_RETRY_PERIOD = Integer.toString(Constants.XDS_DEFAULT_RETRY);
public static final String DEFAULT_HOSTNAME = "Unassigned";
public static final String DEFAULT_REDIS_USERNAME = "default";
public static final String DEFAULT_REDIS_USERNAME = "";
public static final String DEFAULT_REDIS_PASSWORD = "";
public static final String DEFAULT_REDIS_HOST = "redis-master";
public static final int DEFAULT_REDIS_PORT = 6379;
Expand Down
Loading

0 comments on commit 3a82a64

Please sign in to comment.