diff --git a/CMakeLists.txt b/CMakeLists.txt index 9eadfeda2..d78e3a20f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,6 +30,7 @@ set(PlatformLib platform) include_directories(${CMAKE_SOURCE_DIR}/include) include_directories(${CMAKE_SOURCE_DIR}/include/core) include_directories(${CMAKE_SOURCE_DIR}/include/core/federated) +include_directories(${CMAKE_SOURCE_DIR}/include/core/federated/network) include_directories(${CMAKE_SOURCE_DIR}/include/core/modal_models) include_directories(${CMAKE_SOURCE_DIR}/include/core/platform) include_directories(${CMAKE_SOURCE_DIR}/include/core/threaded) diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index fbd41261f..888e583d4 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -18,6 +18,7 @@ add_library(core ${GENERAL_SOURCES}) # Add sources for either threaded or single-threaded runtime if (DEFINED FEDERATED) include(federated/CMakeLists.txt) + include(federated/network/CMakeLists.txt) endif() # Add sources for either threaded or single-threaded runtime @@ -43,6 +44,7 @@ include(utils/CMakeLists.txt) include(modal_models/CMakeLists.txt) include(platform/CMakeLists.txt) + # Print sources used for compilation list(JOIN INFO_SOURCES ", " PRINTABLE_SOURCE_LIST) message(STATUS "Including the following sources: " ${PRINTABLE_SOURCE_LIST}) @@ -50,6 +52,7 @@ message(STATUS "Including the following sources: " ${PRINTABLE_SOURCE_LIST}) target_include_directories(core PUBLIC ../include) target_include_directories(core PUBLIC ../include/core) target_include_directories(core PUBLIC ../include/core/federated) +target_include_directories(core PUBLIC ../include/core/federated/network) target_include_directories(core PUBLIC ../include/core/platform) target_include_directories(core PUBLIC ../include/core/modal_models) target_include_directories(core PUBLIC ../include/core/threaded) diff --git a/core/federated/CMakeLists.txt b/core/federated/CMakeLists.txt index d5af17c2b..c0e32abde 100644 --- a/core/federated/CMakeLists.txt +++ b/core/federated/CMakeLists.txt @@ -1,4 +1,4 @@ -set(FEDERATED_SOURCES clock-sync.c federate.c net_util.c) +set(FEDERATED_SOURCES clock-sync.c federate.c) list(APPEND INFO_SOURCES ${FEDERATED_SOURCES}) list(TRANSFORM FEDERATED_SOURCES PREPEND federated/) diff --git a/core/federated/RTI/CMakeLists.txt b/core/federated/RTI/CMakeLists.txt index fc33b47bc..75d812ab7 100644 --- a/core/federated/RTI/CMakeLists.txt +++ b/core/federated/RTI/CMakeLists.txt @@ -52,6 +52,7 @@ endif() set(IncludeDir ../../../include/core) include_directories(${IncludeDir}) include_directories(${IncludeDir}/federated) +include_directories(${IncludeDir}/federated/network) include_directories(${IncludeDir}/modal_models) include_directories(${IncludeDir}/platform) include_directories(${IncludeDir}/utils) @@ -68,7 +69,7 @@ add_executable( ${CoreLib}/platform/lf_unix_clock_support.c ${CoreLib}/utils/util.c ${CoreLib}/tag.c - ${CoreLib}/federated/net_util.c + ${CoreLib}/federated/network/net_util.c ${CoreLib}/utils/pqueue_base.c ${CoreLib}/utils/pqueue_tag.c ${CoreLib}/utils/pqueue.c diff --git a/core/federated/RTI/main.c b/core/federated/RTI/main.c index bfe68ec37..c1fa0c0ae 100644 --- a/core/federated/RTI/main.c +++ b/core/federated/RTI/main.c @@ -199,7 +199,7 @@ int process_args(int argc, const char* argv[]) { return 0; } rti.base.number_of_scheduling_nodes = (int32_t)num_federates; // FIXME: Loses numbers on 64-bit machines - lf_print("RTI: Number of federates: %d\n", rti.base.number_of_scheduling_nodes); + lf_print("RTI: Number of federates: %d", rti.base.number_of_scheduling_nodes); } else if (strcmp(argv[i], "-p") == 0 || strcmp(argv[i], "--port") == 0) { if (argc < i + 2) { lf_print_error( @@ -291,6 +291,7 @@ int main(int argc, const char* argv[]) { int socket_descriptor = start_rti_server(rti.user_specified_port); wait_for_federates(socket_descriptor); + free_scheduling_nodes(rti.base.scheduling_nodes, rti.base.number_of_scheduling_nodes); lf_print("RTI is exiting."); return 0; } diff --git a/core/federated/RTI/rti_common.c b/core/federated/RTI/rti_common.c index 406115252..088a0b49f 100644 --- a/core/federated/RTI/rti_common.c +++ b/core/federated/RTI/rti_common.c @@ -33,6 +33,16 @@ void initialize_rti_common(rti_common_t * _rti_common) { // Currently, all calls to tracepoint_from_federate() and // tracepoint_to_federate() are in rti_lib.c +#define IS_IN_ZERO_DELAY_CYCLE 1 +#define IS_IN_CYCLE 2 + +void invalidate_min_delays_upstream(scheduling_node_t* node) { + if(node->min_delays != NULL) free(node->min_delays); + node->min_delays = NULL; + node->num_min_delays = 0; + node->flags = 0; // All flags cleared because they get set lazily. +} + void initialize_scheduling_node(scheduling_node_t* e, uint16_t id) { e->id = id; e->completed = NEVER_TAG; @@ -46,10 +56,9 @@ void initialize_scheduling_node(scheduling_node_t* e, uint16_t id) { e->downstream = NULL; e->num_downstream = 0; e->mode = REALTIME; - e->is_in_cycle = false; e->has_physical_action = false; e->enable_ndt = false; - + invalidate_min_delays_upstream(e); } void _logical_tag_complete(scheduling_node_t* enclave, tag_t completed) { @@ -76,6 +85,36 @@ void _logical_tag_complete(scheduling_node_t* enclave, tag_t completed) { lf_mutex_unlock(rti_common->mutex); } +tag_t earliest_future_incoming_message_tag(scheduling_node_t* e) { + // First, we need to find the shortest path (minimum delay) path to each upstream node + // and then find the minimum of the node's recorded NET plus the minimum path delay. + // Update the shortest paths, if necessary. + update_min_delays_upstream(e); + + // Next, find the tag of the earliest possible incoming message from upstream enclaves or + // federates, which will be the smallest upstream NET plus the least delay. + // This could be NEVER_TAG if the RTI has not seen a NET from some upstream node. + tag_t t_d = FOREVER_TAG; + for (int i = 0; i < e->num_min_delays; i++) { + // Node e->min_delays[i].id is upstream of e with min delay e->min_delays[i].min_delay. + scheduling_node_t* upstream = rti_common->scheduling_nodes[e->min_delays[i].id]; + // If we haven't heard from the upstream node, then assume it can send an event at the start time. + if (lf_tag_compare(upstream->next_event, NEVER_TAG) == 0) { + tag_t start_tag = {.time = start_time, .microstep = 0}; + upstream->next_event = start_tag; + } + tag_t earliest_tag_from_upstream = lf_tag_add(upstream->next_event, e->min_delays[i].min_delay); + LF_PRINT_DEBUG("RTI: Earliest next event upstream of fed/encl %d at fed/encl %d has tag " PRINTF_TAG ".", + e->id, + upstream->id, + earliest_tag_from_upstream.time - start_time, earliest_tag_from_upstream.microstep); + if (lf_tag_compare(earliest_tag_from_upstream, t_d) < 0) { + t_d = earliest_tag_from_upstream; + } + } + return t_d; +} + tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e) { tag_advance_grant_t result = {.tag = NEVER_TAG, .is_provisional = false}; @@ -85,7 +124,7 @@ tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e) { for (int j = 0; j < e->num_upstream; j++) { scheduling_node_t *upstream = rti_common->scheduling_nodes[e->upstream[j]]; - // Ignore this enclave if it no longer connected. + // Ignore this enclave/federate if it is not connected. if (upstream->state == NOT_CONNECTED) continue; // Adjust by the "after" delay. @@ -112,91 +151,52 @@ tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e) { // If all (transitive) upstream scheduling_nodes of the enclave // have earliest event tags such that the // enclave can now advance its tag, then send it a TAG message. - // Find the earliest event time of each such upstream enclave, - // adjusted by delays on the connections. - - // To handle cycles, need to create a boolean array to keep - // track of which upstream enclave have been visited. - bool *visited = (bool *)calloc(rti_common->number_of_scheduling_nodes, sizeof(bool)); // Initializes to 0. - - // Find the tag of the earliest possible incoming message from - // upstream enclaves. - tag_t t_d_nonzero_delay = FOREVER_TAG; - // The tag of the earliest possible incoming message from a zero-delay connection. - // Delayed connections are not guarded from STP violations by the MLAA; this property is - // acceptable because delayed connections impose no deadlock risk and in some cases (startup) - // this property is necessary to avoid deadlocks. However, it requires some special care here - // when potentially sending a PTAG because we must not send a PTAG for a tag at which data may - // still be received over nonzero-delay connections. - tag_t t_d_zero_delay = FOREVER_TAG; - LF_PRINT_DEBUG("NOTE: FOREVER is displayed as " PRINTF_TAG " and NEVER as " PRINTF_TAG, - FOREVER_TAG.time - start_time, FOREVER_TAG.microstep, - NEVER_TAG.time - start_time, 0); - - for (int j = 0; j < e->num_upstream; j++) { - scheduling_node_t *upstream = rti_common->scheduling_nodes[e->upstream[j]]; - - // Ignore this enclave if it is no longer connected. - if (upstream->state == NOT_CONNECTED) continue; - - // Find the (transitive) next event tag upstream. - tag_t upstream_next_event = transitive_next_event( - upstream, upstream->next_event, visited); - - LF_PRINT_DEBUG("RTI: Earliest next event upstream of fed/encl %d at fed/encl %d has tag " PRINTF_TAG ".", - e->id, - upstream->id, - upstream_next_event.time - start_time, upstream_next_event.microstep); - - // Adjust by the "after" delay. - // Note that "no delay" is encoded as NEVER, - // whereas one microstep delay is encoded as 0LL. - tag_t candidate = lf_delay_strict(upstream_next_event, e->upstream_delay[j]); - - if (e->upstream_delay[j] == NEVER) { - if (lf_tag_compare(candidate, t_d_zero_delay) < 0) { - t_d_zero_delay = candidate; - } - } else { - if (lf_tag_compare(candidate, t_d_nonzero_delay) < 0) { - t_d_nonzero_delay = candidate; - } - } - } - free(visited); - tag_t t_d = (lf_tag_compare(t_d_zero_delay, t_d_nonzero_delay) < 0) ? t_d_zero_delay : t_d_nonzero_delay; - - LF_PRINT_LOG("RTI: Earliest next event upstream has tag " PRINTF_TAG ".", - t_d.time - start_time, t_d.microstep); - - if ( - lf_tag_compare(t_d, e->next_event) > 0 // The enclave has something to do. + // Find the tag of the earliest event that may be later received from an upstream enclave + // or federate (which includes any after delays on the connections). + tag_t t_d = earliest_future_incoming_message_tag(e); + + LF_PRINT_LOG("RTI: Earliest next event upstream of node %d has tag " PRINTF_TAG ".", + e->id, t_d.time - start_time, t_d.microstep); + + // Given an EIMT (earliest incoming message tag) there are these possible scenarios: + // 1) The EIMT is greater than the NET we want to advance to. Grant a TAG. + // 2) The EIMT is equal to the NET and the federate is part of a zero-delay cycle (ZDC). + // 3) The EIMT is equal to the NET and the federate is not part of a ZDC. + // 4) The EIMT is less than the NET + // In (1) we can give a TAG to NET. In (2) we can give a PTAG. + // In (3) and (4), we wait for further updates from upstream federates. + + if ( // Scenario (1) above + lf_tag_compare(t_d, e->next_event) > 0 // EIMT greater than NET && lf_tag_compare(t_d, e->last_provisionally_granted) >= 0 // The grant is not redundant // (equal is important to override any previous // PTAGs). - && lf_tag_compare(t_d, e->last_granted) > 0 // The grant is not redundant. + && lf_tag_compare(t_d, e->last_granted) > 0 // The grant is not redundant. ) { - // All upstream scheduling_nodes have events with a larger tag than fed, so it is safe to send a TAG. - LF_PRINT_LOG("Earliest upstream message time for fed/encl %d is " PRINTF_TAG - "(adjusted by after delay). Granting tag advance for " PRINTF_TAG, + // No upstream node can send events that will be received with a tag less than or equal to + // e->next_event, so it is safe to send a TAG. + LF_PRINT_LOG("RTI: Earliest upstream message time for fed/encl %d is " PRINTF_TAG + "(adjusted by after delay). Granting tag advance (TAG) for " PRINTF_TAG, e->id, t_d.time - lf_time_start(), t_d.microstep, e->next_event.time - lf_time_start(), e->next_event.microstep); result.tag = e->next_event; - } else if ( - lf_tag_compare(t_d_zero_delay, e->next_event) == 0 // The enclave has something to do. - && lf_tag_compare(t_d_zero_delay, t_d_nonzero_delay) < 0 // The statuses of nonzero-delay connections are known at tag t_d_zero_delay - && lf_tag_compare(t_d_zero_delay, e->last_provisionally_granted) > 0 // The grant is not redundant. - && lf_tag_compare(t_d_zero_delay, e->last_granted) > 0 // The grant is not redundant. - ) { - // Some upstream scheduling_nodes has an event that has the same tag as fed's next event, so we can only provisionally - // grant a TAG (via a PTAG). - LF_PRINT_LOG("Earliest upstream message time for fed/encl %d is " PRINTF_TAG - " (adjusted by after delay). Granting provisional tag advance.", + } else if( // Scenario (2) or (3) above + lf_tag_compare(t_d, e->next_event) == 0 // EIMT equal to NET + && is_in_zero_delay_cycle(e) // The node is part of a ZDC + && lf_tag_compare(t_d, e->last_provisionally_granted) > 0 // The grant is not redundant + && lf_tag_compare(t_d, e->last_granted) > 0 // The grant is not redundant. + ) { + // Some upstream node may send an event that has the same tag as this node's next event, + // so we can only grant a PTAG. + LF_PRINT_LOG("RTI: Earliest upstream message time for fed/encl %d is " PRINTF_TAG + " (adjusted by after delay). Granting provisional tag advance (PTAG) for " PRINTF_TAG, e->id, - t_d_zero_delay.time - start_time, t_d_zero_delay.microstep); - result.tag = t_d_zero_delay; + t_d.time - start_time, t_d.microstep, + e->next_event.time - lf_time_start(), + e->next_event.microstep); + result.tag = e->next_event; result.is_provisional = true; } return result; @@ -227,16 +227,18 @@ void update_scheduling_node_next_event_tag_locked(scheduling_node_t* e, tag_t ne // nor expect a reply. It just proceeds to advance time. if (e->num_upstream > 0) { notify_advance_grant_if_safe(e); + } else { + // Even though there was no grant, mark the tag as if there was. + e->last_granted = next_event_tag; } // Check downstream scheduling_nodes to see whether they should now be granted a TAG. // To handle cycles, need to create a boolean array to keep - // track of which upstream scheduling_nodes have been visited. + // track of which downstream scheduling_nodes have been visited. bool *visited = (bool *)calloc(rti_common->number_of_scheduling_nodes, sizeof(bool)); // Initializes to 0. notify_downstream_advance_grant_if_safe(e, visited); free(visited); } - void notify_advance_grant_if_safe(scheduling_node_t* e) { tag_advance_grant_t grant = tag_advance_grant_if_safe(e); if (lf_tag_compare(grant.tag, NEVER_TAG) != 0) { @@ -248,82 +250,141 @@ void notify_advance_grant_if_safe(scheduling_node_t* e) { } } -tag_t transitive_next_event(scheduling_node_t* e, tag_t candidate, bool visited[]) { - if (visited[e->id] || e->state == NOT_CONNECTED) { - // Enclave has stopped executing or we have visited it before. +// Local function used recursively to find minimum delays upstream. +// Return in count the number of non-FOREVER_TAG entries in path_delays[]. +static void _update_min_delays_upstream(scheduling_node_t* end, scheduling_node_t* intermediate, tag_t path_delays[], size_t* count) { + // On first call, intermediate will be NULL, so the path delay is initialized to zero. + tag_t delay_from_intermediate_so_far = ZERO_TAG; + if (intermediate == NULL) { + intermediate = end; + } else { + // Not the first call, so intermediate is upstream of end. + delay_from_intermediate_so_far = path_delays[intermediate->id]; + } + if (intermediate->state == NOT_CONNECTED) { + // Enclave or federate is not connected. // No point in checking upstream scheduling_nodes. - return candidate; + return; } - - visited[e->id] = true; - tag_t result = e->next_event; - - // If the candidate is less than this enclave's next_event, use the candidate. - if (lf_tag_compare(candidate, result) < 0) { - result = candidate; + // Check nodes upstream of intermediate (or end on first call). + // NOTE: It would be better to iterate through these sorted by minimum delay, + // but for most programs, the gain might be negligible since there are relatively few + // upstream nodes. + for (int i = 0; i < intermediate->num_upstream; i++) { + // Add connection delay to path delay so far. + tag_t path_delay = lf_delay_tag(delay_from_intermediate_so_far, intermediate->upstream_delay[i]); + // If the path delay is less than the so-far recorded path delay from upstream, update upstream. + if (lf_tag_compare(path_delay, path_delays[intermediate->upstream[i]]) < 0) { + if (path_delays[intermediate->upstream[i]].time == FOREVER) { + // Found a finite path. + *count = *count + 1; + } + path_delays[intermediate->upstream[i]] = path_delay; + // Since the path delay to upstream has changed, recursively update those upstream of it. + // Do not do this, however, if the upstream node is the end node because this means we have + // completed a cycle. + if (end->id != intermediate->upstream[i]) { + _update_min_delays_upstream(end, rti_common->scheduling_nodes[intermediate->upstream[i]], path_delays, count); + } else { + // Found a cycle. + end->flags = end->flags | IS_IN_CYCLE; + // Is it a zero-delay cycle? + if (lf_tag_compare(path_delay, ZERO_TAG) == 0 && intermediate->upstream_delay[i] < 0) { + end->flags = end->flags | IS_IN_ZERO_DELAY_CYCLE; + } else { + // Clear the flag. + end->flags = end->flags & ~IS_IN_ZERO_DELAY_CYCLE; + } + } + } } +} - // The result cannot be earlier than the start time. - if (result.time < start_time) { - // Earliest next event cannot be before the start time. - result = (tag_t){.time = start_time, .microstep = 0u}; - } +// bool check_cycle(scheduling_node_t* e, int target_id, bool visited[]) { +// if (visited[e->id] || e->state == NOT_CONNECTED) { +// if (e->id == target_id) { +// return true; +// } else { +// return false; +// } +// } + +// visited[e->id] = true; + +// for (int i = 0; i < e->num_upstream; i++) { +// if (check_cycle(rti_common->scheduling_nodes[e->upstream[i]], target_id, visited)) { +// return true; +// } +// } +// return false; +// } + +// FIXME: Implement this function again by adopting the function update_min_delays_upstream +bool check_physical_action_of_transitive_downstreams(scheduling_node_t* e, bool visited[]) { + return false; + // if (visited[e->id] || e->state == NOT_CONNECTED) { + // return false; + // } + + // visited[e->id] = true; + + // for (int i = 0; i < e->num_downstream; i++) { + // if (check_physical_action_of_transitive_downstreams(rti_common->scheduling_nodes[e->downstream[i]], visited)) { + // return true; + // } + // } + // if (e->has_physical_action) { + // return true; + // } else { + // return false; + // } +} - // Check upstream scheduling_nodes to see whether any of them might send - // an event that would result in an earlier next event. - for (int i = 0; i < e->num_upstream; i++) { - tag_t upstream_result = transitive_next_event( - rti_common->scheduling_nodes[e->upstream[i]], result, visited); +void update_min_delays_upstream(scheduling_node_t* node) { + // Check whether cached result is valid. + if (node->min_delays == NULL) { - // Add the "after" delay of the connection to the result. - upstream_result = lf_delay_tag(upstream_result, e->upstream_delay[i]); + // This is not Dijkstra's algorithm, but rather one optimized for sparse upstream nodes. + // There must be a name for this algorithm. - // If the adjusted event time is less than the result so far, update the result. - if (lf_tag_compare(upstream_result, result) < 0) { - result = upstream_result; - } - } - if (lf_tag_compare(result, e->completed) < 0) { - result = e->completed; - } - return result; -} + // Array of results on the stack: + tag_t path_delays[rti_common->number_of_scheduling_nodes]; + // This will be the number of non-FOREVER entries put into path_delays. + size_t count = 0; -bool check_cycle(scheduling_node_t* e, int target_id, bool visited[]) { - if (visited[e->id] || e->state == NOT_CONNECTED) { - if (e->id == target_id) { - return true; - } else { - return false; + for (int i = 0; i < rti_common->number_of_scheduling_nodes; i++) { + path_delays[i] = FOREVER_TAG; } - } - - visited[e->id] = true; - - for (int i = 0; i < e->num_upstream; i++) { - if (check_cycle(rti_common->scheduling_nodes[e->upstream[i]], target_id, visited)) { - return true; + _update_min_delays_upstream(node, NULL, path_delays, &count); + + // Put the results onto the node's struct. + node->num_min_delays = count; + node->min_delays = (minimum_delay_t*)malloc(count * sizeof(minimum_delay_t)); + LF_PRINT_DEBUG("++++ Node %hu(is in ZDC: %d\n", node->id, node->flags & IS_IN_ZERO_DELAY_CYCLE); + int k = 0; + for (int i = 0; i < rti_common->number_of_scheduling_nodes; i++) { + if (lf_tag_compare(path_delays[i], FOREVER_TAG) < 0) { + // Node i is upstream. + if (k >= count) { + lf_print_error_and_exit("Internal error! Count of upstream nodes %zu for node %d is wrong!", count, i); + } + minimum_delay_t min_delay = {.id = i, .min_delay = path_delays[i]}; + node->min_delays[k++] = min_delay; + // N^2 debug statement could be a problem with large benchmarks. + // LF_PRINT_DEBUG("++++ Node %hu is upstream with delay" PRINTF_TAG "\n", i, path_delays[i].time, path_delays[i].microstep); + } } } - return false; } -bool check_physical_action_of_transitive_downstreams(scheduling_node_t* e, bool visited[]) { - if (visited[e->id] || e->state == NOT_CONNECTED) { - return false; - } - - visited[e->id] = true; +bool is_in_zero_delay_cycle(scheduling_node_t* node) { + update_min_delays_upstream(node); + return node->flags & IS_IN_ZERO_DELAY_CYCLE; +} - for (int i = 0; i < e->num_downstream; i++) { - if (check_physical_action_of_transitive_downstreams(rti_common->scheduling_nodes[e->downstream[i]], visited)) { - return true; - } - } - if (e->has_physical_action) { - return true; - } else { - return false; - } +bool is_in_cycle(scheduling_node_t* node) { + update_min_delays_upstream(node); + return node->flags & IS_IN_CYCLE; } -#endif \ No newline at end of file + +#endif diff --git a/core/federated/RTI/rti_common.h b/core/federated/RTI/rti_common.h index c646f6a2c..2f9840eb9 100644 --- a/core/federated/RTI/rti_common.h +++ b/core/federated/RTI/rti_common.h @@ -34,6 +34,12 @@ typedef enum scheduling_node_state_t { PENDING // Waiting for upstream scheduling nodes. } scheduling_node_state_t; +/** Struct for minimum delays from upstream nodes. */ +typedef struct minimum_delay_t { + int id; // ID of the upstream node. + tag_t min_delay; // Minimum delay from upstream. +} minimum_delay_t; + /** * Information about the scheduling nodes coordinated by the RTI. * The abstract scheduling node could either be an enclave or a federate. @@ -62,6 +68,9 @@ typedef struct scheduling_node_t { bool is_in_cycle; bool has_physical_action; bool enable_ndt; + minimum_delay_t* min_delays; // Array of minimum delays from upstream nodes, not including this node. + size_t num_min_delays; // Size of min_delays array. + int flags; // Or of IS_IN_ZERO_DELAY_CYCLE, IS_IN_CYCLE } scheduling_node_t; /** @@ -165,7 +174,7 @@ void notify_tag_advance_grant(scheduling_node_t* e, tag_t tag); void notify_advance_grant_if_safe(scheduling_node_t* e); /** - * Nontify a provisional tag advance grant (PTAG) message to the specified scheduling node. + * Notify a provisional tag advance grant (PTAG) message to the specified scheduling node. * Do not notify it if a previously sent PTAG or TAG was greater or equal. * * This function will keep a record of this PTAG in the node's last_provisionally_granted @@ -210,7 +219,6 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag); */ tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e); - /** * @brief Update the next event tag of an scheduling node. * @@ -224,29 +232,49 @@ tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e); void update_scheduling_node_next_event_tag_locked(scheduling_node_t* e, tag_t next_event_tag); /** - * Find the earliest tag at which the specified federate may - * experience its next event. This is the least next event tag (NET) - * of the specified federate and (transitively) upstream federates - * (with delays of the connections added). For upstream federates, - * we assume (conservatively) that federate upstream of those - * may also send an event. The result will never be less than - * the completion time of the federate (which may be NEVER, - * if the federate has not yet completed a logical time). - * - * FIXME: This could be made less conservative by building - * at code generation time a causality interface table indicating - * which outputs can be triggered by which inputs. For now, we - * assume any output can be triggered by any input. - * - * @param e The scheduling node. - * @param candidate A candidate tag (for the first invocation, - * this should be fed->next_event). - * @param visited An array of booleans indicating which federates - * have been visited (for the first invocation, this should be - * an array of falses of size _RTI.number_of_federates). - * @return The earliest next event tag of the scheduling node e. + * Given a node (enclave or federate), find the tag of the earliest possible incoming + * message from upstream enclaves or federates, which will be the smallest upstream NET + * plus the least delay. This could be NEVER_TAG if the RTI has not seen a NET from some + * upstream node. + * @param e The target node. + * @return The earliest possible incoming message tag. + */ +tag_t earliest_future_incoming_message_tag(scheduling_node_t* e); + +/** + * Return true if the node is in a zero-delay cycle. + * @param node The node. + */ +bool is_in_zero_delay_cycle(scheduling_node_t* node); + +/** + * Return true if the node is in a cycle (possibly a zero-delay cycle). + * @param node The node. + */ +bool is_in_cycle(scheduling_node_t* node); + +/** + * For the given scheduling node (enclave or federate), if necessary, update the `min_delays`, + * `num_min_delays`, and the fields that indicate cycles. These fields will be + * updated only if they have not been previously updated or if invalidate_min_delays_upstream + * has been called since they were last updated. + * @param node The node. + */ +void update_min_delays_upstream(scheduling_node_t* node); + +/** + * For the given scheduling node (enclave or federate), invalidate the `min_delays`, + * `num_min_delays`, and the fields that indicate cycles. + * This should be called whenever the structure of the connections upstream of the + * given node have changed. + * @param node The node. + */ +void invalidate_min_delays_upstream(scheduling_node_t* node); + +/** + * Free dynamically allocated memory on the scheduling nodes and the scheduling node array itself. */ -tag_t transitive_next_event(scheduling_node_t *e, tag_t candidate, bool visited[]); +void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes); bool check_cycle(scheduling_node_t* e, int target_id, bool visited[]); diff --git a/core/federated/RTI/rti_local.c b/core/federated/RTI/rti_local.c index 7ca63a873..1f6cc0928 100644 --- a/core/federated/RTI/rti_local.c +++ b/core/federated/RTI/rti_local.c @@ -35,7 +35,7 @@ static rti_local_t * rti_local; lf_mutex_t rti_mutex; void initialize_local_rti(environment_t *envs, int num_envs) { - rti_local = malloc(sizeof(rti_local_t)); + rti_local = (rti_local_t*)malloc(sizeof(rti_local_t)); LF_ASSERT(rti_local, "Out of memory"); initialize_rti_common(&rti_local->base); @@ -60,6 +60,11 @@ void initialize_local_rti(environment_t *envs, int num_envs) { } } +void free_local_rti() { + free_scheduling_nodes(rti_local->base.scheduling_nodes, rti_local->base.number_of_scheduling_nodes); + free(rti_local); +} + void initialize_enclave_info(enclave_info_t* enclave, int idx, environment_t * env) { initialize_scheduling_node(&enclave->base, idx); @@ -186,4 +191,9 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) { LF_PRINT_LOG("RTI: enclave %u callback with PTAG " PRINTF_TAG " ", e->id, tag.time - lf_time_start(), tag.microstep); } + +void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes) { + // Nothing to do here. +} + #endif //LF_ENCLAVES diff --git a/core/federated/RTI/rti_local.h b/core/federated/RTI/rti_local.h index 9420d49c8..8960ad1b1 100644 --- a/core/federated/RTI/rti_local.h +++ b/core/federated/RTI/rti_local.h @@ -29,10 +29,14 @@ typedef struct { /** * @brief Dynamically create and initialize the local RTI. - * */ void initialize_local_rti(environment_t* envs, int num_envs); +/** + * @brief Free memory associated with the local the RTI and the local RTI iself. + */ +void free_local_rti(); + /** * @brief Initialize the enclave object. * @@ -41,7 +45,8 @@ void initialize_local_rti(environment_t* envs, int num_envs); void initialize_enclave_info(enclave_info_t* enclave, int idx, environment_t *env); /** - * @brief This function call may block. A call to this function serves two purposes. + * @brief Notify the local RTI of a next event tag (NET). + * This function call may block. A call to this function serves two purposes. * 1) It is a promise that, unless receiving events from other enclaves, this * enclave will not produce any event until the next_event_tag (NET) argument. * 2) It is a request for permission to advance the logical tag of the enclave diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 8e0352ba6..8c23dc559 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -250,30 +250,25 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) { // Send PTAG to all upstream federates, if they have not had // a later or equal PTAG or TAG sent previously and if their transitive // NET is greater than or equal to the tag. + // This is needed to stimulate absent messages from upstream and break deadlocks. // NOTE: This could later be replaced with a TNET mechanism once // we have an available encoding of causality interfaces. // That might be more efficient. + // NOTE: This is not needed for enclaves because zero-delay loops are prohibited. + // It's only needed for federates, which is why this is implemented here. for (int j = 0; j < e->num_upstream; j++) { - federate_info_t* upstream = GET_FED_INFO(e->upstream[j]); + scheduling_node_t* upstream = rti_remote->base.scheduling_nodes[e->upstream[j]]; // Ignore this federate if it has resigned. - if (upstream->enclave.state == NOT_CONNECTED) continue; - // To handle cycles, need to create a boolean array to keep - // track of which upstream federates have been visited. - bool* visited = (bool*)calloc(rti_remote->base.number_of_scheduling_nodes, sizeof(bool)); // Initializes to 0. - - // Find the (transitive) next event tag upstream. - tag_t upstream_next_event = transitive_next_event( - &(upstream->enclave), upstream->enclave.next_event, visited); - free(visited); - // If these tags are equal, then - // a TAG or PTAG should have already been granted, - // in which case, another will not be sent. But it - // may not have been already granted. - if (lf_tag_compare(upstream_next_event, tag) >= 0) { - notify_provisional_tag_advance_grant(&(upstream->enclave), tag); - } + if (upstream->state == NOT_CONNECTED) continue; + + tag_t earliest = earliest_future_incoming_message_tag(upstream); + // If these tags are equal, then a TAG or PTAG should have already been granted, + // in which case, another will not be sent. But it may not have been already granted. + if (lf_tag_compare(earliest, tag) >= 0) { + notify_provisional_tag_advance_grant(upstream, tag); + } } } } @@ -291,26 +286,26 @@ void update_federate_next_event_tag_locked(uint16_t federate_id, tag_t next_even update_scheduling_node_next_event_tag_locked(&(fed->enclave), next_event_tag); } -/** - * Update the cycle information of every federate. -*/ -void update_cycle_information() { - bool *visited = (bool *)calloc(rti_remote->base.number_of_scheduling_nodes, sizeof(bool)); - for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { - scheduling_node_t* target_node = rti_remote->base.scheduling_nodes[i]; - if (check_cycle(target_node, i, visited)) { - target_node->is_in_cycle = true; - LF_PRINT_DEBUG("There is a cycle including federate %d.", i); - } else { - target_node->is_in_cycle = false; - LF_PRINT_DEBUG("There is no cycle including federate %d.", i); - } - for (int j = 0; j < rti_remote->base.number_of_scheduling_nodes; j++) { - visited[j] = false; - } - } - free(visited); -} +// /** +// * Update the cycle information of every federate. +// */ +// void update_cycle_information() { +// bool *visited = (bool *)calloc(rti_remote->base.number_of_scheduling_nodes, sizeof(bool)); +// for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { +// scheduling_node_t* target_node = rti_remote->base.scheduling_nodes[i]; +// if (check_cycle(target_node, i, visited)) { +// target_node->is_in_cycle = true; +// LF_PRINT_DEBUG("There is a cycle including federate %d.", i); +// } else { +// target_node->is_in_cycle = false; +// LF_PRINT_DEBUG("There is no cycle including federate %d.", i); +// } +// for (int j = 0; j < rti_remote->base.number_of_scheduling_nodes; j++) { +// visited[j] = false; +// } +// } +// free(visited); +// } /** * Determine whether or not to use the NDT messages. @@ -319,7 +314,7 @@ void determine_the_ndt_condition() { bool *visited = (bool *)calloc(rti_remote->base.number_of_scheduling_nodes, sizeof(bool)); for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { scheduling_node_t* target_node = rti_remote->base.scheduling_nodes[i]; - if (target_node->is_in_cycle || check_physical_action_of_transitive_downstreams(target_node, visited)) { + if (is_in_cycle(target_node) || check_physical_action_of_transitive_downstreams(target_node, visited)) { target_node->enable_ndt = false; LF_PRINT_DEBUG("There is a cycle including federate %d or a transitive downstream federate of" " the federate %d has a physical action.", i, i); @@ -1805,4 +1800,15 @@ void initialize_RTI(rti_remote_t *rti){ rti_remote->base.tracing_enabled = false; rti_remote->stop_in_progress = false; } + +void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes) { + for (uint16_t i = 0; i < number_of_scheduling_nodes; i++) { + // FIXME: Gives error freeing memory not allocated!!!! + scheduling_node_t* node = scheduling_nodes[i]; + if (node->upstream != NULL) free(node->upstream); + if (node->downstream != NULL) free(node->downstream); + } + free(scheduling_nodes); +} + #endif // STANDALONE_RTI diff --git a/core/federated/federate.c b/core/federated/federate.c index 60b9588a9..1c2c9c8d7 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -2130,8 +2130,8 @@ void spawn_staa_thread(){ * If current_time is less than the specified PTAG, then this will * also insert into the event_q a dummy event with the specified tag. * This will ensure that the federate advances time to the specified - * tag and, for centralized coordination, inserts blocking reactions - * and null-message-sending output reactions at that tag. + * tag and, for centralized coordination, stimulates null-message-sending + * output reactions at that tag. * * @note This function is similar to handle_tag_advance_grant() except that * it sets last_TAG_was_provisional to true and also it does not update the diff --git a/core/federated/network/CMakeLists.txt b/core/federated/network/CMakeLists.txt new file mode 100644 index 000000000..82795b7bb --- /dev/null +++ b/core/federated/network/CMakeLists.txt @@ -0,0 +1,5 @@ +set(LF_NETWORK_FILES net_util.c) +list(APPEND INFO_SOURCES ${LF_NETWORK_FILES}) + +list(TRANSFORM LF_NETWORK_FILES PREPEND federated/network/) +target_sources(core PRIVATE ${LF_NETWORK_FILES}) diff --git a/core/federated/net_util.c b/core/federated/network/net_util.c similarity index 100% rename from core/federated/net_util.c rename to core/federated/network/net_util.c diff --git a/core/reactor.c b/core/reactor.c index e715514e9..ce95b057d 100644 --- a/core/reactor.c +++ b/core/reactor.c @@ -371,6 +371,11 @@ int lf_reactor_c_main(int argc, const char* argv[]) { initialize_global(); // Set start time start_time = lf_time_physical(); + + LF_PRINT_DEBUG("NOTE: FOREVER is displayed as " PRINTF_TAG " and NEVER as " PRINTF_TAG, + FOREVER_TAG.time - start_time, FOREVER_TAG.microstep, + NEVER_TAG.time - start_time, 0); + environment_init_tags(env, start_time, duration); // Start tracing if enalbed start_trace(env->trace); diff --git a/core/reactor_common.c b/core/reactor_common.c index 111cab93a..19febbac5 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -1760,7 +1760,7 @@ void termination(void) { continue; } // Stop any tracing, if it is running. - stop_trace(env->trace); + stop_trace_locked(env->trace); _lf_start_time_step(env); diff --git a/core/tag.c b/core/tag.c index 86ca45474..eee10699f 100644 --- a/core/tag.c +++ b/core/tag.c @@ -116,14 +116,22 @@ tag_t lf_tag(void *env) { return ((environment_t *)env)->current_tag; } +tag_t lf_tag_add(tag_t a, tag_t b) { + if (a.time == NEVER || b.time == NEVER) return NEVER_TAG; + if (a.time == FOREVER || b.time == FOREVER) return FOREVER_TAG; + tag_t result = {.time = a.time + b.time, .microstep = a.microstep + b.microstep}; + if (result.microstep < a.microstep) return FOREVER_TAG; + if (result.time < a.time && b.time > 0) return FOREVER_TAG; + if (result.time > a.time && b.time < 0) return NEVER_TAG; + return result; +} + int lf_tag_compare(tag_t tag1, tag_t tag2) { if (tag1.time < tag2.time) { - LF_PRINT_DEBUG(PRINTF_TIME " < " PRINTF_TIME, tag1.time, tag2.time); return -1; } else if (tag1.time > tag2.time) { return 1; } else if (tag1.microstep < tag2.microstep) { - LF_PRINT_DEBUG(PRINTF_TIME " and microstep < " PRINTF_TIME, tag1.time, tag2.time); return -1; } else if (tag1.microstep > tag2.microstep) { return 1; @@ -134,7 +142,8 @@ int lf_tag_compare(tag_t tag1, tag_t tag2) { tag_t lf_delay_tag(tag_t tag, interval_t interval) { if (tag.time == NEVER || interval < 0LL) return tag; - if (tag.time >= FOREVER - interval) return tag; + // Note that overflow in C is undefined for signed variables. + if (tag.time >= FOREVER - interval) return FOREVER_TAG; // Overflow. tag_t result = tag; if (interval == 0LL) { // Note that unsigned variables will wrap on overflow. @@ -142,12 +151,7 @@ tag_t lf_delay_tag(tag_t tag, interval_t interval) { // microsteps. result.microstep++; } else { - // Note that overflow in C is undefined for signed variables. - if (FOREVER - interval < result.time) { - result.time = FOREVER; - } else { - result.time += interval; - } + result.time += interval; result.microstep = 0; } return result; @@ -156,7 +160,6 @@ tag_t lf_delay_tag(tag_t tag, interval_t interval) { tag_t lf_delay_strict(tag_t tag, interval_t interval) { tag_t result = lf_delay_tag(tag, interval); if (interval != 0 && interval != NEVER && interval != FOREVER && result.time != NEVER && result.time != FOREVER) { - LF_PRINT_DEBUG("interval=%lld, result time=%lld", (long long) interval, (long long) result.time); result.time -= 1; result.microstep = UINT_MAX; } diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index bafa81949..6ebf6c8c6 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -924,8 +924,9 @@ void try_advance_level(environment_t* env, volatile size_t* next_reaction_level) #ifdef FEDERATED stall_advance_level_federation(env, *next_reaction_level); #endif - *next_reaction_level += 1; + if (*next_reaction_level < SIZE_MAX) *next_reaction_level += 1; } + /** * The main looping logic of each LF worker thread. * This function assumes the caller holds the mutex lock. @@ -1216,6 +1217,9 @@ int lf_reactor_c_main(int argc, const char* argv[]) { LF_PRINT_LOG("---- All worker threads exited successfully."); } } +#if defined LF_ENCLAVES + free_local_rti(); +#endif return 0; } diff --git a/core/trace.c b/core/trace.c index 144e9baf7..6fffbc7bf 100644 --- a/core/trace.c +++ b/core/trace.c @@ -459,6 +459,11 @@ void tracepoint_reaction_deadline_missed(trace_t* trace, reaction_t *reaction, i void stop_trace(trace_t* trace) { lf_critical_section_enter(trace->env); + stop_trace_locked(trace); + lf_critical_section_exit(trace->env); +} + +void stop_trace_locked(trace_t* trace) { if (trace->_lf_trace_stop) { // Trace was already stopped. Nothing to do. return; @@ -480,7 +485,6 @@ void stop_trace(trace_t* trace) { fclose(trace->_lf_trace_file); trace->_lf_trace_file = NULL; LF_PRINT_DEBUG("Stopped tracing."); - lf_critical_section_exit(trace->env); } //////////////////////////////////////////////////////////// diff --git a/core/utils/util.c b/core/utils/util.c index 82e57e070..23daef364 100644 --- a/core/utils/util.c +++ b/core/utils/util.c @@ -32,6 +32,11 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include "util.h" + +#ifndef STANDALONE_RTI +#include "environment.h" +#endif + #include #include #include @@ -49,10 +54,8 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #define SOCKET_READ_RETRY_INTERVAL 1000000 /** - * The ID of this federate. For a non-federated execution, this will - * be -1. For a federated execution, it will be assigned when the generated function - * _lf_initialize_trigger_objects() is called. - * @see xtext/org.icyphy.linguafranca/src/org/icyphy/generator/CGenerator.xtend. + * The ID of this federate. For a non-federated execution, this will be -1. + * For a federated execution, it will be assigned in the generated code. */ int _lf_my_fed_id = -1; @@ -65,9 +68,6 @@ print_message_function_t* print_message_function = NULL; /** The level of messages to redirect to print_message_function. */ int print_message_level = -1; -/** - * Return the federate ID or -1 if this program is not part of a federation. - */ int lf_fed_id() { return _lf_my_fed_id; } @@ -107,14 +107,31 @@ void _lf_message_print( // If we make multiple calls to printf(), then the results could be // interleaved between threads. // vprintf() is a version that takes an arg list rather than multiple args. - size_t length = strlen(prefix) + strlen(format) + 32; - char* message = (char*) malloc(length + 1); + char* message; if (_lf_my_fed_id < 0) { + size_t length = strlen(prefix) + strlen(format) + 32; + message = (char*) malloc(length + 1); snprintf(message, length, "%s%s\n", prefix, format); } else { - snprintf(message, length, "Federate %d: %s%s\n", - _lf_my_fed_id, prefix, format); +#if defined STANDALONE_RTI + size_t length = strlen(prefix) + strlen(format) + 37; + message = (char*) malloc(length + 1); + snprintf(message, length, "RTI: %s%s\n", + prefix, format); +#else + // Get the federate name from the top-level environment, which by convention is the first. + environment_t *envs; + _lf_get_environments(&envs); + char* name = envs->name; + size_t length = strlen(prefix) + strlen(format) + +strlen(name) + 32; + message = (char*) malloc(length + 1); + // If the name has prefix "federate__", strip that out. + if (strncmp(name, "federate__", 10) == 0) name += 10; + + snprintf(message, length, "Fed %d (%s): %s%s\n", + _lf_my_fed_id, name, prefix, format); +#endif // STANDALONE_RTI } if (print_message_function == NULL) { if (is_error) { @@ -129,14 +146,6 @@ void _lf_message_print( } } -/** - * Report an informational message on stdout with - * a newline appended at the end. - * If this execution is federated, then - * the message will be prefaced by "Federate n: ", - * where n is the federate ID. - * The arguments are just like printf(). - */ void lf_print(const char* format, ...) { va_list args; va_start (args, format); @@ -144,21 +153,10 @@ void lf_print(const char* format, ...) { va_end (args); } -/** - * varargs alternative of "lf_print" - */ void lf_vprint(const char* format, va_list args) { _lf_message_print(0, "", format, args, LOG_LEVEL_INFO); } -/** - * Report an log message on stdout with the prefix - * "LOG: " and a newline appended - * at the end. If this execution is federated, then - * the message will be prefaced by "Federate n: ", - * where n is the federate ID. - * The arguments are just like printf(). - */ void lf_print_log(const char* format, ...) { va_list args; va_start (args, format); @@ -166,22 +164,10 @@ void lf_print_log(const char* format, ...) { va_end (args); } -/** - * varargs alternative of "lf_print_log" - */ void lf_vprint_log(const char* format, va_list args) { _lf_message_print(0, "LOG: ", format, args, LOG_LEVEL_LOG); } - -/** - * Report an debug message on stdout with the prefix - * "DEBUG: " and a newline appended - * at the end. If this execution is federated, then - * the message will be prefaced by "Federate n: ", - * where n is the federate ID. - * The arguments are just like printf(). - */ void lf_print_debug(const char* format, ...) { va_list args; va_start (args, format); @@ -189,17 +175,10 @@ void lf_print_debug(const char* format, ...) { va_end (args); } -/** - * varargs alternative of "lf_print_debug" - */ void lf_vprint_debug(const char* format, va_list args) { _lf_message_print(0, "DEBUG: ", format, args, LOG_LEVEL_DEBUG); } -/** - * Report an error with the prefix "ERROR: " and a newline appended - * at the end. The arguments are just like printf(). - */ void lf_print_error(const char* format, ...) { va_list args; va_start (args, format); @@ -207,17 +186,10 @@ void lf_print_error(const char* format, ...) { va_end (args); } -/** - * varargs alternative of "lf_print_error" - */ void lf_vprint_error(const char* format, va_list args) { _lf_message_print(1, "ERROR: ", format, args, LOG_LEVEL_ERROR); } -/** - * Report a warning with the prefix "WARNING: " and a newline appended - * at the end. The arguments are just like printf(). - */ void lf_print_warning(const char* format, ...) { va_list args; va_start (args, format); @@ -225,18 +197,10 @@ void lf_print_warning(const char* format, ...) { va_end (args); } -/** - * varargs alternative of "lf_print_warning" - */ void lf_vprint_warning(const char* format, va_list args) { _lf_message_print(1, "WARNING: ", format, args, LOG_LEVEL_WARNING); } -/** - * Report an error with the prefix "ERROR: " and a newline appended - * at the end, then exit with the failure code EXIT_FAILURE. - * The arguments are just like printf(). - */ void lf_print_error_and_exit(const char* format, ...) { va_list args; va_start (args, format); @@ -245,25 +209,10 @@ void lf_print_error_and_exit(const char* format, ...) { exit(EXIT_FAILURE); } -/** - * varargs alternative of "lf_print_error_and_exit" - */ void lf_vprint_error_and_exit(const char* format, va_list args) { _lf_message_print(1, "FATAL ERROR: ", format, args, LOG_LEVEL_ERROR); } -/** - * Register a function to display messages. After calling this, - * all messages passed to the above print functions will be - * printed using the specified function rather than printf - * if their log level is greater than the specified level. - * The level should be one of LOG_LEVEL_ERROR, LOG_LEVEL_WARNING, - * LOG_LEVEL_INFO, LOG_LEVEL_LOG, or LOG_LEVEL_DEBUG. - * - * @param function The print message function or NULL to revert - * to using printf. - * @param log_level The level of messages to redirect. - */ void lf_register_print_function(print_message_function_t* function, int log_level) { print_message_function = function; print_message_level = log_level; diff --git a/include/core/environment.h b/include/core/environment.h index 717c1b4ff..2ac58ac8d 100644 --- a/include/core/environment.h +++ b/include/core/environment.h @@ -156,4 +156,12 @@ void environment_init_tags( environment_t *env, instant_t start_time, interval_t duration ); +/** + * @brief Will update the argument to point to the beginning of the array of environments in this program + * @note Is code-generated by the compiler + * @param envs A double pointer which will be dereferenced and modified + * @return int The number of environments in the array + */ +int _lf_get_environments(environment_t **envs); + #endif diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index aca041e5b..880408ec6 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -363,7 +363,7 @@ void stall_advance_level_federation(environment_t* env, size_t level); /** * @brief Update the max level allowed to advance (MLAA). * If the specified tag is greater than the current_tag of the top-level environment - * (or equal an is_provisional is false), then set the MLAA to MAX_INT and return. + * (or equal and is_provisional is false), then set the MLAA to MAX_INT and return. * This removes any barriers on execution at the current tag due to network inputs. * Otherwise, set the MLAA to the minimum level over all (non-physical) network input ports * where the status of the input port is not known at that current_tag. diff --git a/include/core/federated/net_common.h b/include/core/federated/network/net_common.h similarity index 100% rename from include/core/federated/net_common.h rename to include/core/federated/network/net_common.h diff --git a/include/core/federated/net_util.h b/include/core/federated/network/net_util.h similarity index 99% rename from include/core/federated/net_util.h rename to include/core/federated/network/net_util.h index cc621115d..5c6bcb966 100644 --- a/include/core/federated/net_util.h +++ b/include/core/federated/network/net_util.h @@ -48,8 +48,8 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include #include -#include "../platform.h" -#include "../tag.h" +#include "../../platform.h" +#include "../../tag.h" #define HOST_LITTLE_ENDIAN 1 #define HOST_BIG_ENDIAN 2 diff --git a/include/core/reactor.h b/include/core/reactor.h index 0e56c0f3e..d9ee515b5 100644 --- a/include/core/reactor.h +++ b/include/core/reactor.h @@ -558,15 +558,6 @@ trigger_handle_t _lf_schedule_copy(lf_action_base_t* action, interval_t offset, // See reactor.h for doc. int _lf_fd_send_stop_request_to_rti(tag_t stop_tag); -/** - * @brief Will update the argument to point to the beginning of the array of environments in this program - * @note Is code-generated by the compiler - * @param envs A double pointer which will be dereferenced and modified - * @return int The number of environments in the array - */ -int _lf_get_environments(environment_t **envs); - - /** * @brief Will create and initialize the required number of environments for the program * @note Will be code generated by the compiler diff --git a/include/core/tag.h b/include/core/tag.h index b598cce6a..0a480a40b 100644 --- a/include/core/tag.h +++ b/include/core/tag.h @@ -40,6 +40,7 @@ #define FOREVER_TAG (tag_t) { .time = FOREVER, .microstep = FOREVER_MICROSTEP } // Need a separate initializer expression to comply with some C compilers #define FOREVER_TAG_INITIALIZER { FOREVER, FOREVER_MICROSTEP } +#define ZERO_TAG (tag_t) { .time = 0LL, .microstep = 0u } // Convenience for converting times #define BILLION 1000000000LL @@ -82,6 +83,14 @@ typedef struct { */ tag_t lf_tag(void* env); +/** + * Add two tags. If either tag has has NEVER or FOREVER in its time field, then + * return NEVER_TAG or FOREVER_TAG, respectively. Also return NEVER_TAG or FOREVER_TAG + * if the result underflows or overflows when adding the times. + * If the microstep overflows, also return FOREVER_TAG. + */ +tag_t lf_tag_add(tag_t a, tag_t b); + /** * Compare two tags. Return -1 if the first is less than * the second, 0 if they are equal, and +1 if the first is diff --git a/include/core/trace.h b/include/core/trace.h index 7f3a897d6..fcee5efc8 100644 --- a/include/core/trace.h +++ b/include/core/trace.h @@ -439,6 +439,7 @@ void tracepoint_reaction_deadline_missed(trace_t* trace, reaction_t *reaction, i * close the files. */ void stop_trace(trace_t* trace); +void stop_trace_locked(trace_t* trace); //////////////////////////////////////////////////////////// //// For federated execution @@ -544,6 +545,7 @@ typedef struct trace_t trace_t; #define start_trace(...) #define stop_trace(...) +#define stop_trace_locked(...) #define trace_new(...) NULL #define trace_free(...) diff --git a/include/core/utils/util.h b/include/core/utils/util.h index 7d90d7420..d93f5de24 100644 --- a/include/core/utils/util.h +++ b/include/core/utils/util.h @@ -124,12 +124,9 @@ extern int _lf_my_fed_id; int lf_fed_id(void); /** - * Report an informational message on stdout with - * a newline appended at the end. - * If this execution is federated, then - * the message will be prefaced by "Federate n: ", - * where n is the federate ID. - * The arguments are just like printf(). + * Report an informational message on stdout with a newline appended at the end. + * If this execution is federated, then the message will be prefaced by identifying + * information for the federate. The arguments are just like printf(). */ void lf_print(const char* format, ...) ATTRIBUTE_FORMAT_PRINTF(1, 2); @@ -139,12 +136,9 @@ void lf_print(const char* format, ...) ATTRIBUTE_FORMAT_PRINTF(1, 2); void lf_vprint(const char* format, va_list args) ATTRIBUTE_FORMAT_PRINTF(1, 0); /** - * Report an log message on stdout with the prefix - * "LOG: " and a newline appended - * at the end. If this execution is federated, then - * the message will be prefaced by "Federate n: ", - * where n is the federate ID. - * The arguments are just like printf(). + * Report an log message on stdout with the prefix "LOG: " and a newline appended + * at the end. If this execution is federated, then the message will be prefaced by + * identifying information for the federate. The arguments are just like printf(). */ void lf_print_log(const char* format, ...) ATTRIBUTE_FORMAT_PRINTF(1, 2); @@ -176,12 +170,9 @@ void lf_vprint_log(const char* format, va_list args) ATTRIBUTE_FORMAT_PRINTF(1, } } while (0) /** - * Report an debug message on stdout with the prefix - * "DEBUG: " and a newline appended - * at the end. If this execution is federated, then - * the message will be prefaced by "Federate n: ", - * where n is the federate ID. - * The arguments are just like printf(). + * Report an debug message on stdout with the prefix "DEBUG: " and a newline appended + * at the end. If this execution is federated, then the message will be prefaced by + * identifying information for the federate. The arguments are just like printf(). */ void lf_print_debug(const char* format, ...) ATTRIBUTE_FORMAT_PRINTF(1, 2); diff --git a/test/src_gen_stub.c b/test/src_gen_stub.c index 6ac14c84e..46f451096 100644 --- a/test/src_gen_stub.c +++ b/test/src_gen_stub.c @@ -1,8 +1,22 @@ #include #include "tag.h" +#include "environment.h" + +/** + * This file enables unit tests to run without there having been an actual code generation + * from a Lingua Franca program. It defines (mostly empty) functions that would normally be + * code generated. Of course, this strategy will only work for tests that do not actually + * need functional versions of these functions. + */ + +environment_t _env; void _lf_initialize_trigger_objects() {} void terminate_execution() {} void _lf_set_default_command_line_options() {} void _lf_initialize_watchdog_mutexes() {} void logical_tag_complete(tag_t tag_to_send) {} +int _lf_get_environments(environment_t ** envs) { + *envs = &_env; + return 1; +} \ No newline at end of file