Skip to content

Commit

Permalink
first rough working poc of distributed chained controllers
Browse files Browse the repository at this point in the history
  • Loading branch information
mamueluth committed May 20, 2023
1 parent 92e2965 commit 6f9de32
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ class ControllerManager : public rclcpp::Node
CONTROLLER_MANAGER_PUBLIC
void register_sub_controller_manager();

CONTROLLER_MANAGER_PUBLIC
void register_reference_interfaces(const std::vector<std::string> & reference_interfaces_names);

CONTROLLER_MANAGER_PUBLIC
controller_interface::ControllerInterfaceBaseSharedPtr add_controller_impl(
const ControllerSpec & controller);
Expand Down
169 changes: 167 additions & 2 deletions controller_manager/src/controller_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "controller_interface/controller_interface_base.hpp"
#include "controller_manager_msgs/msg/hardware_component_state.hpp"
#include "controller_manager_msgs/msg/publisher_description.hpp"

#include "hardware_interface/distributed_control_interface/command_forwarder.hpp"
#include "hardware_interface/distributed_control_interface/state_publisher.hpp"
Expand Down Expand Up @@ -645,6 +646,55 @@ void ControllerManager::register_sub_controller_manager_references_srv_cb(
std::shared_ptr<controller_manager_msgs::srv::RegisterSubControllerManagerReferences::Response>
response)
{
std::lock_guard<std::mutex> guard(central_controller_manager_srv_lock_);

// only command interfaces can be state publishers. We initialize state interfaces to empty list.
std::vector<controller_manager_msgs::msg::PublisherDescription> empty_state_publishers{};
auto sub_ctrl_mng_wrapper = std::make_shared<distributed_control::SubControllerManagerWrapper>(
request->sub_controller_manager_namespace, request->sub_controller_manager_name,
empty_state_publishers, request->command_state_publishers);

std::vector<std::shared_ptr<hardware_interface::DistributedReadWriteHandle>>
distributed_command_interfaces;
distributed_command_interfaces.reserve(sub_ctrl_mng_wrapper->get_command_forwarder_count());
// create distributed command interface and import into resource storage.
distributed_command_interfaces =
resource_manager_->import_reference_interfaces_of_sub_controller_manager(
sub_ctrl_mng_wrapper, get_namespace(), distributed_pub_sub_node_);

for (const auto & command_interface : distributed_command_interfaces)
{
// register every node of command_interface at executor only if multiple nodes
// are used. Otherwise the single nodes has already been added
if (use_multiple_nodes())
{
try
{
executor_->add_node(command_interface->get_node()->get_node_base_interface());
}
catch (const std::runtime_error & e)
{
response->ok = false;
RCLCPP_WARN_STREAM(
get_logger(),
"ControllerManager: Caught exception while trying to register node of reference "
"interface of sub_controller_manager. Exception:"
<< e.what());
}
}
auto msg = controller_manager_msgs::msg::PublisherDescription();
msg.ns = get_namespace();
msg.name.prefix_name = command_interface->get_prefix_name();
msg.name.interface_name = command_interface->get_interface_name();
// TODO(Manuel): want topic name relative to namespace, but have to treat "root" namespace separate
msg.publisher_topic = std::string("/") + command_interface->forward_command_topic_name();
response->command_state_publishers.push_back(msg);
}

response->ok = true;
RCLCPP_INFO_STREAM(
get_logger(), "ControllerManager: Registered reference interfaces of sub_controller_manager <"
<< sub_ctrl_mng_wrapper->get_name() << ">.");
}

void ControllerManager::create_hardware_state_publishers(
Expand Down Expand Up @@ -829,6 +879,112 @@ void ControllerManager::register_sub_controller_manager()
}
}

void ControllerManager::register_reference_interfaces(
const std::vector<std::string> & reference_interfaces_names)
{
RCLCPP_INFO_STREAM(
get_logger(), "SubControllerManager:<" << get_namespace() << "/" << get_name()
<< "> trying to register reference interfaces.");
rclcpp::Client<controller_manager_msgs::srv::RegisterSubControllerManagerReferences>::SharedPtr
client = create_client<controller_manager_msgs::srv::RegisterSubControllerManagerReferences>(
"/register_sub_controller_manager_references");

auto request = std::make_shared<
controller_manager_msgs::srv::RegisterSubControllerManagerReferences::Request>();
request->sub_controller_manager_namespace = get_namespace();
request->sub_controller_manager_name = get_name();

// export the provided CommandForwarders
for (auto const & reference_interface_name : reference_interfaces_names)
{
auto [found, command_forwarder] =
resource_manager_->find_command_forwarder(reference_interface_name);
if (found)
{
// create description of StatePublisher including: prefix_name, interface_name and topic.
// So that receiver is able to create a DistributedStateInterface which subscribes to the
// topics provided by this sub controller manager
request->command_state_publishers.push_back(
command_forwarder->create_publisher_description_msg());
}
else
{
RCLCPP_WARN_STREAM(
get_logger(), "SubControllerManager: <"
<< get_namespace() << "/" << get_name()
<< "> could not find command_forwarder for reference interfaces:"
<< reference_interface_name);
}
}

using namespace std::chrono_literals;
while (!client->wait_for_service(1s))
{
if (!rclcpp::ok())
{
RCLCPP_ERROR_STREAM(
get_logger(), "SubControllerManager:<"
<< get_namespace() << "/" << get_name()
<< ">. Interrupted while waiting for central controller managers "
"register_sub_controller_manager_references service. Exiting.");
return;
}
RCLCPP_INFO_STREAM(
get_logger(), "SubControllerManager:<"
<< get_namespace() << "/" << get_name()
<< ">. Central controller managers "
"register_sub_controller_manager_references service not available, "
"waiting again...");
}

auto result = client->async_send_request(request);
// TODO(Manuel): first try to wait synchronous. If this doesn't work we might have to create a
// queue or something similar, add the future and check in update periodically if finished.

// This blocks... which might be bad...
result.wait();
// can call get only once
auto res = result.get();
if (res->ok)
{
auto command_state_publishers = res->command_state_publishers;
// TODO(Manuel) we should probably make the keys explicit (add key_generation function to handles)
// send keys with request
for (const auto & command_state_publisher : command_state_publishers)
{
std::string key = command_state_publisher.name.prefix_name + "/" +
command_state_publisher.name.interface_name;
auto [found, command_forwarder] = resource_manager_->find_command_forwarder(key);
if (found)
{
RCLCPP_WARN_STREAM(
get_logger(), "SubControllerManager: <" << get_namespace() << "/" << get_name()
<< "> found commad forwarder for" << key);
command_forwarder->subscribe_to_command_publisher(command_state_publisher.publisher_topic);
}
else
{
RCLCPP_WARN_STREAM(
get_logger(), "SubControllerManager:<"
<< get_namespace() << "/" << get_name()
<< ">. Could not find a CommandForwarder for key[" << key
<< "]. No subscription to command state possible.");
}
}
RCLCPP_INFO_STREAM(
get_logger(), "SubControllerManager:<" << get_namespace() << "/" << get_name()
<< ">. Successfully registered.");
}
else
{
RCLCPP_WARN_STREAM(
get_logger(), "SubControllerManager: <"
<< get_namespace() << "/" << get_name()
<< ">. Registration of StatePublishers failed. Central ControllerManager "
"returned error code.");
}
}

controller_interface::ControllerInterfaceBaseSharedPtr ControllerManager::load_controller(
const std::string & controller_name, const std::string & controller_type)
{
Expand Down Expand Up @@ -1069,10 +1225,19 @@ controller_interface::return_type ControllerManager::configure_controller(

if (is_sub_controller_manager())
{
// TODO(Manuel); This is only for fast poc, chaining of multiples in sub controller
// is most likely going to lead to issues if handled this way.
// We should only allow the first controller in the chain to be distributed in each
// sub controller manager and chain the successor locally in sub controller manager
// instead of exporting for every.

// Set chained mode as default true and make references available so that
// hardware_command_forwarders can be created.
controller->set_chained_mode(true);
resource_manager_->make_controller_reference_interfaces_available(controller_name);
// export all of the just created reference interfaces by default
create_hardware_command_forwarders(reference_interfaces_names);

// TODO(Manuel) : register
register_reference_interfaces(reference_interfaces_names);
}

// TODO(destogl): check and resort controllers in the vector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@ class SubControllerManagerWrapper final

size_t get_command_forwarder_count() const { return command_forwarder_descriptions_.size(); }

std::string type() const { return TYPE_; }

private:
const std::string NAMESPACE_;
const std::string NAME_;
const std::string TYPE_ = "sub_controller_manager";
std::vector<PublisherDescription> state_publisher_descriptions_;
std::vector<PublisherDescription> command_forwarder_descriptions_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ struct HardwareComponentInfo
/// Component name.
std::string name;

/// Component "classification": "actuator", "sensor" or "system"
/// Component "classification": "actuator", "sensor", "system" or "sub_controller_manager"
std::string type;

/// Component pluginlib plugin name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ class HARDWARE_INTERFACE_PUBLIC ResourceManager
std::shared_ptr<distributed_control::SubControllerManagerWrapper> sub_controller_manager,
const std::string & ns, std::shared_ptr<rclcpp_lifecycle::LifecycleNode> node);

std::vector<std::shared_ptr<DistributedReadWriteHandle>>
import_reference_interfaces_of_sub_controller_manager(
std::shared_ptr<distributed_control::SubControllerManagerWrapper> sub_controller_manager,
const std::string & ns, std::shared_ptr<rclcpp_lifecycle::LifecycleNode> node);

void add_hardware_state_publishers(
std::shared_ptr<distributed_control::StatePublisher> state_publisher);

Expand Down
81 changes: 76 additions & 5 deletions hardware_interface/src/resource_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -665,8 +665,13 @@ class ResourceStorage
void add_sub_controller_manager(
std::shared_ptr<distributed_control::SubControllerManagerWrapper> sub_controller_manager)
{
const auto [it, success] = sub_controller_manager_map_.insert(
std::pair{sub_controller_manager->get_name(), sub_controller_manager});
HardwareComponentInfo component_info;
component_info.name = sub_controller_manager->get_name();
component_info.type = sub_controller_manager->type();
component_info.state_interfaces = {};
component_info.command_interfaces = {};
const auto [it, success] =
hardware_info_map_.insert(std::pair{component_info.name, component_info});
if (!success)
{
std::string msg(
Expand Down Expand Up @@ -786,6 +791,64 @@ class ResourceStorage
return distributed_command_interfaces;
}

std::vector<std::shared_ptr<DistributedReadWriteHandle>> import_distributed_reference_interfaces(
std::shared_ptr<distributed_control::SubControllerManagerWrapper> sub_controller_manager,
const std::string & ns, std::shared_ptr<rclcpp_lifecycle::LifecycleNode> node)
{
std::vector<std::shared_ptr<DistributedReadWriteHandle>> distributed_command_interfaces;
distributed_command_interfaces.reserve(sub_controller_manager->get_command_forwarder_count());
std::vector<std::string> interface_names;
interface_names.reserve(sub_controller_manager->get_command_forwarder_count());

for (const auto & command_forwarder_description :
sub_controller_manager->get_command_forwarder_descriptions())
{
// create StateInterface from the Description and store in ResourceStorage.
auto command_interface =
std::make_shared<DistributedReadWriteHandle>(command_forwarder_description, ns, node);
add_command_interface(command_interface);
// add to return vector, node needs to added to executor.
distributed_command_interfaces.push_back(command_interface);
// TODO(Manuel) this should be handled at one point DRY (adding, claimed ....), key should be made explicit
claimed_command_interface_map_.emplace(std::make_pair(command_interface->get_name(), false));
interface_names.push_back(command_interface->get_name());
}
// TODO(Manuel) this should be handled at one point DRY(adding, claimed,make available....), key should be made explicit
available_command_interfaces_.reserve(
available_command_interfaces_.capacity() + interface_names.size());
auto command_interfaces_end =
hardware_info_map_[sub_controller_manager->get_name()].command_interfaces.end();
hardware_info_map_[sub_controller_manager->get_name()].command_interfaces.insert(
command_interfaces_end, interface_names.begin(), interface_names.end());

for (const auto & interface : interface_names)
{
// TODO(destogl): check if interface should be available on configure
auto found_it = std::find(
available_command_interfaces_.begin(), available_command_interfaces_.end(), interface);

if (found_it == available_command_interfaces_.end())
{
available_command_interfaces_.emplace_back(interface);
RCUTILS_LOG_DEBUG_NAMED(
"resource_manager", "(hardware '%s'): '%s' command interface added into available list",
sub_controller_manager->get_name().c_str(), interface.c_str());
}
else
{
// TODO(destogl): do here error management if interfaces are only partially added into
// "available" list - this should never be the case!
RCUTILS_LOG_WARN_NAMED(
"resource_manager",
"(hardware '%s'): '%s' command interface already in available list."
" This can happen due to multiple calls to 'configure'",
sub_controller_manager->get_name().c_str(), interface.c_str());
}
}

return distributed_command_interfaces;
}

// hardware plugins
pluginlib::ClassLoader<ActuatorInterface> actuator_loader_;
pluginlib::ClassLoader<SensorInterface> sensor_loader_;
Expand Down Expand Up @@ -821,9 +884,6 @@ class ResourceStorage

std::map<std::string, std::shared_ptr<distributed_control::CommandForwarder>>
command_interface_command_forwarder_map_;

std::map<std::string, std::shared_ptr<distributed_control::SubControllerManagerWrapper>>
sub_controller_manager_map_;
};

ResourceManager::ResourceManager() : resource_storage_(std::make_unique<ResourceStorage>()) {}
Expand Down Expand Up @@ -957,6 +1017,16 @@ ResourceManager::import_command_interfaces_of_sub_controller_manager(
return resource_storage_->import_distributed_command_interfaces(sub_controller_manager, ns, node);
}

std::vector<std::shared_ptr<DistributedReadWriteHandle>>
ResourceManager::import_reference_interfaces_of_sub_controller_manager(
std::shared_ptr<distributed_control::SubControllerManagerWrapper> sub_controller_manager,
const std::string & ns, std::shared_ptr<rclcpp_lifecycle::LifecycleNode> node)
{
std::lock_guard<std::recursive_mutex> guard(resource_interfaces_lock_);
return resource_storage_->import_distributed_reference_interfaces(
sub_controller_manager, ns, node);
}

void ResourceManager::add_hardware_state_publishers(
std::shared_ptr<distributed_control::StatePublisher> state_publisher)
{
Expand Down Expand Up @@ -988,6 +1058,7 @@ ResourceManager::get_command_forwarders() const
std::pair<bool, std::shared_ptr<distributed_control::CommandForwarder>>
ResourceManager::find_command_forwarder(const std::string & key)
{
std::lock_guard<std::recursive_mutex> guard(resource_interfaces_lock_);
return resource_storage_->find_command_forwarder(key);
}

Expand Down

0 comments on commit 6f9de32

Please sign in to comment.