Skip to content

Commit

Permalink
Added proper SIGTERM/SIGKILL handling for sub-processes (#286)
Browse files Browse the repository at this point in the history
Fixes #284.

The idea is to introduce a new flag to both `os.proc.call` and
`os.ProcGroup.call`, as well as backing implementations in
`os.ProcessLike`. The flag, `timeoutGracePeriod`, allows for a
configurable use of `SIGKILL`, which forcibly kills a process, after the
issue of a `SIGTERM`, which allows a process to clean up. The values for
the flag have the following behaviours:

|value | behaviour|
|------|-----------|
| -1 | When `timeout != -1`, only a `SIGTERM` will be issued, requiring
the child process to gracefully terminate. |
| 0 | When `timeout != -1`, only a `SIGKILL` is issued, demanding the
child terminate immediately with no chance of graceful cleanup. |
| n > 0 | When `timeout != -1`, first issue a `SIGTERM` and then wait for a
further `n` milliseconds before issuing a `SIGKILL`; this provides a
reasonable timeframe for process cleanup, but accounts for misbehaving
processes. |

For now, the default has been set to `timeoutGracePeriod = 100`.
  • Loading branch information
j-mie6 authored Aug 5, 2024
1 parent 6ab4c65 commit 6609238
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 69 deletions.
4 changes: 3 additions & 1 deletion build.sc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ trait SafeDeps extends ScalaModule {
trait MiMaChecks extends Mima {
def mimaPreviousVersions = Seq("0.9.0", "0.9.1", "0.9.2", "0.9.3", "0.10.0")
override def mimaBinaryIssueFilters: T[Seq[ProblemFilter]] = Seq(
ProblemFilter.exclude[ReversedMissingMethodProblem]("os.PathConvertible.isCustomFs")
ProblemFilter.exclude[ReversedMissingMethodProblem]("os.PathConvertible.isCustomFs"),
// this is fine, because ProcessLike is sealed (and its subclasses should be final)
ProblemFilter.exclude[ReversedMissingMethodProblem]("os.ProcessLike.joinPumperThreadsHook")
)
}

Expand Down
136 changes: 103 additions & 33 deletions os/src/ProcessOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,25 @@ case class proc(command: Shellable*) {
* `call` provides a number of parameters that let you configure how the subprocess
* is run:
*
* @param cwd the working directory of the subprocess
* @param env any additional environment variables you wish to set in the subprocess
* @param stdin any data you wish to pass to the subprocess's standard input
* @param stdout How the process's output stream is configured.
* @param stderr How the process's error stream is configured.
* @param mergeErrIntoOut merges the subprocess's stderr stream into it's stdout
* @param timeout how long to wait in milliseconds for the subprocess to complete
* @param check disable this to avoid throwing an exception if the subprocess
* fails with a non-zero exit code
* @param propagateEnv disable this to avoid passing in this parent process's
* environment variables to the subprocess
* @param cwd the working directory of the subprocess
* @param env any additional environment variables you wish to set in the subprocess
* @param stdin any data you wish to pass to the subprocess's standard input
* @param stdout How the process's output stream is configured.
* @param stderr How the process's error stream is configured.
* @param mergeErrIntoOut merges the subprocess's stderr stream into its stdout
* @param timeout how long to wait in milliseconds for the subprocess to complete
* (-1 for no timeout)
* @param check disable this to avoid throwing an exception if the subprocess
* fails with a non-zero exit code
* @param propagateEnv disable this to avoid passing in this parent process's
* environment variables to the subprocess
* @param timeoutGracePeriod if the timeout is enabled, how long in milliseconds for the
* subprocess to gracefully terminate before attempting to
* forcibly kill it
* (-1 for no kill, 0 for always kill immediately)
*
* @note the issuing of `SIGTERM` instead of `SIGKILL` is implementation dependent on your JVM version. Pre-Java 9, no `SIGTERM` may be
* issued. Check the documentation for your JDK's `Process.destroy`.
*/
def call(
cwd: Path = null,
Expand All @@ -66,7 +74,9 @@ case class proc(command: Shellable*) {
mergeErrIntoOut: Boolean = false,
timeout: Long = -1,
check: Boolean = true,
propagateEnv: Boolean = true
propagateEnv: Boolean = true,
// this cannot be next to `timeout` as this will introduce a bin-compat break (default arguments are numbered in the bytecode)
timeoutGracePeriod: Long = 100
): CommandResult = {

val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]]
Expand All @@ -87,14 +97,38 @@ case class proc(command: Shellable*) {
propagateEnv
)

sub.join(timeout)
sub.join(timeout, timeoutGracePeriod)

val chunksSeq = chunks.iterator.asScala.toIndexedSeq
val res = CommandResult(commandChunks, sub.exitCode(), chunksSeq)
if (res.exitCode == 0 || !check) res
else throw SubprocessException(res)
}

// Forwarder retaining the nine-parameter signature that existed before the
// `timeoutGracePeriod` flag was added (presumably kept so that code compiled against
// the older signature still links). It delegates to the full overload with a grace
// period of 100 milliseconds.
private[os] def call(
    cwd: Path,
    env: Map[String, String],
    stdin: ProcessInput,
    stdout: ProcessOutput,
    stderr: ProcessOutput,
    mergeErrIntoOut: Boolean,
    timeout: Long,
    check: Boolean,
    propagateEnv: Boolean
): CommandResult =
  call(
    cwd = cwd,
    env = env,
    stdin = stdin,
    stdout = stdout,
    stderr = stderr,
    mergeErrIntoOut = mergeErrIntoOut,
    timeout = timeout,
    check = check,
    propagateEnv = propagateEnv,
    timeoutGracePeriod = 100
  )

/**
* The most flexible of the [[os.proc]] calls, `os.proc.spawn` simply configures
* and starts a subprocess, and returns it as a `java.lang.Process` for you to
Expand Down Expand Up @@ -181,24 +215,31 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
* `call` provides a number of parameters that let you configure how the pipeline
* is run:
*
* @param cwd the working directory of the pipeline
* @param env any additional environment variables you wish to set in the pipeline
* @param stdin any data you wish to pass to the pipelines's standard input (to the first process)
* @param stdout How the pipelines's output stream is configured (the last process stdout)
* @param stderr How the process's error stream is configured (set for all processes)
* @param mergeErrIntoOut merges the pipeline's stderr stream into it's stdout. Note that then the
* stderr will be forwarded with stdout to subsequent processes in the pipeline.
* @param timeout how long to wait in milliseconds for the pipeline to complete
* @param check disable this to avoid throwing an exception if the pipeline
* fails with a non-zero exit code
* @param propagateEnv disable this to avoid passing in this parent process's
* environment variables to the pipeline
* @param pipefail if true, the pipeline's exitCode will be the exit code of the first
* failing process. If no process fails, the exit code will be 0.
* @param handleBrokenPipe if true, every [[java.io.IOException]] when redirecting output of a process
* will be caught and handled by killing the writing process. This behaviour
* is consistent with handlers of SIGPIPE signals in most programs
* supporting interruptable piping. Disabled by default on Windows.
* @param cwd the working directory of the pipeline
* @param env any additional environment variables you wish to set in the pipeline
* @param stdin any data you wish to pass to the pipeline's standard input (to the first process)
* @param stdout How the pipeline's output stream is configured (the last process stdout)
* @param stderr How the process's error stream is configured (set for all processes)
* @param mergeErrIntoOut merges the pipeline's stderr stream into its stdout. Note that then the
* stderr will be forwarded with stdout to subsequent processes in the pipeline.
* @param timeout how long to wait in milliseconds for the pipeline to complete
* @param check disable this to avoid throwing an exception if the pipeline
* fails with a non-zero exit code
* @param propagateEnv disable this to avoid passing in this parent process's
* environment variables to the pipeline
* @param pipefail if true, the pipeline's exitCode will be the exit code of the first
* failing process. If no process fails, the exit code will be 0.
* @param handleBrokenPipe if true, every [[java.io.IOException]] when redirecting output of a process
* will be caught and handled by killing the writing process. This behaviour
* is consistent with handlers of SIGPIPE signals in most programs
* supporting interruptable piping. Disabled by default on Windows.
* @param timeoutGracePeriod if the timeout is enabled, how long in milliseconds for the
* subprocess to gracefully terminate before attempting to
* forcibly kill it
* (-1 for no kill, 0 for always kill immediately)
*
* @note the issuing of `SIGTERM` instead of `SIGKILL` is implementation dependent on your JVM version. Pre-Java 9, no `SIGTERM` may be
* issued. Check the documentation for your JDK's `Process.destroy`.
*/
def call(
cwd: Path = null,
Expand All @@ -211,7 +252,9 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
check: Boolean = true,
propagateEnv: Boolean = true,
pipefail: Boolean = true,
handleBrokenPipe: Boolean = !isWindows
handleBrokenPipe: Boolean = !isWindows,
// this cannot be next to `timeout` as this will introduce a bin-compat break (default arguments are numbered in the bytecode)
timeoutGracePeriod: Long = 100
): CommandResult = {
val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]]

Expand All @@ -232,7 +275,7 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
pipefail
)

sub.join(timeout)
sub.join(timeout, timeoutGracePeriod)

val chunksSeq = chunks.iterator.asScala.toIndexedSeq
val res =
Expand All @@ -241,6 +284,33 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
else throw SubprocessException(res)
}

// Forwarder retaining the eleven-parameter signature that existed before the
// `timeoutGracePeriod` flag was added (presumably kept so that code compiled against
// the older signature still links). It delegates to the full overload with a grace
// period of 100 milliseconds.
private[os] def call(
    cwd: Path,
    env: Map[String, String],
    stdin: ProcessInput,
    stdout: ProcessOutput,
    stderr: ProcessOutput,
    mergeErrIntoOut: Boolean,
    timeout: Long,
    check: Boolean,
    propagateEnv: Boolean,
    pipefail: Boolean,
    handleBrokenPipe: Boolean
): CommandResult =
  call(
    cwd = cwd,
    env = env,
    stdin = stdin,
    stdout = stdout,
    stderr = stderr,
    mergeErrIntoOut = mergeErrIntoOut,
    timeout = timeout,
    check = check,
    propagateEnv = propagateEnv,
    pipefail = pipefail,
    handleBrokenPipe = handleBrokenPipe,
    timeoutGracePeriod = 100
  )

/**
* The most flexible of the [[os.ProcGroup]] calls. It sets-up a pipeline of processes,
* and returns a [[ProcessPipeline]] for you to interact with however you like.
Expand Down
101 changes: 66 additions & 35 deletions os/src/SubProcess.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,55 @@ sealed trait ProcessLike extends java.lang.AutoCloseable {
* Wait up to `timeout` milliseconds for the [[ProcessLike]] to terminate and all stdout and stderr
* from the subprocess to be handled. By default waits indefinitely; if a time
* limit is given, explicitly destroys the [[ProcessLike]] if it has not completed by
* the time the timeout has occurred
* the time the timeout has occurred.
*
* By default, a process is destroyed by sending a `SIGTERM` signal, which allows an opportunity
* for it to clean up any resources it was using. If the process is unresponsive to this, a
* `SIGKILL` signal is sent `timeoutGracePeriod` milliseconds later. If `timeoutGracePeriod` is
* `0`, then there is no `SIGTERM`; if it is `-1`, there is no `SIGKILL` sent.
*
* @return `true` when the process did not require explicit termination by either `SIGTERM` or `SIGKILL` and `false` otherwise.
* @note the issuing of `SIGTERM` instead of `SIGKILL` is implementation dependent on your JVM version. Pre-Java 9, no `SIGTERM` may be
* issued. Check the documentation for your JDK's `Process.destroy`.
*/
def join(timeout: Long = -1, timeoutGracePeriod: Long = 100): Boolean = {
  val exitedCleanly = waitFor(timeout)
  if (!exitedCleanly) {
    assume(
      timeout != -1,
      "if the waitFor does not complete cleanly, this implies there is a timeout imposed, so the grace period is applicable"
    )
    timeoutGracePeriod match {
      // -1: only request termination, never escalate to a forcible kill
      case -1L => destroy()
      // 0: skip the graceful request and kill forcibly straight away
      case 0L => destroyForcibly()
      // otherwise: request termination, then kill forcibly if the process is
      // still running once the grace period has elapsed
      case gracePeriod =>
        destroy()
        val terminatedInTime = waitFor(gracePeriod)
        if (!terminatedInTime) destroyForcibly()
    }
    // block until the process has actually gone away before joining the pumpers
    waitFor(-1)
  }
  joinPumperThreadsHook()
  exitedCleanly
}

@deprecatedOverriding("this method is now a forwarder, and should not be overriden", "0.10.4")
private[os] def join(timeout: Long): Boolean = {
  // Delegate to the two-parameter overload with a 100 millisecond grace period.
  join(timeout = timeout, timeoutGracePeriod = 100)
}

/**
* A hook method used by `join` to close the input and output streams associated with the process, not for public consumption.
*/
def join(timeout: Long = -1): Boolean
private[os] def joinPumperThreadsHook(): Unit
}

/**
* Represents a spawn subprocess that has started and may or may not have
* completed.
*/
@deprecatedInheritance(
"this class will be made final: if you are using it be aware that `join` has a new overloading",
"0.10.4"
)
class SubProcess(
val wrapped: java.lang.Process,
val inputPumperThread: Option[Thread],
Expand Down Expand Up @@ -114,22 +154,9 @@ class SubProcess(
}
}

/**
* Wait up to `millis` for the subprocess to terminate and all stdout and stderr
* from the subprocess to be handled. By default waits indefinitely; if a time
* limit is given, explicitly destroys the subprocess if it has not completed by
* the time the timeout has occurred
*/
def join(timeout: Long = -1): Boolean = {
val exitedCleanly = waitFor(timeout)
if (!exitedCleanly) {
destroy()
destroyForcibly()
waitFor(-1)
}
private[os] def joinPumperThreadsHook(): Unit = {
outputPumperThread.foreach(_.join())
errorPumperThread.foreach(_.join())
exitedCleanly
}
}

Expand Down Expand Up @@ -222,6 +249,10 @@ object SubProcess {
}
}

@deprecatedInheritance(
"this class will be made final: if you are using it be aware that `join` has a new overloading",
"0.10.4"
)
class ProcessPipeline(
val processes: Seq[SubProcess],
pipefail: Boolean,
Expand Down Expand Up @@ -312,12 +343,12 @@ class ProcessPipeline(
}

/**
* Wait up to `millis` for the [[ProcessPipeline]] to terminate, by default waits
* Wait up to `timeout` for the [[ProcessPipeline]] to terminate, by default waits
* indefinitely. Returns `true` if the [[ProcessPipeline]] has terminated by the time
* this method returns.
*
* Waits for each process one by one, while aggregating the total time waited. If
* [[timeout]] has passed before all processes have terminated, returns `false`.
* `timeout` has passed before all processes have terminated, returns `false`.
*/
override def waitFor(timeout: Long = -1): Boolean = {
@tailrec
Expand All @@ -340,28 +371,28 @@ class ProcessPipeline(
}

/**
* Wait up to `millis` for the [[ProcessPipeline]] to terminate all the processes
* Wait up to `timeout` for the [[ProcessPipeline]] to terminate all the processes
* in pipeline. By default waits indefinitely; if a time limit is given, explicitly
* destroys each process if it has not completed by the time the timeout has occurred.
*
* By default, the processes are destroyed by sending `SIGTERM` signals, which allows an opportunity
* for them to clean up any resources they were using. If any process is unresponsive to this, a
* `SIGKILL` signal is sent `timeoutGracePeriod` milliseconds later. If `timeoutGracePeriod` is
* `0`, then there is no `SIGTERM`; if it is `-1`, there is no `SIGKILL` sent.
*
* @return `true` when the processes did not require explicit termination by either `SIGTERM` or `SIGKILL` and `false` otherwise.
* @note the issuing of `SIGTERM` instead of `SIGKILL` is implementation dependent on your JVM version. Pre-Java 9, no `SIGTERM` may be
* issued. Check the documentation for your JDK's `Process.destroy`.
*/
override def join(timeout: Long = -1): Boolean = {
@tailrec
def joinRec(startedAt: Long, processesLeft: Seq[SubProcess], result: Boolean): Boolean =
processesLeft match {
case Nil => result
case head :: tail =>
val elapsed = System.currentTimeMillis() - startedAt
val timeoutLeft = Math.max(0, timeout - elapsed)
val exitedCleanly = head.join(timeoutLeft)
joinRec(startedAt, tail, result && exitedCleanly)
}

override def join(timeout: Long = -1, timeoutGracePeriod: Long = 100): Boolean = {
// in this case, the grace period does not apply, so fine
if (timeout == -1) {
processes.forall(_.join())
} else {
val timeNow = System.currentTimeMillis()
joinRec(timeNow, processes, true)
}
} else super.join(timeout, timeoutGracePeriod)
}

// Runs the pumper-thread hook of every process in the pipeline, in pipeline order.
private[os] def joinPumperThreadsHook(): Unit = {
  for (subProcess <- processes) {
    subProcess.joinPumperThreadsHook()
  }
}
}

Expand Down

0 comments on commit 6609238

Please sign in to comment.