Skip to content

Commit

Permalink
Return a loaned_sample from dds_loan_shared_memory_buffer
Browse files Browse the repository at this point in the history
Update (and rename) dds_loan_shared_memory_buffer so that it
returns a dds_loaned_sample_t and not just a raw loan pointer
(that can't be used with dds_write because the buffer is not
registered in the loan administration). With this change, the
loaned sample (with custom size) can be used with the regular
write function; only difference with the regular request_loan
is that the user can specify the size in this case.

The request_raw_loan fn prototype is removed from the PSMX
endpoint ops; request_loan with the user-specified size
is used instead.

Signed-off-by: Dennis Potman <[email protected]>
  • Loading branch information
dpotman committed Sep 18, 2023
1 parent 030802a commit a043eb8
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 83 deletions.
11 changes: 0 additions & 11 deletions src/core/ddsc/include/dds/ddsc/dds_psmx.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,16 +197,6 @@ typedef struct dds_psmx_topic_ops {
*/
typedef dds_loaned_sample_t * (* dds_psmx_endpoint_request_loan_fn) (struct dds_psmx_endpoint *psmx_endpoint, uint32_t size_requested);

/**
* @brief Definition for function to requests a raw loan (without an encapsulating loaned sample) from the PSMX
*
* @param[in] psmx_endpoint the endpoint to loan from
* @param[in] size_requested the size of the loan requested
* @param[out] buffer buffer to write the loan pointer in
* @returns a DDS return code
*/
typedef dds_return_t (* dds_psmx_endpoint_request_raw_loan_fn) (struct dds_psmx_endpoint *psmx_endpoint, uint32_t size_requested, void **buffer);

/**
* @brief Definition of function to write data on a PSMX endpoint
*
Expand Down Expand Up @@ -240,7 +230,6 @@ typedef dds_return_t (* dds_psmx_endpoint_on_data_available_fn) (struct dds_psmx
*/
typedef struct dds_psmx_endpoint_ops {
dds_psmx_endpoint_request_loan_fn request_loan;
dds_psmx_endpoint_request_raw_loan_fn request_raw_loan;
dds_psmx_endpoint_write_fn write;
dds_psmx_endpoint_take_fn take;
dds_psmx_endpoint_on_data_available_fn on_data_available;
Expand Down
19 changes: 10 additions & 9 deletions src/core/ddsc/include/dds/ddsc/dds_public_loan_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ DDS_EXPORT dds_return_t dds_return_loan (dds_entity_t entity, void **buf, int32_
* @component read_data
* @brief Check if a shared memory is available to reader/writer.
*
* @note dds_loan_shared_memory_buffer can be used if and only if
* @note dds_request_loan_of_size can be used if and only if
* dds_is_shared_memory_available returns true.
*
* @param[in] entity the handle of the entity
Expand All @@ -113,22 +113,23 @@ DDS_EXPORT dds_return_t dds_return_loan (dds_entity_t entity, void **buf, int32_
DDS_EXPORT bool dds_is_shared_memory_available (const dds_entity_t entity);

/**
* @brief Request a loan of a specified size from an entity.
* @ingroup loan
* @component read_data
* @brief Loan a shared memory buffer of a specific size from the writer.
*
* @note Currently this function is to be used with dds_writecdr by adding the
* loaned buffer in the serdata as loan->sample_ptr.
* Borrow a sample of a specified size from the entity, which currently must be a
* writer. This sample can then be returned using @ref `dds_return_loan` or can be
* used to publish data using @ref `dds_write` or @ref `dds_writedispose`.
*
* @note The function can only be used if dds_is_shared_memory_available is
* true for the writer.
*
* @param[in] writer the writer to loan the buffer from
* @param[in] size the requested buffer size
* @param[out] buffer the loaned buffer
* @param[in] writer The entity to request loans from.
* @param[in] size the requested loan size
* @param[out] sample Where to store the address of the loaned sample.
*
* @returns DDS_RETCODE_OK if successful, DDS_RETCODE_ERROR otherwise
*/
DDS_EXPORT dds_return_t dds_loan_shared_memory_buffer (dds_entity_t writer, size_t size, void **buffer);
DDS_EXPORT dds_return_t dds_request_loan_of_size (dds_entity_t writer, size_t size, void **sample);

/**
* @ingroup deprecated
Expand Down
7 changes: 6 additions & 1 deletion src/core/ddsc/src/dds__writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ extern "C" {

DEFINE_ENTITY_LOCK_UNLOCK(dds_writer, DDS_KIND_WRITER, writer)

enum dds_writer_loan_type {
DDS_WRITER_LOAN_REGULAR,
DDS_WRITER_LOAN_RAW
};

struct ddsi_status_cb_data;

/** @component writer */
Expand All @@ -34,7 +39,7 @@ dds_return_t dds_return_writer_loan (dds_writer *wr, void **samples_ptr, int32_t
dds_return_t dds__ddsi_writer_wait_for_acks (struct dds_writer *wr, ddsi_guid_t *rdguid, dds_time_t abstimeout);

/** @component writer */
dds_return_t dds_request_writer_loan (dds_writer *wr, void **sample)
dds_return_t dds_request_writer_loan (dds_writer *wr, enum dds_writer_loan_type loan_type, uint32_t sz, void **sample)
ddsrt_nonnull_all;

#if defined (__cplusplus)
Expand Down
2 changes: 1 addition & 1 deletion src/core/ddsc/src/dds_entity.c
Original file line number Diff line number Diff line change
Expand Up @@ -1592,7 +1592,7 @@ dds_return_t dds_request_loan (dds_entity_t entity, void **sample)
{
case DDS_KIND_WRITER: {
dds_writer *wr = (dds_writer *) p_entity;
ret = dds_request_writer_loan (wr, sample);
ret = dds_request_writer_loan (wr, DDS_WRITER_LOAN_REGULAR, 0, sample);
break;
}
case DDS_KIND_DONTCARE:
Expand Down
23 changes: 5 additions & 18 deletions src/core/ddsc/src/dds_psmx.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "dds__psmx.h"
#include "dds__qos.h"
#include "dds__entity.h"
#include "dds__writer.h"

static struct dds_psmx_endpoint * psmx_create_endpoint (struct dds_psmx_topic *psmx_topic, const struct dds_qos *qos, dds_psmx_endpoint_type_t endpoint_type);
static dds_return_t psmx_delete_endpoint (struct dds_psmx_endpoint *psmx_endpoint);
Expand Down Expand Up @@ -464,33 +465,19 @@ bool dds_is_loan_available (const dds_entity_t entity)
return is_loan_available;
}

dds_return_t dds_loan_shared_memory_buffer (dds_entity_t writer, size_t size, void **buffer)
dds_return_t dds_request_loan_of_size (dds_entity_t writer, size_t size, void **sample)
{
dds_entity *e;
dds_return_t ret = DDS_RETCODE_OK;

if (dds_entity_pin (writer, &e) != DDS_RETCODE_OK)
return false;

if (dds_entity_kind (e) != DDS_KIND_WRITER)
{
ret = DDS_RETCODE_BAD_PARAMETER;
}
if (dds_entity_kind (e) == DDS_KIND_WRITER)
ret = dds_request_writer_loan ((struct dds_writer *) e, DDS_WRITER_LOAN_RAW, (uint32_t) size, sample);
else
{
struct dds_writer const *const wr = (struct dds_writer *) e;
*buffer = NULL;

// TODO: implement correct behavior in case of multiple PSMX endpoints
for (uint32_t i = 0; *buffer != NULL && i < wr->m_endpoint.psmx_endpoints.length; i++)
{
struct dds_psmx_endpoint *psmx_endpoint = wr->m_endpoint.psmx_endpoints.endpoints[i];
if (psmx_endpoint == NULL || !(dds_psmx_supported_features (psmx_endpoint->psmx_topic->psmx_instance) & DDS_PSMX_FEATURE_SHARED_MEMORY))
continue;
psmx_endpoint->ops.request_raw_loan (psmx_endpoint, (uint32_t) size, buffer);
}
ret = DDS_RETCODE_BAD_PARAMETER;

}
dds_entity_unpin (e);
return ret;
}
32 changes: 23 additions & 9 deletions src/core/ddsc/src/dds_write.c
Original file line number Diff line number Diff line change
Expand Up @@ -489,9 +489,9 @@ static dds_return_t dds_write_basic_impl (struct ddsi_thread_state * const ts, d
return ret;
}

dds_return_t dds_request_writer_loan (dds_writer *wr, void **sample)
dds_return_t dds_request_writer_loan (dds_writer *wr, enum dds_writer_loan_type loan_type, uint32_t sz, void **sample)
{
dds_return_t ret = DDS_RETCODE_OK;
dds_return_t ret = DDS_RETCODE_ERROR;

ddsrt_mutex_lock (&wr->m_entity.m_mutex);
// We don't bother the PSMX interface with types that contain pointers, but we do
Expand All @@ -501,17 +501,31 @@ dds_return_t dds_request_writer_loan (dds_writer *wr, void **sample)
// FIXME: allow multiple psmx instances
assert (wr->m_endpoint.psmx_endpoints.length <= 1);

dds_loaned_sample_t *loan;
if (wr->m_endpoint.psmx_endpoints.length > 0 && wr->m_topic->m_stype->is_memcpy_safe)
dds_loaned_sample_t *loan = NULL;
switch (loan_type)
{
if ((loan = dds_psmx_endpoint_request_loan (wr->m_endpoint.psmx_endpoints.endpoints[0], wr->m_topic->m_stype->sizeof_type)) == NULL)
ret = DDS_RETCODE_ERROR;
case DDS_WRITER_LOAN_RAW:
if (wr->m_endpoint.psmx_endpoints.length > 0)
{
if ((loan = dds_psmx_endpoint_request_loan (wr->m_endpoint.psmx_endpoints.endpoints[0], sz)) != NULL)
ret = DDS_RETCODE_OK;
}
break;

case DDS_WRITER_LOAN_REGULAR:
if (wr->m_endpoint.psmx_endpoints.length > 0 && wr->m_topic->m_stype->is_memcpy_safe)
{
if ((loan = dds_psmx_endpoint_request_loan (wr->m_endpoint.psmx_endpoints.endpoints[0], wr->m_topic->m_stype->sizeof_type)) != NULL)
ret = DDS_RETCODE_OK;
}
else
ret = dds_heap_loan (wr->m_topic->m_stype, DDS_LOANED_SAMPLE_STATE_RAW_DATA, &loan);
break;
}
else if ((ret = dds_heap_loan (wr->m_topic->m_stype, DDS_LOANED_SAMPLE_STATE_RAW_DATA, &loan)) != DDS_RETCODE_OK)
loan = NULL;

if (loan != NULL)
if (ret == DDS_RETCODE_OK)
{
assert (loan != NULL);
if ((ret = dds_loan_pool_add_loan (wr->m_loans, loan)) != DDS_RETCODE_OK)
dds_loaned_sample_unref (loan);
else
Expand Down
11 changes: 0 additions & 11 deletions src/core/ddsc/tests/psmx_cdds_impl.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,12 @@ static const dds_psmx_topic_ops_t psmx_topic_ops = {
};

static dds_loaned_sample_t * cdds_psmx_ep_request_loan (struct dds_psmx_endpoint *psmx_endpoint, uint32_t size_requested);
static dds_return_t cdds_psmx_ep_request_raw_loan (struct dds_psmx_endpoint *psmx_ep, uint32_t size_requested, void **buffer);
static dds_return_t cdds_psmx_ep_write (struct dds_psmx_endpoint *psmx_endpoint, dds_loaned_sample_t *data);
static dds_loaned_sample_t * cdds_psmx_ep_take (struct dds_psmx_endpoint *psmx_endpoint);
static dds_return_t cdds_psmx_ep_on_data_available (struct dds_psmx_endpoint *psmx_endpoint, dds_entity_t reader);

static const dds_psmx_endpoint_ops_t psmx_ep_ops = {
.request_loan = cdds_psmx_ep_request_loan,
.request_raw_loan = cdds_psmx_ep_request_raw_loan,
.write = cdds_psmx_ep_write,
.take = cdds_psmx_ep_take,
.on_data_available = cdds_psmx_ep_on_data_available
Expand Down Expand Up @@ -310,15 +308,6 @@ static dds_loaned_sample_t * cdds_psmx_ep_request_loan (struct dds_psmx_endpoint
return ls;
}

static dds_return_t cdds_psmx_ep_request_raw_loan (struct dds_psmx_endpoint *psmx_ep, uint32_t size_requested, void **buffer)
{
(void) psmx_ep;
dds_return_t ret = DDS_RETCODE_OK;
if ((*buffer = dds_alloc (size_requested)) == NULL)
ret = DDS_RETCODE_OUT_OF_RESOURCES;
return ret;
}

static dds_return_t cdds_psmx_ep_write (struct dds_psmx_endpoint *psmx_ep, dds_loaned_sample_t *data)
{
struct cdds_psmx_endpoint *cep = (struct cdds_psmx_endpoint *) psmx_ep;
Expand Down
2 changes: 1 addition & 1 deletion src/core/xtests/symbol_export/symbol_export.c
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ int main (int argc, char **argv)
dds_request_loan (1, ptr);
dds_return_loan (1, ptr, 0);
dds_is_shared_memory_available (1);
dds_loan_shared_memory_buffer (1, 0, ptr);
dds_request_loan_of_size (1, 0, ptr);
dds_is_loan_available (1); // deprecated
dds_loan_sample (1, ptr); // deprecated

Expand Down
22 changes: 0 additions & 22 deletions src/psmx_iox/src/psmx_iox_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,12 @@ static const dds_psmx_topic_ops_t psmx_topic_ops = {


static dds_loaned_sample_t * iox_req_loan(struct dds_psmx_endpoint *psmx_endpoint, uint32_t size_requested);
static dds_return_t iox_req_raw_loan(struct dds_psmx_endpoint *psmx_endpoint, uint32_t size_requested, void **buffer);
static dds_return_t iox_write(struct dds_psmx_endpoint * psmx_endpoint, dds_loaned_sample_t * data);
static dds_loaned_sample_t * iox_take(struct dds_psmx_endpoint * psmx_endpoint);
static dds_return_t iox_on_data_available(struct dds_psmx_endpoint * psmx_endpoint, dds_entity_t reader);

static const dds_psmx_endpoint_ops_t psmx_ep_ops = {
.request_loan = iox_req_loan,
.request_raw_loan = iox_req_raw_loan,
.write = iox_write,
.take = iox_take,
.on_data_available = iox_on_data_available
Expand Down Expand Up @@ -525,26 +523,6 @@ static dds_loaned_sample_t * iox_req_loan(struct dds_psmx_endpoint *psmx_endpoin
return loaned_sample;
}

static dds_return_t iox_req_raw_loan(struct dds_psmx_endpoint *psmx_endpoint, uint32_t size_requested, void **buffer)
{
auto cpp_ep_ptr = static_cast<iox_psmx_endpoint *>(psmx_endpoint);
if (psmx_endpoint->endpoint_type != DDS_PSMX_ENDPOINT_TYPE_WRITER)
return DDS_RETCODE_BAD_PARAMETER;
else
{
dds_return_t ret = DDS_RETCODE_OK;
const std::lock_guard<std::mutex> lock(cpp_ep_ptr->lock);
auto publisher = static_cast<iox::popo::UntypedPublisher *>(cpp_ep_ptr->_iox_endpoint);
publisher->loan(size_requested, iox::CHUNK_DEFAULT_USER_PAYLOAD_ALIGNMENT, sizeof(dds_psmx_metadata_t), alignof(dds_psmx_metadata_t))
.and_then([buffer](void * iox_payload) { *buffer = iox_payload; })
.or_else([&ret](auto& error) {
std::cerr << ERROR_PREFIX "failure getting loan" << iox::popo::asStringLiteral(error) << std::endl;
ret = DDS_RETCODE_ERROR;
});
return ret;
}
}

static dds_return_t iox_write(struct dds_psmx_endpoint * psmx_endpoint, dds_loaned_sample_t * data)
{
assert(psmx_endpoint->endpoint_type == DDS_PSMX_ENDPOINT_TYPE_WRITER);
Expand Down

0 comments on commit a043eb8

Please sign in to comment.