From 9d13f827b0232029b7b2701d46e1cbdd88b8f6d3 Mon Sep 17 00:00:00 2001
From: Michel van den Hoek <michel.vandenhoek@zettascale.tech>
Date: Thu, 29 Feb 2024 16:59:37 +0100
Subject: [PATCH] add psmx test to check whether shared memory can be enabled
 and the locator can be set

Signed-off-by: Michel van den Hoek <michel.vandenhoek@zettascale.tech>
---
 src/core/ddsc/tests/psmx.c | 434 +++++++++++++++++++++++++++++++++++++
 1 file changed, 434 insertions(+)

diff --git a/src/core/ddsc/tests/psmx.c b/src/core/ddsc/tests/psmx.c
index e54cd30db5..bf43d4c7e8 100644
--- a/src/core/ddsc/tests/psmx.c
+++ b/src/core/ddsc/tests/psmx.c
@@ -15,6 +15,7 @@
 #include "dds/ddsrt/md5.h"
 #include "dds/ddsrt/io.h"
 #include "dds/ddsrt/heap.h"
+#include "dds/ddsrt/string.h"
 #include "dds/ddsrt/bswap.h"
 #include "dds/ddsrt/environ.h"
 #include "dds/ddsrt/static_assert.h"
@@ -98,6 +99,67 @@ static dds_entity_t create_participant (dds_domainid_t int_dom)
   return pp;
 }
 
+static void config_psmx_free_content(struct ddsi_config_psmx* cfg)
+{
+  assert(cfg);
+  ddsrt_free(cfg->name);
+  ddsrt_free(cfg->library);
+  ddsrt_free(cfg->config);
+}
+
+static void config_psmx_elm_free(struct ddsi_config_psmx_listelem* elm)
+{
+  assert(elm);
+  config_psmx_free_content(&elm->cfg);
+  ddsrt_free(elm);
+}
+
+static bool domain_pin(dds_entity_t domain, dds_domain** dom_out)
+{
+  dds_entity* x = NULL;
+  dds_return_t rc = dds_entity_pin(domain, &x);
+  if ( rc == DDS_RETCODE_OK && dds_entity_kind(x) == DDS_KIND_DOMAIN ) {
+    *dom_out = (dds_domain*)x;
+    return true;
+  }
+  return false;
+}
+
+static void domain_unpin(dds_domain* dom)
+{
+  assert(dom);
+  dds_entity_unpin((dds_entity*)dom);
+}
+
+static bool domain_get_psmx_locator(dds_entity_t domain, ddsi_locator_t* l_out)
+{
+  dds_domain* dom = NULL;
+  bool ret = domain_pin(domain, &dom);
+  if ( ret ) {
+    memcpy(l_out, dom->psmx_instances.instances[0]->locator, sizeof(ddsi_locator_t));
+    domain_unpin(dom);
+  }
+  return ret;
+}
+
+static bool domain_has_psmx_instance_name(dds_entity_t domain, const char* inst_name)
+{
+  dds_domain* dom = NULL;
+  bool ret = domain_pin(domain, &dom);
+  bool match = false;
+  if ( ret ) {
+    uint32_t len = dom->psmx_instances.length;
+    for (uint32_t idx = 0; idx < len; ++idx) {
+      if ( strcmp(dom->psmx_instances.instances[idx]->instance_name, inst_name) == 0 ) {
+        match = true;
+        break;
+      }
+    }
+    domain_unpin(dom);
+  }
+  return ret && match;
+}
+
 struct tracebuf {
   char buf[512];
   size_t pos;
@@ -1264,6 +1326,378 @@ CU_Test (ddsc_psmx, basic)
   dds_delete (dds_get_parent (participant));
 }
 
+/// @brief Check that shared memory can be enabled and the used locator can be set.
+/// @methodology
+/// Case with Iceoryx:
+/// - Check that the data types I'm planning to use are actually suitable for use with shared memory.
+/// - Expectation: They are memcopy-safe.
+/// - Create a configuration with a psmx interface capable of shared memory and specify the locator.
+/// - Create a domain using this configuration.
+/// - Create some entities
+/// - Check the locator used for shared memory.
+/// - Expectation: The locator is the same as specified in the config for the domain.
+/// - Check if shared memory is enabled.
+/// - Expectation: Shared memory is enabled.
+/// - Write and read some topics.
+/// - Expectation: Sent data is received by readers.
+/// 
+/// - Create a configuration with a psmx interface capable of shared memory and don't specify a locator.
+/// - Create a domain using this configuration.
+/// - Create some entities
+/// - Check the locator used for shared memory.
+/// - Expectation: The second half of the locator is all zeros.
+/// - Check if shared memory is enabled.
+/// - Expectation: Shared memory is enabled.
+/// 
+/// Case with cdds:
+/// - Create a configuration with a psmx interface incapable of shared memory.
+/// - Create a domain using this configuration.
+/// - Create some entities
+/// - Check if shared memory is enabled.
+/// - Expectation: Shared memory is not enabled.
+///
+CU_Test(ddsc_psmx, shared_memory)
+{
+  #define sample_cnt 8
+  {
+    // Check that the data types I'm planning to use are actually suitable for use with shared memory.
+    dds_data_type_properties_t props;
+    props = dds_stream_data_types(SC_Model_desc.m_ops);
+    CU_ASSERT_FATAL((props & DDS_DATA_TYPE_IS_MEMCPY_SAFE) == DDS_DATA_TYPE_IS_MEMCPY_SAFE);
+    props = dds_stream_data_types(PsmxType1_desc.m_ops);
+    CU_ASSERT_FATAL((props & DDS_DATA_TYPE_IS_MEMCPY_SAFE) == DDS_DATA_TYPE_IS_MEMCPY_SAFE);
+  }
+
+  const dds_domainid_t domainId = 42;
+  char* CDDS_PSMX_NAME = ddsrt_expand_envvars("${CDDS_PSMX_NAME:-cdds}", domainId);
+  bool using_psmx_iox = (strcmp(CDDS_PSMX_NAME, "iox") == 0);
+  ddsrt_free(CDDS_PSMX_NAME);
+
+  if ( using_psmx_iox ) {
+    {
+      // Test case using a config with specified locator.
+      uint8_t locator_in[16];
+      memset(locator_in, 0x0, sizeof(locator_in)); // avoid warning 'uninitialized value'
+      ((uint64_t*)locator_in)[0] = (uint64_t)0x4a4d203df6996395;
+      ((uint64_t*)locator_in)[1] = (uint64_t)0xe1412fbecc2de4b6;
+      char config_str[100];
+      {
+        // Create the config string including a locator.
+        char locator_str[50];
+        uint8_t* l = locator_in;
+        snprintf(
+          locator_str,
+          sizeof(locator_str),
+          "LOCATOR=%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
+          l[0], l[1], l[2], l[3], l[4], l[5], l[6], l[7], l[8], l[9], l[10], l[11], l[12], l[13], l[14], l[15]
+        );
+        snprintf(
+          config_str,
+          sizeof(config_str),
+          "SERVICE_NAME=psmx1;%s",
+          locator_str
+        );
+      }
+      dds_entity_t domain = -1;
+      {
+        // Create domain
+        struct ddsi_config cfg;
+        ddsi_config_init_default(&cfg);
+        cfg.psmx_instances = ddsrt_malloc(sizeof(*cfg.psmx_instances));
+        cfg.psmx_instances->cfg.name = ddsrt_strdup("iox");
+        cfg.psmx_instances->cfg.library = ddsrt_strdup("psmx_iox");
+        cfg.psmx_instances->cfg.config = ddsrt_strdup(config_str); // config including a locator
+        cfg.psmx_instances->cfg.priority.isdefault = 1;
+        cfg.psmx_instances->cfg.priority.value = 1000000;
+        cfg.psmx_instances->next = NULL;
+        domain = dds_create_domain_with_rawconfig(domainId, &cfg);
+        config_psmx_elm_free(cfg.psmx_instances);
+      }
+      CU_ASSERT_FATAL(domain > 0);
+      CU_ASSERT_FATAL(domain_has_psmx_instance_name(domain, "CycloneDDS-IOX-PSMX"));
+      dds_entity_t participant = dds_create_participant(domainId, NULL, NULL);
+      CU_ASSERT_FATAL(participant > 0);
+      dds_entity_t writer1, reader1, writer2, reader2;
+      {
+        char topicname[100];
+        dds_qos_t* qos = dds_create_qos();
+        dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, sample_cnt);
+
+        create_unique_topic_name("shared_memory", topicname, sizeof(topicname));
+        dds_entity_t topic1 = dds_create_topic(participant, &SC_Model_desc, topicname, qos, NULL);
+        CU_ASSERT_FATAL(topic1 > 0);
+
+        writer1 = dds_create_writer(participant, topic1, qos, NULL);
+        CU_ASSERT_FATAL(writer1 > 0);
+
+        reader1 = dds_create_reader(participant, topic1, qos, NULL);
+        CU_ASSERT_FATAL(reader1 > 0);
+
+        create_unique_topic_name("shared_memory", topicname, sizeof(topicname));
+        dds_entity_t topic2 = dds_create_topic(participant, &PsmxType1_desc, topicname, qos, NULL);
+        CU_ASSERT_FATAL(topic2 > 0);
+
+        writer2 = dds_create_writer(participant, topic2, qos, NULL);
+        CU_ASSERT_FATAL(writer2 > 0);
+
+        reader2 = dds_create_reader(participant, topic2, qos, NULL);
+        CU_ASSERT_FATAL(reader2 > 0);
+        dds_delete_qos(qos);
+      }
+      {
+        // Check that I get the same locator that I provided with the config.
+        ddsi_locator_t locator_out;
+        CU_ASSERT_FATAL(domain_get_psmx_locator(domain, &locator_out));
+        CU_ASSERT_FATAL(memcmp(locator_in, locator_out.address, sizeof(locator_in)) == 0);
+      }
+      {
+        // Check that shared memory is enabled.
+        bool psmx_enabled;
+        psmx_enabled = endpoint_has_psmx_enabled(writer1);
+        CU_ASSERT_FATAL(psmx_enabled);
+        CU_ASSERT_FATAL(dds_is_shared_memory_available(writer1));
+
+        psmx_enabled = endpoint_has_psmx_enabled(reader1);
+        CU_ASSERT_FATAL(psmx_enabled);
+        CU_ASSERT_FATAL(dds_is_shared_memory_available(reader1));
+
+        psmx_enabled = endpoint_has_psmx_enabled(writer2);
+        CU_ASSERT_FATAL(psmx_enabled);
+        CU_ASSERT_FATAL(dds_is_shared_memory_available(writer2));
+
+        psmx_enabled = endpoint_has_psmx_enabled(reader2);
+        CU_ASSERT_FATAL(psmx_enabled);
+        CU_ASSERT_FATAL(dds_is_shared_memory_available(reader2));
+      }
+      {
+        // Write and read samples
+        dds_return_t result;
+        {
+          SC_Model* sample;
+          for (int i = 0; i < sample_cnt; ++i) {
+            result = dds_request_loan(writer1, (void**)&sample);
+            CU_ASSERT_FATAL(result == DDS_RETCODE_OK);
+            sample->a = (uint8_t)i;
+            sample->b = (uint8_t)i + 1;
+            sample->c = (uint8_t)i + 2;
+            result = dds_write(writer1, sample);
+            CU_ASSERT_FATAL(result == DDS_RETCODE_OK);
+          }
+        }
+        {
+          PsmxType1* sample;
+          for (int i = 0; i < sample_cnt; ++i) {
+            result = dds_request_loan(writer2, (void**)&sample);
+            CU_ASSERT_FATAL(result == DDS_RETCODE_OK);
+            sample->xy.x = i;
+            sample->xy.y = (uint8_t)i + 1;
+            sample->z = (uint8_t)i + 2;
+            result = dds_write(writer2, sample);
+            CU_ASSERT_FATAL(result == DDS_RETCODE_OK);
+          }
+        }
+
+        {
+          SC_Model* samples1[sample_cnt];
+          dds_sample_info_t sinfo[sample_cnt];
+          const dds_entity_t ws1 = dds_create_waitset(participant);
+          dds_entity_t rcond1 = dds_create_readcondition(reader1, DDS_ANY_STATE);
+          memset(samples1, 0x0, sizeof(samples1));
+          dds_waitset_attach(ws1, rcond1, rcond1);
+
+          int j;
+          int cnt = 0;
+          bool match = true;
+          while ( match && cnt < sample_cnt ) {
+            result = dds_waitset_wait(ws1, NULL, 0, DDS_SECS(10));
+            CU_ASSERT_FATAL(result > 0);
+            result = dds_take(reader1, (void**)samples1, sinfo, sample_cnt, sample_cnt);
+            CU_ASSERT_FATAL(result > 0);
+            for (int i = 0; i < result; ++i) {
+              j = cnt + i;
+              if (
+                samples1[i]->a != j ||
+                samples1[i]->b != j + 1 ||
+                samples1[i]->c != j + 2
+              ) {
+                match = false;
+              }
+            }
+            cnt += result;
+          }
+          CU_ASSERT_FATAL(match);
+          CU_ASSERT_FATAL(cnt == sample_cnt);
+        }
+        {
+          PsmxType1* samples2[sample_cnt];
+          dds_sample_info_t sinfo[sample_cnt];
+          const dds_entity_t ws2 = dds_create_waitset(participant);
+          dds_entity_t rcond2 = dds_create_readcondition(reader2, DDS_ANY_STATE);
+          memset(samples2, 0x0, sizeof(samples2));
+          dds_waitset_attach(ws2, rcond2, rcond2);
+
+          int j;
+          int cnt = 0;
+          bool match = true;
+          while ( match && cnt < sample_cnt ) {
+            result = dds_waitset_wait(ws2, NULL, 0, DDS_SECS(10));
+            CU_ASSERT_FATAL(result > 0);
+            result = dds_take(reader2, (void**)samples2, sinfo, sample_cnt, sample_cnt);
+            CU_ASSERT_FATAL(result > 0);
+            for (int i = 0; i < result; ++i) {
+              j = cnt + i;
+              if (
+                samples2[i]->xy.x != j ||
+                samples2[i]->xy.y != (uint8_t)j + 1 ||
+                samples2[i]->z != (uint8_t)j + 2
+              ) {
+                match = false;
+              }
+            }
+            cnt += result;
+          }
+          CU_ASSERT_FATAL(match);
+          CU_ASSERT_FATAL(cnt == sample_cnt);
+        }
+      }
+      dds_delete(domain);
+    }
+    {
+      // Test case using a config without specifying a locator.
+      dds_entity_t domain = -1;
+      {
+        // Create domain
+        struct ddsi_config cfg;
+        ddsi_config_init_default(&cfg);
+        cfg.psmx_instances = ddsrt_malloc(sizeof(*cfg.psmx_instances));
+        cfg.psmx_instances->cfg.name = ddsrt_strdup("iox");
+        cfg.psmx_instances->cfg.library = ddsrt_strdup("psmx_iox");
+        cfg.psmx_instances->cfg.config = ddsrt_strdup("SERVICE_NAME=psmx1;"); // config witout a locator
+        cfg.psmx_instances->cfg.priority.isdefault = 1;
+        cfg.psmx_instances->cfg.priority.value = 1000000;
+        cfg.psmx_instances->next = NULL;
+        domain = dds_create_domain_with_rawconfig(domainId, &cfg);
+        config_psmx_elm_free(cfg.psmx_instances);
+      }
+      CU_ASSERT_FATAL(domain > 0);
+      CU_ASSERT_FATAL(domain_has_psmx_instance_name(domain, "CycloneDDS-IOX-PSMX"));
+      dds_entity_t participant = dds_create_participant(domainId, NULL, NULL);
+      CU_ASSERT_FATAL(participant > 0);
+      {
+        // Check that the second half of the locator is all zeros.
+        uint8_t locator_zero[16];
+        size_t locator_size_half = sizeof(locator_zero) / 2;
+        memset(locator_zero, 0, sizeof(locator_zero));
+        ddsi_locator_t locator_out;
+        CU_ASSERT_FATAL(domain_get_psmx_locator(domain, &locator_out));
+        CU_ASSERT_FATAL(memcmp(locator_zero + locator_size_half, locator_out.address + locator_size_half, locator_size_half) == 0);
+      }
+      {
+        // Check that shared memory is enabled.
+        bool psmx_enabled;
+        char topicname[100];
+        {
+          create_unique_topic_name("shared_memory", topicname, sizeof(topicname));
+          dds_entity_t topic = dds_create_topic(participant, &SC_Model_desc, topicname, NULL, NULL);
+          CU_ASSERT_FATAL(topic > 0);
+
+          dds_entity_t writer = dds_create_writer(participant, topic, NULL, NULL);
+          CU_ASSERT_FATAL(writer > 0);
+          psmx_enabled = endpoint_has_psmx_enabled(writer);
+          CU_ASSERT_FATAL(psmx_enabled);
+          CU_ASSERT_FATAL(dds_is_shared_memory_available(writer));
+
+          dds_entity_t reader = dds_create_reader(participant, topic, NULL, NULL);
+          CU_ASSERT_FATAL(reader > 0);
+          psmx_enabled = endpoint_has_psmx_enabled(reader);
+          CU_ASSERT_FATAL(psmx_enabled);
+          CU_ASSERT_FATAL(dds_is_shared_memory_available(reader));
+        }
+        {
+          create_unique_topic_name("shared_memory", topicname, sizeof(topicname));
+          dds_entity_t topic = dds_create_topic(participant, &PsmxType1_desc, topicname, NULL, NULL);
+          CU_ASSERT_FATAL(topic > 0);
+
+          dds_entity_t writer = dds_create_writer(participant, topic, NULL, NULL);
+          CU_ASSERT_FATAL(writer > 0);
+          psmx_enabled = endpoint_has_psmx_enabled(writer);
+          CU_ASSERT_FATAL(psmx_enabled);
+          CU_ASSERT_FATAL(dds_is_shared_memory_available(writer));
+
+          dds_entity_t reader = dds_create_reader(participant, topic, NULL, NULL);
+          CU_ASSERT_FATAL(reader > 0);
+          psmx_enabled = endpoint_has_psmx_enabled(reader);
+          CU_ASSERT_FATAL(psmx_enabled);
+          CU_ASSERT_FATAL(dds_is_shared_memory_available(reader));
+        }
+      }
+      dds_delete(domain);
+    }
+  } else {
+    // No shared memory.
+    dds_entity_t domain = -1;
+    {
+      // Create domain
+      struct ddsi_config cfg;
+      ddsi_config_init_default(&cfg);
+      cfg.psmx_instances = ddsrt_malloc(sizeof(*cfg.psmx_instances));
+      cfg.psmx_instances->cfg.name = ddsrt_strdup("cdds");
+      cfg.psmx_instances->cfg.library = ddsrt_strdup("psmx_cdds");
+      cfg.psmx_instances->cfg.config = ddsrt_strdup("SERVICE_NAME=cdds1;");
+      cfg.psmx_instances->cfg.priority.isdefault = 1;
+      cfg.psmx_instances->cfg.priority.value = 1000000;
+      cfg.psmx_instances->next = NULL;
+      domain = dds_create_domain_with_rawconfig(domainId, &cfg);
+      config_psmx_elm_free(cfg.psmx_instances);
+    }
+    CU_ASSERT_FATAL(domain > 0);
+    CU_ASSERT_FATAL(domain_has_psmx_instance_name(domain, "cdds-psmx"));
+    dds_entity_t participant = dds_create_participant(domainId, NULL, NULL);
+    CU_ASSERT_FATAL(participant > 0);
+    {
+      // Check that shared memory is not enabled.
+      bool psmx_enabled;
+      char topicname[100];
+      {
+        create_unique_topic_name("shared_memory", topicname, sizeof(topicname));
+        dds_entity_t topic = dds_create_topic(participant, &SC_Model_desc, topicname, NULL, NULL);
+        CU_ASSERT_FATAL(topic > 0);
+
+        dds_entity_t writer = dds_create_writer(participant, topic, NULL, NULL);
+        CU_ASSERT_FATAL(writer > 0);
+        psmx_enabled = endpoint_has_psmx_enabled(writer);
+        CU_ASSERT_FATAL(psmx_enabled);
+        CU_ASSERT_FATAL(!dds_is_shared_memory_available(writer));
+
+        dds_entity_t reader = dds_create_reader(participant, topic, NULL, NULL);
+        CU_ASSERT_FATAL(reader > 0);
+        psmx_enabled = endpoint_has_psmx_enabled(reader);
+        CU_ASSERT_FATAL(psmx_enabled);
+        CU_ASSERT_FATAL(!dds_is_shared_memory_available(reader));
+      }
+      {
+        create_unique_topic_name("shared_memory", topicname, sizeof(topicname));
+        dds_entity_t topic = dds_create_topic(participant, &PsmxType1_desc, topicname, NULL, NULL);
+        CU_ASSERT_FATAL(topic > 0);
+
+        dds_entity_t writer = dds_create_writer(participant, topic, NULL, NULL);
+        CU_ASSERT_FATAL(writer > 0);
+        psmx_enabled = endpoint_has_psmx_enabled(writer);
+        CU_ASSERT_FATAL(psmx_enabled);
+        CU_ASSERT_FATAL(!dds_is_shared_memory_available(writer));
+
+        dds_entity_t reader = dds_create_reader(participant, topic, NULL, NULL);
+        CU_ASSERT_FATAL(reader > 0);
+        psmx_enabled = endpoint_has_psmx_enabled(reader);
+        CU_ASSERT_FATAL(psmx_enabled);
+        CU_ASSERT_FATAL(!dds_is_shared_memory_available(reader));
+      }
+    }
+    dds_delete(domain);
+  }
+  #undef sample_cnt
+}
+
 CU_Test (ddsc_psmx, zero_copy)
 {
   const struct {