Implement connection heartbeating. #66
This commit also includes some code changes where occurrences of `try!` were replaced with the `?` operator, only in related code paths.
@@ -42,6 +43,9 @@ pub struct Options {
    pub locale: String,
    pub scheme: AMQPScheme,
    pub properties: Table,

    /// In seconds.
    pub heartbeat: u16,
Would it make sense for the `heartbeat` option to be a `Duration`?
Yea, I like that. `Option<Duration>`, per the discussion below.
thread::spawn(|| Session::reading_loop(con1, channels_clone));

// If the caller has configured heartbeats, then begin the heartbeat loop.
if options.heartbeat > 0 {
Rather than checking if `heartbeat > 0`, I think it's preferable to use `Option<T>`.
Yea, considered doing that. The reason I went with zero is: what if a user sets it to `0`? That seems to be a gray area. I'm not sure what it is supposed to mean in practice, other than hammer the MQ with heartbeats or disable heartbeats :). If we go with `Option<T>`, we will just have to cover that edge case and document it in the README & docs.
Thoughts?
We could also do something similar with `Duration`, where we just check that the duration is some amount of time >= 1 second, and only then will we run the heartbeats.
Another thing to consider, the AMQP 0.9.1 spec recommends using heartbeats in terms of seconds, and between 8-30 seconds at that.
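One way to combine the suggestions in this thread (an `Option<Duration>` field, with anything under one second treated as "heartbeats off") might look like the sketch below. The trimmed-down `Options` struct and the `heartbeat_interval_secs` helper are hypothetical and not part of this PR. The clamp to `u16` assumes the interval is still sent as whole seconds in a 16-bit field, as the current `heartbeat: u16` option suggests.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the heartbeat-related part of `Options`.
#[derive(Debug, Clone, Default)]
pub struct Options {
    /// Interval between heartbeat frames; `None` disables heartbeats.
    pub heartbeat: Option<Duration>,
}

/// Returns the heartbeat interval in whole seconds, or `None` when the
/// heartbeat loop should not be started.
///
/// Assumption for this sketch: intervals shorter than one second are treated
/// as "disabled", and very long intervals are clamped to `u16::MAX` seconds.
pub fn heartbeat_interval_secs(options: &Options) -> Option<u16> {
    // `None` means the caller did not ask for heartbeats at all.
    let secs = options.heartbeat?.as_secs();
    if secs == 0 {
        // Covers `Duration::ZERO` and any sub-second interval.
        return None;
    }
    Some(secs.min(u64::from(u16::MAX)) as u16)
}
```

With this shape, the `if options.heartbeat > 0` check would become an `if let Some(secs) = heartbeat_interval_secs(&options)`, and the "what does 0 mean" edge case is answered in one documented place.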
Hey, I am interested in this change. What is the status of it?
I left some comments regarding things Rust complained about when compiling this PR.
// Emit a new heartbeat frame & panic if any failure occurred.
let res = connection.write(heartbeat_frame.clone());
if let Err(err) = res {
    connection.shutdown();
The result of that function call is being discarded. Can we make this explicit instead of implicit?
// If the caller has configured heartbeats, then begin the heartbeat loop.
if options.heartbeat > 0 {
    let conn = connection.clone();
    let chans = Arc::clone(&channels);
The `chans` var is never used. Is it created on purpose or is it a leftover from the development?
loop {
    // Emit a new heartbeat frame & panic if any failure occurred.
    let res = connection.write(heartbeat_frame.clone());
    if let Err(err) = res {
`err` is never used. Consider using `_err` instead.
I started resolving some of @Antti's and my comments in my fork. https://github.com/andir/rust-amqp/commits/heartbeat Feel free to cherry-pick them into this PR.
@andir @MicroJoe @Antti hey all, just wanted to reach out since I haven't moved on this PR for a while. I actually ended up going with a different AMQP lib for the stuff I was hacking on at the time. I needed futures based async as opposed to threading and such. @andir sounds like you have forked and have made some progress. That's awesome! Please feel free to open a PR which supersedes this one. I don't think I will have the time to address some of the deeper issues I was concerned with here. @Antti feel free to close this PR whenever you would like. I'll leave it open until I hear back from @andir on what their plans are for moving forward with the fork. Thanks all!
open questions
Heartbeats are great, and are working as of this PR; however, until we implement an auto-reconnect algorithm, heartbeats are not super effective. Do we want to include an auto-reconnect implementation here as well? Or does it suffice to simply close the underlying TCP connection for now?
TODO:
- `0` disables heartbeats for the connection.
- `Session.open_channel`. Use an internal mutex for keeping track of used channels, reclaiming dropped channels, and allowing `Session` to be `Sync` so that we can follow the "channel per thread" more practically.

Should close #50.
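As a rough illustration of the "internal mutex" TODO item, channel ids could be tracked in a mutex-guarded set and released when a channel handle is dropped. Everything in this sketch (`ChannelIds`, `ChannelGuard`, `acquire`) is hypothetical and not taken from the crate. It assumes ids start at 1, with 0 reserved for the connection itself.

```rust
use std::collections::BTreeSet;
use std::sync::{Arc, Mutex};

/// Hypothetical allocator for channel ids, shared by a session and its channels.
#[derive(Debug, Default)]
pub struct ChannelIds {
    used: Mutex<BTreeSet<u16>>,
}

impl ChannelIds {
    /// Reserves the lowest free id, or returns `None` if all ids are in use.
    /// Assumes id 0 is reserved for the connection, so allocation starts at 1.
    pub fn acquire(self: &Arc<Self>) -> Option<ChannelGuard> {
        let mut used = self.used.lock().unwrap();
        let id = (1..=u16::MAX).find(|id| !used.contains(id))?;
        used.insert(id);
        Some(ChannelGuard { id, ids: Arc::clone(self) })
    }
}

/// Handle for a reserved channel id; the id is reclaimed when this is dropped.
pub struct ChannelGuard {
    pub id: u16,
    ids: Arc<ChannelIds>,
}

impl Drop for ChannelGuard {
    fn drop(&mut self) {
        // Skip reclaiming if the mutex is poisoned rather than panic in `drop`.
        if let Ok(mut used) = self.ids.used.lock() {
            used.remove(&self.id);
        }
    }
}
```

Because the set lives behind a `Mutex`, `ChannelIds` is `Sync`, so an `Arc<ChannelIds>` can be handed to one thread per channel.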