Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #204 #205 #207 #206

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Instance values, command line user specifiable
#

CONFIG = max
CONFIG ?= release
CPPFLAGS =
CXXFLAGS =
LDFLAGS =
Expand Down Expand Up @@ -92,7 +92,7 @@ ifeq ($(CONFIG),max)
CONFIG_FLAGS = -O3 -funroll-loops -ffast-math -finline-functions -fomit-frame-pointer -DNDEBUG
endif
ifneq (,$(findstring $(CONFIG),release loadtest))
CONFIG_FLAGS = -O3 -funroll-loops -ffast-math -finline-functions -fomit-frame-pointer -DNO_DEBUG_LOG -DNO_TRACE_LOG -DNDEBUG
CONFIG_FLAGS = -g3 -O3 -funroll-loops -ffast-math -finline-functions -fomit-frame-pointer -DNO_DEBUG_LOG -DNO_TRACE_LOG -DNDEBUG
endif

COMMON_FLAGS = -MMD -std=c++11 -pipe -Wall -fPIC \
Expand Down
94 changes: 94 additions & 0 deletions src/tests/test_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
#include <boost/test/unit_test.hpp>
#include <thread>
#include <exception>
#include <unistd.h>

#define private public

#include "zmqpp/context.hpp"
#include "zmqpp/message.hpp"
Expand Down Expand Up @@ -62,13 +65,48 @@ BOOST_AUTO_TEST_CASE(socket_removed_in_timer)
loop.add(output, [&socket_called]() -> bool { socket_called = true; return false; });
loop.add(std::chrono::milliseconds(0), 1, [&loop, &output]() -> bool {
loop.remove(output);
//output.close(); // Simple way fails. See socket_closed_after_remove_at_timer.
loop.add(std::chrono::milliseconds(10), 1, []() -> bool { return false; });
return true;
});

input.send("PING");

BOOST_CHECK_NO_THROW(loop.start());
BOOST_CHECK(loop.items_.size() == 0);
BOOST_CHECK(socket_called == false);
}

BOOST_AUTO_TEST_CASE(socket_closed_after_remove_at_timer)
{
zmqpp::context context;

zmqpp::socket output(context, zmqpp::socket_type::pair);
output.bind("inproc://test");
zmqpp::socket input(context, zmqpp::socket_type::pair);
input.connect("inproc://test");

zmqpp::loop loop;

bool socket_called = false;

loop.add(output, [&socket_called]() -> bool { socket_called = true; return false; },
zmqpp::poller::poll_in, [&output]()
{
output.close();
return true;
});
loop.add(std::chrono::milliseconds(0), 1, [&loop, &output]() -> bool {
loop.remove(output);
//output.close(); moved to loop.add(output,,cb2);
loop.add(std::chrono::milliseconds(10), 1, []() -> bool { return false; });
return true;
});

input.send("PING");

BOOST_CHECK_NO_THROW(loop.start());
BOOST_CHECK(loop.items_.size() == 0);
BOOST_CHECK(socket_called == false);
}

Expand Down Expand Up @@ -219,5 +257,61 @@ BOOST_AUTO_TEST_CASE(remove_socket_in_handler)
BOOST_CHECK_EQUAL(2, test2);
}

BOOST_AUTO_TEST_CASE(remove_fd_in_handler)
{
zmqpp::context context;

zmqpp::loop loop;
auto end_loop = []() -> bool { return false; };

int pipefd[2];
BOOST_CHECK_EQUAL(0, pipe(pipefd));

int test1 = 0;
loop.add(pipefd[0], [&](){
char buffer[10];
BOOST_CHECK_EQUAL(4, read(pipefd[0],buffer,10));
test1 = 1;
loop.remove(pipefd[0]);
return true;
});

BOOST_CHECK_EQUAL(4, write(pipefd[1],"haha",4));
//BOOST_CHECK_EQUAL(0, close(pipefd[1]));

loop.add(std::chrono::milliseconds(100), 1, end_loop);
BOOST_CHECK_NO_THROW(loop.start());

BOOST_CHECK_EQUAL(1, test1);
}

BOOST_AUTO_TEST_CASE(remove_invalid_fd_in_handler)
{
zmqpp::context context;

zmqpp::loop loop;
auto end_loop = []() -> bool { return false; };

int pipefd[2];
BOOST_CHECK_EQUAL(0, pipe(pipefd));

int test1 = 0;
loop.add(pipefd[0], [&](){
char buffer[10];
BOOST_CHECK_EQUAL(4, read(pipefd[0],buffer,10));
test1 = 1;
loop.remove(STDIN_FILENO);
return true;
});

BOOST_CHECK_EQUAL(4, write(pipefd[1],"haha",4));
//BOOST_CHECK_EQUAL(0, close(pipefd[1]));

loop.add(std::chrono::milliseconds(100), 1, end_loop);
BOOST_CHECK_NO_THROW(loop.start());

BOOST_CHECK_EQUAL(1, test1);
}


BOOST_AUTO_TEST_SUITE_END()
75 changes: 75 additions & 0 deletions src/tests/test_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,52 @@ BOOST_AUTO_TEST_CASE( copyable )
BOOST_CHECK_EQUAL("string", second.get(0));
}

BOOST_AUTO_TEST_CASE( append_all )
{
zmqpp::message second;
second.add("second");
BOOST_CHECK_EQUAL(1, second.parts());

{
zmqpp::message first;
first.add("string");
first.add("string2");
first.add("string3");
BOOST_CHECK_EQUAL(3, first.parts());

second.append(first);
}

BOOST_REQUIRE_EQUAL(4, second.parts());
BOOST_CHECK_EQUAL(strlen("second"), second.size(0));
BOOST_CHECK_EQUAL("second", second.get(0));
BOOST_CHECK_EQUAL("string", second.get(1));
BOOST_CHECK_EQUAL("string2", second.get(2));
BOOST_CHECK_EQUAL("string3", second.get(3));
}

BOOST_AUTO_TEST_CASE( append_partial )
{
zmqpp::message second;
second.add("second");
BOOST_CHECK_EQUAL(1, second.parts());

{
zmqpp::message first;
first.add("string");
first.add("string2");
first.add("string3");
BOOST_CHECK_EQUAL(3, first.parts());

second.append(first,1,2);
}

BOOST_REQUIRE_EQUAL(2, second.parts());
BOOST_CHECK_EQUAL(strlen("second"), second.size(0));
BOOST_CHECK_EQUAL("second", second.get(0));
BOOST_CHECK_EQUAL("string2", second.get(1));
}

#ifndef ZMQPP_IGNORE_LAMBDA_FUNCTION_TESTS
BOOST_AUTO_TEST_CASE( move_part )
{
Expand Down Expand Up @@ -365,6 +411,35 @@ BOOST_AUTO_TEST_CASE( stream_copy_input_string )
BOOST_CHECK_EQUAL("test part", message.get(0));
}

BOOST_AUTO_TEST_CASE( stream_copy_input_message )
{
zmqpp::message message("test msg1", "test msg1.2");
zmqpp::message message2("test msg2", "test msg2.1");

message << message2;

BOOST_REQUIRE_EQUAL(4, message.parts());
BOOST_CHECK_EQUAL(strlen("test msg1"), message.size(0));
BOOST_CHECK_EQUAL("test msg1", message.get(0));
BOOST_CHECK_EQUAL("test msg1.2", message.get(1));
BOOST_CHECK_EQUAL("test msg2", message.get(2));
BOOST_CHECK_EQUAL("test msg2.1", message.get(3));
}

BOOST_AUTO_TEST_CASE( stream_copy_input_frame )
{
zmqpp::message message("test msg1", "test msg1.2");
zmqpp::frame aframe("test msg2.1",strlen("test msg2.1"));

message << aframe;

BOOST_REQUIRE_EQUAL(3, message.parts());
BOOST_CHECK_EQUAL(strlen("test msg1"), message.size(0));
BOOST_CHECK_EQUAL("test msg1", message.get(0));
BOOST_CHECK_EQUAL("test msg1.2", message.get(1));
BOOST_CHECK_EQUAL("test msg2.1", message.get(2));
}

BOOST_AUTO_TEST_CASE( stream_multiple_parts )
{
zmqpp::message message;
Expand Down
42 changes: 30 additions & 12 deletions src/zmqpp/loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,20 @@ namespace zmqpp
when += delay;
}

void loop::add(socket& socket, Callable callable, short const event /* = POLL_IN */)
short loop::events(raw_socket_t const descriptor) const{
zmq_pollitem_t item{nullptr, descriptor, 0, 0};
return poller_.events(item);
}

short loop::events(socket const& socket) const{
zmq_pollitem_t item{static_cast<void *> (socket), 0, 0, 0};
return poller_.events(item);
}

void loop::add(socket& socket, Callable callable, short const event /* = POLL_IN */, Callable after_remove_cb /* = Callable(nullptr)*/)
{
zmq_pollitem_t item{static_cast<void *> (socket), 0, event, 0};
add(item, callable);
add(item, callable, after_remove_cb);
}

void loop::add(raw_socket_t const descriptor, Callable callable, short const event /* = POLL_IN */)
Expand All @@ -59,11 +69,11 @@ namespace zmqpp
add(item, callable);
}

void loop::add(const zmq_pollitem_t& item, Callable callable)
void loop::add(const zmq_pollitem_t& item, Callable callable, Callable after_remove_cb)
{
poller_.add(item);
rebuild_poller_ = true;
items_.push_back(std::make_pair(item, callable));
items_.push_back(std::make_tuple(item, callable, after_remove_cb));
}

loop::timer_id_t loop::add(std::chrono::milliseconds delay, size_t times, Callable callable)
Expand Down Expand Up @@ -110,16 +120,24 @@ namespace zmqpp
sockRemoveLater_.push_back(&socket);
return;
}
items_.erase(std::remove_if(items_.begin(), items_.end(), [&socket](const PollItemCallablePair & pair) -> bool

std::vector<PollItemCallableTuple> cb_after_remove;

items_.erase(std::remove_if(items_.begin(), items_.end(),
[&socket, &cb_after_remove](const PollItemCallableTuple & tuple) -> bool
{
const zmq_pollitem_t &item = pair.first;
const zmq_pollitem_t &item = std::get<0>(tuple);
if (nullptr != item.socket && item.socket == static_cast<void *> (socket))
{
if(std::get<2>(tuple))
cb_after_remove.push_back(tuple);
return true;
}
return false;
}), items_.end());
poller_.remove(socket);
for (const PollItemCallableTuple& item : cb_after_remove)
std::get<2>(item)();
}

void loop::remove(raw_socket_t const descriptor)
Expand All @@ -130,9 +148,9 @@ namespace zmqpp
fdRemoveLater_.push_back(descriptor);
return;
}
items_.erase(std::remove_if(items_.begin(), items_.end(), [descriptor](const PollItemCallablePair & pair) -> bool
items_.erase(std::remove_if(items_.begin(), items_.end(), [descriptor](const PollItemCallableTuple & tuple) -> bool
{
const zmq_pollitem_t &item = pair.first;
const zmq_pollitem_t &item = std::get<0>(tuple);
if (nullptr == item.socket && item.fd == descriptor)
{
return true;
Expand Down Expand Up @@ -194,12 +212,12 @@ namespace zmqpp

bool loop::start_handle_poller()
{
for (const PollItemCallablePair &pair : items_)
for (const PollItemCallableTuple &tuple : items_)
{
const zmq_pollitem_t &pollitem = pair.first;
const zmq_pollitem_t &pollitem = std::get<0>(tuple);

if (poller_.has_input(pollitem) || poller_.has_error(pollitem) || poller_.has_output(pollitem))
if(!pair.second())
if (poller_.events(pollitem))
if(!std::get<1>(tuple)())
return false;
}
return true;
Expand Down
20 changes: 16 additions & 4 deletions src/zmqpp/loop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ namespace zmqpp
* \param socket the socket to monitor.
* \param callable the function that will be called by the loop when a registered event occurs on socket.
* \param event the event flags to monitor on the socket.
* \param after_remove_cb will be called by loop after remove() completion.
* See tests/test_loop.cpp: socket_closed_in_timer and socket_closed_after_remove_at_timer
*/
void add(socket_t& socket, Callable callable, short const event = poller::poll_in);
void add(socket_t& socket, Callable callable, short const event = poller::poll_in, Callable after_remove_cb = Callable(nullptr));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this break ABI?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Method's name will be mangled differently. Clients of libzmqpp need to be recompiled. But no changes at source code of clients.


/*!
* Add a standard socket to the loop, providing a handler that will be called when the monitored events occur.
Expand Down Expand Up @@ -113,6 +115,16 @@ namespace zmqpp
*/
void start();


/**
* Get revents on a standard socket after poll().
*
* \param descriptor the standard socket to stop monitoring.
* Attention: it do a second search thru list of FDs
*/
short events(raw_socket_t const descriptor) const;
short events(socket const& socket) const;

private:
struct timer_t {
size_t times;
Expand All @@ -125,18 +137,18 @@ namespace zmqpp
void update();
};

typedef std::pair<zmq_pollitem_t, Callable> PollItemCallablePair;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does changing these types break api backward compatibility?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think no. it's private type. But size of class will be changed, so recompile of clients is required.

bluca, please say me what is better:

  1. register onAfterRemove callback at add() time
  2. register onAfterRemove callback at remove() time

First method is implemented now. But second seems more clean for user and more simple to implement.

typedef std::tuple<zmq_pollitem_t, Callable, Callable> PollItemCallableTuple;
typedef std::pair<std::unique_ptr<timer_t>, Callable> TimerItemCallablePair;
static bool TimerItemCallablePairComp(const TimerItemCallablePair &lhs, const TimerItemCallablePair &rhs);

std::vector<PollItemCallablePair> items_;
std::vector<PollItemCallableTuple> items_;
std::list<TimerItemCallablePair> timers_;
std::vector<const socket_t *> sockRemoveLater_;
std::vector<raw_socket_t> fdRemoveLater_;
std::vector<timer_id_t> timerRemoveLater_;


void add(const zmq_pollitem_t &item, Callable callable);
void add(const zmq_pollitem_t &item, Callable callable, Callable after_remove_cb = Callable(nullptr));
void add(std::unique_ptr<timer_t>, Callable callable);

bool start_handle_timers();
Expand Down
Loading