Automated cherry pick of #1091: optimize tagging controller workqueue handling #1094

Open · wants to merge 1 commit into base: release-1.30
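In short, this cherry-pick turns the tagging controller's work item into a small comparable value (node name, provider ID, action) instead of a pointer carrying the node object, a callback, and a hand-rolled requeue counter. Because the item is comparable, client-go's workqueue can collapse duplicate adds for the same node and track retries itself. A minimal sketch of that deduplication, assuming k8s.io/client-go/util/workqueue (illustrative only, not part of the diff below):

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// workItem mirrors the PR's value type: every field is comparable,
// so equal values map to the same entry in the workqueue.
type workItem struct {
	name       string
	providerID string
	action     string
}

func main() {
	q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")
	defer q.ShutDown()

	// Two adds of an equal value collapse into a single queue entry.
	q.Add(workItem{name: "node0", providerID: "i-0001", action: "ADD"})
	q.Add(workItem{name: "node0", providerID: "i-0001", action: "ADD"})
	fmt.Println(q.Len()) // 1

	// A different action (or a different node) is a distinct item.
	q.Add(workItem{name: "node0", providerID: "i-0001", action: "DELETE"})
	fmt.Println(q.Len()) // 2
}

With the previous pointer items, every Add produced a distinct queue entry, so repeated events for the same node could pile up instead of being coalesced.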
17 changes: 2 additions & 15 deletions pkg/controllers/tagging/metrics.go
@@ -14,23 +14,15 @@ limitations under the License.
package tagging

import (
"sync"

"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
"sync"
)

var register sync.Once

var (
workItemDuration = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Name: "cloudprovider_aws_tagging_controller_work_item_duration_seconds",
Help: "workitem latency of workitem being in the queue and time it takes to process",
StabilityLevel: metrics.ALPHA,
Buckets: metrics.ExponentialBuckets(0.5, 1.5, 20),
},
[]string{"latency_type"})

workItemError = metrics.NewCounterVec(
&metrics.CounterOpts{
Name: "cloudprovider_aws_tagging_controller_work_item_errors_total",
@@ -43,15 +35,10 @@ var (
// registerMetrics registers tagging-controller metrics.
func registerMetrics() {
register.Do(func() {
legacyregistry.MustRegister(workItemDuration)
legacyregistry.MustRegister(workItemError)
})
}

func recordWorkItemLatencyMetrics(latencyType string, timeTaken float64) {
workItemDuration.With(metrics.Labels{"latency_type": latencyType}).Observe(timeTaken)
}

func recordWorkItemErrorMetrics(errorType string, instanceID string) {
workItemError.With(metrics.Labels{"error_type": errorType, "instance_id": instanceID}).Inc()
}
100 changes: 57 additions & 43 deletions pkg/controllers/tagging/tagging_controller.go
@@ -22,6 +22,7 @@ import (

"golang.org/x/time/rate"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
@@ -42,16 +43,21 @@ func init() {
registerMetrics()
}

// workItem contains the node and an action for that node
// taggingControllerNode contains the node details required for tag/untag of node resources.
type taggingControllerNode struct {
providerID string
name string
}

// workItem contains the node name, provider id and an action for that node.
type workItem struct {
node *v1.Node
action func(node *v1.Node) error
requeuingCount int
enqueueTime time.Time
name string
providerID string
action string
}

func (w workItem) String() string {
return fmt.Sprintf("[Node: %s, RequeuingCount: %d, EnqueueTime: %s]", w.node.GetName(), w.requeuingCount, w.enqueueTime)
return fmt.Sprintf("[Node: %s, Action: %s]", w.name, w.action)
}

const (
@@ -62,17 +68,15 @@ const (
// The label for depicting total number of errors a work item encounter and succeed
totalErrorsWorkItemErrorMetric = "total_errors"

// The label for depicting total time when work item gets queued to processed
workItemProcessingTimeWorkItemMetric = "work_item_processing_time"

// The label for depicting total time when work item gets queued to dequeued
workItemDequeuingTimeWorkItemMetric = "work_item_dequeuing_time"

// The label for depicting total number of errors a work item encounter and fail
errorsAfterRetriesExhaustedWorkItemErrorMetric = "errors_after_retries_exhausted"

// The period of time after Node creation to retry tagging due to eventual consistency of the CreateTags API.
newNodeEventualConsistencyGracePeriod = time.Minute * 5

addTag = "ADD"

deleteTag = "DELETE"
)

// Controller is the controller implementation for tagging cluster resources.
@@ -150,7 +154,7 @@ func NewTaggingController(
tc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
node := obj.(*v1.Node)
tc.enqueueNode(node, tc.tagNodesResources)
tc.enqueueNode(node, addTag)
},
UpdateFunc: func(oldObj, newObj interface{}) {
node := newObj.(*v1.Node)
@@ -163,11 +167,11 @@ return
return
}

tc.enqueueNode(node, tc.tagNodesResources)
tc.enqueueNode(node, addTag)
},
DeleteFunc: func(obj interface{}) {
node := obj.(*v1.Node)
tc.enqueueNode(node, tc.untagNodeResources)
tc.enqueueNode(node, deleteTag)
},
})

@@ -213,21 +217,17 @@ func (tc *Controller) process() bool {
err := func(obj interface{}) error {
defer tc.workqueue.Done(obj)

workItem, ok := obj.(*workItem)
workItem, ok := obj.(workItem)
if !ok {
tc.workqueue.Forget(obj)
err := fmt.Errorf("expected workItem in workqueue but got %s", obj)
utilruntime.HandleError(err)
return nil
}

timeTaken := time.Since(workItem.enqueueTime).Seconds()
recordWorkItemLatencyMetrics(workItemDequeuingTimeWorkItemMetric, timeTaken)
klog.Infof("Dequeuing latency %f seconds", timeTaken)

instanceID, err := awsv1.KubernetesInstanceID(workItem.node.Spec.ProviderID).MapToAWSInstanceID()
instanceID, err := awsv1.KubernetesInstanceID(workItem.providerID).MapToAWSInstanceID()
if err != nil {
err = fmt.Errorf("Error in getting instanceID for node %s, error: %v", workItem.node.GetName(), err)
err = fmt.Errorf("error in getting instanceID for node %s, error: %v", workItem.name, err)
utilruntime.HandleError(err)
return nil
}
@@ -239,26 +239,31 @@ func (tc *Controller) process() bool {
tc.workqueue.Forget(obj)
return nil
}

err = workItem.action(workItem.node)

if workItem.action == addTag {
err = tc.tagNodesResources(&taggingControllerNode{
name: workItem.name,
providerID: workItem.providerID,
})
} else {
err = tc.untagNodeResources(&taggingControllerNode{
name: workItem.name,
providerID: workItem.providerID,
})
}
if err != nil {
if workItem.requeuingCount < maxRequeuingCount {
numRetries := tc.workqueue.NumRequeues(workItem)
if numRetries < maxRequeuingCount {
// Put the item back on the workqueue to handle any transient errors.
workItem.requeuingCount++
tc.workqueue.AddRateLimited(workItem)

recordWorkItemErrorMetrics(totalErrorsWorkItemErrorMetric, string(instanceID))
return fmt.Errorf("error processing work item '%v': %s, requeuing count %d", workItem, err.Error(), workItem.requeuingCount)
return fmt.Errorf("error processing work item '%v': %s, requeuing count %d", workItem, err.Error(), numRetries)
}

klog.Errorf("error processing work item %s: %s, requeuing count exceeded", workItem, err.Error())
recordWorkItemErrorMetrics(errorsAfterRetriesExhaustedWorkItemErrorMetric, string(instanceID))
} else {
klog.Infof("Finished processing %s", workItem)
timeTaken = time.Since(workItem.enqueueTime).Seconds()
recordWorkItemLatencyMetrics(workItemProcessingTimeWorkItemMetric, timeTaken)
klog.Infof("Processing latency %f seconds", timeTaken)
}

tc.workqueue.Forget(obj)
@@ -275,11 +280,19 @@ func (tc *Controller) process() bool {

// tagNodesResources tag node resources
// If we want to tag more resources, modify this function appropriately
func (tc *Controller) tagNodesResources(node *v1.Node) error {
func (tc *Controller) tagNodesResources(node *taggingControllerNode) error {
for _, resource := range tc.resources {
switch resource {
case opt.Instance:
err := tc.tagEc2Instance(node)
v1node, err := tc.nodeInformer.Lister().Get(node.name)
if err != nil {
// If the node is not found, just ignore it; it's okay to not add tags when the node object has been deleted.
if apierrors.IsNotFound(err) {
return nil
}
return err
}
err = tc.tagEc2Instance(v1node)
if err != nil {
return err
}
@@ -332,7 +345,7 @@ func (tc *Controller) tagEc2Instance(node *v1.Node) error {

// untagNodeResources untag node resources
// If we want to untag more resources, modify this function appropriately
func (tc *Controller) untagNodeResources(node *v1.Node) error {
func (tc *Controller) untagNodeResources(node *taggingControllerNode) error {
for _, resource := range tc.resources {
switch resource {
case opt.Instance:
@@ -348,13 +361,13 @@ func (tc *Controller) untagNodeResources(node *v1.Node) error {

// untagEc2Instances deletes the provided tags to each EC2 instances in
// the cluster.
func (tc *Controller) untagEc2Instance(node *v1.Node) error {
instanceID, _ := awsv1.KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID()
func (tc *Controller) untagEc2Instance(node *taggingControllerNode) error {
instanceID, _ := awsv1.KubernetesInstanceID(node.providerID).MapToAWSInstanceID()

err := tc.cloud.UntagResource(string(instanceID), tc.tags)

if err != nil {
klog.Errorf("Error in untagging EC2 instance %s for node %s, error: %v", instanceID, node.GetName(), err)
klog.Errorf("Error in untagging EC2 instance %s for node %s, error: %v", instanceID, node.name, err)
return err
}

@@ -365,12 +378,13 @@ func (tc *Controller) untagEc2Instance(node *v1.Node) error {

// enqueueNode takes in the object and an
// action for the object for a workitem and enqueue to the workqueue
func (tc *Controller) enqueueNode(node *v1.Node, action func(node *v1.Node) error) {
item := &workItem{
node: node,
action: action,
requeuingCount: 0,
enqueueTime: time.Now(),
func (tc *Controller) enqueueNode(node *v1.Node, action string) {
// If all of the struct's fields are comparable, the workqueue deduplicates adds: multiple adds of the
// same object result in only one item in the workqueue.
item := workItem{
name: node.GetName(),
providerID: node.Spec.ProviderID,
action: action,
}

if tc.rateLimitEnabled {
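The process() changes above rely on the workqueue's own retry bookkeeping instead of a requeuingCount field on the item: NumRequeues reports how many times the rate limiter has requeued an item, and Forget resets that count. A rough sketch of the pattern, reusing the comparable workItem value type from the diff and assuming k8s.io/client-go/util/workqueue (illustrative only, not part of the diff):

// processOne shows the NumRequeues/AddRateLimited/Forget pattern in isolation.
func processOne(q workqueue.RateLimitingInterface, maxRetries int, handle func(workItem) error) bool {
	obj, shutdown := q.Get()
	if shutdown {
		return false
	}
	defer q.Done(obj)

	item, ok := obj.(workItem)
	if !ok {
		q.Forget(obj) // unexpected payload, drop it
		return true
	}

	if err := handle(item); err != nil {
		if q.NumRequeues(obj) < maxRetries {
			// Transient failure: requeue with exponential backoff.
			q.AddRateLimited(obj)
			return true
		}
		// Retries exhausted: fall through to Forget so the failure count resets.
	}
	q.Forget(obj)
	return true
}

Forget matters in both the success and the retries-exhausted branch: it clears the per-item failure count so a later, unrelated error starts its backoff from scratch.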
86 changes: 80 additions & 6 deletions pkg/controllers/tagging/tagging_controller_test.go
@@ -22,6 +22,7 @@ import (
"testing"
"time"

"golang.org/x/time/rate"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
@@ -221,27 +222,32 @@ func Test_NodesJoiningAndLeaving(t *testing.T) {
nodeMonitorPeriod: 1 * time.Second,
tags: map[string]string{"key2": "value2", "key1": "value1"},
resources: []string{"instance"},
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Tagging"),
rateLimitEnabled: testcase.rateLimited,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 5*time.Millisecond),
// 10 qps, 100 bucket size. This only affects retry speed and it's only the overall factor (not per item)
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
), "Tagging"),
rateLimitEnabled: testcase.rateLimited,
}

if testcase.toBeTagged {
tc.enqueueNode(testcase.currNode, tc.tagNodesResources)
tc.enqueueNode(testcase.currNode, addTag)
} else {
tc.enqueueNode(testcase.currNode, tc.untagNodeResources)
tc.enqueueNode(testcase.currNode, deleteTag)
}

if tc.rateLimitEnabled {
// If rate limit is enabled, sleep for 10 ms to wait for the item to be added to the queue since the base delay is 5 ms.
time.Sleep(10 * time.Millisecond)
}

cnt := 0
for tc.workqueue.Len() > 0 {
tc.process()

cnt++
// Sleep briefly because exponential backoff when requeuing a failed work item
// can make the workqueue appear empty if checked immediately.
time.Sleep(1500 * time.Millisecond)
time.Sleep(7 * time.Millisecond)
}

for _, msg := range testcase.expectedMessages {
@@ -256,12 +262,80 @@ func Test_NodesJoiningAndLeaving(t *testing.T) {
if !strings.Contains(logBuf.String(), "requeuing count exceeded") {
t.Errorf("\nExceeded requeue count but did not stop: \n%v\n", logBuf.String())
}
if cnt != maxRequeuingCount+1 {
t.Errorf("the node got requeued %d, more than the max requeuing count of %d", cnt, maxRequeuingCount)
}
}
}
})
}
}

func TestMultipleEnqueues(t *testing.T) {
awsServices := awsv1.NewFakeAWSServices(TestClusterID)
fakeAws, _ := awsv1.NewAWSCloud(config.CloudConfig{}, awsServices)

testNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Spec: v1.NodeSpec{
ProviderID: "i-0001",
},
}
testNode1 := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Spec: v1.NodeSpec{
ProviderID: "i-0002",
},
}
clientset := fake.NewSimpleClientset(testNode, testNode1)
informer := informers.NewSharedInformerFactory(clientset, time.Second)
nodeInformer := informer.Core().V1().Nodes()

if err := syncNodeStore(nodeInformer, clientset); err != nil {
t.Errorf("unexpected error: %v", err)
}

tc, err := NewTaggingController(nodeInformer, clientset, fakeAws, time.Second, nil, []string{}, 0, 0)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
tc.enqueueNode(testNode, addTag)
if tc.workqueue.Len() != 1 {
t.Errorf("invalid work queue length, expected 1, got %d", tc.workqueue.Len())
}
// adding the same node with the same operation shouldn't add another item to the workqueue
tc.enqueueNode(testNode, addTag)
if tc.workqueue.Len() != 1 {
t.Errorf("invalid work queue length, expected 1, got %d", tc.workqueue.Len())
}
// adding the same node with a different operation should add to the workqueue
tc.enqueueNode(testNode, deleteTag)
if tc.workqueue.Len() != 2 {
t.Errorf("invalid work queue length, expected 2, got %d", tc.workqueue.Len())
}
// adding a different node should add to the workqueue
tc.enqueueNode(testNode1, addTag)
if tc.workqueue.Len() != 3 {
t.Errorf("invalid work queue length, expected 3, got %d", tc.workqueue.Len())
}
// should handle the add tag properly
tc.process()
if tc.workqueue.Len() != 2 {
t.Errorf("invalid work queue length, expected 1, got %d", tc.workqueue.Len())
}
// should handle the delete tag properly
tc.process()
if tc.workqueue.Len() != 1 {
t.Errorf("invalid work queue length, expected 1, got %d", tc.workqueue.Len())
}
}

func syncNodeStore(nodeinformer coreinformers.NodeInformer, f *fake.Clientset) error {
nodes, err := f.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {