Skip to content

Commit

Permalink
Enabled C federated tests with Rust RTI
Browse files Browse the repository at this point in the history
- At first, thirty-four C federated tests are evaluated with Rust RTI
  • Loading branch information
chanijjani committed Feb 24, 2024
1 parent e2d06f3 commit 48de675
Show file tree
Hide file tree
Showing 8 changed files with 498 additions and 7 deletions.
12 changes: 12 additions & 0 deletions core/src/integrationTest/java/org/lflang/tests/RuntimeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,18 @@ public void runFederatedTests() {
false);
}

@Test
public void runFederatedTestsWithRustRti() {
Assumptions.assumeTrue(supportsFederatedExecution(), Message.NO_FEDERATION_SUPPORT);
runTestsForTargetsWithRustRti(
Message.DESC_FEDERATED_WITH_RUST_RTI,
TestCategory.FEDERATED::equals,
Transformers::noChanges,
Configurators::noChanges,
TestLevel.EXECUTION,
false);
}

/** Run the tests for modal reactors. */
@Test
public void runModalTests() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ public void runFederatedTests() {
super.runFederatedTests();
}

@Test
public void runFederatedTestsWithRustRti() {
Assumptions.assumeFalse(isWindows(), Message.NO_WINDOWS_SUPPORT);
super.runFederatedTestsWithRustRti();
}

@Test
public void runModalTests() {
super.runModalTests();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,97 @@ public boolean doGenerate(Resource resource, LFGeneratorContext context) throws
return false;
}

/**
* Produce LF code for each federate in a separate file, then invoke a target-specific code
* generator for each of those files.
*
* @param resource The resource that has the federated main reactor in it
* @param context The context in which to carry out the code generation.
* @return False if no errors have occurred, true otherwise.
*/
public boolean doGenerateForRustRTI(Resource resource, LFGeneratorContext context)
throws IOException {
if (!federatedExecutionIsSupported(resource)) return true;
cleanIfNeeded(context);

// In a federated execution, we need keepalive to be true,
// otherwise a federate could exit simply because it hasn't received
// any messages.
KeepaliveProperty.INSTANCE.override(targetConfig, true);

// Process command-line arguments
processCLIArguments(context);

// Find the federated reactor
Reactor federation = FedASTUtils.findFederatedReactor(resource);

// Make sure the RTI host is set correctly.
setRTIHost(federation);

// Create the FederateInstance objects.
ReactorInstance main = createFederateInstances(federation, context);

// Insert reactors that split multiports into many ports.
insertIndexers(main, resource);

// Clear banks so that each bank member becomes a single federate.
for (Instantiation instantiation : ASTUtils.allInstantiations(federation)) {
instantiation.setWidthSpec(null);
instantiation.setWidthSpec(null);
}

// Find all the connections between federates.
// For each connection between federates, replace it in the
// AST with an action (which inherits the delay) and three reactions.
// The action will be physical for physical connections and logical
// for logical connections.
replaceFederateConnectionsWithProxies(federation, main, resource);

FedEmitter fedEmitter =
new FedEmitter(
fileConfig,
ASTUtils.toDefinition(mainDef.getReactorClass()),
messageReporter,
rtiConfig);

// Generate LF code for each federate.
Map<Path, CodeMap> lf2lfCodeMapMap = new HashMap<>();
for (FederateInstance federate : federates) {
lf2lfCodeMapMap.putAll(fedEmitter.generateFederate(context, federate, federates.size()));
}

// Do not invoke target code generators if --no-compile flag is used.
if (context.getTargetConfig().get(NoCompileProperty.INSTANCE)) {
context.finish(Status.GENERATED, lf2lfCodeMapMap);
return false;
}

// If the RTI is to be built locally, set up a build environment for it.
prepareRtiBuildEnvironment(context);

Map<Path, CodeMap> codeMapMap =
compileFederates(
context,
lf2lfCodeMapMap,
subContexts -> {
createDockerFiles(context, subContexts);
generateLaunchScriptForRustRti();
// If an error has occurred during codegen of any federate, report it.
subContexts.forEach(
c -> {
if (c.getErrorReporter().getErrorsOccurred()) {
context
.getErrorReporter()
.at(c.getFileConfig().srcFile)
.error("Failure during code generation of " + c.getFileConfig().srcFile);
}
});
});

context.finish(Status.COMPILED, codeMapMap);
return false;
}

/**
* Prepare a build environment for the rti alongside the generated sources of the federates.
*
Expand Down Expand Up @@ -229,6 +320,11 @@ private void generateLaunchScript() {
.doGenerate(federates, rtiConfig);
}

private void generateLaunchScriptForRustRti() {
new FedLauncherGenerator(this.targetConfig, this.fileConfig, this.messageReporter)
.doGenerateForRustRTI(federates, new RtiConfig());
}

/**
* Generate a Dockerfile for each federate and a docker-compose.yml for the federation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,169 @@ public void doGenerate(List<FederateInstance> federates, RtiConfig rtiConfig) {
}
}

/**
* Create the launcher shell scripts. This will create one or two files in the output path (bin
* directory). The first has name equal to the filename of the source file without the ".lf"
* extension. This will be a shell script that launches the RTI and the federates. If, in
* addition, either the RTI or any federate is mapped to a particular machine (anything other than
* the default "localhost" or "0.0.0.0"), then this will generate a shell script in the bin
* directory with name filename_distribute.sh that copies the relevant source files to the remote
* host and compiles them so that they are ready to execute using the launcher.
*
* <p>A precondition for this to work is that the user invoking this code generator can log into
* the remote host without supplying a password. Specifically, you have to have installed your
* public key (typically found in ~/.ssh/id_rsa.pub) in ~/.ssh/authorized_keys on the remote host.
* In addition, the remote host must be running an ssh service. On an Arch Linux system using
* systemd, for example, this means running:
*
* <p>sudo systemctl <start|enable> ssh.service
*
* <p>Enable means to always start the service at startup, whereas start means to just start it
* this once.
*
* @param federates A list of federate instances in the federation
* @param rtiConfig Can have values for 'host', 'dir', and 'user'
*/
public void doGenerateForRustRTI(List<FederateInstance> federates, RtiConfig rtiConfig) {
// NOTE: It might be good to use screen when invoking the RTI
// or federates remotely, so you can detach and the process keeps running.
// However, I was unable to get it working properly.
// What this means is that the shell that invokes the launcher
// needs to remain live for the duration of the federation.
// If that shell is killed, the federation will die.
// Hence, it is reasonable to launch the federation on a
// machine that participates in the federation, for example,
// on the machine that runs the RTI. The command I tried
// to get screen to work looks like this:
// ssh -t «target» cd «path»; screen -S «filename»_«federate.name» -L
// bin/«filename»_«federate.name» 2>&1
// var outPath = binGenPath
StringBuilder shCode = new StringBuilder();
StringBuilder distCode = new StringBuilder();
shCode.append(getSetupCode()).append("\n");
String distHeader = getDistHeader();
String host = rtiConfig.getHost();
String target = host;

String user = rtiConfig.getUser();
if (user != null) {
target = user + "@" + host;
}

shCode.append("#### Host is ").append(host);

// Launch the RTI in the foreground.
if (host.equals("localhost") || host.equals("0.0.0.0")) {
// FIXME: the paths below will not work on Windows
shCode.append(getLaunchCodeForRustRti(Integer.toString(federates.size()))).append("\n");
} else {
// Start the RTI on the remote machine - Not supported yet for Rust RTI.
}

// Index used for storing pids of federates
int federateIndex = 0;
for (FederateInstance federate : federates) {
var buildConfig = getBuildConfig(federate, fileConfig, messageReporter);
if (federate.isRemote) {
if (distCode.isEmpty()) distCode.append(distHeader).append("\n");
distCode.append(getDistCode(rtiConfig.getDirectory(), federate)).append("\n");
shCode
.append(getFedRemoteLaunchCode(rtiConfig.getDirectory(), federate, federateIndex++))
.append("\n");
} else {
String executeCommand = buildConfig.localExecuteCommand();
shCode
.append(getFedLocalLaunchCode(federate, executeCommand, federateIndex++))
.append("\n");
}
}
if (host.equals("localhost") || host.equals("0.0.0.0")) {
// Local PID managements
shCode.append(
"echo \"#### Bringing the RTI back to foreground so it can receive Control-C.\"" + "\n");
shCode.append("fg %1" + "\n");
}
// Wait for launched processes to finish
shCode
.append(
String.join(
"\n",
"echo \"RTI has exited. Wait for federates to exit.\"",
"# Wait for launched processes to finish.",
"# The errors are handled separately via trap.",
"for pid in \"${pids[@]}\"",
"do",
" wait $pid || exit $?",
"done",
"echo \"All done.\"",
"EXITED_SUCCESSFULLY=true"))
.append("\n");

// Create bin directory for the script.
if (!Files.exists(fileConfig.binPath)) {
try {
Files.createDirectories(fileConfig.binPath);
} catch (IOException e) {
messageReporter.nowhere().error("Unable to create directory: " + fileConfig.binPath);
}
}

// Write the launcher file.
File file = fileConfig.binPath.resolve(fileConfig.name).toFile();
messageReporter.nowhere().info("Script for launching the federation: " + file);

// Delete file previously produced, if any.
if (file.exists()) {
if (!file.delete())
messageReporter
.nowhere()
.error("Failed to delete existing federated launch script \"" + file + "\"");
}

FileOutputStream fOut = null;
try {
fOut = new FileOutputStream(file);
} catch (FileNotFoundException e) {
messageReporter.nowhere().error("Unable to find file: " + file);
}
if (fOut != null) {
try {
fOut.write(shCode.toString().getBytes());
fOut.close();
} catch (IOException e) {
messageReporter.nowhere().error("Unable to write to file: " + file);
}
}

if (!file.setExecutable(true, false)) {
messageReporter.nowhere().warning("Unable to make launcher script executable.");
}

// Write the distributor file.
// Delete the file even if it does not get generated.
file = fileConfig.binPath.resolve(fileConfig.name + "_distribute.sh").toFile();
if (file.exists()) {
if (!file.delete())
messageReporter
.nowhere()
.error("Failed to delete existing federated distributor script \"" + file + "\"");
}
if (distCode.length() > 0) {
try {
fOut = new FileOutputStream(file);
fOut.write(distCode.toString().getBytes());
fOut.close();
if (!file.setExecutable(true, false)) {
messageReporter.nowhere().warning("Unable to make file executable: " + file);
}
} catch (FileNotFoundException e) {
messageReporter.nowhere().error("Unable to find file: " + file);
} catch (IOException e) {
messageReporter.nowhere().error("Unable to write to file " + file);
}
}
}

private String getSetupCode() {
return String.join(
"\n",
Expand Down Expand Up @@ -377,6 +540,35 @@ private String getLaunchCode(String rtiLaunchCode) {
"sleep 1");
}

private String getLaunchCodeForRustRti(String numberOfFederates) {
String launchCodeWithoutLogging =
new String("cargo run -- -i ${FEDERATION_ID} -n " + numberOfFederates + " -c init &");
return String.join(
"\n",
"echo \"#### Launching the Rust runtime infrastructure (RTI).\"",
"# The Rust RTI is started first to allow proper boot-up",
"# before federates will try to connect.",
"# The RTI will be brought back to foreground",
"# to be responsive to user inputs after all federates",
"# are launched.",
"RUST_RTI_REMOTE_PATHS=`find ~/ -name rti_remote.rs`",
"if [ \"${RUST_RTI_REMOTE_PATHS}\" = \"\" ]; then",
" git clone https://github.com/hokeun/lf-rust-rti.git",
" cd lf-rust-rti/rust/rti",
"else",
" FIRST_RUST_RTI_REMOTE_PATH=($RUST_RTI_REMOTE_PATHS)",
" FIRST_RUST_RTI_PATH=${FIRST_RUST_RTI_REMOTE_PATH[0]%/*}",
" cd ${FIRST_RUST_RTI_PATH}; cd ../",
"fi",
launchCodeWithoutLogging,
"# Store the PID of the RTI",
"RTI=$!",
"# Wait for the RTI to boot up before",
"# starting federates (this could be done by waiting for a specific output",
"# from the RTI, but here we use sleep)",
"sleep 1");
}

private String getRemoteLaunchCode(
Object host, Object target, String logFileName, String rtiLaunchString) {
return String.join(
Expand Down Expand Up @@ -590,7 +782,7 @@ private String getFedLocalLaunchCode(
private BuildConfig getBuildConfig(
FederateInstance federate, FederationFileConfig fileConfig, MessageReporter messageReporter) {
return switch (federate.targetConfig.target) {
case C, CCPP -> new CBuildConfig(federate, fileConfig, messageReporter);
case C, CCPP, RustRti -> new CBuildConfig(federate, fileConfig, messageReporter);
case Python -> new PyBuildConfig(federate, fileConfig, messageReporter);
case TS -> new TsBuildConfig(federate, fileConfig, messageReporter);
case CPP, Rust -> throw new UnsupportedOperationException();
Expand Down
Loading

0 comments on commit 48de675

Please sign in to comment.