From 6c6addc182c3be40da158efe952401ec027b6d53 Mon Sep 17 00:00:00 2001 From: Erik Fuller <16261515+erikfuller@users.noreply.github.com> Date: Fri, 1 Nov 2024 10:41:59 -0700 Subject: [PATCH] Add configurable max workers for route controller instances (#672) * Added environment variable to control max workers for route controllers. Target GC now uses RW lock to accommodate parallel Deploy operations. --- docs/guides/environment.md | 10 ++++++++++ helm/templates/deployment.yaml | 3 +++ helm/values.yaml | 1 + pkg/config/controller_config.go | 12 ++++++++++++ pkg/config/controller_config_test.go | 25 ++++++++++++++++++++----- pkg/controllers/route_controller.go | 6 +++++- pkg/deploy/stack_deployer.go | 18 ++++++------------ pkg/utils/gwlog/actions.go | 4 ++-- 8 files changed, 59 insertions(+), 20 deletions(-) diff --git a/docs/guides/environment.md b/docs/guides/environment.md index ae1b11b4..db041b08 100644 --- a/docs/guides/environment.md +++ b/docs/guides/environment.md @@ -102,3 +102,13 @@ When set as "true", the controller will not use the [AWS Resource Groups Tagging The Resource Groups Tagging API is only available on the public internet and customers using private clusters will need to enable this feature. When enabled, the controller will use VPC Lattice APIs to lookup tags which are not as performant and requires more API calls. The Helm chart sets this value to "false" by default. + +--- + +#### `ROUTE_MAX_CONCURRENT_RECONCILES` + +**Type:** *int* + +**Default:** 1 + +Maximum number of concurrently running reconcile loops per route type (HTTP, GRPC, TLS) \ No newline at end of file diff --git a/helm/templates/deployment.yaml b/helm/templates/deployment.yaml index 3cbe7eef..f2923a7c 100644 --- a/helm/templates/deployment.yaml +++ b/helm/templates/deployment.yaml @@ -97,6 +97,9 @@ spec: value: {{ .Values.webhookEnabled | quote }} - name: DISABLE_TAGGING_SERVICE_API value: {{ .Values.disableTaggingServiceApi | quote }} + - name: ROUTE_MAX_CONCURRENT_RECONCILES + value: {{ .Values.routeMaxConcurrentReconciles | quote }} + terminationGracePeriodSeconds: 10 volumes: - name: webhook-cert diff --git a/helm/values.yaml b/helm/values.yaml index 85e8378f..a22193d8 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -85,6 +85,7 @@ defaultServiceNetwork: latticeEndpoint: webhookEnabled: true disableTaggingServiceApi: false +routeMaxConcurrentReconciles: # TLS cert/key for the webhook. If specified, values must be base64 encoded webhookTLS: diff --git a/pkg/config/controller_config.go b/pkg/config/controller_config.go index 36a73111..e16d50ef 100644 --- a/pkg/config/controller_config.go +++ b/pkg/config/controller_config.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "os" + "strconv" "strings" @@ -28,6 +29,7 @@ const ( AWS_ACCOUNT_ID = "AWS_ACCOUNT_ID" DEV_MODE = "DEV_MODE" WEBHOOK_ENABLED = "WEBHOOK_ENABLED" + ROUTE_MAX_CONCURRENT_RECONCILES = "ROUTE_MAX_CONCURRENT_RECONCILES" ) var VpcID = "" @@ -40,6 +42,7 @@ var WebhookEnabled = "" var DisableTaggingServiceAPI = false var ServiceNetworkOverrideMode = false +var RouteMaxConcurrentReconciles = 1 func ConfigInit() error { sess, _ := session.NewSession() @@ -95,6 +98,15 @@ func configInit(sess *session.Session, metadata EC2Metadata) error { return fmt.Errorf("cannot get cluster name: %s", err) } + routeMaxConcurrentReconciles := os.Getenv(ROUTE_MAX_CONCURRENT_RECONCILES) + if routeMaxConcurrentReconciles != "" { + routeMaxConcurrentReconcilesInt, err := strconv.Atoi(routeMaxConcurrentReconciles) + if err != nil { + return fmt.Errorf("invalid value for ROUTE_MAX_CONCURRENT_RECONCILES: %s", err) + } + RouteMaxConcurrentReconciles = routeMaxConcurrentReconcilesInt + } + return nil } diff --git a/pkg/config/controller_config_test.go b/pkg/config/controller_config_test.go index 1e54e7af..c194a945 100644 --- a/pkg/config/controller_config_test.go +++ b/pkg/config/controller_config_test.go @@ -46,6 +46,7 @@ func Test_config_init_no_env_var(t *testing.T) { os.Unsetenv(CLUSTER_VPC_ID) os.Unsetenv(DEFAULT_SERVICE_NETWORK) os.Unsetenv(AWS_ACCOUNT_ID) + os.Unsetenv(ROUTE_MAX_CONCURRENT_RECONCILES) err := configInit(nil, ec2MetadataUnavailable()) assert.NotNil(t, err) @@ -58,16 +59,30 @@ func Test_config_init_with_all_env_var(t *testing.T) { testClusterLocalGateway := "default" testAwsAccountId := "12345678" testClusterName := "cluster-name" + testMaxRouteReconciles := "5" + testMaxRouteReconcilesInt := 5 os.Setenv(REGION, testRegion) os.Setenv(CLUSTER_VPC_ID, testClusterVpcId) os.Setenv(DEFAULT_SERVICE_NETWORK, testClusterLocalGateway) os.Setenv(AWS_ACCOUNT_ID, testAwsAccountId) os.Setenv(CLUSTER_NAME, testClusterName) - configInit(nil, ec2MetadataUnavailable()) - assert.Equal(t, Region, testRegion) - assert.Equal(t, VpcID, testClusterVpcId) - assert.Equal(t, AccountID, testAwsAccountId) - assert.Equal(t, DefaultServiceNetwork, testClusterLocalGateway) + os.Setenv(ROUTE_MAX_CONCURRENT_RECONCILES, testMaxRouteReconciles) + err := configInit(nil, ec2MetadataUnavailable()) + assert.Nil(t, err) + assert.Equal(t, testRegion, Region) + assert.Equal(t, testClusterVpcId, VpcID) + assert.Equal(t, testAwsAccountId, AccountID) + assert.Equal(t, testClusterLocalGateway, DefaultServiceNetwork) assert.Equal(t, testClusterName, ClusterName) + assert.Equal(t, testMaxRouteReconcilesInt, RouteMaxConcurrentReconciles) +} + +func Test_bad_reconcile_value(t *testing.T) { + // Test variable + maxReconciles := "FOO" + + os.Setenv(ROUTE_MAX_CONCURRENT_RECONCILES, maxReconciles) + err := configInit(nil, ec2MetadataUnavailable()) + assert.NotNil(t, err) } diff --git a/pkg/controllers/route_controller.go b/pkg/controllers/route_controller.go index f252fd15..989738d8 100644 --- a/pkg/controllers/route_controller.go +++ b/pkg/controllers/route_controller.go @@ -19,6 +19,7 @@ package controllers import ( "context" "fmt" + "sigs.k8s.io/controller-runtime/pkg/controller" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" @@ -119,7 +120,10 @@ func RegisterAllRouteControllers( Watches(&gwv1beta1.Gateway{}, gwEventHandler). Watches(&corev1.Service{}, svcEventHandler.MapToRoute(routeInfo.routeType)). Watches(&anv1alpha1.ServiceImport{}, svcImportEventHandler.MapToRoute(routeInfo.routeType)). - Watches(&discoveryv1.EndpointSlice{}, svcEventHandler.MapToRoute(routeInfo.routeType)) + Watches(&discoveryv1.EndpointSlice{}, svcEventHandler.MapToRoute(routeInfo.routeType)). + WithOptions(controller.Options{ + MaxConcurrentReconciles: config.RouteMaxConcurrentReconciles, + }) if ok, err := k8s.IsGVKSupported(mgr, anv1alpha1.GroupVersion.String(), anv1alpha1.TargetGroupPolicyKind); ok { builder.Watches(&anv1alpha1.TargetGroupPolicy{}, svcEventHandler.MapToRoute(routeInfo.routeType)) diff --git a/pkg/deploy/stack_deployer.go b/pkg/deploy/stack_deployer.go index 66534fed..0abdc4cc 100644 --- a/pkg/deploy/stack_deployer.go +++ b/pkg/deploy/stack_deployer.go @@ -81,7 +81,7 @@ func NewLatticeServiceStackDeploy( tgGcSynth := lattice.NewTargetGroupSynthesizer(log, cloud, k8sClient, tgMgr, tgSvcExpBuilder, svcBuilder, nil) tgGcFn := NewTgGcFn(tgGcSynth) tgGc = &TgGc{ - lock: sync.Mutex{}, + lock: sync.RWMutex{}, log: log.Named("tg-gc"), ctx: context.TODO(), isDone: atomic.Bool{}, @@ -130,7 +130,7 @@ func NewTgGcFn(tgSynth *lattice.TargetGroupSynthesizer) TgGcCycleFn { } type TgGc struct { - lock sync.Mutex + lock sync.RWMutex log gwlog.Logger ctx context.Context isDone atomic.Bool @@ -189,16 +189,10 @@ func (d *latticeServiceStackDeployer) Deploy(ctx context.Context, stack core.Sta listenerSynthesizer := lattice.NewListenerSynthesizer(d.log, d.listenerManager, d.targetGroupManager, stack) ruleSynthesizer := lattice.NewRuleSynthesizer(d.log, d.ruleManager, d.targetGroupManager, stack) - // We need to block GC when we deploy stack. Stack deployer first creates TG and then - // associate TG with Service. If GC will run in between it can delete newly created TG - // before association since it's dangling TG. This lock also prevents concurrent - // deployments, only one deployment can run at the time. - // - // TODO: This place can become a contention. May be debug log with lock waiting time? defer func() { - tgGc.lock.Unlock() + tgGc.lock.RUnlock() }() - tgGc.lock.Lock() + tgGc.lock.RLock() //Handle targetGroups creation request if err := targetGroupSynthesizer.SynthesizeCreate(ctx); err != nil { @@ -267,9 +261,9 @@ func NewTargetGroupStackDeploy( func (d *latticeTargetGroupStackDeployer) Deploy(ctx context.Context, stack core.Stack) error { defer func() { - tgGc.lock.Unlock() + tgGc.lock.RUnlock() }() - tgGc.lock.Lock() + tgGc.lock.RLock() synthesizers := []ResourceSynthesizer{ lattice.NewTargetGroupSynthesizer(d.log, d.cloud, d.k8sclient, d.targetGroupManager, d.svcExportTgBuilder, d.svcBuilder, stack), diff --git a/pkg/utils/gwlog/actions.go b/pkg/utils/gwlog/actions.go index 7de0edf7..92a38b97 100644 --- a/pkg/utils/gwlog/actions.go +++ b/pkg/utils/gwlog/actions.go @@ -1,4 +1,4 @@ package gwlog -const ReconcileStart = "RECONCILE_START_MARKER" -const ReconcileEnd = "RECONCILE_END_MARKER" +const ReconcileStart = "reconcile_start" +const ReconcileEnd = "reconcile_end"