diff --git a/core/src/integrationTest/java/org/lflang/tests/RunSingleTest.java b/core/src/integrationTest/java/org/lflang/tests/RunSingleTest.java
index 0200f98183..cbab653e3b 100644
--- a/core/src/integrationTest/java/org/lflang/tests/RunSingleTest.java
+++ b/core/src/integrationTest/java/org/lflang/tests/RunSingleTest.java
@@ -37,6 +37,7 @@
import org.lflang.tests.runtime.CTest;
import org.lflang.tests.runtime.CppTest;
import org.lflang.tests.runtime.PythonTest;
+import org.lflang.tests.runtime.RustRtiTest;
import org.lflang.tests.runtime.RustTest;
import org.lflang.tests.runtime.TypeScriptTest;
@@ -90,6 +91,8 @@ private static Class extends TestBase> getTestInstance(Target target) {
return PythonTest.class;
case Rust:
return RustTest.class;
+ case RustRti:
+ return RustRtiTest.class;
default:
throw new IllegalArgumentException();
}
diff --git a/core/src/integrationTest/java/org/lflang/tests/SimplifiedRuntimeTest.java b/core/src/integrationTest/java/org/lflang/tests/SimplifiedRuntimeTest.java
new file mode 100644
index 0000000000..05ce13845d
--- /dev/null
+++ b/core/src/integrationTest/java/org/lflang/tests/SimplifiedRuntimeTest.java
@@ -0,0 +1,41 @@
+package org.lflang.tests;
+
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.Test;
+import org.lflang.target.Target;
+import org.lflang.tests.TestRegistry.TestCategory;
+
+/**
+ * A collection of JUnit tests to perform on a given set of targets.
+ *
+ * @author Marten Lohstroh
+ * @author Chanhee Lee
+ */
+public abstract class SimplifiedRuntimeTest extends TestBase {
+
+ /**
+ * Construct a test instance that runs tests for a single target.
+ *
+ * @param target The target to run tests for.
+ */
+ protected SimplifiedRuntimeTest(Target target) {
+ super(target);
+ }
+
+ /** Whether to enable {@link #runFederatedTests()}. */
+ protected boolean supportsFederatedExecution() {
+ return false;
+ }
+
+ @Test
+ public void runFederatedTestsWithRustRti() {
+ Assumptions.assumeTrue(supportsFederatedExecution(), Message.NO_FEDERATION_SUPPORT);
+ runTestsForTargetsWithRustRti(
+ Message.DESC_FEDERATED_WITH_RUST_RTI,
+ TestCategory.FEDERATED::equals,
+ Transformers::noChanges,
+ Configurators::noChanges,
+ TestLevel.EXECUTION,
+ false);
+ }
+}
diff --git a/core/src/integrationTest/java/org/lflang/tests/runtime/RustRtiTest.java b/core/src/integrationTest/java/org/lflang/tests/runtime/RustRtiTest.java
new file mode 100644
index 0000000000..90d31a42bd
--- /dev/null
+++ b/core/src/integrationTest/java/org/lflang/tests/runtime/RustRtiTest.java
@@ -0,0 +1,62 @@
+/*************
+ * Copyright (c) 2019-2024, The University of California at Berkeley.
+ *
+ * Redistribution and use in source and binary forms, with or without modification,
+ * are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+ * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ ***************/
+package org.lflang.tests.runtime;
+
+import org.junit.jupiter.api.Test;
+import org.lflang.target.Target;
+import org.lflang.tests.SimplifiedRuntimeTest;
+
+/**
+ * Collection of tests for the C target with Rust RTI.
+ *
+ *
Tests that are implemented in the base class are still overridden so that each test can be
+ * easily invoked individually from IDEs with JUnit support like Eclipse and IntelliJ. This is
+ * typically done by right-clicking on the name of the test method and then clicking "Run".*
+ *
+ * @author Marten Lohstroh
+ * @author Chanhee Lee
+ */
+public class RustRtiTest extends SimplifiedRuntimeTest {
+
+ public RustRtiTest() {
+ super(Target.RustRti);
+ }
+
+ @Override
+ protected boolean supportsSingleThreadedExecution() {
+ return true;
+ }
+
+ @Override
+ protected boolean supportsFederatedExecution() {
+ return true;
+ }
+
+ @Test
+ @Override
+ public void runFederatedTestsWithRustRti() {
+ super.runFederatedTestsWithRustRti();
+ }
+}
diff --git a/core/src/main/java/org/lflang/federated/generator/FedGenerator.java b/core/src/main/java/org/lflang/federated/generator/FedGenerator.java
index 0acf495fc7..792c9f89a0 100644
--- a/core/src/main/java/org/lflang/federated/generator/FedGenerator.java
+++ b/core/src/main/java/org/lflang/federated/generator/FedGenerator.java
@@ -201,6 +201,97 @@ public boolean doGenerate(Resource resource, LFGeneratorContext context) throws
return false;
}
+ /**
+ * Produce LF code for each federate in a separate file, then invoke a target-specific code
+ * generator for each of those files.
+ *
+ * @param resource The resource that has the federated main reactor in it
+ * @param context The context in which to carry out the code generation.
+ * @return False if no errors have occurred, true otherwise.
+ */
+ public boolean doGenerateForRustRTI(Resource resource, LFGeneratorContext context)
+ throws IOException {
+ if (!federatedExecutionIsSupported(resource)) return true;
+ cleanIfNeeded(context);
+
+ // In a federated execution, we need keepalive to be true,
+ // otherwise a federate could exit simply because it hasn't received
+ // any messages.
+ KeepaliveProperty.INSTANCE.override(targetConfig, true);
+
+ // Process command-line arguments
+ processCLIArguments(context);
+
+ // Find the federated reactor
+ Reactor federation = FedASTUtils.findFederatedReactor(resource);
+
+ // Make sure the RTI host is set correctly.
+ setRTIHost(federation);
+
+ // Create the FederateInstance objects.
+ ReactorInstance main = createFederateInstances(federation, context);
+
+ // Insert reactors that split multiports into many ports.
+ insertIndexers(main, resource);
+
+ // Clear banks so that each bank member becomes a single federate.
+ for (Instantiation instantiation : ASTUtils.allInstantiations(federation)) {
+ instantiation.setWidthSpec(null);
+ instantiation.setWidthSpec(null);
+ }
+
+ // Find all the connections between federates.
+ // For each connection between federates, replace it in the
+ // AST with an action (which inherits the delay) and three reactions.
+ // The action will be physical for physical connections and logical
+ // for logical connections.
+ replaceFederateConnectionsWithProxies(federation, main, resource);
+
+ FedEmitter fedEmitter =
+ new FedEmitter(
+ fileConfig,
+ ASTUtils.toDefinition(mainDef.getReactorClass()),
+ messageReporter,
+ rtiConfig);
+
+ // Generate LF code for each federate.
+ Map lf2lfCodeMapMap = new HashMap<>();
+ for (FederateInstance federate : federates) {
+ lf2lfCodeMapMap.putAll(fedEmitter.generateFederate(context, federate, federates.size()));
+ }
+
+ // Do not invoke target code generators if --no-compile flag is used.
+ if (context.getTargetConfig().get(NoCompileProperty.INSTANCE)) {
+ context.finish(Status.GENERATED, lf2lfCodeMapMap);
+ return false;
+ }
+
+ // If the RTI is to be built locally, set up a build environment for it.
+ prepareRtiBuildEnvironment(context);
+
+ Map codeMapMap =
+ compileFederates(
+ context,
+ lf2lfCodeMapMap,
+ subContexts -> {
+ createDockerFiles(context, subContexts);
+ generateLaunchScriptForRustRti();
+ // If an error has occurred during codegen of any federate, report it.
+ subContexts.forEach(
+ c -> {
+ if (c.getErrorReporter().getErrorsOccurred()) {
+ context
+ .getErrorReporter()
+ .at(c.getFileConfig().srcFile)
+ .error("Failure during code generation of " + c.getFileConfig().srcFile);
+ }
+ });
+ });
+
+ context.finish(Status.COMPILED, codeMapMap);
+ return false;
+ }
+
/**
* Prepare a build environment for the rti alongside the generated sources of the federates.
*
@@ -229,6 +320,11 @@ private void generateLaunchScript() {
.doGenerate(federates, rtiConfig);
}
+ private void generateLaunchScriptForRustRti() {
+ new FedLauncherGenerator(this.targetConfig, this.fileConfig, this.messageReporter)
+ .doGenerateForRustRTI(federates, new RtiConfig());
+ }
+
/**
* Generate a Dockerfile for each federate and a docker-compose.yml for the federation.
*
diff --git a/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java b/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java
index 67cc8085f0..765a3504d2 100644
--- a/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java
+++ b/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java
@@ -265,6 +265,169 @@ public void doGenerate(List federates, RtiConfig rtiConfig) {
}
}
+ /**
+ * Create the launcher shell scripts. This will create one or two files in the output path (bin
+ * directory). The first has name equal to the filename of the source file without the ".lf"
+ * extension. This will be a shell script that launches the RTI and the federates. If, in
+ * addition, either the RTI or any federate is mapped to a particular machine (anything other than
+ * the default "localhost" or "0.0.0.0"), then this will generate a shell script in the bin
+ * directory with name filename_distribute.sh that copies the relevant source files to the remote
+ * host and compiles them so that they are ready to execute using the launcher.
+ *
+ * A precondition for this to work is that the user invoking this code generator can log into
+ * the remote host without supplying a password. Specifically, you have to have installed your
+ * public key (typically found in ~/.ssh/id_rsa.pub) in ~/.ssh/authorized_keys on the remote host.
+ * In addition, the remote host must be running an ssh service. On an Arch Linux system using
+ * systemd, for example, this means running:
+ *
+ *
sudo systemctl ssh.service
+ *
+ * Enable means to always start the service at startup, whereas start means to just start it
+ * this once.
+ *
+ * @param federates A list of federate instances in the federation
+ * @param rtiConfig Can have values for 'host', 'dir', and 'user'
+ */
+ public void doGenerateForRustRTI(List federates, RtiConfig rtiConfig) {
+ // NOTE: It might be good to use screen when invoking the RTI
+ // or federates remotely, so you can detach and the process keeps running.
+ // However, I was unable to get it working properly.
+ // What this means is that the shell that invokes the launcher
+ // needs to remain live for the duration of the federation.
+ // If that shell is killed, the federation will die.
+ // Hence, it is reasonable to launch the federation on a
+ // machine that participates in the federation, for example,
+ // on the machine that runs the RTI. The command I tried
+ // to get screen to work looks like this:
+ // ssh -t «target» cd «path»; screen -S «filename»_«federate.name» -L
+ // bin/«filename»_«federate.name» 2>&1
+ // var outPath = binGenPath
+ StringBuilder shCode = new StringBuilder();
+ StringBuilder distCode = new StringBuilder();
+ shCode.append(getSetupCode()).append("\n");
+ String distHeader = getDistHeader();
+ String host = rtiConfig.getHost();
+ String target = host;
+
+ String user = rtiConfig.getUser();
+ if (user != null) {
+ target = user + "@" + host;
+ }
+
+ shCode.append("#### Host is ").append(host);
+
+ // Launch the RTI in the foreground.
+ if (host.equals("localhost") || host.equals("0.0.0.0")) {
+ // FIXME: the paths below will not work on Windows
+ shCode.append(getLaunchCodeForRustRti(Integer.toString(federates.size()))).append("\n");
+ } else {
+ // Start the RTI on the remote machine - Not supported yet for Rust RTI.
+ }
+
+ // Index used for storing pids of federates
+ int federateIndex = 0;
+ for (FederateInstance federate : federates) {
+ var buildConfig = getBuildConfig(federate, fileConfig, messageReporter);
+ if (federate.isRemote) {
+ if (distCode.isEmpty()) distCode.append(distHeader).append("\n");
+ distCode.append(getDistCode(rtiConfig.getDirectory(), federate)).append("\n");
+ shCode
+ .append(getFedRemoteLaunchCode(rtiConfig.getDirectory(), federate, federateIndex++))
+ .append("\n");
+ } else {
+ String executeCommand = buildConfig.localExecuteCommand();
+ shCode
+ .append(getFedLocalLaunchCode(federate, executeCommand, federateIndex++))
+ .append("\n");
+ }
+ }
+ if (host.equals("localhost") || host.equals("0.0.0.0")) {
+ // Local PID managements
+ shCode.append(
+ "echo \"#### Bringing the RTI back to foreground so it can receive Control-C.\"" + "\n");
+ shCode.append("fg %1" + "\n");
+ }
+ // Wait for launched processes to finish
+ shCode
+ .append(
+ String.join(
+ "\n",
+ "echo \"RTI has exited. Wait for federates to exit.\"",
+ "# Wait for launched processes to finish.",
+ "# The errors are handled separately via trap.",
+ "for pid in \"${pids[@]}\"",
+ "do",
+ " wait $pid || exit $?",
+ "done",
+ "echo \"All done.\"",
+ "EXITED_SUCCESSFULLY=true"))
+ .append("\n");
+
+ // Create bin directory for the script.
+ if (!Files.exists(fileConfig.binPath)) {
+ try {
+ Files.createDirectories(fileConfig.binPath);
+ } catch (IOException e) {
+ messageReporter.nowhere().error("Unable to create directory: " + fileConfig.binPath);
+ }
+ }
+
+ // Write the launcher file.
+ File file = fileConfig.binPath.resolve(fileConfig.name).toFile();
+ messageReporter.nowhere().info("Script for launching the federation: " + file);
+
+ // Delete file previously produced, if any.
+ if (file.exists()) {
+ if (!file.delete())
+ messageReporter
+ .nowhere()
+ .error("Failed to delete existing federated launch script \"" + file + "\"");
+ }
+
+ FileOutputStream fOut = null;
+ try {
+ fOut = new FileOutputStream(file);
+ } catch (FileNotFoundException e) {
+ messageReporter.nowhere().error("Unable to find file: " + file);
+ }
+ if (fOut != null) {
+ try {
+ fOut.write(shCode.toString().getBytes());
+ fOut.close();
+ } catch (IOException e) {
+ messageReporter.nowhere().error("Unable to write to file: " + file);
+ }
+ }
+
+ if (!file.setExecutable(true, false)) {
+ messageReporter.nowhere().warning("Unable to make launcher script executable.");
+ }
+
+ // Write the distributor file.
+ // Delete the file even if it does not get generated.
+ file = fileConfig.binPath.resolve(fileConfig.name + "_distribute.sh").toFile();
+ if (file.exists()) {
+ if (!file.delete())
+ messageReporter
+ .nowhere()
+ .error("Failed to delete existing federated distributor script \"" + file + "\"");
+ }
+ if (distCode.length() > 0) {
+ try {
+ fOut = new FileOutputStream(file);
+ fOut.write(distCode.toString().getBytes());
+ fOut.close();
+ if (!file.setExecutable(true, false)) {
+ messageReporter.nowhere().warning("Unable to make file executable: " + file);
+ }
+ } catch (FileNotFoundException e) {
+ messageReporter.nowhere().error("Unable to find file: " + file);
+ } catch (IOException e) {
+ messageReporter.nowhere().error("Unable to write to file " + file);
+ }
+ }
+ }
+
private String getSetupCode() {
return String.join(
"\n",
@@ -377,6 +540,35 @@ private String getLaunchCode(String rtiLaunchCode) {
"sleep 1");
}
+ private String getLaunchCodeForRustRti(String numberOfFederates) {
+ String launchCodeWithoutLogging =
+ new String("cargo run -- -i ${FEDERATION_ID} -n " + numberOfFederates + " -c init &");
+ return String.join(
+ "\n",
+ "echo \"#### Launching the Rust runtime infrastructure (RTI).\"",
+ "# The Rust RTI is started first to allow proper boot-up",
+ "# before federates will try to connect.",
+ "# The RTI will be brought back to foreground",
+ "# to be responsive to user inputs after all federates",
+ "# are launched.",
+ "RUST_RTI_REMOTE_PATHS=`find ~/ -name rti_remote.rs`",
+ "if [ \"${RUST_RTI_REMOTE_PATHS}\" = \"\" ]; then",
+ " git clone https://github.com/hokeun/lf-rust-rti.git",
+ " cd lf-rust-rti/rust/rti",
+ "else",
+ " FIRST_RUST_RTI_REMOTE_PATH=($RUST_RTI_REMOTE_PATHS)",
+ " FIRST_RUST_RTI_PATH=${FIRST_RUST_RTI_REMOTE_PATH[0]%/*}",
+ " cd ${FIRST_RUST_RTI_PATH}; cd ../",
+ "fi",
+ launchCodeWithoutLogging,
+ "# Store the PID of the RTI",
+ "RTI=$!",
+ "# Wait for the RTI to boot up before",
+ "# starting federates (this could be done by waiting for a specific output",
+ "# from the RTI, but here we use sleep)",
+ "sleep 1");
+ }
+
private String getRemoteLaunchCode(
Object host, Object target, String logFileName, String rtiLaunchString) {
return String.join(
@@ -590,7 +782,7 @@ private String getFedLocalLaunchCode(
private BuildConfig getBuildConfig(
FederateInstance federate, FederationFileConfig fileConfig, MessageReporter messageReporter) {
return switch (federate.targetConfig.target) {
- case C, CCPP -> new CBuildConfig(federate, fileConfig, messageReporter);
+ case C, CCPP, RustRti -> new CBuildConfig(federate, fileConfig, messageReporter);
case Python -> new PyBuildConfig(federate, fileConfig, messageReporter);
case TS -> new TsBuildConfig(federate, fileConfig, messageReporter);
case CPP, Rust -> throw new UnsupportedOperationException();
diff --git a/core/src/main/java/org/lflang/generator/LFGenerator.java b/core/src/main/java/org/lflang/generator/LFGenerator.java
index f4e61e93d5..305b08f063 100644
--- a/core/src/main/java/org/lflang/generator/LFGenerator.java
+++ b/core/src/main/java/org/lflang/generator/LFGenerator.java
@@ -56,7 +56,7 @@ public static FileConfig createFileConfig(
}
return switch (target) {
- case CCPP, C -> new CFileConfig(resource, srcGenBasePath, useHierarchicalBin);
+ case CCPP, C, RustRti -> new CFileConfig(resource, srcGenBasePath, useHierarchicalBin);
case Python -> new PyFileConfig(resource, srcGenBasePath, useHierarchicalBin);
case CPP -> new CppFileConfig(resource, srcGenBasePath, useHierarchicalBin);
case Rust -> new RustFileConfig(resource, srcGenBasePath, useHierarchicalBin);
@@ -82,6 +82,7 @@ private GeneratorBase createGenerator(LFGeneratorContext context) {
case CPP -> new CppGenerator(context, scopeProvider);
case TS -> new TSGenerator(context);
case Rust -> new RustGenerator(context, scopeProvider);
+ case RustRti -> new CGenerator(context, true);
};
}
@@ -121,6 +122,42 @@ public void doGenerate(Resource resource, IFileSystemAccess2 fsa, IGeneratorCont
}
}
+ public void doGenerateForRustRTI(
+ Resource resource, IFileSystemAccess2 fsa, IGeneratorContext context) {
+ assert injector != null;
+ final LFGeneratorContext lfContext;
+ if (context instanceof LFGeneratorContext) {
+ lfContext = (LFGeneratorContext) context;
+ } else {
+ lfContext = LFGeneratorContext.lfGeneratorContextOf(resource, fsa, context);
+ }
+
+ // The fastest way to generate code is to not generate any code.
+ if (lfContext.getMode() == LFGeneratorContext.Mode.LSP_FAST) return;
+
+ if (FedASTUtils.findFederatedReactor(resource) != null) {
+ try {
+ FedGenerator fedGenerator = new FedGenerator(lfContext);
+ injector.injectMembers(fedGenerator);
+ generatorErrorsOccurred = fedGenerator.doGenerateForRustRTI(resource, lfContext);
+ } catch (IOException e) {
+ throw new RuntimeIOException("Error during federated code generation", e);
+ }
+
+ } else {
+ final GeneratorBase generator = createGenerator(lfContext);
+
+ if (generator != null) {
+ generator.doGenerate(resource, lfContext);
+ generatorErrorsOccurred = generator.errorsOccurred();
+ }
+ }
+ final MessageReporter messageReporter = lfContext.getErrorReporter();
+ if (messageReporter instanceof LanguageServerMessageReporter) {
+ ((LanguageServerMessageReporter) messageReporter).publishDiagnostics();
+ }
+ }
+
/** Return true if errors occurred in the last call to doGenerate(). */
public boolean errorsOccurred() {
return generatorErrorsOccurred;
diff --git a/core/src/main/java/org/lflang/generator/docker/DockerGenerator.java b/core/src/main/java/org/lflang/generator/docker/DockerGenerator.java
index 203c5f94d7..25bcfef6d2 100644
--- a/core/src/main/java/org/lflang/generator/docker/DockerGenerator.java
+++ b/core/src/main/java/org/lflang/generator/docker/DockerGenerator.java
@@ -69,7 +69,7 @@ public static DockerGenerator dockerGeneratorFactory(LFGeneratorContext context)
case C, CCPP -> new CDockerGenerator(context);
case TS -> new TSDockerGenerator(context);
case Python -> new PythonDockerGenerator(context);
- case CPP, Rust -> throw new IllegalArgumentException(
+ case CPP, Rust, RustRti -> throw new IllegalArgumentException(
"No Docker support for " + target + " yet.");
};
}
diff --git a/core/src/main/java/org/lflang/target/Target.java b/core/src/main/java/org/lflang/target/Target.java
index a01b863303..109d8b211b 100644
--- a/core/src/main/java/org/lflang/target/Target.java
+++ b/core/src/main/java/org/lflang/target/Target.java
@@ -383,6 +383,13 @@ public enum Target {
// In our Rust implementation, the only reserved keywords
// are those that are a valid expression. Others may be escaped
// with the syntax r#keyword.
+ Arrays.asList("self", "true", "false")),
+ RustRti(
+ "RustRti",
+ true,
+ // In our Rust implementation, the only reserved keywords
+ // are those that are a valid expression. Others may be escaped
+ // with the syntax r#keyword.
Arrays.asList("self", "true", "false"));
/** String representation of this target. */
@@ -460,7 +467,7 @@ public boolean isReservedIdent(String ident) {
/** Return true if the target supports federated execution. */
public boolean supportsFederated() {
return switch (this) {
- case C, CCPP, Python, TS -> true;
+ case C, CCPP, Python, TS, RustRti -> true;
default -> false;
};
}
@@ -476,7 +483,7 @@ public boolean supportsInheritance() {
/** Return true if the target supports multiports and banks of reactors. */
public boolean supportsMultiports() {
return switch (this) {
- case C, CCPP, CPP, Python, Rust, TS -> true;
+ case C, CCPP, CPP, Python, Rust, TS, RustRti -> true;
default -> false;
};
}
@@ -501,7 +508,7 @@ public boolean supportsReactionDeclarations() {
public boolean buildsUsingDocker() {
return switch (this) {
case TS -> false;
- case C, CCPP, CPP, Python, Rust -> true;
+ case C, CCPP, CPP, Python, Rust, RustRti -> true;
};
}
@@ -639,7 +646,7 @@ public void initialize(TargetConfig config) {
SingleThreadedProperty.INSTANCE,
TracingProperty.INSTANCE,
WorkersProperty.INSTANCE);
- case Rust -> config.register(
+ case Rust, RustRti -> config.register(
BuildTypeProperty.INSTANCE,
CargoDependenciesProperty.INSTANCE,
CargoFeaturesProperty.INSTANCE,
diff --git a/core/src/main/resources/lib/c/reactor-c b/core/src/main/resources/lib/c/reactor-c
index 188fb3c983..26eb3465c5 160000
--- a/core/src/main/resources/lib/c/reactor-c
+++ b/core/src/main/resources/lib/c/reactor-c
@@ -1 +1 @@
-Subproject commit 188fb3c983aac9ea7bcafa4e5172daaa5f8a883d
+Subproject commit 26eb3465c5ca425a03ded2e042a7b6b35cf0181f
diff --git a/core/src/testFixtures/java/org/lflang/tests/TestBase.java b/core/src/testFixtures/java/org/lflang/tests/TestBase.java
index 6923149f07..bf9a8eb6d1 100644
--- a/core/src/testFixtures/java/org/lflang/tests/TestBase.java
+++ b/core/src/testFixtures/java/org/lflang/tests/TestBase.java
@@ -139,6 +139,7 @@ public static class Message {
public static final String DESC_MULTIPORT = "Run multiport tests.";
public static final String DESC_AS_FEDERATED = "Run non-federated tests in federated mode.";
public static final String DESC_FEDERATED = "Run federated tests.";
+ public static final String DESC_FEDERATED_WITH_RUST_RTI = "Run federated tests with Rust RTI.";
public static final String DESC_DOCKER = "Run docker tests.";
public static final String DESC_DOCKER_FEDERATED = "Run docker federated tests.";
public static final String DESC_ENCLAVE = "Run enclave tests.";
@@ -196,6 +197,36 @@ protected final void runTestsAndPrintResults(
}
}
+ /**
+ * Run selected tests for a given target and configurator up to the specified level.
+ *
+ * @param target The target to run tests for.
+ * @param selected A predicate that given a test category returns whether it should be included in
+ * this test run or not.
+ * @param configurator A procedure for configuring the tests.
+ * @param copy Whether to work on copies of tests in the test. registry.
+ */
+ protected final void runTestsAndPrintResultsWithRustRti(
+ Target target,
+ Predicate selected,
+ TestLevel level,
+ Transformer transformer,
+ Configurator configurator,
+ boolean copy) {
+ var categories = Arrays.stream(TestCategory.values()).filter(selected).toList();
+ for (var category : categories) {
+ System.out.println(category.getHeader());
+ var tests = testRegistry.getRegisteredTests(target, category, copy);
+ try {
+ validateAndRunWithRustRti(tests, transformer, configurator, level);
+ } catch (IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ System.out.println(testRegistry.getCoverageReport(target, category));
+ checkAndReportFailures(tests);
+ }
+ }
+
/**
* Run tests in the given selection for all targets enabled in this class.
*
@@ -217,6 +248,28 @@ protected void runTestsForTargets(
}
}
+ /**
+ * Run tests in the given selection for all targets enabled in this class.
+ *
+ * @param description A string that describes the collection of tests.
+ * @param selected A predicate that given a test category returns whether it should be included in
+ * this test run or not.
+ * @param configurator A procedure for configuring the tests.
+ * @param copy Whether to work on copies of tests in the test. registry.
+ */
+ protected void runTestsForTargetsWithRustRti(
+ String description,
+ Predicate selected,
+ Transformer transformer,
+ Configurator configurator,
+ TestLevel level,
+ boolean copy) {
+ for (Target target : this.targets) {
+ runTestsForRustRti(
+ List.of(target), description, selected, transformer, configurator, level, copy);
+ }
+ }
+
/**
* Run tests in the given selection for a subset of given targets.
*
@@ -241,6 +294,30 @@ protected void runTestsFor(
}
}
+ /**
+ * Run tests in the given selection for a subset of given targets.
+ *
+ * @param subset The subset of targets to run the selected tests for.
+ * @param description A string that describes the collection of tests.
+ * @param selected A predicate that given a test category returns whether it should be included in
+ * this test run or not.
+ * @param configurator A procedure for configuring the tests.
+ * @param copy Whether to work on copies of tests in the test. registry.
+ */
+ protected void runTestsForRustRti(
+ List subset,
+ String description,
+ Predicate selected,
+ Transformer transformer,
+ Configurator configurator,
+ TestLevel level,
+ boolean copy) {
+ for (Target target : subset) {
+ printTestHeader(target, description);
+ runTestsAndPrintResultsWithRustRti(target, selected, level, transformer, configurator, copy);
+ }
+ }
+
/** Whether to enable threading. */
protected boolean supportsSingleThreadedExecution() {
return false;
@@ -496,6 +573,25 @@ private void generateCode(LFTest test) throws TestError {
}
}
+ /**
+ * Invoke the code generator for the given test.
+ *
+ * @param test The test to generate code for.
+ */
+ private void generateCodeForRustRti(LFTest test) throws TestError {
+ if (test.getFileConfig().resource == null) {
+ test.getContext().finish(GeneratorResult.NOTHING);
+ }
+ try {
+ generator.doGenerateForRustRTI(test.getFileConfig().resource, fileAccess, test.getContext());
+ } catch (Throwable e) {
+ throw new TestError("Code generation unsuccessful.", Result.CODE_GEN_FAIL, e);
+ }
+ if (generator.errorsOccurred()) {
+ throw new TestError("Code generation unsuccessful.", Result.CODE_GEN_FAIL);
+ }
+ }
+
/**
* Given an indexed test, execute it and label the test as failing if it did not execute, took too
* long to execute, or executed but exited with an error code.
@@ -712,4 +808,49 @@ private void validateAndRun(
System.out.print(System.lineSeparator());
}
+
+ /**
+ * Validate and run the given tests, using the specified configuratator and level.
+ *
+ * While performing tests, this method prints a header that reaches completion once all tests
+ * have been run.
+ *
+ * @param tests A set of tests to run.
+ * @param transformer A procedure for transforming the tests.
+ * @param configurator A procedure for configuring the tests.
+ * @param level The level of testing.
+ * @throws IOException If initial file configuration fails
+ */
+ private void validateAndRunWithRustRti(
+ Set tests, Transformer transformer, Configurator configurator, TestLevel level)
+ throws IOException {
+ var done = 1;
+
+ System.out.println(THICK_LINE);
+
+ for (var test : tests) {
+ System.out.println(
+ "Running: " + test.toString() + " (" + (int) (done / (float) tests.size() * 100) + "%)");
+ try {
+ test.redirectOutputs();
+ prepare(test, transformer, configurator);
+ validate(test);
+ generateCodeForRustRti(test);
+ if (level == TestLevel.EXECUTION) {
+ execute(test);
+ }
+ test.markPassed();
+ } catch (TestError e) {
+ test.handleTestError(e);
+ } catch (Throwable e) {
+ test.handleTestError(
+ new TestError("Unknown exception during test execution", Result.TEST_EXCEPTION, e));
+ } finally {
+ test.restoreOutputs();
+ }
+ done++;
+ }
+
+ System.out.print(System.lineSeparator());
+ }
}
diff --git a/test/RustRti/.gitignore b/test/RustRti/.gitignore
new file mode 100644
index 0000000000..08f514ebc5
--- /dev/null
+++ b/test/RustRti/.gitignore
@@ -0,0 +1 @@
+include/
diff --git a/test/RustRti/src/federated/Absent.lf b/test/RustRti/src/federated/Absent.lf
new file mode 100644
index 0000000000..7130210cf3
--- /dev/null
+++ b/test/RustRti/src/federated/Absent.lf
@@ -0,0 +1,46 @@
+target C {
+ tracing: true,
+ timeout: 100 ms
+}
+
+reactor Sender {
+ output out1: int
+ output out2: int
+ timer t(0, 20 ms)
+ state c: int = 1
+
+ reaction(t) -> out1, out2 {=
+ if (self->c % 2 != 0) {
+ lf_set(out1, self->c);
+ } else {
+ lf_set(out2, self->c);
+ }
+ self->c++;
+ =}
+}
+
+reactor Receiver {
+ input in1: int
+ input in2: int
+
+ reaction(in1) {=
+ lf_print("Received %d on in1", in1->value);
+ if (in1->value % 2 == 0) {
+ lf_print_error_and_exit("********* Expected an odd integer!");
+ }
+ =}
+
+ reaction(in2) {=
+ lf_print("Received %d on in2", in2->value);
+ if (in2->value % 2 != 0) {
+ lf_print_error_and_exit("********* Expected an even integer!");
+ }
+ =}
+}
+
+federated reactor(d: time = 1 ms) {
+ s = new Sender()
+ r = new Receiver()
+ s.out1 -> r.in1
+ s.out2 -> r.in2
+}
diff --git a/test/RustRti/src/federated/BroadcastFeedback.lf b/test/RustRti/src/federated/BroadcastFeedback.lf
new file mode 100644
index 0000000000..66a93c275b
--- /dev/null
+++ b/test/RustRti/src/federated/BroadcastFeedback.lf
@@ -0,0 +1,33 @@
+/** This tests an output that is broadcast back to a multiport input of a bank. */
+target C {
+ timeout: 1 sec,
+ build-type: RelWithDebInfo
+}
+
+reactor SenderAndReceiver {
+ output out: int
+ input[2] in: int
+ state received: bool = false
+
+ reaction(startup) -> out {=
+ lf_set(out, 42);
+ =}
+
+ reaction(in) {=
+ if (in[0]->is_present && in[1]->is_present && in[0]->value == 42 && in[1]->value == 42) {
+ lf_print("SUCCESS");
+ self->received = true;
+ }
+ =}
+
+ reaction(shutdown) {=
+ if (!self->received == true) {
+ lf_print_error_and_exit("Failed to receive broadcast");
+ }
+ =}
+}
+
+federated reactor {
+ s = new[2] SenderAndReceiver()
+ (s.out)+ -> s.in
+}
diff --git a/test/RustRti/src/federated/BroadcastFeedbackWithHierarchy.lf b/test/RustRti/src/federated/BroadcastFeedbackWithHierarchy.lf
new file mode 100644
index 0000000000..114e42cfd7
--- /dev/null
+++ b/test/RustRti/src/federated/BroadcastFeedbackWithHierarchy.lf
@@ -0,0 +1,40 @@
+/** This tests an output that is broadcast back to a multiport input of a bank. */
+target C {
+ timeout: 1 sec
+}
+
+reactor SenderAndReceiver {
+ output out: int
+ input[2] in: int
+ state received: bool = false
+
+ r = new Receiver()
+ in -> r.in
+
+ reaction(startup) -> out {=
+ lf_set(out, 42);
+ =}
+}
+
+reactor Receiver {
+ input[2] in: int
+ state received: bool = false
+
+ reaction(in) {=
+ if (in[0]->is_present && in[1]->is_present && in[0]->value == 42 && in[1]->value == 42) {
+ lf_print("SUCCESS");
+ self->received = true;
+ }
+ =}
+
+ reaction(shutdown) {=
+ if (!self->received == true) {
+ lf_print_error_and_exit("Failed to receive broadcast");
+ }
+ =}
+}
+
+federated reactor {
+ s = new[2] SenderAndReceiver()
+ (s.out)+ -> s.in
+}
diff --git a/test/RustRti/src/federated/DistributedBank.lf b/test/RustRti/src/federated/DistributedBank.lf
new file mode 100644
index 0000000000..65a6f871c2
--- /dev/null
+++ b/test/RustRti/src/federated/DistributedBank.lf
@@ -0,0 +1,24 @@
+// Check bank of federates.
+target C {
+ timeout: 1 sec,
+ coordination: centralized
+}
+
+reactor Node(bank_index: int = 0) {
+ timer t(0, 100 msec)
+ state count: int = 0
+
+ reaction(t) {=
+ lf_print("Hello world %d.", self->count++);
+ =}
+
+ reaction(shutdown) {=
+ if (self->count == 0) {
+ lf_print_error_and_exit("Timer reactions did not execute.");
+ }
+ =}
+}
+
+federated reactor DistributedBank {
+ n = new[2] Node()
+}
diff --git a/test/RustRti/src/federated/DistributedBankToMultiport.lf b/test/RustRti/src/federated/DistributedBankToMultiport.lf
new file mode 100644
index 0000000000..d73b0959fd
--- /dev/null
+++ b/test/RustRti/src/federated/DistributedBankToMultiport.lf
@@ -0,0 +1,33 @@
+// Check multiport to bank connections between federates.
+target C {
+ timeout: 3 sec
+}
+
+import Count from "../lib/Count.lf"
+
+reactor Destination {
+ input[2] in: int
+ state count: int = 1
+
+ reaction(in) {=
+ for (int i = 0; i < in_width; i++) {
+ lf_print("Received %d.", in[i]->value);
+ if (self->count != in[i]->value) {
+ lf_print_error_and_exit("Expected %d.", self->count);
+ }
+ }
+ self->count++;
+ =}
+
+ reaction(shutdown) {=
+ if (self->count == 0) {
+ lf_print_error_and_exit("No data received.");
+ }
+ =}
+}
+
+federated reactor {
+ s = new[2] Count()
+ d = new Destination()
+ s.out -> d.in
+}
diff --git a/test/RustRti/src/federated/DistributedDoublePort.lf b/test/RustRti/src/federated/DistributedDoublePort.lf
new file mode 100644
index 0000000000..ec0a6d0b1d
--- /dev/null
+++ b/test/RustRti/src/federated/DistributedDoublePort.lf
@@ -0,0 +1,52 @@
+/**
+ * Test the case for when two upstream federates send messages to a downstream federate on two
+ * different ports. One message should carry a microstep delay relative to the other message.
+ *
+ * @author Soroush Bateni
+ */
+target C {
+ timeout: 900 msec,
+ coordination: centralized
+}
+
+import Count from "../lib/Count.lf"
+
+reactor CountMicrostep {
+ state count: int = 1
+ output out: int
+ logical action act: int
+ timer t(0, 1 sec)
+
+ reaction(t) -> act {=
+ lf_schedule_int(act, 0, self->count++);
+ =}
+
+ reaction(act) -> out {=
+ lf_set(out, act->value);
+ =}
+}
+
+reactor Print {
+ input in: int
+ input in2: int
+
+ reaction(in, in2) {=
+ interval_t elapsed_time = lf_time_logical_elapsed();
+ lf_print("At tag " PRINTF_TAG ", received in = %d and in2 = %d.", elapsed_time, lf_tag().microstep, in->value, in2->value);
+ if (in->is_present && in2->is_present) {
+ lf_print_error_and_exit("ERROR: invalid logical simultaneity.");
+ }
+ =}
+
+ reaction(shutdown) {=
+ lf_print("SUCCESS: messages were at least one microstep apart.");
+ =}
+}
+
+federated reactor DistributedDoublePort {
+ c = new Count()
+ cm = new CountMicrostep()
+ p = new Print()
+ c.out -> p.in // Indicating a 'logical' connection.
+ cm.out -> p.in2
+}
diff --git a/test/RustRti/src/federated/DistributedInterleaved.lf b/test/RustRti/src/federated/DistributedInterleaved.lf
new file mode 100644
index 0000000000..dc212daf17
--- /dev/null
+++ b/test/RustRti/src/federated/DistributedInterleaved.lf
@@ -0,0 +1,44 @@
+// Check multiport to bank connections between federates.
+target C {
+ timeout: 3 sec
+}
+
+reactor Count(offset: time = 0, period: time = 1 sec) {
+ state count: int = 1
+ output[4] out: int
+ timer t(offset, period)
+
+ reaction(t) -> out {=
+ for (int i = 0; i < out_width; i++) {
+ lf_set(out[i], self->count++);
+ }
+ =}
+}
+
+reactor Destination {
+ input[2] in: int
+ state count: int = 0
+
+ reaction(in) {=
+ lf_print("Received %d.", in[0]->value);
+ lf_print("Received %d.", in[1]->value);
+ // Because the connection is interleaved, the difference between the
+ // two inputs should be 2, not 1.
+ if (in[1]->value - in[0]->value != 2) {
+ lf_print_error_and_exit("Expected a difference of two.");
+ }
+ self->count++;
+ =}
+
+ reaction(shutdown) {=
+ if (self->count == 0) {
+ lf_print_error_and_exit("No data received.");
+ }
+ =}
+}
+
+federated reactor {
+ s = new Count()
+ d = new[2] Destination()
+ s.out -> interleaved(d.in)
+}
diff --git a/test/RustRti/src/federated/DistributedLoopedAction.lf b/test/RustRti/src/federated/DistributedLoopedAction.lf
new file mode 100644
index 0000000000..88418f84d1
--- /dev/null
+++ b/test/RustRti/src/federated/DistributedLoopedAction.lf
@@ -0,0 +1,62 @@
+/**
+ * Test a sender-receiver network system that relies on microsteps being taken into account.
+ *
+ * @author Soroush Bateni
+ */
+target C {
+ logging: LOG,
+ timeout: 1 sec
+}
+
+import Sender from "../lib/LoopedActionSender.lf"
+
+reactor Receiver(take_a_break_after: int = 10, break_interval: time = 400 msec) {
+ input in: int
+ state received_messages: int = 0
+ state total_received_messages: int = 0
+ state breaks: int = 0
+ timer t(0, 10 msec) // This will impact the performance
+
+ // but forces the logical time to advance Comment this line for a more sensible log output.
+ reaction(in) {=
+ lf_print("At tag " PRINTF_TAG " received value %d.",
+ lf_time_logical_elapsed(),
+ lf_tag().microstep,
+ in->value);
+ self->total_received_messages++;
+ if (in->value != self->received_messages++) {
+ lf_print_error("Expected %d", self->received_messages - 1);
+ // exit(1);
+ }
+ if (lf_time_logical_elapsed() != self->breaks * self->break_interval) {
+ lf_print_error("Received messages at an incorrect time: " PRINTF_TIME, lf_time_logical_elapsed());
+ // exit(2);
+ }
+
+ if (self->received_messages == self->take_a_break_after) {
+ // Sender is taking a break;
+ self->breaks++;
+ self->received_messages = 0;
+ }
+ =}
+
+ reaction(t) {=
+ // Do nothing
+ =}
+
+ reaction(shutdown) {=
+ if (self->breaks != 3 ||
+ (self->total_received_messages != ((SEC(1)/self->break_interval)+1) * self->take_a_break_after)
+ ) {
+ lf_print_error_and_exit("Did not receive enough messages.");
+ }
+ printf("SUCCESS: Successfully received all messages from the sender.\n");
+ =}
+}
+
+federated reactor DistributedLoopedAction {
+ sender = new Sender()
+ receiver = new Receiver()
+
+ sender.out -> receiver.in
+}
diff --git a/test/RustRti/src/federated/DistributedMultiport.lf b/test/RustRti/src/federated/DistributedMultiport.lf
new file mode 100644
index 0000000000..44a04c4654
--- /dev/null
+++ b/test/RustRti/src/federated/DistributedMultiport.lf
@@ -0,0 +1,48 @@
+// Check multiport connections between federates.
+target C {
+ timeout: 1 sec,
+ coordination: centralized
+}
+
+reactor Source(width: int = 2) {
+ output[width] out: int
+ timer t(0, 100 msec)
+ state count: int = 0
+
+ reaction(t) -> out {=
+ for (int i = 0; i < out_width; i++) {
+ lf_set(out[i], self->count++);
+ }
+ =}
+}
+
+reactor Destination(width: int = 3) {
+ input[width] in: int
+ state count: int = 0
+
+ reaction(in) {=
+ for (int i = 0; i < in_width; i++) {
+ if (in[i]->is_present) {
+ tag_t now = lf_tag();
+ lf_print("Received %d at channel %d at tag " PRINTF_TAG, in[i]->value, i,
+ now.time - lf_time_start(), now.microstep
+ );
+ if (in[i]->value != self->count++) {
+ lf_print_error_and_exit("Expected %d.", self->count - 1);
+ }
+ }
+ }
+ =}
+
+ reaction(shutdown) {=
+ if (self->count == 0) {
+ lf_print_error_and_exit("No data received.");
+ }
+ =}
+}
+
+federated reactor DistributedMultiport {
+ s = new Source(width=4)
+ d = new Destination(width=4)
+ s.out -> d.in
+}
diff --git a/test/RustRti/src/federated/DistributedMultiportToBank.lf b/test/RustRti/src/federated/DistributedMultiportToBank.lf
new file mode 100644
index 0000000000..d8171de51e
--- /dev/null
+++ b/test/RustRti/src/federated/DistributedMultiportToBank.lf
@@ -0,0 +1,41 @@
+// Check multiport to bank connections between federates.
+target C {
+ timeout: 1 sec
+}
+
+reactor Source {
+ output[2] out: int
+ timer t(0, 100 msec)
+ state count: int = 0
+
+ reaction(t) -> out {=
+ for (int i = 0; i < out_width; i++) {
+ lf_set(out[i], self->count);
+ }
+ self->count++;
+ =}
+}
+
+reactor Destination {
+ input in: int
+ state count: int = 0
+
+ reaction(in) {=
+ lf_print("Received %d.", in->value);
+ if (self->count++ != in->value) {
+ lf_print_error_and_exit("Expected %d.", self->count - 1);
+ }
+ =}
+
+ reaction(shutdown) {=
+ if (self->count == 0) {
+ lf_print_error_and_exit("No data received.");
+ }
+ =}
+}
+
+federated reactor DistributedMultiportToBank {
+ s = new Source()
+ d = new[2] Destination()
+ s.out -> d.in
+}
diff --git a/test/RustRti/src/federated/DistributedMultiportToken.lf b/test/RustRti/src/federated/DistributedMultiportToken.lf
new file mode 100644
index 0000000000..547fe651d9
--- /dev/null
+++ b/test/RustRti/src/federated/DistributedMultiportToken.lf
@@ -0,0 +1,46 @@
+// Check multiport connections between federates where the message is carried by a Token (in this
+// case, with an array of char).
+target C {
+ timeout: 1 sec,
+ coordination: centralized
+}
+
+reactor Source {
+ output[4] out: char*
+ timer t(0, 200 msec)
+ state count: int = 0
+
+ reaction(t) -> out {=
+ for (int i = 0; i < out_width; i++) {
+ // With NULL, 0 arguments, snprintf tells us how many bytes are needed.
+ // Add one for the null terminator.
+ int length = snprintf(NULL, 0, "Hello %d", self->count) + 1;
+ // Dynamically allocate memory for the output.
+ SET_NEW_ARRAY(out[i], length);
+ // Populate the output string and increment the count.
+ snprintf(out[i]->value, length, "Hello %d", self->count++);
+ lf_print("MessageGenerator: At time " PRINTF_TIME ", send message: %s.",
+ lf_time_logical_elapsed(),
+ out[i]->value
+ );
+ }
+ =}
+}
+
+reactor Destination {
+ input[4] in: char*
+
+ reaction(in) {=
+ for (int i = 0; i < in_width; i++) {
+ if (in[i]->is_present) {
+ lf_print("Received %s.", in[i]->value);
+ }
+ }
+ =}
+}
+
+federated reactor DistributedMultiportToken {
+ s = new Source()
+ d = new Destination()
+ s.out -> d.in
+}
diff --git a/test/RustRti/src/federated/DistributedNetworkOrder.lf b/test/RustRti/src/federated/DistributedNetworkOrder.lf
new file mode 100644
index 0000000000..b1413c11b1
--- /dev/null
+++ b/test/RustRti/src/federated/DistributedNetworkOrder.lf
@@ -0,0 +1,75 @@
+/**
+ * This is a test for send_timed_message, which is an internal API.
+ *
+ * This test sends a second message at time 5 msec that has the same intended tag as a message that
+ * it had previously sent at time 0 msec. This results in a warning, but the message microstep is
+ * incremented and correctly received one microstep later.
+ *
+ * @author Soroush Bateni
+ */
+target C {
+ timeout: 1 sec,
+ build-type: RelWithDebInfo // Release with debug info
+}
+
+preamble {=
+ #ifdef __cplusplus
+ extern "C" {
+ #endif
+ #include "federate.h"
+ #ifdef __cplusplus
+ }
+ #endif
+=}
+
+reactor Sender {
+ output out: int
+ timer t(0, 1 msec)
+
+ reaction(t) -> out {=
+ int payload = 1;
+ if (lf_time_logical_elapsed() == 0LL) {
+ lf_send_tagged_message(self->base.environment, MSEC(10), MSG_TYPE_TAGGED_MESSAGE, 0, 1, "federate 1", sizeof(int),
+ (unsigned char*)&payload);
+ } else if (lf_time_logical_elapsed() == MSEC(5)) {
+ payload = 2;
+ lf_send_tagged_message(self->base.environment, MSEC(5), MSG_TYPE_TAGGED_MESSAGE, 0, 1, "federate 1", sizeof(int),
+ (unsigned char*)&payload);
+ }
+ =}
+}
+
+reactor Receiver {
+ input in: int
+ state success: int = 0
+
+ reaction(in) {=
+ tag_t current_tag = lf_tag();
+ if (current_tag.time == (lf_time_start() + MSEC(10))) {
+ if (current_tag.microstep == 0 && in->value == 1) {
+ self->success++;
+ } else if (current_tag.microstep == 1 && in->value == 2) {
+ self->success++;
+ }
+ }
+ printf("Received %d at tag " PRINTF_TAG ".\n",
+ in->value,
+ lf_time_logical_elapsed(),
+ lf_tag().microstep);
+ =}
+
+ reaction(shutdown) {=
+ if (self->success != 2) {
+ fprintf(stderr, "ERROR: Failed to receive messages.\n");
+ exit(1);
+ }
+ printf("SUCCESS.\n");
+ =}
+}
+
+federated reactor DistributedNetworkOrder {
+ sender = new Sender()
+ receiver = new Receiver()
+
+ sender.out -> receiver.in
+}
diff --git a/test/RustRti/src/federated/DistributedPhysicalActionUpstream.lf b/test/RustRti/src/federated/DistributedPhysicalActionUpstream.lf
new file mode 100644
index 0000000000..3a85c9b3d1
--- /dev/null
+++ b/test/RustRti/src/federated/DistributedPhysicalActionUpstream.lf
@@ -0,0 +1,60 @@
+/**
+ * Test that a rapidly produced physical action in an upstream federate can be properly handled in
+ * federated execution.
+ */
+target C {
+ timeout: 10 secs,
+ coordination-options: {
+ advance-message-interval: 30 msec
+ }
+}
+
+import PassThrough from "../lib/PassThrough.lf"
+import TestCount from "../lib/TestCount.lf"
+
+preamble {=
+ extern int _counter;
+ void callback(void *a);
+ void* take_time(void* a);
+=}
+
+reactor WithPhysicalAction {
+ preamble {=
+ int _counter = 1;
+ void callback(void *a) {
+ lf_schedule_int(a, 0, _counter++);
+ }
+ // Simulate time passing before a callback occurs.
+ void* take_time(void* a) {
+ while (_counter < 15) {
+ instant_t sleep_time = MSEC(10);
+ lf_sleep(sleep_time);
+ callback(a);
+ }
+ return NULL;
+ }
+ =}
+
+ output out: int
+ state thread_id: lf_thread_t = 0
+ physical action act(0): int
+
+ reaction(startup) -> act {=
+ // start new thread, provide callback
+ lf_thread_create(&self->thread_id, &take_time, act);
+ =}
+
+ reaction(act) -> out {=
+ lf_set(out, act->value);
+ =}
+}
+
+federated reactor {
+ a = new WithPhysicalAction()
+ m1 = new PassThrough()
+ m2 = new PassThrough()
+ test = new TestCount(num_inputs=14)
+ a.out -> m1.in
+ m1.out -> m2.in
+ m2.out -> test.in
+}
diff --git a/test/RustRti/src/federated/DistributedStopZero.lf b/test/RustRti/src/federated/DistributedStopZero.lf
new file mode 100644
index 0000000000..876bd6f7f4
--- /dev/null
+++ b/test/RustRti/src/federated/DistributedStopZero.lf
@@ -0,0 +1,84 @@
+/**
+ * Test for lf_request_stop() in federated execution with centralized coordination at tag (0,0).
+ *
+ * @author Soroush Bateni
+ */
+target C
+
+reactor Sender {
+ output out: int
+ timer t(0, 1 usec)
+
+ reaction(t) -> out {=
+ printf("Sending 42 at " PRINTF_TAG ".\n",
+ lf_time_logical_elapsed(),
+ lf_tag().microstep);
+ lf_set(out, 42);
+
+ tag_t zero = (tag_t) { .time = lf_time_start(), .microstep = 0u };
+ if (lf_tag_compare(lf_tag(), zero) == 0) {
+ // Request stop at (0,0)
+ printf("Requesting stop at " PRINTF_TAG ".\n",
+ lf_time_logical_elapsed(),
+ lf_tag().microstep);
+ lf_request_stop();
+ }
+ =}
+
+ reaction(shutdown) {=
+ if (lf_time_logical_elapsed() != USEC(0) ||
+ lf_tag().microstep != 1) {
+ fprintf(stderr, "ERROR: Sender failed to stop the federation in time. "
+ "Stopping at " PRINTF_TAG ".\n",
+ lf_time_logical_elapsed(),
+ lf_tag().microstep);
+ exit(1);
+ }
+ printf("SUCCESS: Successfully stopped the federation at " PRINTF_TAG ".\n",
+ lf_time_logical_elapsed(),
+ lf_tag().microstep);
+ =}
+}
+
+reactor Receiver {
+ input in: int
+
+ reaction(in) {=
+ printf("Received %d at " PRINTF_TAG ".\n",
+ in->value,
+ lf_time_logical_elapsed(),
+ lf_tag().microstep);
+ tag_t zero = (tag_t) { .time = lf_time_start(), .microstep = 0u };
+ if (lf_tag_compare(lf_tag(), zero) == 0) {
+ // Request stop at (0,0)
+ printf("Requesting stop at " PRINTF_TAG ".\n",
+ lf_time_logical_elapsed(),
+ lf_tag().microstep);
+ lf_request_stop();
+ }
+ =}
+
+ reaction(shutdown) {=
+ // Sender should have requested stop earlier than the receiver.
+ // Therefore, the shutdown events must occur at (0, 0) on the
+ // receiver.
+ if (lf_time_logical_elapsed() != USEC(0) ||
+ lf_tag().microstep != 1) {
+ fprintf(stderr, "ERROR: Receiver failed to stop the federation in time. "
+ "Stopping at " PRINTF_TAG ".\n",
+ lf_time_logical_elapsed(),
+ lf_tag().microstep);
+ exit(1);
+ }
+ printf("SUCCESS: Successfully stopped the federation at " PRINTF_TAG ".\n",
+ lf_time_logical_elapsed(),
+ lf_tag().microstep);
+ =}
+}
+
+federated reactor {
+ sender = new Sender()
+ receiver = new Receiver()
+
+ sender.out -> receiver.in
+}
diff --git a/test/RustRti/src/federated/EnclaveFederatedRequestStop.lf b/test/RustRti/src/federated/EnclaveFederatedRequestStop.lf
new file mode 100644
index 0000000000..0bed9e03d1
--- /dev/null
+++ b/test/RustRti/src/federated/EnclaveFederatedRequestStop.lf
@@ -0,0 +1,39 @@
+/**
+ * Test that enclaves within federates all stop at the time requested by the first enclave to
+ * request a stop. Note that the test has no timeout because any finite timeout can, in theory,
+ * cause the test to fail. The first federate to request a stop does no at 50 ms, so the program
+ * should terminate quickly if all goes well.
+ */
+target C
+
+reactor Stop(
+ // Zero value here means "don't stop".
+ stop_time: time = 0) {
+ preamble {=
+ #include "platform.h" // Defines PRINTF_TIME
+ =}
+ timer t(stop_time)
+
+ reaction(t) {=
+ if (self->stop_time > 0) lf_request_stop();
+ =}
+
+ reaction(shutdown) {=
+ lf_print("Stopped at tag (" PRINTF_TIME ", %d)", lf_time_logical_elapsed(), lf_tag().microstep);
+ if (lf_time_logical_elapsed() != 50000000LL || lf_tag().microstep != 1) {
+ lf_print_error_and_exit("Expected stop tag to be (50ms, 1).");
+ }
+ =}
+}
+
+reactor Fed(least_stop_time: time = 0) {
+ @enclave
+ s1 = new Stop()
+ @enclave
+ s2 = new Stop(stop_time=least_stop_time)
+}
+
+federated reactor {
+ f1 = new Fed()
+ f2 = new Fed(least_stop_time = 50 ms)
+}
diff --git a/test/RustRti/src/federated/FederatedFilePkgReader.lf b/test/RustRti/src/federated/FederatedFilePkgReader.lf
new file mode 100644
index 0000000000..cf79291acf
--- /dev/null
+++ b/test/RustRti/src/federated/FederatedFilePkgReader.lf
@@ -0,0 +1,57 @@
+/** Test reading a file at a location relative to the source file. */
+target C {
+ timeout: 0 s
+}
+
+reactor Source {
+ output out: char* // Use char*, not string, so memory is freed.
+
+ reaction(startup) -> out {=
+ char* file_path =
+ LF_PACKAGE_DIRECTORY
+ LF_FILE_SEPARATOR "src"
+ LF_FILE_SEPARATOR "lib"
+ LF_FILE_SEPARATOR "FileReader.txt";
+
+ FILE* file = fopen(file_path, "rb");
+ if (file == NULL) lf_print_error_and_exit("Error opening file at path %s.", file_path);
+
+ // Determine the file size
+ fseek(file, 0, SEEK_END);
+ long file_size = ftell(file);
+ fseek(file, 0, SEEK_SET);
+
+ // Allocate memory for the buffer
+ char* buffer = (char *) malloc(file_size + 1);
+ if (buffer == NULL) lf_print_error_and_exit("Out of memory.");
+
+ // Read the file into the buffer
+ fread(buffer, file_size, 1, file);
+ buffer[file_size] = '\0';
+ fclose(file);
+
+ // For federated version, have to use lf_set_array so array size is know
+ // to the serializer.
+ lf_set_array(out, buffer, file_size + 1);
+ =}
+}
+
+reactor Check {
+ preamble {=
+ #include
+ =}
+ input in: char*
+
+ reaction(in) {=
+ printf("Received: %s\n", in->value);
+ if (strcmp("Hello World", in->value) != 0) {
+ lf_print_error_and_exit("Expected 'Hello World'");
+ }
+ =}
+}
+
+federated reactor {
+ s = new Source()
+ c = new Check()
+ s.out -> c.in
+}
diff --git a/test/RustRti/src/federated/FederatedFileReader.lf b/test/RustRti/src/federated/FederatedFileReader.lf
new file mode 100644
index 0000000000..617d34c3c8
--- /dev/null
+++ b/test/RustRti/src/federated/FederatedFileReader.lf
@@ -0,0 +1,66 @@
+/** Test reading a file at a location relative to the source file. */
+target C {
+ logging: DEBUG,
+ timeout: 0 s
+}
+
+reactor Source {
+ output out: char* // Use char*, not string, so memory is freed.
+
+ reaction(startup) -> out {=
+ char* file_path =
+ LF_SOURCE_DIRECTORY
+ LF_FILE_SEPARATOR ".."
+ LF_FILE_SEPARATOR "lib"
+ LF_FILE_SEPARATOR "FileReader.txt";
+
+ FILE* file = fopen(file_path, "rb");
+ if (file == NULL) lf_print_error_and_exit("Error opening file at path %s.", file_path);
+
+ // Determine the file size
+ fseek(file, 0, SEEK_END);
+ long file_size = ftell(file);
+ fseek(file, 0, SEEK_SET);
+
+ // Allocate memory for the buffer
+ char* buffer = (char *) malloc(file_size + 1);
+ if (buffer == NULL) lf_print_error_and_exit("Out of memory.");
+
+ // Read the file into the buffer
+ fread(buffer, file_size, 1, file);
+ buffer[file_size] = '\0';
+ fclose(file);
+
+ // For federated version, have to use lf_set_array so array size is know
+ // to the serializer.
+ lf_set_array(out, buffer, file_size + 1);
+ =}
+}
+
+reactor Check {
+ preamble {=
+ #include
+ =}
+ input in: char*
+ state received: bool = false
+
+ reaction(in) {=
+ printf("Received: %s\n", in->value);
+ self->received = true;
+ if (strcmp("Hello World", in->value) != 0) {
+ lf_print_error_and_exit("Expected 'Hello World'");
+ }
+ =}
+
+ reaction(shutdown) {=
+ if (!self->received) {
+ lf_print_error_and_exit("No input received.");
+ }
+ =}
+}
+
+federated reactor {
+ s = new Source()
+ c = new Check()
+ s.out -> c.in
+}
diff --git a/test/RustRti/src/federated/FeedbackDelay.lf b/test/RustRti/src/federated/FeedbackDelay.lf
new file mode 100644
index 0000000000..88b15945b2
--- /dev/null
+++ b/test/RustRti/src/federated/FeedbackDelay.lf
@@ -0,0 +1,85 @@
+/**
+ * This test has two coupled cycles. In this variant, one is a zero-delay cycle (ZDC) and the other
+ * is not, having a microstep delay. In this variant, the microstep delay is on a connection
+ * entering the ZDC.
+ */
+target C {
+ timeout: 1 sec
+}
+
+reactor PhysicalPlant {
+ input control: double
+ output sensor: double
+ timer t(0, 100 ms)
+ state last_sensor_time: time = 0
+ state previous_sensor_time: time = 0
+ state count: int = 0
+
+ reaction(t) -> sensor {=
+ lf_set(sensor, 42);
+ self->previous_sensor_time = self->last_sensor_time;
+ self->last_sensor_time = lf_time_physical();
+ =}
+
+ reaction(control) {=
+ self->count++;
+ lf_print("Control input: %f", control->value);
+ instant_t control_time = lf_time_physical();
+ lf_print("Latency: " PRINTF_TIME ".", control_time - self->previous_sensor_time);
+ lf_print("Logical time: " PRINTF_TIME ".", lf_time_logical_elapsed());
+ =}
+
+ reaction(shutdown) {=
+ if (self->count != 10) {
+ lf_print_error_and_exit("Received only %d inputs.", self->count);
+ }
+ =}
+}
+
+reactor Controller {
+ input sensor: double
+ output control: double
+
+ state latest_control: double = 0.0
+ state first: bool = true
+
+ output request_for_planning: double
+ input planning: double
+
+ reaction(planning) {=
+ self->latest_control = planning->value;
+ tag_t now = lf_tag();
+ lf_print("Controller received planning value %f at tag " PRINTF_TAG,
+ self->latest_control, now.time - lf_time_start(), now.microstep
+ );
+ =}
+
+ reaction(sensor) -> control, request_for_planning {=
+ if (!self->first) {
+ lf_set(control, self->latest_control);
+ }
+ self->first = false;
+ lf_set(request_for_planning, sensor->value);
+ =}
+}
+
+reactor Planner {
+ input request: double
+ output response: double
+
+ reaction(request) -> response {=
+ lf_sleep(MSEC(10));
+ lf_set(response, request->value);
+ =}
+}
+
+federated reactor {
+ p = new PhysicalPlant()
+ c = new Controller()
+ pl = new Planner()
+
+ p.sensor -> c.sensor
+ c.request_for_planning -> pl.request
+ pl.response -> c.planning after 0
+ c.control -> p.control
+}
diff --git a/test/RustRti/src/federated/FeedbackDelay3.lf b/test/RustRti/src/federated/FeedbackDelay3.lf
new file mode 100644
index 0000000000..e7d47d9340
--- /dev/null
+++ b/test/RustRti/src/federated/FeedbackDelay3.lf
@@ -0,0 +1,41 @@
+/** This test has two coupled cycles. In this variant, both are a zero-delay cycles (ZDC). */
+target C {
+ timeout: 1 sec,
+ tracing: true
+}
+
+import PhysicalPlant, Planner from "FeedbackDelay.lf"
+
+reactor Controller {
+ input sensor: double
+ output control: double
+
+ state latest_control: double = 0.0
+ state first: bool = true
+
+ output request_for_planning: double
+ input planning: double
+
+ reaction(sensor) -> control, request_for_planning {=
+ if (!self->first) {
+ lf_set(control, self->latest_control);
+ }
+ self->first = false;
+ lf_set(request_for_planning, sensor->value);
+ =}
+
+ reaction(planning) {=
+ self->latest_control = planning->value;
+ =}
+}
+
+federated reactor {
+ p = new PhysicalPlant()
+ c = new Controller()
+ pl = new Planner()
+
+ p.sensor -> c.sensor
+ c.request_for_planning -> pl.request
+ pl.response -> c.planning
+ c.control -> p.control
+}
diff --git a/test/RustRti/src/federated/FeedbackDelay5.lf b/test/RustRti/src/federated/FeedbackDelay5.lf
new file mode 100644
index 0000000000..cd7edcd051
--- /dev/null
+++ b/test/RustRti/src/federated/FeedbackDelay5.lf
@@ -0,0 +1,57 @@
+/**
+ * This test has two coupled cycles. In this variant, both are zero-delay cycles (ZDC), but one of
+ * the cycles has two superposed cycles, one of which is zero delay and the other of which is not.
+ */
+target C {
+ timeout: 900 ms
+}
+
+import PhysicalPlant from "FeedbackDelay.lf"
+
+reactor Controller {
+ input in: double
+ input sensor: double
+ output control: double
+
+ state latest_control: double = 0.0
+
+ output request_for_planning: double
+ input planning: double
+
+ reaction(in, planning) {=
+ self->latest_control = planning->value;
+ =}
+
+ reaction(sensor) -> control, request_for_planning {=
+ lf_set(control, self->latest_control);
+ lf_set(request_for_planning, sensor->value);
+ =}
+}
+
+reactor Planner {
+ input request: double
+ output response: double
+ output out: double
+ timer t(0, 100 ms)
+
+ reaction(t) -> out {=
+ lf_set(out, 0);
+ =}
+
+ reaction(request) -> response {=
+ lf_sleep(MSEC(10));
+ lf_set(response, request->value);
+ =}
+}
+
+federated reactor {
+ p = new PhysicalPlant()
+ c = new Controller()
+ pl = new Planner()
+
+ p.sensor -> c.sensor
+ c.request_for_planning -> pl.request
+ pl.response -> c.planning after 0
+ c.control -> p.control
+ pl.out -> c.in
+}
diff --git a/test/RustRti/src/federated/FeedbackDelaySimple.lf b/test/RustRti/src/federated/FeedbackDelaySimple.lf
new file mode 100644
index 0000000000..655fbe0762
--- /dev/null
+++ b/test/RustRti/src/federated/FeedbackDelaySimple.lf
@@ -0,0 +1,41 @@
+target C {
+ timeout: 1 sec
+}
+
+reactor Loop {
+ input in: int
+ output out: int
+ timer t(0, 100 msec)
+ state count: int = 1
+
+ reaction(in) {=
+ lf_print("Received %d.", in->value);
+ if (in->value != self->count) {
+ lf_print_error_and_exit(
+ "Expected %d. Got %d.",
+ self->count,
+ in->value
+ );
+ }
+ self->count++;
+ =}
+
+ reaction(t) -> out {=
+ lf_set(out, self->count);
+ =}
+
+ reaction(shutdown) {=
+ if (self->count != 11) {
+ lf_print_error_and_exit(
+ "Expected 11 messages. Got %d.",
+ self->count
+ );
+ }
+ =}
+}
+
+federated reactor {
+ l = new Loop()
+
+ l.out -> l.in after 0
+}
diff --git a/test/RustRti/src/federated/HelloDistributed.lf b/test/RustRti/src/federated/HelloDistributed.lf
new file mode 100644
index 0000000000..fc7c10da3c
--- /dev/null
+++ b/test/RustRti/src/federated/HelloDistributed.lf
@@ -0,0 +1,56 @@
+/**
+ * Test a particularly simple form of a distributed deterministic system where a federation that
+ * receives timestamped messages has only those messages as triggers. Therefore, no additional
+ * coordination of the advancement of time (HLA or Ptides) is needed.
+ * @author Edward A. Lee
+ */
+target C
+
+preamble {=
+ #include
+=}
+
+reactor Source {
+ output out: string
+
+ reaction(startup) -> out {=
+ lf_print("Sending 'Hello World!' message from source federate.");
+ lf_set(out, "Hello World!");
+ lf_request_stop();
+ =}
+}
+
+reactor Destination {
+ input in: string
+ state received: bool = false
+
+ reaction(startup) {=
+ lf_print("Destination started.");
+ =}
+
+ reaction(in) {=
+ lf_print("At logical time " PRINTF_TIME ", destination received: %s", lf_time_logical_elapsed(), in->value);
+ if (strcmp(in->value, "Hello World!") != 0) {
+ fprintf(stderr, "ERROR: Expected to receive 'Hello World!'\n");
+ exit(1);
+ }
+ self->received = true;
+ =}
+
+ reaction(shutdown) {=
+ lf_print("Shutdown invoked.");
+ if (!self->received) {
+ lf_print_error_and_exit("Destination did not receive the message.");
+ }
+ =}
+}
+
+federated reactor HelloDistributed at localhost {
+ s = new Source() // Reactor s is in federate Source
+ d = new Destination() // Reactor d is in federate Destination
+ s.out -> d.in // This version preserves the timestamp.
+
+ reaction(startup) {=
+ lf_print("Printing something in top-level federated reactor.");
+ =}
+}
diff --git a/test/RustRti/src/federated/InheritanceFederated.lf b/test/RustRti/src/federated/InheritanceFederated.lf
new file mode 100644
index 0000000000..90098b29bb
--- /dev/null
+++ b/test/RustRti/src/federated/InheritanceFederated.lf
@@ -0,0 +1,23 @@
+// Test for inheritance in a federated program.
+// Compilation without errors is success.
+// Based on https://github.com/lf-lang/lingua-franca/issues/1733.
+target C {
+ timeout: 1 ms
+}
+
+reactor A {
+ reaction(startup) {=
+ printf("Hello\n");
+ =}
+}
+
+reactor B {
+ a = new A()
+}
+
+reactor C extends B {
+}
+
+federated reactor {
+ c = new C()
+}
diff --git a/test/RustRti/src/federated/LoopDistributedCentralized.lf b/test/RustRti/src/federated/LoopDistributedCentralized.lf
new file mode 100644
index 0000000000..968ac2784e
--- /dev/null
+++ b/test/RustRti/src/federated/LoopDistributedCentralized.lf
@@ -0,0 +1,48 @@
+/**
+ * This tests a feedback loop with physical actions and centralized coordination.
+ *
+ * @author Edward A. Lee
+ */
+target C {
+ coordination: centralized,
+ coordination-options: {
+ advance-message-interval: 100 msec
+ },
+ timeout: 4 sec,
+ logging: DEBUG
+}
+
+reactor Looper(incr: int = 1, delay: time = 0 msec) {
+ input in: int
+ output out: int
+ physical action a(delay)
+ state count: int = 0
+
+ timer t(0, 1 sec)
+
+ reaction(t) -> out {=
+ lf_set(out, self->count);
+ self->count += self->incr;
+ =}
+
+ reaction(in) {=
+ instant_t time_lag = lf_time_physical() - lf_time_logical();
+ char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807
+ lf_comma_separated_time(time_buffer, time_lag);
+ lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer);
+ =}
+
+ reaction(shutdown) {=
+ lf_print("******* Shutdown invoked.");
+ if (self->count != 5 * self->incr) {
+ lf_print_error_and_exit("Failed to receive all five expected inputs.");
+ }
+ =}
+}
+
+federated reactor LoopDistributedCentralized(delay: time = 0) {
+ left = new Looper()
+ right = new Looper(incr=-1)
+ left.out -> right.in
+ right.out -> left.in
+}
diff --git a/test/RustRti/src/federated/LoopDistributedCentralized2.lf b/test/RustRti/src/federated/LoopDistributedCentralized2.lf
new file mode 100644
index 0000000000..25de5873e2
--- /dev/null
+++ b/test/RustRti/src/federated/LoopDistributedCentralized2.lf
@@ -0,0 +1,75 @@
+/**
+ * This tests a feedback loop with physical actions and centralized coordination.
+ *
+ * @author Edward A. Lee
+ */
+target C {
+ coordination: centralized,
+ coordination-options: {
+ advance-message-interval: 100 msec
+ },
+ timeout: 4 sec
+}
+
+reactor Looper(incr: int = 1, delay: time = 0 msec) {
+ input in: int
+ output out: int
+ physical action a(delay)
+ state count: int = 0
+
+ timer t(0, 1 sec)
+
+ reaction(t) -> out {=
+ lf_set(out, self->count);
+ self->count += self->incr;
+ =}
+
+ reaction(in) {=
+ instant_t time_lag = lf_time_physical() - lf_time_logical();
+ char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807
+ lf_comma_separated_time(time_buffer, time_lag);
+ lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer);
+ =}
+
+ reaction(shutdown) {=
+ lf_print("******* Shutdown invoked.");
+ if (self->count != 5 * self->incr) {
+ lf_print_error_and_exit("Failed to receive all five expected inputs.");
+ }
+ =}
+}
+
+reactor Looper2(incr: int = 1, delay: time = 0 msec) {
+ input in: int
+ output out: int
+ physical action a(delay)
+ state count: int = 0
+
+ timer t(0, 1 sec)
+
+ reaction(in) {=
+ instant_t time_lag = lf_time_physical() - lf_time_logical();
+ char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807
+ lf_comma_separated_time(time_buffer, time_lag);
+ lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer);
+ =}
+
+ reaction(t) -> out {=
+ lf_set(out, self->count);
+ self->count += self->incr;
+ =}
+
+ reaction(shutdown) {=
+ lf_print("******* Shutdown invoked.");
+ if (self->count != 5 * self->incr) {
+ lf_print_error_and_exit("Failed to receive all five expected inputs.");
+ }
+ =}
+}
+
+federated reactor(delay: time = 0) {
+ left = new Looper()
+ right = new Looper2(incr=-1)
+ left.out -> right.in
+ right.out -> left.in
+}
diff --git a/test/RustRti/src/federated/LoopDistributedCentralizedPhysicalAction.lf b/test/RustRti/src/federated/LoopDistributedCentralizedPhysicalAction.lf
new file mode 100644
index 0000000000..ac783f07cc
--- /dev/null
+++ b/test/RustRti/src/federated/LoopDistributedCentralizedPhysicalAction.lf
@@ -0,0 +1,74 @@
+/**
+ * This tests a feedback loop with physical actions and centralized coordination.
+ *
+ * @author Edward A. Lee
+ */
+target C {
+ flags: "-Wall",
+ coordination: centralized,
+ coordination-options: {
+ advance-message-interval: 100 msec
+ },
+ timeout: 5 sec,
+ logging: warn
+}
+
+preamble {=
+ #include // Defines sleep()
+ extern bool stop;
+ void* ping(void* actionref);
+=}
+
+reactor Looper(incr: int = 1, delay: time = 0 msec) {
+ preamble {=
+ bool stop = false;
+ // Thread to trigger an action once every second.
+ void* ping(void* actionref) {
+ while(!stop) {
+ lf_print("Scheduling action.");
+ lf_schedule(actionref, 0);
+ sleep(1);
+ }
+ return NULL;
+ }
+ =}
+ input in: int
+ output out: int
+ physical action a(delay)
+ state count: int = 0
+
+ reaction(startup) -> a {=
+ // Start the thread that listens for Enter or Return.
+ lf_thread_t thread_id;
+ lf_print("Starting thread.");
+ lf_thread_create(&thread_id, &ping, a);
+ =}
+
+ reaction(a) -> out {=
+ lf_set(out, self->count);
+ self->count += self->incr;
+ =}
+
+ reaction(in) {=
+ instant_t time_lag = lf_time_physical() - lf_time_logical();
+ char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807
+ lf_comma_separated_time(time_buffer, time_lag);
+ lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer);
+ =}
+
+ reaction(shutdown) {=
+ lf_print("******* Shutdown invoked.");
+ // Stop the thread that is scheduling actions.
+ stop = true;
+ if (self->count != 5 * self->incr) {
+ lf_print_error_and_exit("Failed to receive all five expected inputs.");
+ }
+ =}
+}
+
+federated reactor(delay: time = 0) {
+ left = new Looper()
+ right = new Looper(incr=-1)
+ left.out -> right.in
+ right.out -> left.in
+}
diff --git a/test/RustRti/src/federated/LoopDistributedCentralizedPrecedence.lf b/test/RustRti/src/federated/LoopDistributedCentralizedPrecedence.lf
new file mode 100644
index 0000000000..51a10faac2
--- /dev/null
+++ b/test/RustRti/src/federated/LoopDistributedCentralizedPrecedence.lf
@@ -0,0 +1,56 @@
+/**
+ * This tests that the precedence order of reaction invocation is kept when a feedback loop is
+ * present in centralized coordination.
+ *
+ * @author Edward A. Lee
+ * @author Soroush Bateni
+ */
+target C {
+ flags: "-Wall",
+ coordination: centralized,
+ coordination-options: {
+ advance-message-interval: 100 msec
+ },
+ timeout: 5 sec
+}
+
+reactor Looper(incr: int = 1, delay: time = 0 msec) {
+ input in: int
+ output out: int
+ state count: int = 0
+ state received_count: int = 0
+ timer t(0, 1 sec)
+
+ reaction(t) -> out {=
+ lf_set(out, self->count);
+ self->count += self->incr;
+ =}
+
+ reaction(in) {=
+ instant_t time_lag = lf_time_physical() - lf_time_logical();
+ char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807
+ lf_comma_separated_time(time_buffer, time_lag);
+ lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer);
+ self->received_count = self->count;
+ =}
+
+ reaction(t) {=
+ if (self->received_count != self->count) {
+ lf_print_error_and_exit("reaction(t) was invoked before reaction(in). Precedence order was not kept.");
+ }
+ =}
+
+ reaction(shutdown) {=
+ lf_print("******* Shutdown invoked.");
+ if (self->count != 6 * self->incr) {
+ lf_print_error_and_exit("Failed to receive all six expected inputs.");
+ }
+ =}
+}
+
+federated reactor(delay: time = 0) {
+ left = new Looper()
+ right = new Looper(incr=-1)
+ left.out -> right.in
+ right.out -> left.in
+}
diff --git a/test/RustRti/src/federated/LoopDistributedCentralizedPrecedenceHierarchy.lf b/test/RustRti/src/federated/LoopDistributedCentralizedPrecedenceHierarchy.lf
new file mode 100644
index 0000000000..82adfca699
--- /dev/null
+++ b/test/RustRti/src/federated/LoopDistributedCentralizedPrecedenceHierarchy.lf
@@ -0,0 +1,73 @@
+/**
+ * This tests that the precedence order of reaction invocation is kept in the hierarchy of reactors
+ * when a feedback loop is present in centralized coordination.
+ *
+ * @author Edward A. Lee
+ * @author Soroush Bateni
+ */
+target C {
+ flags: "-Wall",
+ coordination: centralized,
+ coordination-options: {
+ advance-message-interval: 100 msec
+ },
+ timeout: 5 sec
+}
+
+reactor Contained(incr: int = 1) {
+ timer t(0, 1 sec)
+ input in: int
+ state count: int = 0
+ state received_count: int = 0
+
+ reaction(t) {=
+ self->count += self->incr;
+ =}
+
+ reaction(in) {=
+ self->received_count = self->count;
+ =}
+
+ reaction(t) {=
+ if (self->received_count != self->count) {
+ lf_print_error_and_exit("reaction(t) was invoked before reaction(in). Precedence order was not kept.");
+ }
+ =}
+}
+
+reactor Looper(incr: int = 1, delay: time = 0 msec) {
+ input in: int
+ output out: int
+ state count: int = 0
+ timer t(0, 1 sec)
+
+ c = new Contained(incr=incr)
+ in -> c.in
+
+ reaction(t) -> out {=
+ lf_print("Sending network output %d", self->count);
+ lf_set(out, self->count);
+ self->count += self->incr;
+ =}
+
+ reaction(in) {=
+ instant_t time_lag = lf_time_physical() - lf_time_logical();
+ char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807
+ lf_comma_separated_time(time_buffer, time_lag);
+ lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer);
+ =}
+
+ reaction(shutdown) {=
+ lf_print("******* Shutdown invoked.");
+ if (self->count != 6 * self->incr) {
+ lf_print_error_and_exit("Failed to receive all six expected inputs.");
+ }
+ =}
+}
+
+federated reactor(delay: time = 0) {
+ left = new Looper()
+ right = new Looper(incr=-1)
+ left.out -> right.in
+ right.out -> left.in
+}
diff --git a/test/RustRti/src/federated/ParallelDestinations.lf b/test/RustRti/src/federated/ParallelDestinations.lf
new file mode 100644
index 0000000000..a4a4c026db
--- /dev/null
+++ b/test/RustRti/src/federated/ParallelDestinations.lf
@@ -0,0 +1,23 @@
+/** Test parallel connections for federated execution. */
+target C {
+ timeout: 2 sec
+}
+
+import Count from "../lib/Count.lf"
+import TestCount from "../lib/TestCount.lf"
+
+reactor Source {
+ output[2] out: int
+ c1 = new Count()
+ c2 = new Count()
+
+ c1.out, c2.out -> out
+}
+
+federated reactor {
+ s = new Source()
+ t1 = new TestCount(num_inputs=3)
+ t2 = new TestCount(num_inputs=3)
+
+ s.out -> t1.in, t2.in
+}
diff --git a/test/RustRti/src/federated/ParallelSources.lf b/test/RustRti/src/federated/ParallelSources.lf
new file mode 100644
index 0000000000..0bedc87d68
--- /dev/null
+++ b/test/RustRti/src/federated/ParallelSources.lf
@@ -0,0 +1,24 @@
+/** Test parallel connections for federated execution. */
+target C {
+ timeout: 2 sec
+}
+
+import Count from "../lib/Count.lf"
+import TestCount from "../lib/TestCount.lf"
+
+reactor Destination {
+ input[2] in: int
+
+ t1 = new TestCount(num_inputs=3)
+ t2 = new TestCount(num_inputs=3)
+
+ in -> t1.in, t2.in
+}
+
+federated reactor {
+ c1 = new Count()
+ c2 = new Count()
+ d = new Destination()
+
+ c1.out, c2.out -> d.in
+}
diff --git a/test/RustRti/src/federated/ParallelSourcesMultiport.lf b/test/RustRti/src/federated/ParallelSourcesMultiport.lf
new file mode 100644
index 0000000000..026c223463
--- /dev/null
+++ b/test/RustRti/src/federated/ParallelSourcesMultiport.lf
@@ -0,0 +1,34 @@
+/** Test parallel connections for federated execution. */
+target C {
+ timeout: 2 sec
+}
+
+import Count from "../lib/Count.lf"
+import TestCount from "../lib/TestCount.lf"
+
+reactor Source {
+ output[2] out: int
+ c1 = new Count()
+ c2 = new Count()
+
+ c1.out, c2.out -> out
+}
+
+reactor Destination1 {
+ input[3] in: int
+
+ t1 = new TestCount(num_inputs=3)
+ t2 = new TestCount(num_inputs=3)
+ t3 = new TestCount(num_inputs=3)
+
+ in -> t1.in, t2.in, t3.in
+}
+
+federated reactor {
+ s1 = new Source()
+ s2 = new Source()
+ d1 = new Destination1()
+ t4 = new TestCount(num_inputs=3)
+
+ s1.out, s2.out -> d1.in, t4.in
+}
diff --git a/test/RustRti/src/federated/SimpleFederated.lf b/test/RustRti/src/federated/SimpleFederated.lf
new file mode 100644
index 0000000000..cb6a798f8b
--- /dev/null
+++ b/test/RustRti/src/federated/SimpleFederated.lf
@@ -0,0 +1,17 @@
+target C {
+ timeout: 2 secs,
+ build-type: RelWithDebInfo
+}
+
+reactor Fed {
+ input in: int
+ output out: int
+}
+
+federated reactor {
+ fed1 = new Fed()
+ fed2 = new Fed()
+
+ fed1.out -> fed2.in
+ fed2.out -> fed1.in
+}
diff --git a/test/RustRti/src/federated/SpuriousDependency.lf b/test/RustRti/src/federated/SpuriousDependency.lf
new file mode 100644
index 0000000000..b810d5288f
--- /dev/null
+++ b/test/RustRti/src/federated/SpuriousDependency.lf
@@ -0,0 +1,63 @@
+/**
+ * This checks that a federated program does not deadlock when it is ambiguous, given the structure
+ * of a federate, whether it is permissible to require certain network sender/receiver reactions to
+ * precede others in the execution of a given tag.
+ */
+target C {
+ timeout: 1 sec
+}
+
+reactor Passthrough(id: int = 0) {
+ input in: int
+ output out: int
+
+ reaction(in) -> out {=
+ lf_print("Hello from passthrough %d", self->id);
+ lf_set(out, in->value);
+ =}
+}
+
+reactor Twisty {
+ input in0: int
+ input in1: int
+ output out0: int
+ output out1: int
+ p0 = new Passthrough(id=0)
+ p1 = new Passthrough(id=1)
+ in0 -> p0.in
+ p0.out -> out0
+ in1 -> p1.in
+ p1.out -> out1
+}
+
+reactor Check {
+ input in: int
+
+ state count: int = 0
+
+ reaction(in) {=
+ lf_print("count is now %d", ++self->count);
+ =}
+
+ reaction(shutdown) {=
+ lf_print("******* Shutdown invoked.");
+ if (self->count != 1) {
+ lf_print_error_and_exit("Failed to receive expected input.");
+ }
+ =}
+}
+
+federated reactor {
+ t0 = new Twisty()
+ t1 = new Twisty()
+ check = new Check()
+ t0.out1 -> t1.in0
+ t1.out1 -> t0.in0
+ state count: int = 0
+
+ t1.out0 -> check.in
+
+ reaction(startup) -> t0.in1 {=
+ lf_set(t0.in1, 0);
+ =}
+}
diff --git a/test/RustRti/src/federated/StopAtShutdown.lf b/test/RustRti/src/federated/StopAtShutdown.lf
new file mode 100644
index 0000000000..2fad7db3d0
--- /dev/null
+++ b/test/RustRti/src/federated/StopAtShutdown.lf
@@ -0,0 +1,45 @@
+/**
+ * Check that lf_request_stop() doesn't cause any issues at the shutdown tag.
+ *
+ * Original bug discovered by Steven Wong
+ *
+ * @author Steven Wong
+ */
+target C {
+ timeout: 2 sec
+}
+
+reactor A {
+ input in: int
+
+ reaction(startup) {=
+ lf_print("Hello World!");
+ =}
+
+ reaction(in) {=
+ lf_print("Got it");
+ =}
+
+ reaction(shutdown) {=
+ lf_request_stop();
+ =}
+}
+
+reactor B {
+ output out: int
+ timer t(1 sec)
+
+ reaction(t) -> out {=
+ lf_set(out, 1);
+ =}
+
+ reaction(shutdown) {=
+ lf_request_stop();
+ =}
+}
+
+federated reactor {
+ a = new A()
+ b = new B()
+ b.out -> a.in
+}
diff --git a/test/RustRti/src/federated/TopLevelArtifacts.lf b/test/RustRti/src/federated/TopLevelArtifacts.lf
new file mode 100644
index 0000000000..d73ea35967
--- /dev/null
+++ b/test/RustRti/src/federated/TopLevelArtifacts.lf
@@ -0,0 +1,44 @@
+/**
+ * Test whether top-level reactions, actions, and ports are handled appropriately.
+ *
+ * Currently, these artifacts are replicated on all federates.
+ *
+ * @note This just tests for the correctness of the code generation. These top-level artifacts might
+ * be disallowed in the future.
+ */
+target C {
+ timeout: 1 msec
+}
+
+import Count from "../lib/Count.lf"
+import TestCount from "../lib/TestCount.lf"
+
+federated reactor {
+ state successes: int = 0
+ timer t(0, 1 sec)
+ logical action act(0)
+
+ c = new Count()
+ tc = new TestCount()
+ c.out -> tc.in
+
+ reaction(startup) {=
+ self->successes++;
+ =}
+
+ reaction(t) -> act {=
+ self->successes++;
+ lf_schedule(act, 0);
+ =}
+
+ reaction(act) {=
+ self->successes++;
+ =}
+
+ reaction(shutdown) {=
+ if (self->successes != 3) {
+ lf_print_error_and_exit("Failed to properly execute top-level reactions");
+ }
+ lf_print("SUCCESS!");
+ =}
+}
diff --git a/test/RustRti/src/lib/Count.lf b/test/RustRti/src/lib/Count.lf
new file mode 100644
index 0000000000..ee3953b021
--- /dev/null
+++ b/test/RustRti/src/lib/Count.lf
@@ -0,0 +1,11 @@
+target C
+
+reactor Count(offset: time = 0, period: time = 1 sec) {
+ state count: int = 1
+ output out: int
+ timer t(offset, period)
+
+ reaction(t) -> out {=
+ lf_set(out, self->count++);
+ =}
+}
diff --git a/test/RustRti/src/lib/FileLevelPreamble.lf b/test/RustRti/src/lib/FileLevelPreamble.lf
new file mode 100644
index 0000000000..11067d5e63
--- /dev/null
+++ b/test/RustRti/src/lib/FileLevelPreamble.lf
@@ -0,0 +1,12 @@
+/** Test for ensuring that file-level preambles are inherited when a file is imported. */
+target C
+
+preamble {=
+ #define FOO 2
+=}
+
+reactor FileLevelPreamble {
+ reaction(startup) {=
+ printf("FOO: %d\n", FOO);
+ =}
+}
diff --git a/test/RustRti/src/lib/FileReader.txt b/test/RustRti/src/lib/FileReader.txt
new file mode 100644
index 0000000000..5e1c309dae
--- /dev/null
+++ b/test/RustRti/src/lib/FileReader.txt
@@ -0,0 +1 @@
+Hello World
\ No newline at end of file
diff --git a/test/RustRti/src/lib/GenDelay.lf b/test/RustRti/src/lib/GenDelay.lf
new file mode 100644
index 0000000000..8f21c3de1b
--- /dev/null
+++ b/test/RustRti/src/lib/GenDelay.lf
@@ -0,0 +1,21 @@
+target C
+
+preamble {=
+ typedef int message_t;
+=}
+
+reactor Source {
+ output out: message_t
+
+ reaction(startup) -> out {=
+ lf_set(out, 42);
+ =}
+}
+
+reactor Sink {
+ input in: message_t
+
+ reaction(in) {=
+ lf_print("Received %d at time %lld", in->value, lf_time_logical_elapsed());
+ =}
+}
diff --git a/test/RustRti/src/lib/Imported.lf b/test/RustRti/src/lib/Imported.lf
new file mode 100644
index 0000000000..85d0a2b493
--- /dev/null
+++ b/test/RustRti/src/lib/Imported.lf
@@ -0,0 +1,14 @@
+// This is used by the test for the ability to import a reactor definition that itself imports a
+// reactor definition.
+target C
+
+import ImportedAgain from "./ImportedAgain.lf"
+
+reactor Imported {
+ input x: int
+ a = new ImportedAgain()
+
+ reaction(x) -> a.x {=
+ lf_set(a.x, x->value);
+ =}
+}
diff --git a/test/RustRti/src/lib/ImportedAgain.lf b/test/RustRti/src/lib/ImportedAgain.lf
new file mode 100644
index 0000000000..6870526b95
--- /dev/null
+++ b/test/RustRti/src/lib/ImportedAgain.lf
@@ -0,0 +1,15 @@
+// This is used by the test for the ability to import a reactor definition that itself imports a
+// reactor definition.
+target C
+
+reactor ImportedAgain {
+ input x: int
+
+ reaction(x) {=
+ printf("Received: %d.\n", x->value);
+ if (x->value != 42) {
+ printf("ERROR: Expected input to be 42. Got: %d.\n", x->value);
+ exit(1);
+ }
+ =}
+}
diff --git a/test/RustRti/src/lib/ImportedComposition.lf b/test/RustRti/src/lib/ImportedComposition.lf
new file mode 100644
index 0000000000..e5524f3d22
--- /dev/null
+++ b/test/RustRti/src/lib/ImportedComposition.lf
@@ -0,0 +1,22 @@
+// This is used by the test for the ability to import a reactor definition that itself imports a
+// reactor definition.
+target C
+
+reactor Gain {
+ input x: int
+ output y: int
+
+ reaction(x) -> y {=
+ lf_set(y, x->value * 2);
+ =}
+}
+
+reactor ImportedComposition {
+ input x: int
+ output y: int
+ g1 = new Gain()
+ g2 = new Gain()
+ x -> g1.x after 10 msec
+ g1.y -> g2.x after 30 msec
+ g2.y -> y after 15 msec
+}
diff --git a/test/RustRti/src/lib/InternalDelay.lf b/test/RustRti/src/lib/InternalDelay.lf
new file mode 100644
index 0000000000..fb7124a4ec
--- /dev/null
+++ b/test/RustRti/src/lib/InternalDelay.lf
@@ -0,0 +1,15 @@
+target C
+
+reactor InternalDelay(delay: time = 10 msec) {
+ input in: int
+ output out: int
+ logical action d: int
+
+ reaction(in) -> d {=
+ lf_schedule_int(d, self->delay, in->value);
+ =}
+
+ reaction(d) -> out {=
+ lf_set(out, d->value);
+ =}
+}
diff --git a/test/RustRti/src/lib/LoopedActionSender.lf b/test/RustRti/src/lib/LoopedActionSender.lf
new file mode 100644
index 0000000000..e9ea36f40a
--- /dev/null
+++ b/test/RustRti/src/lib/LoopedActionSender.lf
@@ -0,0 +1,36 @@
+/**
+ * A sender reactor that outputs integers in superdense time.
+ *
+ * @author Soroush Bateni
+ */
+target C
+
+/**
+ * @param take_a_break_after: Indicates how many messages are sent in consecutive superdense time
+ * @param break_interval: Determines how long the reactor should take a break after sending
+ * take_a_break_after messages.
+ */
+reactor Sender(take_a_break_after: int = 10, break_interval: time = 400 msec) {
+ output out: int
+ logical action act
+ state sent_messages: int = 0
+
+ reaction(startup, act) -> act, out {=
+ // Send a message on out
+ /* printf("At tag (%lld, %u) sending value %d.\n",
+ lf_time_logical_elapsed(),
+ lf_tag().microstep,
+ self->sent_messages
+ ); */
+ lf_set(out, self->sent_messages);
+ lf_print("Sender sent %d.", self->sent_messages);
+ self->sent_messages++;
+ if (self->sent_messages < self->take_a_break_after) {
+ lf_schedule(act, 0);
+ } else {
+ // Take a break
+ self->sent_messages=0;
+ lf_schedule(act, self->break_interval);
+ }
+ =}
+}
diff --git a/test/RustRti/src/lib/PassThrough.lf b/test/RustRti/src/lib/PassThrough.lf
new file mode 100644
index 0000000000..389905489a
--- /dev/null
+++ b/test/RustRti/src/lib/PassThrough.lf
@@ -0,0 +1,11 @@
+/** Forward the integer input on `in` to the output port `out`. */
+target C
+
+reactor PassThrough {
+ input in: int
+ output out: int
+
+ reaction(in) -> out {=
+ lf_set(out, in->value);
+ =}
+}
diff --git a/test/RustRti/src/lib/Test.lf b/test/RustRti/src/lib/Test.lf
new file mode 100644
index 0000000000..69e4f79b2c
--- /dev/null
+++ b/test/RustRti/src/lib/Test.lf
@@ -0,0 +1,15 @@
+target C
+
+reactor TestDouble(expected: double[] = {1.0, 1.0, 1.0, 1.0}) {
+ input in: double
+ state count: int = 0
+
+ reaction(in) {=
+ printf("Received: %f\n", in->value);
+ if (in->value != self->expected[self->count]) {
+ printf("ERROR: Expected %f.\n", self->expected[self->count]);
+ exit(1);
+ }
+ self->count++;
+ =}
+}
diff --git a/test/RustRti/src/lib/TestCount.lf b/test/RustRti/src/lib/TestCount.lf
new file mode 100644
index 0000000000..e4fbb82b02
--- /dev/null
+++ b/test/RustRti/src/lib/TestCount.lf
@@ -0,0 +1,34 @@
+/**
+ * Test that a counting sequence of inputs starts with the specified start parameter value,
+ * increments by the specified stride, and receives the specified number of inputs.
+ *
+ * @param start The starting value for the expected inputs. Default is 1.
+ * @param stride The increment for the inputs. Default is 1.
+ * @param num_inputs The number of inputs expected. Default is 1.
+ */
+target C
+
+reactor TestCount(start: int = 1, stride: int = 1, num_inputs: int = 1) {
+ state count: int = start
+ state inputs_received: int = 0
+ input in: int
+
+ reaction(in) {=
+ lf_print("Received %d.", in->value);
+ if (in->value != self->count) {
+ lf_print_error_and_exit("Expected %d.", self->count);
+ }
+ self->count += self->stride;
+ self->inputs_received++;
+ =}
+
+ reaction(shutdown) {=
+ lf_print("Shutdown invoked.");
+ if (self->inputs_received != self->num_inputs) {
+ lf_print_error_and_exit("Expected to receive %d inputs, but got %d.",
+ self->num_inputs,
+ self->inputs_received
+ );
+ }
+ =}
+}
diff --git a/test/RustRti/src/lib/TestCountMultiport.lf b/test/RustRti/src/lib/TestCountMultiport.lf
new file mode 100644
index 0000000000..a0b0db294d
--- /dev/null
+++ b/test/RustRti/src/lib/TestCountMultiport.lf
@@ -0,0 +1,41 @@
+/**
+ * Test that a counting sequence of inputs starts with the specified start parameter value,
+ * increments by the specified stride, and receives the specified number of inputs. This version has
+ * a multiport input, and each input is expected to be present and incremented over the previous
+ * input.
+ *
+ * @param start The starting value for the expected inputs. Default is 1.
+ * @param stride The increment for the inputs. Default is 1.
+ * @param num_inputs The number of inputs expected on each channel. Default is 1.
+ */
+target C
+
+reactor TestCountMultiport(start: int = 1, stride: int = 1, num_inputs: int = 1, width: int = 2) {
+ state count: int = start
+ state inputs_received: int = 0
+ input[width] in: int
+
+ reaction(in) {=
+ for (int i = 0; i < in_width; i++) {
+ if (!in[i]->is_present) {
+ lf_print_error_and_exit("No input on channel %d.", i);
+ }
+ lf_print("Received %d on channel %d.", in[i]->value, i);
+ if (in[i]->value != self->count) {
+ lf_print_error_and_exit("Expected %d.", self->count);
+ }
+ self->count += self->stride;
+ }
+ self->inputs_received++;
+ =}
+
+ reaction(shutdown) {=
+ lf_print("Shutdown invoked.");
+ if (self->inputs_received != self->num_inputs) {
+ lf_print_error_and_exit("Expected to receive %d inputs, but only got %d.",
+ self->num_inputs,
+ self->inputs_received
+ );
+ }
+ =}
+}