Skip to content

Commit

Permalink
Fix paho failing tests (#174)
Browse files Browse the repository at this point in the history
fix: SignalDisconnect before SignalOffline
Handles case when Disconnect from client provides SessionExpiryInterval

fix: tag name for RetainAvailable parameter
fix: send the username to ACL on publish
refactor: check for topic alias when the packet arrived
  prevent double-checking for ACL
refactor: decrement children count on node delete
refactor: set publish stub before unsubscribing
fix: fix to #173
  Shutdown subscriber when the session expire fired.
  Prevents subscriber from leaking.
refactor: incoming PUBLISH messages
  handle flow control as per spec

fix: return of granted QoS
refactor: send disconnect from connection close stage 2 only
refactor: clone PUBLISH packet before sending from a topic manager
  • Loading branch information
troian authored Apr 14, 2020
1 parent 52e1126 commit 3e00ff1
Show file tree
Hide file tree
Showing 12 changed files with 212 additions and 205 deletions.
64 changes: 32 additions & 32 deletions clients/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
type sessionEvents interface {
sessionOffline(string, sessionOfflineState)
connectionClosed(string, bool, mqttp.ReasonCode)
subscriberShutdown(string, vlsubscriber.IFace)
subscriberShutdown(string)
}

type sessionPreConfig struct {
Expand All @@ -34,12 +34,13 @@ type sessionPreConfig struct {

type sessionConfig struct {
sessionEvents
subscriber vlsubscriber.IFace
will *mqttp.Publish
expireIn *uint32
durable bool
sharedSubscriptions bool // nolint:structcheck
version mqttp.ProtocolVersion
subscriber vlsubscriber.IFace
will *mqttp.Publish
expireIn *uint32
durable bool
sharedSubscriptions bool // nolint:structcheck
subscriptionIDAllowed bool
version mqttp.ProtocolVersion
}

type sessionOfflineState struct {
Expand Down Expand Up @@ -119,40 +120,41 @@ func (s *session) SignalSubscribe(pkt *mqttp.Subscribe) (mqttp.IFace, error) {
// V5.0 [MQTT-3.8.2.1.2]
if prop := pkt.PropertyGet(mqttp.PropertySubscriptionIdentifier); prop != nil {
if v, e := prop.AsInt(); e == nil {
if !s.subscriptionIDAllowed {
return nil, mqttp.CodeSubscriptionIDNotSupported
}
subsID = v
} else {
return nil, mqttp.CodeProtocolError
}
}

err := pkt.ForEachTopic(func(t *mqttp.Topic) error {
// V5.0
// [MQTT-3.8.3-4] It is a Protocol Error to set the No Local bit to 1 on a Shared Subscription
if t.Ops().NL() && (t.ShareName() != "") {
return mqttp.CodeProtocolError
}
if t.ShareName() != "" {
// [MQTT-3.8.3-4] It is a Protocol Error to set the No Local bit to 1 on a Shared Subscription
if t.Ops().NL() {
return mqttp.CodeProtocolError
}

if !s.sharedSubscriptions && (t.ShareName() != "") {
return mqttp.CodeSharedSubscriptionNotSupported
if !s.sharedSubscriptions {
retCodes = append(retCodes, mqttp.CodeSharedSubscriptionNotSupported)
return nil
}
}

return nil
})

if err != nil {
return nil, err
}

_ = pkt.ForEachTopic(func(t *mqttp.Topic) error {
var reason mqttp.ReasonCode

if e := s.permissions.ACL(s.id, s.username, t.Filter(), vlauth.AccessRead); e == vlauth.StatusAllow {
params := vlsubscriber.SubscriptionParams{
ID: subsID,
Ops: t.Ops(),
}

if retained, ee := s.subscriber.Subscribe(t.Filter(), params); ee != nil {
if granted, retained, ee := s.subscriber.Subscribe(t.Filter(), params); ee != nil {
reason = mqttp.QosFailure
} else {
reason = mqttp.ReasonCode(params.Granted)
reason = mqttp.ReasonCode(granted)
retainedPublishes = append(retainedPublishes, retained...)
}
} else {
Expand All @@ -168,6 +170,10 @@ func (s *session) SignalSubscribe(pkt *mqttp.Subscribe) (mqttp.IFace, error) {
return nil
})

if err != nil {
return nil, err
}

if err = resp.AddReturnCodes(retCodes); err != nil {
return nil, err
}
Expand Down Expand Up @@ -242,11 +248,8 @@ func (s *session) SignalUnSubscribe(pkt *mqttp.UnSubscribe) (mqttp.IFace, error)
}

// SignalDisconnect process DISCONNECT packet from client
func (s *session) SignalDisconnect(pkt *mqttp.Disconnect) (mqttp.IFace, error) {
func (s *session) SignalDisconnect(pkt *mqttp.Disconnect) error {
var err error
var resp mqttp.IFace

err = mqttp.CodeSuccess

if s.version == mqttp.ProtocolV50 {
// FIXME: CodeRefusedBadUsernameOrPassword has same id as CodeDisconnectWithWill
Expand All @@ -262,9 +265,6 @@ func (s *session) SignalDisconnect(pkt *mqttp.Disconnect) (mqttp.IFace, error) {
// uses DISCONNECT with Reason Code 0x82 (Protocol Error) as described in section 4.13.
if (s.expireIn != nil && *s.expireIn == 0) && val != 0 {
err = mqttp.CodeProtocolError
p := mqttp.NewDisconnect(s.version)
p.SetReasonCode(mqttp.CodeProtocolError)
resp = p
} else {
s.expireIn = &val
}
Expand All @@ -274,7 +274,7 @@ func (s *session) SignalDisconnect(pkt *mqttp.Disconnect) (mqttp.IFace, error) {
s.will = nil
}

return resp, err
return err
}

// SignalOnline signal state is get online
Expand Down Expand Up @@ -323,7 +323,7 @@ func (s *session) SignalConnectionClose(params connection.DisconnectParams) {
s.connectionClosed(s.id, s.durable, params.Reason)

if !state.keepContainer {
s.subscriberShutdown(s.id, s.subscriber)
s.subscriberShutdown(s.id)
s.subscriber = nil
}

Expand Down
26 changes: 15 additions & 11 deletions clients/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,12 +460,14 @@ func (m *Manager) newSession(cn connection.Initial, params *connection.ConnectPa
if info, err = m.loadContainer(cn.Session(), params, acl); err == nil {
ses = info.ses
config := sessionConfig{
sessionEvents: m,
expireIn: params.ExpireIn,
will: params.Will,
durable: params.Durable,
version: params.Version,
subscriber: info.sub,
sessionEvents: m,
expireIn: params.ExpireIn,
will: params.Will,
durable: params.Durable,
version: params.Version,
sharedSubscriptions: m.Config.Options.SubsShared,
subscriptionIDAllowed: m.Config.Options.SubsID,
subscriber: info.sub,
}

_ = ses.configure(config)
Expand Down Expand Up @@ -724,11 +726,12 @@ func (m *Manager) connectionClosed(_ string, durable bool, _ mqttp.ReasonCode) {
m.Metrics.Clients().OnDisconnected(durable)
}

func (m *Manager) subscriberShutdown(id string, sub vlsubscriber.IFace) {
sub.Offline(true)
func (m *Manager) subscriberShutdown(id string) {
if val, ok := m.sessions.Load(id); ok {
wrap := val.(*container)
wrap.sub = nil
if wrap.sub != nil {
wrap.sub.Offline(true)
}
} else {
m.log.Error("subscriber shutdown. container not found", zap.String("ClientID", id))
}
Expand Down Expand Up @@ -774,8 +777,9 @@ func (m *Manager) sessionOffline(id string, state sessionOfflineState) {

func (m *Manager) sessionTimer(id string, expired bool) {
if expired {
_ = m.persistence.Delete([]byte(id))
m.subscriberShutdown(id)

_ = m.persistence.Delete([]byte(id))
m.sessions.Delete(id)
m.sessionsCount.Done()
m.expiryCount.Done()
Expand All @@ -794,7 +798,7 @@ func (m *Manager) configurePersistedSubscribers(ctx *loadContext) {
})

for topic, ops := range t.sub.topics {
if _, err := sub.Subscribe(topic, ops); err != nil {
if _, _, err := sub.Subscribe(topic, ops); err != nil {
m.log.Error("Couldn't subscribe", zap.Error(err))
}
}
Expand Down
4 changes: 2 additions & 2 deletions configuration/configTypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ type MqttConfig struct {
Options struct {
ConnectTimeout int `yaml:"connectTimeout,omitempty"`
SessionPreempt bool `yaml:"sessionPreempt,omitempty" yaml:"sessionDups,omitempty"` // nolint:staticcheck
RetainAvailable bool `yaml:"retainAvailable,omitempty"`
RetainAvailable bool `yaml:"retainAvail,omitempty"`
SubsOverlap bool `yaml:"subsOverlap,omitempty"`
SubsID bool `yaml:"subsId,omitempty"`
SubsShared bool `yaml:"subsShared,omitempty"`
Expand All @@ -134,7 +134,7 @@ type MqttConfig struct {
}
}

// ListenersConfig
// ListenersConfig ...
type ListenersConfig struct {
DefaultAddr string `yaml:"defaultAddr,omitempty"`
MQTT map[string]map[string]PortConfig `yaml:"mqtt,omitempty"`
Expand Down
9 changes: 8 additions & 1 deletion connection/ack.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,16 @@ type ackQueue struct {
onRelease onRelease
}

func (a *ackQueue) store(pkt mqttp.IFace) {
func (a *ackQueue) store(pkt mqttp.IFace, replace bool) bool {
id, _ := pkt.ID()

if _, ok := a.messages.Load(id); ok && !replace {
return false
}

a.messages.Store(id, pkt)

return true
}

func (a *ackQueue) release(pkt mqttp.IFace) bool {
Expand Down
Loading

0 comments on commit 3e00ff1

Please sign in to comment.