From d9200e0f564296ff1682847952766a2a24aa180c Mon Sep 17 00:00:00 2001 From: Peter Donovan Date: Sat, 2 Sep 2023 15:08:48 -0700 Subject: [PATCH 01/10] Fix PTAG sending logic. --- core/federated/RTI/enclave.c | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/core/federated/RTI/enclave.c b/core/federated/RTI/enclave.c index 5fcd5e6a1..ae3b9202d 100644 --- a/core/federated/RTI/enclave.c +++ b/core/federated/RTI/enclave.c @@ -102,12 +102,13 @@ tag_advance_grant_t tag_advance_grant_if_safe(enclave_t* e) { // Find the tag of the earliest possible incoming message from // upstream enclaves. - tag_t t_d = FOREVER_TAG; + tag_t t_d_nonzero_delay = FOREVER_TAG; // The tag of the earliest possible incoming message from a zero-delay connection. // Delayed connections are not guarded from STP violations by the MLAA; this property is // acceptable because delayed connections impose no deadlock risk and in some cases (startup) // this property is necessary to avoid deadlocks. However, it requires some special care here - // when potentially sending a PTAG. + // when potentially sending a PTAG because we must not send a PTAG for a tag at which data may + // still be received over nonzero-delay connections. tag_t t_d_zero_delay = FOREVER_TAG; LF_PRINT_DEBUG("NOTE: FOREVER is displayed as " PRINTF_TAG " and NEVER as " PRINTF_TAG, FOREVER_TAG.time - start_time, FOREVER_TAG.microstep, @@ -133,14 +134,18 @@ tag_advance_grant_t tag_advance_grant_if_safe(enclave_t* e) { // whereas one microstep delay is encoded as 0LL. tag_t candidate = lf_delay_tag(upstream_next_event, e->upstream_delay[j]); - if (lf_tag_compare(candidate, t_d) < 0) { - t_d = candidate; - } - if (lf_tag_compare(candidate, t_d_zero_delay) < 0 && e->upstream_delay[j] == NEVER) { - t_d_zero_delay = candidate; + if (e->upstream_delay[j] == NEVER) { + if (lf_tag_compare(candidate, t_d_zero_delay) < 0) { + t_d_zero_delay = candidate; + } + } else { + if (lf_tag_compare(candidate, t_d_nonzero_delay) < 0) { + t_d_nonzero_delay = candidate; + } } } free(visited); + tag_t t_d = (lf_tag_compare(t_d_zero_delay, t_d_nonzero_delay) < 0) ? t_d_zero_delay : t_d_nonzero_delay; LF_PRINT_LOG("Earliest next event upstream has tag " PRINTF_TAG ".", t_d.time - start_time, t_d.microstep); @@ -162,6 +167,7 @@ tag_advance_grant_t tag_advance_grant_if_safe(enclave_t* e) { result.tag = e->next_event; } else if ( lf_tag_compare(t_d_zero_delay, e->next_event) == 0 // The enclave has something to do. + && lf_tag_compare(t_d_zero_delay, t_d_nonzero_delay) < 0 // The statuses of nonzero-delay connections are known at tag t_d_zero_delay && lf_tag_compare(t_d_zero_delay, e->last_provisionally_granted) > 0 // The grant is not redundant. && lf_tag_compare(t_d_zero_delay, e->last_granted) > 0 // The grant is not redundant. ) { From 1907499542a8756482bc8a2e535699abb9aa2df5 Mon Sep 17 00:00:00 2001 From: Peter Donovan Date: Sat, 2 Sep 2023 15:21:05 -0700 Subject: [PATCH 02/10] Account for non-injectivity of right addition by g If a upstream federate A finishes tag g' and the delay between A and downstream federate B is g, then it is not guaranteed that A will not send another message to B at tag g' + g because there exists g'' > g' such that g'' + g = g' + g. --- core/federated/RTI/enclave.c | 4 ++-- core/federated/federate.c | 2 +- core/tag.c | 10 ++++++++++ include/core/tag.h | 9 +++++++++ 4 files changed, 22 insertions(+), 3 deletions(-) diff --git a/core/federated/RTI/enclave.c b/core/federated/RTI/enclave.c index ae3b9202d..c50a6102f 100644 --- a/core/federated/RTI/enclave.c +++ b/core/federated/RTI/enclave.c @@ -72,7 +72,7 @@ tag_advance_grant_t tag_advance_grant_if_safe(enclave_t* e) { // Ignore this enclave if it no longer connected. if (upstream->state == NOT_CONNECTED) continue; - tag_t candidate = lf_delay_tag(upstream->completed, e->upstream_delay[j]); + tag_t candidate = lf_delay_antitag(upstream->completed, e->upstream_delay[j]); if (lf_tag_compare(candidate, min_upstream_completed) < 0) { min_upstream_completed = candidate; @@ -132,7 +132,7 @@ tag_advance_grant_t tag_advance_grant_if_safe(enclave_t* e) { // Adjust by the "after" delay. // Note that "no delay" is encoded as NEVER, // whereas one microstep delay is encoded as 0LL. - tag_t candidate = lf_delay_tag(upstream_next_event, e->upstream_delay[j]); + tag_t candidate = lf_delay_antitag(upstream_next_event, e->upstream_delay[j]); if (e->upstream_delay[j] == NEVER) { if (lf_tag_compare(candidate, t_d_zero_delay) < 0) { diff --git a/core/federated/federate.c b/core/federated/federate.c index 91d2e4a04..1d3d30f6b 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1407,7 +1407,7 @@ void send_port_absent_to_federate(environment_t* env, interval_t additional_dela // Apply the additional delay to the current tag and use that as the intended // tag of the outgoing message - tag_t current_message_intended_tag = lf_delay_tag(env->current_tag, + tag_t current_message_intended_tag = lf_delay_antitag(env->current_tag, additional_delay); LF_PRINT_LOG("Sending port " diff --git a/core/tag.c b/core/tag.c index 9cc1eee0f..a7983b474 100644 --- a/core/tag.c +++ b/core/tag.c @@ -152,6 +152,16 @@ tag_t lf_delay_tag(tag_t tag, interval_t interval) { return result; } +tag_t lf_delay_antitag(tag_t antitag, interval_t interval) { + tag_t result = lf_delay_tag(antitag, interval); + if (interval != 0 && interval != NEVER && interval != FOREVER && result.time != NEVER && result.time != FOREVER) { + LF_PRINT_DEBUG("interval=%lld, result time=%lld", (long long) interval, (long long) result.time); + result.time -= 1; + result.microstep = UINT_MAX; + } + return result; +} + instant_t lf_time_logical(void *env) { assert(env != GLOBAL_ENVIRONMENT); return ((environment_t *) env)->current_tag.time; diff --git a/include/core/tag.h b/include/core/tag.h index d3f649640..7a331b9a1 100644 --- a/include/core/tag.h +++ b/include/core/tag.h @@ -110,6 +110,15 @@ int lf_tag_compare(tag_t tag1, tag_t tag2); */ tag_t lf_delay_tag(tag_t tag, interval_t interval); +/** + * Return the latest tag that cannot have an event provided that + * - `antitag` does not have an event, and + * - if an event occurs at time `x`, then there exists a tag `antitag` such that + * `x = antitag + interval`, where `+` is the usual `+` that we use for tags + * which is neither commutative nor a group operation. + */ +tag_t lf_delay_antitag(tag_t antitag, interval_t interval); + /** * Return the current logical time in nanoseconds. * On many platforms, this is the number of nanoseconds From 5010ad532698620afaf6acacb78847023d7e1b88 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Wed, 13 Sep 2023 10:58:35 +0200 Subject: [PATCH 03/10] Changed lf_delay_antitag to lf_delay_strict --- core/federated/RTI/enclave.c | 7 +++++-- core/federated/federate.c | 7 +++++-- core/tag.c | 17 +++++++++------ include/core/tag.h | 40 ++++++++++++++++++++++-------------- 4 files changed, 46 insertions(+), 25 deletions(-) diff --git a/core/federated/RTI/enclave.c b/core/federated/RTI/enclave.c index c50a6102f..9af2d60ac 100644 --- a/core/federated/RTI/enclave.c +++ b/core/federated/RTI/enclave.c @@ -72,7 +72,10 @@ tag_advance_grant_t tag_advance_grant_if_safe(enclave_t* e) { // Ignore this enclave if it no longer connected. if (upstream->state == NOT_CONNECTED) continue; - tag_t candidate = lf_delay_antitag(upstream->completed, e->upstream_delay[j]); + // Adjust by the "after" delay. + // Note that "no delay" is encoded as NEVER, + // whereas one microstep delay is encoded as 0LL. + tag_t candidate = lf_delay_strict(upstream->completed, e->upstream_delay[j]); if (lf_tag_compare(candidate, min_upstream_completed) < 0) { min_upstream_completed = candidate; @@ -132,7 +135,7 @@ tag_advance_grant_t tag_advance_grant_if_safe(enclave_t* e) { // Adjust by the "after" delay. // Note that "no delay" is encoded as NEVER, // whereas one microstep delay is encoded as 0LL. - tag_t candidate = lf_delay_antitag(upstream_next_event, e->upstream_delay[j]); + tag_t candidate = lf_delay_strict(upstream_next_event, e->upstream_delay[j]); if (e->upstream_delay[j] == NEVER) { if (lf_tag_compare(candidate, t_d_zero_delay) < 0) { diff --git a/core/federated/federate.c b/core/federated/federate.c index 1d3d30f6b..5e25c7eaf 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1406,8 +1406,11 @@ void send_port_absent_to_federate(environment_t* env, interval_t additional_dela unsigned char buffer[message_length]; // Apply the additional delay to the current tag and use that as the intended - // tag of the outgoing message - tag_t current_message_intended_tag = lf_delay_antitag(env->current_tag, + // tag of the outgoing message. Note that if there is delay on the connection, + // then we cannot promise no message with tag = current_tag + delay because a + // subsequent reaction might produce such a message. But we can promise no + // message with a tag strictly less than current_tag + delay. + tag_t current_message_intended_tag = lf_delay_strict(env->current_tag, additional_delay); LF_PRINT_LOG("Sending port " diff --git a/core/tag.c b/core/tag.c index a7983b474..070d738e8 100644 --- a/core/tag.c +++ b/core/tag.c @@ -133,7 +133,7 @@ int lf_tag_compare(tag_t tag1, tag_t tag2) { } tag_t lf_delay_tag(tag_t tag, interval_t interval) { - if (tag.time == NEVER || interval == NEVER) return tag; + if (tag.time == NEVER || interval < 0LL) return tag; tag_t result = tag; if (interval == 0LL) { // Note that unsigned variables will wrap on overflow. @@ -152,11 +152,16 @@ tag_t lf_delay_tag(tag_t tag, interval_t interval) { return result; } -tag_t lf_delay_antitag(tag_t antitag, interval_t interval) { - tag_t result = lf_delay_tag(antitag, interval); - if (interval != 0 && interval != NEVER && interval != FOREVER && result.time != NEVER && result.time != FOREVER) { - LF_PRINT_DEBUG("interval=%lld, result time=%lld", (long long) interval, (long long) result.time); - result.time -= 1; +tag_t lf_delay_strict(tag_t tag, interval_t interval) { + if (tag.time == NEVER || interval < 0LL) return tag; + tag_t result = tag; + if (interval > 0LL) { + // Note that overflow in C is undefined for signed variables. + if (FOREVER - interval < result.time) { + result.time = FOREVER; + } else { + result.time += interval - 1; + } result.microstep = UINT_MAX; } return result; diff --git a/include/core/tag.h b/include/core/tag.h index 7a331b9a1..14c2dc510 100644 --- a/include/core/tag.h +++ b/include/core/tag.h @@ -94,16 +94,18 @@ int lf_tag_compare(tag_t tag1, tag_t tag2); /** * Delay a tag by the specified time interval to realize the "after" keyword. - * If either the time interval or the time field of the tag is NEVER, - * return the unmodified tag. - * If the time interval is 0LL, add one to the microstep, leave - * the time field alone, and return the result. + * Any interval less than 0 (including NEVER) is interpreted as "no delay", + * whereas an interval equal to 0 is interpreted as one microstep delay. + * If the time field of the tag is NEVER or the interval is negative, + * return the unmodified tag. If the time interval is 0LL, add one to + * the microstep, leave the time field alone, and return the result. * Otherwise, add the interval to the time field of the tag and reset - * the microstep to 0. - * If the sum overflows, saturate the time value at FOREVER. - * - * Note that normally it makes no sense to call this with a negative - * interval (except NEVER), but this is not checked. + * the microstep to 0. If the sum overflows, saturate the time value at + * FOREVER. For example: + * - if tag = (t, 0) and interval = 10, return (t + 10, 0) + * - if tag = (t, 0) and interval = 0, return (t, 1) + * - if tag = (t, 0) and interval = NEVER, return (t, 0) + * - if tag = (FOREVER, 0) and interval = 10, return (FOREVER, 0) * * @param tag The tag to increment. * @param interval The time interval. @@ -111,13 +113,21 @@ int lf_tag_compare(tag_t tag1, tag_t tag2); tag_t lf_delay_tag(tag_t tag, interval_t interval); /** - * Return the latest tag that cannot have an event provided that - * - `antitag` does not have an event, and - * - if an event occurs at time `x`, then there exists a tag `antitag` such that - * `x = antitag + interval`, where `+` is the usual `+` that we use for tags - * which is neither commutative nor a group operation. + * Return the latest tag strictly less than the specified tag plus the + * interval, unless tag is NEVER or interval is negative (including NEVER), + * in which case return the tag unmodified. Any interval less than 0 + * (including NEVER) is interpreted as "no delay", whereas an interval + * equal to 0 is interpreted as one microstep delay. If the time sum + * overflows, saturate the time value at FOREVER. For example: + * - if tag = (t, 0) and interval = 10, return (t + 10 - 1, UINT_MAX) + * - if tag = (t, 0) and interval = 0, return (t, 0) + * - if tag = (t, 0) and interval = NEVER, return (t, 0) + * - if tag = (FOREVER, 0) and interval = 10, return (FOREVER, 0) + * + * @param tag The tag to increment. + * @param interval The time interval. */ -tag_t lf_delay_antitag(tag_t antitag, interval_t interval); +tag_t lf_delay_strict(tag_t tag, interval_t interval); /** * Return the current logical time in nanoseconds. From 9dfb1505296b0e5cf0f80f2d3502bfb991dff0ea Mon Sep 17 00:00:00 2001 From: Peter Donovan Date: Wed, 13 Sep 2023 17:36:47 -0700 Subject: [PATCH 04/10] Partially revert some implementation changes. I think that the new implementation incorrectly handled the case for delay_strict when the interval is zero and a single-microstep delay is introduced. This highlights the amount of subtlety in the meaning of the function's behavior and shows that it is not as simple as returning the latest tag that is less than the sum. Instead, it has to return the latest tag in the current interval at which nothing can happen provided that nothing happens at the current tag, like I explained originally. --- core/tag.c | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/core/tag.c b/core/tag.c index 070d738e8..3e9c366ff 100644 --- a/core/tag.c +++ b/core/tag.c @@ -153,15 +153,10 @@ tag_t lf_delay_tag(tag_t tag, interval_t interval) { } tag_t lf_delay_strict(tag_t tag, interval_t interval) { - if (tag.time == NEVER || interval < 0LL) return tag; - tag_t result = tag; - if (interval > 0LL) { - // Note that overflow in C is undefined for signed variables. - if (FOREVER - interval < result.time) { - result.time = FOREVER; - } else { - result.time += interval - 1; - } + tag_t result = lf_delay_tag(tag, interval); + if (interval != 0 && interval != NEVER && interval != FOREVER && result.time != NEVER && result.time != FOREVER) { + LF_PRINT_DEBUG("interval=%lld, result time=%lld", (long long) interval, (long long) result.time); + result.time -= 1; result.microstep = UINT_MAX; } return result; From 54ae0700888260c8162e83fbd4cee4142b48964f Mon Sep 17 00:00:00 2001 From: Erling Rennemo Jellum Date: Thu, 14 Sep 2023 20:24:07 +0200 Subject: [PATCH 05/10] Fix typo in zephyr platform --- core/platform/lf_zephyr_support.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/platform/lf_zephyr_support.c b/core/platform/lf_zephyr_support.c index de8e51c23..34c2f2177 100644 --- a/core/platform/lf_zephyr_support.c +++ b/core/platform/lf_zephyr_support.c @@ -146,7 +146,7 @@ void _lf_initialize_clock() { // Clock and sleep implementation for the HI_RES clock based on // Zephyrs Counter API - * Return the current time in nanoseconds. It gets the current value +/* Return the current time in nanoseconds. It gets the current value * of the hi-res counter device and also keeps track of overflows * to deliver a monotonically increasing clock. */ From 300b4d1434ff9fb7274b65abe91d4a31e3c24bca Mon Sep 17 00:00:00 2001 From: ChadliaJerad Date: Thu, 14 Sep 2023 15:42:24 -0700 Subject: [PATCH 06/10] Fix the wrong assumption about the last event in a federate processed trace --- util/tracing/visualization/fedsd.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/util/tracing/visualization/fedsd.py b/util/tracing/visualization/fedsd.py index 9d482ab82..241e30930 100644 --- a/util/tracing/visualization/fedsd.py +++ b/util/tracing/visualization/fedsd.py @@ -334,9 +334,14 @@ def load_and_process_csv_file(csv_file) : # which boils up to having 'RTI' in the 'event' column df = df[df['event'].str.contains('Sending|Receiving|Scheduler advancing time ends') == True] + id = -1 # Fix the parameters of the event 'Scheduler advancing time ends' - # We rely on the fact that the first row of the csv file cannot be the end of advancing time - id = df.iloc[-1]['self_id'] + # where 'self_id' should not be -1, but rather the value from a sending or + # receiving event + for index, row in df.iterrows(): + if ('Sending' in row['event'] or 'Receiving' in row['event']) : + id = row['self_id'] + break df['self_id'] = id df = df.astype({'self_id': 'int', 'partner_id': 'int'}) From 0e3fdda45f4ce3ebdb5930afc0d4ff6a815105e3 Mon Sep 17 00:00:00 2001 From: Erling Rennemo Jellum Date: Fri, 15 Sep 2023 09:54:17 +0200 Subject: [PATCH 07/10] Do not link with thread library when we are compiling for Zephyr --- core/CMakeLists.txt | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index f28e424c7..923daa561 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -74,10 +74,12 @@ if(DEFINED _LF_CLOCK_SYNC_ON) endif() endif() -# Link with thread library +# Link with thread library. But if we are targeting the Zephyr RTOS if(DEFINED LF_THREADED OR DEFINED LF_TRACE) - find_package(Threads REQUIRED) - target_link_libraries(core PUBLIC Threads::Threads) + if(NOT ${CMAKE_SYSTEM_NAME} STREQUAL "Zephyr") + find_package(Threads REQUIRED) + target_link_libraries(core PUBLIC Threads::Threads) + endif() endif() # Macro for translating a command-line argument into compile definition for From 7e031ec1400fbebc1f90a7ce41fa10ffa40549d2 Mon Sep 17 00:00:00 2001 From: Erling Rennemo Jellum Date: Fri, 15 Sep 2023 09:57:11 +0200 Subject: [PATCH 08/10] Fix spelling --- core/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 923daa561..a9026392c 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -74,7 +74,7 @@ if(DEFINED _LF_CLOCK_SYNC_ON) endif() endif() -# Link with thread library. But if we are targeting the Zephyr RTOS +# Link with thread library, unless if we are targeting the Zephyr RTOS if(DEFINED LF_THREADED OR DEFINED LF_TRACE) if(NOT ${CMAKE_SYSTEM_NAME} STREQUAL "Zephyr") find_package(Threads REQUIRED) From 87e0dd65f32099746dafd10cacedbfd1324c6758 Mon Sep 17 00:00:00 2001 From: ChadliaJerad Date: Fri, 15 Sep 2023 14:00:41 -0700 Subject: [PATCH 09/10] Tweak comments based on code review --- util/tracing/visualization/fedsd.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/util/tracing/visualization/fedsd.py b/util/tracing/visualization/fedsd.py index 241e30930..8739e83a7 100644 --- a/util/tracing/visualization/fedsd.py +++ b/util/tracing/visualization/fedsd.py @@ -334,10 +334,8 @@ def load_and_process_csv_file(csv_file) : # which boils up to having 'RTI' in the 'event' column df = df[df['event'].str.contains('Sending|Receiving|Scheduler advancing time ends') == True] + # Determine the "self id" in the trace file based on the first 'Receiving' or 'Sending' message (or use -1, the id of the RTI, if there is none). id = -1 - # Fix the parameters of the event 'Scheduler advancing time ends' - # where 'self_id' should not be -1, but rather the value from a sending or - # receiving event for index, row in df.iterrows(): if ('Sending' in row['event'] or 'Receiving' in row['event']) : id = row['self_id'] From f0bd1bc6533e4b47f2af9d4e9d8613fa5e670be0 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Mon, 18 Sep 2023 14:40:33 +0200 Subject: [PATCH 10/10] Guard against unconnected outputs --- core/reactor.c | 6 ++++-- core/threaded/reactor_threaded.c | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/core/reactor.c b/core/reactor.c index 90129ae26..b30632366 100644 --- a/core/reactor.c +++ b/core/reactor.c @@ -53,12 +53,14 @@ extern instant_t start_time; /** * Mark the given port's is_present field as true. This is_present field - * will later be cleaned up by _lf_start_time_step. + * will later be cleaned up by _lf_start_time_step. If the port is unconnected, + * do nothing. * @param env Environment in which we are executing * @param port A pointer to the port struct. */ void _lf_set_present(lf_port_base_t* port) { - environment_t *env = port->source_reactor->environment; + if (!port->source_reactor) return; + environment_t *env = port->source_reactor->environment; bool* is_present_field = &port->is_present; if (env->is_present_fields_abbreviated_size < env->is_present_fields_size) { env->is_present_fields_abbreviated[env->is_present_fields_abbreviated_size] diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 2f336d662..251f8b1ba 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -210,12 +210,14 @@ int _lf_wait_on_tag_barrier(environment_t* env, tag_t proposed_tag) { /** * Mark the given port's is_present field as true. This is_present field - * will later be cleaned up by _lf_start_time_step. + * will later be cleaned up by _lf_start_time_step. If the port is unconnected, + * do nothing. * This assumes that the mutex is not held. * @param port A pointer to the port struct. */ void _lf_set_present(lf_port_base_t* port) { - environment_t *env = port->source_reactor->environment; + if (!port->source_reactor) return; + environment_t *env = port->source_reactor->environment; bool* is_present_field = &port->is_present; int ipfas = lf_atomic_fetch_add(&env->is_present_fields_abbreviated_size, 1); if (ipfas < env->is_present_fields_size) {