Skip to content

Commit

Permalink
Merge branch 'main' into rti-NET-forwarding
Browse files Browse the repository at this point in the history
  • Loading branch information
byeonggiljun authored Sep 19, 2023
2 parents 2ebc563 + 064e1ad commit 3d4c114
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 32 deletions.
8 changes: 5 additions & 3 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,12 @@ if(DEFINED _LF_CLOCK_SYNC_ON)
endif()
endif()

# Link with thread library
# Link with thread library, unless if we are targeting the Zephyr RTOS
if(DEFINED LF_THREADED OR DEFINED LF_TRACE)
find_package(Threads REQUIRED)
target_link_libraries(core PUBLIC Threads::Threads)
if(NOT ${CMAKE_SYSTEM_NAME} STREQUAL "Zephyr")
find_package(Threads REQUIRED)
target_link_libraries(core PUBLIC Threads::Threads)
endif()
endif()

# Macro for translating a command-line argument into compile definition for
Expand Down
27 changes: 18 additions & 9 deletions core/federated/RTI/enclave.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ tag_advance_grant_t tag_advance_grant_if_safe(enclave_t* e) {
// Ignore this enclave if it no longer connected.
if (upstream->state == NOT_CONNECTED) continue;

tag_t candidate = lf_delay_tag(upstream->completed, e->upstream_delay[j]);
// Adjust by the "after" delay.
// Note that "no delay" is encoded as NEVER,
// whereas one microstep delay is encoded as 0LL.
tag_t candidate = lf_delay_strict(upstream->completed, e->upstream_delay[j]);

if (lf_tag_compare(candidate, min_upstream_completed) < 0) {
min_upstream_completed = candidate;
Expand Down Expand Up @@ -102,12 +105,13 @@ tag_advance_grant_t tag_advance_grant_if_safe(enclave_t* e) {

// Find the tag of the earliest possible incoming message from
// upstream enclaves.
tag_t t_d = FOREVER_TAG;
tag_t t_d_nonzero_delay = FOREVER_TAG;
// The tag of the earliest possible incoming message from a zero-delay connection.
// Delayed connections are not guarded from STP violations by the MLAA; this property is
// acceptable because delayed connections impose no deadlock risk and in some cases (startup)
// this property is necessary to avoid deadlocks. However, it requires some special care here
// when potentially sending a PTAG.
// when potentially sending a PTAG because we must not send a PTAG for a tag at which data may
// still be received over nonzero-delay connections.
tag_t t_d_zero_delay = FOREVER_TAG;
LF_PRINT_DEBUG("NOTE: FOREVER is displayed as " PRINTF_TAG " and NEVER as " PRINTF_TAG,
FOREVER_TAG.time - start_time, FOREVER_TAG.microstep,
Expand All @@ -131,16 +135,20 @@ tag_advance_grant_t tag_advance_grant_if_safe(enclave_t* e) {
// Adjust by the "after" delay.
// Note that "no delay" is encoded as NEVER,
// whereas one microstep delay is encoded as 0LL.
tag_t candidate = lf_delay_tag(upstream_next_event, e->upstream_delay[j]);
tag_t candidate = lf_delay_strict(upstream_next_event, e->upstream_delay[j]);

if (lf_tag_compare(candidate, t_d) < 0) {
t_d = candidate;
}
if (lf_tag_compare(candidate, t_d_zero_delay) < 0 && e->upstream_delay[j] == NEVER) {
t_d_zero_delay = candidate;
if (e->upstream_delay[j] == NEVER) {
if (lf_tag_compare(candidate, t_d_zero_delay) < 0) {
t_d_zero_delay = candidate;
}
} else {
if (lf_tag_compare(candidate, t_d_nonzero_delay) < 0) {
t_d_nonzero_delay = candidate;
}
}
}
free(visited);
tag_t t_d = (lf_tag_compare(t_d_zero_delay, t_d_nonzero_delay) < 0) ? t_d_zero_delay : t_d_nonzero_delay;

LF_PRINT_LOG("Earliest next event upstream has tag " PRINTF_TAG ".",
t_d.time - start_time, t_d.microstep);
Expand All @@ -162,6 +170,7 @@ tag_advance_grant_t tag_advance_grant_if_safe(enclave_t* e) {
result.tag = e->next_event;
} else if (
lf_tag_compare(t_d_zero_delay, e->next_event) == 0 // The enclave has something to do.
&& lf_tag_compare(t_d_zero_delay, t_d_nonzero_delay) < 0 // The statuses of nonzero-delay connections are known at tag t_d_zero_delay
&& lf_tag_compare(t_d_zero_delay, e->last_provisionally_granted) > 0 // The grant is not redundant.
&& lf_tag_compare(t_d_zero_delay, e->last_granted) > 0 // The grant is not redundant.
) {
Expand Down
7 changes: 5 additions & 2 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -1406,8 +1406,11 @@ void send_port_absent_to_federate(environment_t* env, interval_t additional_dela
unsigned char buffer[message_length];

// Apply the additional delay to the current tag and use that as the intended
// tag of the outgoing message
tag_t current_message_intended_tag = lf_delay_tag(env->current_tag,
// tag of the outgoing message. Note that if there is delay on the connection,
// then we cannot promise no message with tag = current_tag + delay because a
// subsequent reaction might produce such a message. But we can promise no
// message with a tag strictly less than current_tag + delay.
tag_t current_message_intended_tag = lf_delay_strict(env->current_tag,
additional_delay);

LF_PRINT_LOG("Sending port "
Expand Down
2 changes: 1 addition & 1 deletion core/platform/lf_zephyr_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ void _lf_initialize_clock() {
// Clock and sleep implementation for the HI_RES clock based on
// Zephyrs Counter API

* Return the current time in nanoseconds. It gets the current value
/* Return the current time in nanoseconds. It gets the current value
* of the hi-res counter device and also keeps track of overflows
* to deliver a monotonically increasing clock.
*/
Expand Down
6 changes: 4 additions & 2 deletions core/reactor.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ extern instant_t start_time;

/**
* Mark the given port's is_present field as true. This is_present field
* will later be cleaned up by _lf_start_time_step.
* will later be cleaned up by _lf_start_time_step. If the port is unconnected,
* do nothing.
* @param env Environment in which we are executing
* @param port A pointer to the port struct.
*/
void _lf_set_present(lf_port_base_t* port) {
environment_t *env = port->source_reactor->environment;
if (!port->source_reactor) return;
environment_t *env = port->source_reactor->environment;
bool* is_present_field = &port->is_present;
if (env->is_present_fields_abbreviated_size < env->is_present_fields_size) {
env->is_present_fields_abbreviated[env->is_present_fields_abbreviated_size]
Expand Down
12 changes: 11 additions & 1 deletion core/tag.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ int lf_tag_compare(tag_t tag1, tag_t tag2) {
}

tag_t lf_delay_tag(tag_t tag, interval_t interval) {
if (tag.time == NEVER || interval == NEVER) return tag;
if (tag.time == NEVER || interval < 0LL) return tag;
tag_t result = tag;
if (interval == 0LL) {
// Note that unsigned variables will wrap on overflow.
Expand All @@ -152,6 +152,16 @@ tag_t lf_delay_tag(tag_t tag, interval_t interval) {
return result;
}

tag_t lf_delay_strict(tag_t tag, interval_t interval) {
tag_t result = lf_delay_tag(tag, interval);
if (interval != 0 && interval != NEVER && interval != FOREVER && result.time != NEVER && result.time != FOREVER) {
LF_PRINT_DEBUG("interval=%lld, result time=%lld", (long long) interval, (long long) result.time);
result.time -= 1;
result.microstep = UINT_MAX;
}
return result;
}

instant_t lf_time_logical(void *env) {
assert(env != GLOBAL_ENVIRONMENT);
return ((environment_t *) env)->current_tag.time;
Expand Down
6 changes: 4 additions & 2 deletions core/threaded/reactor_threaded.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,14 @@ int _lf_wait_on_tag_barrier(environment_t* env, tag_t proposed_tag) {

/**
* Mark the given port's is_present field as true. This is_present field
* will later be cleaned up by _lf_start_time_step.
* will later be cleaned up by _lf_start_time_step. If the port is unconnected,
* do nothing.
* This assumes that the mutex is not held.
* @param port A pointer to the port struct.
*/
void _lf_set_present(lf_port_base_t* port) {
environment_t *env = port->source_reactor->environment;
if (!port->source_reactor) return;
environment_t *env = port->source_reactor->environment;
bool* is_present_field = &port->is_present;
int ipfas = lf_atomic_fetch_add(&env->is_present_fields_abbreviated_size, 1);
if (ipfas < env->is_present_fields_size) {
Expand Down
37 changes: 28 additions & 9 deletions include/core/tag.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,41 @@ int lf_tag_compare(tag_t tag1, tag_t tag2);

/**
* Delay a tag by the specified time interval to realize the "after" keyword.
* If either the time interval or the time field of the tag is NEVER,
* return the unmodified tag.
* If the time interval is 0LL, add one to the microstep, leave
* the time field alone, and return the result.
* Any interval less than 0 (including NEVER) is interpreted as "no delay",
* whereas an interval equal to 0 is interpreted as one microstep delay.
* If the time field of the tag is NEVER or the interval is negative,
* return the unmodified tag. If the time interval is 0LL, add one to
* the microstep, leave the time field alone, and return the result.
* Otherwise, add the interval to the time field of the tag and reset
* the microstep to 0.
* If the sum overflows, saturate the time value at FOREVER.
*
* Note that normally it makes no sense to call this with a negative
* interval (except NEVER), but this is not checked.
* the microstep to 0. If the sum overflows, saturate the time value at
* FOREVER. For example:
* - if tag = (t, 0) and interval = 10, return (t + 10, 0)
* - if tag = (t, 0) and interval = 0, return (t, 1)
* - if tag = (t, 0) and interval = NEVER, return (t, 0)
* - if tag = (FOREVER, 0) and interval = 10, return (FOREVER, 0)
*
* @param tag The tag to increment.
* @param interval The time interval.
*/
tag_t lf_delay_tag(tag_t tag, interval_t interval);

/**
* Return the latest tag strictly less than the specified tag plus the
* interval, unless tag is NEVER or interval is negative (including NEVER),
* in which case return the tag unmodified. Any interval less than 0
* (including NEVER) is interpreted as "no delay", whereas an interval
* equal to 0 is interpreted as one microstep delay. If the time sum
* overflows, saturate the time value at FOREVER. For example:
* - if tag = (t, 0) and interval = 10, return (t + 10 - 1, UINT_MAX)
* - if tag = (t, 0) and interval = 0, return (t, 0)
* - if tag = (t, 0) and interval = NEVER, return (t, 0)
* - if tag = (FOREVER, 0) and interval = 10, return (FOREVER, 0)
*
* @param tag The tag to increment.
* @param interval The time interval.
*/
tag_t lf_delay_strict(tag_t tag, interval_t interval);

/**
* Return the current logical time in nanoseconds.
* On many platforms, this is the number of nanoseconds
Expand Down
9 changes: 6 additions & 3 deletions util/tracing/visualization/fedsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,12 @@ def load_and_process_csv_file(csv_file) :
# which boils up to having 'RTI' in the 'event' column
df = df[df['event'].str.contains('Sending|Receiving|Scheduler advancing time ends') == True]

# Fix the parameters of the event 'Scheduler advancing time ends'
# We rely on the fact that the first row of the csv file cannot be the end of advancing time
id = df.iloc[-1]['self_id']
# Determine the "self id" in the trace file based on the first 'Receiving' or 'Sending' message (or use -1, the id of the RTI, if there is none).
id = -1
for index, row in df.iterrows():
if ('Sending' in row['event'] or 'Receiving' in row['event']) :
id = row['self_id']
break
df['self_id'] = id
df = df.astype({'self_id': 'int', 'partner_id': 'int'})

Expand Down

0 comments on commit 3d4c114

Please sign in to comment.