Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The frequency of subscription topic triggering is inconsistent with the frequency of publishing #2075

Open
yanzhang920817 opened this issue Aug 22, 2024 · 0 comments

Comments

@yanzhang920817
Copy link

yanzhang920817 commented Aug 22, 2024

I subscribed to two topics following examples/listtopics/listtopics.c, but ran into two strange problems. First, for the topic settings/controlMode, the char * field of the message could not be printed. Second, the topic rt/clearError was triggered immediately after it was created. Moreover, although the publisher publishes once per second, the subscriber is not triggered once per second; it appears to be triggered dozens of times. What could be causing this?
process_topic function:

static bool process_topic(dds_entity_t readcond) {
#define MAXCOUNT 10
    void *samples[MAXCOUNT] = { NULL };
    dds_sample_info_t infos[MAXCOUNT];
    samples[0] = NULL;
    int32_t n = dds_take(readcond, samples, infos, MAXCOUNT, MAXCOUNT);
    //int32_t n = dds_read(readcond, samples, infos, MAXCOUNT, MAXCOUNT);
    bool topics_seen = false;
    for (int32_t i = 0; i < n; i++) {
        //dds_builtintopic_topic_t const * const sample = samples[i];
        dds_builtintopic_topic_t const * const sample = static_cast<dds_builtintopic_topic_t*>(samples[i]);
        struct keystr gs;
        printf ("%s: %s", instance_state_str(infos[i].instance_state), keystr(&gs, &sample->key));
        if (infos[i].valid_data) {
            printf (" %s %s", sample->topic_name, sample->type_name);
            if (strcmp(sample->topic_name, "settings/controlMode") == 0) {
                topics_seen = true;
                printf("\n#################################please enable the robot####################################\n");
                settings_controlMode_msg *msg = static_cast<settings_controlMode_msg*>(samples[i]);
                if (msg == NULL) {
                    printf("msg is null\n");
                } else {
                    if (msg->controlMode == NULL) {
                        printf("cm is null\n");
                    } else {
                        printf("all are not null, msg=%p,cm=%p\n", msg, msg->controlMode);
                        printf("Received message: controlMode = %s\n", msg->controlMode);
                    }
                }
            } else if (strcmp(sample->topic_name, "rt/clearError") == 0) {
                printf("\n#################################please clear error####################################\n");
                ClearError *msg = static_cast<ClearError*>(samples[i]);
                printf("msg id =%d\n", msg->msgid);
            }
        }
        printf ("\n");
    }
    (void) dds_return_loan (readcond, samples, n);
#undef MAXCOUNT
    return topics_seen;
}

`

ListenOperation function:

void *ListenOperation(void *arg)
{
    void **args = (void**)arg;
    LowCmd_ *lowCmd = (LowCmd_ *)args[0];
    pthread_mutex_t *lock = (pthread_mutex_t*)args[1];

    dds_entity_t participant;
    dds_entity_t topic;


    void *samples[MAX_SAMPLES];
    dds_sample_info_t infos[MAX_SAMPLES];
    dds_return_t rc;
    dds_qos_t *qos;
    bool topics_seen = false;
    bool endpoints_exist = false;

    /* Create a Participant. */
    participant = dds_create_participant(DDS_DOMAIN_DEFAULT, NULL, NULL);
    if (participant < 0)
        DDS_FATAL("dds_create_participant: %s\n", dds_strretcode(-participant));

    printf("%s: create participant successfully\n", __FUNCTION__);
    const dds_entity_t waitset = dds_create_waitset(participant);
#if 1
    const dds_entity_t reader = dds_create_reader(participant, DDS_BUILTIN_TOPIC_DCPSTOPIC, NULL, NULL);
    if (reader < 0)
    {
        if (reader == DDS_RETCODE_UNSUPPORTED)
            fprintf (stderr, "Topic discovery is not included in the build, rebuild with ENABLE_TOPIC_DISCOVERY=ON\n");
        else
            fprintf (stderr, "dds_create_reader(DCPSTopic): %s\n", dds_strretcode (reader));
        dds_delete (participant);
        pthread_exit(NULL);
    }
#endif
    //const dds_entity_t readcond = dds_create_readcondition (reader, DDS_ANY_STATE);
    const dds_entity_t readcond = dds_create_readcondition (reader, DDS_NOT_READ_SAMPLE_STATE);
    (void) dds_waitset_attach (waitset, readcond, 0);


    const dds_topic_descriptor_t *type_desc[MAX_SUB_TOPICS] = {
        &settings_controlMode_msg_desc, &ClearError_desc};
    //static const dds_entity_t topic_names[MAX_SUB_TOPICS] = {
    const char *topic_names[MAX_SUB_TOPICS] = {
        "setttings/controlMode", "rt/clearError"};
    dds_entity_t topic_entities[MAX_SUB_TOPICS];
    dds_entity_t topic_readers[MAX_SUB_TOPICS];
    dds_entity_t topic_readconds[MAX_SUB_TOPICS];

    for (size_t i = 0; i < MAX_SUB_TOPICS; i++) {
        topic_entities[i] = dds_create_topic(participant, type_desc[i], topic_names[i], NULL, NULL);
        if (topic_entities[i] < 0) {
            fprintf(stderr, "dds_create_topic(%s): %s\n", topic_names[i], dds_strretcode(topic_entities[i]));
            dds_delete(participant);
            pthread_exit(NULL);
        }
        printf("%s [%zu]: create topic successfully\n", __FUNCTION__, i);
        topic_readers[i] = dds_create_reader(participant, topic_entities[i], NULL, NULL);
        if (topic_readers[i] < 0) {
            fprintf (stderr, "dds_create_reader(%s): %s\n", topic_names[i], dds_strretcode(topic_readers[i]));
            dds_delete(participant);
            pthread_exit(NULL);
        }
        printf("%s [%zu]: create reader successfully\n", __FUNCTION__, i);
        topic_readconds[i] = dds_create_readcondition(topic_readers[i], DDS_ANY_STATE);
        (void) dds_waitset_attach(waitset, topic_readconds[i], 0);
    }


    printf("%s is running\n", __FUNCTION__);
    /* Monitor topic creation/deletion indefinitely */
#if 1
    while (1) {
        if (dds_waitset_wait_until(waitset, NULL, 0, DDS_INFINITY) > 0) {
            printf("-------------------------------------------------------------------\n");
            if (process_topic(readcond)) {
                topics_seen = true;
                printf("--------------------------topics_seen-----------------------------------------\n");
            }
        }
    }
#else
    const dds_time_t tstop = dds_time() + DDS_SECS (1000000);
    while (dds_waitset_wait_until (waitset, NULL, 0, tstop) > 0) {
        printf("-------------------------------------------------------------------\n");
        if (process_topic(readcond)) {
           topics_seen = true;
            printf("--------------------------topics_seen-----------------------------------------\n");
        }
    }
#endif
    dds_delete (participant);
    pthread_exit(NULL);
}
`

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant