diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp index 0570cd9de05..74788bcada4 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp @@ -626,6 +626,14 @@ class SharedMemGlobal return node_->is_port_ok; } + /** + * Checks if a port is OK and is opened for reading with listeners active + */ + inline bool port_has_listeners() const + { + return node_->is_port_ok && node_->is_opened_for_reading && node_->num_listeners > 0; + } + inline uint32_t port_id() const { return node_->port_id; diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp index 2e0dc7135e1..8410f975767 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp @@ -841,13 +841,26 @@ class SharedMemManager : return *this; } + /** + * Checks if a port is OK and opened for reading with listeners active + */ + bool has_listeners() const + { + return global_port_->port_has_listeners(); + } + /** * Try to enqueue a buffer in the port. + * @param[in, out] buffer reference to the SHM buffer to push to + * @param[out] is_port_ok true if the port is ok * @returns false If the port's queue is full so buffer couldn't be enqueued. */ bool try_push( - const std::shared_ptr& buffer) + const std::shared_ptr& buffer, + bool& is_port_ok) { + is_port_ok = true; + assert(std::dynamic_pointer_cast(buffer)); SharedMemBuffer* shared_mem_buffer = std::static_pointer_cast(buffer).get(); @@ -881,6 +894,7 @@ class SharedMemManager : << e.what()); regenerate_port(); + is_port_ok = false; ret = false; } else diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp b/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp index 8d35d9a879b..ea627dd1d2b 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp @@ -428,6 +428,10 @@ bool SharedMemTransport::send( { using namespace eprosima::fastdds::statistics::rtps; +#if !defined(_WIN32) + cleanup_output_ports(); +#endif // if !defined(_WIN32) + fastrtps::rtps::LocatorsIterator& it = *destination_locators_begin; bool ret = true; @@ -478,6 +482,22 @@ bool SharedMemTransport::send( } +void SharedMemTransport::cleanup_output_ports() +{ + auto it = opened_ports_.begin(); + while (it != opened_ports_.end()) + { + if (it->second->has_listeners()) + { + ++it; + } + else + { + it = opened_ports_.erase(it); + } + } +} + std::shared_ptr SharedMemTransport::find_port( uint32_t port_id) { @@ -505,9 +525,22 @@ bool SharedMemTransport::push_discard( { try { - if (!find_port(remote_locator.port)->try_push(buffer)) + bool is_port_ok = false; + const size_t num_retries = 2; + for (size_t i = 0; i < num_retries && !is_port_ok; ++i) { - EPROSIMA_LOG_INFO(RTPS_MSG_OUT, "Port " << remote_locator.port << " full. Buffer dropped"); + if (!find_port(remote_locator.port)->try_push(buffer, is_port_ok)) + { + if (is_port_ok) + { + EPROSIMA_LOG_INFO(RTPS_MSG_OUT, "Port " << remote_locator.port << " full. Buffer dropped"); + } + else + { + EPROSIMA_LOG_WARNING(RTPS_MSG_OUT, "Port " << remote_locator.port << " inconsistent. Port dropped"); + opened_ports_.erase(remote_locator.port); + } + } } } catch (const std::exception& error) diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemTransport.h b/src/cpp/rtps/transport/shared_mem/SharedMemTransport.h index 85573173dd3..63f79172527 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemTransport.h +++ b/src/cpp/rtps/transport/shared_mem/SharedMemTransport.h @@ -247,6 +247,8 @@ class SharedMemTransport : public TransportInterface const std::shared_ptr& buffer, const Locator& remote_locator); + void cleanup_output_ports(); + std::shared_ptr find_port( uint32_t port_id); diff --git a/test/unittest/transport/SharedMemTests.cpp b/test/unittest/transport/SharedMemTests.cpp index 72d4eb51469..7e2f043048f 100644 --- a/test/unittest/transport/SharedMemTests.cpp +++ b/test/unittest/transport/SharedMemTests.cpp @@ -1299,7 +1299,11 @@ TEST_F(SHMTransportTests, port_listener_dead_recover) ASSERT_TRUE(buf != nullptr); memset(buf->data(), 0, buf->size()); *static_cast(buf->data()) = 1u; - ASSERT_TRUE(port_sender->try_push(buf)); + { + bool is_port_ok = false; + ASSERT_TRUE(port_sender->try_push(buf, is_port_ok)); + ASSERT_TRUE(is_port_ok); + } // Wait until message received while (thread_listener2_state.load() < 1u) @@ -1324,10 +1328,18 @@ TEST_F(SHMTransportTests, port_listener_dead_recover) *static_cast(buf->data()) = 2u; // This push must fail because port is not OK - ASSERT_FALSE(port_sender->try_push(buf)); + { + bool is_port_ok = false; + ASSERT_FALSE(port_sender->try_push(buf, is_port_ok)); + ASSERT_FALSE(is_port_ok); + } // This push must success because port was regenerated in the last try_push call. - ASSERT_TRUE(port_sender->try_push(buf)); + { + bool is_port_ok = false; + ASSERT_TRUE(port_sender->try_push(buf, is_port_ok)); + ASSERT_TRUE(is_port_ok); + } // Wait until port is regenerated while (thread_listener2_state.load() < 3u) @@ -1458,8 +1470,16 @@ TEST_F(SHMTransportTests, port_not_ok_listener_recover) auto buffer = data_segment->alloc_buffer(1, std::chrono::steady_clock::now() + std::chrono::milliseconds(100)); *static_cast(buffer->data()) = 6; // Fail because port regeneration - ASSERT_FALSE(managed_port->try_push(buffer)); - ASSERT_TRUE(managed_port->try_push(buffer)); + { + bool is_port_ok = false; + ASSERT_FALSE(managed_port->try_push(buffer, is_port_ok)); + ASSERT_FALSE(is_port_ok); + } + { + bool is_port_ok = false; + ASSERT_TRUE(managed_port->try_push(buffer, is_port_ok)); + ASSERT_TRUE(is_port_ok); + } thread_listener.join(); } @@ -1529,14 +1549,25 @@ TEST_F(SHMTransportTests, buffer_recover) // Test 1 (without port overflow) uint32_t send_counter = 0u; + + bool is_port_ok = false; + while (listener1_recv_count.load() < 16u) { { // The segment should never overflow auto buf = segment->alloc_buffer(1, std::chrono::steady_clock::time_point()); - ASSERT_EQ(true, pub_sub1_write->try_push(buf)); - ASSERT_EQ(true, pub_sub2_write->try_push(buf)); + { + is_port_ok = false; + ASSERT_TRUE(pub_sub1_write->try_push(buf, is_port_ok)); + ASSERT_TRUE(is_port_ok); + } + { + is_port_ok = false; + ASSERT_TRUE(pub_sub2_write->try_push(buf, is_port_ok)); + ASSERT_TRUE(is_port_ok); + } } { @@ -1571,14 +1602,22 @@ TEST_F(SHMTransportTests, buffer_recover) // The segment should never overflow auto buf = segment->alloc_buffer(1u, std::chrono::steady_clock::time_point()); - if (!pub_sub1_write->try_push(buf)) { - port_overflows1++; + is_port_ok = false; + if (!pub_sub1_write->try_push(buf, is_port_ok)) + { + EXPECT_TRUE(is_port_ok); + port_overflows1++; + } } - if (!pub_sub2_write->try_push(buf)) { - port_overflows2++; + is_port_ok = false; + if (!pub_sub2_write->try_push(buf, is_port_ok)) + { + EXPECT_TRUE(is_port_ok); + port_overflows2++; + } } } @@ -1602,8 +1641,16 @@ TEST_F(SHMTransportTests, buffer_recover) { auto buf = segment->alloc_buffer(1u, std::chrono::steady_clock::time_point()); - ASSERT_EQ(true, pub_sub1_write->try_push(buf)); - ASSERT_EQ(true, pub_sub2_write->try_push(buf)); + { + is_port_ok = false; + ASSERT_TRUE(pub_sub1_write->try_push(buf, is_port_ok)); + ASSERT_TRUE(is_port_ok); + } + { + is_port_ok = false; + ASSERT_TRUE(pub_sub2_write->try_push(buf, is_port_ok)); + ASSERT_TRUE(is_port_ok); + } } thread_listener1.join(); @@ -1645,7 +1692,9 @@ TEST_F(SHMTransportTests, remote_segments_free) { if (j != i) { - ASSERT_TRUE(ports[j]->try_push(buf)); + bool is_port_ok = false; + ASSERT_TRUE(ports[j]->try_push(buf, is_port_ok)); + ASSERT_TRUE(is_port_ok); ASSERT_TRUE(listeners[j]->pop() != nullptr); } }