Skip to content

Commit

Permalink
Add publisher service in the Eventing CR status
Browse files Browse the repository at this point in the history
  • Loading branch information
marcobebway committed Dec 5, 2023
1 parent d892cf6 commit 61d5148
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 6 deletions.
8 changes: 8 additions & 0 deletions api/operator/v1alpha1/eventing_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ limitations under the License.
package v1alpha1

import (
"fmt"

kcorev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -92,6 +94,7 @@ type EventingStatus struct {
ActiveBackend BackendType `json:"activeBackend"`
BackendConfigHash int64 `json:"specHash"`
State string `json:"state"`
PublisherService string `json:"publisherService"`
Conditions []kmetav1.Condition `json:"conditions,omitempty"`
}

Expand Down Expand Up @@ -220,3 +223,8 @@ func (e *Eventing) SyncStatusActiveBackend() {
func (e *Eventing) IsSpecBackendTypeChanged() bool {
return e.Status.ActiveBackend != e.Spec.Backend.Type
}

// SyncPublisherService sets the PublisherService in the Eventing Status.
func (e *Eventing) SyncPublisherService(name, namespace string) {
e.Status.PublisherService = fmt.Sprintf("%s.%s", name, namespace)
}
34 changes: 34 additions & 0 deletions api/operator/v1alpha1/eventing_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,37 @@ func TestIsSpecBackendTypeChanged(t *testing.T) {
})
}
}

func TestSyncPublisherService(t *testing.T) {
// given
t.Parallel()
testCases := []struct {
name string
givenEventing *Eventing
givenServiceName string
givenServiceNamespace string
wantEventingStatus EventingStatus
}{
{
name: "should set the correct publisher service in the status",
givenEventing: &Eventing{
Status: EventingStatus{},
},
givenServiceName: "test-service",
givenServiceNamespace: "test-namespace",
wantEventingStatus: EventingStatus{
PublisherService: "test-service.test-namespace",
},
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
// when
tc.givenEventing.SyncPublisherService(tc.givenServiceName, tc.givenServiceNamespace)

// then
require.Equal(t, tc.wantEventingStatus, tc.givenEventing.Status)
})
}
}
3 changes: 3 additions & 0 deletions config/crd/bases/operator.kyma-project.io_eventings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -337,13 +337,16 @@ spec:
- type
type: object
type: array
publisherService:
type: string
specHash:
format: int64
type: integer
state:
type: string
required:
- activeBackend
- publisherService
- specHash
- state
type: object
Expand Down
11 changes: 7 additions & 4 deletions internal/controller/operator/eventing/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,29 +560,32 @@ func (r *Reconciler) checkNATSAvailability(ctx context.Context, eventing *operat

func (r *Reconciler) handlePublisherProxy(
ctx context.Context,
eventing *operatorv1alpha1.Eventing,
eventingCR *operatorv1alpha1.Eventing,
backendType operatorv1alpha1.BackendType,
) (*kappsv1.Deployment, error) {
// get nats config with NATS server url
var natsConfig *env.NATSConfig
if backendType == operatorv1alpha1.NatsBackendType {
var err error
natsConfig, err = r.natsConfigHandler.GetNatsConfig(ctx, *eventing)
natsConfig, err = r.natsConfigHandler.GetNatsConfig(ctx, *eventingCR)
if err != nil {
return nil, err
}
}
// CreateOrUpdate deployment for eventing publisher proxy deployment
deployment, err := r.eventingManager.DeployPublisherProxy(ctx, eventing, natsConfig, backendType)
deployment, err := r.eventingManager.DeployPublisherProxy(ctx, eventingCR, natsConfig, backendType)
if err != nil {
return nil, err
}

// deploy publisher proxy resources.
if err = r.eventingManager.DeployPublisherProxyResources(ctx, eventing, deployment); err != nil {
if err = r.eventingManager.DeployPublisherProxyResources(ctx, eventingCR, deployment); err != nil {
return deployment, err
}

// Update the publisher service in the Eventing CR status after the publisher proxy resources are deployed.
eventingCR.SyncPublisherService(eventing.GetPublisherPublishServiceName(*eventingCR), eventingCR.Namespace)

return deployment, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,25 @@ func Test_CreateEventingCR_NATS(t *testing.T) {
// check if webhook configurations are updated with correct CABundle.
testEnvironment.EnsureCABundleInjectedIntoWebhooks(t)
}

// check PublisherService in the EventingCR status
eventingCR, err := testEnvironment.GetEventingFromK8s(tc.givenEventing.Name, givenNamespace)
require.NoError(t, err)
require.NotNil(t, eventingCR)
switch eventingCR.Status.State {
case operatorv1alpha1.StateReady, operatorv1alpha1.StateProcessing:
{
wantPublisherService := fmt.Sprintf(
"%s.%s",
eventing.GetPublisherPublishServiceName(*eventingCR), eventingCR.Namespace,
)
require.Equal(t, wantPublisherService, eventingCR.Status.PublisherService)
}
default:
{
require.Equal(t, "", eventingCR.Status.PublisherService)
}
}
})
}
}
Expand Down Expand Up @@ -274,19 +293,35 @@ func Test_UpdateEventingCR(t *testing.T) {
}()

// get Eventing CR.
eventing, err := testEnvironment.GetEventingFromK8s(tc.givenExistingEventing.Name, givenNamespace)
eventingCR, err := testEnvironment.GetEventingFromK8s(tc.givenExistingEventing.Name, givenNamespace)
require.NoError(t, err)

// when
// update NATS CR.
newEventing := eventing.DeepCopy()
newEventing := eventingCR.DeepCopy()
newEventing.Spec = tc.givenNewEventingForUpdate.Spec
testEnvironment.EnsureK8sResourceUpdated(t, newEventing)

// then
testEnvironment.EnsureEventingSpecPublisherReflected(t, newEventing)
testEnvironment.EnsureEventingReplicasReflected(t, newEventing)
testEnvironment.EnsureDeploymentOwnerReferenceSet(t, tc.givenExistingEventing)

// check PublisherService in the EventingCR status
switch eventingCR.Status.State {
case operatorv1alpha1.StateReady, operatorv1alpha1.StateProcessing:
{
wantPublisherService := fmt.Sprintf(
"%s.%s",
eventing.GetPublisherPublishServiceName(*eventingCR), eventingCR.Namespace,
)
require.Equal(t, wantPublisherService, eventingCR.Status.PublisherService)
}
default:
{
require.Equal(t, "", eventingCR.Status.PublisherService)
}
}
})
}
}
Expand Down Expand Up @@ -680,6 +715,25 @@ func Test_CreateEventingCR_EventMesh(t *testing.T) {
// check if webhook configurations are updated with correct CABundle.
testEnvironment.EnsureCABundleInjectedIntoWebhooks(t)
}

// check PublisherService in the EventingCR status
eventingCR, err := testEnvironment.GetEventingFromK8s(tc.givenEventing.Name, givenNamespace)
require.NoError(t, err)
require.NotNil(t, eventingCR)
switch eventingCR.Status.State {
case operatorv1alpha1.StateReady, operatorv1alpha1.StateProcessing:
{
wantPublisherService := fmt.Sprintf(
"%s.%s",
eventing.GetPublisherPublishServiceName(*eventingCR), eventingCR.Namespace,
)
require.Equal(t, wantPublisherService, eventingCR.Status.PublisherService)
}
default:
{
require.Equal(t, "", eventingCR.Status.PublisherService)
}
}
})
}
}
Expand Down

0 comments on commit 61d5148

Please sign in to comment.