Skip to content

Commit

Permalink
Fix linter.
Browse files Browse the repository at this point in the history
  • Loading branch information
friedrichwilken committed Dec 14, 2023
1 parent 17fed54 commit 49e80b5
Showing 1 changed file with 27 additions and 16 deletions.
43 changes: 27 additions & 16 deletions pkg/backend/jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"go.uber.org/zap"

eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2"
ecenv "github.com/kyma-project/eventing-manager/pkg/env"

"github.com/kyma-project/eventing-manager/pkg/backend/cleaner"
backendmetrics "github.com/kyma-project/eventing-manager/pkg/backend/metrics"
Expand All @@ -43,7 +42,8 @@ const (
)

func NewJetStream(config env.NATSConfig, metricsCollector *backendmetrics.Collector,
cleaner cleaner.Cleaner, subsConfig env.DefaultSubscriptionConfig, logger *logger.Logger) *JetStream {
cleaner cleaner.Cleaner, subsConfig env.DefaultSubscriptionConfig, logger *logger.Logger,
) *JetStream {
return &JetStream{
Config: config,
logger: logger,
Expand Down Expand Up @@ -161,7 +161,8 @@ func (js *JetStream) DeleteSubscriptionsOnly(subscription *eventingv1alpha2.Subs

// GetJetStreamSubjects returns a list of subjects appended with prefix if needed.
func (js *JetStream) GetJetStreamSubjects(source string, subjects []string,
typeMatching eventingv1alpha2.TypeMatching) []string {
typeMatching eventingv1alpha2.TypeMatching,
) []string {
var result []string
for _, subject := range subjects {
result = append(result, js.GetJetStreamSubject(source, subject, typeMatching))
Expand Down Expand Up @@ -377,7 +378,8 @@ func (js *JetStream) syncSubscriptionEventTypes(subscription *eventingv1alpha2.S
// syncSubscriptionEventType syncs controller runtime subscriptions to subscription CR event types and to JetStream
// subscriptions/consumers.
func (js *JetStream) syncSubscriptionEventType(key SubscriptionSubjectIdentifier,
subscription *eventingv1alpha2.Subscription, subscriber Subscriber) error {
subscription *eventingv1alpha2.Subscription, subscriber Subscriber,
) error {
// don't try to delete invalid subscriber and its consumer if subscriber has type in subscription CR it belongs to.
// This means that it will be bound to the existing JetStream consumer in later steps.
if !subscriber.IsValid() && js.runtimeSubscriptionExistsInKymaSub(key, subscription) {
Expand All @@ -392,7 +394,8 @@ func (js *JetStream) cleanupUnnecessaryJetStreamSubscribers(
jsSub Subscriber,
subscription *eventingv1alpha2.Subscription,
log *zap.SugaredLogger,
key SubscriptionSubjectIdentifier) error {
key SubscriptionSubjectIdentifier,
) error {
consumer, err := js.jsCtx.ConsumerInfo(js.Config.JSStreamName, key.ConsumerName())
if err != nil {
if errors.Is(err, nats.ErrConsumerNotFound) {
Expand All @@ -419,7 +422,8 @@ func (js *JetStream) cleanupUnnecessaryJetStreamSubscribers(

// runtimeSubscriptionExistsInKymaSub returns true if runtime subscriber subject exists in subscription CR.
func (js *JetStream) runtimeSubscriptionExistsInKymaSub(runtimeSubscriptionKey SubscriptionSubjectIdentifier,
subscription *eventingv1alpha2.Subscription) bool {
subscription *eventingv1alpha2.Subscription,
) bool {
for _, subject := range subscription.Status.Types {
jsSubject := js.getJetStreamSubject(subscription.Spec.Source, subject.CleanType, subscription.Spec.TypeMatching)
jsSubKey := NewSubscriptionSubjectIdentifier(subscription, jsSubject)
Expand All @@ -432,7 +436,8 @@ func (js *JetStream) runtimeSubscriptionExistsInKymaSub(runtimeSubscriptionKey S

// consumerSubjectExistsInKymaSub checks if the specified consumer is used by the subscription.
func (js *JetStream) consumerSubjectExistsInKymaSub(consumer *nats.ConsumerInfo,
subscription *eventingv1alpha2.Subscription) bool {
subscription *eventingv1alpha2.Subscription,
) bool {
return utils.ContainsString(
js.GetJetStreamSubjects(
subscription.Spec.Source,
Expand Down Expand Up @@ -463,7 +468,8 @@ func (js *JetStream) deleteSubscriptionFromJetStream(jsSub Subscriber, jsSubKey
// deleteSubscriptionFromJetStreamOnly deletes the subscription from NATS server and from in-memory db.
// Note: The consumer will not be deleted, meaning there should be no message loss.
func (js *JetStream) deleteSubscriptionFromJetStreamOnly(jsSub Subscriber,
jsSubKey SubscriptionSubjectIdentifier) error {
jsSubKey SubscriptionSubjectIdentifier,
) error {
if jsSub.IsValid() {
// The Unsubscribe function should not delete the consumer because it was added manually.
if err := jsSub.Unsubscribe(); err != nil {
Expand Down Expand Up @@ -581,7 +587,8 @@ func (js *JetStream) deleteConsumerFromJetStream(name string) error {
// syncConsumerAndSubscription makes sure there is a consumer and subscription created on the NATS Backend.
// these also must be bound to each other to ensure that NATS JetStream eventing logic works as expected.
func (js *JetStream) syncConsumerAndSubscription(subscription *eventingv1alpha2.Subscription,
asyncCallback func(m *nats.Msg)) error {
asyncCallback func(m *nats.Msg),
) error {
for _, eventType := range subscription.Status.Types {
jsSubject := js.GetJetStreamSubject(subscription.Spec.Source, eventType.CleanType, subscription.Spec.TypeMatching)
jsSubKey := NewSubscriptionSubjectIdentifier(subscription, jsSubject)
Expand Down Expand Up @@ -621,14 +628,15 @@ func (js *JetStream) syncConsumerAndSubscription(subscription *eventingv1alpha2.

// getOrCreateConsumer fetches the ConsumerInfo from NATS Server or creates it in case it doesn't exist.
func (js *JetStream) getOrCreateConsumer(subscription *eventingv1alpha2.Subscription,
subject eventingv1alpha2.EventType) (*nats.ConsumerInfo, error) {
subject eventingv1alpha2.EventType,
) (*nats.ConsumerInfo, error) {
jsSubject := js.GetJetStreamSubject(subscription.Spec.Source, subject.CleanType, subscription.Spec.TypeMatching)
jsSubKey := NewSubscriptionSubjectIdentifier(subscription, jsSubject)

consumerInfo, err := js.jsCtx.ConsumerInfo(js.Config.JSStreamName, jsSubKey.ConsumerName())
if err != nil {
if errors.Is(err, nats.ErrConsumerNotFound) {
ecSubsConfig := ecenv.DefaultSubscriptionConfig(js.subsConfig)
ecSubsConfig := env.DefaultSubscriptionConfig(js.subsConfig)
consumerInfo, err = js.jsCtx.AddConsumer(
js.Config.JSStreamName,
js.getConsumerConfig(jsSubKey, jsSubject, subscription.GetMaxInFlightMessages(&ecSubsConfig)),
Expand All @@ -645,11 +653,12 @@ func (js *JetStream) getOrCreateConsumer(subscription *eventingv1alpha2.Subscrip

// createNATSSubscription creates a NATS Subscription and binds it to the already existing consumer.
func (js *JetStream) createNATSSubscription(subscription *eventingv1alpha2.Subscription,
subject eventingv1alpha2.EventType, asyncCallback func(m *nats.Msg)) error {
subject eventingv1alpha2.EventType, asyncCallback func(m *nats.Msg),
) error {
jsSubject := js.GetJetStreamSubject(subscription.Spec.Source, subject.CleanType, subscription.Spec.TypeMatching)
jsSubKey := NewSubscriptionSubjectIdentifier(subscription, jsSubject)

ecSubsConfig := ecenv.DefaultSubscriptionConfig(js.subsConfig)
ecSubsConfig := env.DefaultSubscriptionConfig(js.subsConfig)
jsSubscription, err := js.jsCtx.Subscribe(
jsSubject,
asyncCallback,
Expand All @@ -672,7 +681,8 @@ func (js *JetStream) createNATSSubscription(subscription *eventingv1alpha2.Subsc

// bindInvalidSubscriptions tries to bind the invalid NATS Subscription to the existing consumer.
func (js *JetStream) bindInvalidSubscriptions(subscription *eventingv1alpha2.Subscription,
subject eventingv1alpha2.EventType, asyncCallback func(m *nats.Msg)) error {
subject eventingv1alpha2.EventType, asyncCallback func(m *nats.Msg),
) error {
jsSubject := js.GetJetStreamSubject(subscription.Spec.Source, subject.CleanType, subscription.Spec.TypeMatching)
jsSubKey := NewSubscriptionSubjectIdentifier(subscription, jsSubject)
// bind the existing consumer to a new subscription on JetStream
Expand All @@ -692,8 +702,9 @@ func (js *JetStream) bindInvalidSubscriptions(subscription *eventingv1alpha2.Sub
// syncConsumerMaxInFlight checks that the latest Subscription's maxInFlight value
// is propagated to the NATS consumer as MaxAckPending.
func (js *JetStream) syncConsumerMaxInFlight(subscription *eventingv1alpha2.Subscription,
consumerInfo nats.ConsumerInfo) error {
ecSubsConfig := ecenv.DefaultSubscriptionConfig(js.subsConfig)
consumerInfo nats.ConsumerInfo,
) error {
ecSubsConfig := env.DefaultSubscriptionConfig(js.subsConfig)
maxInFlight := subscription.GetMaxInFlightMessages(&ecSubsConfig)

if consumerInfo.Config.MaxAckPending == maxInFlight {
Expand Down

0 comments on commit 49e80b5

Please sign in to comment.