Skip to content

Commit

Permalink
Add publisher service in the Eventing CR status (#283)
Browse files Browse the repository at this point in the history
* Add publisher service in the Eventing CR status

* Update tests
  • Loading branch information
marcobebway authored Dec 8, 2023
1 parent 6c641dd commit 5c4f107
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 21 deletions.
1 change: 1 addition & 0 deletions api/operator/v1alpha1/eventing_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type EventingStatus struct {
ActiveBackend BackendType `json:"activeBackend"`
BackendConfigHash int64 `json:"specHash"`
State string `json:"state"`
PublisherService string `json:"publisherService,omitempty"`
Conditions []kmetav1.Condition `json:"conditions,omitempty"`
}

Expand Down
11 changes: 11 additions & 0 deletions api/operator/v1alpha1/status.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v1alpha1

import (
"fmt"
"reflect"

natsv1alpha1 "github.com/kyma-project/nats-manager/api/v1alpha1"
Expand Down Expand Up @@ -134,3 +135,13 @@ func (es *EventingStatus) IsEqual(status EventingStatus) bool {
return reflect.DeepEqual(thisWithoutCond, statusWithoutCond) &&
natsv1alpha1.ConditionsEquals(es.Conditions, status.Conditions)
}

// ClearPublisherService clears the PublisherService.
func (es *EventingStatus) ClearPublisherService() {
es.PublisherService = ""
}

// SetPublisherService sets the PublisherService from the given service name and namespace.
func (es *EventingStatus) SetPublisherService(name, namespace string) {
es.PublisherService = fmt.Sprintf("%s.%s", name, namespace)
}
68 changes: 68 additions & 0 deletions api/operator/v1alpha1/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,71 @@ func TestClearConditions(t *testing.T) {
// then
require.Len(t, givenEventingStatus.Conditions, 0)
}

func TestClearPublisherService(t *testing.T) {
// given
t.Parallel()
testCases := []struct {
name string
givenStatus EventingStatus
givenServiceName string
givenServiceNamespace string
wantStatus EventingStatus
}{
{
name: "should clear the publisher service",
givenStatus: EventingStatus{
PublisherService: "test-service.test-namespace",
},
givenServiceName: "test-service",
givenServiceNamespace: "test-namespace",
wantStatus: EventingStatus{
PublisherService: "",
},
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
// when
tc.givenStatus.ClearPublisherService()

// then
require.Equal(t, tc.wantStatus, tc.givenStatus)
})
}
}

func TestSetPublisherService(t *testing.T) {
// given
t.Parallel()
testCases := []struct {
name string
givenStatus EventingStatus
givenServiceName string
givenServiceNamespace string
wantStatus EventingStatus
}{
{
name: "should set the correct publisher service",
givenStatus: EventingStatus{
PublisherService: "",
},
givenServiceName: "test-service",
givenServiceNamespace: "test-namespace",
wantStatus: EventingStatus{
PublisherService: "test-service.test-namespace",
},
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
// when
tc.givenStatus.SetPublisherService(tc.givenServiceName, tc.givenServiceNamespace)

// then
require.Equal(t, tc.wantStatus, tc.givenStatus)
})
}
}
2 changes: 2 additions & 0 deletions config/crd/bases/operator.kyma-project.io_eventings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ spec:
- type
type: object
type: array
publisherService:
type: string
specHash:
format: int64
type: integer
Expand Down
5 changes: 3 additions & 2 deletions docs/user/02-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ Use the following sample CRs as guidance. Each can be applied immediately when y
| **conditions.​message** (required) | string | message is a human readable message indicating details about the transition. This may be an empty string. |
| **conditions.​observedGeneration** | integer | observedGeneration represents the .metadata.generation that the condition was set based upon. For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date with respect to the current state of the instance. |
| **conditions.​reason** (required) | string | reason contains a programmatic identifier indicating the reason for the condition's last transition. Producers of specific condition types may define expected values and meanings for this field, and whether the values are considered a guaranteed API. The value should be a CamelCase string. This field may not be empty. |
| **conditions.​status** (required) | string | status of the condition, one of `True`, `False`, `Unknown`. |
| **conditions.​status** (required) | string | status of the condition, one of `True`, `False`, `Unknown`. |
| **conditions.​type** (required) | string | type of condition in CamelCase or in foo.example.com/CamelCase. --- Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be useful (see .node.status.conditions), the ability to deconflict is important. The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) |
| **publisherService** | string | |
| **specHash** (required) | integer | |
| **state** (required) | string | |

<!-- TABLE-END -->
<!-- TABLE-END -->
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ import (
)

const (
projectRootDir = "../../../../../../"
eventTypePrefix = "test-prefix"
projectRootDir = "../../../../../../"
)

var testEnvironment *testutilsintegration.TestEnvironment //nolint:gochecknoglobals // used in tests
Expand Down Expand Up @@ -231,6 +230,9 @@ func Test_CreateEventingCR_NATS(t *testing.T) {
// check if webhook configurations are updated with correct CABundle.
testEnvironment.EnsureCABundleInjectedIntoWebhooks(t)
}

// check the publisher service in the Eventing CR status
testEnvironment.EnsurePublishServiceInEventingStatus(t, tc.givenEventing.Name, tc.givenEventing.Namespace)
})
}
}
Expand Down Expand Up @@ -291,29 +293,30 @@ func Test_UpdateEventingCR(t *testing.T) {
}()

// get Eventing CR.
eventing, err := testEnvironment.GetEventingFromK8s(tc.givenExistingEventing.Name, givenNamespace)
eventingCR, err := testEnvironment.GetEventingFromK8s(tc.givenExistingEventing.Name, givenNamespace)
require.NoError(t, err)

// when
// update NATS CR.
newEventing := eventing.DeepCopy()
newEventing := eventingCR.DeepCopy()
newEventing.Spec = tc.givenNewEventingForUpdate.Spec
testEnvironment.EnsureK8sResourceUpdated(t, newEventing)

// then
testEnvironment.EnsureEventingSpecPublisherReflected(t, newEventing)
testEnvironment.EnsureEventingReplicasReflected(t, newEventing)
testEnvironment.EnsureDeploymentOwnerReferenceSet(t, tc.givenExistingEventing)

// check the publisher service in the Eventing CR status
testEnvironment.EnsurePublishServiceInEventingStatus(t, eventingCR.Name, eventingCR.Namespace)
})
}
}

func Test_ReconcileSameEventingCR(t *testing.T) {
t.Parallel()

////
// given
////
eventingcontroller.IsDeploymentReady = func(deployment *kappsv1.Deployment) bool { return true }

eventingCR := utils.NewEventingCR(
Expand Down Expand Up @@ -356,9 +359,7 @@ func Test_ReconcileSameEventingCR(t *testing.T) {
const runs = 3
resourceVersionBefore := eppDeployment.ObjectMeta.ResourceVersion
for r := 0; r < runs; r++ {
////
// when
////
runId := fmt.Sprintf("run-%d", r)

eventingCR, err = testEnvironment.GetEventingFromK8s(eventingCR.Name, namespace)
Expand All @@ -374,9 +375,7 @@ func Test_ReconcileSameEventingCR(t *testing.T) {
require.NotNil(t, eventingCR)
require.Equal(t, eventingCR.ObjectMeta.Labels["reconcile"], runId)

////
// then
////
testEnvironment.EnsureEventingSpecPublisherReflected(t, eventingCR)
testEnvironment.EnsureEventingReplicasReflected(t, eventingCR)
testEnvironment.EnsureDeploymentOwnerReferenceSet(t, eventingCR)
Expand All @@ -387,6 +386,9 @@ func Test_ReconcileSameEventingCR(t *testing.T) {

resourceVersionAfter := eppDeployment.ObjectMeta.ResourceVersion
require.Equal(t, resourceVersionBefore, resourceVersionAfter)

// check the publisher service in the Eventing CR status
testEnvironment.EnsurePublishServiceInEventingStatus(t, eventingCR.Name, eventingCR.Namespace)
}
}

Expand Down Expand Up @@ -725,6 +727,9 @@ func Test_CreateEventingCR_EventMesh(t *testing.T) {
// check if webhook configurations are updated with correct CABundle.
testEnvironment.EnsureCABundleInjectedIntoWebhooks(t)
}

// check the publisher service in the Eventing CR status
testEnvironment.EnsurePublishServiceInEventingStatus(t, tc.givenEventing.Name, givenNamespace)
})
}
}
Expand Down
27 changes: 18 additions & 9 deletions internal/controller/operator/eventing/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
kctrl "sigs.k8s.io/controller-runtime"

operatorv1alpha1 "github.com/kyma-project/eventing-manager/api/operator/v1alpha1"
"github.com/kyma-project/eventing-manager/pkg/eventing"
)

const RequeueTimeForStatusCheck = 10
Expand Down Expand Up @@ -184,29 +185,37 @@ func (r *Reconciler) updateStatus(ctx context.Context, oldEventing, newEventing
return nil
}

func (r *Reconciler) handleEventingState(ctx context.Context, deployment *kappsv1.Deployment, eventing *operatorv1alpha1.Eventing, log *zap.SugaredLogger) (kctrl.Result, error) {
func (r *Reconciler) handleEventingState(ctx context.Context, deployment *kappsv1.Deployment,
eventingCR *operatorv1alpha1.Eventing, log *zap.SugaredLogger,
) (kctrl.Result, error) {
// Clear the publisher service until the publisher proxy is ready.
eventingCR.Status.ClearPublisherService()

// checking if publisher proxy is ready.
// get k8s deployment for publisher proxy
deployment, err := r.kubeClient.GetDeployment(ctx, deployment.Name, deployment.Namespace)
if err != nil {
eventing.Status.UpdateConditionPublisherProxyReady(kmetav1.ConditionFalse,
eventingCR.Status.UpdateConditionPublisherProxyReady(kmetav1.ConditionFalse,
operatorv1alpha1.ConditionReasonDeploymentStatusSyncFailed, err.Error())
return kctrl.Result{}, r.syncStatusWithPublisherProxyErr(ctx, eventing, err, log)
return kctrl.Result{}, r.syncStatusWithPublisherProxyErr(ctx, eventingCR, err, log)
}

if !IsDeploymentReady(deployment) {
eventing.Status.SetStateProcessing()
eventing.Status.UpdateConditionPublisherProxyReady(kmetav1.ConditionFalse,
eventingCR.Status.SetStateProcessing()
eventingCR.Status.UpdateConditionPublisherProxyReady(kmetav1.ConditionFalse,
operatorv1alpha1.ConditionReasonProcessing, operatorv1alpha1.ConditionPublisherProxyProcessingMessage)
log.Info("Reconciliation successful: waiting for publisher proxy to get ready...")
return kctrl.Result{RequeueAfter: RequeueTimeForStatusCheck * time.Second}, r.syncEventingStatus(ctx, eventing, log)
return kctrl.Result{RequeueAfter: RequeueTimeForStatusCheck * time.Second}, r.syncEventingStatus(ctx, eventingCR, log)
}
//
eventing.Status.SetPublisherProxyReadyToTrue()

eventingCR.Status.SetPublisherProxyReadyToTrue()

// Set the publisher service after the publisher proxy is ready.
eventingCR.Status.SetPublisherService(eventing.GetPublisherPublishServiceName(*eventingCR), eventingCR.Namespace)

// @TODO: emit events for any change in conditions
log.Info("Reconciliation successful")
return kctrl.Result{}, r.syncEventingStatus(ctx, eventing, log)
return kctrl.Result{}, r.syncEventingStatus(ctx, eventingCR, log)
}

// to be able to mock this function in tests.
Expand Down
21 changes: 21 additions & 0 deletions test/utils/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/rand"
"fmt"
"log"
"path/filepath"
"strings"
Expand Down Expand Up @@ -880,6 +881,26 @@ func (env TestEnvironment) EnsureOAuthSecretCreated(t *testing.T, eventing *v1al
env.EnsureK8sResourceCreated(t, secret)
}

func (env TestEnvironment) EnsurePublishServiceInEventingStatus(t *testing.T, name, namespace string) {
eventingCR, err := env.GetEventingFromK8s(name, namespace)
require.NoError(t, err)
require.NotNil(t, eventingCR)

switch eventingCR.Status.State {
case v1alpha1.StateReady:
{
serviceName := eventing.GetPublisherPublishServiceName(*eventingCR)
wantPublisherService := fmt.Sprintf("%s.%s", serviceName, namespace)
require.Equal(t, wantPublisherService, eventingCR.Status.PublisherService)
}
default:
{
const wantPublisherService = ""
require.Equal(t, wantPublisherService, eventingCR.Status.PublisherService)
}
}
}

func (env TestEnvironment) DeleteServiceFromK8s(name, namespace string) error {
return env.k8sClient.Delete(env.Context, &kcorev1.Service{
ObjectMeta: kmetav1.ObjectMeta{
Expand Down

0 comments on commit 5c4f107

Please sign in to comment.