Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to one participant per context model #145

Merged
merged 9 commits into from
Apr 22, 2020
158 changes: 69 additions & 89 deletions rmw_cyclonedds_cpp/src/rmw_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,25 @@ bool operator<(dds_builtintopic_guid_t const & a, dds_builtintopic_guid_t const
static rmw_ret_t discovery_thread_stop(rmw_dds_common::Context & context);
static bool dds_qos_to_rmw_qos(const dds_qos_t * dds_qos, rmw_qos_profile_t * qos_policies);

/* Forward declarations of the publisher helpers so they can be used ahead of their
   definitions (e.g. rmw_context_impl_t's destructor calls destroy_publisher).

   create_publisher: creates an RMW publisher on `topic_name` with the given type support,
   QoS and options; callers treat a nullptr result as failure.
   NOTE(review): dds_ppant / dds_pub are presumably the DDS participant and DDS publisher
   entity handles the new writer is created under -- confirm against the definition. */
static rmw_publisher_t * create_publisher(
dds_entity_t dds_ppant, dds_entity_t dds_pub,
const rosidl_message_type_support_t * type_supports,
const char * topic_name, const rmw_qos_profile_t * qos_policies,
const rmw_publisher_options_t * publisher_options
);
/* destroy_publisher: tears down a publisher returned by create_publisher; callers compare
   the result against RMW_RET_OK. */
static rmw_ret_t destroy_publisher(rmw_publisher_t * publisher);

/* Forward declarations of the subscription helpers so they can be used ahead of their
   definitions (e.g. rmw_context_impl_t's destructor calls destroy_subscription).

   create_subscription: creates an RMW subscription on `topic_name` with the given type
   support, QoS and options; callers treat a nullptr result as failure.
   The second argument is the DDS *subscriber* entity (rmw_init passes impl->dds_sub), so
   it is named dds_sub here rather than dds_pub, which was a copy-paste slip from the
   publisher prototype. Parameter names in a declaration do not affect callers.
   NOTE(review): dds_ppant is presumably the DDS participant handle -- confirm against the
   definition. */
static rmw_subscription_t * create_subscription(
dds_entity_t dds_ppant, dds_entity_t dds_sub,
const rosidl_message_type_support_t * type_supports,
const char * topic_name, const rmw_qos_profile_t * qos_policies,
const rmw_subscription_options_t * subscription_options
);
/* destroy_subscription: tears down a subscription returned by create_subscription; callers
   compare the result against RMW_RET_OK. */
static rmw_ret_t destroy_subscription(rmw_subscription_t * subscription);

/* Forward declarations of the guard-condition helpers so they can be used ahead of their
   definitions (e.g. rmw_context_impl_t's destructor calls destroy_guard_condition).
   create_guard_condition takes the context implementation and yields nullptr on failure
   (callers check for that); destroy_guard_condition's result is compared against
   RMW_RET_OK by its callers. */
static rmw_guard_condition_t * create_guard_condition(rmw_context_impl_t * impl);
static rmw_ret_t destroy_guard_condition(rmw_guard_condition_t * gc);

struct CddsDomain;
struct CddsWaitset;

Expand Down Expand Up @@ -244,7 +263,17 @@ struct rmw_context_impl_t
~rmw_context_impl_t()
{
discovery_thread_stop(common);
if (dds_delete(ppant) < 0) {
common.graph_cache.clear_on_change_callback();
if (common.graph_guard_condition) {
destroy_guard_condition(common.graph_guard_condition);
}
if (common.pub) {
destroy_publisher(common.pub);
}
if (common.sub) {
destroy_subscription(common.sub);
}
if (ppant > 0 && dds_delete(ppant) < 0) {
RCUTILS_SAFE_FWRITE_TO_STDERR(
"Failed to destroy domain in destructor\n");
}
Expand Down Expand Up @@ -329,25 +358,6 @@ static void clean_waitset_caches();
static void check_for_blocked_requests(CddsClient & client);
#endif

/* Forward declarations of the publisher helpers so they can be used ahead of their
   definitions.

   create_publisher: creates an RMW publisher on `topic_name` with the given type support,
   QoS and options; callers treat a nullptr result as failure.
   NOTE(review): dds_ppant / dds_pub are presumably the DDS participant and DDS publisher
   entity handles the new writer is created under -- confirm against the definition. */
static rmw_publisher_t * create_publisher(
dds_entity_t dds_ppant, dds_entity_t dds_pub,
const rosidl_message_type_support_t * type_supports,
const char * topic_name, const rmw_qos_profile_t * qos_policies,
const rmw_publisher_options_t * publisher_options
);
/* destroy_publisher: tears down a publisher returned by create_publisher; callers compare
   the result against RMW_RET_OK. */
static rmw_ret_t destroy_publisher(rmw_publisher_t * publisher);

/* Forward declarations of the subscription helpers so they can be used ahead of their
   definitions.

   create_subscription: creates an RMW subscription on `topic_name` with the given type
   support, QoS and options; callers treat a nullptr result as failure.
   The second argument is the DDS *subscriber* entity (rmw_init passes impl->dds_sub), so
   it is named dds_sub here rather than dds_pub, which was a copy-paste slip from the
   publisher prototype. Parameter names in a declaration do not affect callers.
   NOTE(review): dds_ppant is presumably the DDS participant handle -- confirm against the
   definition. */
static rmw_subscription_t * create_subscription(
dds_entity_t dds_ppant, dds_entity_t dds_sub,
const rosidl_message_type_support_t * type_supports,
const char * topic_name, const rmw_qos_profile_t * qos_policies,
const rmw_subscription_options_t * subscription_options
);
/* destroy_subscription: tears down a subscription returned by create_subscription; callers
   compare the result against RMW_RET_OK. */
static rmw_ret_t destroy_subscription(rmw_subscription_t * subscription);

/* Forward declarations of the guard-condition helpers so they can be used ahead of their
   definitions. create_guard_condition takes the context implementation and yields nullptr
   on failure (callers check for that); destroy_guard_condition's result is compared
   against RMW_RET_OK by its callers. */
static rmw_guard_condition_t * create_guard_condition(rmw_context_impl_t * impl);
static rmw_ret_t destroy_guard_condition(rmw_guard_condition_t * gc);

#ifndef WIN32
/* TODO(allenh1): check for Clang */
#pragma GCC visibility push (default)
Expand Down Expand Up @@ -618,14 +628,14 @@ static void discovery_thread(rmw_context_impl_t * impl)
dds_delete(ws);
}

static rmw_ret_t discovery_thread_start(rmw_context_t * context)
static rmw_ret_t discovery_thread_start(rmw_context_impl_t * impl)
{
auto common_context = &context->impl->common;
auto common_context = &impl->common;
common_context->thread_is_running.store(true);
common_context->listener_thread_gc = create_guard_condition(context->impl);
common_context->listener_thread_gc = create_guard_condition(impl);
if (common_context->listener_thread_gc) {
try {
common_context->listener_thread = std::thread(discovery_thread, context->impl);
common_context->listener_thread = std::thread(discovery_thread, impl);
return RMW_RET_OK;
} catch (const std::exception & exc) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("Failed to create std::thread: %s", exc.what());
Expand All @@ -648,23 +658,24 @@ static rmw_ret_t discovery_thread_start(rmw_context_t * context)

static rmw_ret_t discovery_thread_stop(rmw_dds_common::Context & common_context)
{
common_context.thread_is_running.exchange(false);
rmw_ret_t rmw_ret = rmw_trigger_guard_condition(common_context.listener_thread_gc);
if (RMW_RET_OK != rmw_ret) {
return rmw_ret;
}
try {
common_context.listener_thread.join();
} catch (const std::exception & exc) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("Failed to join std::thread: %s", exc.what());
return RMW_RET_ERROR;
} catch (...) {
RMW_SET_ERROR_MSG("Failed to join std::thread");
return RMW_RET_ERROR;
}
rmw_ret = destroy_guard_condition(common_context.listener_thread_gc);
if (RMW_RET_OK != rmw_ret) {
return rmw_ret;
if (common_context.thread_is_running.exchange(false)) {
rmw_ret_t rmw_ret = rmw_trigger_guard_condition(common_context.listener_thread_gc);
if (RMW_RET_OK != rmw_ret) {
return rmw_ret;
}
try {
common_context.listener_thread.join();
} catch (const std::exception & exc) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("Failed to join std::thread: %s", exc.what());
return RMW_RET_ERROR;
} catch (...) {
RMW_SET_ERROR_MSG("Failed to join std::thread");
return RMW_RET_ERROR;
}
rmw_ret = destroy_guard_condition(common_context.listener_thread_gc);
if (RMW_RET_OK != rmw_ret) {
return rmw_ret;
}
}
return RMW_RET_OK;
}
Expand Down Expand Up @@ -880,6 +891,12 @@ extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t
return RMW_RET_BAD_ALLOC;
}

/* "impl"'s destructor relies on these being initialized properly */
impl->common.thread_is_running.store(false);
impl->common.graph_guard_condition = nullptr;
impl->common.pub = nullptr;
impl->common.sub = nullptr;
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved

/* Take domains_lock and hold it until after the participant creation succeeded or
failed: otherwise there is a race with rmw_destroy_node deleting the last participant
and tearing down the domain for versions of Cyclone that implement the original
Expand Down Expand Up @@ -961,76 +978,42 @@ extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t
/* Create RMW publisher/subscription/guard condition used by rmw_dds_common
discovery */
rmw_publisher_options_t publisher_options = rmw_get_default_publisher_options();
std::unique_ptr<rmw_publisher_t, std::function<void(rmw_publisher_t *)>>
publisher(
create_publisher(
impl->common.pub = create_publisher(
impl->ppant, impl->dds_pub,
rosidl_typesupport_cpp::get_message_type_support_handle<ParticipantEntitiesInfo>(),
"ros_discovery_info",
&pubsub_qos,
&publisher_options),
[&](rmw_publisher_t * pub) {
if (RMW_RET_OK != destroy_publisher(pub)) {
RCUTILS_SAFE_FWRITE_TO_STDERR(
"Failed to destroy publisher after function: '"
RCUTILS_STRINGIFY(__function__) "' failed.\n");
}
});
if (publisher == nullptr) {
&publisher_options);
if (impl->common.pub == nullptr) {
return RMW_RET_ERROR;
}

rmw_subscription_options_t subscription_options = rmw_get_default_subscription_options();
subscription_options.ignore_local_publications = true;
// FIXME: keyed topics => KEEP_LAST and depth 1.
pubsub_qos.history = RMW_QOS_POLICY_HISTORY_KEEP_ALL;
std::unique_ptr<rmw_subscription_t, std::function<void(rmw_subscription_t *)>>
subscription(
create_subscription(
impl->common.sub = create_subscription(
impl->ppant, impl->dds_sub,
rosidl_typesupport_cpp::get_message_type_support_handle<ParticipantEntitiesInfo>(),
"ros_discovery_info",
&pubsub_qos,
&subscription_options),
[&](rmw_subscription_t * sub) {
if (RMW_RET_OK != destroy_subscription(sub)) {
RCUTILS_SAFE_FWRITE_TO_STDERR(
"Failed to destroy subscription after function: '"
RCUTILS_STRINGIFY(__function__) "' failed.\n");
}
});
if (subscription == nullptr) {
&subscription_options);
if (impl->common.sub == nullptr) {
return RMW_RET_ERROR;
}

std::unique_ptr<rmw_guard_condition_t, std::function<void(rmw_guard_condition_t *)>>
graph_guard_condition(
create_guard_condition(impl.get()),
[&](rmw_guard_condition_t * gc) {
if (RMW_RET_OK != destroy_guard_condition(gc)) {
RMW_SAFE_FWRITE_TO_STDERR(
"Failed to destroy guard condition after function: '"
RCUTILS_STRINGIFY(__function__) "' failed.\n");
}
});
if (!graph_guard_condition) {
impl->common.graph_guard_condition = create_guard_condition(impl.get());
if (impl->common.graph_guard_condition == nullptr) {
return RMW_RET_BAD_ALLOC;
}

impl->common.graph_cache.set_on_change_callback(
[guard_condition = graph_guard_condition.get()]() {
[guard_condition = impl->common.graph_guard_condition]() {
static_cast<void>(rmw_trigger_guard_condition(guard_condition));
});

impl->common.pub = publisher.get();
impl->common.sub = subscription.get();
impl->common.graph_guard_condition = graph_guard_condition.get();
get_entity_gid(impl->ppant, impl->common.gid);
impl->common.graph_cache.add_participant(impl->common.gid, context->options.enclave);
impl->common.thread_is_running.store(false);
impl->common.listener_thread_gc = nullptr;

context->impl = impl.get();

// One could also use a set of listeners instead of a thread for maintaining the graph cache:
// - Locally published samples shouldn't make it to the reader, so there shouldn't be a deadlock
Expand All @@ -1039,14 +1022,11 @@ extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t
// updates and triggering a guard condition, and so that should be safe.
// However, the graph cache updates could be expensive, so performing those operations on
// the thread receiving data from the network may not be wise.
if ((ret = discovery_thread_start(context)) != RMW_RET_OK) {
if ((ret = discovery_thread_start(impl.get())) != RMW_RET_OK) {
return ret;
}

graph_guard_condition.release();
publisher.release();
subscription.release();
impl.release();
context->impl = impl.release();
return RMW_RET_OK;
}

Expand Down