diff --git a/core/federated/RTI/CMakeLists.txt b/core/federated/RTI/CMakeLists.txt index 9208f6df0..fc33b47bc 100644 --- a/core/federated/RTI/CMakeLists.txt +++ b/core/federated/RTI/CMakeLists.txt @@ -69,6 +69,8 @@ add_executable( ${CoreLib}/utils/util.c ${CoreLib}/tag.c ${CoreLib}/federated/net_util.c + ${CoreLib}/utils/pqueue_base.c + ${CoreLib}/utils/pqueue_tag.c ${CoreLib}/utils/pqueue.c message_record/message_record.c ) diff --git a/core/federated/RTI/main.c b/core/federated/RTI/main.c index 941590285..fdc234ced 100644 --- a/core/federated/RTI/main.c +++ b/core/federated/RTI/main.c @@ -194,7 +194,7 @@ int process_args(int argc, const char* argv[]) { return 0; } rti.base.number_of_scheduling_nodes = (int32_t)num_federates; // FIXME: Loses numbers on 64-bit machines - lf_print("RTI: Number of federates: %d\n", rti.base.number_of_scheduling_nodes); + lf_print("RTI: Number of federates: %d", rti.base.number_of_scheduling_nodes); } else if (strcmp(argv[i], "-p") == 0 || strcmp(argv[i], "--port") == 0) { if (argc < i + 2) { lf_print_error( @@ -284,6 +284,7 @@ int main(int argc, const char* argv[]) { int socket_descriptor = start_rti_server(rti.user_specified_port); wait_for_federates(socket_descriptor); + free_scheduling_nodes(rti.base.scheduling_nodes, rti.base.number_of_scheduling_nodes); lf_print("RTI is exiting."); return 0; } diff --git a/core/federated/RTI/rti_common.c b/core/federated/RTI/rti_common.c index 4c55cdb16..2151a6564 100644 --- a/core/federated/RTI/rti_common.c +++ b/core/federated/RTI/rti_common.c @@ -33,6 +33,16 @@ void initialize_rti_common(rti_common_t * _rti_common) { // Currently, all calls to tracepoint_from_federate() and // tracepoint_to_federate() are in rti_lib.c +#define IS_IN_ZERO_DELAY_CYCLE 1 +#define IS_IN_CYCLE 2 + +void invalidate_min_delays_upstream(scheduling_node_t* node) { + if(node->min_delays != NULL) free(node->min_delays); + node->min_delays = NULL; + node->num_min_delays = 0; + node->flags = 0; // All flags cleared because they get set lazily. +} + void initialize_scheduling_node(scheduling_node_t* e, uint16_t id) { e->id = id; e->completed = NEVER_TAG; @@ -46,7 +56,7 @@ void initialize_scheduling_node(scheduling_node_t* e, uint16_t id) { e->downstream = NULL; e->num_downstream = 0; e->mode = REALTIME; - + invalidate_min_delays_upstream(e); } void _logical_tag_complete(scheduling_node_t* enclave, tag_t completed) { @@ -73,6 +83,36 @@ void _logical_tag_complete(scheduling_node_t* enclave, tag_t completed) { lf_mutex_unlock(rti_common->mutex); } +tag_t earliest_future_incoming_message_tag(scheduling_node_t* e) { + // First, we need to find the shortest path (minimum delay) path to each upstream node + // and then find the minimum of the node's recorded NET plus the minimum path delay. + // Update the shortest paths, if necessary. + update_min_delays_upstream(e); + + // Next, find the tag of the earliest possible incoming message from upstream enclaves or + // federates, which will be the smallest upstream NET plus the least delay. + // This could be NEVER_TAG if the RTI has not seen a NET from some upstream node. + tag_t t_d = FOREVER_TAG; + for (int i = 0; i < e->num_min_delays; i++) { + // Node e->min_delays[i].id is upstream of e with min delay e->min_delays[i].min_delay. + scheduling_node_t* upstream = rti_common->scheduling_nodes[e->min_delays[i].id]; + // If we haven't heard from the upstream node, then assume it can send an event at the start time. + if (lf_tag_compare(upstream->next_event, NEVER_TAG) == 0) { + tag_t start_tag = {.time = start_time, .microstep = 0}; + upstream->next_event = start_tag; + } + tag_t earliest_tag_from_upstream = lf_tag_add(upstream->next_event, e->min_delays[i].min_delay); + LF_PRINT_DEBUG("RTI: Earliest next event upstream of fed/encl %d at fed/encl %d has tag " PRINTF_TAG ".", + e->id, + upstream->id, + earliest_tag_from_upstream.time - start_time, earliest_tag_from_upstream.microstep); + if (lf_tag_compare(earliest_tag_from_upstream, t_d) < 0) { + t_d = earliest_tag_from_upstream; + } + } + return t_d; +} + tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e) { tag_advance_grant_t result = {.tag = NEVER_TAG, .is_provisional = false}; @@ -82,7 +122,7 @@ tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e) { for (int j = 0; j < e->num_upstream; j++) { scheduling_node_t *upstream = rti_common->scheduling_nodes[e->upstream[j]]; - // Ignore this enclave if it no longer connected. + // Ignore this enclave/federate if it is not connected. if (upstream->state == NOT_CONNECTED) continue; // Adjust by the "after" delay. @@ -109,91 +149,56 @@ tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e) { // If all (transitive) upstream scheduling_nodes of the enclave // have earliest event tags such that the // enclave can now advance its tag, then send it a TAG message. - // Find the earliest event time of each such upstream enclave, - // adjusted by delays on the connections. - - // To handle cycles, need to create a boolean array to keep - // track of which upstream enclave have been visited. - bool *visited = (bool *)calloc(rti_common->number_of_scheduling_nodes, sizeof(bool)); // Initializes to 0. + // Find the tag of the earliest event that may be later received from an upstream enclave + // or federate (which includes any after delays on the connections). + tag_t t_d = earliest_future_incoming_message_tag(e); - // Find the tag of the earliest possible incoming message from - // upstream enclaves. - tag_t t_d_nonzero_delay = FOREVER_TAG; - // The tag of the earliest possible incoming message from a zero-delay connection. - // Delayed connections are not guarded from STP violations by the MLAA; this property is - // acceptable because delayed connections impose no deadlock risk and in some cases (startup) - // this property is necessary to avoid deadlocks. However, it requires some special care here - // when potentially sending a PTAG because we must not send a PTAG for a tag at which data may - // still be received over nonzero-delay connections. - tag_t t_d_zero_delay = FOREVER_TAG; LF_PRINT_DEBUG("NOTE: FOREVER is displayed as " PRINTF_TAG " and NEVER as " PRINTF_TAG, FOREVER_TAG.time - start_time, FOREVER_TAG.microstep, NEVER_TAG.time - start_time, 0); - for (int j = 0; j < e->num_upstream; j++) { - scheduling_node_t *upstream = rti_common->scheduling_nodes[e->upstream[j]]; - - // Ignore this enclave if it is no longer connected. - if (upstream->state == NOT_CONNECTED) continue; - - // Find the (transitive) next event tag upstream. - tag_t upstream_next_event = transitive_next_event( - upstream, upstream->next_event, visited); - - LF_PRINT_DEBUG("RTI: Earliest next event upstream of fed/encl %d at fed/encl %d has tag " PRINTF_TAG ".", - e->id, - upstream->id, - upstream_next_event.time - start_time, upstream_next_event.microstep); + LF_PRINT_LOG("RTI: Earliest next event upstream of node %d has tag " PRINTF_TAG ".", + e->id, t_d.time - start_time, t_d.microstep); - // Adjust by the "after" delay. - // Note that "no delay" is encoded as NEVER, - // whereas one microstep delay is encoded as 0LL. - tag_t candidate = lf_delay_strict(upstream_next_event, e->upstream_delay[j]); + // Given an EIMT (earliest incoming message tag) there are these possible scenarios: + // 1) The EIMT is greater than the NET we want to advance to. Grant a TAG. + // 2) The EIMT is equal to the NET and the federate is part of a zero-delay cycle (ZDC). + // 3) The EIMT is equal to the NET and the federate is not part of a ZDC. + // 4) The EIMT is less than the NET + // In (1) we can give a TAG to NET. In (2) we can give a PTAG. + // In (3) and (4), we wait for further updates from upstream federates. - if (e->upstream_delay[j] == NEVER) { - if (lf_tag_compare(candidate, t_d_zero_delay) < 0) { - t_d_zero_delay = candidate; - } - } else { - if (lf_tag_compare(candidate, t_d_nonzero_delay) < 0) { - t_d_nonzero_delay = candidate; - } - } - } - free(visited); - tag_t t_d = (lf_tag_compare(t_d_zero_delay, t_d_nonzero_delay) < 0) ? t_d_zero_delay : t_d_nonzero_delay; - - LF_PRINT_LOG("RTI: Earliest next event upstream has tag " PRINTF_TAG ".", - t_d.time - start_time, t_d.microstep); - - if ( - lf_tag_compare(t_d, e->next_event) > 0 // The enclave has something to do. + if ( // Scenario (1) above + lf_tag_compare(t_d, e->next_event) > 0 // EIMT greater than NET && lf_tag_compare(t_d, e->last_provisionally_granted) >= 0 // The grant is not redundant // (equal is important to override any previous // PTAGs). - && lf_tag_compare(t_d, e->last_granted) > 0 // The grant is not redundant. + && lf_tag_compare(t_d, e->last_granted) > 0 // The grant is not redundant. ) { - // All upstream scheduling_nodes have events with a larger tag than fed, so it is safe to send a TAG. - LF_PRINT_LOG("Earliest upstream message time for fed/encl %d is " PRINTF_TAG - "(adjusted by after delay). Granting tag advance for " PRINTF_TAG, + // No upstream node can send events that will be received with a tag less than or equal to + // e->next_event, so it is safe to send a TAG. + LF_PRINT_LOG("RTI: Earliest upstream message time for fed/encl %d is " PRINTF_TAG + "(adjusted by after delay). Granting tag advance (TAG) for " PRINTF_TAG, e->id, t_d.time - lf_time_start(), t_d.microstep, e->next_event.time - lf_time_start(), e->next_event.microstep); result.tag = e->next_event; - } else if ( - lf_tag_compare(t_d_zero_delay, e->next_event) == 0 // The enclave has something to do. - && lf_tag_compare(t_d_zero_delay, t_d_nonzero_delay) < 0 // The statuses of nonzero-delay connections are known at tag t_d_zero_delay - && lf_tag_compare(t_d_zero_delay, e->last_provisionally_granted) > 0 // The grant is not redundant. - && lf_tag_compare(t_d_zero_delay, e->last_granted) > 0 // The grant is not redundant. - ) { - // Some upstream scheduling_nodes has an event that has the same tag as fed's next event, so we can only provisionally - // grant a TAG (via a PTAG). - LF_PRINT_LOG("Earliest upstream message time for fed/encl %d is " PRINTF_TAG - " (adjusted by after delay). Granting provisional tag advance.", + } else if( // Scenario (2) or (3) above + lf_tag_compare(t_d, e->next_event) >= 0 // EIMT greater than or equal to NET + && is_in_zero_delay_cycle(e) // The node is part of a ZDC + && lf_tag_compare(t_d, e->last_provisionally_granted) >= 0 // The grant is not redundant + && lf_tag_compare(t_d, e->last_granted) > 0 // The grant is not redundant. + ) { + // Some upstream node may send an event that has the same tag as this node's next event, + // so we can only grant a PTAG. + LF_PRINT_LOG("RTI: Earliest upstream message time for fed/encl %d is " PRINTF_TAG + " (adjusted by after delay). Granting provisional tag advance (PTAG) for " PRINTF_TAG, e->id, - t_d_zero_delay.time - start_time, t_d_zero_delay.microstep); - result.tag = t_d_zero_delay; + t_d.time - start_time, t_d.microstep, + e->next_event.time - lf_time_start(), + e->next_event.microstep); + result.tag = e->next_event; result.is_provisional = true; } return result; @@ -224,10 +229,13 @@ void update_scheduling_node_next_event_tag_locked(scheduling_node_t* e, tag_t ne // nor expect a reply. It just proceeds to advance time. if (e->num_upstream > 0) { notify_advance_grant_if_safe(e); + } else { + // Even though there was no grant, mark the tag as if there was. + e->last_granted = next_event_tag; } // Check downstream scheduling_nodes to see whether they should now be granted a TAG. // To handle cycles, need to create a boolean array to keep - // track of which upstream scheduling_nodes have been visited. + // track of which downstream scheduling_nodes have been visited. bool *visited = (bool *)calloc(rti_common->number_of_scheduling_nodes, sizeof(bool)); // Initializes to 0. notify_downstream_advance_grant_if_safe(e, visited); free(visited); @@ -245,44 +253,101 @@ void notify_advance_grant_if_safe(scheduling_node_t* e) { } } -tag_t transitive_next_event(scheduling_node_t* e, tag_t candidate, bool visited[]) { - if (visited[e->id] || e->state == NOT_CONNECTED) { - // Enclave has stopped executing or we have visited it before. +// Local function used recursively to find minimum delays upstream. +// Return in count the number of non-FOREVER_TAG entries in path_delays[]. +static void _update_min_delays_upstream(scheduling_node_t* end, scheduling_node_t* intermediate, tag_t path_delays[], size_t* count) { + // On first call, intermediate will be NULL, so the path delay is initialized to zero. + tag_t delay_from_intermediate_so_far = ZERO_TAG; + if (intermediate == NULL) { + intermediate = end; + } else { + // Not the first call, so intermediate is upstream of end. + delay_from_intermediate_so_far = path_delays[intermediate->id]; + } + if (intermediate->state == NOT_CONNECTED) { + // Enclave or federate is not connected. // No point in checking upstream scheduling_nodes. - return candidate; + return; } - - visited[e->id] = true; - tag_t result = e->next_event; - - // If the candidate is less than this enclave's next_event, use the candidate. - if (lf_tag_compare(candidate, result) < 0) { - result = candidate; + // Check nodes upstream of intermediate (or end on first call). + // NOTE: It would be better to iterate through these sorted by minimum delay, + // but for most programs, the gain might be negligible since there are relatively few + // upstream nodes. + for (int i = 0; i < intermediate->num_upstream; i++) { + // Add connection delay to path delay so far. + tag_t path_delay = lf_delay_tag(delay_from_intermediate_so_far, intermediate->upstream_delay[i]); + // If the path delay is less than the so-far recorded path delay from upstream, update upstream. + if (lf_tag_compare(path_delay, path_delays[intermediate->upstream[i]]) < 0) { + if (path_delays[intermediate->upstream[i]].time == FOREVER) { + // Found a finite path. + *count = *count + 1; + } + path_delays[intermediate->upstream[i]] = path_delay; + // Since the path delay to upstream has changed, recursively update those upstream of it. + // Do not do this, however, if the upstream node is the end node because this means we have + // completed a cycle. + if (end->id != intermediate->upstream[i]) { + _update_min_delays_upstream(end, rti_common->scheduling_nodes[intermediate->upstream[i]], path_delays, count); + } else { + // Found a cycle. + end->flags = end->flags | IS_IN_CYCLE; + // Is it a zero-delay cycle? + if (lf_tag_compare(path_delay, ZERO_TAG) == 0 && intermediate->upstream_delay[i] < 0) { + end->flags = end->flags | IS_IN_ZERO_DELAY_CYCLE; + } else { + // Clear the flag. + end->flags = end->flags & ~IS_IN_ZERO_DELAY_CYCLE; + } + } + } } +} - // The result cannot be earlier than the start time. - if (result.time < start_time) { - // Earliest next event cannot be before the start time. - result = (tag_t){.time = start_time, .microstep = 0u}; - } +void update_min_delays_upstream(scheduling_node_t* node) { + // Check whether cached result is valid. + if (node->min_delays == NULL) { - // Check upstream scheduling_nodes to see whether any of them might send - // an event that would result in an earlier next event. - for (int i = 0; i < e->num_upstream; i++) { - tag_t upstream_result = transitive_next_event( - rti_common->scheduling_nodes[e->upstream[i]], result, visited); + // This is not Dijkstra's algorithm, but rather one optimized for sparse upstream nodes. + // There must be a name for this algorithm. - // Add the "after" delay of the connection to the result. - upstream_result = lf_delay_tag(upstream_result, e->upstream_delay[i]); + // Array of results on the stack: + tag_t path_delays[rti_common->number_of_scheduling_nodes]; + // This will be the number of non-FOREVER entries put into path_delays. + size_t count = 0; - // If the adjusted event time is less than the result so far, update the result. - if (lf_tag_compare(upstream_result, result) < 0) { - result = upstream_result; + for (int i = 0; i < rti_common->number_of_scheduling_nodes; i++) { + path_delays[i] = FOREVER_TAG; + } + _update_min_delays_upstream(node, NULL, path_delays, &count); + + // Put the results onto the node's struct. + node->num_min_delays = count; + node->min_delays = (minimum_delay_t*)malloc(count * sizeof(minimum_delay_t)); + LF_PRINT_DEBUG("++++ Node %hu(is in ZDC: %d\n", node->id, node->flags & IS_IN_ZERO_DELAY_CYCLE); + int k = 0; + for (int i = 0; i < rti_common->number_of_scheduling_nodes; i++) { + if (lf_tag_compare(path_delays[i], FOREVER_TAG) < 0) { + // Node i is upstream. + if (k >= count) { + lf_print_error_and_exit("Internal error! Count of upstream nodes %zu for node %d is wrong!", count, i); + } + minimum_delay_t min_delay = {.id = i, .min_delay = path_delays[i]}; + node->min_delays[k++] = min_delay; + // N^2 debug statement could be a problem with large benchmarks. + // LF_PRINT_DEBUG("++++ Node %hu is upstream with delay" PRINTF_TAG "\n", i, path_delays[i].time, path_delays[i].microstep); + } } } - if (lf_tag_compare(result, e->completed) < 0) { - result = e->completed; - } - return result; } + +bool is_in_zero_delay_cycle(scheduling_node_t* node) { + update_min_delays_upstream(node); + return node->flags & IS_IN_ZERO_DELAY_CYCLE; +} + +bool is_in_cycle(scheduling_node_t* node) { + update_min_delays_upstream(node); + return node->flags & IS_IN_CYCLE; +} + #endif diff --git a/core/federated/RTI/rti_common.h b/core/federated/RTI/rti_common.h index a789ad936..d71751a98 100644 --- a/core/federated/RTI/rti_common.h +++ b/core/federated/RTI/rti_common.h @@ -34,6 +34,12 @@ typedef enum scheduling_node_state_t { PENDING // Waiting for upstream scheduling nodes. } scheduling_node_state_t; +/** Struct for minimum delays from upstream nodes. */ +typedef struct minimum_delay_t { + int id; // ID of the upstream node. + tag_t min_delay; // Minimum delay from upstream. +} minimum_delay_t; + /** * Information about the scheduling nodes coordinated by the RTI. * The abstract scheduling node could either be an enclave or a federate. @@ -59,6 +65,9 @@ typedef struct scheduling_node_t { int* downstream; // Array of downstream scheduling node ids. int num_downstream; // Size of the array of downstream scheduling nodes. execution_mode_t mode; // FAST or REALTIME. + minimum_delay_t* min_delays; // Array of minimum delays from upstream nodes, not including this node. + size_t num_min_delays; // Size of min_delays array. + int flags; // Or of IS_IN_ZERO_DELAY_CYCLE, IS_IN_CYCLE } scheduling_node_t; /** @@ -162,7 +171,7 @@ void notify_tag_advance_grant(scheduling_node_t* e, tag_t tag); void notify_advance_grant_if_safe(scheduling_node_t* e); /** - * Nontify a provisional tag advance grant (PTAG) message to the specified scheduling node. + * Notify a provisional tag advance grant (PTAG) message to the specified scheduling node. * Do not notify it if a previously sent PTAG or TAG was greater or equal. * * This function will keep a record of this PTAG in the node's last_provisionally_granted @@ -207,7 +216,6 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag); */ tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e); - /** * @brief Update the next event tag of an scheduling node. * @@ -221,29 +229,49 @@ tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e); void update_scheduling_node_next_event_tag_locked(scheduling_node_t* e, tag_t next_event_tag); /** - * Find the earliest tag at which the specified federate may - * experience its next event. This is the least next event tag (NET) - * of the specified federate and (transitively) upstream federates - * (with delays of the connections added). For upstream federates, - * we assume (conservatively) that federate upstream of those - * may also send an event. The result will never be less than - * the completion time of the federate (which may be NEVER, - * if the federate has not yet completed a logical time). - * - * FIXME: This could be made less conservative by building - * at code generation time a causality interface table indicating - * which outputs can be triggered by which inputs. For now, we - * assume any output can be triggered by any input. - * - * @param e The scheduling node. - * @param candidate A candidate tag (for the first invocation, - * this should be fed->next_event). - * @param visited An array of booleans indicating which federates - * have been visited (for the first invocation, this should be - * an array of falses of size _RTI.number_of_federates). - * @return The earliest next event tag of the scheduling node e. - */ -tag_t transitive_next_event(scheduling_node_t *e, tag_t candidate, bool visited[]); + * Given a node (enclave or federate), find the tag of the earliest possible incoming + * message from upstream enclaves or federates, which will be the smallest upstream NET + * plus the least delay. This could be NEVER_TAG if the RTI has not seen a NET from some + * upstream node. + * @param e The target node. + * @return The earliest possible incoming message tag. + */ +tag_t earliest_future_incoming_message_tag(scheduling_node_t* e); + +/** + * Return true if the node is in a zero-delay cycle. + * @param node The node. + */ +bool is_in_zero_delay_cycle(scheduling_node_t* node); + +/** + * Return true if the node is in a cycle (possibly a zero-delay cycle). + * @param node The node. + */ +bool is_in_cycle(scheduling_node_t* node); + +/** + * For the given scheduling node (enclave or federate), if necessary, update the `min_delays`, + * `num_min_delays`, and the fields that indicate cycles. These fields will be + * updated only if they have not been previously updated or if invalidate_min_delays_upstream + * has been called since they were last updated. + * @param node The node. + */ +void update_min_delays_upstream(scheduling_node_t* node); + +/** + * For the given scheduling node (enclave or federate), invalidate the `min_delays`, + * `num_min_delays`, and the fields that indicate cycles. + * This should be called whenever the structure of the connections upstream of the + * given node have changed. + * @param node The node. + */ +void invalidate_min_delays_upstream(scheduling_node_t* node); + +/** + * Free dynamically allocated memory on the scheduling nodes and the scheduling node array itself. + */ +void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes); #endif // RTI_COMMON_H #endif // STANDALONE_RTI || LF_ENCLAVES diff --git a/core/federated/RTI/rti_local.c b/core/federated/RTI/rti_local.c index 7ca63a873..1f6cc0928 100644 --- a/core/federated/RTI/rti_local.c +++ b/core/federated/RTI/rti_local.c @@ -35,7 +35,7 @@ static rti_local_t * rti_local; lf_mutex_t rti_mutex; void initialize_local_rti(environment_t *envs, int num_envs) { - rti_local = malloc(sizeof(rti_local_t)); + rti_local = (rti_local_t*)malloc(sizeof(rti_local_t)); LF_ASSERT(rti_local, "Out of memory"); initialize_rti_common(&rti_local->base); @@ -60,6 +60,11 @@ void initialize_local_rti(environment_t *envs, int num_envs) { } } +void free_local_rti() { + free_scheduling_nodes(rti_local->base.scheduling_nodes, rti_local->base.number_of_scheduling_nodes); + free(rti_local); +} + void initialize_enclave_info(enclave_info_t* enclave, int idx, environment_t * env) { initialize_scheduling_node(&enclave->base, idx); @@ -186,4 +191,9 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) { LF_PRINT_LOG("RTI: enclave %u callback with PTAG " PRINTF_TAG " ", e->id, tag.time - lf_time_start(), tag.microstep); } + +void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes) { + // Nothing to do here. +} + #endif //LF_ENCLAVES diff --git a/core/federated/RTI/rti_local.h b/core/federated/RTI/rti_local.h index 9420d49c8..8960ad1b1 100644 --- a/core/federated/RTI/rti_local.h +++ b/core/federated/RTI/rti_local.h @@ -29,10 +29,14 @@ typedef struct { /** * @brief Dynamically create and initialize the local RTI. - * */ void initialize_local_rti(environment_t* envs, int num_envs); +/** + * @brief Free memory associated with the local the RTI and the local RTI iself. + */ +void free_local_rti(); + /** * @brief Initialize the enclave object. * @@ -41,7 +45,8 @@ void initialize_local_rti(environment_t* envs, int num_envs); void initialize_enclave_info(enclave_info_t* enclave, int idx, environment_t *env); /** - * @brief This function call may block. A call to this function serves two purposes. + * @brief Notify the local RTI of a next event tag (NET). + * This function call may block. A call to this function serves two purposes. * 1) It is a promise that, unless receiving events from other enclaves, this * enclave will not produce any event until the next_event_tag (NET) argument. * 2) It is a request for permission to advance the logical tag of the enclave diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index d6e6285ab..2fce8b1bf 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -250,30 +250,25 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) { // Send PTAG to all upstream federates, if they have not had // a later or equal PTAG or TAG sent previously and if their transitive // NET is greater than or equal to the tag. + // This is needed to stimulate absent messages from upstream and break deadlocks. // NOTE: This could later be replaced with a TNET mechanism once // we have an available encoding of causality interfaces. // That might be more efficient. + // NOTE: This is not needed for enclaves because zero-delay loops are prohibited. + // It's only needed for federates, which is why this is implemented here. for (int j = 0; j < e->num_upstream; j++) { - federate_info_t* upstream = GET_FED_INFO(e->upstream[j]); + scheduling_node_t* upstream = rti_remote->base.scheduling_nodes[e->upstream[j]]; // Ignore this federate if it has resigned. - if (upstream->enclave.state == NOT_CONNECTED) continue; - // To handle cycles, need to create a boolean array to keep - // track of which upstream federates have been visited. - bool* visited = (bool*)calloc(rti_remote->base.number_of_scheduling_nodes, sizeof(bool)); // Initializes to 0. - - // Find the (transitive) next event tag upstream. - tag_t upstream_next_event = transitive_next_event( - &(upstream->enclave), upstream->enclave.next_event, visited); - free(visited); - // If these tags are equal, then - // a TAG or PTAG should have already been granted, - // in which case, another will not be sent. But it - // may not have been already granted. - if (lf_tag_compare(upstream_next_event, tag) >= 0) { - notify_provisional_tag_advance_grant(&(upstream->enclave), tag); - } + if (upstream->state == NOT_CONNECTED) continue; + + tag_t earliest = earliest_future_incoming_message_tag(upstream); + // If these tags are equal, then a TAG or PTAG should have already been granted, + // in which case, another will not be sent. But it may not have been already granted. + if (lf_tag_compare(earliest, tag) >= 0) { + notify_provisional_tag_advance_grant(upstream, tag); + } } } } @@ -1265,14 +1260,22 @@ int receive_connection_information(int socket_id, uint16_t fed_id) { fed_id); // Allocate memory for the upstream and downstream pointers - fed->enclave.upstream = (int*)malloc(sizeof(uint16_t) * fed->enclave.num_upstream); - fed->enclave.downstream = (int*)malloc(sizeof(uint16_t) * fed->enclave.num_downstream); - - // Allocate memory for the upstream delay pointers - fed->enclave.upstream_delay = - (interval_t*)malloc( - sizeof(interval_t) * fed->enclave.num_upstream - ); + if (fed->enclave.num_upstream > 0) { + fed->enclave.upstream = (int*)malloc(sizeof(uint16_t) * fed->enclave.num_upstream); + // Allocate memory for the upstream delay pointers + fed->enclave.upstream_delay = + (interval_t*)malloc( + sizeof(interval_t) * fed->enclave.num_upstream + ); + } else { + fed->enclave.upstream = (int*)NULL; + fed->enclave.upstream_delay = (interval_t*)NULL; + } + if (fed->enclave.num_downstream > 0) { + fed->enclave.downstream = (int*)malloc(sizeof(uint16_t) * fed->enclave.num_downstream); + } else { + fed->enclave.downstream = (int*)NULL; + } size_t connections_info_body_size = ((sizeof(uint16_t) + sizeof(int64_t)) * fed->enclave.num_upstream) + (sizeof(uint16_t) * fed->enclave.num_downstream); @@ -1646,4 +1649,15 @@ void initialize_RTI(rti_remote_t *rti){ rti_remote->base.tracing_enabled = false; rti_remote->stop_in_progress = false; } + +void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes) { + for (uint16_t i = 0; i < number_of_scheduling_nodes; i++) { + // FIXME: Gives error freeing memory not allocated!!!! + scheduling_node_t* node = scheduling_nodes[i]; + if (node->upstream != NULL) free(node->upstream); + if (node->downstream != NULL) free(node->downstream); + } + free(scheduling_nodes); +} + #endif // STANDALONE_RTI diff --git a/core/reactor_common.c b/core/reactor_common.c index 5422d1453..eb31c761e 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -1749,7 +1749,7 @@ void termination(void) { continue; } // Stop any tracing, if it is running. - stop_trace(env->trace); + stop_trace_locked(env->trace); _lf_start_time_step(env); diff --git a/core/tag.c b/core/tag.c index 0fe48817d..eee10699f 100644 --- a/core/tag.c +++ b/core/tag.c @@ -116,14 +116,22 @@ tag_t lf_tag(void *env) { return ((environment_t *)env)->current_tag; } +tag_t lf_tag_add(tag_t a, tag_t b) { + if (a.time == NEVER || b.time == NEVER) return NEVER_TAG; + if (a.time == FOREVER || b.time == FOREVER) return FOREVER_TAG; + tag_t result = {.time = a.time + b.time, .microstep = a.microstep + b.microstep}; + if (result.microstep < a.microstep) return FOREVER_TAG; + if (result.time < a.time && b.time > 0) return FOREVER_TAG; + if (result.time > a.time && b.time < 0) return NEVER_TAG; + return result; +} + int lf_tag_compare(tag_t tag1, tag_t tag2) { if (tag1.time < tag2.time) { - LF_PRINT_DEBUG(PRINTF_TIME " < " PRINTF_TIME, tag1.time, tag2.time); return -1; } else if (tag1.time > tag2.time) { return 1; } else if (tag1.microstep < tag2.microstep) { - LF_PRINT_DEBUG(PRINTF_TIME " and microstep < " PRINTF_TIME, tag1.time, tag2.time); return -1; } else if (tag1.microstep > tag2.microstep) { return 1; @@ -134,7 +142,8 @@ int lf_tag_compare(tag_t tag1, tag_t tag2) { tag_t lf_delay_tag(tag_t tag, interval_t interval) { if (tag.time == NEVER || interval < 0LL) return tag; - if (tag.time >= FOREVER - interval) return tag; + // Note that overflow in C is undefined for signed variables. + if (tag.time >= FOREVER - interval) return FOREVER_TAG; // Overflow. tag_t result = tag; if (interval == 0LL) { // Note that unsigned variables will wrap on overflow. @@ -142,12 +151,7 @@ tag_t lf_delay_tag(tag_t tag, interval_t interval) { // microsteps. result.microstep++; } else { - // Note that overflow in C is undefined for signed variables. - if (FOREVER - interval < result.time) { - result.time = FOREVER; - } else { - result.time += interval; - } + result.time += interval; result.microstep = 0; } return result; @@ -156,7 +160,6 @@ tag_t lf_delay_tag(tag_t tag, interval_t interval) { tag_t lf_delay_strict(tag_t tag, interval_t interval) { tag_t result = lf_delay_tag(tag, interval); if (interval != 0 && interval != NEVER && interval != FOREVER && result.time != NEVER && result.time != FOREVER) { - LF_PRINT_DEBUG("interval=%lld, result time=%lld", (long long) interval, (long long) result.time); result.time -= 1; result.microstep = UINT_MAX; } @@ -168,9 +171,6 @@ instant_t lf_time_logical(void *env) { return ((environment_t *) env)->current_tag.time; } -/** - * Return the elapsed logical time in nanoseconds since the start of execution. - */ interval_t lf_time_logical_elapsed(void *env) { return lf_time_logical(env) - start_time; } diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index bafa81949..6ebf6c8c6 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -924,8 +924,9 @@ void try_advance_level(environment_t* env, volatile size_t* next_reaction_level) #ifdef FEDERATED stall_advance_level_federation(env, *next_reaction_level); #endif - *next_reaction_level += 1; + if (*next_reaction_level < SIZE_MAX) *next_reaction_level += 1; } + /** * The main looping logic of each LF worker thread. * This function assumes the caller holds the mutex lock. @@ -1216,6 +1217,9 @@ int lf_reactor_c_main(int argc, const char* argv[]) { LF_PRINT_LOG("---- All worker threads exited successfully."); } } +#if defined LF_ENCLAVES + free_local_rti(); +#endif return 0; } diff --git a/core/trace.c b/core/trace.c index 144e9baf7..6fffbc7bf 100644 --- a/core/trace.c +++ b/core/trace.c @@ -459,6 +459,11 @@ void tracepoint_reaction_deadline_missed(trace_t* trace, reaction_t *reaction, i void stop_trace(trace_t* trace) { lf_critical_section_enter(trace->env); + stop_trace_locked(trace); + lf_critical_section_exit(trace->env); +} + +void stop_trace_locked(trace_t* trace) { if (trace->_lf_trace_stop) { // Trace was already stopped. Nothing to do. return; @@ -480,7 +485,6 @@ void stop_trace(trace_t* trace) { fclose(trace->_lf_trace_file); trace->_lf_trace_file = NULL; LF_PRINT_DEBUG("Stopped tracing."); - lf_critical_section_exit(trace->env); } //////////////////////////////////////////////////////////// diff --git a/core/utils/CMakeLists.txt b/core/utils/CMakeLists.txt index 41e96ff50..fea1e5468 100644 --- a/core/utils/CMakeLists.txt +++ b/core/utils/CMakeLists.txt @@ -1,4 +1,4 @@ -set(UTIL_SOURCES vector.c pqueue.c util.c semaphore.c) +set(UTIL_SOURCES vector.c pqueue_base.c pqueue_tag.c pqueue.c util.c semaphore.c) list(APPEND INFO_SOURCES ${UTIL_SOURCES}) diff --git a/core/utils/pqueue.c b/core/utils/pqueue.c index aa1f1bd15..f7fe4bb67 100644 --- a/core/utils/pqueue.c +++ b/core/utils/pqueue.c @@ -1,473 +1,65 @@ -/* - * Copyright (c) 2014, Volkan Yazıcı - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, this - * list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR - * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND - * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * Modified by Marten Lohstroh (May, 2019). - * Changes: - * - Require implementation of a pqueue_eq_elem_f function to determine - * whether two elements are equal or not; and - * - The provided pqueue_eq_elem_f implementation is used to test and - * search for equal elements present in the queue; and - * - Removed capability to reassign priorities. +/** + * @file pqueue.c + * @author Marten Lohstroh + * @author Edward A. Lee + * @copyright (c) 2020-2023, The University of California at Berkeley. + * License: BSD 2-clause + * + * @brief Priority queue definitions for the event queue and reaction queue. */ -#include -#include -#include -#include -#include - #include "platform.h" #include "pqueue.h" #include "util.h" #include "lf_types.h" -#define LF_LEFT(i) ((i) << 1) -#define LF_RIGHT(i) (((i) << 1) + 1) -#define LF_PARENT(i) ((i) >> 1) - -/** - * Find an element in the queue that matches the given element up to - * and including the given maximum priority. - */ -void* find_equal(pqueue_t *q, void *e, int pos, pqueue_pri_t max) { - if (pos < 0) { - lf_print_error_and_exit("find_equal() called with a negative pos index."); - } - - // Stop the recursion when we've reached the end of the - // queue. This has to be done before accessing the queue - // to avoid segmentation fault. - if (!q || (size_t)pos >= q->size) { - return NULL; - } - - void* rval; - void* curr = q->d[pos]; - - // Stop the recursion when we've surpassed the maximum priority. - if (!curr || q->cmppri(q->getpri(curr), max)) { - return NULL; - } - - if (q->eqelem(curr, e)) { - return curr; - } else { - rval = find_equal(q, e, LF_LEFT(pos), max); - if (rval) - return rval; - else - return find_equal(q, e, LF_RIGHT(pos), max); - } - return NULL; -} - -/** - * Find an element in the queue that matches the given element up to - * but not including the given maximum priority. The matching element - * has to _also_ have the same priority. - */ -void* find_equal_same_priority(pqueue_t *q, void *e, int pos) { - if (pos < 0) { - lf_print_error_and_exit("find_equal_same_priority() called with a negative pos index."); - } - - // Stop the recursion when we've reached the end of the - // queue. This has to be done before accessing the queue - // to avoid segmentation fault. - if (!q || (size_t)pos >= q->size) { - return NULL; - } - - void* rval; - void* curr = q->d[pos]; - - // Stop the recursion once we've surpassed the priority of the element - // we're looking for. - if (!curr || q->cmppri(q->getpri(curr), q->getpri(e))) { - return NULL; - } - - if (q->getpri(curr) == q->getpri(e) && q->eqelem(curr, e)) { - return curr; - } else { - rval = find_equal_same_priority(q, e, LF_LEFT(pos)); - if (rval) - return rval; - else - return find_equal_same_priority(q, e, LF_RIGHT(pos)); - } - - // for (int i=1; i < q->size; i++) { - // if (q->d[i] == e) { - // return q->d[i]; - // } - // } - return NULL; -} - -pqueue_t * pqueue_init(size_t n, - pqueue_cmp_pri_f cmppri, - pqueue_get_pri_f getpri, - pqueue_get_pos_f getpos, - pqueue_set_pos_f setpos, - pqueue_eq_elem_f eqelem, - pqueue_print_entry_f prt) { - pqueue_t *q; - - if (!(q = (pqueue_t*)malloc(sizeof(pqueue_t)))) - return NULL; - - /* Need to allocate n+1 elements since element 0 isn't used. */ - if (!(q->d = (void**)malloc((n + 1) * sizeof(void *)))) { - free(q); - return NULL; - } - - q->size = 1; - q->avail = q->step = (n+1); /* see comment above about n+1 */ - q->cmppri = cmppri; - q->getpri = getpri; - q->getpos = getpos; - q->setpos = setpos; - q->eqelem = eqelem; - q->prt = prt; - return q; -} - -void pqueue_free(pqueue_t *q) { - free(q->d); - free(q); -} - -size_t pqueue_size(pqueue_t *q) { - if (!q) return 0; - // Queue element 0 exists but doesn't count since it isn't used. - return (q->size - 1); -} - -static size_t maxchild(pqueue_t *q, size_t i) { - size_t child_node = LF_LEFT(i); - - if (child_node >= q->size) - return 0; - - if ((child_node+1) < q->size && - (q->cmppri(q->getpri(q->d[child_node]), q->getpri(q->d[child_node+1])))) - child_node++; /* use right child instead of left */ - - return child_node; -} - -static size_t bubble_up(pqueue_t *q, size_t i) { - size_t parent_node; - void *moving_node = q->d[i]; - pqueue_pri_t moving_pri = q->getpri(moving_node); - - for (parent_node = LF_PARENT(i); - ((i > 1) && q->cmppri(q->getpri(q->d[parent_node]), moving_pri)); - i = parent_node, parent_node = LF_PARENT(i)) - { - q->d[i] = q->d[parent_node]; - q->setpos(q->d[i], i); - } - - q->d[i] = moving_node; - q->setpos(moving_node, i); - return i; -} - -static void percolate_down(pqueue_t *q, size_t i) { - size_t child_node; - void *moving_node = q->d[i]; - pqueue_pri_t moving_pri = q->getpri(moving_node); - - while ((child_node = maxchild(q, i)) && - q->cmppri(moving_pri, q->getpri(q->d[child_node]))) - { - q->d[i] = q->d[child_node]; - q->setpos(q->d[i], i); - i = child_node; - } - - q->d[i] = moving_node; - q->setpos(moving_node, i); -} - -void* pqueue_find_equal_same_priority(pqueue_t *q, void *e) { - return find_equal_same_priority(q, e, 1); -} - -void* pqueue_find_equal(pqueue_t *q, void *e, pqueue_pri_t max) { - return find_equal(q, e, 1, max); -} - -int pqueue_insert(pqueue_t *q, void *d) { - void **tmp; - size_t i; - size_t newsize; - - if (!q) return 1; - - /* allocate more memory if necessary */ - if (q->size >= q->avail) { - newsize = q->size + q->step; - if (!(tmp = (void**)realloc(q->d, sizeof(void *) * newsize))) - return 1; - q->d = tmp; - q->avail = newsize; - } - /* insert item and organize the tree */ - i = q->size++; - q->d[i] = d; - bubble_up(q, i); - - return 0; -} - -int pqueue_remove(pqueue_t *q, void *d) { - if (q->size == 1) return 0; // Nothing to remove - size_t posn = q->getpos(d); - q->d[posn] = q->d[--q->size]; - if (q->cmppri(q->getpri(d), q->getpri(q->d[posn]))) - bubble_up(q, posn); - else - percolate_down(q, posn); - - return 0; -} - -void* pqueue_pop(pqueue_t *q) { - if (!q || q->size == 1) - return NULL; - - void* head; - - head = q->d[1]; - q->d[1] = q->d[--q->size]; - percolate_down(q, 1); - - return head; -} - -/** - * @brief Empty 'src' into 'dest'. - * - * As an optimization, this function might swap 'src' and 'dest'. - * - * @param dest The queue to fill up - * @param src The queue to empty - */ -void pqueue_empty_into(pqueue_t** dest, pqueue_t** src) { - assert(src); - assert(dest); - assert(*src); - assert(*dest); - void* item; - if ((*dest)->size >= (*src)->size) { - while ((item = pqueue_pop(*src))) { - pqueue_insert(*dest, item); - } - } else { - while ((item = pqueue_pop(*dest))) { - pqueue_insert(*src, item); - } - - pqueue_t* tmp = *dest; - *dest = *src; - *src = tmp; - } -} - -void* pqueue_peek(pqueue_t *q) { - void *d; - if (!q || q->size == 1) - return NULL; - d = q->d[1]; - return d; -} - -void pqueue_dump(pqueue_t *q, pqueue_print_entry_f print) { - size_t i; - - LF_PRINT_DEBUG("posn\tleft\tright\tparent\tmaxchild\t..."); - for (i = 1; i < q->size ;i++) { - LF_PRINT_DEBUG("%zu\t%zu\t%zu\t%zu\t%ul\t", - i, - LF_LEFT(i), LF_RIGHT(i), LF_PARENT(i), - (unsigned int)maxchild(q, i)); - print(q->d[i]); - } -} - -void pqueue_print(pqueue_t *q, pqueue_print_entry_f print) { - pqueue_t *dup; - void *e; - - dup = pqueue_init(q->size, - q->cmppri, q->getpri, - q->getpos, q->setpos, q->eqelem, q->prt); - dup->size = q->size; - dup->avail = q->avail; - dup->step = q->step; - - memcpy(dup->d, q->d, (q->size * sizeof(void *))); - - while ((e = pqueue_pop(dup))) - print(e); - - pqueue_free(dup); -} - -static int subtree_is_valid(pqueue_t *q, int pos) { - if (pos < 0) { - lf_print_error_and_exit("subtree_is_valid() called with a negative pos index."); - } - - int left_pos = LF_LEFT(pos); - if (left_pos < 0) { - lf_print_error_and_exit("subtree_is_valid(): index overflow detected."); - } - - if ((size_t)left_pos < q->size) { - /* has a left child */ - if (q->cmppri(q->getpri(q->d[pos]), q->getpri(q->d[LF_LEFT(pos)]))) - return 0; - if (!subtree_is_valid(q, LF_LEFT(pos))) - return 0; - } - - int right_pos = LF_RIGHT(pos); - if (right_pos < 0) { - lf_print_error_and_exit("subtree_is_valid(): index overflow detected."); - } - if ((size_t)right_pos < q->size) { - /* has a right child */ - if (q->cmppri(q->getpri(q->d[pos]), q->getpri(q->d[LF_RIGHT(pos)]))) - return 0; - if (!subtree_is_valid(q, LF_RIGHT(pos))) - return 0; - } - return 1; -} - -int pqueue_is_valid(pqueue_t *q) { - return subtree_is_valid(q, 1); -} - -// ********** Priority Queue Support Start - -/** - * Return whether the first and second argument are given in reverse order. - */ int in_reverse_order(pqueue_pri_t thiz, pqueue_pri_t that) { return (thiz > that); } -/** - * Return false (0) regardless of reaction order. - */ int in_no_particular_order(pqueue_pri_t thiz, pqueue_pri_t that) { - return false; + return 0; } -/** - * Return whether or not the given events have matching triggers. - */ -int event_matches(void* next, void* curr) { - return (((event_t*)next)->trigger == ((event_t*)curr)->trigger); +int event_matches(void* event1, void* event2) { + return (((event_t*)event1)->trigger == ((event_t*)event2)->trigger); } -/** - * Return whether or not the given reaction_t pointers - * point to the same struct. - */ -int reaction_matches(void* next, void* curr) { - return (next == curr); +int reaction_matches(void* a, void* b) { + return (a == b); } -/** - * Report a priority equal to the time of the given event. - * Used for sorting pointers to event_t structs in the event queue. - */ -pqueue_pri_t get_event_time(void *a) { - return (pqueue_pri_t)(((event_t*) a)->time); +pqueue_pri_t get_event_time(void *event) { + return (pqueue_pri_t)(((event_t*) event)->time); } -/** - * Report a priority equal to the index of the given reaction. - * Used for sorting pointers to reaction_t structs in the - * blocked and executing queues. - */ -pqueue_pri_t get_reaction_index(void *a) { - return ((reaction_t*) a)->index; +pqueue_pri_t get_reaction_index(void *reaction) { + return ((reaction_t*) reaction)->index; } -/** - * Return the given event's position in the queue. - */ -size_t get_event_position(void *a) { - return ((event_t*) a)->pos; +size_t get_event_position(void *event) { + return ((event_t*) event)->pos; } -/** - * Return the given reaction's position in the queue. - */ -size_t get_reaction_position(void *a) { - return ((reaction_t*) a)->pos; +size_t get_reaction_position(void *reaction) { + return ((reaction_t*) reaction)->pos; } -/** - * Set the given event's position in the queue. - */ -void set_event_position(void *a, size_t pos) { - ((event_t*) a)->pos = pos; +void set_event_position(void *event, size_t pos) { + ((event_t*) event)->pos = pos; } -/** - * Return the given reaction's position in the queue. - */ -void set_reaction_position(void *a, size_t pos) { - ((reaction_t*) a)->pos = pos; +void set_reaction_position(void *reaction, size_t pos) { + ((reaction_t*) reaction)->pos = pos; } -/** - * Print some information about the given reaction. - * - * DEBUG function only. - */ void print_reaction(void *reaction) { reaction_t *r = (reaction_t*)reaction; - LF_PRINT_DEBUG("%s: chain_id:%llu, index: %llx, reaction: %p", + LF_PRINT_DEBUG("%s: chain_id: %llu, index: %llx, reaction: %p", r->name, r->chain_id, r->index, r); } -/** - * Print some information about the given event. - * - * DEBUG function only. - */ void print_event(void *event) { event_t *e = (event_t*)event; LF_PRINT_DEBUG("time: " PRINTF_TIME ", trigger: %p, token: %p", diff --git a/core/utils/pqueue_base.c b/core/utils/pqueue_base.c new file mode 100644 index 000000000..9bba5289e --- /dev/null +++ b/core/utils/pqueue_base.c @@ -0,0 +1,361 @@ +/* + * Copyright (c) 2014, Volkan Yazıcı + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * Modified by Marten Lohstroh (May, 2019). + * Changes: + * - Require implementation of a pqueue_eq_elem_f function to determine + * whether two elements are equal or not; and + * - The provided pqueue_eq_elem_f implementation is used to test and + * search for equal elements present in the queue; and + * - Removed capability to reassign priorities. + */ + +#include +#include +#include +#include +#include + +#include "pqueue_base.h" +#include "util.h" + +#define LF_LEFT(i) ((i) << 1) +#define LF_RIGHT(i) (((i) << 1) + 1) +#define LF_PARENT(i) ((i) >> 1) + +void* find_equal(pqueue_t *q, void *e, int pos, pqueue_pri_t max) { + if (pos < 0) { + lf_print_error_and_exit("find_equal() called with a negative pos index."); + } + + // Stop the recursion when we've reached the end of the + // queue. This has to be done before accessing the queue + // to avoid segmentation fault. + if (!q || (size_t)pos >= q->size) { + return NULL; + } + + void* rval; + void* curr = q->d[pos]; + + // Stop the recursion when we've surpassed the maximum priority. + if (!curr || q->cmppri(q->getpri(curr), max)) { + return NULL; + } + + if (q->eqelem(curr, e)) { + return curr; + } else { + rval = find_equal(q, e, LF_LEFT(pos), max); + if (rval) + return rval; + else + return find_equal(q, e, LF_RIGHT(pos), max); + } + return NULL; +} + +void* find_equal_same_priority(pqueue_t *q, void *e, int pos) { + if (pos < 0) { + lf_print_error_and_exit("find_equal_same_priority() called with a negative pos index."); + } + + // Stop the recursion when we've reached the end of the + // queue. This has to be done before accessing the queue + // to avoid segmentation fault. + if (!q || (size_t)pos >= q->size) { + return NULL; + } + + void* rval; + void* curr = q->d[pos]; + + // Stop the recursion once we've surpassed the priority of the element + // we're looking for. + if (!curr || q->cmppri(q->getpri(curr), q->getpri(e))) { + return NULL; + } + + if (q->getpri(curr) == q->getpri(e) && q->eqelem(curr, e)) { + return curr; + } else { + rval = find_equal_same_priority(q, e, LF_LEFT(pos)); + if (rval) + return rval; + else + return find_equal_same_priority(q, e, LF_RIGHT(pos)); + } + + // for (int i=1; i < q->size; i++) { + // if (q->d[i] == e) { + // return q->d[i]; + // } + // } + return NULL; +} + +pqueue_t * pqueue_init(size_t n, + pqueue_cmp_pri_f cmppri, + pqueue_get_pri_f getpri, + pqueue_get_pos_f getpos, + pqueue_set_pos_f setpos, + pqueue_eq_elem_f eqelem, + pqueue_print_entry_f prt) { + pqueue_t *q; + + if (!(q = (pqueue_t*)malloc(sizeof(pqueue_t)))) + return NULL; + + /* Need to allocate n+1 elements since element 0 isn't used. */ + if (!(q->d = (void**)malloc((n + 1) * sizeof(void *)))) { + free(q); + return NULL; + } + + q->size = 1; + q->avail = q->step = (n+1); /* see comment above about n+1 */ + q->cmppri = cmppri; + q->getpri = getpri; + q->getpos = getpos; + q->setpos = setpos; + q->eqelem = eqelem; + q->prt = prt; + return q; +} + +void pqueue_free(pqueue_t *q) { + free(q->d); + free(q); +} + +size_t pqueue_size(pqueue_t *q) { + if (!q) return 0; + // Queue element 0 exists but doesn't count since it isn't used. + return (q->size - 1); +} + +static size_t maxchild(pqueue_t *q, size_t i) { + size_t child_node = LF_LEFT(i); + + if (child_node >= q->size) + return 0; + + if ((child_node+1) < q->size && + (q->cmppri(q->getpri(q->d[child_node]), q->getpri(q->d[child_node+1])))) + child_node++; /* use right child instead of left */ + + return child_node; +} + +static size_t bubble_up(pqueue_t *q, size_t i) { + size_t parent_node; + void *moving_node = q->d[i]; + pqueue_pri_t moving_pri = q->getpri(moving_node); + + for (parent_node = LF_PARENT(i); + ((i > 1) && q->cmppri(q->getpri(q->d[parent_node]), moving_pri)); + i = parent_node, parent_node = LF_PARENT(i)) + { + q->d[i] = q->d[parent_node]; + q->setpos(q->d[i], i); + } + + q->d[i] = moving_node; + q->setpos(moving_node, i); + return i; +} + +static void percolate_down(pqueue_t *q, size_t i) { + size_t child_node; + void *moving_node = q->d[i]; + pqueue_pri_t moving_pri = q->getpri(moving_node); + + while ((child_node = maxchild(q, i)) && + q->cmppri(moving_pri, q->getpri(q->d[child_node]))) + { + q->d[i] = q->d[child_node]; + q->setpos(q->d[i], i); + i = child_node; + } + + q->d[i] = moving_node; + q->setpos(moving_node, i); +} + +void* pqueue_find_equal_same_priority(pqueue_t *q, void *e) { + return find_equal_same_priority(q, e, 1); +} + +void* pqueue_find_equal(pqueue_t *q, void *e, pqueue_pri_t max) { + return find_equal(q, e, 1, max); +} + +int pqueue_insert(pqueue_t *q, void *d) { + void **tmp; + size_t i; + size_t newsize; + + if (!q) return 1; + + /* allocate more memory if necessary */ + if (q->size >= q->avail) { + newsize = q->size + q->step; + if (!(tmp = (void**)realloc(q->d, sizeof(void *) * newsize))) + return 1; + q->d = tmp; + q->avail = newsize; + } + /* insert item and organize the tree */ + i = q->size++; + q->d[i] = d; + bubble_up(q, i); + + return 0; +} + +int pqueue_remove(pqueue_t *q, void *d) { + if (q->size == 1) return 0; // Nothing to remove + size_t posn = q->getpos(d); + q->d[posn] = q->d[--q->size]; + if (q->cmppri(q->getpri(d), q->getpri(q->d[posn]))) + bubble_up(q, posn); + else + percolate_down(q, posn); + + return 0; +} + +void* pqueue_pop(pqueue_t *q) { + if (!q || q->size == 1) + return NULL; + + void* head; + + head = q->d[1]; + q->d[1] = q->d[--q->size]; + percolate_down(q, 1); + + return head; +} + +void pqueue_empty_into(pqueue_t** dest, pqueue_t** src) { + assert(src); + assert(dest); + assert(*src); + assert(*dest); + void* item; + if ((*dest)->size >= (*src)->size) { + while ((item = pqueue_pop(*src))) { + pqueue_insert(*dest, item); + } + } else { + while ((item = pqueue_pop(*dest))) { + pqueue_insert(*src, item); + } + + pqueue_t* tmp = *dest; + *dest = *src; + *src = tmp; + } +} + +void* pqueue_peek(pqueue_t *q) { + void *d; + if (!q || q->size == 1) + return NULL; + d = q->d[1]; + return d; +} + +void pqueue_dump(pqueue_t *q, pqueue_print_entry_f print) { + size_t i; + + LF_PRINT_DEBUG("posn\tleft\tright\tparent\tmaxchild\t..."); + for (i = 1; i < q->size ;i++) { + LF_PRINT_DEBUG("%zu\t%zu\t%zu\t%zu\t%ul\t", + i, + LF_LEFT(i), LF_RIGHT(i), LF_PARENT(i), + (unsigned int)maxchild(q, i)); + print(q->d[i]); + } +} + +void pqueue_print(pqueue_t *q, pqueue_print_entry_f print) { + pqueue_t *dup; + void *e; + + dup = pqueue_init(q->size, + q->cmppri, q->getpri, + q->getpos, q->setpos, q->eqelem, q->prt); + dup->size = q->size; + dup->avail = q->avail; + dup->step = q->step; + + memcpy(dup->d, q->d, (q->size * sizeof(void *))); + + while ((e = pqueue_pop(dup))) { + if (print == NULL) { + q->prt(e); + } else { + print(e); + } + } + pqueue_free(dup); +} + +static int subtree_is_valid(pqueue_t *q, int pos) { + if (pos < 0) { + lf_print_error_and_exit("subtree_is_valid() called with a negative pos index."); + } + + int left_pos = LF_LEFT(pos); + if (left_pos < 0) { + lf_print_error_and_exit("subtree_is_valid(): index overflow detected."); + } + + if ((size_t)left_pos < q->size) { + /* has a left child */ + if (q->cmppri(q->getpri(q->d[pos]), q->getpri(q->d[LF_LEFT(pos)]))) + return 0; + if (!subtree_is_valid(q, LF_LEFT(pos))) + return 0; + } + + int right_pos = LF_RIGHT(pos); + if (right_pos < 0) { + lf_print_error_and_exit("subtree_is_valid(): index overflow detected."); + } + if ((size_t)right_pos < q->size) { + /* has a right child */ + if (q->cmppri(q->getpri(q->d[pos]), q->getpri(q->d[LF_RIGHT(pos)]))) + return 0; + if (!subtree_is_valid(q, LF_RIGHT(pos))) + return 0; + } + return 1; +} + +int pqueue_is_valid(pqueue_t *q) { + return subtree_is_valid(q, 1); +} diff --git a/core/utils/pqueue_tag.c b/core/utils/pqueue_tag.c new file mode 100644 index 000000000..579926f99 --- /dev/null +++ b/core/utils/pqueue_tag.c @@ -0,0 +1,155 @@ +/** + * @file pqueue_tag.c + * @author Byeonggil Jun + * @author Edward A. Lee + * @copyright (c) 2023, The University of California at Berkeley + * License in [BSD 2-clause](https://github.com/lf-lang/reactor-c/blob/main/LICENSE.md) + * + * @brief Priority queue that uses tags for sorting. + */ + +#include + +#include "pqueue_tag.h" +#include "util.h" // For lf_print +#include "platform.h" // For PRINTF_TAG + +////////////////// +// Local functions, not intended for use outside this file. + +/** + * @brief Callback function to get the priority of an element. + * Return the pointer argument cast to pqueue_pri_t because the + * element is also the priority. This function is of type pqueue_get_pri_f. + * @param element A pointer to a pqueue_tag_element_t, cast to void*. + */ +static pqueue_pri_t pqueue_tag_get_priority(void *element) { + return (pqueue_pri_t) element; +} + +/** + * @brief Callback comparison function for the tag-based priority queue. + * Return 0 if the first argument is less than second and 1 otherwise. + * This function is of type pqueue_cmp_pri_f. + * @param priority1 A pointer to a pqueue_tag_element_t, cast to pqueue_pri_t. + * @param priority2 A pointer to a pqueue_tag_element_t, cast to pqueue_pri_t. +*/ +static int pqueue_tag_compare(pqueue_pri_t priority1, pqueue_pri_t priority2) { + return (lf_tag_compare(((pqueue_tag_element_t*) priority1)->tag, ((pqueue_tag_element_t*) priority2)->tag) > 0); +} + +/** + * @brief Callback function to determine whether two elements are equivalent. + * Return 1 if the tags contained by given elements are identical, 0 otherwise. + * This function is of type pqueue_eq_elem_f. + * @param element1 A pointer to a pqueue_tag_element_t, cast to void*. + * @param element2 A pointer to a pqueue_tag_element_t, cast to void*. + */ +static int pqueue_tag_matches(void* element1, void* element2) { + return lf_tag_compare(((pqueue_tag_element_t*) element1)->tag, ((pqueue_tag_element_t*) element2)->tag) == 0; +} + +/** + * @brief Callback function to return the position of an element. + * This function is of type pqueue_get_pos_f. + * @param element A pointer to a pqueue_tag_element_t, cast to void*. + */ +static size_t pqueue_tag_get_position(void *element) { + return ((pqueue_tag_element_t*)element)->pos; +} + +/** + * @brief Callback function to set the position of an element. + * This function is of type pqueue_set_pos_f. + * @param element A pointer to a pqueue_tag_element_t, cast to void*. + * @param pos The position. + */ +static void pqueue_tag_set_position(void *element, size_t pos) { + ((pqueue_tag_element_t*)element)->pos = pos; +} + +/** + * @brief Callback function to print information about an element. + * This is a function of type pqueue_print_entry_f. + * @param element A pointer to a pqueue_tag_element_t, cast to void*. + */ +static void pqueue_tag_print_element(void *element) { + tag_t tag = ((pqueue_tag_element_t*) element)->tag; + lf_print("Element with tag " PRINTF_TAG ".", tag.time, tag.microstep); +} + +////////////////// +// Functions defined in pqueue_tag.h. + +pqueue_tag_t* pqueue_tag_init(size_t initial_size) { + return (pqueue_tag_t*) pqueue_init( + initial_size, + pqueue_tag_compare, + pqueue_tag_get_priority, + pqueue_tag_get_position, + pqueue_tag_set_position, + pqueue_tag_matches, + pqueue_tag_print_element); +} + +void pqueue_tag_free(pqueue_tag_t *q) { + for (int i = 1; i < q->size ;i++) { + if (q->d[i] != NULL && ((pqueue_tag_element_t*)q->d[i])->is_dynamic) { + free(q->d[i]); + } + } + pqueue_free((pqueue_t*)q); +} + +size_t pqueue_tag_size(pqueue_tag_t *q) { + return pqueue_size((pqueue_t*)q); +} + +int pqueue_tag_insert(pqueue_tag_t* q, pqueue_tag_element_t* d) { + return pqueue_insert((pqueue_t*)q, (void*)d); +} + +int pqueue_tag_insert_tag(pqueue_tag_t* q, tag_t t) { + pqueue_tag_element_t* d = (pqueue_tag_element_t*) malloc(sizeof(pqueue_tag_element_t)); + d->is_dynamic = 1; + d->tag = t; + return pqueue_tag_insert(q, d); +} + +pqueue_tag_element_t* pqueue_tag_find_with_tag(pqueue_tag_t *q, tag_t t) { + // Create elements on the stack. These elements are only needed during + // the duration of this function call, so putting them on the stack is OK. + pqueue_tag_element_t element = {.tag = t, .pos = 0, .is_dynamic = false}; + pqueue_tag_element_t forever = {.tag = FOREVER_TAG, .pos = 0, .is_dynamic = false}; + return pqueue_find_equal((pqueue_t*)q, (void*)&element, (pqueue_pri_t)&forever); +} + +int pqueue_tag_insert_if_no_match(pqueue_tag_t* q, tag_t t) { + if (pqueue_tag_find_with_tag(q, t) == NULL) { + return pqueue_tag_insert_tag(q, t); + } else { + return 1; + } +} + +pqueue_tag_element_t* pqueue_tag_pop(pqueue_tag_t* q) { + return (pqueue_tag_element_t*)pqueue_pop((pqueue_t*)q); +} + +tag_t pqueue_tag_pop_tag(pqueue_tag_t* q) { + pqueue_tag_element_t* element = (pqueue_tag_element_t*)pqueue_tag_pop(q); + if (element == NULL) return FOREVER_TAG; + else { + tag_t result = element->tag; + if (element->is_dynamic) free(element); + return result; + } +} + +int pqueue_tag_remove(pqueue_tag_t* q, pqueue_tag_element_t* e) { + return pqueue_remove((pqueue_t*) q, (void*) e); +} + +pqueue_tag_element_t* pqueue_tag_peek(pqueue_tag_t* q) { + return (pqueue_tag_element_t*) pqueue_peek((pqueue_t*)q); +} diff --git a/core/utils/util.c b/core/utils/util.c index 82e57e070..df325b125 100644 --- a/core/utils/util.c +++ b/core/utils/util.c @@ -32,6 +32,11 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include "util.h" + +#ifndef STANDALONE_RTI +#include "environment.h" +#endif + #include #include #include @@ -48,12 +53,6 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. /** Number of nanoseconds to sleep before retrying a socket read. */ #define SOCKET_READ_RETRY_INTERVAL 1000000 -/** - * The ID of this federate. For a non-federated execution, this will - * be -1. For a federated execution, it will be assigned when the generated function - * _lf_initialize_trigger_objects() is called. - * @see xtext/org.icyphy.linguafranca/src/org/icyphy/generator/CGenerator.xtend. - */ int _lf_my_fed_id = -1; /** @@ -65,9 +64,6 @@ print_message_function_t* print_message_function = NULL; /** The level of messages to redirect to print_message_function. */ int print_message_level = -1; -/** - * Return the federate ID or -1 if this program is not part of a federation. - */ int lf_fed_id() { return _lf_my_fed_id; } @@ -107,14 +103,31 @@ void _lf_message_print( // If we make multiple calls to printf(), then the results could be // interleaved between threads. // vprintf() is a version that takes an arg list rather than multiple args. - size_t length = strlen(prefix) + strlen(format) + 32; - char* message = (char*) malloc(length + 1); + char* message; if (_lf_my_fed_id < 0) { + size_t length = strlen(prefix) + strlen(format) + 32; + message = (char*) malloc(length + 1); snprintf(message, length, "%s%s\n", prefix, format); } else { - snprintf(message, length, "Federate %d: %s%s\n", - _lf_my_fed_id, prefix, format); +#if defined STANDALONE_RTI + size_t length = strlen(prefix) + strlen(format) + 37; + message = (char*) malloc(length + 1); + snprintf(message, length, "RTI: %s%s\n", + prefix, format); +#else + // Get the federate name from the top-level environment, which by convention is the first. + environment_t *envs; + _lf_get_environments(&envs); + char* name = envs->name; + size_t length = strlen(prefix) + strlen(format) + +strlen(name) + 32; + message = (char*) malloc(length + 1); + // If the name has prefix "federate__", strip that out. + if (strncmp(name, "federate__", 10) == 0) name += 10; + + snprintf(message, length, "Fed %d (%s): %s%s\n", + _lf_my_fed_id, name, prefix, format); +#endif // STANDALONE_RTI } if (print_message_function == NULL) { if (is_error) { @@ -129,14 +142,6 @@ void _lf_message_print( } } -/** - * Report an informational message on stdout with - * a newline appended at the end. - * If this execution is federated, then - * the message will be prefaced by "Federate n: ", - * where n is the federate ID. - * The arguments are just like printf(). - */ void lf_print(const char* format, ...) { va_list args; va_start (args, format); @@ -144,21 +149,10 @@ void lf_print(const char* format, ...) { va_end (args); } -/** - * varargs alternative of "lf_print" - */ void lf_vprint(const char* format, va_list args) { _lf_message_print(0, "", format, args, LOG_LEVEL_INFO); } -/** - * Report an log message on stdout with the prefix - * "LOG: " and a newline appended - * at the end. If this execution is federated, then - * the message will be prefaced by "Federate n: ", - * where n is the federate ID. - * The arguments are just like printf(). - */ void lf_print_log(const char* format, ...) { va_list args; va_start (args, format); @@ -166,22 +160,10 @@ void lf_print_log(const char* format, ...) { va_end (args); } -/** - * varargs alternative of "lf_print_log" - */ void lf_vprint_log(const char* format, va_list args) { _lf_message_print(0, "LOG: ", format, args, LOG_LEVEL_LOG); } - -/** - * Report an debug message on stdout with the prefix - * "DEBUG: " and a newline appended - * at the end. If this execution is federated, then - * the message will be prefaced by "Federate n: ", - * where n is the federate ID. - * The arguments are just like printf(). - */ void lf_print_debug(const char* format, ...) { va_list args; va_start (args, format); @@ -189,17 +171,10 @@ void lf_print_debug(const char* format, ...) { va_end (args); } -/** - * varargs alternative of "lf_print_debug" - */ void lf_vprint_debug(const char* format, va_list args) { _lf_message_print(0, "DEBUG: ", format, args, LOG_LEVEL_DEBUG); } -/** - * Report an error with the prefix "ERROR: " and a newline appended - * at the end. The arguments are just like printf(). - */ void lf_print_error(const char* format, ...) { va_list args; va_start (args, format); @@ -207,17 +182,10 @@ void lf_print_error(const char* format, ...) { va_end (args); } -/** - * varargs alternative of "lf_print_error" - */ void lf_vprint_error(const char* format, va_list args) { _lf_message_print(1, "ERROR: ", format, args, LOG_LEVEL_ERROR); } -/** - * Report a warning with the prefix "WARNING: " and a newline appended - * at the end. The arguments are just like printf(). - */ void lf_print_warning(const char* format, ...) { va_list args; va_start (args, format); @@ -225,18 +193,10 @@ void lf_print_warning(const char* format, ...) { va_end (args); } -/** - * varargs alternative of "lf_print_warning" - */ void lf_vprint_warning(const char* format, va_list args) { _lf_message_print(1, "WARNING: ", format, args, LOG_LEVEL_WARNING); } -/** - * Report an error with the prefix "ERROR: " and a newline appended - * at the end, then exit with the failure code EXIT_FAILURE. - * The arguments are just like printf(). - */ void lf_print_error_and_exit(const char* format, ...) { va_list args; va_start (args, format); @@ -245,25 +205,10 @@ void lf_print_error_and_exit(const char* format, ...) { exit(EXIT_FAILURE); } -/** - * varargs alternative of "lf_print_error_and_exit" - */ void lf_vprint_error_and_exit(const char* format, va_list args) { _lf_message_print(1, "FATAL ERROR: ", format, args, LOG_LEVEL_ERROR); } -/** - * Register a function to display messages. After calling this, - * all messages passed to the above print functions will be - * printed using the specified function rather than printf - * if their log level is greater than the specified level. - * The level should be one of LOG_LEVEL_ERROR, LOG_LEVEL_WARNING, - * LOG_LEVEL_INFO, LOG_LEVEL_LOG, or LOG_LEVEL_DEBUG. - * - * @param function The print message function or NULL to revert - * to using printf. - * @param log_level The level of messages to redirect. - */ void lf_register_print_function(print_message_function_t* function, int log_level) { print_message_function = function; print_message_level = log_level; diff --git a/docs/README.md b/docs/README.md index 3ece865a8..2174ca664 100644 --- a/docs/README.md +++ b/docs/README.md @@ -17,11 +17,9 @@ To build the doc files locally in your clone of the reactor-c repo, we use sphin - Install `python3`, `pip3` and `doxygen` - Install the required Python modules: - - `pip3 install sphinx` - - `pip3 install sphinx_sitemap` - - `pip3 install sphinx-rtd-theme` - - `pip3 install breathe` - - `pip3 install exhale` +``` + pip3 install sphinx sphinx_sitemap sphinx-rtd-theme breathe exhale +``` ### Build Documentation Files diff --git a/include/core/environment.h b/include/core/environment.h index 78819af40..d4852ddca 100644 --- a/include/core/environment.h +++ b/include/core/environment.h @@ -152,4 +152,12 @@ void environment_init_tags( environment_t *env, instant_t start_time, interval_t duration ); +/** + * @brief Will update the argument to point to the beginning of the array of environments in this program + * @note Is code-generated by the compiler + * @param envs A double pointer which will be dereferenced and modified + * @return int The number of environments in the array + */ +int _lf_get_environments(environment_t **envs); + #endif diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index aca041e5b..880408ec6 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -363,7 +363,7 @@ void stall_advance_level_federation(environment_t* env, size_t level); /** * @brief Update the max level allowed to advance (MLAA). * If the specified tag is greater than the current_tag of the top-level environment - * (or equal an is_provisional is false), then set the MLAA to MAX_INT and return. + * (or equal and is_provisional is false), then set the MLAA to MAX_INT and return. * This removes any barriers on execution at the current tag due to network inputs. * Otherwise, set the MLAA to the minimum level over all (non-physical) network input ports * where the status of the input port is not known at that current_tag. diff --git a/include/core/reactor.h b/include/core/reactor.h index 0e56c0f3e..d9ee515b5 100644 --- a/include/core/reactor.h +++ b/include/core/reactor.h @@ -558,15 +558,6 @@ trigger_handle_t _lf_schedule_copy(lf_action_base_t* action, interval_t offset, // See reactor.h for doc. int _lf_fd_send_stop_request_to_rti(tag_t stop_tag); -/** - * @brief Will update the argument to point to the beginning of the array of environments in this program - * @note Is code-generated by the compiler - * @param envs A double pointer which will be dereferenced and modified - * @return int The number of environments in the array - */ -int _lf_get_environments(environment_t **envs); - - /** * @brief Will create and initialize the required number of environments for the program * @note Will be code generated by the compiler diff --git a/include/core/tag.h b/include/core/tag.h index b598cce6a..0a480a40b 100644 --- a/include/core/tag.h +++ b/include/core/tag.h @@ -40,6 +40,7 @@ #define FOREVER_TAG (tag_t) { .time = FOREVER, .microstep = FOREVER_MICROSTEP } // Need a separate initializer expression to comply with some C compilers #define FOREVER_TAG_INITIALIZER { FOREVER, FOREVER_MICROSTEP } +#define ZERO_TAG (tag_t) { .time = 0LL, .microstep = 0u } // Convenience for converting times #define BILLION 1000000000LL @@ -82,6 +83,14 @@ typedef struct { */ tag_t lf_tag(void* env); +/** + * Add two tags. If either tag has has NEVER or FOREVER in its time field, then + * return NEVER_TAG or FOREVER_TAG, respectively. Also return NEVER_TAG or FOREVER_TAG + * if the result underflows or overflows when adding the times. + * If the microstep overflows, also return FOREVER_TAG. + */ +tag_t lf_tag_add(tag_t a, tag_t b); + /** * Compare two tags. Return -1 if the first is less than * the second, 0 if they are equal, and +1 if the first is diff --git a/include/core/trace.h b/include/core/trace.h index 6e22332ff..598c669bf 100644 --- a/include/core/trace.h +++ b/include/core/trace.h @@ -435,6 +435,7 @@ void tracepoint_reaction_deadline_missed(trace_t* trace, reaction_t *reaction, i * close the files. */ void stop_trace(trace_t* trace); +void stop_trace_locked(trace_t* trace); //////////////////////////////////////////////////////////// //// For federated execution @@ -540,6 +541,7 @@ typedef struct trace_t trace_t; #define start_trace(...) #define stop_trace(...) +#define stop_trace_locked(...) #define trace_new(...) NULL #define trace_free(...) diff --git a/include/core/utils/pqueue.h b/include/core/utils/pqueue.h index 5c3e7fe2b..edfd4968c 100644 --- a/include/core/utils/pqueue.h +++ b/include/core/utils/pqueue.h @@ -1,230 +1,99 @@ -/* - * Copyright (c) 2014, Volkan Yazıcı - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, this - * list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR - * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND - * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * Modified by Marten Lohstroh (May, 2019). - * Changes: - * - Require implementation of a pqueue_eq_elem_f function to determine - * whether two elements are equal or not; and - * - The provided pqueue_eq_elem_f implementation is used to test and - * search for equal elements present in the queue; and - * - Removed capability to reassign priorities. - */ - /** - * @file pqueue.h - * @brief Priority Queue function declarations - * - * @{ + * @file pqueue.h + * @author Marten Lohstroh + * @author Edward A. Lee + * @copyright (c) 2020-2023, The University of California at Berkeley. + * License: BSD 2-clause + * + * @brief Priority queue declarations for the event queue and reaction queue. */ - #ifndef PQUEUE_H #define PQUEUE_H -#include - -/** priority data type */ -typedef unsigned long long pqueue_pri_t; - -/** callback functions to get/set/compare the priority of an element */ -typedef pqueue_pri_t (*pqueue_get_pri_f)(void *a); -typedef void (*pqueue_set_pri_f)(void *a, pqueue_pri_t pri); -typedef int (*pqueue_cmp_pri_f)(pqueue_pri_t next, pqueue_pri_t curr); -typedef int (*pqueue_eq_elem_f)(void* next, void* curr); - -/** callback functions to get/set the position of an element */ -typedef size_t (*pqueue_get_pos_f)(void *a); -typedef void (*pqueue_set_pos_f)(void *a, size_t pos); - -/** debug callback function to print a entry */ -typedef void (*pqueue_print_entry_f)(void *a); - -/** the priority queue handle */ -typedef struct pqueue_t -{ - size_t size; /**< number of elements in this queue plus 1 */ - size_t avail; /**< slots available in this queue */ - size_t step; /**< growth stepping setting */ - pqueue_cmp_pri_f cmppri; /**< callback to compare priorities */ - pqueue_get_pri_f getpri; /**< callback to get priority of a node */ - pqueue_get_pos_f getpos; /**< callback to get position of a node */ - pqueue_set_pos_f setpos; /**< callback to set position of a node */ - pqueue_eq_elem_f eqelem; /**< callback to compare elements */ - pqueue_print_entry_f prt; /**< callback to print elements */ - void **d; /**< The actual queue in binary heap form */ -} pqueue_t; - -/** - * initialize the queue - * - * @param n the initial estimate of the number of queue items for which memory - * should be preallocated - * @param cmppri The callback function to run to compare two elements - * This callback should return 0 for 'lower' and non-zero - * for 'higher', or vice versa if reverse priority is desired - * @param getpri the callback function to run to set a score to an element - * @param getpos the callback function to get the current element's position - * @param setpos the callback function to set the current element's position - * - * @return the handle or NULL for insufficent memory - */ -pqueue_t * -pqueue_init(size_t n, - pqueue_cmp_pri_f cmppri, - pqueue_get_pri_f getpri, - pqueue_get_pos_f getpos, - pqueue_set_pos_f setpos, - pqueue_eq_elem_f eqelem, - pqueue_print_entry_f prt); - +#include "pqueue_base.h" /** - * free all memory used by the queue - * @param q the queue + * Return 1 if the first argument is greater than the second and zero otherwise. + * @param thiz First argument. + * @param that Second argument. */ -void pqueue_free(pqueue_t *q); - - -/** - * return the size of the queue. - * @param q the queue - */ -size_t pqueue_size(pqueue_t *q); +int in_reverse_order(pqueue_pri_t thiz, pqueue_pri_t that); /** - * Insert an element into the queue. - * @param q the queue - * @param e the element - * @return 0 on success + * Return 0 regardless of argument order. + * @param thiz First argument. + * @param that Second argument. */ -int pqueue_insert(pqueue_t *q, void *d); +int in_no_particular_order(pqueue_pri_t thiz, pqueue_pri_t that); /** - * Move an existing entry to a different priority. - * @param q the queue - * @param new_pri the new priority - * @param d the entry + * Return 1 if the two events have the same trigger. + * @param event1 A pointer to an event_t. + * @param event2 A pointer to an event_t. */ -void -pqueue_change_priority(pqueue_t *q, - pqueue_pri_t new_pri, - void *d); - +int event_matches(void* event1, void* event2); /** - * Pop the highest-ranking item from the queue. - * @param q the queue - * @return NULL on error, otherwise the entry + * Return 1 if the two arguments are identical pointers. + * @param a First argument. + * @param b Second argument. */ -void *pqueue_pop(pqueue_t *q); +int reaction_matches(void* a, void* b); /** - * @brief Empty 'src' into 'dest'. - * - * As an optimization, this function might swap 'src' and 'dest'. - * - * @param dest The queue to fill up - * @param src The queue to empty + * Report a priority equal to the time of the given event. + * This is used for sorting pointers to event_t structs in the event queue. + * @param a A pointer to an event_t. */ -void pqueue_empty_into(pqueue_t** dest, pqueue_t** src); +pqueue_pri_t get_event_time(void *event); /** - * Find the highest-ranking item with the same priority that matches the - * supplied entry. - * @param q the queue - * @param e the entry to compare against - * @return NULL if no matching event has been found, otherwise the entry + * Report a priority equal to the index of the given reaction. + * Used for sorting pointers to reaction_t structs in the + * blocked and executing queues. + * @param reaction A pointer to a reaction_t. */ -void* pqueue_find_equal_same_priority(pqueue_t *q, void *e); +pqueue_pri_t get_reaction_index(void *reaction_t); /** - * Find the highest-ranking item with priority up to and including the given - * maximum priority that matches the supplied entry. - * @param q the queue - * @param e the entry to compare against - * @param max_priority the maximum priority to consider - * @return NULL if no matching event has been found, otherwise the entry + * Return the given event's position in the queue. + * @param event A pointer to an event_t. */ -void* pqueue_find_equal(pqueue_t *q, void *e, pqueue_pri_t max_priority); +size_t get_event_position(void *event); /** - * Remove an item from the queue. - * @param q the queue - * @param e the entry - * @return 0 on success + * Return the given reaction's position in the queue. + * @param reaction A pointer to a reaction_t. */ -int pqueue_remove(pqueue_t *q, void *e); +size_t get_reaction_position(void *reaction); /** - * Access highest-ranking item without removing it. - * @param q the queue - * @return NULL on error, otherwise the entry + * Set the given event's position in the queue. + * @param event A pointer to an event_t + * @param pos The position. */ -void *pqueue_peek(pqueue_t *q); +void set_event_position(void *event, size_t pos); /** - * Print the queue. - * @internal - * DEBUG function only - * @param q the queue - * @param the callback function to print the entry + * Set the given reaction's position in the queue. + * @param event A pointer to a reaction_t. + * @param pos The position. */ -void -pqueue_print(pqueue_t *q, - pqueue_print_entry_f print); +void set_reaction_position(void *reaction, size_t pos); /** - * Dump the queue and it's internal structure. - * @internal - * debug function only - * @param q the queue - * @param the callback function to print the entry + * Print some information about the given reaction. + * This only prints something if logging is set to DEBUG. + * @param reaction A pointer to a reaction_t. */ -void -pqueue_dump(pqueue_t *q, - pqueue_print_entry_f print); +void print_reaction(void *reaction); /** - * Check that the all entries are in the right order, etc. - * @internal - * debug function only - * @param q the queue + * Print some information about the given event. + * This only prints something if logging is set to DEBUG. + * @param event A pointer to an event_t. */ -int pqueue_is_valid(pqueue_t *q); - -// ********** Priority Queue Support Start -int in_reverse_order(pqueue_pri_t thiz, pqueue_pri_t that); -int in_no_particular_order(pqueue_pri_t thiz, pqueue_pri_t that); -int event_matches(void* next, void* curr); -int reaction_matches(void* next, void* curr); -pqueue_pri_t get_event_time(void *a); -pqueue_pri_t get_reaction_index(void *a); -size_t get_event_position(void *a); -size_t get_reaction_position(void *a); -void set_event_position(void *a, size_t pos); -void set_reaction_position(void *a, size_t pos); -void print_reaction(void *reaction); void print_event(void *event); #endif /* PQUEUE_H */ -/** @} */ diff --git a/include/core/utils/pqueue_base.h b/include/core/utils/pqueue_base.h new file mode 100644 index 000000000..210cc0eec --- /dev/null +++ b/include/core/utils/pqueue_base.h @@ -0,0 +1,215 @@ +/* + * Copyright (c) 2014, Volkan Yazıcı + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * Modified by Marten Lohstroh (May, 2019). + * Changes: + * - Require implementation of a pqueue_eq_elem_f function to determine + * whether two elements are equal or not; and + * - The provided pqueue_eq_elem_f implementation is used to test and + * search for equal elements present in the queue; and + * - Removed capability to reassign priorities. + * + * @brief Priority Queue function declarations used as a base for Lingua Franca priority queues. + * + * @{ + */ + + +#ifndef PQUEUE_BASE_H +#define PQUEUE_BASE_H + +#include + +/** Priority data type. */ +typedef unsigned long long pqueue_pri_t; + +/** Callback to get the priority of an element. */ +typedef pqueue_pri_t (*pqueue_get_pri_f)(void *a); + +/** Callback to compare two priorities. */ +typedef int (*pqueue_cmp_pri_f)(pqueue_pri_t next, pqueue_pri_t curr); + +/** Callback to determine whether two elements are equivalent. */ +typedef int (*pqueue_eq_elem_f)(void* next, void* curr); + +/** Callback functions to get the position of an element. */ +typedef size_t (*pqueue_get_pos_f)(void *a); + +/** Callback functions to set the position of an element. */ +typedef void (*pqueue_set_pos_f)(void *a, size_t pos); + +/** Debug callback function to print a entry. */ +typedef void (*pqueue_print_entry_f)(void *a); + +/** The priority queue handle. */ +typedef struct pqueue_t +{ + size_t size; /**< number of elements in this queue plus 1 */ + size_t avail; /**< slots available in this queue */ + size_t step; /**< growth stepping setting */ + pqueue_cmp_pri_f cmppri; /**< callback to compare priorities */ + pqueue_get_pri_f getpri; /**< callback to get priority of a node */ + pqueue_get_pos_f getpos; /**< callback to get position of a node */ + pqueue_set_pos_f setpos; /**< callback to set position of a node */ + pqueue_eq_elem_f eqelem; /**< callback to compare elements */ + pqueue_print_entry_f prt; /**< callback to print elements */ + void **d; /**< The actual queue in binary heap form */ +} pqueue_t; + +/** + * @brief Allocate and initialize a priority queue. + * + * @param n the initial estimate of the number of queue items for which memory + * should be preallocated + * @param cmppri The callback function to run to compare two elements + * This callback should return 0 for 'lower' and non-zero + * for 'higher', or vice versa if reverse priority is desired + * @param getpri the callback function to run to set a score to an element + * @param getpos the callback function to get the current element's position + * @param setpos the callback function to set the current element's position + * @param eqelem the callback function to check equivalence of entries + * @param prt the callback function to print an element + * + * @return The handle or NULL for insufficent memory. + */ +pqueue_t * +pqueue_init(size_t n, + pqueue_cmp_pri_f cmppri, + pqueue_get_pri_f getpri, + pqueue_get_pos_f getpos, + pqueue_set_pos_f setpos, + pqueue_eq_elem_f eqelem, + pqueue_print_entry_f prt); + +/** + * free all memory used by the queue + * @param q the queue + */ +void pqueue_free(pqueue_t *q); + +/** + * return the size of the queue. + * @param q the queue + */ +size_t pqueue_size(pqueue_t *q); + +/** + * Insert an element into the queue. + * @param q the queue + * @param e the element + * @return 0 on success + */ +int pqueue_insert(pqueue_t *q, void *d); + +/** + * Move an existing entry to a different priority. + * @param q the queue + * @param new_pri the new priority + * @param d the entry + */ +void +pqueue_change_priority(pqueue_t *q, + pqueue_pri_t new_pri, + void *d); + +/** + * Pop the highest-ranking item from the queue. + * @param q the queue + * @return NULL on error, otherwise the entry + */ +void *pqueue_pop(pqueue_t *q); + +/** + * @brief Empty 'src' into 'dest'. + * + * As an optimization, this function might swap 'src' and 'dest'. + * + * @param dest The queue to fill up + * @param src The queue to empty + */ +void pqueue_empty_into(pqueue_t** dest, pqueue_t** src); + +/** + * Find the highest-ranking item with the same priority that matches the + * supplied entry. + * @param q the queue + * @param e the entry to compare against + * @return NULL if no matching event has been found, otherwise the entry + */ +void* pqueue_find_equal_same_priority(pqueue_t *q, void *e); + +/** + * Find the highest-ranking item with priority up to and including the given + * maximum priority that matches the supplied entry. + * @param q the queue + * @param e the entry to compare against + * @param max_priority the maximum priority to consider + * @return NULL if no matching event has been found, otherwise the entry + */ +void* pqueue_find_equal(pqueue_t *q, void *e, pqueue_pri_t max_priority); + +/** + * Remove an item from the queue. + * @param q the queue + * @param e the entry + * @return 0 on success + */ +int pqueue_remove(pqueue_t *q, void *e); + +/** + * Access highest-ranking item without removing it. + * @param q the queue + * @return NULL on error, otherwise the entry + */ +void *pqueue_peek(pqueue_t *q); + + +/** + * Print the contents of the queue. + * @param q The queue. + * @param print The callback function to print the entry or NULL to use the default. + */ +void pqueue_print(pqueue_t *q, pqueue_print_entry_f print); + +/** + * Dump the queue and it's internal structure. + * @internal + * debug function only + * @param q the queue + * @param the callback function to print the entry + */ +void +pqueue_dump(pqueue_t *q, + pqueue_print_entry_f print); + +/** + * Check that the all entries are in the right order, etc. + * @internal + * debug function only + * @param q the queue + */ +int pqueue_is_valid(pqueue_t *q); + +#endif /* PQUEUE_BASE_H */ +/** @} */ diff --git a/include/core/utils/pqueue_tag.h b/include/core/utils/pqueue_tag.h new file mode 100644 index 000000000..aef72d507 --- /dev/null +++ b/include/core/utils/pqueue_tag.h @@ -0,0 +1,154 @@ +/** + * @file tag_pqueue.h + * @author Byeonggil Jun + * @author Edward A. Lee + * @copyright (c) 2023, The University of California at Berkeley + * License in [BSD 2-clause](https://github.com/lf-lang/reactor-c/blob/main/LICENSE.md) + * @brief Priority queue that uses tags for sorting. + * + * This file extends the pqueue infrastructure with support for queues that are sorted + * by tag instead of by a long long. Elements in this queue are structs of type + * pqueue_tag_element_t or a derived struct, as explained below. What you put onto the + * queue is a pointer to a tagged_element_t struct. That pointer, when cast to pqueue_pri_t, + * an alias for long long, also serves as the "priority" for the queue. + */ + +#ifndef PQUEUE_TAG_H +#define PQUEUE_TAG_H + +#include "pqueue_base.h" +#include "tag.h" + +/** + * @brief The type for an element in a priority queue that is sorted by tag. + * + * In this design, a pointer to this struct is also a "priority" (it can be + * cast to pqueue_pri_t). The actual priority is the tag field of the struct, + * in that the queue is sorted from least tag to largest. + * + * If your struct is dynamically allocated using malloc or calloc, and you + * would like the memory freed when the queue is freed, then set the is_dynamic + * field of the element to a non-zero value. + * + * For a priority queue that contains only tags with no payload, you can + * avoid creating the element struct by using the functions + * pqueue_tag_insert_tag, pqueue_tag_insert_if_no_match, and pqueue_tag_pop_tag. + * + * To customize the element you put onto the queue, for example to carry + * a pyaload, you can create your own element struct type by simply declaring + * the first field to be a pqueue_tag_element_t. For example, if you want an + * element of the queue to include a pointer to your own payload, you can + * declare the following struct type: + *
+ *     typedef struct {
+ *         pqueue_tag_element_t base;
+ *         my_type* my_payload;
+ *     } my_element_type_t;
+ * 
+ * When inserting your struct into the queue, simply cast your pointer + * to (pqueue_tag_element_t*). When accessing your struct from the queue, + * simply cast the result to (my_element_type_t*); + */ +typedef struct { + tag_t tag; + size_t pos; // Needed by any pqueue element. + int is_dynamic; // Non-zero to free this struct when the queue is freed. +} pqueue_tag_element_t; + +/** + * Type of a priority queue sorted by tags. + */ +typedef pqueue_t pqueue_tag_t; + +/** + * @brief Create a priority queue sorted by tags. + * The elements of the priority queue will be of type pqueue_tag_element_t. + * The caller should call pqueue_tag_free() when finished with the queue. + * @return A dynamically allocated priority queue or NULL if memory allocation fails. + */ +pqueue_tag_t* pqueue_tag_init(size_t initial_size); + +/** + * Free all memory used by the queue including any elements that are marked is_dynamic. + * @param q The queue. + */ +void pqueue_tag_free(pqueue_tag_t *q); + +/** + * Return the size of the queue. + * @param q The queue. + */ +size_t pqueue_tag_size(pqueue_tag_t *q); + +/** + * Insert an element into the queue. + * @param q The queue. + * @param e The element to insert. + * @return 0 on success + */ +int pqueue_tag_insert(pqueue_tag_t* q, pqueue_tag_element_t* d); + +/** + * @brief Insert a tag into the queue. + * This automatically creates a dynamically allocated element in the queue + * and ensures that if the element is still on the queue when pqueue_tag_free + * is called, then that memory will be freed. + * @param q The queue. + * @param t The tag to insert. + * @return 0 on success + */ +int pqueue_tag_insert_tag(pqueue_tag_t* q, tag_t t); + +/** + * @brief Insert a tag into the queue if the tag is not already in the queue. + * This automatically creates a dynamically allocated element in the queue + * and ensures that if the element is still on the queue when pqueue_tag_free + * is called, then that memory will be freed. + * @param q The queue. + * @param t The tag to insert. + * @return 0 on success, 1 otherwise. + */ +int pqueue_tag_insert_if_no_match(pqueue_tag_t* q, tag_t t); + +/** + * @brief Pop the least-tag element from the queue and return its tag. + * If the queue is empty, return FOREVER_TAG. This function handles freeing + * the element struct if it was dynamically allocated. + * @param q The queue. + * @return NULL on error, otherwise the entry + */ +tag_t pqueue_tag_pop_tag(pqueue_tag_t* q); + +/** + * @brief Pop the least-tag element from the queue. + * If the entry was dynamically allocated, then it is now up to the caller + * to ensure that it is freed. It will not be freed by pqueue_tag_free. + * @param q The queue. + * @return NULL on error, otherwise the entry + */ +pqueue_tag_element_t* pqueue_tag_pop(pqueue_tag_t* q); + +/** + * Return the first item with the specified tag or NULL if there is none. + * @param q The queue. + * @param t The tag. + * @return An entry with the specified tag or NULL if there isn't one. + */ +pqueue_tag_element_t* pqueue_tag_find_with_tag(pqueue_tag_t *q, tag_t t); + +/** + * Remove an item from the queue. + * @param q The queue. + * @param e The entry to remove. + * @return 0 on success + */ +int pqueue_tag_remove(pqueue_tag_t* q, pqueue_tag_element_t* e); + +/** + * Access highest-ranking item without removing it. + * @param q The queue. + * @return NULL on error, otherwise the entry. + */ +pqueue_tag_element_t* pqueue_tag_peek(pqueue_tag_t* q); + +#endif // PQUEUE_TAG_H diff --git a/include/core/utils/util.h b/include/core/utils/util.h index 7d90d7420..d93f5de24 100644 --- a/include/core/utils/util.h +++ b/include/core/utils/util.h @@ -124,12 +124,9 @@ extern int _lf_my_fed_id; int lf_fed_id(void); /** - * Report an informational message on stdout with - * a newline appended at the end. - * If this execution is federated, then - * the message will be prefaced by "Federate n: ", - * where n is the federate ID. - * The arguments are just like printf(). + * Report an informational message on stdout with a newline appended at the end. + * If this execution is federated, then the message will be prefaced by identifying + * information for the federate. The arguments are just like printf(). */ void lf_print(const char* format, ...) ATTRIBUTE_FORMAT_PRINTF(1, 2); @@ -139,12 +136,9 @@ void lf_print(const char* format, ...) ATTRIBUTE_FORMAT_PRINTF(1, 2); void lf_vprint(const char* format, va_list args) ATTRIBUTE_FORMAT_PRINTF(1, 0); /** - * Report an log message on stdout with the prefix - * "LOG: " and a newline appended - * at the end. If this execution is federated, then - * the message will be prefaced by "Federate n: ", - * where n is the federate ID. - * The arguments are just like printf(). + * Report an log message on stdout with the prefix "LOG: " and a newline appended + * at the end. If this execution is federated, then the message will be prefaced by + * identifying information for the federate. The arguments are just like printf(). */ void lf_print_log(const char* format, ...) ATTRIBUTE_FORMAT_PRINTF(1, 2); @@ -176,12 +170,9 @@ void lf_vprint_log(const char* format, va_list args) ATTRIBUTE_FORMAT_PRINTF(1, } } while (0) /** - * Report an debug message on stdout with the prefix - * "DEBUG: " and a newline appended - * at the end. If this execution is federated, then - * the message will be prefaced by "Federate n: ", - * where n is the federate ID. - * The arguments are just like printf(). + * Report an debug message on stdout with the prefix "DEBUG: " and a newline appended + * at the end. If this execution is federated, then the message will be prefaced by + * identifying information for the federate. The arguments are just like printf(). */ void lf_print_debug(const char* format, ...) ATTRIBUTE_FORMAT_PRINTF(1, 2); diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index 7e9223177..1f7391f92 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -enclaves4 +master diff --git a/test/general/utils/pqueue_test.c b/test/general/utils/pqueue_test.c new file mode 100644 index 000000000..f95492799 --- /dev/null +++ b/test/general/utils/pqueue_test.c @@ -0,0 +1,104 @@ +#include +#include +#include +#include +#include "pqueue_tag.h" +#include "tag.h" + +static void trivial(void) { + // Create an event queue. + pqueue_tag_t* q = pqueue_tag_init(1); + assert(q != NULL); + assert(pqueue_is_valid((pqueue_t*)q)); + pqueue_print((pqueue_t*)q, NULL); + pqueue_tag_free(q); +} + +static void insert_on_queue(pqueue_tag_t* q) { + tag_t t1 = {.time = USEC(3), .microstep = 0}; + tag_t t2 = {.time = USEC(2), .microstep = 1}; + tag_t t3 = {.time = USEC(2), .microstep = 0}; + tag_t t4 = {.time = USEC(1), .microstep = 2}; + assert(!pqueue_tag_insert_tag(q, t1)); + assert(!pqueue_tag_insert_tag(q, t2)); + assert(!pqueue_tag_insert_tag(q, t3)); + + assert(!pqueue_tag_insert_if_no_match(q, t4)); + assert(pqueue_tag_insert_if_no_match(q, t1)); + assert(pqueue_tag_insert_if_no_match(q, t4)); + printf("======== Contents of the queue:\n"); + pqueue_print((pqueue_t*)q, NULL); + assert(pqueue_tag_size(q) == 4); +} + +static void find_from_queue(pqueue_tag_t* q) { + tag_t t1 = {.time = USEC(3), .microstep = 0}; + tag_t t2 = {.time = USEC(2), .microstep = 1}; + tag_t t3 = {.time = USEC(2), .microstep = 0}; + tag_t t4 = {.time = USEC(1), .microstep = 2}; + tag_t t5 = {.time = USEC(0), .microstep = 0}; + tag_t t6 = {.time = USEC(3), .microstep = 2}; + assert(pqueue_tag_find_with_tag(q, t1) != NULL); + assert(pqueue_tag_find_with_tag(q, t2) != NULL); + assert(pqueue_tag_find_with_tag(q, t3) != NULL); + assert(pqueue_tag_find_with_tag(q, t4) != NULL); + assert(pqueue_tag_find_with_tag(q, t5) == NULL); + assert(pqueue_tag_find_with_tag(q, t6) == NULL); +} + +static void insert_if_no_match(pqueue_tag_t* q) { + int size = pqueue_tag_size(q); + tag_t t1 = {.time = USEC(3), .microstep = 0}; + tag_t t4 = {.time = USEC(1), .microstep = 2}; + // Return value is non-zero on failure to insert: + assert(pqueue_tag_insert_if_no_match(q, t1)); + assert(pqueue_tag_insert_if_no_match(q, t4)); + assert(size == pqueue_tag_size(q)); +} + +static void pop_from_queue(pqueue_tag_t* q) { + tag_t t1_back = pqueue_tag_pop_tag(q); + assert(t1_back.time == USEC(1)); + assert(t1_back.microstep == 2); + tag_t t2_back = pqueue_tag_pop_tag(q); + assert(t2_back.time == USEC(2)); + assert(t2_back.microstep == 0); + tag_t t3_back = pqueue_tag_pop_tag(q); + assert(t3_back.time == USEC(2)); + assert(t3_back.microstep == 1); + tag_t t4_back = pqueue_tag_pop_tag(q); + assert(t4_back.time == USEC(3)); + assert(t4_back.microstep == 0); +} + +static void pop_empty(pqueue_tag_t* q) { + assert(pqueue_tag_size(q) == 0); + assert(pqueue_tag_pop(q) == NULL); +} + +static void remove_from_queue(pqueue_tag_t* q, pqueue_tag_element_t* e1, pqueue_tag_element_t* e2) { + assert(pqueue_tag_insert(q, e1) == 0); + assert(pqueue_tag_insert(q, e2) == 0); + assert(pqueue_tag_remove(q, e1) == 0); + assert(pqueue_tag_peek(q) == e2); + assert(pqueue_tag_size(q) == 1); +} + +int main(int argc, char *argv[]) { + trivial(); + // Create an event queue. + pqueue_tag_t* q = pqueue_tag_init(2); + + insert_on_queue(q); + find_from_queue(q); + insert_if_no_match(q); + pop_from_queue(q); + pop_empty(q); + + pqueue_tag_element_t e1 = {.tag = {.time = USEC(3), .microstep = 0}, .pos = 0, .is_dynamic = 0}; + pqueue_tag_element_t e2 = {.tag = {.time = USEC(2), .microstep = 0}, .pos = 0, .is_dynamic = 0}; + + remove_from_queue(q, &e1, &e2); + + pqueue_tag_free(q); +} diff --git a/test/src_gen_stub.c b/test/src_gen_stub.c index 6ac14c84e..5b43d103b 100644 --- a/test/src_gen_stub.c +++ b/test/src_gen_stub.c @@ -1,8 +1,15 @@ #include #include "tag.h" +#include "environment.h" + +environment_t _env; void _lf_initialize_trigger_objects() {} void terminate_execution() {} void _lf_set_default_command_line_options() {} void _lf_initialize_watchdog_mutexes() {} void logical_tag_complete(tag_t tag_to_send) {} +int _lf_get_environments(environment_t ** envs) { + *envs = &_env; + return 1; +} \ No newline at end of file