Skip to content

Commit

Permalink
Make service wait for response reader (#390)
Browse files Browse the repository at this point in the history
* Sending response subscriber guid with request.

Signed-off-by: Miguel Company <[email protected]>

* Server ensures that response reader is matched.

Signed-off-by: Miguel Company <[email protected]>

* Addressing review.

Signed-off-by: Miguel Company <[email protected]>

* Linters

Signed-off-by: Miguel Company <[email protected]>

* Using unordered_set

Signed-off-by: Miguel Company <[email protected]>

* Linters

Signed-off-by: Miguel Company <[email protected]>

* Additional checks on rmw_service_server_is_available.

Signed-off-by: Miguel Company <[email protected]>

* Suggestions on guid_utils

Signed-off-by: Miguel Company <[email protected]>

* Added TODO mentioning DDS-RPC.

Signed-off-by: Miguel Company <[email protected]>

* linters again.

Signed-off-by: Miguel Company <[email protected]>
  • Loading branch information
MiguelCompany authored Jun 17, 2020
1 parent cfbc0fb commit d7a9559
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 6 deletions.
1 change: 1 addition & 0 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ rmw_create_client(
}

info->writer_guid_ = info->request_publisher_->getGuid();
info->reader_guid_ = info->response_subscriber_->getGuid();

rmw_client = rmw_client_allocate();
if (!rmw_client) {
Expand Down
7 changes: 6 additions & 1 deletion rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,9 @@ rmw_create_service(
RMW_SET_ERROR_MSG("failed to get datawriter qos");
goto fail;
}
info->pub_listener_ = new ServicePubListener();
info->response_publisher_ =
Domain::createPublisher(participant, publisherParam, nullptr);
Domain::createPublisher(participant, publisherParam, info->pub_listener_);
if (!info->response_publisher_) {
RMW_SET_ERROR_MSG("create_service() could not create publisher");
goto fail;
Expand Down Expand Up @@ -255,6 +256,10 @@ rmw_create_service(
Domain::removePublisher(info->response_publisher_);
}

if (info->pub_listener_) {
delete info->pub_listener_;
}

if (info->request_subscriber_) {
rmw_gid_t gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_subscriber_->getGuid());
Expand Down
3 changes: 2 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,12 @@ rmw_create_client(
info->request_publisher_ =
Domain::createPublisher(participant, publisherParam, info->pub_listener_);
if (!info->request_publisher_) {
RMW_SET_ERROR_MSG("create_publisher() could not create publisher");
RMW_SET_ERROR_MSG("create_client() could not create publisher");
goto fail;
}

info->writer_guid_ = info->request_publisher_->getGuid();
info->reader_guid_ = info->response_subscriber_->getGuid();

rmw_client = rmw_client_allocate();
if (!rmw_client) {
Expand Down
7 changes: 6 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,9 @@ rmw_create_service(
RMW_SET_ERROR_MSG("failed to get datawriter qos");
goto fail;
}
info->pub_listener_ = new ServicePubListener();
info->response_publisher_ =
Domain::createPublisher(participant, publisherParam, nullptr);
Domain::createPublisher(participant, publisherParam, info->pub_listener_);
if (!info->response_publisher_) {
RMW_SET_ERROR_MSG("create_publisher() could not create publisher");
goto fail;
Expand Down Expand Up @@ -285,6 +286,10 @@ rmw_create_service(
Domain::removePublisher(info->response_publisher_);
}

if (info->pub_listener_) {
delete info->pub_listener_;
}

if (info->request_subscriber_) {
rmw_gid_t gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_subscriber_->getGuid());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ typedef struct CustomClientInfo
eprosima::fastrtps::Publisher * request_publisher_;
ClientListener * listener_;
eprosima::fastrtps::rtps::GUID_t writer_guid_;
eprosima::fastrtps::rtps::GUID_t reader_guid_;
eprosima::fastrtps::Participant * participant_;
const char * typesupport_identifier_;
ClientPubListener * pub_listener_;
Expand Down Expand Up @@ -88,7 +89,9 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener
if (eprosima::fastrtps::rtps::ALIVE == response.sample_info_.sampleKind) {
response.sample_identity_ = response.sample_info_.related_sample_identity;

if (response.sample_identity_.writer_guid() == info_->writer_guid_) {
if (response.sample_identity_.writer_guid() == info_->reader_guid_ ||
response.sample_identity_.writer_guid() == info_->writer_guid_)
{
std::lock_guard<std::mutex> lock(internalMutex_);

if (conditionMutex_ != nullptr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <condition_variable>
#include <list>
#include <mutex>
#include <unordered_set>

#include "fastcdr/FastBuffer.h"

Expand All @@ -32,8 +33,10 @@
#include "rcpputils/thread_safety_annotations.hpp"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"
#include "rmw_fastrtps_shared_cpp/guid_utils.hpp"

class ServiceListener;
class ServicePubListener;

typedef struct CustomServiceInfo
{
Expand All @@ -44,6 +47,7 @@ typedef struct CustomServiceInfo
eprosima::fastrtps::Subscriber * request_subscriber_;
eprosima::fastrtps::Publisher * response_publisher_;
ServiceListener * listener_;
ServicePubListener * pub_listener_;
eprosima::fastrtps::Participant * participant_;
const char * typesupport_identifier_;
} CustomServiceInfo;
Expand Down Expand Up @@ -84,6 +88,12 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
if (sub->takeNextData(&data, &request.sample_info_)) {
if (eprosima::fastrtps::rtps::ALIVE == request.sample_info_.sampleKind) {
request.sample_identity_ = request.sample_info_.sample_identity;
// Use response subscriber guid (on related_sample_identity) when present.
const eprosima::fastrtps::rtps::GUID_t & reader_guid =
request.sample_info_.related_sample_identity.writer_guid();
if (reader_guid != eprosima::fastrtps::rtps::GUID_t::unknown() ) {
request.sample_identity_.writer_guid() = reader_guid;
}

std::lock_guard<std::mutex> lock(internalMutex_);

Expand Down Expand Up @@ -159,4 +169,49 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
};

class ServicePubListener : public eprosima::fastrtps::PublisherListener
{
public:
ServicePubListener() = default;

template<class Rep, class Period>
bool wait_for_subscription(
const eprosima::fastrtps::rtps::GUID_t & guid,
const std::chrono::duration<Rep, Period> & rel_time)
{
auto guid_is_present = [this, guid]() -> bool
{
return subscriptions_.find(guid) != subscriptions_.end();
};

std::unique_lock<std::mutex> lock(mutex_);
return cv_.wait_for(lock, rel_time, guid_is_present);
}

void onPublicationMatched(
eprosima::fastrtps::Publisher * pub,
eprosima::fastrtps::rtps::MatchingInfo & matchingInfo)
{
(void) pub;
std::lock_guard<std::mutex> lock(mutex_);
if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) {
subscriptions_.insert(matchingInfo.remoteEndpointGuid);
} else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) {
subscriptions_.erase(matchingInfo.remoteEndpointGuid);
} else {
return;
}
cv_.notify_all();
}

private:
using subscriptions_set_t =
std::unordered_set<eprosima::fastrtps::rtps::GUID_t,
rmw_fastrtps_shared_cpp::hash_fastrtps_guid>;

std::mutex mutex_;
subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
std::condition_variable cv_;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#define RMW_FASTRTPS_SHARED_CPP__GUID_UTILS_HPP_

#include <cassert>
#include <cstddef>
#include <cstring>
#include <type_traits>

Expand Down Expand Up @@ -55,6 +56,36 @@ copy_from_fastrtps_guid_to_byte_array(
memcpy(&guid_byte_array[prefix_size], &guid.entityId, guid.entityId.size);
}

struct hash_fastrtps_guid
{
std::size_t operator()(const eprosima::fastrtps::rtps::GUID_t & guid) const
{
union u_convert {
uint8_t plain_value[sizeof(guid)];
uint32_t plain_ints[sizeof(guid) / sizeof(uint32_t)];
} u;

static_assert(
sizeof(guid) == 16 &&
sizeof(u.plain_value) == sizeof(u.plain_ints) &&
offsetof(u_convert, plain_value) == offsetof(u_convert, plain_ints),
"Plain guid should be easily convertible to uint32_t[4]");

copy_from_fastrtps_guid_to_byte_array(guid, u.plain_value);

constexpr std::size_t prime_1 = 7;
constexpr std::size_t prime_2 = 31;
constexpr std::size_t prime_3 = 59;

size_t ret_val = prime_1 * u.plain_ints[0];
ret_val = prime_2 * (u.plain_ints[1] + ret_val);
ret_val = prime_3 * (u.plain_ints[2] + ret_val);
ret_val = u.plain_ints[3] + ret_val;

return ret_val;
}
};

} // namespace rmw_fastrtps_shared_cpp

#endif // RMW_FASTRTPS_SHARED_CPP__GUID_UTILS_HPP_
1 change: 1 addition & 0 deletions rmw_fastrtps_shared_cpp/src/rmw_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ __rmw_send_request(
data.is_cdr_buffer = false;
data.data = const_cast<void *>(ros_request);
data.impl = info->request_type_support_impl_;
wparams.related_sample_identity().writer_guid() = info->reader_guid_;
if (info->request_publisher_->write(&data, wparams)) {
returnedValue = RMW_RET_OK;
*sequence_id = ((int64_t)wparams.sample_identity().sequence_number().high) << 32 |
Expand Down
22 changes: 22 additions & 0 deletions rmw_fastrtps_shared_cpp/src/rmw_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,28 @@ __rmw_send_response(
wparams.related_sample_identity().sequence_number().low =
(int32_t)(request_header->sequence_number & 0xFFFFFFFF);

// TODO(MiguelCompany) The following block is a workaround for the race on the
// discovery of services. It is (ab)using a related_sample_identity on the request
// with the GUID of the response reader, so we can wait here for it to be matched to
// the server response writer. In the future, this should be done with the mechanism
// explained on OMG DDS-RPC 1.0 spec under section 7.6.2 (Enhanced Service Mapping)

// According to the list of possible entity kinds in section 9.3.1.2 of RTPS
// readers will have this bit on, while writers will not. We use this to know
// if the related guid is the request writer or the response reader.
constexpr uint8_t entity_id_is_reader_bit = 0x04;
const eprosima::fastrtps::rtps::GUID_t & related_guid =
wparams.related_sample_identity().writer_guid();
if ((related_guid.entityId.value[3] & entity_id_is_reader_bit) != 0) {
// Related guid is a reader, so it is the response subscription guid.
// Wait for the response writer to be matched with it.
auto listener = info->pub_listener_;
if (!listener->wait_for_subscription(related_guid, std::chrono::milliseconds(100))) {
RMW_SET_ERROR_MSG("client will not receive response");
return RMW_RET_ERROR;
}
}

rmw_fastrtps_shared_cpp::SerializedData data;
data.is_cdr_buffer = false;
data.data = const_cast<void *>(ros_response);
Expand Down
3 changes: 3 additions & 0 deletions rmw_fastrtps_shared_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ __rmw_destroy_service(
if (info->response_publisher_ != nullptr) {
Domain::removePublisher(info->response_publisher_);
}
if (info->pub_listener_ != nullptr) {
delete info->pub_listener_;
}
if (info->listener_ != nullptr) {
delete info->listener_;
}
Expand Down
15 changes: 13 additions & 2 deletions rmw_fastrtps_shared_cpp/src/rmw_service_server_is_available.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,22 @@ __rmw_service_server_is_available(
return RMW_RET_OK;
}

if (0 == client_info->request_publisher_matched_count_.load()) {
if (number_of_request_subscribers != number_of_response_publishers) {
// not ready
return RMW_RET_OK;
}
if (0 == client_info->response_subscriber_matched_count_.load()) {

size_t matched_request_pubs = client_info->request_publisher_matched_count_.load();
if (0 == matched_request_pubs) {
// not ready
return RMW_RET_OK;
}
size_t matched_response_subs = client_info->response_subscriber_matched_count_.load();
if (0 == matched_response_subs) {
// not ready
return RMW_RET_OK;
}
if (matched_request_pubs != matched_response_subs) {
// not ready
return RMW_RET_OK;
}
Expand Down

0 comments on commit d7a9559

Please sign in to comment.