diff --git a/pkg/messaging/mqtt/publisher.go b/pkg/messaging/mqtt/publisher.go index 4a23097299..4592ae5c2f 100644 --- a/pkg/messaging/mqtt/publisher.go +++ b/pkg/messaging/mqtt/publisher.go @@ -8,13 +8,14 @@ import ( "time" mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/mainflux/mainflux/pkg/messaging" ) -var _ messaging.Publisher = (*publisher)(nil) - var errPublishTimeout = errors.New("failed to publish due to timeout reached") +var _ messaging.Publisher = (*publisher)(nil) + type publisher struct { client mqtt.Client timeout time.Duration @@ -40,11 +41,9 @@ func (pub publisher) Publish(topic string, msg messaging.Message) error { return token.Error() } ok := token.WaitTimeout(pub.timeout) - if ok && token.Error() != nil { - return token.Error() - } if !ok { return errPublishTimeout } - return nil + + return token.Error() } diff --git a/pkg/messaging/mqtt/pubsub.go b/pkg/messaging/mqtt/pubsub.go index 297749d57d..2ef27643f1 100644 --- a/pkg/messaging/mqtt/pubsub.go +++ b/pkg/messaging/mqtt/pubsub.go @@ -29,12 +29,13 @@ func newClient(address string, timeout time.Duration) (mqtt.Client, error) { } ok := token.WaitTimeout(timeout) - if ok && token.Error() != nil { - return nil, token.Error() - } if !ok { return nil, errConnect } + if token.Error() != nil { + return nil, token.Error() + } + return client, nil } diff --git a/pkg/messaging/mqtt/subscriber.go b/pkg/messaging/mqtt/subscriber.go index ba038c0325..b306bd9467 100644 --- a/pkg/messaging/mqtt/subscriber.go +++ b/pkg/messaging/mqtt/subscriber.go @@ -9,18 +9,19 @@ import ( mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/gogo/protobuf/proto" + log "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mainflux/pkg/messaging" ) -var _ messaging.Publisher = (*publisher)(nil) - var ( errSubscribeTimeout = errors.New("failed to subscribe due to timeout reached") errUnsubscribeTimeout = errors.New("failed to unsubscribe due to timeout reached") ) +var _ messaging.Subscriber = (*subscriber)(nil) + type subscriber struct { client mqtt.Client timeout time.Duration @@ -48,13 +49,11 @@ func (sub subscriber) Subscribe(topic string, handler messaging.MessageHandler) return token.Error() } ok := token.WaitTimeout(sub.timeout) - if ok && token.Error() != nil { - return token.Error() - } if !ok { return errSubscribeTimeout } - return nil + + return token.Error() } func (sub subscriber) Unsubscribe(topic string) error { @@ -62,14 +61,13 @@ func (sub subscriber) Unsubscribe(topic string) error { if token.Error() != nil { return token.Error() } + ok := token.WaitTimeout(sub.timeout) - if ok && token.Error() != nil { - return token.Error() - } if !ok { return errUnsubscribeTimeout } - return nil + + return token.Error() } func (sub subscriber) mqttHandler(h messaging.MessageHandler) mqtt.MessageHandler {