Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Introduce the next downstream tag (NDT) to optimize the communication in centralized federated execution #176

Closed
wants to merge 67 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
7995d88
First commit for fowarding NET message to optimize RTI
byeonggiljun Mar 8, 2023
812c779
Merge branch 'main' into rti-NET-forwarding
byeonggiljun Apr 18, 2023
8ba5c1e
Add a new message type and its description
byeonggiljun Apr 18, 2023
8128844
Add skeletons of NDET
byeonggiljun Apr 19, 2023
fcf3f0a
Merge branch 'main' into rti-NET-forwarding
byeonggiljun May 17, 2023
ae2bfb3
Add some comments and start filling functions
byeonggiljun May 17, 2023
b06d638
Add encoding and sending part of NDET
byeonggiljun May 21, 2023
fe7a475
Merge branch 'main' into rti-NET-forwarding
byeonggiljun May 21, 2023
3a5e8d1
Merge branch 'main' into rti-NET-forwarding
byeonggiljun Aug 31, 2023
871e9d5
Send NDT and rename NDET to NDT
byeonggiljun Aug 31, 2023
13ebb41
Send NDT if the RTI needs info from upstreams
byeonggiljun Sep 1, 2023
5005afd
Start implementing handle_next_downstream_tag
byeonggiljun Sep 2, 2023
a58cf3a
Start add ndt queue to store NDTs
byeonggiljun Sep 7, 2023
d6fa46a
Merge branch 'main' into rti-NET-forwarding
byeonggiljun Sep 7, 2023
6c08c13
Add command line argument v to check the version number
byeonggiljun Sep 7, 2023
19969eb
Minor fix
byeonggiljun Sep 7, 2023
79eef2a
Initialized ndt queue in the same manner with the event queue
byeonggiljun Sep 10, 2023
2ebc563
Add supports for ndt_queue
byeonggiljun Sep 12, 2023
3d4c114
Merge branch 'main' into rti-NET-forwarding
byeonggiljun Sep 19, 2023
052eab0
Merge branch 'main' into rti-NET-forwarding
byeonggiljun Oct 2, 2023
5939d5a
Add the NDT option in the RTI
byeonggiljun Oct 3, 2023
53a6f38
Initialzie ndt_q properly
byeonggiljun Oct 3, 2023
d8e179b
Copy the ndt_q support code into pqueue.h
byeonggiljun Oct 3, 2023
95e0e79
Fixing the reference issues
byeonggiljun Oct 10, 2023
f9bfa9e
Merge branch 'main' into rti-NET-forwarding
byeonggiljun Oct 10, 2023
8b0ccc3
Update lingua-franca-ref.txt
byeonggiljun Oct 10, 2023
b4c4aff
Check emptiness of ndt_q before processing it
byeonggiljun Oct 10, 2023
afe8cc9
Add debuging messages
byeonggiljun Oct 14, 2023
f2e4329
Eliminate unnecessary port absent messages
byeonggiljun Oct 14, 2023
8a5d11a
Remove unnecessary NET messages
byeonggiljun Oct 15, 2023
8b63d24
Remove unnecessary LTC messages
byeonggiljun Oct 15, 2023
f9e0777
Add tracepoints for NDT
byeonggiljun Oct 15, 2023
693db2e
Send NET messages when downstream federates need it
byeonggiljun Oct 15, 2023
29728e7
When a sender sends a tagged message, insert the tag to ndt_q to send…
byeonggiljun Oct 17, 2023
51f42dc
Merge branch 'main' into rti-NET-forwarding
byeonggiljun Oct 17, 2023
1a77403
Minor fix
byeonggiljun Oct 20, 2023
b97fd91
Disable NDT when there is a cycle between federates
byeonggiljun Oct 24, 2023
5ca3de0
Remove the wrong cycle detection code and add FIXME to solve it in th…
byeonggiljun Oct 24, 2023
e67988a
Temporarily disable the use of NDT for port absent messages
byeonggiljun Oct 24, 2023
2667c1c
Minor fix
byeonggiljun Nov 1, 2023
057ac08
Instead of sending NET as the response of NDT, insert the current tag…
byeonggiljun Nov 1, 2023
20c5a8a
Merge branch 'main' into rti-NET-forwarding
byeonggiljun Nov 1, 2023
d4f4c95
Detect a cycle and disable the NDT
byeonggiljun Nov 5, 2023
f590af6
Don't insert NDT to ndt_q when send tagged messages if it haven't rec…
byeonggiljun Nov 5, 2023
625d2b8
Merge branch 'main' into rti-NET-forwarding
byeonggiljun Nov 5, 2023
8b76705
Check whether each enclave is in a cylce when the RTI first construct…
byeonggiljun Nov 6, 2023
915c4d6
Clean the array visited before reusing it
byeonggiljun Nov 6, 2023
ce6ba0f
Try to remove port absent messages again
byeonggiljun Nov 6, 2023
3faa188
Fix the function `tag_matches` based on @Edwardalee's comment
byeonggiljun Nov 12, 2023
ddfef14
Merge branch 'main' into rti-NET-forwarding
byeonggiljun Nov 12, 2023
ffa5efb
Add functions for not sending NDTs to an upstream federate of a downs…
byeonggiljun Nov 13, 2023
9ff34a9
Resolve conflicts with enclaves2
byeonggiljun Nov 14, 2023
41074e5
Update lingua-franca-ref.txt
byeonggiljun Nov 14, 2023
3f3b601
Merge 'pqueue-refactoring' into rti-NET-forwarding
byeonggiljun Nov 15, 2023
a0eb282
Merge branch 'pqueue-refactoring' into rti-NET-forwarding
byeonggiljun Nov 15, 2023
356cdba
Do not insert the duplicate tag into the ndt_q
byeonggiljun Nov 15, 2023
720ec56
Fix Typo
byeonggiljun Nov 15, 2023
7cc31ba
Merge branch 'pqueue-refactoring' into rti-NET-forwarding
byeonggiljun Nov 19, 2023
6dbfd20
Use the function `pqueue_tag_insert_if_no_match` when inserting a tag
byeonggiljun Nov 19, 2023
20151f2
Free an ndt queue at the end of the program
byeonggiljun Nov 19, 2023
7a951d8
Inform the RTI whether a federate has a physical action
byeonggiljun Nov 20, 2023
83e0009
Update lingua-franca-ref.txt
byeonggiljun Nov 20, 2023
e337d38
Listen for and handle set up messages more flexibly
byeonggiljun Nov 26, 2023
db9db08
Make the NDT things optional in the RTI and federates
byeonggiljun Nov 27, 2023
65e30da
Merge branch 'main' into rti-NET-forwarding
byeonggiljun Nov 27, 2023
7749df4
Merge branch 'main' into rti-NET-forwarding
byeonggiljun Dec 4, 2023
7a02876
Remove the duplicate cycle detection function
byeonggiljun Dec 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion core/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,19 @@ static void environment_init_modes(environment_t* env, int num_modes, int num_st
* @brief Initialize the federation-specific parts of the environment struct.
*/
static void environment_init_federated(environment_t* env, int num_is_present_fields) {
#ifdef FEDERATED_NDT_ENABLED
// FIXME: Create a queue saving tags instead of events. For now, ndt_q stores
// dummy events.
env->ndt_q = pqueue_tag_init(10);
#endif // FEDERATED_NDT_ENABLED
#ifdef FEDERATED_DECENTRALIZED
env->_lf_intended_tag_fields = (tag_t**) calloc(num_is_present_fields, sizeof(tag_t*));
LF_ASSERT(env->_lf_intended_tag_fields, "Out of memory");
env->_lf_intended_tag_fields_size = num_is_present_fields;
#endif
}

void environment_init_tags( environment_t *env, instant_t start_time, interval_t duration) {
void environment_init_tags(environment_t *env, instant_t start_time, interval_t duration) {
env->current_tag = (tag_t){.time = start_time, .microstep = 0u};

tag_t stop_tag = FOREVER_TAG_INITIALIZER;
Expand Down Expand Up @@ -165,6 +170,9 @@ void environment_free(environment_t* env) {
pqueue_free(env->event_q);
pqueue_free(env->recycle_q);
pqueue_free(env->next_q);
#ifdef FEDERATED_NDT_ENABLED
pqueue_tag_free(env->ndt_q);
#endif // FEDERATED_NDT_ENABLED

environment_free_threaded(env);
environment_free_single_threaded(env);
Expand Down
11 changes: 9 additions & 2 deletions core/federated/RTI/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ void usage(int argc, const char* argv[]) {
lf_print(" clock sync attempt (default is 10). Applies to 'init' and 'on'.\n");
lf_print(" -a, --auth Turn on HMAC authentication options.\n");
lf_print(" -t, --tracing Turn on tracing.\n");
lf_print(" -v, --version The minimum required version of Lingua Franca.");
lf_print(" --ndt Turn on ndt optimization.\n");

lf_print("Command given:");
for (int i = 0; i < argc; i++) {
Expand Down Expand Up @@ -171,7 +173,10 @@ int process_clock_sync_args(int argc, const char* argv[]) {

int process_args(int argc, const char* argv[]) {
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "-i") == 0 || strcmp(argv[i], "--id") == 0) {
if (strcmp(argv[i], "-v") == 0 || strcmp(argv[i], "--version") == 0) {
lf_print("%s", version_info);
return 0;
} else if (strcmp(argv[i], "-i") == 0 || strcmp(argv[i], "--id") == 0) {
if (argc < i + 2) {
lf_print_error("--id needs a string argument.");
usage(argc, argv);
Expand Down Expand Up @@ -232,14 +237,16 @@ int process_args(int argc, const char* argv[]) {
rti.authentication_enabled = true;
} else if (strcmp(argv[i], "-t") == 0 || strcmp(argv[i], "--tracing") == 0) {
rti.base.tracing_enabled = true;
} else if (strcmp(argv[i], "--ndt") == 0) {
rti.ndt_enabled = true;
} else if (strcmp(argv[i], " ") == 0) {
// Tolerate spaces
continue;
} else {
lf_print_error("Unrecognized command-line argument: %s", argv[i]);
usage(argc, argv);
return 0;
}
}
}
if (rti.base.number_of_scheduling_nodes == 0) {
lf_print_error("--number_of_federates needs a valid positive integer argument.");
Expand Down
21 changes: 21 additions & 0 deletions core/federated/RTI/rti_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ void initialize_scheduling_node(scheduling_node_t* e, uint16_t id) {
e->downstream = NULL;
e->num_downstream = 0;
e->mode = REALTIME;
e->has_physical_action = false;
e->enable_ndt = false;
invalidate_min_delays_upstream(e);
}

Expand Down Expand Up @@ -298,6 +300,25 @@ static void _update_min_delays_upstream(scheduling_node_t* end, scheduling_node_
}
}

bool check_physical_action_of_transitive_downstreams(scheduling_node_t* e, bool visited[]) {
if (visited[e->id] || e->state == NOT_CONNECTED) {
return false;
}

visited[e->id] = true;

for (int i = 0; i < e->num_downstream; i++) {
if (check_physical_action_of_transitive_downstreams(rti_common->scheduling_nodes[e->downstream[i]], visited)) {
return true;
}
}
if (e->has_physical_action) {
return true;
} else {
return false;
}
}

void update_min_delays_upstream(scheduling_node_t* node) {
// Check whether cached result is valid.
if (node->min_delays == NULL) {
Expand Down
7 changes: 7 additions & 0 deletions core/federated/RTI/rti_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ typedef struct scheduling_node_t {
int* downstream; // Array of downstream scheduling node ids.
int num_downstream; // Size of the array of downstream scheduling nodes.
execution_mode_t mode; // FAST or REALTIME.
bool is_in_cycle;
bool has_physical_action;
bool enable_ndt;
minimum_delay_t* min_delays; // Array of minimum delays from upstream nodes, not including this node.
size_t num_min_delays; // Size of min_delays array.
int flags; // Or of IS_IN_ZERO_DELAY_CYCLE, IS_IN_CYCLE
Expand Down Expand Up @@ -273,5 +276,9 @@ void invalidate_min_delays_upstream(scheduling_node_t* node);
*/
void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes);

bool check_cycle(scheduling_node_t* e, int target_id, bool visited[]);

bool check_physical_action_of_transitive_downstreams(scheduling_node_t* e, bool visited[]);

#endif // RTI_COMMON_H
#endif // STANDALONE_RTI || LF_ENCLAVES
Loading