-
Notifications
You must be signed in to change notification settings - Fork 9
/
profile_monitor.go
194 lines (171 loc) · 6.4 KB
/
profile_monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package controller
import (
"context"
"encoding/json"
"fmt"
"os"
"time"
"github.com/intel/intent-driven-orchestration/pkg/api/intents/v1alpha1"
"github.com/intel/intent-driven-orchestration/pkg/common"
clientSet "github.com/intel/intent-driven-orchestration/pkg/generated/clientset/versioned"
informers "github.com/intel/intent-driven-orchestration/pkg/generated/informers/externalversions/intents/v1alpha1"
lister "github.com/intel/intent-driven-orchestration/pkg/generated/listers/intents/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)
// KPIProfileMonitor is the part implementing the monitoring the KPIProfiles.
// It watches KPIProfile custom resources via an informer, resolves each
// profile against a set of file-provided defaults, and emits parsed profiles
// on the update channel.
type KPIProfileMonitor struct {
	// profileClient is used to update the status subresource of KPIProfiles.
	profileClient clientSet.Interface
	// profileLister reads KPIProfiles from the informer's local cache.
	profileLister lister.KPIProfileLister
	// profileSynced reports whether the informer cache has completed its initial sync.
	profileSynced cache.InformerSynced
	// queue holds namespace/name keys of profiles awaiting processing, with rate limiting on retries.
	queue workqueue.RateLimitingInterface
	// update is the outbound channel on which resolved (or obsolete) profiles are published.
	update chan<- common.Profile
	// defaultProfiles maps a profile key to its default "query"/"endpoint" settings loaded from config.
	defaultProfiles map[string]map[string]string
	syncHandler     func(key string) error // Enables us to test this easily.
}
// NewKPIProfileMonitor returns a new monitor instance.
//
// It loads the default profile definitions (a JSON map of profile key to
// {"query": ..., "endpoint": ...}) from the file configured in
// cfg.Profile.Queries and terminates the process (klog.Fatal) if that file
// cannot be read or parsed. Add/update/delete handlers for KPIProfile
// resources are registered on the given informer; resolved profiles are
// emitted on ch.
func NewKPIProfileMonitor(cfg common.MonitorConfig, profileClient clientSet.Interface, profileInformer informers.KPIProfileInformer, ch chan<- common.Profile) *KPIProfileMonitor {
	// parse default configs.
	tmp, err := os.ReadFile(cfg.Profile.Queries)
	if err != nil {
		klog.Fatal("Unable to read config file with the default profile definitions: ", err)
	}
	var result map[string]map[string]string
	err = json.Unmarshal(tmp, &result)
	if err != nil {
		klog.Fatal("Unable to parse default profile definitions: ", err)
	}
	// the actual monitor.
	mon := &KPIProfileMonitor{
		profileClient:   profileClient,
		profileLister:   profileInformer.Lister(),
		profileSynced:   profileInformer.Informer().HasSynced,
		queue:           workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: "KPIProfiles"}),
		update:          ch,
		defaultProfiles: result,
	}
	mon.syncHandler = mon.processProfile
	// TODO: check for event broadcasting.
	// handle add, update & delete.
	_, _ = profileInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: mon.enqueueItem,
		// NOTE: parameters renamed from old/new — "new" shadows the Go builtin.
		UpdateFunc: func(oldObj, newObj interface{}) {
			if oldObj.(*v1alpha1.KPIProfile).ResourceVersion == newObj.(*v1alpha1.KPIProfile).ResourceVersion {
				// no change --> nothing to do.
				return
			}
			mon.enqueueItem(newObj)
		},
		DeleteFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err != nil {
				runtime.HandleError(err)
				return
			}
			klog.Infof("Will remove profile '%s'.", key)
			// Deletions bypass the work queue: signal obsolescence directly.
			mon.update <- common.Profile{Key: key, ProfileType: common.Obsolete, External: true}
		},
	})
	return mon
}
// enqueueItem derives the namespace/name cache key for obj and places it on
// the rate-limited work queue; key-derivation failures are reported and the
// item is dropped.
func (mon *KPIProfileMonitor) enqueueItem(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		runtime.HandleError(err)
		return
	}
	mon.queue.Add(key)
}
// Run starts the monitor: it waits for the informer cache to sync, launches
// nWorkers queue workers, and then blocks until stopper is closed. The work
// queue is shut down on return.
func (mon *KPIProfileMonitor) Run(nWorkers int, stopper <-chan struct{}) {
	defer runtime.HandleCrash()
	defer mon.queue.ShutDown()
	if !cache.WaitForCacheSync(stopper, mon.profileSynced) {
		// Cache never synced (stopper closed early) — nothing to do.
		return
	}
	for worker := 0; worker < nWorkers; worker++ {
		go wait.Until(mon.runWorker, time.Second, stopper)
	}
	klog.V(1).Infof("Started %d worker(s).", nWorkers)
	<-stopper
}
// runWorker drains the work queue until it reports shutdown.
func (mon *KPIProfileMonitor) runWorker() {
	for {
		if !mon.processNextWorkItem() {
			return
		}
	}
}
// processNextWorkItem pops one key off the queue and passes it to the sync
// handler. Failed items are re-added with rate limiting; successful ones are
// forgotten. It returns false only once the queue has been shut down.
func (mon *KPIProfileMonitor) processNextWorkItem() bool {
	item, shutdown := mon.queue.Get()
	if shutdown {
		return false
	}
	defer mon.queue.Done(item)
	// process obj.
	if err := mon.syncHandler(item.(string)); err != nil {
		// Failed --> add back to queue, but rate limited!
		runtime.HandleError(fmt.Errorf("processing of %v failed with: %v", item, err))
		mon.queue.AddRateLimited(item)
		return true
	}
	mon.queue.Forget(item)
	return true
}
// processProfile checks if it can resolve a profile, and if so marks it as resolved.
//
// Resolution order: a key present in the default profile definitions wins and
// supplies query/endpoint; otherwise the profile's own spec must provide both
// an "endpoint" prop and a non-empty query. Unresolvable profiles have their
// status set to not-resolved and are published as obsolete. A nil return
// means the key is done (including the not-found and bad-key cases); a
// non-nil return triggers a rate-limited retry by the caller.
func (mon *KPIProfileMonitor) processProfile(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		runtime.HandleError(fmt.Errorf("invalid resource key: '%s'", key))
		//lint:ignore nilerr n.a.
		return nil // ignore
	}
	profile, err := mon.profileLister.KPIProfiles(namespace).Get(name)
	if err != nil {
		if errors.IsNotFound(err) {
			// Deleted between enqueue and processing — nothing left to do.
			runtime.HandleError(fmt.Errorf("KPI profile '%s' does not longer exists", key))
			return nil
		}
		return err
	}
	// look into the profile we got and if it is valid push it into the channel.
	// Single comma-ok lookups replace the original's duplicate map indexing.
	if defaults, found := mon.defaultProfiles[key]; found {
		mon.updateStatus(profile, true, "ok")
		mon.update <- common.Profile{Key: key, ProfileType: common.ProfileTypeFromText(profile.Spec.KPIType), Query: defaults["query"], Address: defaults["endpoint"]}
	} else if endpoint, found := profile.Spec.Props["endpoint"]; found && profile.Spec.Query != "" {
		// FIXME - make sure whatever is put in query is safe, secure & valid (regex maybe?)
		mon.updateStatus(profile, true, "ok")
		mon.update <- common.Profile{Key: key, ProfileType: common.ProfileTypeFromText(profile.Spec.KPIType), Query: profile.Spec.Query, External: true, Address: endpoint}
	} else {
		mon.updateStatus(profile, false, "Both a endpoint and a query need to be defined.")
		mon.update <- common.Profile{Key: key, ProfileType: common.Obsolete, External: true}
	}
	return nil
}
// updateStatus writes the resolved flag and a human-readable reason to the
// KPIProfile's status subresource, working on a deep copy so the shared
// informer cache object is never mutated. Update failures are reported, not
// returned.
func (mon *KPIProfileMonitor) updateStatus(profile *v1alpha1.KPIProfile, resolved bool, reason string) {
	copied := profile.DeepCopy()
	copied.Status.Resolved = resolved
	copied.Status.Reason = reason
	klog.Infof("Set status for profile '%s/%s' to '%t' - reason: '%s'.", profile.Namespace, profile.Name, resolved, reason)
	if _, err := mon.profileClient.IdoV1alpha1().KPIProfiles(profile.Namespace).UpdateStatus(context.TODO(), copied, metaV1.UpdateOptions{}); err != nil {
		runtime.HandleError(fmt.Errorf("unable to update status subresource: %s", err))
	}
}