diff --git a/src/core/ddsc/tests/psmx.c b/src/core/ddsc/tests/psmx.c index e54cd30db5..bf43d4c7e8 100644 --- a/src/core/ddsc/tests/psmx.c +++ b/src/core/ddsc/tests/psmx.c @@ -15,6 +15,7 @@ #include "dds/ddsrt/md5.h" #include "dds/ddsrt/io.h" #include "dds/ddsrt/heap.h" +#include "dds/ddsrt/string.h" #include "dds/ddsrt/bswap.h" #include "dds/ddsrt/environ.h" #include "dds/ddsrt/static_assert.h" @@ -98,6 +99,67 @@ static dds_entity_t create_participant (dds_domainid_t int_dom) return pp; } +static void config_psmx_free_content(struct ddsi_config_psmx* cfg) +{ + assert(cfg); + ddsrt_free(cfg->name); + ddsrt_free(cfg->library); + ddsrt_free(cfg->config); +} + +static void config_psmx_elm_free(struct ddsi_config_psmx_listelem* elm) +{ + assert(elm); + config_psmx_free_content(&elm->cfg); + ddsrt_free(elm); +} + +static bool domain_pin(dds_entity_t domain, dds_domain** dom_out) +{ + dds_entity* x = NULL; + dds_return_t rc = dds_entity_pin(domain, &x); + if ( rc == DDS_RETCODE_OK && dds_entity_kind(x) == DDS_KIND_DOMAIN ) { + *dom_out = (dds_domain*)x; + return true; + } + return false; +} + +static void domain_unpin(dds_domain* dom) +{ + assert(dom); + dds_entity_unpin((dds_entity*)dom); +} + +static bool domain_get_psmx_locator(dds_entity_t domain, ddsi_locator_t* l_out) +{ + dds_domain* dom = NULL; + bool ret = domain_pin(domain, &dom); + if ( ret ) { + memcpy(l_out, dom->psmx_instances.instances[0]->locator, sizeof(ddsi_locator_t)); + domain_unpin(dom); + } + return ret; +} + +static bool domain_has_psmx_instance_name(dds_entity_t domain, const char* inst_name) +{ + dds_domain* dom = NULL; + bool ret = domain_pin(domain, &dom); + bool match = false; + if ( ret ) { + uint32_t len = dom->psmx_instances.length; + for (uint32_t idx = 0; idx < len; ++idx) { + if ( strcmp(dom->psmx_instances.instances[idx]->instance_name, inst_name) == 0 ) { + match = true; + break; + } + } + domain_unpin(dom); + } + return ret && match; +} + struct tracebuf { char buf[512]; size_t pos; @@ -1264,6 +1326,378 @@ CU_Test (ddsc_psmx, basic) dds_delete (dds_get_parent (participant)); } +/// @brief Check that shared memory can be enabled and the used locator can be set. +/// @methodology +/// Case with Iceoryx: +/// - Check that the data types I'm planning to use are actually suitable for use with shared memory. +/// - Expectation: They are memcopy-safe. +/// - Create a configuration with a psmx interface capable of shared memory and specify the locator. +/// - Create a domain using this configuration. +/// - Create some entities +/// - Check the locator used for shared memory. +/// - Expectation: The locator is the same as specified in the config for the domain. +/// - Check if shared memory is enabled. +/// - Expectation: Shared memory is enabled. +/// - Write and read some topics. +/// - Expectation: Sent data is received by readers. +/// +/// - Create a configuration with a psmx interface capable of shared memory and don't specify a locator. +/// - Create a domain using this configuration. +/// - Create some entities +/// - Check the locator used for shared memory. +/// - Expectation: The second half of the locator is all zeros. +/// - Check if shared memory is enabled. +/// - Expectation: Shared memory is enabled. +/// +/// Case with cdds: +/// - Create a configuration with a psmx interface incapable of shared memory. +/// - Create a domain using this configuration. +/// - Create some entities +/// - Check if shared memory is enabled. +/// - Expectation: Shared memory is not enabled. +/// +CU_Test(ddsc_psmx, shared_memory) +{ + #define sample_cnt 8 + { + // Check that the data types I'm planning to use are actually suitable for use with shared memory. + dds_data_type_properties_t props; + props = dds_stream_data_types(SC_Model_desc.m_ops); + CU_ASSERT_FATAL((props & DDS_DATA_TYPE_IS_MEMCPY_SAFE) == DDS_DATA_TYPE_IS_MEMCPY_SAFE); + props = dds_stream_data_types(PsmxType1_desc.m_ops); + CU_ASSERT_FATAL((props & DDS_DATA_TYPE_IS_MEMCPY_SAFE) == DDS_DATA_TYPE_IS_MEMCPY_SAFE); + } + + const dds_domainid_t domainId = 42; + char* CDDS_PSMX_NAME = ddsrt_expand_envvars("${CDDS_PSMX_NAME:-cdds}", domainId); + bool using_psmx_iox = (strcmp(CDDS_PSMX_NAME, "iox") == 0); + ddsrt_free(CDDS_PSMX_NAME); + + if ( using_psmx_iox ) { + { + // Test case using a config with specified locator. + uint8_t locator_in[16]; + memset(locator_in, 0x0, sizeof(locator_in)); // avoid warning 'uninitialized value' + ((uint64_t*)locator_in)[0] = (uint64_t)0x4a4d203df6996395; + ((uint64_t*)locator_in)[1] = (uint64_t)0xe1412fbecc2de4b6; + char config_str[100]; + { + // Create the config string including a locator. + char locator_str[50]; + uint8_t* l = locator_in; + snprintf( + locator_str, + sizeof(locator_str), + "LOCATOR=%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", + l[0], l[1], l[2], l[3], l[4], l[5], l[6], l[7], l[8], l[9], l[10], l[11], l[12], l[13], l[14], l[15] + ); + snprintf( + config_str, + sizeof(config_str), + "SERVICE_NAME=psmx1;%s", + locator_str + ); + } + dds_entity_t domain = -1; + { + // Create domain + struct ddsi_config cfg; + ddsi_config_init_default(&cfg); + cfg.psmx_instances = ddsrt_malloc(sizeof(*cfg.psmx_instances)); + cfg.psmx_instances->cfg.name = ddsrt_strdup("iox"); + cfg.psmx_instances->cfg.library = ddsrt_strdup("psmx_iox"); + cfg.psmx_instances->cfg.config = ddsrt_strdup(config_str); // config including a locator + cfg.psmx_instances->cfg.priority.isdefault = 1; + cfg.psmx_instances->cfg.priority.value = 1000000; + cfg.psmx_instances->next = NULL; + domain = dds_create_domain_with_rawconfig(domainId, &cfg); + config_psmx_elm_free(cfg.psmx_instances); + } + CU_ASSERT_FATAL(domain > 0); + CU_ASSERT_FATAL(domain_has_psmx_instance_name(domain, "CycloneDDS-IOX-PSMX")); + dds_entity_t participant = dds_create_participant(domainId, NULL, NULL); + CU_ASSERT_FATAL(participant > 0); + dds_entity_t writer1, reader1, writer2, reader2; + { + char topicname[100]; + dds_qos_t* qos = dds_create_qos(); + dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, sample_cnt); + + create_unique_topic_name("shared_memory", topicname, sizeof(topicname)); + dds_entity_t topic1 = dds_create_topic(participant, &SC_Model_desc, topicname, qos, NULL); + CU_ASSERT_FATAL(topic1 > 0); + + writer1 = dds_create_writer(participant, topic1, qos, NULL); + CU_ASSERT_FATAL(writer1 > 0); + + reader1 = dds_create_reader(participant, topic1, qos, NULL); + CU_ASSERT_FATAL(reader1 > 0); + + create_unique_topic_name("shared_memory", topicname, sizeof(topicname)); + dds_entity_t topic2 = dds_create_topic(participant, &PsmxType1_desc, topicname, qos, NULL); + CU_ASSERT_FATAL(topic2 > 0); + + writer2 = dds_create_writer(participant, topic2, qos, NULL); + CU_ASSERT_FATAL(writer2 > 0); + + reader2 = dds_create_reader(participant, topic2, qos, NULL); + CU_ASSERT_FATAL(reader2 > 0); + dds_delete_qos(qos); + } + { + // Check that I get the same locator that I provided with the config. + ddsi_locator_t locator_out; + CU_ASSERT_FATAL(domain_get_psmx_locator(domain, &locator_out)); + CU_ASSERT_FATAL(memcmp(locator_in, locator_out.address, sizeof(locator_in)) == 0); + } + { + // Check that shared memory is enabled. + bool psmx_enabled; + psmx_enabled = endpoint_has_psmx_enabled(writer1); + CU_ASSERT_FATAL(psmx_enabled); + CU_ASSERT_FATAL(dds_is_shared_memory_available(writer1)); + + psmx_enabled = endpoint_has_psmx_enabled(reader1); + CU_ASSERT_FATAL(psmx_enabled); + CU_ASSERT_FATAL(dds_is_shared_memory_available(reader1)); + + psmx_enabled = endpoint_has_psmx_enabled(writer2); + CU_ASSERT_FATAL(psmx_enabled); + CU_ASSERT_FATAL(dds_is_shared_memory_available(writer2)); + + psmx_enabled = endpoint_has_psmx_enabled(reader2); + CU_ASSERT_FATAL(psmx_enabled); + CU_ASSERT_FATAL(dds_is_shared_memory_available(reader2)); + } + { + // Write and read samples + dds_return_t result; + { + SC_Model* sample; + for (int i = 0; i < sample_cnt; ++i) { + result = dds_request_loan(writer1, (void**)&sample); + CU_ASSERT_FATAL(result == DDS_RETCODE_OK); + sample->a = (uint8_t)i; + sample->b = (uint8_t)i + 1; + sample->c = (uint8_t)i + 2; + result = dds_write(writer1, sample); + CU_ASSERT_FATAL(result == DDS_RETCODE_OK); + } + } + { + PsmxType1* sample; + for (int i = 0; i < sample_cnt; ++i) { + result = dds_request_loan(writer2, (void**)&sample); + CU_ASSERT_FATAL(result == DDS_RETCODE_OK); + sample->xy.x = i; + sample->xy.y = (uint8_t)i + 1; + sample->z = (uint8_t)i + 2; + result = dds_write(writer2, sample); + CU_ASSERT_FATAL(result == DDS_RETCODE_OK); + } + } + + { + SC_Model* samples1[sample_cnt]; + dds_sample_info_t sinfo[sample_cnt]; + const dds_entity_t ws1 = dds_create_waitset(participant); + dds_entity_t rcond1 = dds_create_readcondition(reader1, DDS_ANY_STATE); + memset(samples1, 0x0, sizeof(samples1)); + dds_waitset_attach(ws1, rcond1, rcond1); + + int j; + int cnt = 0; + bool match = true; + while ( match && cnt < sample_cnt ) { + result = dds_waitset_wait(ws1, NULL, 0, DDS_SECS(10)); + CU_ASSERT_FATAL(result > 0); + result = dds_take(reader1, (void**)samples1, sinfo, sample_cnt, sample_cnt); + CU_ASSERT_FATAL(result > 0); + for (int i = 0; i < result; ++i) { + j = cnt + i; + if ( + samples1[i]->a != j || + samples1[i]->b != j + 1 || + samples1[i]->c != j + 2 + ) { + match = false; + } + } + cnt += result; + } + CU_ASSERT_FATAL(match); + CU_ASSERT_FATAL(cnt == sample_cnt); + } + { + PsmxType1* samples2[sample_cnt]; + dds_sample_info_t sinfo[sample_cnt]; + const dds_entity_t ws2 = dds_create_waitset(participant); + dds_entity_t rcond2 = dds_create_readcondition(reader2, DDS_ANY_STATE); + memset(samples2, 0x0, sizeof(samples2)); + dds_waitset_attach(ws2, rcond2, rcond2); + + int j; + int cnt = 0; + bool match = true; + while ( match && cnt < sample_cnt ) { + result = dds_waitset_wait(ws2, NULL, 0, DDS_SECS(10)); + CU_ASSERT_FATAL(result > 0); + result = dds_take(reader2, (void**)samples2, sinfo, sample_cnt, sample_cnt); + CU_ASSERT_FATAL(result > 0); + for (int i = 0; i < result; ++i) { + j = cnt + i; + if ( + samples2[i]->xy.x != j || + samples2[i]->xy.y != (uint8_t)j + 1 || + samples2[i]->z != (uint8_t)j + 2 + ) { + match = false; + } + } + cnt += result; + } + CU_ASSERT_FATAL(match); + CU_ASSERT_FATAL(cnt == sample_cnt); + } + } + dds_delete(domain); + } + { + // Test case using a config without specifying a locator. + dds_entity_t domain = -1; + { + // Create domain + struct ddsi_config cfg; + ddsi_config_init_default(&cfg); + cfg.psmx_instances = ddsrt_malloc(sizeof(*cfg.psmx_instances)); + cfg.psmx_instances->cfg.name = ddsrt_strdup("iox"); + cfg.psmx_instances->cfg.library = ddsrt_strdup("psmx_iox"); + cfg.psmx_instances->cfg.config = ddsrt_strdup("SERVICE_NAME=psmx1;"); // config witout a locator + cfg.psmx_instances->cfg.priority.isdefault = 1; + cfg.psmx_instances->cfg.priority.value = 1000000; + cfg.psmx_instances->next = NULL; + domain = dds_create_domain_with_rawconfig(domainId, &cfg); + config_psmx_elm_free(cfg.psmx_instances); + } + CU_ASSERT_FATAL(domain > 0); + CU_ASSERT_FATAL(domain_has_psmx_instance_name(domain, "CycloneDDS-IOX-PSMX")); + dds_entity_t participant = dds_create_participant(domainId, NULL, NULL); + CU_ASSERT_FATAL(participant > 0); + { + // Check that the second half of the locator is all zeros. + uint8_t locator_zero[16]; + size_t locator_size_half = sizeof(locator_zero) / 2; + memset(locator_zero, 0, sizeof(locator_zero)); + ddsi_locator_t locator_out; + CU_ASSERT_FATAL(domain_get_psmx_locator(domain, &locator_out)); + CU_ASSERT_FATAL(memcmp(locator_zero + locator_size_half, locator_out.address + locator_size_half, locator_size_half) == 0); + } + { + // Check that shared memory is enabled. + bool psmx_enabled; + char topicname[100]; + { + create_unique_topic_name("shared_memory", topicname, sizeof(topicname)); + dds_entity_t topic = dds_create_topic(participant, &SC_Model_desc, topicname, NULL, NULL); + CU_ASSERT_FATAL(topic > 0); + + dds_entity_t writer = dds_create_writer(participant, topic, NULL, NULL); + CU_ASSERT_FATAL(writer > 0); + psmx_enabled = endpoint_has_psmx_enabled(writer); + CU_ASSERT_FATAL(psmx_enabled); + CU_ASSERT_FATAL(dds_is_shared_memory_available(writer)); + + dds_entity_t reader = dds_create_reader(participant, topic, NULL, NULL); + CU_ASSERT_FATAL(reader > 0); + psmx_enabled = endpoint_has_psmx_enabled(reader); + CU_ASSERT_FATAL(psmx_enabled); + CU_ASSERT_FATAL(dds_is_shared_memory_available(reader)); + } + { + create_unique_topic_name("shared_memory", topicname, sizeof(topicname)); + dds_entity_t topic = dds_create_topic(participant, &PsmxType1_desc, topicname, NULL, NULL); + CU_ASSERT_FATAL(topic > 0); + + dds_entity_t writer = dds_create_writer(participant, topic, NULL, NULL); + CU_ASSERT_FATAL(writer > 0); + psmx_enabled = endpoint_has_psmx_enabled(writer); + CU_ASSERT_FATAL(psmx_enabled); + CU_ASSERT_FATAL(dds_is_shared_memory_available(writer)); + + dds_entity_t reader = dds_create_reader(participant, topic, NULL, NULL); + CU_ASSERT_FATAL(reader > 0); + psmx_enabled = endpoint_has_psmx_enabled(reader); + CU_ASSERT_FATAL(psmx_enabled); + CU_ASSERT_FATAL(dds_is_shared_memory_available(reader)); + } + } + dds_delete(domain); + } + } else { + // No shared memory. + dds_entity_t domain = -1; + { + // Create domain + struct ddsi_config cfg; + ddsi_config_init_default(&cfg); + cfg.psmx_instances = ddsrt_malloc(sizeof(*cfg.psmx_instances)); + cfg.psmx_instances->cfg.name = ddsrt_strdup("cdds"); + cfg.psmx_instances->cfg.library = ddsrt_strdup("psmx_cdds"); + cfg.psmx_instances->cfg.config = ddsrt_strdup("SERVICE_NAME=cdds1;"); + cfg.psmx_instances->cfg.priority.isdefault = 1; + cfg.psmx_instances->cfg.priority.value = 1000000; + cfg.psmx_instances->next = NULL; + domain = dds_create_domain_with_rawconfig(domainId, &cfg); + config_psmx_elm_free(cfg.psmx_instances); + } + CU_ASSERT_FATAL(domain > 0); + CU_ASSERT_FATAL(domain_has_psmx_instance_name(domain, "cdds-psmx")); + dds_entity_t participant = dds_create_participant(domainId, NULL, NULL); + CU_ASSERT_FATAL(participant > 0); + { + // Check that shared memory is not enabled. + bool psmx_enabled; + char topicname[100]; + { + create_unique_topic_name("shared_memory", topicname, sizeof(topicname)); + dds_entity_t topic = dds_create_topic(participant, &SC_Model_desc, topicname, NULL, NULL); + CU_ASSERT_FATAL(topic > 0); + + dds_entity_t writer = dds_create_writer(participant, topic, NULL, NULL); + CU_ASSERT_FATAL(writer > 0); + psmx_enabled = endpoint_has_psmx_enabled(writer); + CU_ASSERT_FATAL(psmx_enabled); + CU_ASSERT_FATAL(!dds_is_shared_memory_available(writer)); + + dds_entity_t reader = dds_create_reader(participant, topic, NULL, NULL); + CU_ASSERT_FATAL(reader > 0); + psmx_enabled = endpoint_has_psmx_enabled(reader); + CU_ASSERT_FATAL(psmx_enabled); + CU_ASSERT_FATAL(!dds_is_shared_memory_available(reader)); + } + { + create_unique_topic_name("shared_memory", topicname, sizeof(topicname)); + dds_entity_t topic = dds_create_topic(participant, &PsmxType1_desc, topicname, NULL, NULL); + CU_ASSERT_FATAL(topic > 0); + + dds_entity_t writer = dds_create_writer(participant, topic, NULL, NULL); + CU_ASSERT_FATAL(writer > 0); + psmx_enabled = endpoint_has_psmx_enabled(writer); + CU_ASSERT_FATAL(psmx_enabled); + CU_ASSERT_FATAL(!dds_is_shared_memory_available(writer)); + + dds_entity_t reader = dds_create_reader(participant, topic, NULL, NULL); + CU_ASSERT_FATAL(reader > 0); + psmx_enabled = endpoint_has_psmx_enabled(reader); + CU_ASSERT_FATAL(psmx_enabled); + CU_ASSERT_FATAL(!dds_is_shared_memory_available(reader)); + } + } + dds_delete(domain); + } + #undef sample_cnt +} + CU_Test (ddsc_psmx, zero_copy) { const struct {