Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

"Fast requests" accumulate in broker queue and are never dispatched #73

Open
JoergSchneiderSimon opened this issue Apr 12, 2020 · 0 comments

Comments

@JoergSchneiderSimon
Copy link

if clients send requests faster than workers can handle them, some of the requests are never dispatched to the worker, because s_dispatch() is only called from handle_ready() and handle_request().
What appears to be missing is a call to s_dispatch() at the end of handle_final() in mdp_broker.c

To reproduce, run attached broker, worker and client.
then create multiple requests (say, 5 requests) with the client before handling them in the worker.
Only the first one will be processed by the client.
then create a new request in the client and handle a request in the worker.
Now request number 2 will be handled, etc.

mdp_broker_issue_demo.tar.gz

so IMHO, handle_final() should look like this (added a call to s_dispatch() after re-adding a worker as waiting):

static void
handle_worker_final (client_t *self)
{
    mdp_msg_t *msg = self->message;
    mdp_msg_t *client_msg = mdp_msg_new();
    // Set routing id, messageid, service, body
    zframe_t *address = mdp_msg_address(msg);

    mdp_msg_set_routing_id(client_msg, address);
    mdp_msg_set_id(client_msg, MDP_MSG_CLIENT_FINAL);
    const char *service_name = self->service_name;
    mdp_msg_set_service(client_msg, service_name);
    zmsg_t *body = mdp_msg_get_body(msg);
    mdp_msg_set_body(client_msg, &body);
    mdp_msg_send(client_msg, self->server->router);

    // Add the worker back to the list of waiting workers.
    char *identity = zframe_strhex(mdp_msg_routing_id(msg));

    worker_t *worker =
        (worker_t *) zhash_lookup(self->server->workers, identity);
    assert(worker);
    zlist_append(self->server->waiting, worker);
    service_t *service = (service_t *) zhash_lookup(self->server->services,
        worker->service->name);
    assert(service);
    zlist_append(service->waiting, worker);

    zstr_free(&identity);
    mdp_msg_destroy(&client_msg);
    s_service_dispatch(service);
}

cheers,

Joerg

@JoergSchneiderSimon JoergSchneiderSimon changed the title "Fast requests" accumulate in broker and are never dispatched "Fast requests" accumulate in broker queue and are never dispatched Apr 12, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant