Skip to content

Commit

Permalink
iox-#2044 Port 'PortIntrospection' to new FixedPositionContainer
Browse files Browse the repository at this point in the history
  • Loading branch information
elBoberido committed Oct 21, 2023
1 parent ec8154f commit d430455
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 390 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
#ifndef IOX_POSH_ROUDI_INTROSPECTION_PORT_INTROSPECTION_HPP
#define IOX_POSH_ROUDI_INTROSPECTION_PORT_INTROSPECTION_HPP

#include "fixed_size_container.hpp"
#include "iceoryx_hoofs/internal/concurrent/periodic_task.hpp"
#include "iceoryx_posh/iceoryx_posh_types.hpp"
#include "iceoryx_posh/internal/popo/ports/publisher_port_data.hpp"
#include "iceoryx_posh/roudi/introspection_types.hpp"
#include "iox/fixed_position_container.hpp"
#include "iox/function.hpp"

#include <atomic>
Expand Down Expand Up @@ -208,15 +208,15 @@ class PortIntrospection
void setNew(bool value) noexcept;

private:
using PublisherContainer = FixedSizeContainer<PublisherInfo, MAX_PUBLISHERS>;
using ConnectionContainer = FixedSizeContainer<ConnectionInfo, MAX_SUBSCRIBERS>;
using PublisherContainer = FixedPositionContainer<PublisherInfo, MAX_PUBLISHERS>;
using ConnectionContainer = FixedPositionContainer<ConnectionInfo, MAX_SUBSCRIBERS>;

/// @brief inner map maps from unique port IDs to indices in the PublisherContainer
std::map<capro::ServiceDescription, std::map<popo::UniquePortId, typename PublisherContainer::Index_t>>
std::map<capro::ServiceDescription, std::map<popo::UniquePortId, typename PublisherContainer::IndexType>>
m_publisherMap;

/// inner map maps from unique port IDs to indices in the ConnectionContainer
std::map<capro::ServiceDescription, std::map<popo::UniquePortId, typename ConnectionContainer::Index_t>>
std::map<capro::ServiceDescription, std::map<popo::UniquePortId, typename ConnectionContainer::IndexType>>
m_connectionMap;

/// @note we avoid allocating the objects on the heap but can still use a map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ inline bool PortIntrospection<PublisherPort, SubscriberPort>::PortData::updateCo

for (auto& pair : innerConnectionMap)
{
auto& connection = m_connectionContainer[pair.second];
connection.state = getNextState<iox::build::CommunicationPolicy>(connection.state, messageType);
auto connection = m_connectionContainer.iter_from_index(pair.second);
connection->state = getNextState<iox::build::CommunicationPolicy>(connection->state, messageType);
}

setNew(true);
Expand Down Expand Up @@ -224,8 +224,8 @@ inline bool PortIntrospection<PublisherPort, SubscriberPort>::PortData::updateSu
return false;
}

auto& connection = m_connectionContainer[iterInnerMap->second];
connection.state = getNextState<iox::build::CommunicationPolicy>(connection.state, messageType);
auto connection = m_connectionContainer.iter_from_index(iterInnerMap->second);
connection->state = getNextState<iox::build::CommunicationPolicy>(connection->state, messageType);

setNew(true);
return true;
Expand All @@ -240,8 +240,8 @@ inline bool PortIntrospection<PublisherPort, SubscriberPort>::PortData::addPubli
auto service = port.m_serviceDescription;
auto uniqueId = port.m_uniqueId;

auto index = m_publisherContainer.add(PublisherInfo(port));
if (index < 0)
auto publisherInfo = m_publisherContainer.emplace(PublisherInfo(port));
if (publisherInfo == m_publisherContainer.end())
{
return false;
}
Expand All @@ -250,8 +250,8 @@ inline bool PortIntrospection<PublisherPort, SubscriberPort>::PortData::addPubli
if (iter == m_publisherMap.end())
{
// service is new, create new map
std::map<popo::UniquePortId, typename PublisherContainer::Index_t> innerPublisherMap;
innerPublisherMap.insert(std::make_pair(uniqueId, index));
std::map<popo::UniquePortId, typename PublisherContainer::IndexType> innerPublisherMap;
innerPublisherMap.insert(std::make_pair(uniqueId, publisherInfo.to_index()));
m_publisherMap.insert(std::make_pair(service, innerPublisherMap));
}
else
Expand All @@ -261,7 +261,7 @@ inline bool PortIntrospection<PublisherPort, SubscriberPort>::PortData::addPubli
auto iter = innerPublisherMap.find(uniqueId);
if (iter == innerPublisherMap.end())
{
innerPublisherMap.insert(std::make_pair(uniqueId, index));
innerPublisherMap.insert(std::make_pair(uniqueId, publisherInfo.to_index()));
}
else
{
Expand All @@ -270,7 +270,6 @@ inline bool PortIntrospection<PublisherPort, SubscriberPort>::PortData::addPubli
}

// connect publisher to all subscribers with the same Id
PublisherInfo* publisher = m_publisherContainer.get(index);

// find corresponding subscribers
auto connIter = m_connectionMap.find(service);
Expand All @@ -279,10 +278,10 @@ inline bool PortIntrospection<PublisherPort, SubscriberPort>::PortData::addPubli
auto& innerConnectionMap = connIter->second;
for (auto& pair : innerConnectionMap)
{
auto& connection = m_connectionContainer[pair.second];
if (service == connection.subscriberInfo.service)
auto connection = m_connectionContainer.iter_from_index(pair.second);
if (service == connection->subscriberInfo.service)
{
connection.publisherInfo = publisher;
connection->publisherInfo = publisherInfo.to_ptr();
}
}
}
Expand All @@ -300,8 +299,8 @@ inline bool PortIntrospection<PublisherPort, SubscriberPort>::PortData::addSubsc
auto service = portData.m_serviceDescription;
auto uniqueId = portData.m_uniqueId;

auto index = m_connectionContainer.add(ConnectionInfo(portData));
if (index < 0)
auto connection = m_connectionContainer.emplace(ConnectionInfo(portData));
if (connection == m_connectionContainer.end())
{
return false;
}
Expand All @@ -311,8 +310,8 @@ inline bool PortIntrospection<PublisherPort, SubscriberPort>::PortData::addSubsc
if (iter == m_connectionMap.end())
{
// service is new, create new map
std::map<popo::UniquePortId, typename ConnectionContainer::Index_t> innerConnectionMap;
innerConnectionMap.insert(std::make_pair(uniqueId, index));
std::map<popo::UniquePortId, typename ConnectionContainer::IndexType> innerConnectionMap;
innerConnectionMap.insert(std::make_pair(uniqueId, connection.to_index()));
m_connectionMap.insert(std::make_pair(service, innerConnectionMap));
}
else
Expand All @@ -322,24 +321,24 @@ inline bool PortIntrospection<PublisherPort, SubscriberPort>::PortData::addSubsc
auto iter = innerConnectionMap.find(uniqueId);
if (iter == innerConnectionMap.end())
{
innerConnectionMap.insert(std::make_pair(uniqueId, index));
innerConnectionMap.insert(std::make_pair(uniqueId, connection.to_index()));
}
else
{
return false;
}
}

auto& connection = m_connectionContainer[index];

auto sendIter = m_publisherMap.find(service);
if (sendIter != m_publisherMap.end())
{
auto& innerPublisherMap = sendIter->second;
for (auto& iter : innerPublisherMap)
{
auto publisher = m_publisherContainer.get(iter.second);
connection.publisherInfo = publisher; // set corresponding publisher if exists
auto publisher = m_publisherContainer.iter_from_index(iter.second);
connection->publisherInfo = (publisher != m_publisherContainer.end())
? publisher.to_ptr()
: nullptr; // set corresponding publisher if exists
}
}

Expand All @@ -365,17 +364,17 @@ PortIntrospection<PublisherPort, SubscriberPort>::PortData::removePublisher(cons
return false;
}
auto m_publisherIndex = iterInnerMap->second;
auto& publisher = m_publisherContainer[m_publisherIndex];
auto publisher = m_publisherContainer.iter_from_index(m_publisherIndex);

// disconnect publisher from all its subscribers
for (auto& pair : publisher.connectionMap)
for (auto& pair : publisher->connectionMap)
{
pair.second->publisherInfo = nullptr; // publisher is disconnected
pair.second->state = ConnectionState::DEFAULT; // connection state is now default
}

innerPublisherMap.erase(iterInnerMap);
m_publisherContainer.remove(m_publisherIndex);
m_publisherContainer.erase(publisher);
setNew(true); // indicates we have to send new data because
// something changed

Expand Down Expand Up @@ -404,8 +403,8 @@ PortIntrospection<PublisherPort, SubscriberPort>::PortData::removeSubscriber(con

// remove subscriber in corresponding publisher
auto connectionIndex = mapIter->second;
auto& connection = m_connectionContainer[connectionIndex];
auto& publisher = connection.publisherInfo;
auto connection = m_connectionContainer.iter_from_index(connectionIndex);
auto& publisher = connection->publisherInfo;

if (publisher)
{
Expand All @@ -417,7 +416,7 @@ PortIntrospection<PublisherPort, SubscriberPort>::PortData::removeSubscriber(con
}

innerConnectionMap.erase(mapIter);
m_connectionContainer.remove(connectionIndex);
m_connectionContainer.erase(connection);

setNew(true);
return true;
Expand Down Expand Up @@ -533,20 +532,20 @@ PortIntrospection<PublisherPort, SubscriberPort>::PortData::prepareTopic(PortInt
auto m_publisherIndex = pair.second;
if (m_publisherIndex >= 0)
{
auto& publisherInfo = m_publisherContainer[m_publisherIndex];
auto publisherInfo = m_publisherContainer.iter_from_index(m_publisherIndex);
PublisherPortData publisherData;
PublisherPort port(publisherInfo.portData);
PublisherPort port(publisherInfo->portData);
publisherData.m_publisherPortID = static_cast<uint64_t>(port.getUniqueID());
publisherData.m_sourceInterface = publisherInfo.service.getSourceInterface();
publisherData.m_name = publisherInfo.process;
publisherData.m_node = publisherInfo.node;
publisherData.m_sourceInterface = publisherInfo->service.getSourceInterface();
publisherData.m_name = publisherInfo->process;
publisherData.m_node = publisherInfo->node;

publisherData.m_caproInstanceID = publisherInfo.service.getInstanceIDString();
publisherData.m_caproServiceID = publisherInfo.service.getServiceIDString();
publisherData.m_caproEventMethodID = publisherInfo.service.getEventIDString();
publisherData.m_caproInstanceID = publisherInfo->service.getInstanceIDString();
publisherData.m_caproServiceID = publisherInfo->service.getServiceIDString();
publisherData.m_caproEventMethodID = publisherInfo->service.getEventIDString();

m_publisherList.emplace_back(publisherData);
publisherInfo.index = index++;
publisherInfo->index = index++;
}
}
}
Expand All @@ -559,9 +558,9 @@ PortIntrospection<PublisherPort, SubscriberPort>::PortData::prepareTopic(PortInt
auto connectionIndex = pair.second;
if (connectionIndex >= 0)
{
auto& connection = m_connectionContainer[connectionIndex];
auto connection = m_connectionContainer.iter_from_index(connectionIndex);
SubscriberPortData subscriberData;
auto& subscriberInfo = connection.subscriberInfo;
auto& subscriberInfo = connection->subscriberInfo;

subscriberData.m_name = subscriberInfo.process;
subscriberData.m_node = subscriberInfo.node;
Expand Down Expand Up @@ -597,8 +596,8 @@ inline void PortIntrospection<PublisherPort, SubscriberPort>::PortData::prepareT
auto connectionIndex = pair.second;
if (connectionIndex >= 0)
{
auto& connection = m_connectionContainer[connectionIndex];
auto& subscriberInfo = connection.subscriberInfo;
auto connection = m_connectionContainer.iter_from_index(connectionIndex);
auto& subscriberInfo = connection->subscriberInfo;
SubscriberPortChangingData subscriberData;
if (subscriberInfo.portData != nullptr)
{
Expand Down
Loading

0 comments on commit d430455

Please sign in to comment.