Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workers get stuck when waiting for Paho Token #31

Open
pricelessrabbit opened this issue Mar 13, 2021 · 5 comments · May be fixed by #32
Open

Workers get stuck when waiting for Paho Token #31

pricelessrabbit opened this issue Mar 13, 2021 · 5 comments · May be fixed by #32
Assignees

Comments

@pricelessrabbit
Copy link
Contributor

STR

  • setup an export with QoS 2
  • set the workers number to a low one
  • start to send messages to mainflux
  • simulate network partition
  • reconnect the export

expected

  • all the messages sent during the disconnection are delivered to mainflux

actual

  • only 1 message per worker is delivered. For example with 3 workers, only 3 messages are persisted and sent when exporter reconnects. All the other messages are lost

Seems that the issue is in the Token.wait() used here

when a worker publish to mqtt and qos = 2, paho wait for reconnection to send the message so the worker get stuck in the wait() callback and cant manage further messages.

I managed to fix it using Token.WaitTimeout

token := e.mqtt.Publish(topic, byte(e.cfg.MQTT.QoS), e.cfg.MQTT.Retain, payload)
	sentInTime := token.WaitTimeout(3 * time.Second)
	if !sentInTime {
		e.logger.Warn(fmt.Sprintf("MQTT message in topic %s sending timeout exceed: persisted if QOS >= 1", topic))
		return nil
	}
	if sentInTime && token.Error() != nil {
		e.logger.Error(fmt.Sprintf("Failed to publish to topic %s", topic))
		return token.Error()
	}
	return nil

In that way when waiting exceeds 3 seconds the worker returns online

@mteodor
Copy link
Contributor

mteodor commented Mar 15, 2021

@pricelessrabbit Thank you
could you try this

go func() {
		token := e.mqtt.Publish(topic, byte(e.cfg.MQTT.QoS), e.cfg.MQTT.Retain, payload)
		if token.Wait() && token.Error() != nil {
			e.logger.Error(fmt.Sprintf("Failed to publish to topic %s", topic))
			
		}
		
	}()
return nil
	```

@pricelessrabbit
Copy link
Contributor Author

hi @mteodor tried both the async goroutine and WaitTimeout and both solutions works, but in case of the goroutine, i'm a bit worried in case of a long disconnection:
in case of a 1-day disconnection with 1 message per second there will be 86k of waiting goroutines. Is it ok?

@mteodor
Copy link
Contributor

mteodor commented Mar 15, 2021

@pricelessrabbit than combination of both seems best solution, what do you think? Could you make a PR for this?

@pricelessrabbit
Copy link
Contributor Author

i have this in production in my fork now and it works as expected. if it looks ok to you ill open a PR. pls tell me if i have to change the logging or any other change thanks

go func() {
		token := e.mqtt.Publish(topic, byte(e.cfg.MQTT.QoS), e.cfg.MQTT.Retain, payload)
		publishedInTime := token.WaitTimeout(3 * time.Second)
		if publishedInTime && token.Error() != nil {
			e.logger.Error(fmt.Sprintf("Failed to publish to topic %s", topic))
			return
		}
		if !publishedInTime {
			e.logger.Warn(fmt.Sprintf("Message in topic %s is taking a long time to be published", topic))
		}
	}()

@mteodor
Copy link
Contributor

mteodor commented Mar 15, 2021

Yes, please send this PR
we'll propose changes through review

@drasko drasko changed the title Workers get stuck when waiting for Phao Token Workers get stuck when waiting for Paho Token Mar 17, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants