Skip to content

Commit

Permalink
Merge branch 'main' into rti-NET-forwarding
Browse files Browse the repository at this point in the history
  • Loading branch information
byeonggiljun committed Dec 4, 2023
2 parents 65e30da + 61261af commit 7749df4
Show file tree
Hide file tree
Showing 28 changed files with 437 additions and 336 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ set(PlatformLib platform)
include_directories(${CMAKE_SOURCE_DIR}/include)
include_directories(${CMAKE_SOURCE_DIR}/include/core)
include_directories(${CMAKE_SOURCE_DIR}/include/core/federated)
include_directories(${CMAKE_SOURCE_DIR}/include/core/federated/network)
include_directories(${CMAKE_SOURCE_DIR}/include/core/modal_models)
include_directories(${CMAKE_SOURCE_DIR}/include/core/platform)
include_directories(${CMAKE_SOURCE_DIR}/include/core/threaded)
Expand Down
3 changes: 3 additions & 0 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ add_library(core ${GENERAL_SOURCES})
# Add sources for either threaded or single-threaded runtime
if (DEFINED FEDERATED)
include(federated/CMakeLists.txt)
include(federated/network/CMakeLists.txt)
endif()

# Add sources for either threaded or single-threaded runtime
Expand All @@ -43,13 +44,15 @@ include(utils/CMakeLists.txt)
include(modal_models/CMakeLists.txt)
include(platform/CMakeLists.txt)


# Print sources used for compilation
list(JOIN INFO_SOURCES ", " PRINTABLE_SOURCE_LIST)
message(STATUS "Including the following sources: " ${PRINTABLE_SOURCE_LIST})

target_include_directories(core PUBLIC ../include)
target_include_directories(core PUBLIC ../include/core)
target_include_directories(core PUBLIC ../include/core/federated)
target_include_directories(core PUBLIC ../include/core/federated/network)
target_include_directories(core PUBLIC ../include/core/platform)
target_include_directories(core PUBLIC ../include/core/modal_models)
target_include_directories(core PUBLIC ../include/core/threaded)
Expand Down
2 changes: 1 addition & 1 deletion core/federated/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
set(FEDERATED_SOURCES clock-sync.c federate.c net_util.c)
set(FEDERATED_SOURCES clock-sync.c federate.c)
list(APPEND INFO_SOURCES ${FEDERATED_SOURCES})

list(TRANSFORM FEDERATED_SOURCES PREPEND federated/)
Expand Down
3 changes: 2 additions & 1 deletion core/federated/RTI/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ endif()
set(IncludeDir ../../../include/core)
include_directories(${IncludeDir})
include_directories(${IncludeDir}/federated)
include_directories(${IncludeDir}/federated/network)
include_directories(${IncludeDir}/modal_models)
include_directories(${IncludeDir}/platform)
include_directories(${IncludeDir}/utils)
Expand All @@ -68,7 +69,7 @@ add_executable(
${CoreLib}/platform/lf_unix_clock_support.c
${CoreLib}/utils/util.c
${CoreLib}/tag.c
${CoreLib}/federated/net_util.c
${CoreLib}/federated/network/net_util.c
${CoreLib}/utils/pqueue_base.c
${CoreLib}/utils/pqueue_tag.c
${CoreLib}/utils/pqueue.c
Expand Down
3 changes: 2 additions & 1 deletion core/federated/RTI/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ int process_args(int argc, const char* argv[]) {
return 0;
}
rti.base.number_of_scheduling_nodes = (int32_t)num_federates; // FIXME: Loses numbers on 64-bit machines
lf_print("RTI: Number of federates: %d\n", rti.base.number_of_scheduling_nodes);
lf_print("RTI: Number of federates: %d", rti.base.number_of_scheduling_nodes);
} else if (strcmp(argv[i], "-p") == 0 || strcmp(argv[i], "--port") == 0) {
if (argc < i + 2) {
lf_print_error(
Expand Down Expand Up @@ -291,6 +291,7 @@ int main(int argc, const char* argv[]) {

int socket_descriptor = start_rti_server(rti.user_specified_port);
wait_for_federates(socket_descriptor);
free_scheduling_nodes(rti.base.scheduling_nodes, rti.base.number_of_scheduling_nodes);
lf_print("RTI is exiting.");
return 0;
}
Expand Down
349 changes: 205 additions & 144 deletions core/federated/RTI/rti_common.c

Large diffs are not rendered by default.

76 changes: 52 additions & 24 deletions core/federated/RTI/rti_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ typedef enum scheduling_node_state_t {
PENDING // Waiting for upstream scheduling nodes.
} scheduling_node_state_t;

/** Struct for minimum delays from upstream nodes. */
typedef struct minimum_delay_t {
int id; // ID of the upstream node.
tag_t min_delay; // Minimum delay from upstream.
} minimum_delay_t;

/**
* Information about the scheduling nodes coordinated by the RTI.
* The abstract scheduling node could either be an enclave or a federate.
Expand Down Expand Up @@ -62,6 +68,9 @@ typedef struct scheduling_node_t {
bool is_in_cycle;
bool has_physical_action;
bool enable_ndt;
minimum_delay_t* min_delays; // Array of minimum delays from upstream nodes, not including this node.
size_t num_min_delays; // Size of min_delays array.
int flags; // Or of IS_IN_ZERO_DELAY_CYCLE, IS_IN_CYCLE
} scheduling_node_t;

/**
Expand Down Expand Up @@ -165,7 +174,7 @@ void notify_tag_advance_grant(scheduling_node_t* e, tag_t tag);
void notify_advance_grant_if_safe(scheduling_node_t* e);

/**
* Nontify a provisional tag advance grant (PTAG) message to the specified scheduling node.
* Notify a provisional tag advance grant (PTAG) message to the specified scheduling node.
* Do not notify it if a previously sent PTAG or TAG was greater or equal.
*
* This function will keep a record of this PTAG in the node's last_provisionally_granted
Expand Down Expand Up @@ -210,7 +219,6 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag);
*/
tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e);


/**
* @brief Update the next event tag of an scheduling node.
*
Expand All @@ -224,29 +232,49 @@ tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e);
void update_scheduling_node_next_event_tag_locked(scheduling_node_t* e, tag_t next_event_tag);

/**
* Find the earliest tag at which the specified federate may
* experience its next event. This is the least next event tag (NET)
* of the specified federate and (transitively) upstream federates
* (with delays of the connections added). For upstream federates,
* we assume (conservatively) that federate upstream of those
* may also send an event. The result will never be less than
* the completion time of the federate (which may be NEVER,
* if the federate has not yet completed a logical time).
*
* FIXME: This could be made less conservative by building
* at code generation time a causality interface table indicating
* which outputs can be triggered by which inputs. For now, we
* assume any output can be triggered by any input.
*
* @param e The scheduling node.
* @param candidate A candidate tag (for the first invocation,
* this should be fed->next_event).
* @param visited An array of booleans indicating which federates
* have been visited (for the first invocation, this should be
* an array of falses of size _RTI.number_of_federates).
* @return The earliest next event tag of the scheduling node e.
* Given a node (enclave or federate), find the tag of the earliest possible incoming
* message from upstream enclaves or federates, which will be the smallest upstream NET
* plus the least delay. This could be NEVER_TAG if the RTI has not seen a NET from some
* upstream node.
* @param e The target node.
* @return The earliest possible incoming message tag.
*/
tag_t earliest_future_incoming_message_tag(scheduling_node_t* e);

/**
* Return true if the node is in a zero-delay cycle.
* @param node The node.
*/
bool is_in_zero_delay_cycle(scheduling_node_t* node);

/**
* Return true if the node is in a cycle (possibly a zero-delay cycle).
* @param node The node.
*/
bool is_in_cycle(scheduling_node_t* node);

/**
* For the given scheduling node (enclave or federate), if necessary, update the `min_delays`,
* `num_min_delays`, and the fields that indicate cycles. These fields will be
* updated only if they have not been previously updated or if invalidate_min_delays_upstream
* has been called since they were last updated.
* @param node The node.
*/
void update_min_delays_upstream(scheduling_node_t* node);

/**
* For the given scheduling node (enclave or federate), invalidate the `min_delays`,
* `num_min_delays`, and the fields that indicate cycles.
* This should be called whenever the structure of the connections upstream of the
* given node have changed.
* @param node The node.
*/
void invalidate_min_delays_upstream(scheduling_node_t* node);

/**
* Free dynamically allocated memory on the scheduling nodes and the scheduling node array itself.
*/
tag_t transitive_next_event(scheduling_node_t *e, tag_t candidate, bool visited[]);
void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes);

bool check_cycle(scheduling_node_t* e, int target_id, bool visited[]);

Expand Down
12 changes: 11 additions & 1 deletion core/federated/RTI/rti_local.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ static rti_local_t * rti_local;
lf_mutex_t rti_mutex;

void initialize_local_rti(environment_t *envs, int num_envs) {
rti_local = malloc(sizeof(rti_local_t));
rti_local = (rti_local_t*)malloc(sizeof(rti_local_t));
LF_ASSERT(rti_local, "Out of memory");

initialize_rti_common(&rti_local->base);
Expand All @@ -60,6 +60,11 @@ void initialize_local_rti(environment_t *envs, int num_envs) {
}
}

void free_local_rti() {
free_scheduling_nodes(rti_local->base.scheduling_nodes, rti_local->base.number_of_scheduling_nodes);
free(rti_local);
}

void initialize_enclave_info(enclave_info_t* enclave, int idx, environment_t * env) {
initialize_scheduling_node(&enclave->base, idx);

Expand Down Expand Up @@ -186,4 +191,9 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) {
LF_PRINT_LOG("RTI: enclave %u callback with PTAG " PRINTF_TAG " ",
e->id, tag.time - lf_time_start(), tag.microstep);
}

void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes) {
// Nothing to do here.
}

#endif //LF_ENCLAVES
9 changes: 7 additions & 2 deletions core/federated/RTI/rti_local.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@ typedef struct {

/**
* @brief Dynamically create and initialize the local RTI.
*
*/
void initialize_local_rti(environment_t* envs, int num_envs);

/**
* @brief Free memory associated with the local the RTI and the local RTI iself.
*/
void free_local_rti();

/**
* @brief Initialize the enclave object.
*
Expand All @@ -41,7 +45,8 @@ void initialize_local_rti(environment_t* envs, int num_envs);
void initialize_enclave_info(enclave_info_t* enclave, int idx, environment_t *env);

/**
* @brief This function call may block. A call to this function serves two purposes.
* @brief Notify the local RTI of a next event tag (NET).
* This function call may block. A call to this function serves two purposes.
* 1) It is a promise that, unless receiving events from other enclaves, this
* enclave will not produce any event until the next_event_tag (NET) argument.
* 2) It is a request for permission to advance the logical tag of the enclave
Expand Down
82 changes: 44 additions & 38 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -250,30 +250,25 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) {
// Send PTAG to all upstream federates, if they have not had
// a later or equal PTAG or TAG sent previously and if their transitive
// NET is greater than or equal to the tag.
// This is needed to stimulate absent messages from upstream and break deadlocks.
// NOTE: This could later be replaced with a TNET mechanism once
// we have an available encoding of causality interfaces.
// That might be more efficient.
// NOTE: This is not needed for enclaves because zero-delay loops are prohibited.
// It's only needed for federates, which is why this is implemented here.
for (int j = 0; j < e->num_upstream; j++) {
federate_info_t* upstream = GET_FED_INFO(e->upstream[j]);
scheduling_node_t* upstream = rti_remote->base.scheduling_nodes[e->upstream[j]];

// Ignore this federate if it has resigned.
if (upstream->enclave.state == NOT_CONNECTED) continue;
// To handle cycles, need to create a boolean array to keep
// track of which upstream federates have been visited.
bool* visited = (bool*)calloc(rti_remote->base.number_of_scheduling_nodes, sizeof(bool)); // Initializes to 0.

// Find the (transitive) next event tag upstream.
tag_t upstream_next_event = transitive_next_event(
&(upstream->enclave), upstream->enclave.next_event, visited);
free(visited);
// If these tags are equal, then
// a TAG or PTAG should have already been granted,
// in which case, another will not be sent. But it
// may not have been already granted.
if (lf_tag_compare(upstream_next_event, tag) >= 0) {
notify_provisional_tag_advance_grant(&(upstream->enclave), tag);
}
if (upstream->state == NOT_CONNECTED) continue;

tag_t earliest = earliest_future_incoming_message_tag(upstream);

// If these tags are equal, then a TAG or PTAG should have already been granted,
// in which case, another will not be sent. But it may not have been already granted.
if (lf_tag_compare(earliest, tag) >= 0) {
notify_provisional_tag_advance_grant(upstream, tag);
}
}
}
}
Expand All @@ -291,26 +286,26 @@ void update_federate_next_event_tag_locked(uint16_t federate_id, tag_t next_even
update_scheduling_node_next_event_tag_locked(&(fed->enclave), next_event_tag);
}

/**
* Update the cycle information of every federate.
*/
void update_cycle_information() {
bool *visited = (bool *)calloc(rti_remote->base.number_of_scheduling_nodes, sizeof(bool));
for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) {
scheduling_node_t* target_node = rti_remote->base.scheduling_nodes[i];
if (check_cycle(target_node, i, visited)) {
target_node->is_in_cycle = true;
LF_PRINT_DEBUG("There is a cycle including federate %d.", i);
} else {
target_node->is_in_cycle = false;
LF_PRINT_DEBUG("There is no cycle including federate %d.", i);
}
for (int j = 0; j < rti_remote->base.number_of_scheduling_nodes; j++) {
visited[j] = false;
}
}
free(visited);
}
// /**
// * Update the cycle information of every federate.
// */
// void update_cycle_information() {
// bool *visited = (bool *)calloc(rti_remote->base.number_of_scheduling_nodes, sizeof(bool));
// for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) {
// scheduling_node_t* target_node = rti_remote->base.scheduling_nodes[i];
// if (check_cycle(target_node, i, visited)) {
// target_node->is_in_cycle = true;
// LF_PRINT_DEBUG("There is a cycle including federate %d.", i);
// } else {
// target_node->is_in_cycle = false;
// LF_PRINT_DEBUG("There is no cycle including federate %d.", i);
// }
// for (int j = 0; j < rti_remote->base.number_of_scheduling_nodes; j++) {
// visited[j] = false;
// }
// }
// free(visited);
// }

/**
* Determine whether or not to use the NDT messages.
Expand All @@ -319,7 +314,7 @@ void determine_the_ndt_condition() {
bool *visited = (bool *)calloc(rti_remote->base.number_of_scheduling_nodes, sizeof(bool));
for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) {
scheduling_node_t* target_node = rti_remote->base.scheduling_nodes[i];
if (target_node->is_in_cycle || check_physical_action_of_transitive_downstreams(target_node, visited)) {
if (is_in_cycle(target_node) || check_physical_action_of_transitive_downstreams(target_node, visited)) {
target_node->enable_ndt = false;
LF_PRINT_DEBUG("There is a cycle including federate %d or a transitive downstream federate of"
" the federate %d has a physical action.", i, i);
Expand Down Expand Up @@ -1805,4 +1800,15 @@ void initialize_RTI(rti_remote_t *rti){
rti_remote->base.tracing_enabled = false;
rti_remote->stop_in_progress = false;
}

void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes) {
for (uint16_t i = 0; i < number_of_scheduling_nodes; i++) {
// FIXME: Gives error freeing memory not allocated!!!!
scheduling_node_t* node = scheduling_nodes[i];
if (node->upstream != NULL) free(node->upstream);
if (node->downstream != NULL) free(node->downstream);
}
free(scheduling_nodes);
}

#endif // STANDALONE_RTI
4 changes: 2 additions & 2 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -2130,8 +2130,8 @@ void spawn_staa_thread(){
* If current_time is less than the specified PTAG, then this will
* also insert into the event_q a dummy event with the specified tag.
* This will ensure that the federate advances time to the specified
* tag and, for centralized coordination, inserts blocking reactions
* and null-message-sending output reactions at that tag.
* tag and, for centralized coordination, stimulates null-message-sending
* output reactions at that tag.
*
* @note This function is similar to handle_tag_advance_grant() except that
* it sets last_TAG_was_provisional to true and also it does not update the
Expand Down
5 changes: 5 additions & 0 deletions core/federated/network/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
set(LF_NETWORK_FILES net_util.c)
list(APPEND INFO_SOURCES ${LF_NETWORK_FILES})

list(TRANSFORM LF_NETWORK_FILES PREPEND federated/network/)
target_sources(core PRIVATE ${LF_NETWORK_FILES})
File renamed without changes.
5 changes: 5 additions & 0 deletions core/reactor.c
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,11 @@ int lf_reactor_c_main(int argc, const char* argv[]) {
initialize_global();
// Set start time
start_time = lf_time_physical();

LF_PRINT_DEBUG("NOTE: FOREVER is displayed as " PRINTF_TAG " and NEVER as " PRINTF_TAG,
FOREVER_TAG.time - start_time, FOREVER_TAG.microstep,
NEVER_TAG.time - start_time, 0);

environment_init_tags(env, start_time, duration);
// Start tracing if enalbed
start_trace(env->trace);
Expand Down
2 changes: 1 addition & 1 deletion core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1760,7 +1760,7 @@ void termination(void) {
continue;
}
// Stop any tracing, if it is running.
stop_trace(env->trace);
stop_trace_locked(env->trace);

_lf_start_time_step(env);

Expand Down
Loading

0 comments on commit 7749df4

Please sign in to comment.