Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide CloudEvents around the management of ScaledObjects resources #5953

Merged
merged 4 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General**: Add --ca-dir flag to KEDA operator to specify directories with CA certificates for scalers to authenticate TLS connections (defaults to /custom/ca) ([#5860](https://github.com/kedacore/keda/issues/5860))
- **General**: Declarative parsing of scaler config ([#5037](https://github.com/kedacore/keda/issues/5037)|[#5797](https://github.com/kedacore/keda/issues/5797))
- **General**: Introduce new Splunk Scaler ([#5904](https://github.com/kedacore/keda/issues/5904))
- **General**: Provide CloudEvents around the management of ScaledObjects resources ([#3522](https://github.com/kedacore/keda/issues/3522))
- **General**: Remove deprecated Kustomize commonLabels ([#5888](https://github.com/kedacore/keda/pull/5888))
- **General**: Support for Kubernetes v1.30 ([#5828](https://github.com/kedacore/keda/issues/5828))

Expand Down
3 changes: 3 additions & 0 deletions apis/eventing/v1alpha1/cloudevent_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const (

// ScaledObjectFailedType is for event when creating ScaledObject failed
ScaledObjectFailedType CloudEventType = "keda.scaledobject.failed.v1"

// ScaledObjectFailedType is for event when removed ScaledObject
ScaledObjectRemovedType CloudEventType = "keda.scaledobject.removed.v1"
)

var AllEventTypes = []CloudEventType{ScaledObjectFailedType, ScaledObjectReadyType}
1 change: 0 additions & 1 deletion cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ func main() {
if err = (&kedacontrollers.ScaledObjectReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: eventRecorder,
ScaleClient: scaleClient,
ScaleHandler: scaledHandler,
EventEmitter: eventEmitter,
Expand Down
20 changes: 9 additions & 11 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/scale"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -70,7 +69,6 @@ import (
type ScaledObjectReconciler struct {
Client client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
ScaleClient scale.ScalesGetter
ScaleHandler scaling.ScaleHandler
EventEmitter eventemitter.EventHandler
Expand Down Expand Up @@ -119,8 +117,8 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options cont
if r.Scheme == nil {
return fmt.Errorf("ScaledObjectReconciler.Scheme is not initialized")
}
if r.Recorder == nil {
return fmt.Errorf("ScaledObjectReconciler.Recorder is not initialized")
if r.EventEmitter == nil {
return fmt.Errorf("ScaledObjectReconciler.EventEmitter is not initialized")
}
// Start controller
return ctrl.NewControllerManagedBy(mgr).
Expand Down Expand Up @@ -184,7 +182,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
if !scaledObject.Status.Conditions.AreInitialized() {
conditions := kedav1alpha1.GetInitializedConditions()
if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, conditions); err != nil {
r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error())
r.EventEmitter.Emit(scaledObject, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error())
return ctrl.Result{}, err
}
}
Expand All @@ -196,18 +194,18 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledObjectCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledObject check failed")
r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectCheckFailed, msg)
r.EventEmitter.Emit(scaledObject, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectCheckFailed, msg)
} else {
wasReady := conditions.GetReadyCondition()
if wasReady.IsFalse() || wasReady.IsUnknown() {
r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeNormal, eventingv1alpha1.ScaledObjectReadyType, eventreason.ScaledObjectReady, message.ScalerReadyMsg)
r.EventEmitter.Emit(scaledObject, req.NamespacedName.Namespace, corev1.EventTypeNormal, eventingv1alpha1.ScaledObjectReadyType, eventreason.ScaledObjectReady, message.ScalerReadyMsg)
}
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionReadySuccessReason, msg)
}

if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil {
r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error())
r.EventEmitter.Emit(scaledObject, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error())
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -359,7 +357,7 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
if err != nil {
msg := "Failed to parse Group, Version, Kind, Resource"
logger.Error(err, msg, "apiVersion", scaledObject.Spec.ScaleTargetRef.APIVersion, "kind", scaledObject.Spec.ScaleTargetRef.Kind)
r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.ScaledObjectUpdateFailed, msg)
r.EventEmitter.Emit(scaledObject, scaledObject.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error())
return gvkr, err
}
gvkString := gvkr.GVKString()
Expand Down Expand Up @@ -396,12 +394,12 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
if err := r.Client.Get(ctx, client.ObjectKey{Namespace: scaledObject.Namespace, Name: scaledObject.Spec.ScaleTargetRef.Name}, unstruct); err != nil {
// resource doesn't exist
logger.Error(err, message.ScaleTargetNotFoundMsg, "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.ScaledObjectCheckFailed, message.ScaleTargetNotFoundMsg)
r.EventEmitter.Emit(scaledObject, scaledObject.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectCheckFailed, message.ScaleTargetNotFoundMsg)
return gvkr, err
}
// resource exist but doesn't expose /scale subresource
logger.Error(errScale, message.ScaleTargetNoSubresourceMsg, "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.ScaledObjectCheckFailed, message.ScaleTargetNoSubresourceMsg)
r.EventEmitter.Emit(scaledObject, scaledObject.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectCheckFailed, message.ScaleTargetNoSubresourceMsg)
return gvkr, errScale
}
isScalableCache.Store(gr.String(), true)
Expand Down
4 changes: 3 additions & 1 deletion controllers/keda/scaledobject_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1"
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/common/message"
"github.com/kedacore/keda/v2/pkg/eventreason"
)

Expand Down Expand Up @@ -86,7 +88,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(ctx context.Context, logge
}

logger.Info("Successfully finalized ScaledObject")
r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.ScaledObjectDeleted, "ScaledObject was deleted")
r.EventEmitter.Emit(scaledObject, scaledObject.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectRemovedType, eventreason.ScaledObjectDeleted, message.ScaledObjectRemoved)
return nil
}

Expand Down
1 change: 0 additions & 1 deletion controllers/keda/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ var _ = BeforeSuite(func() {
err = (&ScaledObjectReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Recorder: k8sManager.GetEventRecorderFor("keda-operator"),
ScaleHandler: scaling.NewScaleHandler(k8sManager.GetClient(), scaleClient, k8sManager.GetScheme(), time.Duration(10), k8sManager.GetEventRecorderFor("keda-operator"), nil),
ScaleClient: scaleClient,
EventEmitter: eventemitter.NewEventEmitter(k8sManager.GetClient(), k8sManager.GetEventRecorderFor("keda-operator"), "kubernetes-default", nil),
Expand Down
2 changes: 2 additions & 0 deletions pkg/common/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ const (
ScaleTargetNotFoundMsg = "Target resource doesn't exist"

ScaleTargetNoSubresourceMsg = "Target resource doesn't expose /scale subresource"

ScaledObjectRemoved = "ScaledObject was deleted"
)
6 changes: 3 additions & 3 deletions pkg/eventemitter/eventemitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type EventEmitter struct {
type EventHandler interface {
DeleteCloudEventSource(cloudEventSource *eventingv1alpha1.CloudEventSource) error
HandleCloudEventSource(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource) error
Emit(object runtime.Object, namesapce types.NamespacedName, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason string, message string)
Emit(object runtime.Object, namesapce string, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason string, message string)
}

// EventDataHandler defines the behavior for different event handlers
Expand Down Expand Up @@ -325,7 +325,7 @@ func (e *EventEmitter) checkEventHandlers(ctx context.Context, cloudEventSource
}

// Emit is emitting event to both local kubernetes and custom CloudEventSource handler. After emit event to local kubernetes, event will inqueue and waitng for handler's consuming.
func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedName, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason, message string) {
func (e *EventEmitter) Emit(object runtime.Object, namesapce string, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason, message string) {
e.recorder.Event(object, eventType, reason, message)

e.eventHandlersCacheLock.RLock()
Expand All @@ -337,7 +337,7 @@ func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedNam
objectName, _ := meta.NewAccessor().Name(object)
objectType, _ := meta.NewAccessor().Kind(object)
eventData := eventdata.EventData{
Namespace: namesapce.Namespace,
Namespace: namesapce,
CloudEventType: cloudeventType,
ObjectName: strings.ToLower(objectName),
ObjectType: strings.ToLower(objectType),
Expand Down
107 changes: 106 additions & 1 deletion tests/internals/cloudevent_source/cloudevent_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var _ = godotenv.Load("../../.env")
var (
namespace = fmt.Sprintf("%s-ns", testName)
scaledObjectName = fmt.Sprintf("%s-so", testName)
deploymentName = fmt.Sprintf("%s-d", testName)
clientName = fmt.Sprintf("%s-client", testName)
cloudeventSourceName = fmt.Sprintf("%s-ce", testName)
cloudeventSourceErrName = fmt.Sprintf("%s-ce-err", testName)
Expand All @@ -43,6 +44,7 @@ var (
type templateData struct {
TestNamespace string
ScaledObject string
DeploymentName string
ClientName string
CloudEventSourceName string
CloudeventSourceErrName string
Expand Down Expand Up @@ -210,6 +212,56 @@ spec:
excludedEventTypes:
- keda.scaledobject.failed.v1
`

deploymentTemplate = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{.DeploymentName}}
namespace: {{.TestNamespace}}
labels:
deploy: {{.DeploymentName}}
spec:
replicas: 1
selector:
matchLabels:
pod: {{.DeploymentName}}
template:
metadata:
labels:
pod: {{.DeploymentName}}
spec:
containers:
- name: nginx
image: 'nginxinc/nginx-unprivileged'
`

scaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{.ScaledObject}}
namespace: {{.TestNamespace}}
spec:
scaleTargetRef:
name: {{.DeploymentName}}
pollingInterval: 5
cooldownPeriod: 5
minReplicaCount: 1
maxReplicaCount: 10
advanced:
horizontalPodAutoscalerConfig:
behavior:
scaleDown:
stabilizationWindowSeconds: 15
triggers:
- type: cron
metadata:
timezone: Etc/UTC
start: 3 * * * *
end: 5 * * * *
desiredReplicas: '4'
`
)

func TestScaledObjectGeneral(t *testing.T) {
Expand All @@ -223,6 +275,7 @@ func TestScaledObjectGeneral(t *testing.T) {
assert.True(t, WaitForAllPodRunningInNamespace(t, kc, namespace, 5, 20), "all pods should be running")

testErrEventSourceEmitValue(t, kc, data)
testEventSourceEmitValue(t, kc, data)
testErrEventSourceExcludeValue(t, kc, data)
testErrEventSourceIncludeValue(t, kc, data)
testErrEventSourceCreation(t, kc, data)
Expand Down Expand Up @@ -258,8 +311,16 @@ func testErrEventSourceEmitValue(t *testing.T, _ *kubernetes.Clientset, data tem
foundEvents = append(foundEvents, cloudEvent)
data := map[string]string{}
err := cloudEvent.DataAs(&data)
t.Log("--- test emitting eventsource about scaledobject err---", "message", data["message"])

assert.NoError(t, err)
assert.Equal(t, data["message"], "ScaledObject doesn't have correct scaleTargetRef specification")
assert.Condition(t, func() bool {
if data["message"] == "ScaledObject doesn't have correct scaleTargetRef specification" || data["message"] == "Target resource doesn't exist" {
return true
}
return false
}, "get filtered event")

assert.Equal(t, cloudEvent.Type(), "keda.scaledobject.failed.v1")
assert.Equal(t, cloudEvent.Source(), expectedSource)
assert.Equal(t, cloudEvent.DataContentType(), "application/json")
Expand All @@ -272,6 +333,49 @@ func testErrEventSourceEmitValue(t *testing.T, _ *kubernetes.Clientset, data tem
assert.NotEmpty(t, foundEvents)
}

func testEventSourceEmitValue(t *testing.T, _ *kubernetes.Clientset, data templateData) {
t.Log("--- test emitting eventsource about scaledobject removed---")
KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
KubectlApplyWithTemplate(t, data, "deploymentTemplate", deploymentTemplate)

// wait 15 seconds to ensure event propagation
time.Sleep(5 * time.Second)
KubectlDeleteWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
time.Sleep(10 * time.Second)

out, outErr, err := ExecCommandOnSpecificPod(t, clientName, namespace, fmt.Sprintf("curl -X GET %s/getCloudEvent/%s", cloudEventHTTPServiceURL, "ScaledObjectDeleted"))
assert.NotEmpty(t, out)
assert.Empty(t, outErr)
assert.NoError(t, err, "dont expect error requesting ")

cloudEvents := []cloudevents.Event{}
err = json.Unmarshal([]byte(out), &cloudEvents)

assert.NoError(t, err, "dont expect error unmarshaling the cloudEvents")
assert.Greater(t, len(cloudEvents), 0, "cloudEvents should have at least 1 item")

foundEvents := []cloudevents.Event{}

for _, cloudEvent := range cloudEvents {
if cloudEvent.Subject() == expectedSubject {
foundEvents = append(foundEvents, cloudEvent)
data := map[string]string{}
err := cloudEvent.DataAs(&data)

assert.NoError(t, err)
assert.Equal(t, data["message"], "ScaledObject was deleted")
assert.Equal(t, cloudEvent.Type(), "keda.scaledobject.removed.v1")
assert.Equal(t, cloudEvent.Source(), expectedSource)
assert.Equal(t, cloudEvent.DataContentType(), "application/json")

if lastCloudEventTime.Before(cloudEvent.Time()) {
lastCloudEventTime = cloudEvent.Time()
}
}
}
assert.NotEmpty(t, foundEvents)
}

// tests error events not emitted by
func testErrEventSourceExcludeValue(t *testing.T, _ *kubernetes.Clientset, data templateData) {
t.Log("--- test emitting eventsource about scaledobject err with exclude filter---")
Expand Down Expand Up @@ -362,6 +466,7 @@ func getTemplateData() (templateData, []Template) {
return templateData{
TestNamespace: namespace,
ScaledObject: scaledObjectName,
DeploymentName: deploymentName,
ClientName: clientName,
CloudEventSourceName: cloudeventSourceName,
CloudeventSourceErrName: cloudeventSourceErrName,
Expand Down
Loading