Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A EDF scheduler based on enclaves #484

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ define(FEDERATED_DECENTRALIZED)
define(FEDERATED)
define(FEDERATED_AUTHENTICATED)
define(FEDERATE_ID)
define(LF_NUMBER_OF_CORES)
define(LF_REACTION_GRAPH_BREADTH)
define(LF_THREAD_POLICY)
define(LF_TRACE)
define(LF_SINGLE_THREADED)
define(LOG_LEVEL)
Expand Down
7 changes: 4 additions & 3 deletions core/threaded/reactor_threaded.c
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ void _lf_worker_invoke_reaction(environment_t* env, int worker_number, reaction_
* @param env Environment within which we are executing.
* @param worker_number The number assigned to this worker thread
*/
void _lf_worker_do_work(environment_t* env, int worker_number) {
static void _lf_worker_do_work(environment_t* env, int worker_number) {
assert(env != GLOBAL_ENVIRONMENT);

// Keep track of whether we have decremented the idle thread count.
Expand Down Expand Up @@ -874,6 +874,8 @@ void _lf_worker_do_work(environment_t* env, int worker_number) {
*/
void* worker(void* arg) {
initialize_lf_thread_id();
lf_sched_configure_worker();

environment_t* env = (environment_t*)arg;
LF_MUTEX_LOCK(&env->mutex);

Expand Down Expand Up @@ -984,7 +986,6 @@ void determine_number_of_workers(void) {
* at compile time.
*/
int lf_reactor_c_main(int argc, const char* argv[]) {
initialize_lf_thread_id();
// Invoke the function that optionally provides default command-line options.
lf_set_default_command_line_options();

Expand Down Expand Up @@ -1075,7 +1076,7 @@ int lf_reactor_c_main(int argc, const char* argv[]) {
lf_print("Environment %u: ---- Intializing start tag", env->id);
_lf_initialize_start_tag(env);

lf_print("Environment %u: ---- Spawning %d workers.", env->id, env->num_workers);
lf_print("Environment %u: ---- Spawning %d workers on %d cores.", env->id, env->num_workers, LF_NUMBER_OF_CORES);

for (int j = 0; j < env->num_workers; j++) {
if (i == 0 && j == 0) {
Expand Down
451 changes: 451 additions & 0 deletions core/threaded/scheduler_GEDF_NP.c

Large diffs are not rendered by default.

14 changes: 3 additions & 11 deletions core/threaded/scheduler_NP.c
Original file line number Diff line number Diff line change
Expand Up @@ -300,17 +300,9 @@ void lf_sched_free(lf_scheduler_t* scheduler) {
}

///////////////////// Scheduler Worker API (public) /////////////////////////
/**
* @brief Ask the scheduler for one more reaction.
*
* This function blocks until it can return a ready reaction for worker thread
* 'worker_number' or it is time for the worker thread to stop and exit (where a
* NULL value would be returned).
*
* @param worker_number
* @return reaction_t* A reaction for the worker to execute. NULL if the calling
* worker thread should exit.
*/

void lf_sched_configure_worker() {}

reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_number) {
// Iterate until the stop tag is reached or reaction vectors are empty
while (!scheduler->should_stop) {
Expand Down
2 changes: 2 additions & 0 deletions core/threaded/scheduler_adaptive.c
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,8 @@ void lf_sched_free(lf_scheduler_t* scheduler) {

///////////////////////// Scheduler Worker API ///////////////////////////////

void lf_sched_configure_worker() {}

reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_number) {
assert(worker_number >= 0);
reaction_t* ret;
Expand Down
21 changes: 21 additions & 0 deletions include/core/threaded/reactor_threaded.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,27 @@

#include "lf_types.h"

/**
* @brief The number of cores to use.
*
* If the target parameter number_of_cores is set, it will override this default.
*/
#ifndef LF_NUMBER_OF_CORES
#define LF_NUMBER_OF_CORES 0
#endif

/**
* @brief The thread scheduling policy to use.
*
* This should be one of LF_SCHED_FAIR, LF_SCHED_TIMESLICE, or LF_SCHED_PRIORITY.
* The default is LF_SCHED_FAIR, which corresponds to the Linux SCHED_OTHER.
* LF_SCHED_TIMESLICE corresponds to Linux SCHED_RR, and LF_SCHED_PRIORITY corresponds
* to SCHED_FIFO.
*/
#ifndef LF_THREAD_POLICY
#define LF_THREAD_POLICY LF_SCHED_FAIR
#endif

/**
* Enqueue port absent reactions that will send a PORT_ABSENT
* message to downstream federates if a given network output port is not present.
Expand Down
5 changes: 5 additions & 0 deletions include/core/threaded/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,9 @@ void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction
*/
void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reaction, int worker_number);

/**
* @brief Initialize priority and set core binding for the calling worker thread, if appropriate.
*/
void lf_sched_configure_worker();

#endif // LF_SCHEDULER_H
25 changes: 16 additions & 9 deletions low_level_platform/api/low_level_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,15 @@ int lf_available_cores();
lf_thread_t lf_thread_self();

/**
* Create a new thread, starting with execution of lf_thread
* getting passed arguments. The new handle is stored in thread_id.
* @brief Create a new thread and start execution of the function lf_thread
* with the specified arguments.
*
* The new handle is stored in thread_id.
*
* @return 0 on success, platform-specific error number otherwise.
*/
int lf_thread_create(lf_thread_t* thread, void* (*lf_thread)(void*), void* arguments);

/**
* @brief Helper function for creating a thread.
*/
int lf_thread_create(lf_thread_t* thread, void* (*lf_thread)(void*), void* arguments);

/**
* Make calling thread wait for termination of the thread. The
* exit status of the thread is stored in thread_return if thread_return
Expand Down Expand Up @@ -174,23 +171,33 @@ int lf_thread_set_cpu(lf_thread_t thread, size_t cpu_number);
* number indicates higher priority. Setting the priority of a thread only
* makes sense if the thread is scheduled with LF_SCHED_TIMESLICE or LF_THREAD_PRIORITY
*
* @param thread The thread.
* @param thread The thread ID.
* @param priority The priority.
* @return int 0 on success, platform-specific error otherwise
*/
int lf_thread_set_priority(lf_thread_t thread, int priority);

/**
* @brief Set the scheduling policy of a thread. This is based on the scheduling
* DEBUGGING
*/
int lf_thread_get_priority(lf_thread_t thread);

/**
* @brief Set the scheduling policy of a thread.
*
* This is based on the scheduling
* concept from Linux explained here: https://man7.org/linux/man-pages/man7/sched.7.html
* A scheduling policy is specific to a thread/worker. We have three policies
* LF_SCHED_PRIORITY which corresponds to SCHED_FIFO on Linux.
* LF_SCHED_TIMESLICE which corresponds to SCHED_RR on Linux.
* LF_SCHED_FAIR which corresponds to SCHED_OTHER on Linux.
*
* @param thread The thread ID.
* @param policy A pointer to the policy (this will not be used after returning, so it can be on the stack).
* @return int 0 on success, platform-specific error number otherwise.
*/
int lf_thread_set_scheduling_policy(lf_thread_t thread, lf_scheduling_policy_t* policy);
// FIXME: The policy pointer argument is worrisome. Can it really be on the stack?

/**
* Initialize a mutex.
Expand Down
7 changes: 7 additions & 0 deletions low_level_platform/impl/src/lf_POSIX_threads_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ int lf_mutex_init(lf_mutex_t* mutex) {
// of the predicate.” This seems like a bug in the implementation of
// pthreads. Maybe it has been fixed?
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);

// Initialize the mutex protocol to INHERIT:
// a thread t1 owning the mutex, when it is preempted by a
// higher-priority thread t2 that tries to get the lock on the
// same mutex, inherits t2's priority.
pthread_mutexattr_setprotocol(&attr, PTHREAD_PRIO_INHERIT);

return pthread_mutex_init((pthread_mutex_t*)mutex, &attr);
}

Expand Down
11 changes: 11 additions & 0 deletions low_level_platform/impl/src/lf_flexpret_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,17 @@ void initialize_lf_thread_id() {
// Nothing needed here; thread ID's are already available in harware registers
// which can be fetched with `read_hartid`.
}

/**
* Real-time scheduling API not implemented for FlexPRET.
*/
int lf_thread_set_cpu(lf_thread_t thread, size_t cpu_number) { return -1; }

int lf_thread_set_priority(lf_thread_t thread, int priority) { return -1; }

int lf_thread_get_priority(lf_thread_t thread) { return -1; }

int lf_thread_set_scheduling_policy(lf_thread_t thread, lf_scheduling_policy_t* policy) { return -1; }
#endif

#endif // PLATFORM_FLEXPRET
12 changes: 12 additions & 0 deletions low_level_platform/impl/src/lf_linux_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@ int lf_thread_set_cpu(lf_thread_t thread, size_t cpu_number) {
return pthread_setaffinity_np(thread, sizeof(cpu_set), &cpu_set);
}

// FIXME: This does not do any translation between LF_SCHED priority range and
// the Linux sched priority range.
int lf_thread_get_priority(lf_thread_t thread) {
struct sched_param schedparam;
int policy;

// Get the current scheduling policy
pthread_getschedparam(thread, &policy, &schedparam);

return schedparam.sched_priority;
}

int lf_thread_set_priority(lf_thread_t thread, int priority) {
int posix_policy, min_pri, max_pri, final_priority, res;
struct sched_param schedparam;
Expand Down
2 changes: 2 additions & 0 deletions low_level_platform/impl/src/lf_macos_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ int lf_thread_set_cpu(lf_thread_t thread, size_t cpu_number) { return -1; }

int lf_thread_set_priority(lf_thread_t thread, int priority) { return -1; }

int lf_thread_get_priority(lf_thread_t thread) { return -1; }

int lf_thread_set_scheduling_policy(lf_thread_t thread, lf_scheduling_policy_t* policy) { return -1; }
#endif

Expand Down
2 changes: 2 additions & 0 deletions low_level_platform/impl/src/lf_windows_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ int lf_thread_set_cpu(lf_thread_t thread, size_t cpu_number) { return -1; }

int lf_thread_set_priority(lf_thread_t thread, int priority) { return -1; }

int lf_thread_get_priority(lf_thread_t thread) { return -1; }

int lf_thread_set_scheduling_policy(lf_thread_t thread, lf_scheduling_policy_t* policy) { return -1; }

int lf_mutex_init(_lf_critical_section_t* critical_section) {
Expand Down
3 changes: 3 additions & 0 deletions low_level_platform/impl/src/lf_zephyr_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ int lf_thread_set_priority(lf_thread_t thread, int priority) {
return 0;
}

// FIXME: Implement this.
int lf_thread_get_priority(lf_thread_t thread) { return -1; }

int lf_thread_set_scheduling_policy(lf_thread_t thread, lf_scheduling_policy_t* policy) {
// Update the policy
switch (policy->policy) {
Expand Down
Loading