Skip to content

Commit

Permalink
Make the NDT things optional in the RTI and federates
Browse files Browse the repository at this point in the history
  • Loading branch information
byeonggiljun committed Nov 27, 2023
1 parent e337d38 commit db9db08
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 9 deletions.
6 changes: 4 additions & 2 deletions core/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,11 @@ static void environment_init_modes(environment_t* env, int num_modes, int num_st
* @brief Initialize the federation-specific parts of the environment struct.
*/
static void environment_init_federated(environment_t* env, int num_is_present_fields) {
// #ifdef FEDERATED_CENTRALIZED
#ifdef FEDERATED_NDT_ENABLED
// FIXME: Create a queue saving tags instead of events. For now, ndt_q stores
// dummy events.
env->ndt_q = pqueue_tag_init(10);
// #endif
#endif // FEDERATED_NDT_ENABLED
#ifdef FEDERATED_DECENTRALIZED
env->_lf_intended_tag_fields = (tag_t**) calloc(num_is_present_fields, sizeof(tag_t*));
LF_ASSERT(env->_lf_intended_tag_fields, "Out of memory");
Expand Down Expand Up @@ -170,7 +170,9 @@ void environment_free(environment_t* env) {
pqueue_free(env->event_q);
pqueue_free(env->recycle_q);
pqueue_free(env->next_q);
#ifdef FEDERATED_NDT_ENABLED
pqueue_tag_free(env->ndt_q);
#endif // FEDERATED_NDT_ENABLED

environment_free_threaded(env);
environment_free_single_threaded(env);
Expand Down
11 changes: 9 additions & 2 deletions core/federated/RTI/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ void usage(int argc, const char* argv[]) {
lf_print(" clock sync attempt (default is 10). Applies to 'init' and 'on'.\n");
lf_print(" -a, --auth Turn on HMAC authentication options.\n");
lf_print(" -t, --tracing Turn on tracing.\n");
lf_print(" -v, --version The minimum required version of Lingua Franca.");
lf_print(" --ndt Turn on ndt optimization.\n");

lf_print("Command given:");
for (int i = 0; i < argc; i++) {
Expand Down Expand Up @@ -170,8 +172,11 @@ int process_clock_sync_args(int argc, const char* argv[]) {
}

int process_args(int argc, const char* argv[]) {
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "-i") == 0 || strcmp(argv[i], "--id") == 0) {
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "-v") == 0 || strcmp(argv[i], "--version") == 0) {
lf_print("%s", version_info);
return 0;
} else if (strcmp(argv[i], "-i") == 0 || strcmp(argv[i], "--id") == 0) {
if (argc < i + 2) {
lf_print_error("--id needs a string argument.");
usage(argc, argv);
Expand Down Expand Up @@ -232,6 +237,8 @@ int process_args(int argc, const char* argv[]) {
rti.authentication_enabled = true;
} else if (strcmp(argv[i], "-t") == 0 || strcmp(argv[i], "--tracing") == 0) {
rti.base.tracing_enabled = true;
} else if (strcmp(argv[i], "--ndt" == 0)) {
rti.ndt_enabled = true;
} else if (strcmp(argv[i], " ") == 0) {
// Tolerate spaces
continue;
Expand Down
8 changes: 8 additions & 0 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,9 @@ void determine_the_ndt_condition() {
}

void send_upstream_next_downstream_tag(federate_info_t* fed, tag_t next_event_tag) {
if (!rti_remote->ndt_enabled) {
return;
}
// The RTI receives next_event_tag from the federated fed.
// It has to send NDT messages to the upstream federates of fed
// if the LTC message from an upstream federate is ealrier than the next_event_tag.
Expand Down Expand Up @@ -1330,6 +1333,10 @@ int receive_set_up_information(int socket_id, uint16_t fed_id) {
bool waiting_physical_action_information = true;
bool waiting_udp_message_and_set_up_clock_sync = true;

if (!rti_remote->ndt_enabled) {
waiting_physical_action_information = false;
}

// Buffer for incoming messages.
// This does not constrain the message size because messages
// are forwarded piece by piece.
Expand Down Expand Up @@ -1794,6 +1801,7 @@ void initialize_RTI(rti_remote_t *rti){
rti_remote->clock_sync_period_ns = MSEC(10);
rti_remote->clock_sync_exchanges_per_interval = 10;
rti_remote->authentication_enabled = false;
rti_remote->ndt_enabled = false;
rti_remote->base.tracing_enabled = false;
rti_remote->stop_in_progress = false;
}
Expand Down
7 changes: 6 additions & 1 deletion core/federated/RTI/rti_remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
/////////////////////////////////////////////
//// Data structures

static char version_info[] = "0.5.0"; // Simply use Lingua Franca version name now.
static char version_info[] = "0.5.0"; // Simply use Lingua Franca version name for now.

typedef enum socket_type_t {
TCP,
Expand Down Expand Up @@ -163,6 +163,11 @@ typedef struct rti_remote_t {
*/
bool authentication_enabled;

/**
* Boolean indicating that NDT message is enabled.
*/
bool ndt_enabled;

/**
* Boolean indicating that a stop request is already in progress.
*/
Expand Down
16 changes: 16 additions & 0 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <openssl/rand.h> // For secure random number generation.
#include <openssl/hmac.h> // For HMAC-based authentication of federates.
#endif
#ifdef FEDERATED_NDT_ENABLED
#include "pqueue_tag.h"
#endif // FEDERATED_NDT_ENABLED

// Global variables defined in tag.c:
extern instant_t _lf_last_reported_unadjusted_physical_time_ns;
Expand Down Expand Up @@ -423,12 +425,14 @@ int send_timed_message(environment_t* env,
}

// Insert the intended tag into the ndt_q to send LTC to the RTI quickly.
#ifdef FEDERATED_NDT_ENABLED
if (pqueue_tag_size(env->ndt_q) != 0) {
// FIXME: If the RTI changes the use of NDTs dynamically, merely checking the size
// is not enough to know whether this federate using the NDT optimization or not.
LF_PRINT_DEBUG("Insert NDT at the intended to send LTC and NET quickly.");
pqueue_tag_insert_if_no_match(env->ndt_q, current_message_intended_tag);
}
#endif // FEDERATED_NDT_ENABLED

// Trace the event when tracing is enabled
if (message_type == MSG_TYPE_TAGGED_MESSAGE) {
Expand Down Expand Up @@ -1153,13 +1157,15 @@ void connect_to_rti(const char* hostname, int port) {
// @see MSG_TYPE_NEIGHBOR_STRUCTURE in net_common.h
send_neighbor_structure_to_RTI(_fed.socket_TCP_RTI);

#ifdef FEDERATED_NDT_ENABLED
// Send whether this federate has a physical action to the RTI.
uint16_t has_physical_action = _fed.min_delay_from_physical_action_to_federate_output != NEVER;
unsigned char physical_action_message_buffer[1 + sizeof(uint16_t)];
physical_action_message_buffer[0] = MSG_TYPE_PHYSICAL_ACTION;
encode_uint16(has_physical_action, &(physical_action_message_buffer[1]));
write_to_socket_errexit(_fed.socket_TCP_RTI, 1 + sizeof(uint16_t), physical_action_message_buffer,
"Failed to send physical action info to RTI.");
#endif // FEDERATED_NDT_ENABLED

uint16_t udp_port = setup_clock_synchronization_with_rti();

Expand Down Expand Up @@ -1420,6 +1426,7 @@ void send_port_absent_to_federate(environment_t* env, interval_t additional_dela

// FIXME: If port absent messages are not used when there is no zero-delay cycle,
// This part is not needed as we don't apply the NDT optimization for cycles.
#ifdef FEDERATED_NDT_ENABLED
if (pqueue_tag_size(env->ndt_q) != 0 ) {
// FIXME: If the RTI changes the use of NDTs dynamically, merely checking the size
// is not enough to know whether this federate using the NDT optimization or not.
Expand All @@ -1433,6 +1440,7 @@ void send_port_absent_to_federate(environment_t* env, interval_t additional_dela
return;
}
}
#endif // FEDERATED_NDT_ENABLED

// Construct the message
size_t message_length = 1 + sizeof(port_ID) + sizeof(fed_ID) + sizeof(instant_t) + sizeof(microstep_t);
Expand Down Expand Up @@ -1948,6 +1956,7 @@ void _lf_logical_tag_complete(tag_t tag_to_send) {
environment_t *env;
_lf_get_environments(&env);
bool need_to_send_LTC = true;
#ifdef FEDERATED_NDT_ENABLED
if (pqueue_tag_size(env->ndt_q) != 0 ) {
// FIXME: If the RTI changes the use of NDTs dynamically, merely checking the size
// is not enough to know whether this federate using the NDT optimization or not.
Expand All @@ -1961,6 +1970,7 @@ void _lf_logical_tag_complete(tag_t tag_to_send) {
need_to_send_LTC = false;
}
}
#endif // FEDERATED_NDT_ENABLED
if (need_to_send_LTC) {
LF_PRINT_LOG("Sending Logical Tag Complete (LTC) " PRINTF_TAG " to the RTI.",
tag_to_send.time - start_time,
Expand Down Expand Up @@ -2391,6 +2401,7 @@ void handle_stop_request_message() {
lf_mutex_unlock(&outbound_socket_mutex);
}

#ifdef FEDERATED_NDT_ENABLED
/**
* Handle a MSG_TYPE_NEXT_DOWNSTREAM_MESSAGE from the RTI
*
Expand Down Expand Up @@ -2425,6 +2436,7 @@ void handle_next_downstream_tag() {
pqueue_tag_insert_if_no_match(env->ndt_q, env->current_tag);
}
}
#endif // FEDERATED_NDT_ENABLED

/**
* Close sockets used to communicate with other federates, if they are open,
Expand Down Expand Up @@ -2643,9 +2655,11 @@ void* listen_to_rti_TCP(void* args) {
case MSG_TYPE_PORT_ABSENT:
handle_port_absent_message(_fed.socket_TCP_RTI, -1);
break;
#ifdef FEDERATED_NDT_ENABLED
case MSG_TYPE_NEXT_DOWNSTREAM_TAG:
handle_next_downstream_tag();
break;
#endif // FEDERATED_NDT_ENABLED
case MSG_TYPE_CLOCK_SYNC_T1:
case MSG_TYPE_CLOCK_SYNC_T4:
lf_print_error("Federate %d received unexpected clock sync message from RTI on TCP socket.",
Expand Down Expand Up @@ -2838,6 +2852,7 @@ tag_t _lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply
// If there is no downstream events that require the NET of the current tag,
// do not send the NET.
bool need_to_send_NET = true;
#ifdef FEDERATED_NDT_ENABLED
if (pqueue_tag_size(env->ndt_q) != 0 ) {
// FIXME: If the RTI changes the use of NDTs dynamically, merely checking the size
// is not enough to know whether this federate using the NDT optimization or not.
Expand All @@ -2851,6 +2866,7 @@ tag_t _lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply
need_to_send_NET = false;
}
}
#endif // FEDERATED_NDT_ENABLED
if (need_to_send_NET) {
_lf_send_tag(MSG_TYPE_NEXT_EVENT_TAG, tag, wait_for_reply);
_fed.last_sent_NET = tag;
Expand Down
4 changes: 2 additions & 2 deletions core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ void _lf_start_time_step(environment_t *env) {
}
}

// #ifdef FEDERATED_CENTRALIZED
#ifdef FEDERATED_NDT_ENABLED
while (pqueue_tag_size(env->ndt_q) != 0
&& lf_tag_compare(pqueue_tag_peek(env->ndt_q)->tag, env->current_tag) < 0) {
// Remove elements of ndt_q with tag less than the current tag.
Expand All @@ -302,7 +302,7 @@ void _lf_start_time_step(environment_t *env) {
tag_to_remove.time - start_time, tag_to_remove.microstep,
env->current_tag.time - start_time, env->current_tag.microstep);
}
// #endif
#endif
#ifdef FEDERATED_DECENTRALIZED
for (int i = 0; i < env->is_present_fields_size; i++) {
// FIXME: For now, an intended tag of (NEVER, 0)
Expand Down
4 changes: 2 additions & 2 deletions include/core/environment.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ typedef struct environment_t {
tag_t** _lf_intended_tag_fields;
int _lf_intended_tag_fields_size;
#endif // FEDERATED
// #ifdef FEDERATED_CENTRALIZED
#ifdef FEDERATED_NDT_ENABLED
pqueue_tag_t* ndt_q;
// #endif // FEDERATED_CENTRALIZED
#endif // FEDERATED_NDT_ENABLED
#ifdef LF_ENCLAVES // TODO: Consider dropping #ifdef
enclave_info_t *enclave_info;
#endif
Expand Down

0 comments on commit db9db08

Please sign in to comment.