Received messages are not picked up by Tokio (task wakeup events lost?) #14
Comments
Thank you for opening this issue! I'll look into it tonight.
Hi, have you found anything? I prepared a (hopefully) simpler test case. A DEALER socket asynchronously receives messages; after a message is received, it is put into a request queue, the messages are then handled by threaded workers, and the computed responses are put into a response queue, which is forwarded back into the socket. I also prepared a simple Python script that sends X messages to the program and then waits for X responses back. It runs fine when I run it a few times, but after a few executions it always freezes. All of the messages from Python are sent, and all of the messages on the Rust side are forwarded to the sink, but some messages get lost between the DEALER sink and the Python receive.

use std::sync::Arc;
use std::thread;
use crossbeam::channel;
use futures::future::ok;
use futures::future::Future;
use futures::stream::Stream;
use futures::sync::mpsc;
use tokio::runtime::current_thread;
use tokio_zmq::{prelude::*, Dealer, Multipart};
use zmq::Message;
fn main() {
    let (request_sender, request_receiver) = channel::unbounded::<Multipart>();
    let (reply_sender, reply_receiver) = mpsc::unbounded::<Multipart>();
    let socket = Dealer::builder(Arc::new(zmq::Context::new()))
        .connect("tcp://localhost:9001")
        .build()
        .wait()
        .unwrap();
    let (sink, stream) = socket.sink_stream(8192).split();
    // worker
    for _ in 0..1 {
        let receiver = request_receiver.clone();
        let sender = reply_sender.clone();
        thread::spawn(move || {
            while let Ok(_) = receiver.recv() {
                let data = vec![1, 2, 3];
                let message = Message::from_slice(&data);
                let multipart = Multipart::from(vec![message]);
                sender.unbounded_send(multipart).unwrap();
            }
        });
    }
    // router
    let receive_process = stream
        .map(move |msg| {
            request_sender.send(msg).unwrap();
        })
        .for_each(|_| ok(()));
    let send_process = reply_receiver
        .map_err(|_| {
            panic!();
            tokio_zmq::Error::Sink
        })
        .forward(sink);
    current_thread::Runtime::new()
        .unwrap()
        .spawn(receive_process.map_err(|_| panic!()))
        .spawn(send_process.map(|_| ()).map_err(|_| panic!()))
        .run()
        .unwrap();
}

import zmq
ctx = zmq.Context()
sock = ctx.socket(zmq.DEALER)
sock.set_hwm(8192 * 2)
sock.bind("tcp://0.0.0.0:9001")
count = 5000
for _ in range(count):
    sock.send_multipart([b"msg"])
counter = 0
for _ in range(count):
    sock.recv_multipart()
    counter += 1
    if counter % 100 == 0:
        print(counter)
Another example of code that works with futures_zmq but not with tokio_zmq (a simple broker that receives messages on a ROUTER and forwards them via a DEALER; responses go the opposite route).

use std::sync::Arc;
use futures::future::Future;
use futures::stream::Stream;
use tokio_zmq::{Dealer, prelude::*, Router};
use tokio::runtime::current_thread;
fn main() {
    let ctx = Arc::new(zmq::Context::new());
    let router = Router::builder(ctx.clone())
        .bind("tcp://0.0.0.0:9000")
        .build()
        .unwrap();
    let dealer = Dealer::builder(ctx.clone())
        .bind("tcp://0.0.0.0:9001")
        .build()
        .unwrap();
    let (rsink, rstream) = router.sink_stream(8192).split();
    let (dsink, dstream) = dealer.sink_stream(8192).split();
    let client_to_worker = rstream.forward(dsink);
    let worker_to_client = dstream.forward(rsink);
    current_thread::Runtime::new()
        .unwrap()
        .spawn(client_to_worker.map(|_| ()).map_err(|_| panic!()))
        .spawn(worker_to_client.map(|_| ()).map_err(|_| panic!()))
        .run()
        .unwrap();
}
Thanks for putting all these together. I've been taking a look at things, and while I've refactored and made improvements in general, I still haven't been able to track down the lost wakeup issue. I'll keep looking into it, but it may take a larger rewrite than I was anticipating.
I suppose it's tied to both reading and writing to the same socket; it didn't happen for me in other scenarios. I tried looking at the source code, but I'd first have to delve deeper into Tokio to make proper sense of it :) Thank you.
Hey! Just wanted to let you know I fixed this!
Thank you for the new version! 0.10 definitely helped, and the problem now only happens with larger message counts (and only in release mode, since that means more messages in flight at the same time). But I think that there's still some issue, since it still locks up. I don't think that this is an HWM issue, judging from the documentation.
Could you run these with env_logger enabled?

Edit: try again with 0.10.1.
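For reference, a minimal sketch of what enabling env_logger in the reproduction programs could look like (this is an added illustration, not code from the thread; it assumes env_logger is declared as a dependency and that the crates involved emit records via the log crate, filtered with the usual RUST_LOG environment variable, e.g. RUST_LOG=trace cargo run --release):

// Hypothetical sketch: initialize env_logger at the top of main() so that
// any log records emitted by the libraries in use are printed to stderr.
fn main() {
    env_logger::init();
    // ... rest of the reproduction program unchanged ...
}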
There's probably some race condition: if I enable too many logs, thus slowing it down, it doesn't lock up. I'm not sure why, but I can't get … The error still happens with tokio_zmq 0.10 and 0.10.1; it seems to be rarer with the second example though.
Hi, thanks for this awesome wrapper library. I want to use it for running (potentially) long-lived ZeroMQ requests; however, I have a problem: sometimes the program hangs, and even though ZeroMQ messages are received, they are not picked up by the Tokio wrapper from tokio-zmq.

I have a DEALER socket and through it I send messages to a ROUTER (using TCP transport). The ROUTER will later send responses back (async client-server communication). I use several asynchronous Tokio handlers that put data into the socket (through a channel) and wait for a response before they continue (I use a HashMap of oneshot channels to keep track of the individual requests; after I get a response, I notify the corresponding handler through its oneshot channel so that it can continue).
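As an added illustration (not the author's actual code; the names Pending, register, and resolve are hypothetical), the request-tracking pattern described above could look roughly like this with futures 0.1 oneshot channels:

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use futures::sync::oneshot;

// Hypothetical sketch: in-flight request ids mapped to the oneshot senders
// that wake the handler which issued each request.
type Pending = Arc<Mutex<HashMap<u64, oneshot::Sender<Vec<u8>>>>>;

// A handler registers itself before sending and then waits on the receiver.
fn register(pending: &Pending, id: u64) -> oneshot::Receiver<Vec<u8>> {
    let (tx, rx) = oneshot::channel();
    pending.lock().unwrap().insert(id, tx);
    rx
}

// The task reading responses from the DEALER stream calls this when a reply
// carrying `id` arrives, completing the corresponding handler's future.
fn resolve(pending: &Pending, id: u64, reply: Vec<u8>) {
    if let Some(tx) = pending.lock().unwrap().remove(&id) {
        let _ = tx.send(reply);
    }
}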
The problem is that when I launch more than one of those handlers, it very quickly halts the program, because the responses stop coming. The other side (ROUTER) is sending the responses (I checked with Wireshark and also with strace). strace shows that my program receives the ZeroMQ messages (recvfrom), however nothing else happens after that. My guess is that some wakeup events get lost and thus my futures hang.

I tried it with futures-zmq and the deadlock doesn't happen. I also rewrote the exact same communication to plain TCP/IP and the problem also doesn't happen. So I think it might be caused by tokio-zmq.

Here's a minimal working example of my code:

If I use just one sending handler (for _ in 0..1), it works fine. However, with 2 or more handlers it very quickly halts.

Here's a short Python script that can be used to simulate the other (ROUTER) side:

I set the HWM to a low value, because with that it is more easily reproducible.