Skip to content

Commit

Permalink
Initialized ndt queue in the same manner with the event queue
Browse files Browse the repository at this point in the history
For reusing the pqueue.c, save ndt_queue stores dummy events that has
the tag of NDTs. Nonetheless, current pqueue.c can only use time as the
priority. So new pqueue should be implemented.
  • Loading branch information
byeonggiljun committed Sep 10, 2023
1 parent 19969eb commit 79eef2a
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 3 deletions.
6 changes: 4 additions & 2 deletions core/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@ static void environment_init_modes(environment_t* env, int num_modes, int num_st
*/
static void environment_init_federated(environment_t* env, int num_is_present_fields) {
#ifdef FEDERATED_CENTRALIZED
// TODO: init ndt_queue with proper functions.
// env->ndt_queue = pqueue_init(10, tag_in_reverse_order, )
// FIXME: Create a queue saving tags instead of events. For now, ndt_q stores
// dummy events.
env->ndt_q = pqueue_init(INITIAL_EVENT_QUEUE_SIZE, in_reverse_order, get_event_time,
get_event_position, set_event_position, event_matches, print_event);
#endif
#ifdef FEDERATED_DECENTRALIZED
env->_lf_intended_tag_fields = (tag_t**) calloc(num_is_present_fields, sizeof(tag_t*));
Expand Down
10 changes: 10 additions & 0 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -2321,6 +2321,16 @@ void handle_next_downstream_tag() {

LF_PRINT_LOG("Received from RTI a MSG_TYPE_NEXT_DOWNSTREAM_TAG message with elapsed tag " PRINTF_TAG ".",
NDT.time - start_time, NDT.microstep);
environment_t* env;
_lf_get_environments(&env);

if (lf_tag_compare(env->current_tag, NDT) < 0) {
// The current tag is less than NDT. Push NDT to ndt_q.
event_t* dummy = _lf_create_dummy_events(env, NULL, NDT.time, NULL, NDT.microstep);
pqueue_insert(env->ndt_q, dummy);
} else {
// The current tag is greater than or equal to NDT. Send LTC, NET, and ABS messages.
}
}

/**
Expand Down
6 changes: 6 additions & 0 deletions core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,12 @@ void _lf_start_time_step(environment_t *env) {
}
}

#ifdef FEDERATED_CENTRALIZED
while (lf_tag_compare(((event_t*) pqueue_peek(env->ndt_q))->time, env->current_tag.time) < 0) {
// Remove elements of ndt_q with tag less than the current tag.
pqueue_remove(env->ndt_q, pqueue_peek(env->ndt_q));
}
#endif
#ifdef FEDERATED_DECENTRALIZED
for (int i = 0; i < env->is_present_fields_size; i++) {
// FIXME: For now, an intended tag of (NEVER, 0)
Expand Down
2 changes: 1 addition & 1 deletion include/core/environment.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ typedef struct environment_t {
int _lf_intended_tag_fields_size;
#endif // FEDERATED
#ifdef FEDERATED_CENTRALIZED
pqueue_t* ndt_queue;
pqueue_t* ndt_q;
#endif // FEDERATED_CENTRALIZED
} environment_t;

Expand Down

0 comments on commit 79eef2a

Please sign in to comment.