Skip to content

Commit

Permalink
Go back to task signaling
Browse files Browse the repository at this point in the history
Signed-off-by: Luca Della Vedova <[email protected]>
  • Loading branch information
luca-della-vedova committed Dec 12, 2024
1 parent 9ee9b55 commit 5802441
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 56 deletions.
12 changes: 6 additions & 6 deletions nexus_integration_tests/config/system_bts/main.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
<ReactiveSequence>
<!--IsPauseTriggered paused="{paused}"/-->
<!--PausableSequence pause="{paused}"-->
<Parallel success_threshold="-1">
<!-- Iterates over all task assignments and dispatches an AMR -->
<Sequence>
<DispatchTransporter name="bid_amr_transporter" result="{amr_transporter}" transport_task="{transport_task}"/>
<WorkcellRequest name="amr_request" workcell="{amr_transporter}" task="{transport_task}"/>
<Parallel success_threshold="-1">
<WorkcellRequest name="amr_request" workcell="{amr_transporter}" task="{transport_task}"/>
<ForEachTask task="{task}" workcell="{workcell}">
<ExecuteTask task="{task}" workcell="{workcell}" transport_task="{transport_task}"/>
</ForEachTask>
</Parallel>
</Sequence>
<ForEachTask task="{task}" workcell="{workcell}">
<ExecuteTask task="{task}" workcell="{workcell}"/>
</ForEachTask>
</Parallel>
<!--/PausableSequence-->
</ReactiveSequence>
</BehaviorTree>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<WaitForSignal name="wait_for_amr" signal="{workcell}" clear="true"/>
<WorkcellRequest name="workcell_request" workcell="{workcell}" task="{task}"/>
<!-- The transporter will listen and send an appropriate dispenser result for RMF or noop for conveyor belt-->
<SendSignal workcell="rmf_nexus_transporter" signal="{workcell}"/>
<SendSignal task="{transport_task}" signal="{workcell}"/>
</Sequence>
</ReactiveSequence>
</BehaviorTree>
Expand Down
10 changes: 10 additions & 0 deletions nexus_system_orchestrator/src/dispatch_transporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*
*/

#include <nexus_orchestrator_msgs/msg/workcell_task.hpp>

#include <yaml-cpp/yaml.h>

#include "dispatch_transporter.hpp"
Expand Down Expand Up @@ -54,6 +56,7 @@ BT::NodeStatus DispatchTransporter::onStart()
}
// Multipickup task
// TODO(luca) Consider encoding where is a pickup and where a dropoff
// TODO(luca) remove consecutive duplicates (multiple tasks to same workcell that don't need transportation)
YAML::Node order;
order["type"] = "pickup";
order["destination"] = assignment_it->second;
Expand Down Expand Up @@ -151,6 +154,13 @@ BT::NodeStatus DispatchTransporter::_update_ongoing_requests()
workcell_id.c_str());
this->setOutput("result", workcell_id);
this->setOutput("transport_task", this->_transport_task);
// Update the context
const auto& task_id = this->_transport_task.id;
this->_ctx->workcell_task_assignments.emplace(task_id, workcell_id);
auto p = this->_ctx->task_states.emplace(task_id, nexus_orchestrator_msgs::msg::TaskState());
auto& task_state = p.first->second;
task_state.workcell_id = workcell_id;
task_state.task_id = task_id;
return BT::NodeStatus::SUCCESS;
}
else
Expand Down
5 changes: 5 additions & 0 deletions nexus_system_orchestrator/src/execute_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ BT::NodeStatus ExecuteTask::onStart()
task_bt_path));
this->_bt->rootBlackboard()->set("task", *task);
this->_bt->rootBlackboard()->set("workcell", *workcell);
auto transport_task = this->getInput<Task>("transport_task");
if (transport_task)
{
this->_bt->rootBlackboard()->set("transport_task", *transport_task);
}

return BT::NodeStatus::RUNNING;
}
Expand Down
3 changes: 2 additions & 1 deletion nexus_system_orchestrator/src/execute_task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public: using Task = nexus_orchestrator_msgs::msg::WorkcellTask;
public: static BT::PortsList providedPorts()
{
return { BT::InputPort<Task>("task"),
BT::InputPort<std::string>("workcell") };
BT::InputPort<std::string>("workcell"),
BT::InputPort<Task>("transport_task") };
}

public: ExecuteTask(const std::string& name,
Expand Down
84 changes: 42 additions & 42 deletions nexus_system_orchestrator/src/signals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ namespace nexus::system_orchestrator {

BT::NodeStatus SendSignal::tick()
{
auto workcell = this->getInput<std::string>("workcell");
if (!workcell)
auto task =
this->getInput<nexus_orchestrator_msgs::msg::WorkcellTask>("task");
if (!task)
{
RCLCPP_ERROR(
this->_ctx->node.get_logger(), "%s [workcell] is required",
this->_ctx->node.get_logger(), "%s: port [task] is required",
this->name().c_str());
return BT::NodeStatus::FAILURE;
}
Expand All @@ -41,48 +42,47 @@ BT::NodeStatus SendSignal::tick()
return BT::NodeStatus::FAILURE;
}

return signal_workcell(*workcell, *signal);
}

BT::NodeStatus SendSignal::signal_workcell(const std::string& workcell, const std::string& signal)
{
auto it = this->_ctx->workcell_sessions.find(workcell);
if (it == this->_ctx->workcell_sessions.cend())
try
{
RCLCPP_ERROR(
this->_ctx->node.get_logger(),
"%s: Unable to find workcell [%s]",
this->name().c_str(), workcell.c_str());
return BT::NodeStatus::FAILURE;
}
const auto& session = it->second;
if (!session)
{
// TODO(luca) implement a queueing mechanism if the workcell is not available?
RCLCPP_WARN(
this->_ctx->node.get_logger(),
"%s: No session found for workcell [%s], skipping signaling",
this->name().c_str(), workcell.c_str());
return BT::NodeStatus::SUCCESS;
}
auto it = std::find_if(
this->_ctx->workcell_task_assignments.cbegin(),
this->_ctx->workcell_task_assignments.cend(),
[&task](const std::pair<std::string, std::string>& p)
{
return p.first == task->id;
});
if (it == this->_ctx->workcell_task_assignments.cend())
{
RCLCPP_ERROR(
this->_ctx->node.get_logger(), "%s: Unable to find workcell assigned to task [%s]",
this->name().c_str(), task->id.c_str());
return BT::NodeStatus::FAILURE;
}
const auto& workcell = it->second;

auto req =
std::make_shared<endpoints::SignalWorkcellService::ServiceType::Request>();
// TODO(luca) Using job id for task id is not scalable, generate uuids?
req->task_id = this->_ctx->job_id;
req->signal = signal;
auto resp = session->signal_wc_client->send_request(req);
RCLCPP_INFO(
this->_ctx->node.get_logger(), "%s: Sent signal [%s] to workcell [%s]",
this->name().c_str(), signal.c_str(), workcell.c_str());
if (!resp->success)
const auto& session = this->_ctx->workcell_sessions.at(workcell);
auto req =
std::make_shared<endpoints::SignalWorkcellService::ServiceType::Request>();
req->task_id = task->id;
req->signal = *signal;
auto resp = session->signal_wc_client->send_request(req);
RCLCPP_INFO(
this->_ctx->node.get_logger(), "%s: Sent signal [%s] to workcell [%s]",
this->name().c_str(), signal->c_str(), workcell.c_str());
if (!resp->success)
{
RCLCPP_WARN(
this->_ctx->node.get_logger(),
"%s: Workcell is not able to accept [%s] signal now. Queuing the signal to be sent on the next task request.",
this->name().c_str(), signal->c_str());
this->_ctx->queued_signals[task->id].emplace_back(*signal);
}
}
catch (const std::out_of_range& e)
{
// TODO(luca) implement a queueing mechanism if the request fails?
RCLCPP_WARN(
this->_ctx->node.get_logger(),
"%s: Workcell [%s] is not able to accept [%s] signal now. Skipping signaling, error: [%s]",
this->name().c_str(), workcell.c_str(), signal.c_str(), resp->message.c_str());
return BT::NodeStatus::SUCCESS;
RCLCPP_ERROR(this->_ctx->node.get_logger(), "%s: %s",
this->name().c_str(), e.what());
return BT::NodeStatus::FAILURE;
}

return BT::NodeStatus::SUCCESS;
Expand Down
3 changes: 0 additions & 3 deletions nexus_system_orchestrator/src/signals.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public: static BT::PortsList providedPorts()
{
return { BT::InputPort<nexus_orchestrator_msgs::msg::WorkcellTask>("task",
"The task the signal is tied to."),
BT::InputPort<std::string>("workcell", "The workcell to signal"),
BT::InputPort<std::string>(
"signal", "Signal to send.") };
}
Expand All @@ -51,8 +50,6 @@ public: SendSignal(const std::string& name, const BT::NodeConfiguration& config,

public: BT::NodeStatus tick() override;

private: BT::NodeStatus signal_workcell(const std::string& workcell, const std::string& signal);

private: std::shared_ptr<Context> _ctx;
};

Expand Down
4 changes: 1 addition & 3 deletions nexus_system_orchestrator/src/workcell_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,7 @@ WorkcellRequest::make_goal()
void WorkcellRequest::on_feedback(
endpoints::WorkcellRequestAction::ActionType::Feedback::ConstSharedPtr msg)
{
// Will insert a new state if one was not created at work order creation time
// i.e. for transportation requests that are done after task assignments
this->_ctx->task_states[this->_task.id] = msg->state;
this->_ctx->task_states.at(this->_task.id) = msg->state;
this->_on_task_progress(this->_ctx->task_states);
}

Expand Down

0 comments on commit 5802441

Please sign in to comment.