diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 645ffc1e6d..4745acd6c5 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -67,23 +67,23 @@ jobs: cc: gcc-10 testing: off idlc_xtests: off - 'Ubuntu 22.04 LTS with Clang 12 (Debug, x86_64)': + 'Ubuntu 22.04 LTS with Clang 13 (Debug, x86_64)': image: ubuntu-22.04 analyzer: on sanitizer: address,undefined - cc: clang-12 - 'Ubuntu 22.04 LTS with Clang 12 (Debug, x86_64, no security)': + cc: clang-13 + 'Ubuntu 22.04 LTS with Clang 13 (Debug, x86_64, no security)': image: ubuntu-22.04 sanitizer: address,undefined security: off - cc: clang-12 - 'Ubuntu 22.04 LTS with Clang 12 (Release, x86_64, no topic discovery)': + cc: clang-13 + 'Ubuntu 22.04 LTS with Clang 13 (Release, x86_64, no topic discovery)': image: ubuntu-22.04 build_type: Release sanitizer: undefined topic_discovery: off idlc_xtests: off # temporary disabled because of passing -t option to idlc in this test for recursive types - cc: clang-12 + cc: clang-13 'macOS 11 with Clang 12 (Debug, x86_64)': image: macOS-11 sanitizer: address,undefined @@ -174,9 +174,9 @@ jobs: vmImage: $(image) strategy: matrix: - 'Test and fuzzing (Ubuntu 22.04 LTS, Clang 12, Debug)': + 'Test and fuzzing (Ubuntu 22.04 LTS, Clang 13, Debug)': image: ubuntu-22.04 - cc: clang-12 + cc: clang-13 build_type: Debug steps: - template: /.azure/templates/python-binding.yml @@ -187,9 +187,9 @@ jobs: vmImage: $(image) strategy: matrix: - 'Build and test (Ubuntu 22.04 LTS, Clang 12, Debug)': + 'Build and test (Ubuntu 22.04 LTS, Clang 13, Debug)': image: ubuntu-22.04 - cc: clang-12 + cc: clang-13 build_type: Debug steps: - template: /.azure/templates/cxx-binding.yml diff --git a/src/core/ddsc/include/dds/dds.h b/src/core/ddsc/include/dds/dds.h index f2791f3246..53bc66ac8d 100644 --- a/src/core/ddsc/include/dds/dds.h +++ b/src/core/ddsc/include/dds/dds.h @@ -3075,6 +3075,237 @@ dds_waitset_wait_until( * @ingroup reader */ +/** + * @brief Read data from the data reader, read or query condition without updating state + * @ingroup reading + * @component read_data + * + * Reads samples from the reader history cache without marking these samples as "read". It starts + * with an arbitrary (matching) instance, reading (matching) samples from the oldest to + * the most recent, then continues with another arbitrarily selected (matching) instance, + * etc. This continues until it has traversed the entire history cache or has gathered + * `maxs` samples. + * + * The @ref `dds_read` operation can be used to mark the returned samples as "read"; the + * @ref `dds_take` operation can be used to also remove the returned samples from the + * history cache. + * + * For the plain `dds_peek` operation, all instances and samples match. This is different + * for the more selective variants, where the documentation refers to this function and + * only gives detailed information where it differs. + * + * The `buf` parameter is used as follows: + * - If `buf[0]` on entry is a null pointer: + * - on return `buf[0]` .. `buf[k-1]` will point to middleware-owned memory (a.k.a. loans); and + * - `buf[k]` will be a null pointer if `0 <= k < bufsz-1`; and + * - `0 <= k <= maxs` is the number of samples read (a.k.a. the return value). + * + * - If `buf[0]` on entry is an outstanding sample loan (i.e., resulting from a previous call to, e.g., read), then: + * - all of `buf[0]` .. `buf[k-1]` must be pointers to outstanding loans; and + * - `k` = `bufsz` or `buf[k]` is a null pointer; where + * - `1 <= k < bufsz`; and + * - all these outstanding loans are returned as-if through @ref `dds_return_loan`; and + * - the result will be as if `buf[0]` had been a null pointer on entry. + * + * - If `buf[0]` on entry is any other address, then: + * - all of `buf[0]` .. `buf[bufsz-1]` must point to memory suitable for storing samples; and + * - the C binding requires that this memory must be initialized such that all embedded strings, externals, + * optionals and sequences are initialized (null pointers are ok, sequences may also be all-0) + * + * The loans returned by `dds_peek` operation are potentially shared copies of the data and the contents + * may not be modified. If a private copy is required, pass in non-null pointers to memory as in the third case + * above. + * + * The `si` array is filled with sample information on all returned samples. If the + * `valid_data` flag is set in the sample info for a particular sample, all fields of that + * sample are valid. Otherwise, only the key value is valid. For the C binding, all other + * fields will be set to 0. + * + * @param[in] reader_or_condition Reader, readcondition or querycondition entity. + * @param[in,out] buf An array of `bufsz` pointers to samples (see above). + * @param[out] si Pointer to an array of @ref dds_sample_info_t returned for each data value. + * @param[in] bufsz The size of buffer provided. + * @param[in] maxs Maximum number of samples to read. + * + * @returns A dds_return_t with the number of samples read or an error code. + * + * @retval >=0 + * Number of samples read. + * @retval DDS_RETCODE_ERROR + * An internal error has occurred. + * @retval DDS_RETCODE_BAD_PARAMETER + * One of the given arguments is not valid. + * @retval DDS_RETCODE_ILLEGAL_OPERATION + * The operation is invoked on an inappropriate object. + * @retval DDS_RETCODE_ALREADY_DELETED + * The entity has already been deleted. + */ +DDS_EXPORT dds_return_t +dds_peek( + dds_entity_t reader_or_condition, + void **buf, + dds_sample_info_t *si, + size_t bufsz, + uint32_t maxs); + +/** + * @brief Read data matching sample/view/instance states from the data reader, read or query condition without updating state + * @ingroup reading + * @component read_data + * + * See @ref `dds_peek`. The matching criterion referred to there is that the + * sample/view/instance states must match the specification in the `mask` parameter. + * + * If the sample/view/instance state component in the mask is 0 and `reader_or_condition` + * references a data reader (as opposed to a read or query condition), it is treated as + * equivalent to any sample/view/instance state. If `reader_or_condition` references a + * read or query condition, the matching states are the union of `mask` and the + * condition's mask. + * + * @param[in] reader_or_condition Reader, readcondition or querycondition entity. + * @param[in,out] buf An array of `bufsz` pointers to samples. + * @param[out] si Pointer to an array of @ref dds_sample_info_t returned for each data value. + * @param[in] bufsz The size of buffer provided. + * @param[in] maxs Maximum number of samples to read. + * @param[in] mask Filter the data based on dds_sample_state_t|dds_view_state_t|dds_instance_state_t. + * + * @returns A dds_return_t with the number of samples read or an error code. + * + * @retval >=0 + * Number of samples read. + * @retval DDS_RETCODE_ERROR + * An internal error has occurred. + * @retval DDS_RETCODE_BAD_PARAMETER + * One of the given arguments is not valid. + * @retval DDS_RETCODE_ILLEGAL_OPERATION + * The operation is invoked on an inappropriate object. + * @retval DDS_RETCODE_ALREADY_DELETED + * The entity has already been deleted. + */ +DDS_EXPORT dds_return_t +dds_peek_mask( + dds_entity_t reader_or_condition, + void **buf, + dds_sample_info_t *si, + size_t bufsz, + uint32_t maxs, + uint32_t mask); + +/** + * @brief Read data for a specific instance from the data reader, read or query condition without updating state + * @ingroup reading + * @component read_data + * + * See @ref `dds_peek`. The matching criterion referred to there is that the instance + * handle must equal the `handle` parameter. + * + * @param[in] reader_or_condition Reader, readcondition or querycondition entity. + * @param[in,out] buf An array of `bufsz` pointers to samples. + * @param[out] si Pointer to an array of @ref dds_sample_info_t returned for each data value. + * @param[in] bufsz The size of buffer provided. + * @param[in] maxs Maximum number of samples to read. + * @param[in] handle Instance handle related to the samples to read. + * + * @returns A dds_return_t with the number of samples read or an error code. + * + * @retval >=0 + * Number of samples read. + * @retval DDS_RETCODE_ERROR + * An internal error has occurred. + * @retval DDS_RETCODE_BAD_PARAMETER + * One of the given arguments is not valid. + * @retval DDS_RETCODE_ILLEGAL_OPERATION + * The operation is invoked on an inappropriate object. + * @retval DDS_RETCODE_ALREADY_DELETED + * The entity has already been deleted. + * @retval DDS_RETCODE_PRECONDITION_NOT_MET + * The instance handle has not been registered with this reader. + */ +DDS_EXPORT dds_return_t +dds_peek_instance( + dds_entity_t reader_or_condition, + void **buf, + dds_sample_info_t *si, + size_t bufsz, + uint32_t maxs, + dds_instance_handle_t handle); + +/** + * @brief Read data for a specific instance matching sample/view/instance states from the data reader, read or query condition without updating state + * @ingroup reading + * @component read_data + * + * See @ref `dds_peek`. The matching criterion referred to there is that: + * - the instance handle must equal the `handle` parameter; and + * - the sample/view/instance states must match the specification in the `mask` parameter. + * + * If the sample/view/instance state component in the mask is 0 and `reader_or_condition` + * references a data reader (as opposed to a read or query condition), it is treated as + * equivalent to any sample/view/instance state. If `reader_or_condition` references a + * read or query condition, the matching states are the union of `mask` and the + * condition's mask. + * + * @param[in] reader_or_condition Reader, readcondition or querycondition entity. + * @param[in,out] buf An array of `bufsz` pointers to samples. + * @param[out] si Pointer to an array of @ref dds_sample_info_t returned for each data value. + * @param[in] bufsz The size of buffer provided. + * @param[in] maxs Maximum number of samples to read. + * @param[in] handle Instance handle related to the samples to read. + * @param[in] mask Filter the data based on dds_sample_state_t|dds_view_state_t|dds_instance_state_t. + * + * @returns A dds_return_t with the number of samples read or an error code. + * + * @retval >=0 + * Number of samples read. + * @retval DDS_RETCODE_ERROR + * An internal error has occurred. + * @retval DDS_RETCODE_BAD_PARAMETER + * One of the given arguments is not valid. + * @retval DDS_RETCODE_ILLEGAL_OPERATION + * The operation is invoked on an inappropriate object. + * @retval DDS_RETCODE_ALREADY_DELETED + * The entity has already been deleted. + * @retval DDS_RETCODE_PRECONDITION_NOT_MET + * The instance handle has not been registered with this reader. + */ +DDS_EXPORT dds_return_t +dds_peek_instance_mask( + dds_entity_t reader_or_condition, + void **buf, + dds_sample_info_t *si, + size_t bufsz, + uint32_t maxs, + dds_instance_handle_t handle, + uint32_t mask); + +/** + * @brief Read the first unread sample without updating state + * @ingroup reading + * @component read_data + * + * Equivalent to `dds_peek_mask(reader, buf, si, 1, 1, DDS_NOT_READ_SAMPLE_STATE)`. + * + * @param[in] reader The reader entity. + * @param[in,out] buf A pointer to a sample. + * @param[out] si The pointer to @ref dds_sample_info_t returned for a data value. + * + * @returns A dds_return_t indicating success or failure. + * + * @retval DDS_RETCODE_OK + * The operation was successful. + * @retval DDS_RETCODE_BAD_PARAMETER + * The entity parameter is not a valid parameter. + * @retval DDS_RETCODE_ILLEGAL_OPERATION + * The operation is invoked on an inappropriate object. + * @retval DDS_RETCODE_ALREADY_DELETED + * The entity has already been deleted. + */ +DDS_EXPORT dds_return_t +dds_peek_next( + dds_entity_t reader, + void **buf, + dds_sample_info_t *si); + /** * @brief Read data from the data reader, read or query condition * @ingroup reading @@ -3086,7 +3317,8 @@ dds_waitset_wait_until( * etc. This continues until it has traversed the entire history cache or has gathered * `maxs` samples. * - * The @ref `dds_take` operation can be used to also remove the returned samples from the + * The @ref `dds_peek` operation can be read samples without marking them as "read"; the + * @ref `dds_take` operation can be used to also remove the returned samples from the * history cache. * * For the plain `dds_read` operation, all instances and samples match. This is different @@ -3485,7 +3717,8 @@ dds_read_next_wl( * samples. * * The @ref `dds_read` operation can be used to read samples without removing them from - * the history cache. + * the history cache but marking them as "read"; the @ref `dds_peek` operation can be used + * to read samples from the cache without changing any internal state. * * For the plain `dds_take` operation, all instances and samples match. This is different * for the more selective variants, where the documentation refers to this function and @@ -3905,6 +4138,46 @@ typedef dds_return_t (*dds_read_with_collector_fn_t) ( const struct ddsi_sertype *st, struct ddsi_serdata *sd); +/** + * @brief Read samples while collecting result in an application-defined way without updating state + * @ingroup reading + * @component read_data + * + * When using a readcondition or querycondition, their masks are or'd with the given mask. + * + * If the sample/view/instance state component in the mask is 0 and there is no read or query condition, + * to combine it with, it is treated as equivalent to any sample/view/instance state. + * + * Collected samples are not marked as read. + * + * @param[in] reader_or_condition Handle of a reader or a read/query condition + * @param[in] maxs Maximum number of samples (1 .. INT32_MAX) + * @param[in] handle Instance handle or 0 if not reading a specific instance + * @param[in] mask Sample/view/instance state mask + * @param[in] collect_sample Function be called for each sample in the result + * @param[in] collect_sample_arg Arbitrary argument passed to "collect_sample" + * @return The number of returned samples or an error code + * @retval > 0 number of samples passed successfully collected by collect_sample + * @retval 0 success, no matching data + * @retval DDS_RETCODE_ERROR + * An internal error has occurred. + * @retval DDS_RETCODE_BAD_PARAMETER + * One of the given arguments is not valid. + * @retval DDS_RETCODE_ILLEGAL_OPERATION + * The operation is invoked on an inappropriate object. + * @retval DDS_RETCODE_ALREADY_DELETED + * The entity has already been deleted. + * @retval < 0 Return value of failing collect_sample on first invocation + */ +DDS_EXPORT dds_return_t +dds_peek_with_collector ( + dds_entity_t reader_or_condition, + uint32_t maxs, + dds_instance_handle_t handle, + uint32_t mask, + dds_read_with_collector_fn_t collect_sample, + void *collect_sample_arg); + /** * @brief Read samples while collecting result in an application-defined way * @ingroup reading @@ -3915,6 +4188,8 @@ typedef dds_return_t (*dds_read_with_collector_fn_t) ( * If the sample/view/instance state component in the mask is 0 and there is no read or query condition, * to combine it with, it is treated as equivalent to any sample/view/instance state. * + * Collected samples are marked as read. + * * @param[in] reader_or_condition Handle of a reader or a read/query condition * @param[in] maxs Maximum number of samples (1 .. INT32_MAX) * @param[in] handle Instance handle or 0 if not reading a specific instance @@ -3953,6 +4228,8 @@ dds_read_with_collector ( * If the sample/view/instance state component in the mask is 0 and there is no read or query condition, * to combine it with, it is treated as equivalent to any sample/view/instance state. * + * Collected samples are removed from the history cache. + * * @param[in] reader_or_condition Handle of a reader or a read/query condition * @param[in] maxs Maximum number of samples (1 .. INT32_MAX) * @param[in] handle Instance handle or 0 if not taking from a specific instance @@ -3988,6 +4265,115 @@ dds_take_with_collector ( */ #define DDS_HAS_READCDR 1 +/** + * @brief Access the collection of serialized data values (of same type) and + * sample info from the data reader, readcondition or querycondition + * without updating state. + * @ingroup reading + * @component read_data + * + * This call accesses the serialized data from the data reader, readcondition or + * querycondition and makes it available to the application. The serialized data + * is made available through @ref ddsi_serdata structures. Returned samples are + * marked not marked as READ. + * + * When using a readcondition or querycondition, their masks are or'd with the given mask. + * + * If the sample/view/instance state component in the mask is 0 and there is no read or query condition, + * to combine it with, it is treated as equivalent to any sample/view/instance state. + * + * Return value provides information about the number of samples read, which will + * be <= maxs. Based on the count, the buffer will contain serialized data to be + * read only when valid_data bit in sample info structure is set. + * The buffer required for data values, could be allocated explicitly or can + * use the memory from data reader to prevent copy. In the latter case, buffer and + * sample_info should be returned back, once it is no longer using the data. + * + * @param[in] reader_or_condition Reader, readcondition or querycondition entity. + * @param[out] buf An array of pointers to @ref ddsi_serdata structures that contain + * the serialized data. The pointers can be NULL. + * @param[in] maxs Maximum number of samples to read. + * @param[out] si Pointer to an array of @ref dds_sample_info_t returned for each data value. + * @param[in] mask Filter the data based on dds_sample_state_t|dds_view_state_t|dds_instance_state_t. + * + * @returns A dds_return_t with the number of samples read or an error code. + * + * @retval >=0 + * Number of samples read. + * @retval DDS_RETCODE_ERROR + * An internal error has occurred. + * @retval DDS_RETCODE_BAD_PARAMETER + * One of the given arguments is not valid. + * @retval DDS_RETCODE_ILLEGAL_OPERATION + * The operation is invoked on an inappropriate object. + * @retval DDS_RETCODE_ALREADY_DELETED + * The entity has already been deleted. + * @retval DDS_RETCODE_PRECONDITION_NOT_MET + * The precondition for this operation is not met. + */ +DDS_EXPORT dds_return_t +dds_peekcdr( + dds_entity_t reader_or_condition, + struct ddsi_serdata **buf, + uint32_t maxs, + dds_sample_info_t *si, + uint32_t mask); + +/** + * @brief Access the collection of serialized data values (of same type) and + * sample info from the data reader, readcondition or querycondition + * scoped by the provided instance handle without updating state. + * @ingroup reading + * @component read_data + * + * This operation implements the same functionality as dds_read_instance_wl, except that + * samples are now in their serialized form. The serialized data is made available through + * @ref ddsi_serdata structures. Returned samples are not marked as READ. + * + * When using a readcondition or querycondition, their masks are or'd with the given mask. + * + * If the sample/view/instance state component in the mask is 0 and there is no read or query condition, + * to combine it with, it is treated as equivalent to any sample/view/instance state. + * + * Return value provides information about the number of samples read, which will + * be <= maxs. Based on the count, the buffer will contain serialized data to be + * read only when valid_data bit in sample info structure is set. + * The buffer required for data values, could be allocated explicitly or can + * use the memory from data reader to prevent copy. In the latter case, buffer and + * sample_info should be returned back, once it is no longer using the data. + * + * @param[in] reader_or_condition Reader, readcondition or querycondition entity. + * @param[out] buf An array of pointers to @ref ddsi_serdata structures that contain + * the serialized data. The pointers can be NULL. + * @param[in] maxs Maximum number of samples to read. + * @param[out] si Pointer to an array of @ref dds_sample_info_t returned for each data value. + * @param[in] handle Instance handle related to the samples to read. + * @param[in] mask Filter the data based on dds_sample_state_t|dds_view_state_t|dds_instance_state_t. + * + * @returns A dds_return_t with the number of samples read or an error code. + * + * @retval >=0 + * Number of samples read. + * @retval DDS_RETCODE_ERROR + * An internal error has occurred. + * @retval DDS_RETCODE_BAD_PARAMETER + * One of the given arguments is not valid. + * @retval DDS_RETCODE_ILLEGAL_OPERATION + * The operation is invoked on an inappropriate object. + * @retval DDS_RETCODE_ALREADY_DELETED + * The entity has already been deleted. + * @retval DDS_RETCODE_PRECONDITION_NOT_MET + * The instance handle has not been registered with this reader. + */ +DDS_EXPORT dds_return_t +dds_peekcdr_instance ( + dds_entity_t reader_or_condition, + struct ddsi_serdata **buf, + uint32_t maxs, + dds_sample_info_t *si, + dds_instance_handle_t handle, + uint32_t mask); + /** * @brief Access the collection of serialized data values (of same type) and * sample info from the data reader, readcondition or querycondition. diff --git a/src/core/ddsc/include/dds/ddsc/dds_rhc.h b/src/core/ddsc/include/dds/ddsc/dds_rhc.h index 366040c9aa..a59710acaa 100644 --- a/src/core/ddsc/include/dds/ddsc/dds_rhc.h +++ b/src/core/ddsc/include/dds/ddsc/dds_rhc.h @@ -39,6 +39,7 @@ struct dds_rhc_ops { /* A copy of DDSI rhc ops comes first so we can use either interface without additional indirections */ struct ddsi_rhc_ops rhc_ops; + dds_rhc_read_take_t peek; dds_rhc_read_take_t read; dds_rhc_read_take_t take; dds_rhc_add_readcondition_t add_readcondition; @@ -86,6 +87,11 @@ DDS_INLINE_EXPORT inline void dds_rhc_free (struct dds_rhc *rhc) { rhc->common.ops->rhc_ops.free (&rhc->common.rhc); } +/** @component rhc */ +DDS_INLINE_EXPORT inline int32_t dds_rhc_peek (struct dds_rhc *rhc, int32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond, dds_read_with_collector_fn_t collect_sample, void *collect_sample_arg) { + return (rhc->common.ops->peek) (rhc, max_samples, mask, handle, cond, collect_sample, collect_sample_arg); +} + /** @component rhc */ DDS_INLINE_EXPORT inline int32_t dds_rhc_read (struct dds_rhc *rhc, int32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond, dds_read_with_collector_fn_t collect_sample, void *collect_sample_arg) { return (rhc->common.ops->read) (rhc, max_samples, mask, handle, cond, collect_sample, collect_sample_arg); diff --git a/src/core/ddsc/src/dds_read.c b/src/core/ddsc/src/dds_read.c index 2b0596972d..c607850293 100644 --- a/src/core/ddsc/src/dds_read.c +++ b/src/core/ddsc/src/dds_read.c @@ -143,7 +143,13 @@ static dds_return_t dds_read_impl_setup (dds_entity_t reader_or_condition, bool return DDS_RETCODE_OK; } -static dds_return_t dds_read_impl_common (bool take, struct dds_reader *rd, struct dds_readcond *cond, uint32_t maxs, uint32_t mask, dds_instance_handle_t hand, dds_read_with_collector_fn_t collect_sample, void *collect_sample_arg) +enum dds_read_impl_common_oper { + READ_OPER_PEEK, + READ_OPER_READ, + READ_OPER_TAKE +}; + +static dds_return_t dds_read_impl_common (enum dds_read_impl_common_oper oper, struct dds_reader *rd, struct dds_readcond *cond, uint32_t maxs, uint32_t mask, dds_instance_handle_t hand, dds_read_with_collector_fn_t collect_sample, void *collect_sample_arg) { /* read/take resets data available status -- must reset before reading because the actual writing is protected by RHC lock, not by rd->m_entity.m_lock */ @@ -152,16 +158,24 @@ static dds_return_t dds_read_impl_common (bool take, struct dds_reader *rd, stru if (sm_old & (DDS_DATA_ON_READERS_STATUS << SAM_ENABLED_SHIFT)) dds_entity_status_reset (rd->m_entity.m_parent, DDS_DATA_ON_READERS_STATUS); - dds_return_t ret; + dds_return_t ret = DDS_RETCODE_ERROR; assert (maxs <= INT32_MAX); - if (take) - ret = dds_rhc_take (rd->m_rhc, (int32_t) maxs, mask, hand, cond, collect_sample, collect_sample_arg); - else - ret = dds_rhc_read (rd->m_rhc, (int32_t) maxs, mask, hand, cond, collect_sample, collect_sample_arg); + switch (oper) + { + case READ_OPER_PEEK: + ret = dds_rhc_peek (rd->m_rhc, (int32_t) maxs, mask, hand, cond, collect_sample, collect_sample_arg); + break; + case READ_OPER_READ: + ret = dds_rhc_read (rd->m_rhc, (int32_t) maxs, mask, hand, cond, collect_sample, collect_sample_arg); + break; + case READ_OPER_TAKE: + ret = dds_rhc_take (rd->m_rhc, (int32_t) maxs, mask, hand, cond, collect_sample, collect_sample_arg); + break; + } return ret; } -static dds_return_t dds_read_with_collector_impl (bool take, dds_entity_t reader_or_condition, uint32_t maxs, uint32_t mask, dds_instance_handle_t hand, bool only_reader, dds_read_with_collector_fn_t collect_sample, void *collect_sample_arg) +static dds_return_t dds_read_with_collector_impl (enum dds_read_impl_common_oper oper, dds_entity_t reader_or_condition, uint32_t maxs, uint32_t mask, dds_instance_handle_t hand, bool only_reader, dds_read_with_collector_fn_t collect_sample, void *collect_sample_arg) { dds_return_t ret; struct dds_entity *entity; @@ -176,27 +190,27 @@ static dds_return_t dds_read_with_collector_impl (bool take, dds_entity_t reader struct ddsi_thread_state * const thrst = ddsi_lookup_thread_state (); ddsi_thread_state_awake (thrst, &entity->m_domain->gv); - ret = dds_read_impl_common (take, rd, cond, maxs, mask, hand, collect_sample, collect_sample_arg); + ret = dds_read_impl_common (oper, rd, cond, maxs, mask, hand, collect_sample, collect_sample_arg); ddsi_thread_state_asleep (thrst); dds_entity_unpin (entity); return ret; } -static dds_return_t dds_readcdr_impl (bool take, dds_entity_t reader_or_condition, struct ddsi_serdata **buf, uint32_t maxs, dds_sample_info_t *si, uint32_t mask, dds_instance_handle_t hand) +static dds_return_t dds_readcdr_impl (enum dds_read_impl_common_oper oper, dds_entity_t reader_or_condition, struct ddsi_serdata **buf, uint32_t maxs, dds_sample_info_t *si, uint32_t mask, dds_instance_handle_t hand) { if (buf == NULL || si == NULL) return DDS_RETCODE_BAD_PARAMETER; struct dds_read_collect_sample_arg collect_arg; DDSRT_STATIC_ASSERT (sizeof (struct ddsi_serdata *) == sizeof (void *)); dds_read_collect_sample_arg_init (&collect_arg, (void **) buf, si, NULL, NULL); - const dds_return_t ret = dds_read_with_collector_impl (take, reader_or_condition, maxs, mask, hand, true, dds_read_collect_sample_refs, &collect_arg); + const dds_return_t ret = dds_read_with_collector_impl (oper, reader_or_condition, maxs, mask, hand, true, dds_read_collect_sample_refs, &collect_arg); return ret; } static dds_return_t return_reader_loan_locked (dds_reader *rd, void **buf, int32_t bufsz) ddsrt_nonnull_all ddsrt_attribute_warn_unused_result; -static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition, void **buf, size_t bufsz, uint32_t maxs, dds_sample_info_t *si, uint32_t mask, dds_instance_handle_t hand, bool only_reader) +static dds_return_t dds_read_impl (enum dds_read_impl_common_oper oper, dds_entity_t reader_or_condition, void **buf, size_t bufsz, uint32_t maxs, dds_sample_info_t *si, uint32_t mask, dds_instance_handle_t hand, bool only_reader) { if (buf == NULL || si == NULL || maxs == 0 || bufsz == 0 || bufsz < maxs || maxs > INT32_MAX) return DDS_RETCODE_BAD_PARAMETER; @@ -231,7 +245,7 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition, dds_read_collect_sample_arg_init (&collect_arg, buf, si, rd->m_loans, rd->m_heap_loan_cache); const bool use_loan = (buf[0] == NULL); const dds_read_with_collector_fn_t collect_sample = use_loan ? dds_read_collect_sample_loan : dds_read_collect_sample; - ret = dds_read_impl_common (take, rd, cond, maxs, mask, hand, collect_sample, &collect_arg); + ret = dds_read_impl_common (oper, rd, cond, maxs, mask, hand, collect_sample, &collect_arg); // If use_loan, make sure the `buf` is either fully initialized or ends on a null pointer // so the various paths returning loans know when to stop. (If no data returned and using @@ -254,9 +268,39 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition, return ret; } +dds_return_t dds_peek (dds_entity_t reader_or_condition, void **buf, dds_sample_info_t *si, size_t bufsz, uint32_t maxs) +{ + return dds_read_impl (READ_OPER_PEEK, reader_or_condition, buf, bufsz, maxs, si, 0, DDS_HANDLE_NIL, false); +} + +dds_return_t dds_peek_mask (dds_entity_t reader_or_condition, void **buf, dds_sample_info_t *si, size_t bufsz, uint32_t maxs, uint32_t mask) +{ + return dds_read_impl (READ_OPER_PEEK, reader_or_condition, buf, bufsz, maxs, si, mask, DDS_HANDLE_NIL, false); +} + +dds_return_t dds_peek_instance (dds_entity_t reader_or_condition, void **buf, dds_sample_info_t *si, size_t bufsz, uint32_t maxs, dds_instance_handle_t handle) +{ + if (handle == DDS_HANDLE_NIL) + return DDS_RETCODE_PRECONDITION_NOT_MET; + return dds_read_impl (READ_OPER_PEEK, reader_or_condition, buf, bufsz, maxs, si, 0, handle, false); +} + +dds_return_t dds_peek_instance_mask (dds_entity_t reader_or_condition, void **buf, dds_sample_info_t *si, size_t bufsz, uint32_t maxs, dds_instance_handle_t handle, uint32_t mask) +{ + if (handle == DDS_HANDLE_NIL) + return DDS_RETCODE_PRECONDITION_NOT_MET; + return dds_read_impl (READ_OPER_PEEK, reader_or_condition, buf, bufsz, maxs, si, mask, handle, false); +} + +dds_return_t dds_peek_next (dds_entity_t reader, void **buf, dds_sample_info_t *si) +{ + uint32_t mask = DDS_NOT_READ_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_ANY_INSTANCE_STATE; + return dds_read_impl (READ_OPER_PEEK, reader, buf, 1u, 1u, si, mask, DDS_HANDLE_NIL, true); +} + dds_return_t dds_read (dds_entity_t reader_or_condition, void **buf, dds_sample_info_t *si, size_t bufsz, uint32_t maxs) { - return dds_read_impl (false, reader_or_condition, buf, bufsz, maxs, si, 0, DDS_HANDLE_NIL, false); + return dds_read_impl (READ_OPER_READ, reader_or_condition, buf, bufsz, maxs, si, 0, DDS_HANDLE_NIL, false); } dds_return_t dds_read_wl (dds_entity_t reader_or_condition, void **buf, dds_sample_info_t *si, uint32_t maxs) @@ -266,7 +310,7 @@ dds_return_t dds_read_wl (dds_entity_t reader_or_condition, void **buf, dds_samp dds_return_t dds_read_mask (dds_entity_t reader_or_condition, void **buf, dds_sample_info_t *si, size_t bufsz, uint32_t maxs, uint32_t mask) { - return dds_read_impl (false, reader_or_condition, buf, bufsz, maxs, si, mask, DDS_HANDLE_NIL, false); + return dds_read_impl (READ_OPER_READ, reader_or_condition, buf, bufsz, maxs, si, mask, DDS_HANDLE_NIL, false); } dds_return_t dds_read_mask_wl (dds_entity_t reader_or_condition, void **buf, dds_sample_info_t *si, uint32_t maxs, uint32_t mask) @@ -278,7 +322,7 @@ dds_return_t dds_read_instance (dds_entity_t reader_or_condition, void **buf, dd { if (handle == DDS_HANDLE_NIL) return DDS_RETCODE_PRECONDITION_NOT_MET; - return dds_read_impl (false, reader_or_condition, buf, bufsz, maxs, si, 0, handle, false); + return dds_read_impl (READ_OPER_READ, reader_or_condition, buf, bufsz, maxs, si, 0, handle, false); } dds_return_t dds_read_instance_wl (dds_entity_t reader_or_condition, void **buf, dds_sample_info_t *si, uint32_t maxs, dds_instance_handle_t handle) @@ -290,7 +334,7 @@ dds_return_t dds_read_instance_mask (dds_entity_t reader_or_condition, void **bu { if (handle == DDS_HANDLE_NIL) return DDS_RETCODE_PRECONDITION_NOT_MET; - return dds_read_impl (false, reader_or_condition, buf, bufsz, maxs, si, mask, handle, false); + return dds_read_impl (READ_OPER_READ, reader_or_condition, buf, bufsz, maxs, si, mask, handle, false); } dds_return_t dds_read_instance_mask_wl (dds_entity_t reader_or_condition, void **buf, dds_sample_info_t *si, uint32_t maxs, dds_instance_handle_t handle, uint32_t mask) @@ -301,7 +345,7 @@ dds_return_t dds_read_instance_mask_wl (dds_entity_t reader_or_condition, void * dds_return_t dds_read_next (dds_entity_t reader, void **buf, dds_sample_info_t *si) { uint32_t mask = DDS_NOT_READ_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_ANY_INSTANCE_STATE; - return dds_read_impl (false, reader, buf, 1u, 1u, si, mask, DDS_HANDLE_NIL, true); + return dds_read_impl (READ_OPER_READ, reader, buf, 1u, 1u, si, mask, DDS_HANDLE_NIL, true); } dds_return_t dds_read_next_wl (dds_entity_t reader, void **buf, dds_sample_info_t *si) @@ -311,7 +355,7 @@ dds_return_t dds_read_next_wl (dds_entity_t reader, void **buf, dds_sample_info_ dds_return_t dds_take (dds_entity_t reader_or_condition, void **buf, dds_sample_info_t *si, size_t bufsz, uint32_t maxs) { - return dds_read_impl (true, reader_or_condition, buf, bufsz, maxs, si, 0, DDS_HANDLE_NIL, false); + return dds_read_impl (READ_OPER_TAKE, reader_or_condition, buf, bufsz, maxs, si, 0, DDS_HANDLE_NIL, false); } dds_return_t dds_take_wl (dds_entity_t reader_or_condition, void **buf, dds_sample_info_t *si, uint32_t maxs) @@ -321,7 +365,7 @@ dds_return_t dds_take_wl (dds_entity_t reader_or_condition, void **buf, dds_samp dds_return_t dds_take_mask (dds_entity_t reader_or_condition, void **buf, dds_sample_info_t *si, size_t bufsz, uint32_t maxs, uint32_t mask) { - return dds_read_impl (true, reader_or_condition, buf, bufsz, maxs, si, mask, DDS_HANDLE_NIL, false); + return dds_read_impl (READ_OPER_TAKE, reader_or_condition, buf, bufsz, maxs, si, mask, DDS_HANDLE_NIL, false); } dds_return_t dds_take_mask_wl (dds_entity_t reader_or_condition, void **buf, dds_sample_info_t *si, uint32_t maxs, uint32_t mask) @@ -333,7 +377,7 @@ dds_return_t dds_take_instance (dds_entity_t reader_or_condition, void **buf, dd { if (handle == DDS_HANDLE_NIL) return DDS_RETCODE_PRECONDITION_NOT_MET; - return dds_read_impl (true, reader_or_condition, buf, bufsz, maxs, si, 0, handle, false); + return dds_read_impl (READ_OPER_TAKE, reader_or_condition, buf, bufsz, maxs, si, 0, handle, false); } dds_return_t dds_take_instance_wl (dds_entity_t reader_or_condition, void **buf, dds_sample_info_t *si, uint32_t maxs, dds_instance_handle_t handle) @@ -345,7 +389,7 @@ dds_return_t dds_take_instance_mask (dds_entity_t reader_or_condition, void **bu { if (handle == DDS_HANDLE_NIL) return DDS_RETCODE_PRECONDITION_NOT_MET; - return dds_read_impl (true, reader_or_condition, buf, bufsz, maxs, si, mask, handle, false); + return dds_read_impl (READ_OPER_TAKE, reader_or_condition, buf, bufsz, maxs, si, mask, handle, false); } dds_return_t dds_take_instance_mask_wl (dds_entity_t reader_or_condition, void **buf, dds_sample_info_t *si, uint32_t maxs, dds_instance_handle_t handle, uint32_t mask) @@ -356,7 +400,7 @@ dds_return_t dds_take_instance_mask_wl (dds_entity_t reader_or_condition, void * dds_return_t dds_take_next (dds_entity_t reader, void **buf, dds_sample_info_t *si) { uint32_t mask = DDS_NOT_READ_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_ANY_INSTANCE_STATE; - return dds_read_impl (true, reader, buf, 1u, 1u, si, mask, DDS_HANDLE_NIL, true); + return dds_read_impl (READ_OPER_TAKE, reader, buf, 1u, 1u, si, mask, DDS_HANDLE_NIL, true); } dds_return_t dds_take_next_wl (dds_entity_t reader, void **buf, dds_sample_info_t *si) @@ -364,38 +408,55 @@ dds_return_t dds_take_next_wl (dds_entity_t reader, void **buf, dds_sample_info_ return dds_take_next (reader, buf, si); } +dds_return_t dds_peekcdr (dds_entity_t reader_or_condition, struct ddsi_serdata **buf, uint32_t maxs, dds_sample_info_t *si, uint32_t mask) +{ + return dds_readcdr_impl (READ_OPER_PEEK, reader_or_condition, buf, maxs, si, mask, DDS_HANDLE_NIL); +} + +dds_return_t dds_peekcdr_instance (dds_entity_t reader_or_condition, struct ddsi_serdata **buf, uint32_t maxs, dds_sample_info_t *si, dds_instance_handle_t handle, uint32_t mask) +{ + if (handle == DDS_HANDLE_NIL) + return DDS_RETCODE_PRECONDITION_NOT_MET; + return dds_readcdr_impl (READ_OPER_PEEK, reader_or_condition, buf, maxs, si, mask, handle); +} + dds_return_t dds_readcdr (dds_entity_t reader_or_condition, struct ddsi_serdata **buf, uint32_t maxs, dds_sample_info_t *si, uint32_t mask) { - return dds_readcdr_impl (false, reader_or_condition, buf, maxs, si, mask, DDS_HANDLE_NIL); + return dds_readcdr_impl (READ_OPER_READ, reader_or_condition, buf, maxs, si, mask, DDS_HANDLE_NIL); } dds_return_t dds_readcdr_instance (dds_entity_t reader_or_condition, struct ddsi_serdata **buf, uint32_t maxs, dds_sample_info_t *si, dds_instance_handle_t handle, uint32_t mask) { if (handle == DDS_HANDLE_NIL) return DDS_RETCODE_PRECONDITION_NOT_MET; - return dds_readcdr_impl(false, reader_or_condition, buf, maxs, si, mask, handle); + return dds_readcdr_impl (READ_OPER_READ, reader_or_condition, buf, maxs, si, mask, handle); } dds_return_t dds_takecdr (dds_entity_t reader_or_condition, struct ddsi_serdata **buf, uint32_t maxs, dds_sample_info_t *si, uint32_t mask) { - return dds_readcdr_impl (true, reader_or_condition, buf, maxs, si, mask, DDS_HANDLE_NIL); + return dds_readcdr_impl (READ_OPER_TAKE, reader_or_condition, buf, maxs, si, mask, DDS_HANDLE_NIL); } dds_return_t dds_takecdr_instance (dds_entity_t reader_or_condition, struct ddsi_serdata **buf, uint32_t maxs, dds_sample_info_t *si, dds_instance_handle_t handle, uint32_t mask) { if (handle == DDS_HANDLE_NIL) return DDS_RETCODE_PRECONDITION_NOT_MET; - return dds_readcdr_impl (true, reader_or_condition, buf, maxs, si, mask, handle); + return dds_readcdr_impl (READ_OPER_TAKE, reader_or_condition, buf, maxs, si, mask, handle); +} + +dds_return_t dds_peek_with_collector (dds_entity_t reader_or_condition, uint32_t maxs, dds_instance_handle_t handle, uint32_t mask, dds_read_with_collector_fn_t collect_sample, void *collect_sample_arg) +{ + return dds_read_with_collector_impl (READ_OPER_PEEK, reader_or_condition, maxs, mask, handle, false, collect_sample, collect_sample_arg); } dds_return_t dds_read_with_collector (dds_entity_t reader_or_condition, uint32_t maxs, dds_instance_handle_t handle, uint32_t mask, dds_read_with_collector_fn_t collect_sample, void *collect_sample_arg) { - return dds_read_with_collector_impl (false, reader_or_condition, maxs, mask, handle, false, collect_sample, collect_sample_arg); + return dds_read_with_collector_impl (READ_OPER_READ, reader_or_condition, maxs, mask, handle, false, collect_sample, collect_sample_arg); } dds_return_t dds_take_with_collector (dds_entity_t reader_or_condition, uint32_t maxs, dds_instance_handle_t handle, uint32_t mask, dds_read_with_collector_fn_t collect_sample, void *collect_sample_arg) { - return dds_read_with_collector_impl (true, reader_or_condition, maxs, mask, handle, false, collect_sample, collect_sample_arg); + return dds_read_with_collector_impl (READ_OPER_TAKE, reader_or_condition, maxs, mask, handle, false, collect_sample, collect_sample_arg); } static void return_reader_loan_locked_onesample (dds_reader *rd, dds_loaned_sample_t *loan, bool reset) diff --git a/src/core/ddsc/src/dds_rhc.c b/src/core/ddsc/src/dds_rhc.c index b6cde37f37..17e058c174 100644 --- a/src/core/ddsc/src/dds_rhc.c +++ b/src/core/ddsc/src/dds_rhc.c @@ -20,6 +20,7 @@ DDS_EXPORT extern inline void dds_rhc_unregister_wr (struct dds_rhc * __restrict DDS_EXPORT extern inline void dds_rhc_relinquish_ownership (struct dds_rhc * __restrict rhc, const uint64_t wr_iid); DDS_EXPORT extern inline void dds_rhc_set_qos (struct dds_rhc *rhc, const struct dds_qos *qos); DDS_EXPORT extern inline void dds_rhc_free (struct dds_rhc *rhc); +DDS_EXPORT extern inline int32_t dds_rhc_peek (struct dds_rhc *rhc, int32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond, dds_read_with_collector_fn_t collect_sample, void *collect_sample_arg); DDS_EXPORT extern inline int32_t dds_rhc_read (struct dds_rhc *rhc, int32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond, dds_read_with_collector_fn_t collect_sample, void *collect_sample_arg); DDS_EXPORT extern inline int32_t dds_rhc_take (struct dds_rhc *rhc, int32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond, dds_read_with_collector_fn_t collect_sample, void *collect_sample_arg); DDS_EXPORT extern inline bool dds_rhc_add_readcondition (struct dds_rhc *rhc, struct dds_readcond *cond); diff --git a/src/core/ddsc/src/dds_rhc_default.c b/src/core/ddsc/src/dds_rhc_default.c index 0ed06a2906..c397b3c0a0 100644 --- a/src/core/ddsc/src/dds_rhc_default.c +++ b/src/core/ddsc/src/dds_rhc_default.c @@ -2102,7 +2102,7 @@ static void readtake_w_qminv_inst_get_rank_info (const struct readtake_w_qminv_i } } -static int32_t read_w_qminv_inst_validsamples (const struct readtake_w_qminv_inst_state * __restrict state, struct rhc_instance * const __restrict inst, struct trigger_info_pre *pre, struct trigger_info_post *post, struct trigger_info_qcond *trig_qc) +static int32_t read_w_qminv_inst_validsamples (const struct readtake_w_qminv_inst_state * __restrict state, bool mark_as_read, struct rhc_instance * const __restrict inst, struct trigger_info_pre *pre, struct trigger_info_post *post, struct trigger_info_qcond *trig_qc) { assert (*state->limit > 0); assert (inst->latest && (qmask_of_inst (inst) & state->qminv) == 0); @@ -2140,7 +2140,7 @@ static int32_t read_w_qminv_inst_validsamples (const struct readtake_w_qminv_ins // the presence of a problem (if it persists) return rc; } - if (!sample->isread) + if (mark_as_read && !sample->isread) { read_sample_update_conditions (state->rhc, pre, post, trig_qc, inst, sample->conds, false); sample->isread = true; @@ -2210,7 +2210,7 @@ static int32_t take_w_qminv_inst_validsamples (const struct readtake_w_qminv_ins return DDS_RETCODE_OK; } -static int32_t read_w_qminv_inst (const struct readtake_w_qminv_inst_state * __restrict state, struct rhc_instance * const __restrict inst) +static int32_t read_w_qminv_inst (const struct readtake_w_qminv_inst_state * __restrict state, bool mark_as_read, struct rhc_instance * const __restrict inst) { const int32_t initial_limit = *state->limit; assert (*state->limit > 0); @@ -2229,7 +2229,7 @@ static int32_t read_w_qminv_inst (const struct readtake_w_qminv_inst_state * __r init_trigger_info_qcond (&trig_qc); /* valid samples come first */ - if (inst->latest && (rc = read_w_qminv_inst_validsamples (state, inst, &pre, &post, &trig_qc)) < 0) + if (inst->latest && (rc = read_w_qminv_inst_validsamples (state, mark_as_read, inst, &pre, &post, &trig_qc)) < 0) goto abort_on_error; /* add an invalid sample if it exists, matches and there is room in the result */ @@ -2242,7 +2242,7 @@ static int32_t read_w_qminv_inst (const struct readtake_w_qminv_inst_state * __r make_sample_info_invsample (&si, inst); if ((rc = state->collect_sample (state->collect_sample_arg, &si, state->rhc->type, inst->tk->m_sample)) < 0) goto abort_on_error; - if (!inst->inv_isread) + if (mark_as_read && !inst->inv_isread) { read_sample_update_conditions (state->rhc, &pre, &post, &trig_qc, inst, inst->conds, false); inst->inv_isread = 1; @@ -2253,7 +2253,7 @@ static int32_t read_w_qminv_inst (const struct readtake_w_qminv_inst_state * __r abort_on_error: ; bool inst_became_old = false; - if (*state->limit < initial_limit && inst->isnew) + if (mark_as_read && *state->limit < initial_limit && inst->isnew) { inst_became_old = true; inst->isnew = 0; @@ -2329,7 +2329,7 @@ abort_on_error: ; return rc; } -static dds_return_t read_w_qminv (const struct readtake_w_qminv_inst_state * __restrict state, dds_instance_handle_t handle) +static dds_return_t read_w_qminv (const struct readtake_w_qminv_inst_state * __restrict state, bool mark_as_read, dds_instance_handle_t handle) { struct dds_rhc_default * const rhc = state->rhc; dds_return_t rc = DDS_RETCODE_OK; @@ -2345,7 +2345,7 @@ static dds_return_t read_w_qminv (const struct readtake_w_qminv_inst_state * __r struct rhc_instance template, *inst; template.iid = handle; if ((inst = ddsrt_hh_lookup (rhc->instances, &template)) != NULL) - rc = read_w_qminv_inst (state, inst); + rc = read_w_qminv_inst (state, mark_as_read, inst); else rc = DDS_RETCODE_PRECONDITION_NOT_MET; } @@ -2354,7 +2354,7 @@ static dds_return_t read_w_qminv (const struct readtake_w_qminv_inst_state * __r struct rhc_instance * inst = oldest_nonempty_instance (rhc); struct rhc_instance * const end = inst; do { - rc = read_w_qminv_inst (state, inst); + rc = read_w_qminv_inst (state, mark_as_read, inst); inst = next_nonempty_instance (inst); } while (rc >= 0 && inst != end && *state->limit > 0); } @@ -2782,36 +2782,45 @@ static bool update_conditions_locked (struct dds_rhc_default *rhc, bool called_f ****** READ/TAKE ****** *************************/ -static int32_t dds_rhc_default_read (struct dds_rhc *rhc_common, int32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond, dds_read_with_collector_fn_t collect_sample, void *collect_sample_arg) +static struct readtake_w_qminv_inst_state make_readtake_w_qminv_inst_state (struct dds_rhc_default *rhc, int32_t *limit, uint32_t mask, dds_readcond *cond, dds_read_with_collector_fn_t collect_sample, void *collect_sample_arg) { - struct dds_rhc_default * const rhc = (struct dds_rhc_default *) rhc_common; - uint32_t qminv = qmask_from_mask_n_cond (mask, cond); - int32_t limit = max_samples; - const struct readtake_w_qminv_inst_state readtake_w_qminv_inst_state = { + const struct readtake_w_qminv_inst_state st = { .rhc = rhc, - .limit = &limit, - .qminv = qminv, + .limit = limit, + .qminv = qmask_from_mask_n_cond (mask, cond), .qcmask = (cond && cond->m_query.m_filter) ? cond->m_query.m_qcmask : 0, .collect_sample = collect_sample, .collect_sample_arg = collect_sample_arg }; - dds_return_t rc = read_w_qminv (&readtake_w_qminv_inst_state, handle); + return st; +} + +static int32_t dds_rhc_default_peek (struct dds_rhc *rhc_common, int32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond, dds_read_with_collector_fn_t collect_sample, void *collect_sample_arg) +{ + struct dds_rhc_default * const rhc = (struct dds_rhc_default *) rhc_common; + int32_t limit = max_samples; + const struct readtake_w_qminv_inst_state readtake_w_qminv_inst_state = + make_readtake_w_qminv_inst_state (rhc, &limit, mask, cond, collect_sample, collect_sample_arg); + dds_return_t rc = read_w_qminv (&readtake_w_qminv_inst_state, false, handle); + return (rc < 0 && limit == max_samples) ? rc : (max_samples - limit); +} + +static int32_t dds_rhc_default_read (struct dds_rhc *rhc_common, int32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond, dds_read_with_collector_fn_t collect_sample, void *collect_sample_arg) +{ + struct dds_rhc_default * const rhc = (struct dds_rhc_default *) rhc_common; + int32_t limit = max_samples; + const struct readtake_w_qminv_inst_state readtake_w_qminv_inst_state = + make_readtake_w_qminv_inst_state (rhc, &limit, mask, cond, collect_sample, collect_sample_arg); + dds_return_t rc = read_w_qminv (&readtake_w_qminv_inst_state, true, handle); return (rc < 0 && limit == max_samples) ? rc : (max_samples - limit); } static int32_t dds_rhc_default_take (struct dds_rhc *rhc_common, int32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond, dds_read_with_collector_fn_t collect_sample, void *collect_sample_arg) { struct dds_rhc_default * const rhc = (struct dds_rhc_default *) rhc_common; - uint32_t qminv = qmask_from_mask_n_cond(mask, cond); int32_t limit = max_samples; - const struct readtake_w_qminv_inst_state readtake_w_qminv_inst_state = { - .rhc = rhc, - .limit = &limit, - .qminv = qminv, - .qcmask = (cond && cond->m_query.m_filter) ? cond->m_query.m_qcmask : 0, - .collect_sample = collect_sample, - .collect_sample_arg = collect_sample_arg - }; + const struct readtake_w_qminv_inst_state readtake_w_qminv_inst_state = + make_readtake_w_qminv_inst_state (rhc, &limit, mask, cond, collect_sample, collect_sample_arg); dds_return_t rc = take_w_qminv (&readtake_w_qminv_inst_state, handle); return (rc < 0 && limit == max_samples) ? rc : (max_samples - limit); } @@ -2998,6 +3007,7 @@ static const struct dds_rhc_ops dds_rhc_default_ops = { .set_qos = dds_rhc_default_set_qos, .free = dds_rhc_default_free }, + .peek = dds_rhc_default_peek, .read = dds_rhc_default_read, .take = dds_rhc_default_take, .add_readcondition = dds_rhc_default_add_readcondition, diff --git a/src/core/ddsc/tests/readcollect.c b/src/core/ddsc/tests/readcollect.c index 53c9a5c7b4..a15d819978 100644 --- a/src/core/ddsc/tests/readcollect.c +++ b/src/core/ddsc/tests/readcollect.c @@ -108,8 +108,9 @@ static void dotest (read_op op) CU_ASSERT_FATAL (rc == 1); CU_ASSERT_FATAL (arg2.k == (1+arg1.k)%3); - assert (op == dds_read_with_collector || op == dds_take_with_collector); + assert (op == dds_peek_with_collector || op == dds_read_with_collector || op == dds_take_with_collector); bool isread = (op == dds_read_with_collector); + bool isnew = (op == dds_peek_with_collector); // check that the remainder is as we expect it Space_Type1 xs[10]; @@ -117,29 +118,35 @@ static void dotest (read_op op) void *ptrs[10]; for (uint32_t i = 0; i < 10; i++) ptrs[i] = &xs[i]; - rc = dds_take (rd, ptrs, si, 2 + isread, 2 + isread); + rc = dds_take (rd, ptrs, si, (size_t) (2 + isread + isnew), (uint32_t) (2 + isread + isnew)); for (int i = 0; i < rc; i++) - printf ("take(1) %"PRId32", %"PRId32"\n", xs[i].long_1, xs[i].long_2); - CU_ASSERT_FATAL (rc == 2 + isread); + printf ("take(1) %"PRId32", %"PRId32" %c%c\n", xs[i].long_1, xs[i].long_2, + (si[i].sample_state == DDS_NOT_READ_SAMPLE_STATE) ? 'f' : 's', + (si[i].view_state == DDS_NEW_VIEW_STATE) ? 'n' : 'o'); + CU_ASSERT_FATAL (rc == (int32_t) (2 + isread + isnew)); for (int i = 0; i < rc; i++) { CU_ASSERT_FATAL (xs[i].long_1 == arg1.k); CU_ASSERT_FATAL (si[i].sample_state == (i == 0 && isread ? DDS_READ_SAMPLE_STATE : DDS_NOT_READ_SAMPLE_STATE)); - CU_ASSERT_FATAL (si[i].view_state == DDS_NOT_NEW_VIEW_STATE); + CU_ASSERT_FATAL (si[i].view_state == (isnew ? DDS_NEW_VIEW_STATE : DDS_NOT_NEW_VIEW_STATE)); } - rc = dds_take_instance (rd, ptrs, si, 2 + isread, 2 + isread, ih); + rc = dds_take_instance (rd, ptrs, si, (size_t) (2 + isread + isnew), (uint32_t) (2 + isread + isnew), ih); for (int i = 0; i < rc; i++) - printf ("take(2) %"PRId32", %"PRId32"\n", xs[i].long_1, xs[i].long_2); - CU_ASSERT_FATAL (rc == 2 + isread); + printf ("take(2) %"PRId32", %"PRId32" %c%c\n", xs[i].long_1, xs[i].long_2, + (si[i].sample_state == DDS_NOT_READ_SAMPLE_STATE) ? 'f' : 's', + (si[i].view_state == DDS_NEW_VIEW_STATE) ? 'n' : 'o'); + CU_ASSERT_FATAL (rc == (int32_t) (2 + isread + isnew)); for (int i = 0; i < rc; i++) { CU_ASSERT_FATAL (xs[i].long_1 == arg2.k); CU_ASSERT_FATAL (si[i].sample_state == (i == 0 && isread ? DDS_READ_SAMPLE_STATE : DDS_NOT_READ_SAMPLE_STATE)); - CU_ASSERT_FATAL (si[i].view_state == DDS_NOT_NEW_VIEW_STATE); + CU_ASSERT_FATAL (si[i].view_state == (isnew ? DDS_NEW_VIEW_STATE : DDS_NOT_NEW_VIEW_STATE)); } rc = dds_take (rd, ptrs, si, 10, 10); for (int i = 0; i < rc; i++) - printf ("take(3) %"PRId32", %"PRId32"\n", xs[i].long_1, xs[i].long_2); + printf ("take(3) %"PRId32", %"PRId32" %c%c\n", xs[i].long_1, xs[i].long_2, + (si[i].sample_state == DDS_NOT_READ_SAMPLE_STATE) ? 'f' : 's', + (si[i].view_state == DDS_NEW_VIEW_STATE) ? 'n' : 'o'); CU_ASSERT_FATAL (rc == 3); for (int i = 0; i < rc; i++) { @@ -152,6 +159,11 @@ static void dotest (read_op op) CU_ASSERT_FATAL (rc == 0); } +CU_Test(ddsc_read_with_collector, peek) +{ + dotest (dds_peek_with_collector); +} + CU_Test(ddsc_read_with_collector, read) { dotest (dds_read_with_collector); diff --git a/src/core/ddsc/tests/reader.c b/src/core/ddsc/tests/reader.c index 227e0de6a0..a4ff9c47b7 100644 --- a/src/core/ddsc/tests/reader.c +++ b/src/core/ddsc/tests/reader.c @@ -3393,3 +3393,159 @@ CU_Test (ddsc_take, sample_rank) { do_readtake_sample_rank ("took", dds_take_mask); } + +static void ddsc_peek_setup (dds_entity_t *dp, dds_entity_t *rd, dds_instance_handle_t *ih) +{ + *dp = dds_create_participant (0, NULL, NULL); + CU_ASSERT_FATAL (*dp > 0); + char topicname[100]; + create_unique_topic_name ("ddsc_peek", topicname, sizeof (topicname)); + const dds_entity_t tp = dds_create_topic (*dp, &Space_Type1_desc, topicname, NULL, NULL); + CU_ASSERT_FATAL (tp > 0); + *rd = dds_create_reader (*dp, tp, NULL, NULL); + CU_ASSERT_FATAL (*rd > 0); + const dds_entity_t wr = dds_create_writer (*dp, tp, NULL, NULL); + CU_ASSERT_FATAL (wr > 0); + dds_return_t rc; + rc = dds_write (wr, &(Space_Type1){ 2,2,2 }); + CU_ASSERT_FATAL (rc == 0); + rc = dds_write (wr, &(Space_Type1){ 3,3,3 }); + CU_ASSERT_FATAL (rc == 0); + rc = dds_write (wr, &(Space_Type1){ 5,5,5 }); + CU_ASSERT_FATAL (rc == 0); + *ih = dds_lookup_instance (*rd, &(Space_Type1){ 3,3,3 }); + CU_ASSERT_FATAL (*ih != 0); +} + +CU_Test (ddsc_peek, plain) +{ + dds_entity_t dp, rd; + dds_instance_handle_t ih; + dds_entity_t rc; + ddsc_peek_setup (&dp, &rd, &ih); + dds_sample_info_t si[3]; + Space_Type1 xs[3]; + void *ptrs[] = { &xs[0], &xs[1], &xs[2] }; + for (int k = 0; k < 2; k++) + { + uint32_t seen = 0; + rc = dds_peek (rd, ptrs, si, 3, 3); + CU_ASSERT_FATAL (rc == 3); + for (int32_t i = 0; i < rc; i++) + { + CU_ASSERT_FATAL (xs[i].long_1 == xs[i].long_2 && xs[i].long_2 == xs[i].long_3); + CU_ASSERT_FATAL (xs[i].long_1 <= 31); + seen |= 1u << xs[i].long_1; + CU_ASSERT_FATAL (si[i].view_state == DDS_NEW_VIEW_STATE); + CU_ASSERT_FATAL (si[i].sample_state == DDS_NOT_READ_SAMPLE_STATE); + } + CU_ASSERT_FATAL (seen == 44u); + } + rc = dds_delete (dp); + CU_ASSERT_FATAL (rc == 0); +} + +CU_Test (ddsc_peek, mask) +{ + dds_entity_t dp, rd; + dds_instance_handle_t ih; + dds_entity_t rc; + ddsc_peek_setup (&dp, &rd, &ih); + dds_sample_info_t si[3]; + Space_Type1 xs[3]; + void *ptrs[] = { &xs[0], &xs[1], &xs[2] }; + rc = dds_read (rd, ptrs, si, 1, 1); + CU_ASSERT_FATAL (rc == 1 && xs[0].long_1 <= 31); + const uint32_t not_visible = 1u << xs[0].long_1; + // dds_read gest tested elsewhere, no need to redo that here + for (int k = 0; k < 2; k++) + { + uint32_t seen = 0; + rc = dds_peek_mask (rd, ptrs, si, 3, 3, DDS_NOT_READ_SAMPLE_STATE); + CU_ASSERT_FATAL (rc == 2); + for (int32_t i = 0; i < rc; i++) + { + CU_ASSERT_FATAL (xs[i].long_1 == xs[i].long_2 && xs[i].long_2 == xs[i].long_3); + CU_ASSERT_FATAL (xs[i].long_1 <= 31); + seen |= 1u << xs[i].long_1; + CU_ASSERT_FATAL (si[i].view_state == DDS_NEW_VIEW_STATE); + CU_ASSERT_FATAL (si[i].sample_state == DDS_NOT_READ_SAMPLE_STATE); + } + CU_ASSERT_FATAL (seen == (44u & ~not_visible)); + } + rc = dds_delete (dp); + CU_ASSERT_FATAL (rc == 0); +} + +CU_Test (ddsc_peek, instance) +{ + dds_entity_t dp, rd; + dds_instance_handle_t ih; + dds_entity_t rc; + ddsc_peek_setup (&dp, &rd, &ih); + dds_sample_info_t si[3]; + Space_Type1 xs[3]; + void *ptrs[] = { &xs[0], &xs[1], &xs[2] }; + rc = dds_peek_instance (rd, ptrs, si, 3, 3, ih); + CU_ASSERT_FATAL (rc == 1); + CU_ASSERT_FATAL (xs[0].long_1 == xs[0].long_2 && xs[0].long_2 == xs[0].long_3); + CU_ASSERT_FATAL (xs[0].long_1 <= 31); + const int32_t expect = xs[0].long_1; + CU_ASSERT_FATAL (si[0].view_state == DDS_NEW_VIEW_STATE); + CU_ASSERT_FATAL (si[0].sample_state == DDS_NOT_READ_SAMPLE_STATE); + rc = dds_peek_instance (rd, ptrs, si, 3, 3, ih); + CU_ASSERT_FATAL (rc == 1); + CU_ASSERT_FATAL (xs[0].long_1 == xs[0].long_2 && xs[0].long_2 == xs[0].long_3); + CU_ASSERT_FATAL (xs[0].long_1 == expect); + CU_ASSERT_FATAL (si[0].view_state == DDS_NEW_VIEW_STATE); + CU_ASSERT_FATAL (si[0].sample_state == DDS_NOT_READ_SAMPLE_STATE); + rc = dds_delete (dp); + CU_ASSERT_FATAL (rc == 0); +} + +CU_Test (ddsc_peek, instance_mask) +{ + dds_entity_t dp, rd; + dds_instance_handle_t ih; + dds_entity_t rc; + ddsc_peek_setup (&dp, &rd, &ih); + dds_sample_info_t si[3]; + Space_Type1 xs[3]; + void *ptrs[] = { &xs[0], &xs[1], &xs[2] }; + rc = dds_read_instance (rd, ptrs, si, 1, 1, ih); + CU_ASSERT_FATAL (rc == 1 && xs[0].long_1 <= 31); + const int32_t expect = xs[0].long_1; + rc = dds_peek_instance_mask (rd, ptrs, si, 3, 3, ih, DDS_NOT_READ_SAMPLE_STATE); + CU_ASSERT_FATAL (rc == 0); + rc = dds_peek_instance_mask (rd, ptrs, si, 3, 3, ih, DDS_READ_SAMPLE_STATE); + CU_ASSERT_FATAL (rc == 1); + CU_ASSERT_FATAL (xs[0].long_1 == xs[0].long_2 && xs[0].long_2 == xs[0].long_3); + CU_ASSERT_FATAL (xs[0].long_1 == expect); + CU_ASSERT_FATAL (si[0].view_state == DDS_NOT_NEW_VIEW_STATE); + CU_ASSERT_FATAL (si[0].sample_state == DDS_READ_SAMPLE_STATE); + rc = dds_delete (dp); + CU_ASSERT_FATAL (rc == 0); +} + +CU_Test (ddsc_peek, next) +{ + dds_entity_t dp, rd; + dds_instance_handle_t ih; + dds_entity_t rc; + ddsc_peek_setup (&dp, &rd, &ih); + dds_sample_info_t si[3]; + Space_Type1 xs[3]; + void *ptrs[] = { &xs[0], &xs[1], &xs[2] }; + // not much of an expectation yet, but at least it should return something + // unread even if we try it a couple of times + for (int i = 0; i < 4; i++) + { + rc = dds_peek_next (rd, ptrs, si); + CU_ASSERT_FATAL (rc == 1); + CU_ASSERT_FATAL (xs[0].long_1 == xs[0].long_2 && xs[0].long_2 == xs[0].long_3); + CU_ASSERT_FATAL (si[0].view_state == DDS_NEW_VIEW_STATE); + CU_ASSERT_FATAL (si[0].sample_state == DDS_NOT_READ_SAMPLE_STATE); + } + rc = dds_delete (dp); + CU_ASSERT_FATAL (rc == 0); +} diff --git a/src/core/xtests/symbol_export/symbol_export.c b/src/core/xtests/symbol_export/symbol_export.c index bea8f1060f..d05e329e82 100644 --- a/src/core/xtests/symbol_export/symbol_export.c +++ b/src/core/xtests/symbol_export/symbol_export.c @@ -190,6 +190,11 @@ int main (int argc, char **argv) dds_waitset_set_trigger (1, 0); dds_waitset_wait (1, ptr, 0, 0); dds_waitset_wait_until (1, ptr, 0, 0); + dds_peek (1, ptr, ptr, 0, 0); + dds_peek_mask (1, ptr, ptr, 0, 0, 0); + dds_peek_instance (1, ptr, ptr, 0, 0, 1); + dds_peek_instance_mask (1, ptr, ptr, 0, 0, 1, 0); + dds_peek_next (1, ptr, ptr); dds_read (1, ptr, ptr, 0, 0); dds_read_wl (1, ptr, ptr, 0); dds_read_mask (1, ptr, ptr, 0, 0, 0); @@ -198,22 +203,25 @@ int main (int argc, char **argv) dds_read_instance_wl (1, ptr, ptr, 0, 1); dds_read_instance_mask (1, ptr, ptr, 0, 0, 1, 0); dds_read_instance_mask_wl (1, ptr, ptr, 0, 1, 0); + dds_read_next (1, ptr, ptr); + dds_read_next_wl (1, ptr, ptr); dds_take (1, ptr, ptr, 0, 0); dds_take_wl (1, ptr, ptr, 0); dds_take_mask (1, ptr, ptr, 0, 0, 0); dds_take_mask_wl (1, ptr, ptr, 0, 0); - dds_readcdr (1, ptr, 0, ptr, 0); - dds_readcdr_instance (1, ptr, 0, ptr, 1, 0); - dds_takecdr (1, ptr, 0, ptr, 0); - dds_takecdr_instance (1, ptr, 0, ptr, 1, 0); dds_take_instance (1, ptr, ptr, 0, 0, 1); dds_take_instance_wl (1, ptr, ptr, 0, 1); dds_take_instance_mask (1, ptr, ptr, 0, 0, 1, 0); dds_take_instance_mask_wl (1, ptr, ptr, 0, 1, 0); dds_take_next (1, ptr, ptr); dds_take_next_wl (1, ptr, ptr); - dds_read_next (1, ptr, ptr); - dds_read_next_wl (1, ptr, ptr); + dds_peekcdr (1, ptr, 0, ptr, 0); + dds_peekcdr_instance (1, ptr, 0, ptr, 1, 0); + dds_readcdr (1, ptr, 0, ptr, 0); + dds_readcdr_instance (1, ptr, 0, ptr, 1, 0); + dds_takecdr (1, ptr, 0, ptr, 0); + dds_takecdr_instance (1, ptr, 0, ptr, 1, 0); + dds_peek_with_collector (1, 0, 1, 0, test_collect_sample, ptr); dds_read_with_collector (1, 0, 1, 0, test_collect_sample, ptr); dds_take_with_collector (1, 0, 1, 0, test_collect_sample, ptr); dds_lookup_instance (1, ptr); @@ -433,6 +441,7 @@ int main (int argc, char **argv) dds_rhc_relinquish_ownership (ptr, 1); dds_rhc_set_qos (ptr, ptr); dds_rhc_free (ptr); + dds_rhc_peek (ptr, 0, 0, 1, ptr, 0, 0); dds_rhc_read (ptr, 0, 0, 1, ptr, 0, 0); dds_rhc_take (ptr, 0, 0, 1, ptr, 0, 0); dds_rhc_add_readcondition (ptr, ptr); diff --git a/src/ddsrt/tests/thread_cleanup.c b/src/ddsrt/tests/thread_cleanup.c index 08264189d3..a39e24cab9 100644 --- a/src/ddsrt/tests/thread_cleanup.c +++ b/src/ddsrt/tests/thread_cleanup.c @@ -98,6 +98,7 @@ thread_main( } assert(targ->pop <= pushed); + (void) pushed; if (targ->block) { ddsrt_mutex_lock(targ->mutex);