Skip to content

Commit

Permalink
Merge branch 'main' into transient-fed
Browse files Browse the repository at this point in the history
  • Loading branch information
ChadliaJerad committed Oct 1, 2024
2 parents 1d06d53 + 815696c commit 8f14707
Show file tree
Hide file tree
Showing 28 changed files with 290 additions and 149 deletions.
20 changes: 16 additions & 4 deletions core/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,16 @@ int environment_init(environment_t* env, const char* name, int id, int num_worke
const char* trace_file_name) {
(void)trace_file_name; // Will be used with future enclave support.

env->name = malloc(strlen(name) + 1); // +1 for the null terminator
LF_ASSERT_NON_NULL(env->name);
strcpy(env->name, name);

// Space for the name string with the null terminator.
if (name != NULL) {
size_t name_size = strlen(name) + 1; // +1 for the null terminator
env->name = (char*)malloc(name_size);
LF_ASSERT_NON_NULL(env->name);
// Use strncpy rather than strcpy to avoid compiler warnings.
strncpy(env->name, name, name_size);
} else {
env->name = NULL;
}
env->id = id;
env->stop_tag = FOREVER_TAG;

Expand Down Expand Up @@ -284,3 +290,9 @@ int environment_init(environment_t* env, const char* name, int id, int num_worke
env->initialized = true;
return 0;
}

void environment_verify(environment_t* env) {
for (int i = 0; i < env->is_present_fields_size; i++) {
LF_ASSERT_NON_NULL(env->is_present_fields[i]);
}
}
4 changes: 2 additions & 2 deletions core/federated/RTI/rti.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
ARG BASEIMAGE=alpine:latest
FROM ${BASEIMAGE} as builder
FROM ${BASEIMAGE} AS builder
COPY . /lingua-franca
WORKDIR /lingua-franca/core/federated/RTI
RUN set -ex && apk add --no-cache gcc musl-dev cmake make && \
Expand All @@ -12,7 +12,7 @@ RUN set -ex && apk add --no-cache gcc musl-dev cmake make && \
WORKDIR /lingua-franca

# application stage
FROM ${BASEIMAGE} as app
FROM ${BASEIMAGE} AS app
LABEL maintainer="lf-lang"
LABEL source="https://github.com/lf-lang/reactor-c/tree/main/core/federated/RTI"
COPY --from=builder /usr/local/bin/RTI /usr/local/bin/RTI
Expand Down
103 changes: 65 additions & 38 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag
// federate that is far ahead of other upstream federates in logical time.
lf_update_max_level(_fed.last_TAG, _fed.is_last_TAG_provisional);
lf_cond_broadcast(&lf_port_status_changed);
lf_cond_broadcast(&env->event_q_changed);
} else {
// Message arrivals should be monotonic, so this should not occur.
lf_print_warning("Attempt to update the last known status tag "
Expand All @@ -292,6 +293,34 @@ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag
}
}

/**
* @brief Mark all the input ports connected to the given federate as known to be absent until FOREVER.
*
* This does nothing if the federate is not using decentralized coordination.
* This function acquires the mutex on the top-level environment.
* @param fed_id The ID of the federate.
*/
static void mark_inputs_known_absent(int fed_id) {
#ifdef FEDERATED_DECENTRALIZED
// Note that when transient federates are supported, this will need to be updated because the
// federate could rejoin.
environment_t* env;
_lf_get_environments(&env);
LF_MUTEX_LOCK(&env->mutex);

for (size_t i = 0; i < _lf_action_table_size; i++) {
lf_action_base_t* action = _lf_action_table[i];
if (action->source_id == fed_id) {
update_last_known_status_on_input_port(env, FOREVER_TAG, i);
}
}
LF_MUTEX_UNLOCK(&env->mutex);
#else
// Do nothing, except suppress unused parameter error.
(void)fed_id;
#endif // FEDERATED_DECENTRALIZED
}

/**
* Set the status of network port with id portID.
*
Expand Down Expand Up @@ -735,46 +764,46 @@ static void* listen_to_federates(void* _args) {
bool socket_closed = false;
// Read one byte to get the message type.
LF_PRINT_DEBUG("Waiting for a P2P message on socket %d.", *socket_id);
bool bad_message = false;
if (read_from_socket_close_on_error(socket_id, 1, buffer)) {
// Socket has been closed.
lf_print("Socket from federate %d is closed.", fed_id);
// Stop listening to this federate.
socket_closed = true;
break;
}
LF_PRINT_DEBUG("Received a P2P message on socket %d of type %d.", *socket_id, buffer[0]);
bool bad_message = false;
switch (buffer[0]) {
case MSG_TYPE_P2P_MESSAGE:
LF_PRINT_LOG("Received untimed message from federate %d.", fed_id);
if (handle_message(socket_id, fed_id)) {
// Failed to complete the reading of a message on a physical connection.
lf_print_warning("Failed to complete reading of message on physical connection.");
socket_closed = true;
}
break;
case MSG_TYPE_P2P_TAGGED_MESSAGE:
LF_PRINT_LOG("Received tagged message from federate %d.", fed_id);
if (handle_tagged_message(socket_id, fed_id)) {
// P2P tagged messages are only used in decentralized coordination, and
// it is not a fatal error if the socket is closed before the whole message is read.
// But this thread should exit.
lf_print_warning("Failed to complete reading of tagged message.");
socket_closed = true;
}
break;
case MSG_TYPE_PORT_ABSENT:
LF_PRINT_LOG("Received port absent message from federate %d.", fed_id);
if (handle_port_absent_message(socket_id, fed_id)) {
// P2P tagged messages are only used in decentralized coordination, and
// it is not a fatal error if the socket is closed before the whole message is read.
// But this thread should exit.
lf_print_warning("Failed to complete reading of tagged message.");
socket_closed = true;
} else {
LF_PRINT_DEBUG("Received a P2P message on socket %d of type %d.", *socket_id, buffer[0]);
switch (buffer[0]) {
case MSG_TYPE_P2P_MESSAGE:
LF_PRINT_LOG("Received untimed message from federate %d.", fed_id);
if (handle_message(socket_id, fed_id)) {
// Failed to complete the reading of a message on a physical connection.
lf_print_warning("Failed to complete reading of message on physical connection.");
socket_closed = true;
}
break;
case MSG_TYPE_P2P_TAGGED_MESSAGE:
LF_PRINT_LOG("Received tagged message from federate %d.", fed_id);
if (handle_tagged_message(socket_id, fed_id)) {
// P2P tagged messages are only used in decentralized coordination, and
// it is not a fatal error if the socket is closed before the whole message is read.
// But this thread should exit.
lf_print_warning("Failed to complete reading of tagged message.");
socket_closed = true;
}
break;
case MSG_TYPE_PORT_ABSENT:
LF_PRINT_LOG("Received port absent message from federate %d.", fed_id);
if (handle_port_absent_message(socket_id, fed_id)) {
// P2P tagged messages are only used in decentralized coordination, and
// it is not a fatal error if the socket is closed before the whole message is read.
// But this thread should exit.
lf_print_warning("Failed to complete reading of tagged message.");
socket_closed = true;
}
break;
default:
bad_message = true;
}
break;
default:
bad_message = true;
}
if (bad_message) {
lf_print_error("Received erroneous message type: %d. Closing the socket.", buffer[0]);
Expand All @@ -783,12 +812,10 @@ static void* listen_to_federates(void* _args) {
break; // while loop
}
if (socket_closed) {
// NOTE: For decentralized execution, once this socket is closed, we could
// For decentralized execution, once this socket is closed, we
// update last known tags of all ports connected to the specified federate to FOREVER_TAG,
// which would eliminate the need to wait for STAA to assume an input is absent.
// However, at this time, we don't know which ports correspond to which upstream federates.
// The code generator would have to encode this information. Once that is done,
// we could call update_last_known_status_on_input_port with FOREVER_TAG.
mark_inputs_known_absent(fed_id);

break; // while loop
}
Expand Down
24 changes: 19 additions & 5 deletions core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,13 +303,19 @@ void _lf_pop_events(environment_t* env) {
}
}

// Mark the trigger present.
// Mark the trigger present
event->trigger->status = present;

// If the trigger is a periodic timer, create a new event for its next execution.
if (event->trigger->is_timer && event->trigger->period > 0LL) {
// Reschedule the trigger.
lf_schedule_trigger(env, event->trigger, event->trigger->period, NULL);
} else {
// For actions, store a pointer to status field so it is reset later.
int ipfas = lf_atomic_fetch_add(&env->is_present_fields_abbreviated_size, 1);
if (ipfas < env->is_present_fields_size) {
env->is_present_fields_abbreviated[ipfas] = (bool*)&event->trigger->status;
}
}

// Copy the token pointer into the trigger struct so that the
Expand All @@ -323,9 +329,6 @@ void _lf_pop_events(environment_t* env) {
// freed prematurely.
_lf_done_using(token);

// Mark the trigger present.
event->trigger->status = present;

lf_recycle_event(env, event);

// Peek at the next event in the event queue.
Expand Down Expand Up @@ -603,8 +606,12 @@ trigger_handle_t _lf_insert_reactions_for_trigger(environment_t* env, trigger_t*
// for which we decrement the reference count.
_lf_replace_template_token((token_template_t*)trigger, token);

// Mark the trigger present.
// Mark the trigger present and store a pointer to it for marking it as absent later.
trigger->status = present;
int ipfas = lf_atomic_fetch_add(&env->is_present_fields_abbreviated_size, 1);
if (ipfas < env->is_present_fields_size) {
env->is_present_fields_abbreviated[ipfas] = (bool*)&trigger->status;
}

// Push the corresponding reactions for this trigger
// onto the reaction queue.
Expand Down Expand Up @@ -1096,6 +1103,13 @@ void initialize_global(void) {
// Call the code-generated function to initialize all actions, timers, and ports
// This is done for all environments/enclaves at the same time.
_lf_initialize_trigger_objects();

#if !defined(LF_SINGLE_THREADED) && !defined(NDEBUG)
// If we are testing, verify that environment with pointers is correctly set up.
for (int i = 0; i < num_envs; i++) {
environment_verify(&envs[i]);
}
#endif
}

/**
Expand Down
16 changes: 11 additions & 5 deletions core/threaded/reactor_threaded.c
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,15 @@ void lf_set_present(lf_port_base_t* port) {
return;
environment_t* env = port->source_reactor->environment;
bool* is_present_field = &port->is_present;
int ipfas = lf_atomic_fetch_add32(&env->is_present_fields_abbreviated_size, 1);
int ipfas = lf_atomic_fetch_add(&env->is_present_fields_abbreviated_size, 1);
if (ipfas < env->is_present_fields_size) {
env->is_present_fields_abbreviated[ipfas] = is_present_field;
}
*is_present_field = true;

// Support for sparse destination multiports.
if (port->sparse_record && port->destination_channel >= 0 && port->sparse_record->size >= 0) {
size_t next = (size_t)lf_atomic_fetch_add32(&port->sparse_record->size, 1);
size_t next = (size_t)lf_atomic_fetch_add(&port->sparse_record->size, 1);
if (next >= port->sparse_record->capacity) {
// Buffer is full. Have to revert to the classic iteration.
port->sparse_record->size = -1;
Expand Down Expand Up @@ -1027,13 +1027,17 @@ int lf_reactor_c_main(int argc, const char* argv[]) {
#endif

LF_PRINT_DEBUG("Start time: " PRINTF_TIME "ns", start_time);
struct timespec physical_time_timespec = {start_time / BILLION, start_time % BILLION};

#ifdef MINIMAL_STDLIB
lf_print("---- Start execution ----");
#else
lf_print("---- Start execution at time %s---- plus %ld nanoseconds", ctime(&physical_time_timespec.tv_sec),
physical_time_timespec.tv_nsec);
struct timespec physical_time_timespec = {start_time / BILLION, start_time % BILLION};
struct tm* time_info = localtime(&physical_time_timespec.tv_sec);
char buffer[80]; // Long enough to hold the formatted time string.
// Use strftime rather than ctime because as of C23, ctime is deprecated.
strftime(buffer, sizeof(buffer), "%a %b %d %H:%M:%S %Y", time_info);

lf_print("---- Start execution on %s ---- plus %ld nanoseconds", buffer, physical_time_timespec.tv_nsec);
#endif // MINIMAL_STDLIB

// Create and initialize the environments for each enclave
Expand Down Expand Up @@ -1118,6 +1122,8 @@ int lf_reactor_c_main(int argc, const char* argv[]) {
} else {
int failure = lf_thread_join(env->thread_ids[j], &worker_thread_exit_status);
if (failure) {
// Windows warns that strerror is deprecated but doesn't define strerror_r.
// There seems to be no portable replacement.
lf_print_error("Failed to join thread listening for incoming messages: %s", strerror(failure));
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/threaded/scheduler_GEDF_NP.c
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,14 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu

void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction) {
(void)worker_number; // Suppress unused parameter warning.
if (!lf_atomic_bool_compare_and_swap32((int32_t*)&done_reaction->status, queued, inactive)) {
if (!lf_atomic_bool_compare_and_swap((int*)&done_reaction->status, queued, inactive)) {
lf_print_error_and_exit("Unexpected reaction status: %d. Expected %d.", done_reaction->status, queued);
}
}

void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reaction, int worker_number) {
(void)worker_number; // Suppress unused parameter warning.
if (reaction == NULL || !lf_atomic_bool_compare_and_swap32((int32_t*)&reaction->status, inactive, queued)) {
if (reaction == NULL || !lf_atomic_bool_compare_and_swap((int*)&reaction->status, inactive, queued)) {
return;
}
LF_PRINT_DEBUG("Scheduler: Enqueueing reaction %s, which has level %lld.", reaction->name, LF_LEVEL(reaction->index));
Expand Down
10 changes: 5 additions & 5 deletions core/threaded/scheduler_NP.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ static inline void _lf_sched_insert_reaction(lf_scheduler_t* scheduler, reaction
scheduler->indexes[reaction_level] = 0;
}
#endif
int reaction_q_level_index = lf_atomic_fetch_add32((int32_t*)&scheduler->indexes[reaction_level], 1);
int reaction_q_level_index = lf_atomic_fetch_add((int*)&scheduler->indexes[reaction_level], 1);
assert(reaction_q_level_index >= 0);
LF_PRINT_DEBUG("Scheduler: Accessing triggered reactions at the level %zu with index %d.", reaction_level,
reaction_q_level_index);
Expand Down Expand Up @@ -203,7 +203,7 @@ static void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* schedul
static void _lf_sched_wait_for_work(lf_scheduler_t* scheduler, size_t worker_number) {
// Increment the number of idle workers by 1 and check if this is the last
// worker thread to become idle.
if (lf_atomic_add_fetch32((int32_t*)&scheduler->number_of_idle_workers, 1) == (int)scheduler->number_of_workers) {
if (lf_atomic_add_fetch((int*)&scheduler->number_of_idle_workers, 1) == (int)scheduler->number_of_workers) {
// Last thread to go idle
LF_PRINT_DEBUG("Scheduler: Worker %zu is the last idle thread.", worker_number);
// Call on the scheduler to distribute work or advance tag.
Expand Down Expand Up @@ -322,7 +322,7 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu
// the current level (if there is a causality loop)
LF_MUTEX_LOCK(&scheduler->custom_data->array_of_mutexes[current_level]);
#endif
int current_level_q_index = lf_atomic_add_fetch32((int32_t*)&scheduler->indexes[current_level], -1);
int current_level_q_index = lf_atomic_add_fetch((int*)&scheduler->indexes[current_level], -1);
if (current_level_q_index >= 0) {
LF_PRINT_DEBUG("Scheduler: Worker %d popping reaction with level %zu, index "
"for level: %d.",
Expand Down Expand Up @@ -361,7 +361,7 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu
*/
void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction) {
(void)worker_number;
if (!lf_atomic_bool_compare_and_swap32((int32_t*)&done_reaction->status, queued, inactive)) {
if (!lf_atomic_bool_compare_and_swap((int*)&done_reaction->status, queued, inactive)) {
lf_print_error_and_exit("Unexpected reaction status: %d. Expected %d.", done_reaction->status, queued);
}
}
Expand All @@ -388,7 +388,7 @@ void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction
void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reaction, int worker_number) {
(void)worker_number;

if (reaction == NULL || !lf_atomic_bool_compare_and_swap32((int32_t*)&reaction->status, inactive, queued)) {
if (reaction == NULL || !lf_atomic_bool_compare_and_swap((int*)&reaction->status, inactive, queued)) {
return;
}
LF_PRINT_DEBUG("Scheduler: Enqueueing reaction %s, which has level %lld.", reaction->name, LF_LEVEL(reaction->index));
Expand Down
Loading

0 comments on commit 8f14707

Please sign in to comment.