From f8ce94e9bb85b4d0fd0a6dfac8970bf3741e1d1d Mon Sep 17 00:00:00 2001 From: __touk__ Date: Wed, 9 Feb 2022 10:19:09 +0100 Subject: [PATCH] NOISSUE - Refactor MQTT subscriber (#1561) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * correct suscriber interface validator + refactore token error handling Signed-off-by: tzzed * apply review suggestion Signed-off-by: tzzed Co-authored-by: Dušan Borovčanin --- pkg/messaging/mqtt/publisher.go | 11 +++++------ pkg/messaging/mqtt/pubsub.go | 7 ++++--- pkg/messaging/mqtt/subscriber.go | 18 ++++++++---------- 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/pkg/messaging/mqtt/publisher.go b/pkg/messaging/mqtt/publisher.go index 4a23097299..4592ae5c2f 100644 --- a/pkg/messaging/mqtt/publisher.go +++ b/pkg/messaging/mqtt/publisher.go @@ -8,13 +8,14 @@ import ( "time" mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/mainflux/mainflux/pkg/messaging" ) -var _ messaging.Publisher = (*publisher)(nil) - var errPublishTimeout = errors.New("failed to publish due to timeout reached") +var _ messaging.Publisher = (*publisher)(nil) + type publisher struct { client mqtt.Client timeout time.Duration @@ -40,11 +41,9 @@ func (pub publisher) Publish(topic string, msg messaging.Message) error { return token.Error() } ok := token.WaitTimeout(pub.timeout) - if ok && token.Error() != nil { - return token.Error() - } if !ok { return errPublishTimeout } - return nil + + return token.Error() } diff --git a/pkg/messaging/mqtt/pubsub.go b/pkg/messaging/mqtt/pubsub.go index 297749d57d..2ef27643f1 100644 --- a/pkg/messaging/mqtt/pubsub.go +++ b/pkg/messaging/mqtt/pubsub.go @@ -29,12 +29,13 @@ func newClient(address string, timeout time.Duration) (mqtt.Client, error) { } ok := token.WaitTimeout(timeout) - if ok && token.Error() != nil { - return nil, token.Error() - } if !ok { return nil, errConnect } + if token.Error() != nil { + return nil, token.Error() + } + return client, nil } diff --git a/pkg/messaging/mqtt/subscriber.go b/pkg/messaging/mqtt/subscriber.go index ba038c0325..b306bd9467 100644 --- a/pkg/messaging/mqtt/subscriber.go +++ b/pkg/messaging/mqtt/subscriber.go @@ -9,18 +9,19 @@ import ( mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/gogo/protobuf/proto" + log "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mainflux/pkg/messaging" ) -var _ messaging.Publisher = (*publisher)(nil) - var ( errSubscribeTimeout = errors.New("failed to subscribe due to timeout reached") errUnsubscribeTimeout = errors.New("failed to unsubscribe due to timeout reached") ) +var _ messaging.Subscriber = (*subscriber)(nil) + type subscriber struct { client mqtt.Client timeout time.Duration @@ -48,13 +49,11 @@ func (sub subscriber) Subscribe(topic string, handler messaging.MessageHandler) return token.Error() } ok := token.WaitTimeout(sub.timeout) - if ok && token.Error() != nil { - return token.Error() - } if !ok { return errSubscribeTimeout } - return nil + + return token.Error() } func (sub subscriber) Unsubscribe(topic string) error { @@ -62,14 +61,13 @@ func (sub subscriber) Unsubscribe(topic string) error { if token.Error() != nil { return token.Error() } + ok := token.WaitTimeout(sub.timeout) - if ok && token.Error() != nil { - return token.Error() - } if !ok { return errUnsubscribeTimeout } - return nil + + return token.Error() } func (sub subscriber) mqttHandler(h messaging.MessageHandler) mqtt.MessageHandler {