Skip to content

Commit

Permalink
Add publisher service in the Eventing CR status
Browse files Browse the repository at this point in the history
  • Loading branch information
marcobebway committed Dec 6, 2023
1 parent cb1501f commit e8ab0ec
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 15 deletions.
13 changes: 13 additions & 0 deletions api/operator/v1alpha1/eventing_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ limitations under the License.
package v1alpha1

import (
"fmt"

kcorev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -92,6 +94,7 @@ type EventingStatus struct {
ActiveBackend BackendType `json:"activeBackend"`
BackendConfigHash int64 `json:"specHash"`
State string `json:"state"`
PublisherService string `json:"publisherService,omitempty"`
Conditions []kmetav1.Condition `json:"conditions,omitempty"`
}

Expand Down Expand Up @@ -220,3 +223,13 @@ func (e *Eventing) SyncStatusActiveBackend() {
func (e *Eventing) IsSpecBackendTypeChanged() bool {
return e.Status.ActiveBackend != e.Spec.Backend.Type
}

// ClearPublisherService clears the PublisherService in the Eventing Status.
func (e *Eventing) ClearPublisherService() {
e.Status.PublisherService = ""
}

// SyncPublisherService sets the PublisherService in the Eventing Status based on the given service name and namespace.
func (e *Eventing) SyncPublisherService(name, namespace string) {
e.Status.PublisherService = fmt.Sprintf("%s.%s", name, namespace)
}
72 changes: 72 additions & 0 deletions api/operator/v1alpha1/eventing_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,75 @@ func TestIsSpecBackendTypeChanged(t *testing.T) {
})
}
}

func TestClearPublisherService(t *testing.T) {
// given
t.Parallel()
testCases := []struct {
name string
givenEventing *Eventing
givenServiceName string
givenServiceNamespace string
wantEventingStatus EventingStatus
}{
{
name: "should clear the publisher service in the status",
givenEventing: &Eventing{
Status: EventingStatus{
PublisherService: "test-service.test-namespace",
},
},
givenServiceName: "test-service",
givenServiceNamespace: "test-namespace",
wantEventingStatus: EventingStatus{
PublisherService: "",
},
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
// when
tc.givenEventing.ClearPublisherService()

// then
require.Equal(t, tc.wantEventingStatus, tc.givenEventing.Status)
})
}
}

func TestSyncPublisherService(t *testing.T) {
// given
t.Parallel()
testCases := []struct {
name string
givenEventing *Eventing
givenServiceName string
givenServiceNamespace string
wantEventingStatus EventingStatus
}{
{
name: "should set the correct publisher service in the status",
givenEventing: &Eventing{
Status: EventingStatus{
PublisherService: "",
},
},
givenServiceName: "test-service",
givenServiceNamespace: "test-namespace",
wantEventingStatus: EventingStatus{
PublisherService: "test-service.test-namespace",
},
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
// when
tc.givenEventing.SyncPublisherService(tc.givenServiceName, tc.givenServiceNamespace)

// then
require.Equal(t, tc.wantEventingStatus, tc.givenEventing.Status)
})
}
}
6 changes: 4 additions & 2 deletions config/crd/bases/operator.kyma-project.io_eventings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ spec:
requests:
cpu: 40m
memory: 256Mi
description: EventingSpec defines the desired state of Eventing
description: EventingSpec defines the desired state of Eventing.
properties:
annotations:
additionalProperties:
Expand Down Expand Up @@ -265,7 +265,7 @@ spec:
- backend
type: object
status:
description: EventingStatus defines the observed state of Eventing
description: EventingStatus defines the observed state of Eventing.
properties:
activeBackend:
type: string
Expand Down Expand Up @@ -337,6 +337,8 @@ spec:
- type
type: object
type: array
publisherService:
type: string
specHash:
format: int64
type: integer
Expand Down
5 changes: 3 additions & 2 deletions docs/user/02-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ Use the following sample CRs as guidance. Each can be applied immediately when y
| **conditions.​message** (required) | string | message is a human readable message indicating details about the transition. This may be an empty string. |
| **conditions.​observedGeneration** | integer | observedGeneration represents the .metadata.generation that the condition was set based upon. For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date with respect to the current state of the instance. |
| **conditions.​reason** (required) | string | reason contains a programmatic identifier indicating the reason for the condition's last transition. Producers of specific condition types may define expected values and meanings for this field, and whether the values are considered a guaranteed API. The value should be a CamelCase string. This field may not be empty. |
| **conditions.​status** (required) | string | status of the condition, one of `True`, `False`, `Unknown`. |
| **conditions.​status** (required) | string | status of the condition, one of `True`, `False`, `Unknown`. |
| **conditions.​type** (required) | string | type of condition in CamelCase or in foo.example.com/CamelCase. --- Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be useful (see .node.status.conditions), the ability to deconflict is important. The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) |
| **publisherService** | string | |
| **specHash** (required) | integer | |
| **state** (required) | string | |

<!-- TABLE-END -->
<!-- TABLE-END -->
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,24 @@ func Test_CreateEventingCR_NATS(t *testing.T) {
// check if webhook configurations are updated with correct CABundle.
testEnvironment.EnsureCABundleInjectedIntoWebhooks(t)
}

// check PublisherService in the EventingCR status
eventingCR, err := testEnvironment.GetEventingFromK8s(tc.givenEventing.Name, givenNamespace)
require.NoError(t, err)
require.NotNil(t, eventingCR)
switch eventingCR.Status.State {
case operatorv1alpha1.StateReady:
{
serviceName := eventing.GetPublisherPublishServiceName(*eventingCR)
wantPublisherService := fmt.Sprintf("%s.%s", serviceName, eventingCR.Namespace)
require.Equal(t, wantPublisherService, eventingCR.Status.PublisherService)
}
default:
{
const wantPublisherService = ""
require.Equal(t, wantPublisherService, eventingCR.Status.PublisherService)
}
}
})
}
}
Expand Down Expand Up @@ -274,19 +292,34 @@ func Test_UpdateEventingCR(t *testing.T) {
}()

// get Eventing CR.
eventing, err := testEnvironment.GetEventingFromK8s(tc.givenExistingEventing.Name, givenNamespace)
eventingCR, err := testEnvironment.GetEventingFromK8s(tc.givenExistingEventing.Name, givenNamespace)
require.NoError(t, err)

// when
// update NATS CR.
newEventing := eventing.DeepCopy()
newEventing := eventingCR.DeepCopy()
newEventing.Spec = tc.givenNewEventingForUpdate.Spec
testEnvironment.EnsureK8sResourceUpdated(t, newEventing)

// then
testEnvironment.EnsureEventingSpecPublisherReflected(t, newEventing)
testEnvironment.EnsureEventingReplicasReflected(t, newEventing)
testEnvironment.EnsureDeploymentOwnerReferenceSet(t, tc.givenExistingEventing)

// check PublisherService in the EventingCR status
switch eventingCR.Status.State {
case operatorv1alpha1.StateReady:
{
serviceName := eventing.GetPublisherPublishServiceName(*eventingCR)
wantPublisherService := fmt.Sprintf("%s.%s", serviceName, eventingCR.Namespace)
require.Equal(t, wantPublisherService, eventingCR.Status.PublisherService)
}
default:
{
const wantPublisherService = ""
require.Equal(t, wantPublisherService, eventingCR.Status.PublisherService)
}
}
})
}
}
Expand Down Expand Up @@ -680,6 +713,24 @@ func Test_CreateEventingCR_EventMesh(t *testing.T) {
// check if webhook configurations are updated with correct CABundle.
testEnvironment.EnsureCABundleInjectedIntoWebhooks(t)
}

// check PublisherService in the EventingCR status
eventingCR, err := testEnvironment.GetEventingFromK8s(tc.givenEventing.Name, givenNamespace)
require.NoError(t, err)
require.NotNil(t, eventingCR)
switch eventingCR.Status.State {
case operatorv1alpha1.StateReady:
{
serviceName := eventing.GetPublisherPublishServiceName(*eventingCR)
wantPublisherService := fmt.Sprintf("%s.%s", serviceName, eventingCR.Namespace)
require.Equal(t, wantPublisherService, eventingCR.Status.PublisherService)
}
default:
{
const wantPublisherService = ""
require.Equal(t, wantPublisherService, eventingCR.Status.PublisherService)
}
}
})
}
}
Expand Down
26 changes: 17 additions & 9 deletions internal/controller/operator/eventing/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
kctrl "sigs.k8s.io/controller-runtime"

operatorv1alpha1 "github.com/kyma-project/eventing-manager/api/operator/v1alpha1"
"github.com/kyma-project/eventing-manager/pkg/eventing"
)

const RequeueTimeForStatusCheck = 10
Expand Down Expand Up @@ -153,29 +154,36 @@ func (r *Reconciler) updateStatus(ctx context.Context, oldEventing, newEventing
return nil
}

func (r *Reconciler) handleEventingState(ctx context.Context, deployment *kappsv1.Deployment, eventing *operatorv1alpha1.Eventing, log *zap.SugaredLogger) (kctrl.Result, error) {
func (r *Reconciler) handleEventingState(ctx context.Context, deployment *kappsv1.Deployment,
eventingCR *operatorv1alpha1.Eventing, log *zap.SugaredLogger) (kctrl.Result, error) {

Check failure on line 158 in internal/controller/operator/eventing/status.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)
// Clear the publisher service until the publisher proxy is ready.
eventingCR.ClearPublisherService()

// checking if publisher proxy is ready.
// get k8s deployment for publisher proxy
deployment, err := r.kubeClient.GetDeployment(ctx, deployment.Name, deployment.Namespace)
if err != nil {
eventing.Status.UpdateConditionPublisherProxyReady(kmetav1.ConditionFalse,
eventingCR.Status.UpdateConditionPublisherProxyReady(kmetav1.ConditionFalse,
operatorv1alpha1.ConditionReasonDeploymentStatusSyncFailed, err.Error())
return kctrl.Result{}, r.syncStatusWithPublisherProxyErr(ctx, eventing, err, log)
return kctrl.Result{}, r.syncStatusWithPublisherProxyErr(ctx, eventingCR, err, log)
}

if !IsDeploymentReady(deployment) {
eventing.Status.SetStateProcessing()
eventing.Status.UpdateConditionPublisherProxyReady(kmetav1.ConditionFalse,
eventingCR.Status.SetStateProcessing()
eventingCR.Status.UpdateConditionPublisherProxyReady(kmetav1.ConditionFalse,
operatorv1alpha1.ConditionReasonProcessing, operatorv1alpha1.ConditionPublisherProxyProcessingMessage)
log.Info("Reconciliation successful: waiting for publisher proxy to get ready...")
return kctrl.Result{RequeueAfter: RequeueTimeForStatusCheck * time.Second}, r.syncEventingStatus(ctx, eventing, log)
return kctrl.Result{RequeueAfter: RequeueTimeForStatusCheck * time.Second}, r.syncEventingStatus(ctx, eventingCR, log)
}
//
eventing.Status.SetPublisherProxyReadyToTrue()

eventingCR.Status.SetPublisherProxyReadyToTrue()

// Set the publisher service after the publisher proxy is ready.
eventingCR.SyncPublisherService(eventing.GetPublisherPublishServiceName(*eventingCR), eventingCR.Namespace)

// @TODO: emit events for any change in conditions
log.Info("Reconciliation successful")
return kctrl.Result{}, r.syncEventingStatus(ctx, eventing, log)
return kctrl.Result{}, r.syncEventingStatus(ctx, eventingCR, log)
}

// to be able to mock this function in tests.
Expand Down

0 comments on commit e8ab0ec

Please sign in to comment.