diff --git a/include/mqtt/thread_queue.h b/include/mqtt/thread_queue.h index 1f3ac0c8..bcad2287 100644 --- a/include/mqtt/thread_queue.h +++ b/include/mqtt/thread_queue.h @@ -115,14 +115,9 @@ class thread_queue /** General purpose guard */ using unique_guard = std::unique_lock; - /** Throw an excpetion if the queue is closed. */ - void check_closed() { - if (closed_) throw queue_closed{}; - } - - /** Throw an excpetion if the queue is done. */ - void check_done() { - if (closed_ && que_.empty()) throw queue_closed{}; + /** Checks if the queue is done (unsafe) */ + bool is_done() const { + return closed_ && que_.empty(); } public: @@ -162,7 +157,7 @@ class thread_queue * a sufficient number */ void capacity(size_type cap) { - guard g(lock_); + guard g{lock_}; cap_ = cap; } /** @@ -170,7 +165,7 @@ class thread_queue * @return The number of items in the queue. */ size_type size() const { - guard g(lock_); + guard g{lock_}; return que_.size(); } /** @@ -180,18 +175,12 @@ class thread_queue * it is empty. */ void close() { - guard g{lock_}; + unique_guard g{lock_}; closed_ = true; - } - /* - void close(value_type finalVal) { - unique_guard g(lock_); - if (closed_) return; - que_.emplace(std::move(finalVal)); g.unlock(); - notEmptyCond_.notify_one(); + notFullCond_.notify_all(); + notEmptyCond_.notify_all(); } - */ /** * Determines if the queue is closed. * Once closed, the queue will not accept any new items, but receievers @@ -204,14 +193,14 @@ class thread_queue return closed_; } /** - * Determines if all possible operations are done on the queue. If the - * queue is closed and empty, then no further useful operations can be - * done on it. + * Determines if all possible operations are done on the queue. + * If the queue is closed and empty, then no further useful operations + * can be done on it. * @return @true if the queue is closed and empty, @em false otherwise. */ bool done() const { guard g{lock_}; - return closed_ && que_.empty(); + return is_done(); } /** * Put an item into the queue. @@ -220,10 +209,10 @@ class thread_queue * @param val The value to add to the queue. */ void put(value_type val) { - unique_guard g(lock_); + unique_guard g{lock_}; notFullCond_.wait(g, [this] { return que_.size() < cap_ || closed_; }); - check_closed(); + if (closed_) throw queue_closed{}; que_.emplace(std::move(val)); g.unlock(); notEmptyCond_.notify_one(); @@ -235,9 +224,8 @@ class thread_queue * item was not added because the queue is currently full. */ bool try_put(value_type val) { - unique_guard g(lock_); - check_closed(); - if (que_.size() >= cap_) + unique_guard g{lock_}; + if (que_.size() >= cap_ || closed_) return false; que_.emplace(std::move(val)); @@ -256,11 +244,12 @@ class thread_queue */ template bool try_put_for(value_type val, const std::chrono::duration& relTime) { - unique_guard g(lock_); - bool to = !notFullCond_.wait_for(g, relTime, - [this] { return que_.size() < cap_ || closed_; }); - check_closed(); - if (to) + unique_guard g{lock_}; + bool to = !notFullCond_.wait_for( + g, relTime, + [this] { return que_.size() < cap_ || closed_; } + ); + if (to || closed_) return false; que_.emplace(std::move(val)); @@ -282,12 +271,13 @@ class thread_queue bool try_put_until( value_type val, const std::chrono::time_point& absTime ) { - unique_guard g(lock_); - bool to = !notFullCond_.wait_until(g, absTime, - [this] { return que_.size() < cap_ || closed_; }); + unique_guard g{lock_}; + bool to = !notFullCond_.wait_until( + g, absTime, + [this] { return que_.size() < cap_ || closed_; } + ); - check_closed(); - if (to) + if (to || closed_) return false; que_.emplace(std::move(val)); @@ -301,18 +291,20 @@ class thread_queue * added to the queue by another thread, * @param val Pointer to a variable to receive the value. */ - void get(value_type* val) { + bool get(value_type* val) { if (!val) - return; + return false; - unique_guard g(lock_); + unique_guard g{lock_}; notEmptyCond_.wait(g, [this] { return !que_.empty() || closed_; }); - check_done(); + if (que_.empty()) // We must be done + return false; *val = std::move(que_.front()); que_.pop(); g.unlock(); notFullCond_.notify_one(); + return true; } /** * Retrieve a value from the queue. @@ -321,9 +313,10 @@ class thread_queue * @return The value removed from the queue */ value_type get() { - unique_guard g(lock_); + unique_guard g{lock_}; notEmptyCond_.wait(g, [this] { return !que_.empty() || closed_; }); - check_done(); + if (que_.empty()) // We must be done + throw queue_closed{}; value_type val = std::move(que_.front()); que_.pop(); @@ -343,12 +336,9 @@ class thread_queue if (!val) return false; - unique_guard g(lock_); - if (que_.empty()) { - if (closed_) - throw queue_closed{}; + unique_guard g{lock_}; + if (que_.empty()) return false; - } *val = std::move(que_.front()); que_.pop(); @@ -371,11 +361,13 @@ class thread_queue if (!val) return false; - unique_guard g(lock_); - bool to = !notEmptyCond_.wait_for(g, relTime, [this] { return !que_.empty() || closed_; }); + unique_guard g{lock_}; + notEmptyCond_.wait_for( + g, relTime, + [this] { return !que_.empty() || closed_; } + ); - check_done(); - if (to) + if (que_.empty()) return false; *val = std::move(que_.front()); @@ -401,10 +393,12 @@ class thread_queue if (!val) return false; - unique_guard g(lock_); - bool to = !notEmptyCond_.wait_until(g, absTime, [this] { return !que_.empty(); }); - check_done(); - if (to) + unique_guard g{lock_}; + notEmptyCond_.wait_until( + g, absTime, [this] { return !que_.empty() || closed_; } + ); + + if (que_.empty()) return false; *val = std::move(que_.front()); diff --git a/test/unit/test_thread_queue.cpp b/test/unit/test_thread_queue.cpp index efb2d1fe..3e969747 100644 --- a/test/unit/test_thread_queue.cpp +++ b/test/unit/test_thread_queue.cpp @@ -74,6 +74,21 @@ TEST_CASE("thread_queue tryget", "[thread_queue]") REQUIRE(n == 3); } +TEST_CASE("thread_queue tryput", "[thread_queue]") +{ + thread_queue que{2}; + + REQUIRE(que.try_put(1)); + REQUIRE(que.try_put(2)); + + // Queue full. Put should fail + REQUIRE(!que.try_put(3)); + REQUIRE(!que.try_put_for(3, 5ms)); + + auto timeout = steady_clock::now() + 15ms; + REQUIRE(!que.try_put_until(3, timeout)); +} + TEST_CASE("thread_queue mt put/get", "[thread_queue]") { thread_queue que; @@ -134,21 +149,39 @@ TEST_CASE("thread_queue close", "[thread_queue]") REQUIRE(que.size() == 2); REQUIRE_THROWS_AS(que.put(3), queue_closed); - REQUIRE_THROWS_AS(que.try_put(3), queue_closed); - REQUIRE_THROWS_AS(que.try_put_for(3, 10ms), queue_closed); - REQUIRE_THROWS_AS(que.try_put_until(3, steady_clock::now() + 10ms), queue_closed); + REQUIRE(!que.try_put(3)); + REQUIRE(!que.try_put_for(3, 10ms)); + REQUIRE(!que.try_put_until(3, steady_clock::now() + 10ms)); // But can get any items already in there. REQUIRE(que.get() == 1); REQUIRE(que.get() == 2); - // When done (closed and empty), should throw on a get() + // When done (closed and empty), should throw on a get(), + // or fail on a try_get REQUIRE(que.empty()); REQUIRE(que.done()); int n; REQUIRE_THROWS_AS(que.get(), queue_closed); - REQUIRE_THROWS_AS(que.try_get(&n), queue_closed); - REQUIRE_THROWS_AS(que.try_get_for(&n, 10ms), queue_closed); - REQUIRE_THROWS_AS(que.try_get_until(&n, steady_clock::now() + 10ms), queue_closed); + REQUIRE(!que.try_get(&n)); + REQUIRE(!que.try_get_for(&n, 10ms)); + REQUIRE(!que.try_get_until(&n, steady_clock::now() + 10ms)); +} + +TEST_CASE("thread_queue close_signals", "[thread_queue]") +{ + thread_queue que; + REQUIRE(!que.closed()); + + auto thr = std::thread([&que] { + std::this_thread::sleep_for(10ms); + que.close(); + }); + + // Should initially block, but then throw when the queue + // is closed by the other thread. + REQUIRE_THROWS_AS(que.get(), queue_closed); + + thr.join(); }