diff --git a/pkg/backend/jetstream/jetstream.go b/pkg/backend/jetstream/jetstream.go index aeeafb831..721726069 100644 --- a/pkg/backend/jetstream/jetstream.go +++ b/pkg/backend/jetstream/jetstream.go @@ -20,7 +20,6 @@ import ( "go.uber.org/zap" eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2" - ecenv "github.com/kyma-project/eventing-manager/pkg/env" "github.com/kyma-project/eventing-manager/pkg/backend/cleaner" backendmetrics "github.com/kyma-project/eventing-manager/pkg/backend/metrics" @@ -43,7 +42,8 @@ const ( ) func NewJetStream(config env.NATSConfig, metricsCollector *backendmetrics.Collector, - cleaner cleaner.Cleaner, subsConfig env.DefaultSubscriptionConfig, logger *logger.Logger) *JetStream { + cleaner cleaner.Cleaner, subsConfig env.DefaultSubscriptionConfig, logger *logger.Logger, +) *JetStream { return &JetStream{ Config: config, logger: logger, @@ -161,7 +161,8 @@ func (js *JetStream) DeleteSubscriptionsOnly(subscription *eventingv1alpha2.Subs // GetJetStreamSubjects returns a list of subjects appended with prefix if needed. func (js *JetStream) GetJetStreamSubjects(source string, subjects []string, - typeMatching eventingv1alpha2.TypeMatching) []string { + typeMatching eventingv1alpha2.TypeMatching, +) []string { var result []string for _, subject := range subjects { result = append(result, js.GetJetStreamSubject(source, subject, typeMatching)) @@ -377,7 +378,8 @@ func (js *JetStream) syncSubscriptionEventTypes(subscription *eventingv1alpha2.S // syncSubscriptionEventType syncs controller runtime subscriptions to subscription CR event types and to JetStream // subscriptions/consumers. func (js *JetStream) syncSubscriptionEventType(key SubscriptionSubjectIdentifier, - subscription *eventingv1alpha2.Subscription, subscriber Subscriber) error { + subscription *eventingv1alpha2.Subscription, subscriber Subscriber, +) error { // don't try to delete invalid subscriber and its consumer if subscriber has type in subscription CR it belongs to. // This means that it will be bound to the existing JetStream consumer in later steps. if !subscriber.IsValid() && js.runtimeSubscriptionExistsInKymaSub(key, subscription) { @@ -392,7 +394,8 @@ func (js *JetStream) cleanupUnnecessaryJetStreamSubscribers( jsSub Subscriber, subscription *eventingv1alpha2.Subscription, log *zap.SugaredLogger, - key SubscriptionSubjectIdentifier) error { + key SubscriptionSubjectIdentifier, +) error { consumer, err := js.jsCtx.ConsumerInfo(js.Config.JSStreamName, key.ConsumerName()) if err != nil { if errors.Is(err, nats.ErrConsumerNotFound) { @@ -419,7 +422,8 @@ func (js *JetStream) cleanupUnnecessaryJetStreamSubscribers( // runtimeSubscriptionExistsInKymaSub returns true if runtime subscriber subject exists in subscription CR. func (js *JetStream) runtimeSubscriptionExistsInKymaSub(runtimeSubscriptionKey SubscriptionSubjectIdentifier, - subscription *eventingv1alpha2.Subscription) bool { + subscription *eventingv1alpha2.Subscription, +) bool { for _, subject := range subscription.Status.Types { jsSubject := js.getJetStreamSubject(subscription.Spec.Source, subject.CleanType, subscription.Spec.TypeMatching) jsSubKey := NewSubscriptionSubjectIdentifier(subscription, jsSubject) @@ -432,7 +436,8 @@ func (js *JetStream) runtimeSubscriptionExistsInKymaSub(runtimeSubscriptionKey S // consumerSubjectExistsInKymaSub checks if the specified consumer is used by the subscription. func (js *JetStream) consumerSubjectExistsInKymaSub(consumer *nats.ConsumerInfo, - subscription *eventingv1alpha2.Subscription) bool { + subscription *eventingv1alpha2.Subscription, +) bool { return utils.ContainsString( js.GetJetStreamSubjects( subscription.Spec.Source, @@ -463,7 +468,8 @@ func (js *JetStream) deleteSubscriptionFromJetStream(jsSub Subscriber, jsSubKey // deleteSubscriptionFromJetStreamOnly deletes the subscription from NATS server and from in-memory db. // Note: The consumer will not be deleted, meaning there should be no message loss. func (js *JetStream) deleteSubscriptionFromJetStreamOnly(jsSub Subscriber, - jsSubKey SubscriptionSubjectIdentifier) error { + jsSubKey SubscriptionSubjectIdentifier, +) error { if jsSub.IsValid() { // The Unsubscribe function should not delete the consumer because it was added manually. if err := jsSub.Unsubscribe(); err != nil { @@ -581,7 +587,8 @@ func (js *JetStream) deleteConsumerFromJetStream(name string) error { // syncConsumerAndSubscription makes sure there is a consumer and subscription created on the NATS Backend. // these also must be bound to each other to ensure that NATS JetStream eventing logic works as expected. func (js *JetStream) syncConsumerAndSubscription(subscription *eventingv1alpha2.Subscription, - asyncCallback func(m *nats.Msg)) error { + asyncCallback func(m *nats.Msg), +) error { for _, eventType := range subscription.Status.Types { jsSubject := js.GetJetStreamSubject(subscription.Spec.Source, eventType.CleanType, subscription.Spec.TypeMatching) jsSubKey := NewSubscriptionSubjectIdentifier(subscription, jsSubject) @@ -621,14 +628,15 @@ func (js *JetStream) syncConsumerAndSubscription(subscription *eventingv1alpha2. // getOrCreateConsumer fetches the ConsumerInfo from NATS Server or creates it in case it doesn't exist. func (js *JetStream) getOrCreateConsumer(subscription *eventingv1alpha2.Subscription, - subject eventingv1alpha2.EventType) (*nats.ConsumerInfo, error) { + subject eventingv1alpha2.EventType, +) (*nats.ConsumerInfo, error) { jsSubject := js.GetJetStreamSubject(subscription.Spec.Source, subject.CleanType, subscription.Spec.TypeMatching) jsSubKey := NewSubscriptionSubjectIdentifier(subscription, jsSubject) consumerInfo, err := js.jsCtx.ConsumerInfo(js.Config.JSStreamName, jsSubKey.ConsumerName()) if err != nil { if errors.Is(err, nats.ErrConsumerNotFound) { - ecSubsConfig := ecenv.DefaultSubscriptionConfig(js.subsConfig) + ecSubsConfig := env.DefaultSubscriptionConfig(js.subsConfig) consumerInfo, err = js.jsCtx.AddConsumer( js.Config.JSStreamName, js.getConsumerConfig(jsSubKey, jsSubject, subscription.GetMaxInFlightMessages(&ecSubsConfig)), @@ -645,11 +653,12 @@ func (js *JetStream) getOrCreateConsumer(subscription *eventingv1alpha2.Subscrip // createNATSSubscription creates a NATS Subscription and binds it to the already existing consumer. func (js *JetStream) createNATSSubscription(subscription *eventingv1alpha2.Subscription, - subject eventingv1alpha2.EventType, asyncCallback func(m *nats.Msg)) error { + subject eventingv1alpha2.EventType, asyncCallback func(m *nats.Msg), +) error { jsSubject := js.GetJetStreamSubject(subscription.Spec.Source, subject.CleanType, subscription.Spec.TypeMatching) jsSubKey := NewSubscriptionSubjectIdentifier(subscription, jsSubject) - ecSubsConfig := ecenv.DefaultSubscriptionConfig(js.subsConfig) + ecSubsConfig := env.DefaultSubscriptionConfig(js.subsConfig) jsSubscription, err := js.jsCtx.Subscribe( jsSubject, asyncCallback, @@ -672,7 +681,8 @@ func (js *JetStream) createNATSSubscription(subscription *eventingv1alpha2.Subsc // bindInvalidSubscriptions tries to bind the invalid NATS Subscription to the existing consumer. func (js *JetStream) bindInvalidSubscriptions(subscription *eventingv1alpha2.Subscription, - subject eventingv1alpha2.EventType, asyncCallback func(m *nats.Msg)) error { + subject eventingv1alpha2.EventType, asyncCallback func(m *nats.Msg), +) error { jsSubject := js.GetJetStreamSubject(subscription.Spec.Source, subject.CleanType, subscription.Spec.TypeMatching) jsSubKey := NewSubscriptionSubjectIdentifier(subscription, jsSubject) // bind the existing consumer to a new subscription on JetStream @@ -692,8 +702,9 @@ func (js *JetStream) bindInvalidSubscriptions(subscription *eventingv1alpha2.Sub // syncConsumerMaxInFlight checks that the latest Subscription's maxInFlight value // is propagated to the NATS consumer as MaxAckPending. func (js *JetStream) syncConsumerMaxInFlight(subscription *eventingv1alpha2.Subscription, - consumerInfo nats.ConsumerInfo) error { - ecSubsConfig := ecenv.DefaultSubscriptionConfig(js.subsConfig) + consumerInfo nats.ConsumerInfo, +) error { + ecSubsConfig := env.DefaultSubscriptionConfig(js.subsConfig) maxInFlight := subscription.GetMaxInFlightMessages(&ecSubsConfig) if consumerInfo.Config.MaxAckPending == maxInFlight {