From 6609238179c46c5230741d4dee000346070fb0bf Mon Sep 17 00:00:00 2001 From: Jamie Willis Date: Mon, 5 Aug 2024 14:14:37 +0100 Subject: [PATCH] Added proper SIGTERM/SIGKILL handling for sub-processes (#286) Fixes #284. The idea is to introduce a new flag to both `os.proc.call` and `os.ProcGroup.call`, as well as backing implementations in `os.ProcessLike`. The flag, `timeoutGracePeriod`, allows for a configurable use of `SIGKILL`, which forcibly kills a process, after the issue of a `SIGTERM`, which allows a process to clean up. The values for the flag have the following behaviours: |value | behaviour| |------|-----------| | -1 | When `timeout != -1`, only a `SIGTERM` will be issued, requiring the child process to gracefully terminate. | | 0 | When `timeout != -1`, only a `SIGKILL` is issued, demanding the child terminate immediately with no chance of graceful cleanup. | | n > 0 | `timeout != -1`, first issue a `SIGTERM` and then wait for a further `n` milliseconds before issuing a `SIGKILL`, this provides a reasonable timeframe for process cleanup, but accounts for misbehaving processes. | For now, the default has been set to `timeoutGracePeriod = 1000`. --- build.sc | 4 +- os/src/ProcessOps.scala | 136 ++++++++++++++++++++++++++++++---------- os/src/SubProcess.scala | 101 ++++++++++++++++++----------- 3 files changed, 172 insertions(+), 69 deletions(-) diff --git a/build.sc b/build.sc index f57b243b..b619614e 100644 --- a/build.sc +++ b/build.sc @@ -53,7 +53,9 @@ trait SafeDeps extends ScalaModule { trait MiMaChecks extends Mima { def mimaPreviousVersions = Seq("0.9.0", "0.9.1", "0.9.2", "0.9.3", "0.10.0") override def mimaBinaryIssueFilters: T[Seq[ProblemFilter]] = Seq( - ProblemFilter.exclude[ReversedMissingMethodProblem]("os.PathConvertible.isCustomFs") + ProblemFilter.exclude[ReversedMissingMethodProblem]("os.PathConvertible.isCustomFs"), + // this is fine, because ProcessLike is sealed (and its subclasses should be final) + ProblemFilter.exclude[ReversedMissingMethodProblem]("os.ProcessLike.joinPumperThreadsHook") ) } diff --git a/os/src/ProcessOps.scala b/os/src/ProcessOps.scala index 5983aaa8..fa8cc973 100644 --- a/os/src/ProcessOps.scala +++ b/os/src/ProcessOps.scala @@ -45,17 +45,25 @@ case class proc(command: Shellable*) { * `call` provides a number of parameters that let you configure how the subprocess * is run: * - * @param cwd the working directory of the subprocess - * @param env any additional environment variables you wish to set in the subprocess - * @param stdin any data you wish to pass to the subprocess's standard input - * @param stdout How the process's output stream is configured. - * @param stderr How the process's error stream is configured. - * @param mergeErrIntoOut merges the subprocess's stderr stream into it's stdout - * @param timeout how long to wait in milliseconds for the subprocess to complete - * @param check disable this to avoid throwing an exception if the subprocess - * fails with a non-zero exit code - * @param propagateEnv disable this to avoid passing in this parent process's - * environment variables to the subprocess + * @param cwd the working directory of the subprocess + * @param env any additional environment variables you wish to set in the subprocess + * @param stdin any data you wish to pass to the subprocess's standard input + * @param stdout How the process's output stream is configured. + * @param stderr How the process's error stream is configured. + * @param mergeErrIntoOut merges the subprocess's stderr stream into it's stdout + * @param timeout how long to wait in milliseconds for the subprocess to complete + * (-1 for no timeout) + * @param check disable this to avoid throwing an exception if the subprocess + * fails with a non-zero exit code + * @param propagateEnv disable this to avoid passing in this parent process's + * environment variables to the subprocess + * @param timeoutGracePeriod if the timeout is enabled, how long in milliseconds for the + * subprocess to gracefully terminate before attempting to + * forcibly kill it + * (-1 for no kill, 0 for always kill immediately) + * + * @note the issuing of `SIGTERM` instead of `SIGKILL` is implementation dependent on your JVM version. Pre-Java 9, no `SIGTERM` may be + * issued. Check the documentation for your JDK's `Process.destroy`. */ def call( cwd: Path = null, @@ -66,7 +74,9 @@ case class proc(command: Shellable*) { mergeErrIntoOut: Boolean = false, timeout: Long = -1, check: Boolean = true, - propagateEnv: Boolean = true + propagateEnv: Boolean = true, + // this cannot be next to `timeout` as this will introduce a bin-compat break (default arguments are numbered in the bytecode) + timeoutGracePeriod: Long = 100 ): CommandResult = { val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]] @@ -87,7 +97,7 @@ case class proc(command: Shellable*) { propagateEnv ) - sub.join(timeout) + sub.join(timeout, timeoutGracePeriod) val chunksSeq = chunks.iterator.asScala.toIndexedSeq val res = CommandResult(commandChunks, sub.exitCode(), chunksSeq) @@ -95,6 +105,30 @@ case class proc(command: Shellable*) { else throw SubprocessException(res) } + // forwarder for the new timeoutGracePeriod flag + private[os] def call( + cwd: Path, + env: Map[String, String], + stdin: ProcessInput, + stdout: ProcessOutput, + stderr: ProcessOutput, + mergeErrIntoOut: Boolean, + timeout: Long, + check: Boolean, + propagateEnv: Boolean + ): CommandResult = call( + cwd, + env, + stdin, + stdout, + stderr, + mergeErrIntoOut, + timeout, + check, + propagateEnv, + timeoutGracePeriod = 100 + ) + /** * The most flexible of the [[os.proc]] calls, `os.proc.spawn` simply configures * and starts a subprocess, and returns it as a `java.lang.Process` for you to @@ -181,24 +215,31 @@ case class ProcGroup private[os] (commands: Seq[proc]) { * `call` provides a number of parameters that let you configure how the pipeline * is run: * - * @param cwd the working directory of the pipeline - * @param env any additional environment variables you wish to set in the pipeline - * @param stdin any data you wish to pass to the pipelines's standard input (to the first process) - * @param stdout How the pipelines's output stream is configured (the last process stdout) - * @param stderr How the process's error stream is configured (set for all processes) - * @param mergeErrIntoOut merges the pipeline's stderr stream into it's stdout. Note that then the - * stderr will be forwarded with stdout to subsequent processes in the pipeline. - * @param timeout how long to wait in milliseconds for the pipeline to complete - * @param check disable this to avoid throwing an exception if the pipeline - * fails with a non-zero exit code - * @param propagateEnv disable this to avoid passing in this parent process's - * environment variables to the pipeline - * @param pipefail if true, the pipeline's exitCode will be the exit code of the first - * failing process. If no process fails, the exit code will be 0. - * @param handleBrokenPipe if true, every [[java.io.IOException]] when redirecting output of a process - * will be caught and handled by killing the writing process. This behaviour - * is consistent with handlers of SIGPIPE signals in most programs - * supporting interruptable piping. Disabled by default on Windows. + * @param cwd the working directory of the pipeline + * @param env any additional environment variables you wish to set in the pipeline + * @param stdin any data you wish to pass to the pipelines's standard input (to the first process) + * @param stdout How the pipelines's output stream is configured (the last process stdout) + * @param stderr How the process's error stream is configured (set for all processes) + * @param mergeErrIntoOut merges the pipeline's stderr stream into it's stdout. Note that then the + * stderr will be forwarded with stdout to subsequent processes in the pipeline. + * @param timeout how long to wait in milliseconds for the pipeline to complete + * @param check disable this to avoid throwing an exception if the pipeline + * fails with a non-zero exit code + * @param propagateEnv disable this to avoid passing in this parent process's + * environment variables to the pipeline + * @param pipefail if true, the pipeline's exitCode will be the exit code of the first + * failing process. If no process fails, the exit code will be 0. + * @param handleBrokenPipe if true, every [[java.io.IOException]] when redirecting output of a process + * will be caught and handled by killing the writing process. This behaviour + * is consistent with handlers of SIGPIPE signals in most programs + * supporting interruptable piping. Disabled by default on Windows. + * @param timeoutGracePeriod if the timeout is enabled, how long in milliseconds for the + * subprocess to gracefully terminate before attempting to + * forcibly kill it + * (-1 for no kill, 0 for always kill immediately) + * + * @note the issuing of `SIGTERM` instead of `SIGKILL` is implementation dependent on your JVM version. Pre-Java 9, no `SIGTERM` may be + * issued. Check the documentation for your JDK's `Process.destroy`. */ def call( cwd: Path = null, @@ -211,7 +252,9 @@ case class ProcGroup private[os] (commands: Seq[proc]) { check: Boolean = true, propagateEnv: Boolean = true, pipefail: Boolean = true, - handleBrokenPipe: Boolean = !isWindows + handleBrokenPipe: Boolean = !isWindows, + // this cannot be next to `timeout` as this will introduce a bin-compat break (default arguments are numbered in the bytecode) + timeoutGracePeriod: Long = 100 ): CommandResult = { val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]] @@ -232,7 +275,7 @@ case class ProcGroup private[os] (commands: Seq[proc]) { pipefail ) - sub.join(timeout) + sub.join(timeout, timeoutGracePeriod) val chunksSeq = chunks.iterator.asScala.toIndexedSeq val res = @@ -241,6 +284,33 @@ case class ProcGroup private[os] (commands: Seq[proc]) { else throw SubprocessException(res) } + private[os] def call( + cwd: Path, + env: Map[String, String], + stdin: ProcessInput, + stdout: ProcessOutput, + stderr: ProcessOutput, + mergeErrIntoOut: Boolean, + timeout: Long, + check: Boolean, + propagateEnv: Boolean, + pipefail: Boolean, + handleBrokenPipe: Boolean + ): CommandResult = call( + cwd, + env, + stdin, + stdout, + stderr, + mergeErrIntoOut, + timeout, + check, + propagateEnv, + pipefail, + handleBrokenPipe, + timeoutGracePeriod = 100 + ) + /** * The most flexible of the [[os.ProcGroup]] calls. It sets-up a pipeline of processes, * and returns a [[ProcessPipeline]] for you to interact with however you like. diff --git a/os/src/SubProcess.scala b/os/src/SubProcess.scala index 61961619..075b2a83 100644 --- a/os/src/SubProcess.scala +++ b/os/src/SubProcess.scala @@ -53,15 +53,55 @@ sealed trait ProcessLike extends java.lang.AutoCloseable { * Wait up to `millis` for the [[ProcessLike]] to terminate and all stdout and stderr * from the subprocess to be handled. By default waits indefinitely; if a time * limit is given, explicitly destroys the [[ProcessLike]] if it has not completed by - * the time the timeout has occurred + * the time the timeout has occurred. + * + * By default, a process is destroyed by sending a `SIGTERM` signal, which allows an opportunity + * for it to clean up any resources it was using. If the process is unresponsive to this, a + * `SIGKILL` signal is sent `timeoutGracePeriod` milliseconds later. If `timeoutGracePeriod` is + * `0`, then there is no `SIGTERM`; if it is `-1`, there is no `SIGKILL` sent. + * + * @returns `true` when the process did not require explicit termination by either `SIGTERM` or `SIGKILL` and `false` otherwise. + * @note the issuing of `SIGTERM` instead of `SIGKILL` is implementation dependent on your JVM version. Pre-Java 9, no `SIGTERM` may be + * issued. Check the documentation for your JDK's `Process.destroy`. + */ + def join(timeout: Long = -1, timeoutGracePeriod: Long = 100): Boolean = { + val exitedCleanly = waitFor(timeout) + if (!exitedCleanly) { + assume( + timeout != -1, + "if the waitFor does not complete cleanly, this implies there is a timeout imposed, so the grace period is applicable" + ) + if (timeoutGracePeriod == -1) destroy() + else if (timeoutGracePeriod == 0) destroyForcibly() + else { + destroy() + if (!waitFor(timeoutGracePeriod)) { + destroyForcibly() + } + } + waitFor(-1) + } + joinPumperThreadsHook() + exitedCleanly + } + + @deprecatedOverriding("this method is now a forwarder, and should not be overriden", "0.10.4") + private[os] def join(timeout: Long): Boolean = join(timeout, timeoutGracePeriod = 100) + + /** + * A hook method used by `join` to close the input and output streams associated with the process, not for public consumption. */ - def join(timeout: Long = -1): Boolean + private[os] def joinPumperThreadsHook(): Unit } /** * Represents a spawn subprocess that has started and may or may not have * completed. */ +@deprecatedInheritance( + "this class will be made final: if you are using it be aware that `join` has a new overloading", + "0.10.4" +) class SubProcess( val wrapped: java.lang.Process, val inputPumperThread: Option[Thread], @@ -114,22 +154,9 @@ class SubProcess( } } - /** - * Wait up to `millis` for the subprocess to terminate and all stdout and stderr - * from the subprocess to be handled. By default waits indefinitely; if a time - * limit is given, explicitly destroys the subprocess if it has not completed by - * the time the timeout has occurred - */ - def join(timeout: Long = -1): Boolean = { - val exitedCleanly = waitFor(timeout) - if (!exitedCleanly) { - destroy() - destroyForcibly() - waitFor(-1) - } + private[os] def joinPumperThreadsHook(): Unit = { outputPumperThread.foreach(_.join()) errorPumperThread.foreach(_.join()) - exitedCleanly } } @@ -222,6 +249,10 @@ object SubProcess { } } +@deprecatedInheritance( + "this class will be made final: if you are using it be aware that `join` has a new overloading", + "0.10.4" +) class ProcessPipeline( val processes: Seq[SubProcess], pipefail: Boolean, @@ -312,12 +343,12 @@ class ProcessPipeline( } /** - * Wait up to `millis` for the [[ProcessPipeline]] to terminate, by default waits + * Wait up to `timeout` for the [[ProcessPipeline]] to terminate, by default waits * indefinitely. Returns `true` if the [[ProcessPipeline]] has terminated by the time * this method returns. * * Waits for each process one by one, while aggregating the total time waited. If - * [[timeout]] has passed before all processes have terminated, returns `false`. + * `timeout` has passed before all processes have terminated, returns `false`. */ override def waitFor(timeout: Long = -1): Boolean = { @tailrec @@ -340,28 +371,28 @@ class ProcessPipeline( } /** - * Wait up to `millis` for the [[ProcessPipeline]] to terminate all the processes + * Wait up to `timeout` for the [[ProcessPipeline]] to terminate all the processes * in pipeline. By default waits indefinitely; if a time limit is given, explicitly * destroys each process if it has not completed by the time the timeout has occurred. + * + * By default, the processes are destroyed by sending `SIGTERM` signals, which allows an opportunity + * for them to clean up any resources it. If any process is unresponsive to this, a + * `SIGKILL` signal is sent `timeoutGracePeriod` milliseconds later. If `timeoutGracePeriod` is + * `0`, then there is no `SIGTERM`; if it is `-1`, there is no `SIGKILL` sent. + * + * @returns `true` when the processes did not require explicit termination by either `SIGTERM` or `SIGKILL` and `false` otherwise. + * @note the issuing of `SIGTERM` instead of `SIGKILL` is implementation dependent on your JVM version. Pre-Java 9, no `SIGTERM` may be + * issued. Check the documentation for your JDK's `Process.destroy`. */ - override def join(timeout: Long = -1): Boolean = { - @tailrec - def joinRec(startedAt: Long, processesLeft: Seq[SubProcess], result: Boolean): Boolean = - processesLeft match { - case Nil => result - case head :: tail => - val elapsed = System.currentTimeMillis() - startedAt - val timeoutLeft = Math.max(0, timeout - elapsed) - val exitedCleanly = head.join(timeoutLeft) - joinRec(startedAt, tail, result && exitedCleanly) - } - + override def join(timeout: Long = -1, timeoutGracePeriod: Long = 100): Boolean = { + // in this case, the grace period does not apply, so fine if (timeout == -1) { processes.forall(_.join()) - } else { - val timeNow = System.currentTimeMillis() - joinRec(timeNow, processes, true) - } + } else super.join(timeout, timeoutGracePeriod) + } + + private[os] def joinPumperThreadsHook(): Unit = { + processes.foreach(_.joinPumperThreadsHook()) } }