Resolve merge conflicts with rolling
Signed-off-by: Yadunund <[email protected]>
Yadunund committed Jan 25, 2024
2 parents 52d16cb + 76fec8b commit a664ac2
Showing 6 changed files with 330 additions and 160 deletions.
11 changes: 9 additions & 2 deletions rmw_zenoh_cpp/apps/init_rmw_zenoh_router.cpp
@@ -13,6 +13,7 @@
// limitations under the License.

#include <stdlib.h>
+#include <sys/wait.h>

#include <iostream>
#include <string>
@@ -33,9 +34,15 @@ int Main(int, char **)
// Execute zenohd command
const std::string zenohd_cmd = "zenohd -c " + zenoh_router_config_path;
const int ret = system(zenohd_cmd.c_str());
-  if (ret != 0) {
-    std::cerr << "Failed to run zenoh router: " << zenohd_cmd << std::endl;
+  if (ret < 0) {
+    std::cerr << "Error running zenoh router via command: " << zenohd_cmd << std::endl;
     return ret;
+  } else {
+    if (WIFEXITED(ret)) {
+      std::cout << "Zenoh router exited normally." << std::endl;
+    } else {
+      std::cout << "Zenoh router exited abnormally with error code [" << ret << "]" << std::endl;
+    }
}
return 0;
}
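A note on the new error handling: system() returns -1 only when the child process could not be spawned; any other value is a wait status that must be decoded with the <sys/wait.h> macros, which is why the code distinguishes ret < 0 from WIFEXITED(ret). A minimal standalone sketch of that decoding, with a placeholder command string:

#include <cstdlib>
#include <iostream>
#include <sys/wait.h>

int main()
{
  // Placeholder command; the real code builds "zenohd -c <config path>".
  const int ret = std::system("zenohd -c config.json5");
  if (ret < 0) {
    std::cerr << "failed to launch the child process" << std::endl;
    return 1;
  }
  if (WIFEXITED(ret)) {
    // WEXITSTATUS extracts the child's own exit code from the wait status.
    std::cout << "exited with code " << WEXITSTATUS(ret) << std::endl;
  } else if (WIFSIGNALED(ret)) {
    std::cout << "terminated by signal " << WTERMSIG(ret) << std::endl;
  }
  return 0;
}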
13 changes: 6 additions & 7 deletions rmw_zenoh_cpp/src/detail/guard_condition.cpp
@@ -52,15 +52,14 @@ void GuardCondition::detach_condition()
}

///==============================================================================
-bool GuardCondition::has_triggered() const
+bool GuardCondition::get_and_reset_trigger()
 {
   std::lock_guard<std::mutex> lock(internal_mutex_);
-  return has_triggered_;
-}
+  bool ret = has_triggered_;

-///==============================================================================
-void GuardCondition::reset_trigger()
-{
-  std::lock_guard<std::mutex> lock(internal_mutex_);
+  // There is no data associated with the guard condition, so as soon as the caller asks about
+  // the state, we can immediately reset and get ready for the next trigger.
   has_triggered_ = false;

+  return ret;
}
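The old has_triggered()/reset_trigger() pair required two lock acquisitions, leaving a window where a trigger arriving between the check and the reset could be lost. The new get_and_reset_trigger() reads and clears the flag under one lock. A condensed sketch of the resulting class, assuming the member names in this diff (the real class also attaches a std::condition_variable for rmw_wait(); trigger() here is an assumed counterpart):

#include <mutex>

class GuardCondition final
{
public:
  void trigger()
  {
    std::lock_guard<std::mutex> lock(internal_mutex_);
    has_triggered_ = true;
  }

  // Read and clear the flag atomically with respect to trigger().
  bool get_and_reset_trigger()
  {
    std::lock_guard<std::mutex> lock(internal_mutex_);
    bool ret = has_triggered_;
    has_triggered_ = false;
    return ret;
  }

private:
  mutable std::mutex internal_mutex_;
  bool has_triggered_ = false;
};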
5 changes: 1 addition & 4 deletions rmw_zenoh_cpp/src/detail/guard_condition.hpp
@@ -33,10 +33,7 @@ class GuardCondition final

void detach_condition();

-  bool has_triggered() const;

-  // Resets has_triggered_ to false.
-  void reset_trigger();
+  bool get_and_reset_trigger();

private:
mutable std::mutex internal_mutex_;
264 changes: 206 additions & 58 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
@@ -31,6 +31,203 @@ saved_msg_data::saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16])
memcpy(publisher_gid, pub_gid, 16);
}

void rmw_subscription_data_t::attach_condition(std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = condition_variable;
}

void rmw_subscription_data_t::notify()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
if (condition_ != nullptr) {
condition_->notify_one();
}
}

void rmw_subscription_data_t::detach_condition()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = nullptr;
}

bool rmw_subscription_data_t::message_queue_is_empty() const
{
std::lock_guard<std::mutex> lock(message_queue_mutex_);
return message_queue_.empty();
}

std::unique_ptr<saved_msg_data> rmw_subscription_data_t::pop_next_message()
{
std::lock_guard<std::mutex> lock(message_queue_mutex_);

if (message_queue_.empty()) {
// This tells rcl that the check for a new message was done, but no messages have come in yet.
return nullptr;
}

std::unique_ptr<saved_msg_data> msg_data = std::move(message_queue_.front());
message_queue_.pop_front();

return msg_data;
}

void rmw_subscription_data_t::add_new_message(
std::unique_ptr<saved_msg_data> msg, const std::string & topic_name)
{
std::lock_guard<std::mutex> lock(message_queue_mutex_);

if (message_queue_.size() >= adapted_qos_profile.depth) {
// Log warning if message is discarded due to hitting the queue depth
RCUTILS_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"Message queue depth of %ld reached, discarding oldest message "
"for subscription for %s",
adapted_qos_profile.depth,
topic_name.c_str());

// If the adapted_qos_profile.depth is 0, the std::move command below will result
// in UB and the z_drop will segfault. We explicitly set the depth to a minimum of 1
// in rmw_create_subscription() but to be safe, we only attempt to discard from the
// queue if it is non-empty.
if (!message_queue_.empty()) {
std::unique_ptr<saved_msg_data> old = std::move(message_queue_.front());
z_drop(z_move(old->payload));
message_queue_.pop_front();
}

}

message_queue_.emplace_back(std::move(msg));

// Since we added new data, trigger the guard condition if it is available
notify();
}
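add_new_message() now concentrates three concerns behind one mutex: bound the queue to the adapted QoS depth, discard the oldest entry when full, and notify a waiter. A generic, self-contained sketch of that shape, simplified to an internal condition variable (the rmw types instead attach one supplied by rmw_wait()); all names here are illustrative:

#include <condition_variable>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>

template<typename T>
class BoundedNotifyingQueue
{
public:
  explicit BoundedNotifyingQueue(std::size_t depth)
  : depth_(depth == 0 ? 1 : depth) {}  // clamp 0 to 1, mirroring the UB caveat above

  void push(std::unique_ptr<T> item)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.size() >= depth_) {
      queue_.pop_front();  // discard the oldest entry; depth_ >= 1 keeps this safe
    }
    queue_.emplace_back(std::move(item));
    cv_.notify_one();  // wake a waiter, if one is parked on the condition variable
  }

  std::unique_ptr<T> pop()
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.empty()) {
      return nullptr;  // "checked, but nothing has arrived yet"
    }
    std::unique_ptr<T> front = std::move(queue_.front());
    queue_.pop_front();
    return front;
  }

private:
  const std::size_t depth_;
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<std::unique_ptr<T>> queue_;
};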

bool rmw_service_data_t::query_queue_is_empty() const
{
std::lock_guard<std::mutex> lock(query_queue_mutex_);
return query_queue_.empty();
}

void rmw_service_data_t::attach_condition(std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = condition_variable;
}

void rmw_service_data_t::detach_condition()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = nullptr;
}

std::unique_ptr<ZenohQuery> rmw_service_data_t::pop_next_query()
{
std::lock_guard<std::mutex> lock(query_queue_mutex_);
if (query_queue_.empty()) {
return nullptr;
}

std::unique_ptr<ZenohQuery> query = std::move(query_queue_.front());
query_queue_.pop_front();

return query;
}

void rmw_service_data_t::notify()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
if (condition_ != nullptr) {
condition_->notify_one();
}
}

void rmw_service_data_t::add_new_query(std::unique_ptr<ZenohQuery> query)
{
std::lock_guard<std::mutex> lock(query_queue_mutex_);
query_queue_.emplace_back(std::move(query));

// Since we added new data, trigger the guard condition if it is available
notify();
}

bool rmw_service_data_t::add_to_query_map(
int64_t sequence_number, std::unique_ptr<ZenohQuery> query)
{
std::lock_guard<std::mutex> lock(sequence_to_query_map_mutex_);
if (sequence_to_query_map_.find(sequence_number) != sequence_to_query_map_.end()) {
return false;
}
sequence_to_query_map_.emplace(
std::pair(sequence_number, std::move(query)));

return true;
}

std::unique_ptr<ZenohQuery> rmw_service_data_t::take_from_query_map(int64_t sequence_number)
{
std::lock_guard<std::mutex> lock(sequence_to_query_map_mutex_);
auto query_it = sequence_to_query_map_.find(sequence_number);
if (query_it == sequence_to_query_map_.end()) {
return nullptr;
}

std::unique_ptr<ZenohQuery> query = std::move(query_it->second);
sequence_to_query_map_.erase(query_it);

return query;
}
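add_to_query_map() and take_from_query_map() form a small in-flight table keyed by the client's sequence number: a query is parked when the request is taken and removed exactly once when the response is sent. A self-contained sketch of the same pattern; the type names are illustrative stand-ins, not the rmw types:

#include <cstdint>
#include <memory>
#include <mutex>
#include <unordered_map>

// Stand-in for the opaque query handle.
struct PendingQuery {};

class InFlightQueryTable
{
public:
  // Returns false for a duplicate sequence number, mirroring add_to_query_map().
  bool add(int64_t sequence_number, std::unique_ptr<PendingQuery> query)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (map_.find(sequence_number) != map_.end()) {
      return false;  // keep the original entry rather than overwrite it
    }
    map_.emplace(sequence_number, std::move(query));
    return true;
  }

  // Removes and returns the parked query, or nullptr if none matches,
  // mirroring take_from_query_map().
  std::unique_ptr<PendingQuery> take(int64_t sequence_number)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = map_.find(sequence_number);
    if (it == map_.end()) {
      return nullptr;
    }
    std::unique_ptr<PendingQuery> query = std::move(it->second);
    map_.erase(it);
    return query;
  }

private:
  std::mutex mutex_;
  std::unordered_map<int64_t, std::unique_ptr<PendingQuery>> map_;
};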

void rmw_client_data_t::notify()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
if (condition_ != nullptr) {
condition_->notify_one();
}
}

void rmw_client_data_t::add_new_reply(std::unique_ptr<ZenohReply> reply)
{
std::lock_guard<std::mutex> lock(reply_queue_mutex_);
reply_queue_.emplace_back(std::move(reply));

notify();
}

bool rmw_client_data_t::reply_queue_is_empty() const
{
std::lock_guard<std::mutex> lock(reply_queue_mutex_);

return reply_queue_.empty();
}

void rmw_client_data_t::attach_condition(std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = condition_variable;
}

void rmw_client_data_t::detach_condition()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = nullptr;
}

std::unique_ptr<ZenohReply> rmw_client_data_t::pop_next_reply()
{
std::lock_guard<std::mutex> lock(reply_queue_mutex_);

if (reply_queue_.empty()) {
return nullptr;
}

std::unique_ptr<ZenohReply> latest_reply = std::move(reply_queue_.front());
reply_queue_.pop_front();

return latest_reply;
}
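All three data types now expose the same attach_condition()/notify()/detach_condition() trio, whose intended consumer is rmw_wait(): it parks one condition variable across every waited-on entity and re-checks the queues when woken. A hedged sketch of that wait-side usage for a single subscription, assuming the rmw_subscription_data_t API added above (the helper name and timeout handling are illustrative):

#include <chrono>
#include <condition_variable>
#include <mutex>

bool wait_for_message(rmw_subscription_data_t * sub, std::chrono::nanoseconds timeout)
{
  std::condition_variable cv;
  std::mutex cv_mutex;

  sub->attach_condition(&cv);  // add_new_message() -> notify() now wakes this cv

  bool data_ready = false;
  {
    std::unique_lock<std::mutex> lock(cv_mutex);
    data_ready = cv.wait_for(
      lock, timeout, [sub]() {return !sub->message_queue_is_empty();});
  }

  sub->detach_condition();  // stop routing notifications to a soon-dead cv
  return data_ready;
}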

//==============================================================================
void sub_data_handler(
const z_sample_t * sample,
@@ -53,40 +250,10 @@ void sub_data_handler(
return;
}

-  {
-    std::lock_guard<std::mutex> lock(sub_data->message_queue_mutex);
-
-    if (sub_data->message_queue.size() >= sub_data->adapted_qos_profile.depth) {
-      // Log warning if message is discarded due to hitting the queue depth
-      RCUTILS_LOG_DEBUG_NAMED(
-        "rmw_zenoh_cpp",
-        "Message queue depth of %ld reached, discarding oldest message "
-        "for subscription for %s",
-        sub_data->adapted_qos_profile.depth,
-        z_loan(keystr));
-
-      // If the adapted_qos_profile.depth is 0, the std::move command below will result
-      // in UB and the z_drop will segfault. We explicitly set the depth to a minimum of 1
-      // in rmw_create_subscription() but to be safe, we only attempt to discard from the
-      // queue if it is non-empty.
-      if (!sub_data->message_queue.empty()) {
-        std::unique_ptr<saved_msg_data> old = std::move(sub_data->message_queue.front());
-        z_drop(&old->payload);
-        sub_data->message_queue.pop_front();
-      }
-    }
-
-    sub_data->message_queue.emplace_back(
-      std::make_unique<saved_msg_data>(
-        zc_sample_payload_rcinc(sample),
-        sample->timestamp.time, sample->timestamp.id.id));
-
-    // Since we added new data, trigger the guard condition if it is available
-    std::lock_guard<std::mutex> internal_lock(sub_data->internal_mutex);
-    if (sub_data->condition != nullptr) {
-      sub_data->condition->notify_one();
-    }
-  }
+  sub_data->add_new_message(
+    std::make_unique<saved_msg_data>(
+      zc_sample_payload_rcinc(sample),
+      sample->timestamp.time, sample->timestamp.id.id), z_loan(keystr));
}

ZenohQuery::ZenohQuery(const z_query_t * query)
@@ -124,18 +291,7 @@ void service_data_handler(const z_query_t * query, void * data)
return;
}

-  // Get the query parameters and payload
-  {
-    std::lock_guard<std::mutex> lock(service_data->query_queue_mutex);
-    service_data->query_queue.emplace_back(std::make_unique<ZenohQuery>(query));
-  }
-  {
-    // Since we added new data, trigger the guard condition if it is available
-    std::lock_guard<std::mutex> internal_lock(service_data->internal_mutex);
-    if (service_data->condition != nullptr) {
-      service_data->condition->notify_one();
-    }
-  }
+  service_data->add_new_query(std::make_unique<ZenohQuery>(query));
}

ZenohReply::ZenohReply(const z_owned_reply_t * reply)
@@ -188,16 +344,8 @@ void client_data_handler(z_owned_reply_t * reply, void * data)
);
return;
}
-  {
-    std::lock_guard<std::mutex> msg_lock(client_data->replies_mutex);
-    // Take ownership of the reply.
-    client_data->replies.emplace_back(std::make_unique<ZenohReply>(reply));
-    *reply = z_reply_null();
-  }
-  {
-    std::lock_guard<std::mutex> internal_lock(client_data->internal_mutex);
-    if (client_data->condition != nullptr) {
-      client_data->condition->notify_one();
-    }
-  }
+
+  client_data->add_new_reply(std::make_unique<ZenohReply>(reply));
+  // Since we took ownership of the reply, null it out here
+  *reply = z_reply_null();
}
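One detail worth noting in the handler above: zenoh-c drops *reply itself once the callback returns, so the callback moves the payload into a heap-owned ZenohReply and then overwrites the caller's handle with z_reply_null() to make that automatic drop a no-op. The consumer side drains the queue through pop_next_reply(); a brief sketch of that take side, assuming the declarations added in this diff (the helper name is illustrative):

#include <memory>

bool take_latest_response(rmw_client_data_t * client_data)
{
  std::unique_ptr<ZenohReply> reply = client_data->pop_next_reply();
  if (reply == nullptr) {
    // Queue empty: rmw_take_response() would report taken = false here.
    return false;
  }
  // ... deserialize the sample held by *reply; ZenohReply's destructor
  // releases the underlying z_owned_reply_t ...
  return true;
}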

