Skip to content

Commit

Permalink
workyish tm
Browse files Browse the repository at this point in the history
  • Loading branch information
supertick committed Mar 6, 2024
1 parent 80f1034 commit 45d7f8b
Show file tree
Hide file tree
Showing 6 changed files with 369 additions and 158 deletions.
4 changes: 1 addition & 3 deletions src/main/java/org/myrobotlab/process/PythonTerminal.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ public PythonTerminal(TerminalManager service, String name) throws IOException {
@Override
public String getVersion() {
try {
processCommand(getScriptCmd("python --version"));
Service.sleep(300);
return outputCapture.toString();
return processBlockingCommand(getScriptCmd("python --version"));
} catch (Exception e) {
service.error(e);
}
Expand Down
147 changes: 102 additions & 45 deletions src/main/java/org/myrobotlab/process/Terminal.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.myrobotlab.generics.SlidingWindowList;
import org.myrobotlab.logging.LoggerFactory;
import org.myrobotlab.service.TerminalManager;
import org.myrobotlab.service.Runtime;
import org.slf4j.Logger;

public class Terminal {
Expand All @@ -27,6 +29,8 @@ public class Terminal {

public boolean isRunning = false;

protected final String BOUNDARY_MARKER = "----------------terminal-cmd-boundary-7MA4YWxkTrZu0gW----------------";

/**
* executor service for managing streams
*/
Expand All @@ -45,7 +49,8 @@ public class Terminal {
/**
* output buffer
*/
protected StringBuilder outputCapture = new StringBuilder();
// protected List<String> output = new SlidingWindowList<>(300);
protected StringBuilder output = new StringBuilder();

/**
* The pid of the sub process
Expand All @@ -72,12 +77,31 @@ public class Terminal {
*/
protected String shellCommand = null;

/**
* For synchronous output
*/
private transient BlockingQueue<String> blockingOutputQueue = new LinkedBlockingQueue<>();

/**
* The directory where the interactive shell will do its work, where the
* process will start
*/
protected String workspace = ".";

/**
* last command processed
*/
protected String lastCmd = null;


public static class TerminalCmd {
public long ts = System.currentTimeMillis();
public String src;
public String terminal;
public String cmd;
}


public Terminal(TerminalManager service, String name) {
// can increase to handle more input
this.executorService = Executors.newFixedThreadPool(3);
Expand All @@ -86,15 +110,24 @@ public Terminal(TerminalManager service, String name) {
}

public void clearOutput() {
outputCapture = new StringBuilder();
// output = new SlidingWindowList<>(300);
output = new StringBuilder();
}

private String determineShellCommand() {
String osName = System.getProperty("os.name").toLowerCase();
if (osName.contains("win")) {
return "cmd";
} else {
return "/bin/sh"; // Works for Unix/Linux/Mac
// return "/bin/sh"; // Works for Unix/Linux/Mac
String bashPath = "/bin/bash";
File bashFile = new File(bashPath);
if (bashFile.exists()) {
return bashPath;
} else {
// Fallback to sh if Bash is not found (less ideal)
return "/bin/sh";
}
}
}

Expand Down Expand Up @@ -124,12 +157,6 @@ public void processAndWait(String command) throws IOException {
* </pre>
*/

public String getCapturedOutput() {
synchronized (outputCapture) {
return outputCapture.toString();
}
}

public Set<Long> getPids() {
Set<Long> scanPids = new HashSet<>();
if (process.isAlive()) {
Expand Down Expand Up @@ -194,19 +221,55 @@ public boolean isWindows() {
}

public void processCommand(String input) {
try {
if (input == null) {
input = "";
processCommand(input, false);
}

public void processCommand(String input, boolean addBoundary) {
synchronized (lock) {
try {
if (input == null) {
input = "";
}
if (process == null) {
service.error("cannot process a command when the terminal isn't started");
return;
}
String cmd = null;
if (addBoundary) {
// windows/mac echo vs linux
cmd = String.format("%s\necho %s\n", input, BOUNDARY_MARKER);
} else {
cmd = String.format("%s\n", input);
}
lastCmd = cmd;
TerminalCmd terminalCmd = new TerminalCmd();
terminalCmd.src = service.getName();
terminalCmd.terminal = name;
terminalCmd.cmd = cmd;
service.invoke("publishCmd", terminalCmd);
OutputStream outputStream = process.getOutputStream();
outputStream.write(cmd.getBytes());
outputStream.flush();
} catch (Exception e) {
service.error(e);
}
if (process == null) {
service.error("cannot process a command when the terminal isn't started");
return;
}
}

// FIXME - should be synchronized with
public String processBlockingCommand(String input) {
synchronized (lock) {
blockingOutputQueue.clear();
processCommand(input, true);
String ret = null;
try {
while (isRunning && ret == null) {
ret = blockingOutputQueue.poll(100, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
service.error(e);
}
OutputStream outputStream = process.getOutputStream();
outputStream.write(String.format("%s\n", input).getBytes());
outputStream.flush();
} catch (Exception e) {
service.error(e);
return ret;
}
}

Expand Down Expand Up @@ -267,19 +330,29 @@ public void start(String workspace) {
private void startStreamGobbler(InputStream inputStream, String streamName) {
executorService.submit(() -> {
try {
byte[] buffer = new byte[1024]; // Adjust size as needed
byte[] buffer = new byte[8192]; // Adjust size as needed
int length;
StringBuilder dynamicBuffer = new StringBuilder();
while ((length = inputStream.read(buffer)) != -1) {
String text = new String(buffer, 0, length);
// Synchronize writing to the outputCapture to ensure thread safety
synchronized (outputCapture) {
System.out.print(text); // Print the text as it comes without
// waiting for a new line
outputCapture.append(text); // Append the text to the output capture
// asynchronous publishing of all stdout
service.invoke("publishLog", name, text);
service.invoke("publishStdOut", text);
output.append(text);
dynamicBuffer.append(text);
System.out.print(text);
if (dynamicBuffer.toString().contains(BOUNDARY_MARKER)) {
// Boundary marker found, handle command completion here
System.out.println("Command execution completed.");
// Remove the boundary marker from the output buffer
int index = dynamicBuffer.indexOf(BOUNDARY_MARKER);
dynamicBuffer.delete(index, index + BOUNDARY_MARKER.length());
blockingOutputQueue.add(dynamicBuffer.toString());
dynamicBuffer = new StringBuilder();
}
}
} catch (IOException e) {
e.printStackTrace();
service.error(e);
}
});
}
Expand Down Expand Up @@ -324,20 +397,4 @@ public int waitForCompletion() throws InterruptedException {
return process.exitValue();
}

public static void main(String[] args) {
try {
TerminalManager processor = (TerminalManager) Runtime.start("processor", "ManagedProcess");
Terminal shell = new Terminal(processor, "basic tty");
shell.start();
// Example usage of the new method if you want to process a list of
// commands
List<String> commands = Arrays.asList("echo Hello", "ls");
shell.processCommands(commands);
int exitCode = shell.waitForCompletion();
System.out.println("Shell exited with code: " + exitCode);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}

}
Loading

0 comments on commit 45d7f8b

Please sign in to comment.