
Received messages are not picked up by Tokio (task wakeup events lost?) #14

Open
Kobzol opened this issue Jan 23, 2019 · 9 comments

Kobzol commented Jan 23, 2019

Hi, thanks for this awesome wrapper library. I want to use it for running (potentially) long-lived ZeroMQ requests, but I have a problem: sometimes the program hangs, and even though ZeroMQ messages are received, they are not picked up by the Tokio wrapper from tokio-zmq.

I have a DEALER socket through which I send messages to a ROUTER (over TCP transport). The ROUTER later sends responses back (async client-server communication). I use several asynchronous Tokio handlers that put data into the socket (through a channel) and wait for a response before they continue. A HashMap of oneshot channels keeps track of the individual requests; after I get a response, I notify the corresponding handler through its oneshot channel so that it can continue.

The problem is that when I launch more than one of those handlers, the program very quickly halts, because the responses stop coming. The other side (the ROUTER) is still sending the responses (I checked with Wireshark and also with strace). strace shows that my program receives the ZeroMQ messages (recvfrom), but nothing else happens after that. My guess is that some wakeup events get lost and thus my futures hang.

I tried it with futures-zmq and the deadlock doesn't happen. I also rewrote the exact same communication to plain TCP/IP and the problem doesn't happen there either. So I think it might be caused by tokio-zmq.

Here's a minimal working example of my code:

use std::collections::HashMap;
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
use std::sync::Mutex;

use futures::future;
use futures::future::Future;
use futures::future::IntoFuture;
use futures::future::ok;
use futures::stream::Stream;
use futures::sync::mpsc::unbounded;
use futures_zmq::{Dealer, prelude::*};
use futures_zmq::Multipart;
use zmq::Message;

struct RequestCtx {
    id: u32,
    sender: Option<futures::sync::oneshot::Sender<u32>>
}

fn main() {
    let ctx = Arc::new(zmq::Context::new());
    let rep = Dealer::builder(ctx.clone())
        .connect("tcp://127.0.0.1:5555")
        .build()
        .wait()
        .unwrap();

    let (sink, stream) = rep.sink_stream(8192).split();
    let (sender, receiver) = unbounded::<RequestCtx>();
    let map = Arc::new(Mutex::new(HashMap::<u32, RequestCtx>::new()));
    let counter = Arc::new(AtomicUsize::new(0));

    let channel_map = map.clone();
    // this process reads messages from a channel and sends them to the DEALER
    let channel_process = receiver
        .map(move |chan_val| {
            let id = chan_val.id;
            {
                eprintln!("Sending {}", id);
                let mut m = channel_map.lock().unwrap();
                eprintln!("SendingLock {}", id);
                m.insert(id, chan_val);
            }

            let data = id.to_string();
            Multipart::from(vec![Message::from_slice(data.as_bytes())])
    }).map_err(|_| {
        panic!();
        futures_zmq::Error::Polling
    }).forward(sink);

    let reader_map = map.clone();

    // this process reads from the DEALER and completes the waiting futures
    let runner = stream
        .map(move |mut multipart| {
            let response = multipart.pop_front().unwrap();
            let id: u32 = response.as_str().unwrap().parse().unwrap();

            eprintln!("Receive {}", id);
            let mut m = reader_map.lock().unwrap();
            eprintln!("ReceiveLock {}", id);
            m.remove(&id).unwrap()
        })
        .map_err(|_| panic!())
        .for_each(|mut req| {
            let sender = req.sender.take().unwrap();
            eprintln!("BeforeSend: {}", req.id);
            sender.send(req.id).unwrap();
            eprintln!("AfterSend: {}", req.id);
            ok(())
        });

    tokio::run(future::lazy(move || {
        tokio::spawn(channel_process.map(|_| ()).map_err(|_| panic!()));
        tokio::spawn(runner.map(|_| { () }).map_err(|_| panic!()));

        for _ in 0..4 {
            let counter = counter.clone();
            let sender = sender.clone();

            // this process repeatedly puts messages into a channel and waits until they are completed
            let process = futures::stream::repeat::<u32, ()>(5)
                .and_then(move |_| {
                    let id = counter.fetch_add(1, Ordering::SeqCst) as u32;
                    let (send, recv) = futures::sync::oneshot::channel();
                    let request = RequestCtx { id, sender: Some(send) };
                    let id = request.id;

                    sender.unbounded_send(request)
                        .into_future()
                        .map_err(|_| panic!())
                        .and_then(move |_| {
                            eprintln!("Waiting {}", id);
                            recv.map_err(|_| panic!())
                        })
                        .map(move |_| {
                            eprintln!("WaitingFinished {}", id);
                        })
                }).for_each(|_| ok(()));
            tokio::spawn(process);
        }

        Ok(())
    }));
}

If I use just one sending handler (for _ in 0..1), it works fine. However, with two or more handlers it very quickly halts.

Here's a short Python script that can be used to simulate the other (ROUTER) side:

import asyncio

import zmq
from zmq.asyncio import Context

context = Context()

router = context.socket(zmq.ROUTER)
router.setsockopt(zmq.ROUTER_MANDATORY, 1)
router.set_hwm(10)
router.bind("tcp://127.0.0.1:5555")

async def echo():
    while True:
        data = await router.recv_multipart()
        print("message: {}".format(data[1].decode()))
        await router.send_multipart(data)

loop = asyncio.get_event_loop()
loop.create_task(echo())
loop.run_forever()

I set the HWM to a low value because that makes the problem easier to reproduce.

asonix commented Jan 23, 2019

Thank you for opening this issue! I'll look into it tonight

Kobzol commented Feb 6, 2019

Hi, have you found anything? futures_zmq has been working for me so far, but now I'm working on a different ZeroMQ Rust application, and there futures_zmq is several orders of magnitude slower than tokio_zmq.

I prepared a (hopefully) simpler test case. A DEALER socket asynchronously receives messages; each received message is put into a request queue, the messages are handled by threaded workers, and the computed responses are put into a response queue that is forwarded back into the socket. I also prepared a simple Python script that sends X messages to the program and then waits for X responses back. It runs fine when I run it a few times, but after a few executions it always freezes. All of the messages from Python are sent, and all of the messages on the Rust side are forwarded to the sink, but some messages get lost between the DEALER sink and the Python receive.

use std::sync::Arc;
use std::thread;

use crossbeam::channel;
use futures::future::ok;
use futures::future::Future;
use futures::stream::Stream;
use futures::sync::mpsc;
use tokio::runtime::current_thread;
use tokio_zmq::{prelude::*, Dealer, Multipart};
use zmq::Message;

fn main() {
    let (request_sender, request_receiver) = channel::unbounded::<Multipart>();
    let (reply_sender, reply_receiver) = mpsc::unbounded::<Multipart>();

    let socket = Dealer::builder(Arc::new(zmq::Context::new()))
        .connect("tcp://localhost:9001")
        .build()
        .wait()
        .unwrap();
    let (sink, stream) = socket.sink_stream(8192).split();

    // worker
    for _ in 0..1 {
        let receiver = request_receiver.clone();
        let sender = reply_sender.clone();
        thread::spawn(move || {
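            // send a fixed three-byte reply for every request pulled off the crossbeam channel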
            while let Ok(_) = receiver.recv() {
                let data = vec![1, 2, 3];
                let message = Message::from_slice(&data);
                let multipart = Multipart::from(vec![message]);
                sender.unbounded_send(multipart).unwrap();
            }
        });
    }

    // router: forward received messages to the worker queue, and queued replies back to the socket
    let receive_process = stream
        .map(move |msg| {
            request_sender.send(msg).unwrap();
        })
        .for_each(|_| ok(()));

    let send_process = reply_receiver
        .map_err(|_| {
            panic!();
            tokio_zmq::Error::Sink
        })
        .forward(sink);

    current_thread::Runtime::new()
        .unwrap()
        .spawn(receive_process.map_err(|_| panic!()))
        .spawn(send_process.map(|_| ()).map_err(|_| panic!()))
        .run()
        .unwrap();
}

And here is the Python script that sends the messages and waits for the responses:

import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.DEALER)
sock.set_hwm(8192 * 2)
sock.bind("tcp://0.0.0.0:9001")

count = 5000

for _ in range(count):
    sock.send_multipart([b"msg"])

counter = 0
for _ in range(count):
    sock.recv_multipart()
    counter += 1
    if counter % 100 == 0:
        print(counter)

Kobzol commented Feb 7, 2019

Another example of code that works with futures_zmq but not with tokio_zmq: a simple broker that receives messages on a ROUTER and forwards them via a DEALER, with responses travelling the opposite route.

use std::sync::Arc;

use futures::future::Future;
use futures::stream::Stream;
use tokio_zmq::{Dealer, prelude::*, Router};
use tokio::runtime::current_thread;

fn main() {
    let ctx = Arc::new(zmq::Context::new());
    let router = Router::builder(ctx.clone())
        .bind("tcp://0.0.0.0:9000")
        .build()
        .unwrap();

    let dealer = Dealer::builder(ctx.clone())
        .bind("tcp://0.0.0.0:9001")
        .build()
        .unwrap();

    let (rsink, rstream) = router.sink_stream(8192).split();
    let (dsink, dstream) = dealer.sink_stream(8192).split();

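    // forward every Multipart received on the ROUTER to the DEALER, and vice versa for the responses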
    let client_to_worker = rstream.forward(dsink);
    let worker_to_client = dstream.forward(rsink);

    current_thread::Runtime::new()
        .unwrap()
        .spawn(client_to_worker.map(|_| ()).map_err(|_| panic!()))
        .spawn(worker_to_client.map(|_| ()).map_err(|_| panic!()))
        .run()
        .unwrap();
}

asonix commented Feb 7, 2019

Thanks for putting all these together. I've been taking a look at things, and while I've refactored and made improvements in general, I still haven't been able to track down the lost-wakeup issue. I'll keep looking into it, but it may take a larger rewrite than I was anticipating.

Kobzol commented Feb 7, 2019

I suppose it's tied to both reading from and writing to the same socket; it didn't happen for me in other scenarios. I tried looking at the source code, but I'd first have to delve deeper into Tokio to make proper sense of it :) Thank you.

asonix commented Jun 18, 2019

Hey! Just wanted to let you know I fixed this!

asonix closed this as completed Jun 18, 2019

Kobzol commented Jun 18, 2019

Thank you for the new version!
I tried the second example that I posted here (#14 (comment)) with tokio-zmq 0.10, and if I increase the count in the Python script from 5000 to 500000, it still hangs after a number of messages and then stops receiving messages altogether when I restart the Python script (sometimes it takes multiple tries, but I can reproduce it easily). I tried the same example with futures_zmq, and with that all the messages go through (although it's an order of magnitude slower).

0.10 definitely helped and the problem now only happens with larger message counts (and only in release mode, since that means more messages at the same time). But I think that there's still some issue, since it still locks up.

I don't think this is an HWM issue, since from the documentation of ZMQ_DEALER it seems to me that if the HWM is reached, the socket should block. But I don't suppose it is supposed to stay blocked indefinitely; the Rust side simply fails to pick up the messages from the other side.

asonix reopened this Jun 18, 2019

asonix commented Jun 18, 2019

Could you run these with env_logger and RUST_LOG=tokio_zmq=trace? I think I know what might be going on, but I'd like some confirmation.
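
For reference, a minimal way to wire that up in the reproduction programs (just a sketch, assuming env_logger is added as a dependency of the reproduction crate):

fn main() {
    // Read the log filter from the RUST_LOG environment variable, e.g.
    //   RUST_LOG=tokio_zmq=trace cargo run --release
    env_logger::init();

    // ... rest of the reproduction code from the examples above ...
}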

Edit: try again with 0.10.1

Kobzol commented Jun 18, 2019

There's probably some race condition: if I enable too many logs, thus slowing it down, it doesn't lock up.

I'm not sure why, but I can't get tokio_zmq to output any logs with 0.10.0 and 0.10.1. Here is output from tokio_zmq 0.9.0, which locked up after 52 messages with the first example. I pushed the example code and the output from running the first example with tokio_zmq 0.9 here: https://github.com/kobzol/tokio-zmq-bug (I added you as a collaborator).

The error still happens with tokio_zmq 0.10 and 0.10.1, though it seems to be rarer with the second example.
