From 79eef2a65512a439bb04cb1e5ebf1ed01aabac08 Mon Sep 17 00:00:00 2001 From: Byeong-gil Jun Date: Sun, 10 Sep 2023 11:59:30 +0900 Subject: [PATCH] Initialized ndt queue in the same manner with the event queue For reusing the pqueue.c, save ndt_queue stores dummy events that has the tag of NDTs. Nonetheless, current pqueue.c can only use time as the priority. So new pqueue should be implemented. --- core/environment.c | 6 ++++-- core/federated/federate.c | 10 ++++++++++ core/reactor_common.c | 6 ++++++ include/core/environment.h | 2 +- 4 files changed, 21 insertions(+), 3 deletions(-) diff --git a/core/environment.c b/core/environment.c index f04b7e942..9c569633b 100644 --- a/core/environment.c +++ b/core/environment.c @@ -107,8 +107,10 @@ static void environment_init_modes(environment_t* env, int num_modes, int num_st */ static void environment_init_federated(environment_t* env, int num_is_present_fields) { #ifdef FEDERATED_CENTRALIZED - // TODO: init ndt_queue with proper functions. - // env->ndt_queue = pqueue_init(10, tag_in_reverse_order, ) + // FIXME: Create a queue saving tags instead of events. For now, ndt_q stores + // dummy events. + env->ndt_q = pqueue_init(INITIAL_EVENT_QUEUE_SIZE, in_reverse_order, get_event_time, + get_event_position, set_event_position, event_matches, print_event); #endif #ifdef FEDERATED_DECENTRALIZED env->_lf_intended_tag_fields = (tag_t**) calloc(num_is_present_fields, sizeof(tag_t*)); diff --git a/core/federated/federate.c b/core/federated/federate.c index bd7e7142f..da013fb5d 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -2321,6 +2321,16 @@ void handle_next_downstream_tag() { LF_PRINT_LOG("Received from RTI a MSG_TYPE_NEXT_DOWNSTREAM_TAG message with elapsed tag " PRINTF_TAG ".", NDT.time - start_time, NDT.microstep); + environment_t* env; + _lf_get_environments(&env); + + if (lf_tag_compare(env->current_tag, NDT) < 0) { + // The current tag is less than NDT. Push NDT to ndt_q. + event_t* dummy = _lf_create_dummy_events(env, NULL, NDT.time, NULL, NDT.microstep); + pqueue_insert(env->ndt_q, dummy); + } else { + // The current tag is greater than or equal to NDT. Send LTC, NET, and ABS messages. + } } /** diff --git a/core/reactor_common.c b/core/reactor_common.c index b7a3c9ac7..6f7d33d4d 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -292,6 +292,12 @@ void _lf_start_time_step(environment_t *env) { } } +#ifdef FEDERATED_CENTRALIZED + while (lf_tag_compare(((event_t*) pqueue_peek(env->ndt_q))->time, env->current_tag.time) < 0) { + // Remove elements of ndt_q with tag less than the current tag. + pqueue_remove(env->ndt_q, pqueue_peek(env->ndt_q)); + } +#endif #ifdef FEDERATED_DECENTRALIZED for (int i = 0; i < env->is_present_fields_size; i++) { // FIXME: For now, an intended tag of (NEVER, 0) diff --git a/include/core/environment.h b/include/core/environment.h index 6fb98ee48..62476f540 100644 --- a/include/core/environment.h +++ b/include/core/environment.h @@ -106,7 +106,7 @@ typedef struct environment_t { int _lf_intended_tag_fields_size; #endif // FEDERATED #ifdef FEDERATED_CENTRALIZED - pqueue_t* ndt_queue; + pqueue_t* ndt_q; #endif // FEDERATED_CENTRALIZED } environment_t;