Skip to content

Commit

Permalink
Fix a race condition related to out-of-order execution
Browse files Browse the repository at this point in the history
  • Loading branch information
lsk567 committed Nov 11, 2024
1 parent fc1042f commit 58fcd92
Show file tree
Hide file tree
Showing 4 changed files with 348 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.lflang.target.Target;
import org.lflang.target.property.LoggingProperty;
import org.lflang.target.property.SchedulerProperty;
import org.lflang.target.property.SchedulerProperty.SchedulerOptions;
import org.lflang.target.property.type.LoggingType.LogLevel;
import org.lflang.target.property.type.SchedulerType.Scheduler;
import org.lflang.target.property.type.StaticSchedulerType;
import org.lflang.tests.TestBase;
Expand Down Expand Up @@ -34,6 +36,10 @@ public void runStaticSchedulerTests() {
config,
new SchedulerOptions(Scheduler.STATIC)
.update(StaticSchedulerType.StaticScheduler.LB));
// Keep the logging level at INFO because logs from the long-running
// tests (e.g., RaceCondition.lf) could overflow
// the buffer and stall the process.
LoggingProperty.INSTANCE.override(config, LogLevel.INFO);
return true;
},
TestLevel.EXECUTION,
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/resources/lib/c/reactor-c
170 changes: 170 additions & 0 deletions test/C/src/static/RaceCondition.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/**
* This program is designed to check whether the PretVM runtime
* correctly handles synchronization between workers.
*
* A problem was discovered after running ScheduleTest.lf
* 1000 times on macOS M3:
* in about 1~20 out of 1000 runs, the final results do not add up
* to the expected value. The auxiliary variables also exhibit strange
* behaviors: in principle, check_sum == cnt_1 + cnt_3. In practice,
* there are occasional times when check_sum == cnt_1 + cnt_3 - 1, and
* then one time when check_sum == cnt_1 + cnt_3 + 1.
*
* However, there seem to be no problems with the schedule: there are
* no simultaneous reaction invocations within the same reactor, and the
* schedule contains all the correct dependencies.
*
* Upon closer inspection, this appears to be a problem of out-of-order
* execution. Either the compiler or the CPU decides to execute the ADDI
* that increments a worker's progress counter before the reaction
* invocation is fully completed. To prevent this issue, the current
* PretVM runtime uses a full memory barrier (__sync_synchronize();)
* at the end of the EXE implementation to ensure that the ADDI does not
* execute before the completion of reaction bodies or auxiliary
* functions.
*
* @author Shaokai Lin
* @author Erling Jellum
*/
target C {
scheduler: {
type: STATIC,
mapper: LB,
},
workers: 2,
/**
* The timeout needs to be large enough for the error to appear
* consistently in a single run, typically 2000 sec of logical time.
*/
timeout: 5000 sec,
fast: true,
/**
* Leaving build-type unset gives the best chance of seeing the bug.
* The default build-type in LF seems to be RelWithDebInfo.
*/
// build-type: RELEASE,
/* The bug disappears after turning this on. */
// logging: Debug,
}

preamble {=
// Value each Sink's sum must reach by shutdown.
// NOTE(review): presumably derived as follows -- a 10 msec timer over a
// 5000 sec timeout fires 500001 times (tags 0 through 5000 sec, assuming
// the timeout tag itself is processed), and each Sink receives a 1 from
// two Sources per firing: 2 * 500001 = 1000002. Confirm if the timeout
// or timer period changes.
#define EXPECTED 1000002
=}

/**
* Periodic producer. Each time timer t fires (first at 0 msec, then
* every 10 msec) it writes the state variable s to both outputs.
* s is initialized to 1 and is never modified in this reactor.
* The id parameter is only referenced by the commented-out debug print.
*/
reactor Source(id : int = 0) {
output out: int
output out2: int
timer t(0 msec, 10 msec)
state s: int = 1

@wcet("3 ms")
reaction(t) -> out, out2 {=
// Publish the same value on both output ports.
lf_set(out, self->s);
lf_set(out2, self->s);
// lf_print("[Source %d reaction_2] Inside source reaction_1", self->id);
=}
}

/**
* Consumer that detects lost or reordered updates to its own state.
*
* The two input reactions add the received value to sum; the two
* timer-triggered reactions placed after them each add 1 to check_sum
* and abort if check_sum differs from sum. At shutdown, sum is compared
* against EXPECTED.
*
* NOTE(review): the check relies on the four reactions running in
* declaration order at every tag and on every input carrying the value 1
* (Source sets s = 1) -- confirm if either reactor is changed.
*/
reactor Sink(id : int = 0) {
input in: int
input in2: int
// timer t(1 nsec, 10 msec)
timer t(0, 10 msec)
state sum: int = 0
state check_sum: int = 0
// cnt_1 / cnt_3 count invocations of the in / in2 reactions; they are
// only read when printing diagnostics.
state cnt_1: int = 0
state cnt_3: int = 0
// Only used by the commented-out reentrancy checks below.
state running: bool = false

// Reaction 1: accumulate the value from in.
@wcet("500 us")
reaction(in) {=
/* Check if the input is delivered properly. */
// if (in->value != 1) {
//     fprintf(stderr, "URGENT: [Sink %d, reaction 1] Input != 1\n", self->id);
//     exit(1);
// }
/* Check if there are simultaneous reaction invocations within the same reactor. */
// if(self->running) {
//     fprintf(stderr, "URGENT: [Sink %d, reaction 1]\n", self->id);
//     exit(1);
// }
// self->running=true;

self->sum += in->value;
self->cnt_1++;

// self->running=false;
=}

// Reaction 2: advance check_sum and verify it matches sum.
@wcet("500 us")
reaction(t) {=
// if(self->running) {
//     fprintf(stderr, "URGENT: [Sink %d, reaction 2]\n", self->id);
//     exit(1);
// }
// self->running=true;

self->check_sum += 1;
// A mismatch means an update to sum was not observed here; dump all
// counters and terminate with a non-zero exit code.
if (self->check_sum != self->sum) {
fprintf(stderr, "ERROR: [Sink %d, reaction 2] self->check_sum: %d, self->sum: %d, self->cnt_1: %d, self->cnt_3: %d\n", self->id, self->check_sum, self->sum, self->cnt_1, self->cnt_3);
exit(1);
}

// self->running=false;
=}

// Reaction 3: accumulate the value from in2.
@wcet("500 us")
reaction(in2) {=
// if (in2->value != 1) {
//     fprintf(stderr, "URGENT: [Sink %d, reaction 3] Input != 1\n", self->id);
//     exit(1);
// }
// if(self->running) {
//     fprintf(stderr, "URGENT: [Sink %d, reaction 3]\n", self->id);
//     exit(1);
// }
// self->running=true;

self->sum += in2->value;
self->cnt_3++;

// self->running=false;
=}

// Reaction 4: same check as reaction 2, after the in2 update.
@wcet("500 us")
reaction(t) {=
// if(self->running) {
//     fprintf(stderr, "URGENT: [Sink %d, reaction 4]\n", self->id);
//     exit(1);
// }
// self->running=true;

self->check_sum += 1;
if (self->check_sum != self->sum) {
fprintf(stderr, "ERROR: [Sink %d, reaction 4] self->check_sum: %d, self->sum: %d, self->cnt_1: %d, self->cnt_3: %d\n", self->id, self->check_sum, self->sum, self->cnt_1, self->cnt_3);
exit(1);
}

// self->running=false;
=}

// Reaction 5: final check of the accumulated total against EXPECTED.
@wcet("500 us")
reaction(shutdown) {=
if (self->sum != EXPECTED) {
fprintf(stderr, "ERROR: [Sink reaction_5] FAILURE: Expected %d, Received %d\n", EXPECTED, self->sum);
exit(1);
} else {
lf_print("Successfully received %d", self->sum);
}
=}
}

/**
* Two Sources feed two Sinks: each Source's out port goes to sink and
* its out2 port goes to sink2, so every Sink receives one input from
* each Source (source on in, source2 on in2).
*/
main reactor {
source = new Source(id=1)
source2 = new Source(id=2)
sink = new Sink(id=1)
sink2 = new Sink(id=2)
source.out -> sink.in
source2.out -> sink.in2
source.out2 -> sink2.in
source2.out2 -> sink2.in2
}
171 changes: 171 additions & 0 deletions test/C/src/static/RaceCondition2.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/**
* This program is designed to check whether the PretVM runtime
* correctly handles synchronization between workers.
*
* This version tests connection buffers and auxiliary functions.
*
* A problem was discovered after running ScheduleTest.lf
* 1000 times on macOS M3:
* in about 1~20 out of 1000 runs, the final results do not add up
* to the expected value. The auxiliary variables also exhibit strange
* behaviors: in principle, check_sum == cnt_1 + cnt_3. In practice,
* there are occasional times when check_sum == cnt_1 + cnt_3 - 1, and
* then one time when check_sum == cnt_1 + cnt_3 + 1.
*
* However, there seem to be no problems with the schedule: there are
* no simultaneous reaction invocations within the same reactor, and the
* schedule contains all the correct dependencies.
*
* Upon closer inspection, this appears to be a problem of out-of-order
* execution. Either the compiler or the CPU decides to execute the ADDI
* that increments a worker's progress counter before the reaction
* invocation is fully completed. To prevent this issue, the current
* PretVM runtime uses a full memory barrier (__sync_synchronize();)
* at the end of the EXE implementation to ensure that the ADDI does not
* execute before the completion of reaction bodies or auxiliary
* functions.
*
* @author Shaokai Lin
* @author Erling Jellum
*/
target C {
scheduler: {
type: STATIC,
mapper: LB,
},
workers: 2,
/**
* The timeout needs to be large enough for the error to appear
* consistently in a single run, typically 2000 sec of logical time.
*/
timeout: 5000 sec,
fast: true,
/**
* Leaving build-type unset gives the best chance of seeing the bug.
* The default build-type in LF seems to be RelWithDebInfo.
*/
// build-type: RELEASE,
/* The bug disappears after turning this on. */
// logging: Debug,
}

preamble {=
// Value each Sink's sum must reach by shutdown.
// NOTE(review): presumably derived as follows -- the inputs reach the
// Sinks 1 nsec after each 10 msec Source firing, so the delivery that
// would land at 5000 sec + 1 nsec falls past the 5000 sec timeout,
// leaving 500000 deliveries from each of two Sources:
// 2 * 500000 = 1000000. Confirm if the timeout, period, or delay changes.
#define EXPECTED 1000000
=}

/**
* Periodic producer. Each time timer t fires (first at 0 msec, then
* every 10 msec) it writes the state variable s to both outputs.
* s is initialized to 1 and is never modified in this reactor.
* The id parameter is only referenced by the commented-out debug print.
*/
reactor Source(id : int = 0) {
output out: int
output out2: int
timer t(0 msec, 10 msec)
state s: int = 1

@wcet("3 ms")
reaction(t) -> out, out2 {=
// Publish the same value on both output ports.
lf_set(out, self->s);
lf_set(out2, self->s);
// lf_print("[Source %d reaction_2] Inside source reaction_1", self->id);
=}
}

/**
* Consumer that detects lost or reordered updates to its own state.
*
* The two input reactions add the received value to sum; the two
* timer-triggered reactions placed after them each add 1 to check_sum
* and abort if check_sum differs from sum. At shutdown, sum is compared
* against EXPECTED.
*
* NOTE(review): the check relies on the four reactions running in
* declaration order at every tag, on every input carrying the value 1
* (Source sets s = 1), and on the timer's 1 nsec offset coinciding with
* the delayed arrival of the inputs -- confirm if any of these change.
*/
reactor Sink(id : int = 0) {
input in: int
input in2: int
// Offset by 1 nsec from the Source timer, which starts at 0 msec.
timer t(1 nsec, 10 msec)
state sum: int = 0
state check_sum: int = 0
// cnt_1 / cnt_3 count invocations of the in / in2 reactions; they are
// only read when printing diagnostics.
state cnt_1: int = 0
state cnt_3: int = 0
// Only used by the commented-out reentrancy checks below.
state running: bool = false

// Reaction 1: accumulate the value from in.
@wcet("500 us")
reaction(in) {=
/* Check if the input is delivered properly. */
// if (in->value != 1) {
//     fprintf(stderr, "URGENT: [Sink %d, reaction 1] Input != 1\n", self->id);
//     exit(1);
// }
/* Check if there are simultaneous reaction invocations within the same reactor. */
// if(self->running) {
//     fprintf(stderr, "URGENT: [Sink %d, reaction 1]\n", self->id);
//     exit(1);
// }
// self->running=true;

self->sum += in->value;
self->cnt_1++;

// self->running=false;
=}

// Reaction 2: advance check_sum and verify it matches sum.
@wcet("500 us")
reaction(t) {=
// if(self->running) {
//     fprintf(stderr, "URGENT: [Sink %d, reaction 2]\n", self->id);
//     exit(1);
// }
// self->running=true;

self->check_sum += 1;
// A mismatch means an update to sum was not observed here; dump all
// counters and terminate with a non-zero exit code.
if (self->check_sum != self->sum) {
fprintf(stderr, "ERROR: [Sink %d, reaction 2] self->check_sum: %d, self->sum: %d, self->cnt_1: %d, self->cnt_3: %d\n", self->id, self->check_sum, self->sum, self->cnt_1, self->cnt_3);
exit(1);
}

// self->running=false;
=}

// Reaction 3: accumulate the value from in2.
@wcet("500 us")
reaction(in2) {=
// if (in2->value != 1) {
//     fprintf(stderr, "URGENT: [Sink %d, reaction 3] Input != 1\n", self->id);
//     exit(1);
// }
// if(self->running) {
//     fprintf(stderr, "URGENT: [Sink %d, reaction 3]\n", self->id);
//     exit(1);
// }
// self->running=true;

self->sum += in2->value;
self->cnt_3++;

// self->running=false;
=}

// Reaction 4: same check as reaction 2, after the in2 update.
@wcet("500 us")
reaction(t) {=
// if(self->running) {
//     fprintf(stderr, "URGENT: [Sink %d, reaction 4]\n", self->id);
//     exit(1);
// }
// self->running=true;

self->check_sum += 1;
if (self->check_sum != self->sum) {
fprintf(stderr, "ERROR: [Sink %d, reaction 4] self->check_sum: %d, self->sum: %d, self->cnt_1: %d, self->cnt_3: %d\n", self->id, self->check_sum, self->sum, self->cnt_1, self->cnt_3);
exit(1);
}

// self->running=false;
=}

// Reaction 5: final check of the accumulated total against EXPECTED.
@wcet("500 us")
reaction(shutdown) {=
if (self->sum != EXPECTED) {
fprintf(stderr, "ERROR: [Sink reaction_5] FAILURE: Expected %d, Received %d\n", EXPECTED, self->sum);
exit(1);
} else {
lf_print("Successfully received %d", self->sum);
}
=}
}

/**
* Two Sources feed two Sinks: each Source's out port goes to sink and
* its out2 port goes to sink2, so every Sink receives one input from
* each Source (source on in, source2 on in2). Every connection carries
* a 1 nsec logical delay, matching the 1 nsec offset of the Sink timer.
*/
main reactor {
source = new Source(id=1)
source2 = new Source(id=2)
sink = new Sink(id=1)
sink2 = new Sink(id=2)
source.out -> sink.in after 1 nsec
source2.out -> sink.in2 after 1 nsec
source.out2 -> sink2.in after 1 nsec
source2.out2 -> sink2.in2 after 1 nsec
}

0 comments on commit 58fcd92

Please sign in to comment.