diff --git a/core/environment.c b/core/environment.c index c0715e5e1..a9e10e9f4 100644 --- a/core/environment.c +++ b/core/environment.c @@ -106,11 +106,11 @@ static void environment_init_modes(environment_t* env, int num_modes, int num_st * @brief Initialize the federation-specific parts of the environment struct. */ static void environment_init_federated(environment_t* env, int num_is_present_fields) { -// #ifdef FEDERATED_CENTRALIZED +#ifdef FEDERATED_NDT_ENABLED // FIXME: Create a queue saving tags instead of events. For now, ndt_q stores // dummy events. env->ndt_q = pqueue_tag_init(10); -// #endif +#endif // FEDERATED_NDT_ENABLED #ifdef FEDERATED_DECENTRALIZED env->_lf_intended_tag_fields = (tag_t**) calloc(num_is_present_fields, sizeof(tag_t*)); LF_ASSERT(env->_lf_intended_tag_fields, "Out of memory"); @@ -170,7 +170,9 @@ void environment_free(environment_t* env) { pqueue_free(env->event_q); pqueue_free(env->recycle_q); pqueue_free(env->next_q); + #ifdef FEDERATED_NDT_ENABLED pqueue_tag_free(env->ndt_q); + #endif // FEDERATED_NDT_ENABLED environment_free_threaded(env); environment_free_single_threaded(env); diff --git a/core/federated/RTI/main.c b/core/federated/RTI/main.c index 941590285..bfe68ec37 100644 --- a/core/federated/RTI/main.c +++ b/core/federated/RTI/main.c @@ -99,6 +99,8 @@ void usage(int argc, const char* argv[]) { lf_print(" clock sync attempt (default is 10). Applies to 'init' and 'on'.\n"); lf_print(" -a, --auth Turn on HMAC authentication options.\n"); lf_print(" -t, --tracing Turn on tracing.\n"); + lf_print(" -v, --version The minimum required version of Lingua Franca."); + lf_print(" --ndt Turn on ndt optimization.\n"); lf_print("Command given:"); for (int i = 0; i < argc; i++) { @@ -170,8 +172,11 @@ int process_clock_sync_args(int argc, const char* argv[]) { } int process_args(int argc, const char* argv[]) { - for (int i = 1; i < argc; i++) { - if (strcmp(argv[i], "-i") == 0 || strcmp(argv[i], "--id") == 0) { + for (int i = 1; i < argc; i++) { + if (strcmp(argv[i], "-v") == 0 || strcmp(argv[i], "--version") == 0) { + lf_print("%s", version_info); + return 0; + } else if (strcmp(argv[i], "-i") == 0 || strcmp(argv[i], "--id") == 0) { if (argc < i + 2) { lf_print_error("--id needs a string argument."); usage(argc, argv); @@ -232,6 +237,8 @@ int process_args(int argc, const char* argv[]) { rti.authentication_enabled = true; } else if (strcmp(argv[i], "-t") == 0 || strcmp(argv[i], "--tracing") == 0) { rti.base.tracing_enabled = true; + } else if (strcmp(argv[i], "--ndt" == 0)) { + rti.ndt_enabled = true; } else if (strcmp(argv[i], " ") == 0) { // Tolerate spaces continue; diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 1961deac1..8e0352ba6 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -335,6 +335,9 @@ void determine_the_ndt_condition() { } void send_upstream_next_downstream_tag(federate_info_t* fed, tag_t next_event_tag) { + if (!rti_remote->ndt_enabled) { + return; + } // The RTI receives next_event_tag from the federated fed. // It has to send NDT messages to the upstream federates of fed // if the LTC message from an upstream federate is ealrier than the next_event_tag. @@ -1330,6 +1333,10 @@ int receive_set_up_information(int socket_id, uint16_t fed_id) { bool waiting_physical_action_information = true; bool waiting_udp_message_and_set_up_clock_sync = true; + if (!rti_remote->ndt_enabled) { + waiting_physical_action_information = false; + } + // Buffer for incoming messages. // This does not constrain the message size because messages // are forwarded piece by piece. @@ -1794,6 +1801,7 @@ void initialize_RTI(rti_remote_t *rti){ rti_remote->clock_sync_period_ns = MSEC(10); rti_remote->clock_sync_exchanges_per_interval = 10; rti_remote->authentication_enabled = false; + rti_remote->ndt_enabled = false; rti_remote->base.tracing_enabled = false; rti_remote->stop_in_progress = false; } diff --git a/core/federated/RTI/rti_remote.h b/core/federated/RTI/rti_remote.h index 55753fde2..bfe4370bf 100644 --- a/core/federated/RTI/rti_remote.h +++ b/core/federated/RTI/rti_remote.h @@ -35,7 +35,7 @@ ///////////////////////////////////////////// //// Data structures -static char version_info[] = "0.5.0"; // Simply use Lingua Franca version name now. +static char version_info[] = "0.5.0"; // Simply use Lingua Franca version name for now. typedef enum socket_type_t { TCP, @@ -163,6 +163,11 @@ typedef struct rti_remote_t { */ bool authentication_enabled; + /** + * Boolean indicating that NDT message is enabled. + */ + bool ndt_enabled; + /** * Boolean indicating that a stop request is already in progress. */ diff --git a/core/federated/federate.c b/core/federated/federate.c index 77ef59005..60b9588a9 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -67,7 +67,9 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include // For secure random number generation. #include // For HMAC-based authentication of federates. #endif +#ifdef FEDERATED_NDT_ENABLED #include "pqueue_tag.h" +#endif // FEDERATED_NDT_ENABLED // Global variables defined in tag.c: extern instant_t _lf_last_reported_unadjusted_physical_time_ns; @@ -423,12 +425,14 @@ int send_timed_message(environment_t* env, } // Insert the intended tag into the ndt_q to send LTC to the RTI quickly. +#ifdef FEDERATED_NDT_ENABLED if (pqueue_tag_size(env->ndt_q) != 0) { // FIXME: If the RTI changes the use of NDTs dynamically, merely checking the size // is not enough to know whether this federate using the NDT optimization or not. LF_PRINT_DEBUG("Insert NDT at the intended to send LTC and NET quickly."); pqueue_tag_insert_if_no_match(env->ndt_q, current_message_intended_tag); } +#endif // FEDERATED_NDT_ENABLED // Trace the event when tracing is enabled if (message_type == MSG_TYPE_TAGGED_MESSAGE) { @@ -1153,6 +1157,7 @@ void connect_to_rti(const char* hostname, int port) { // @see MSG_TYPE_NEIGHBOR_STRUCTURE in net_common.h send_neighbor_structure_to_RTI(_fed.socket_TCP_RTI); +#ifdef FEDERATED_NDT_ENABLED // Send whether this federate has a physical action to the RTI. uint16_t has_physical_action = _fed.min_delay_from_physical_action_to_federate_output != NEVER; unsigned char physical_action_message_buffer[1 + sizeof(uint16_t)]; @@ -1160,6 +1165,7 @@ void connect_to_rti(const char* hostname, int port) { encode_uint16(has_physical_action, &(physical_action_message_buffer[1])); write_to_socket_errexit(_fed.socket_TCP_RTI, 1 + sizeof(uint16_t), physical_action_message_buffer, "Failed to send physical action info to RTI."); +#endif // FEDERATED_NDT_ENABLED uint16_t udp_port = setup_clock_synchronization_with_rti(); @@ -1420,6 +1426,7 @@ void send_port_absent_to_federate(environment_t* env, interval_t additional_dela // FIXME: If port absent messages are not used when there is no zero-delay cycle, // This part is not needed as we don't apply the NDT optimization for cycles. +#ifdef FEDERATED_NDT_ENABLED if (pqueue_tag_size(env->ndt_q) != 0 ) { // FIXME: If the RTI changes the use of NDTs dynamically, merely checking the size // is not enough to know whether this federate using the NDT optimization or not. @@ -1433,6 +1440,7 @@ void send_port_absent_to_federate(environment_t* env, interval_t additional_dela return; } } +#endif // FEDERATED_NDT_ENABLED // Construct the message size_t message_length = 1 + sizeof(port_ID) + sizeof(fed_ID) + sizeof(instant_t) + sizeof(microstep_t); @@ -1948,6 +1956,7 @@ void _lf_logical_tag_complete(tag_t tag_to_send) { environment_t *env; _lf_get_environments(&env); bool need_to_send_LTC = true; +#ifdef FEDERATED_NDT_ENABLED if (pqueue_tag_size(env->ndt_q) != 0 ) { // FIXME: If the RTI changes the use of NDTs dynamically, merely checking the size // is not enough to know whether this federate using the NDT optimization or not. @@ -1961,6 +1970,7 @@ void _lf_logical_tag_complete(tag_t tag_to_send) { need_to_send_LTC = false; } } +#endif // FEDERATED_NDT_ENABLED if (need_to_send_LTC) { LF_PRINT_LOG("Sending Logical Tag Complete (LTC) " PRINTF_TAG " to the RTI.", tag_to_send.time - start_time, @@ -2391,6 +2401,7 @@ void handle_stop_request_message() { lf_mutex_unlock(&outbound_socket_mutex); } +#ifdef FEDERATED_NDT_ENABLED /** * Handle a MSG_TYPE_NEXT_DOWNSTREAM_MESSAGE from the RTI * @@ -2425,6 +2436,7 @@ void handle_next_downstream_tag() { pqueue_tag_insert_if_no_match(env->ndt_q, env->current_tag); } } +#endif // FEDERATED_NDT_ENABLED /** * Close sockets used to communicate with other federates, if they are open, @@ -2643,9 +2655,11 @@ void* listen_to_rti_TCP(void* args) { case MSG_TYPE_PORT_ABSENT: handle_port_absent_message(_fed.socket_TCP_RTI, -1); break; +#ifdef FEDERATED_NDT_ENABLED case MSG_TYPE_NEXT_DOWNSTREAM_TAG: handle_next_downstream_tag(); break; +#endif // FEDERATED_NDT_ENABLED case MSG_TYPE_CLOCK_SYNC_T1: case MSG_TYPE_CLOCK_SYNC_T4: lf_print_error("Federate %d received unexpected clock sync message from RTI on TCP socket.", @@ -2838,6 +2852,7 @@ tag_t _lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply // If there is no downstream events that require the NET of the current tag, // do not send the NET. bool need_to_send_NET = true; +#ifdef FEDERATED_NDT_ENABLED if (pqueue_tag_size(env->ndt_q) != 0 ) { // FIXME: If the RTI changes the use of NDTs dynamically, merely checking the size // is not enough to know whether this federate using the NDT optimization or not. @@ -2851,6 +2866,7 @@ tag_t _lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply need_to_send_NET = false; } } +#endif // FEDERATED_NDT_ENABLED if (need_to_send_NET) { _lf_send_tag(MSG_TYPE_NEXT_EVENT_TAG, tag, wait_for_reply); _fed.last_sent_NET = tag; diff --git a/core/reactor_common.c b/core/reactor_common.c index 0f45b6103..111cab93a 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -293,7 +293,7 @@ void _lf_start_time_step(environment_t *env) { } } -// #ifdef FEDERATED_CENTRALIZED +#ifdef FEDERATED_NDT_ENABLED while (pqueue_tag_size(env->ndt_q) != 0 && lf_tag_compare(pqueue_tag_peek(env->ndt_q)->tag, env->current_tag) < 0) { // Remove elements of ndt_q with tag less than the current tag. @@ -302,7 +302,7 @@ void _lf_start_time_step(environment_t *env) { tag_to_remove.time - start_time, tag_to_remove.microstep, env->current_tag.time - start_time, env->current_tag.microstep); } -// #endif +#endif #ifdef FEDERATED_DECENTRALIZED for (int i = 0; i < env->is_present_fields_size; i++) { // FIXME: For now, an intended tag of (NEVER, 0) diff --git a/include/core/environment.h b/include/core/environment.h index 9684cb057..717c1b4ff 100644 --- a/include/core/environment.h +++ b/include/core/environment.h @@ -107,9 +107,9 @@ typedef struct environment_t { tag_t** _lf_intended_tag_fields; int _lf_intended_tag_fields_size; #endif // FEDERATED -// #ifdef FEDERATED_CENTRALIZED +#ifdef FEDERATED_NDT_ENABLED pqueue_tag_t* ndt_q; -// #endif // FEDERATED_CENTRALIZED +#endif // FEDERATED_NDT_ENABLED #ifdef LF_ENCLAVES // TODO: Consider dropping #ifdef enclave_info_t *enclave_info; #endif