Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[21121] IPayloadPool refactor #4892

Merged
merged 23 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
2d39ab8
Refs #21121: Move paylowad_owner_ to SerializedPayload_t
cferreiragonz Jun 3, 2024
820d9af
Refs #21121: Make IPayloadPool manage SerializedPayload_t
cferreiragonz Jun 4, 2024
591c11e
Refs #21121: Update Tests with new API
cferreiragonz Jun 4, 2024
d398e41
Refs #21121: Clean headers
cferreiragonz Jun 4, 2024
01b40f6
Refs #21121: Doxygen
cferreiragonz Jun 4, 2024
cc3e91e
Refs #21121: Uncrustify
cferreiragonz Jun 4, 2024
b8eaa9a
Refs #21121: Remove unnecessary parameter
cferreiragonz Jun 4, 2024
709bcc4
Refs #21121: Move payload destructor
cferreiragonz Jun 4, 2024
cffd43c
Refs #21121: Avoid unsetting payload_owner
cferreiragonz Jun 5, 2024
f997218
Refs #21121: Fix LoanManager
cferreiragonz Jun 6, 2024
514b5d2
Refs #21121: Revision - remove getters/setter
cferreiragonz Jun 6, 2024
c3bd8ae
Refs #21121: Revision - avoid tmp change
cferreiragonz Jun 6, 2024
a20fee2
Refs #21121: Revision - remove PayloadInfo
cferreiragonz Jun 6, 2024
e5c905a
Refs #21121: Revision - uncrustify
cferreiragonz Jun 6, 2024
57b5f2f
Refs #21121: Revision - minor changes
cferreiragonz Jun 6, 2024
7b00e19
Refs #21121: Avoid cyclic release
cferreiragonz Jun 7, 2024
1725df2
Refs #21121: Fix Loan ser_payload destruction
cferreiragonz Jun 7, 2024
852ea53
Refs #21121: Avoid SerializedPayload_t copies
cferreiragonz Jun 10, 2024
4331af5
Refs #21121: Make get_payload's first arg const
cferreiragonz Jun 11, 2024
c7c210b
Refs #21121: Create SerializedPayload.cpp
cferreiragonz Jun 11, 2024
befd466
Refs #21121: Revision - minor changes 2
cferreiragonz Jun 11, 2024
bc4a145
Refs #21121: Fix old payload tests
cferreiragonz Jun 11, 2024
e383089
Refs #21121: Update versions.md & CMakeLists
cferreiragonz Jun 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 26 additions & 27 deletions examples/cpp/dds/CustomPayloadPoolExample/CustomPayloadPool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include <string.h>

#include <fastdds/rtps/history/IPayloadPool.h>
#include <fastdds/rtps/common/CacheChange.h>
#include <fastdds/rtps/common/SerializedPayload.h>

class CustomPayloadPool : public eprosima::fastrtps::rtps::IPayloadPool
{
Expand All @@ -34,64 +34,63 @@ class CustomPayloadPool : public eprosima::fastrtps::rtps::IPayloadPool

bool get_payload(
unsigned int size,
eprosima::fastrtps::rtps::CacheChange_t& cache_change)
eprosima::fastrtps::rtps::SerializedPayload_t& payload)
{
// Reserve new memory for the payload buffer
unsigned char* payload = new unsigned char[size];
unsigned char* payload_buff = new unsigned char[size];

// Assign the payload buffer to the CacheChange and update sizes
cache_change.serializedPayload.data = payload;
cache_change.serializedPayload.length = size;
cache_change.serializedPayload.max_size = size;
// Assign the payload buffer to the SerializedPayload and update sizes
payload.data = payload_buff;
payload.length = size;
payload.max_size = size;

// Tell the CacheChange who needs to release its payload
cache_change.payload_owner(this);
// Tell the SerializedPayload who needs to release its payload
payload.payload_owner = this;

return true;
}

bool get_payload(
eprosima::fastrtps::rtps::SerializedPayload_t& data,
eprosima::fastrtps::rtps::IPayloadPool*& /*data_owner*/,
eprosima::fastrtps::rtps::CacheChange_t& cache_change)
const eprosima::fastrtps::rtps::SerializedPayload_t& data,
eprosima::fastrtps::rtps::SerializedPayload_t& payload)
{
// Reserve new memory for the payload buffer
unsigned char* payload = new unsigned char[data.length];
unsigned char* payload_buff = new unsigned char[data.length];

// Copy the data
memcpy(payload, data.data, data.length);
memcpy(payload_buff, data.data, data.length);

// Tell the CacheChange who needs to release its payload
cache_change.payload_owner(this);
// Tell the SerializedPayload who needs to release its payload
payload.payload_owner = this;

// Assign the payload buffer to the CacheChange and update sizes
cache_change.serializedPayload.data = payload;
cache_change.serializedPayload.length = data.length;
cache_change.serializedPayload.max_size = data.length;
// Assign the payload buffer to the SerializedPayload and update sizes
payload.data = payload_buff;
payload.length = data.length;
payload.max_size = data.length;

return true;
}

bool release_payload(
eprosima::fastrtps::rtps::CacheChange_t& cache_change)
eprosima::fastrtps::rtps::SerializedPayload_t& payload)
{
// Ensure precondition
if (this != cache_change.payload_owner())
if (this != payload.payload_owner)
{
std::cerr << "Trying to release a payload buffer allocated by a different PayloadPool." << std::endl;
return false;
}

// Dealloc the buffer of the payload
delete[] cache_change.serializedPayload.data;
delete[] payload.data;

// Reset sizes and pointers
cache_change.serializedPayload.data = nullptr;
cache_change.serializedPayload.length = 0;
cache_change.serializedPayload.max_size = 0;
payload.data = nullptr;
payload.length = 0;
payload.max_size = 0;

// Reset the owner of the payload
cache_change.payload_owner(nullptr);
payload.payload_owner = nullptr;

return true;
}
Expand Down
30 changes: 3 additions & 27 deletions include/fastdds/rtps/common/CacheChange.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ namespace eprosima {
namespace fastrtps {
namespace rtps {

struct CacheChange_t;

/*!
* Specific information for a writer.
*/
Expand Down Expand Up @@ -179,14 +181,7 @@ struct FASTDDS_EXPORTED_API CacheChange_t
setFragmentSize(ch_ptr->fragment_size_, false);
}

virtual ~CacheChange_t()
{
if (payload_owner_ != nullptr)
{
payload_owner_->release_payload(*this);
}
assert(payload_owner_ == nullptr);
}
virtual ~CacheChange_t() = default;

/*!
* Get the number of fragments this change is split into.
Expand Down Expand Up @@ -326,22 +321,6 @@ struct FASTDDS_EXPORTED_API CacheChange_t
return is_fully_assembled();
}

IPayloadPool const* payload_owner() const
{
return payload_owner_;
}

IPayloadPool* payload_owner()
{
return payload_owner_;
}

void payload_owner(
IPayloadPool* owner)
{
payload_owner_ = owner;
}

private:

// Fragment size
Expand All @@ -353,9 +332,6 @@ struct FASTDDS_EXPORTED_API CacheChange_t
// First fragment in missing list
uint32_t first_missing_fragment_ = 0;

// Pool that created the payload of this cache change
IPayloadPool* payload_owner_ = nullptr;

uint32_t get_next_missing_fragment(
uint32_t fragment_index)
{
Expand Down
114 changes: 35 additions & 79 deletions include/fastdds/rtps/common/SerializedPayload.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
#include <cstring>
#include <new>
#include <stdexcept>
#include <cassert>
#include <stdint.h>
#include <stdlib.h>

#include <fastdds/fastdds_dll.hpp>
#include <fastdds/rtps/common/Types.h>
#include <fastdds/rtps/history/IPayloadPool.h>

/*!
* @brief Maximum payload is maximum of UDP packet size minus 536bytes (RTPSMESSAGE_COMMON_RTPS_PAYLOAD_SIZE)
Expand Down Expand Up @@ -68,6 +70,8 @@ struct FASTDDS_EXPORTED_API SerializedPayload_t
uint32_t max_size;
//!Position when reading
uint32_t pos;
//!Pool that created the payload
IPayloadPool* payload_owner = nullptr;

//!Default constructor
SerializedPayload_t()
Expand All @@ -89,107 +93,59 @@ struct FASTDDS_EXPORTED_API SerializedPayload_t
this->reserve(len);
}

~SerializedPayload_t()
//!Copy constructor
SerializedPayload_t(
const SerializedPayload_t& other) = delete;
//!Copy operator
SerializedPayload_t& operator = (
const SerializedPayload_t& other) = delete;

//!Move constructor
SerializedPayload_t(
SerializedPayload_t&& other) noexcept
{
this->empty();
*this = std::move(other);
}

//!Move operator
SerializedPayload_t& operator = (
SerializedPayload_t&& other) noexcept;

/*!
* Destructor
* It is expected to release the payload if the payload owner is not nullptr before destruction
*/
~SerializedPayload_t();

bool operator == (
const SerializedPayload_t& other) const
{
return ((encapsulation == other.encapsulation) &&
(length == other.length) &&
(0 == memcmp(data, other.data, length)));
}
const SerializedPayload_t& other) const;

/*!
* Copy another structure (including allocating new space for the data.)
* Copy another structure (including allocating new space for the data).
* @param[in] serData Pointer to the structure to copy
* @param with_limit if true, the function will fail when providing a payload too big
* @return True if correct
*/
bool copy(
const SerializedPayload_t* serData,
bool with_limit = true)
{
length = serData->length;

if (serData->length > max_size)
{
if (with_limit)
{
return false;
}
else
{
this->reserve(serData->length);
}
}
encapsulation = serData->encapsulation;
if (length == 0)
{
return true;
}
memcpy(data, serData->data, length);
return true;
}
bool with_limit = true);

/*!
* Allocate new space for fragmented data
* @param[in] serData Pointer to the structure to copy
* @return True if correct
*/
bool reserve_fragmented(
SerializedPayload_t* serData)
{
length = serData->length;
max_size = serData->length;
encapsulation = serData->encapsulation;
data = (octet*)calloc(length, sizeof(octet));
return true;
}
SerializedPayload_t* serData);

//! Empty the payload
void empty()
{
length = 0;
encapsulation = CDR_BE;
max_size = 0;
if (data != nullptr)
{
free(data);
}
data = nullptr;
}
/*!
* Empty the payload
* @pre payload_owner must be nullptr
*/
void empty();

void reserve(
uint32_t new_size)
{
if (new_size <= this->max_size)
{
return;
}
if (data == nullptr)
{
data = (octet*)calloc(new_size, sizeof(octet));
if (!data)
{
throw std::bad_alloc();
}
}
else
{
void* old_data = data;
data = (octet*)realloc(data, new_size);
if (!data)
{
free(old_data);
throw std::bad_alloc();
}
memset(data + max_size, 0, (new_size - max_size) * sizeof(octet));
}
max_size = new_size;
}
uint32_t new_size);

};

Expand Down
Loading
Loading