Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flag to register shutdown hooks for os.call and os.spawn APIs, overhaul destroy APIs #324

Merged
merged 8 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions Readme.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1725,7 +1725,9 @@ os.call(cmd: os.Shellable,
mergeErrIntoOut: Boolean = false,
timeout: Long = Long.MaxValue,
check: Boolean = true,
propagateEnv: Boolean = true): os.CommandResult
propagateEnv: Boolean = true,
shutdownGracePeriod: Long = 100,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the time unit used for shutdownGracePeriod? Should we name it shutdownGracePeriodMsec?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll probably leave it as is, the scaladoc says it's in milliseconds so that should be enough

shutdownHook: Boolean = true): os.CommandResult
----

_Also callable via `os.proc(cmd).call(...)`_
Expand Down Expand Up @@ -1853,7 +1855,9 @@ os.spawn(cmd: os.Shellable,
stdout: os.ProcessOutput = os.Pipe,
stderr: os.ProcessOutput = os.Pipe,
mergeErrIntoOut: Boolean = false,
propagateEnv: Boolean = true): os.SubProcess
propagateEnv: Boolean = true,
shutdownGracePeriod: Long = 100,
shutdownHook: Boolean = true): os.SubProcess
----

_Also callable via `os.proc(cmd).spawn(...)`_
Expand Down
10 changes: 9 additions & 1 deletion build.sc
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,21 @@ object os extends Module {
def forkEnv = super.forkEnv() ++ Map(
"TEST_JAR_WRITER_ASSEMBLY" -> testJarWriter.assembly().path.toString,
"TEST_JAR_READER_ASSEMBLY" -> testJarReader.assembly().path.toString,
"TEST_JAR_EXIT_ASSEMBLY" -> testJarExit.assembly().path.toString
"TEST_JAR_EXIT_ASSEMBLY" -> testJarExit.assembly().path.toString,
"TEST_SPAWN_EXIT_HOOK_ASSEMBLY" -> testSpawnExitHook.assembly().path.toString,
"TEST_SPAWN_EXIT_HOOK_ASSEMBLY2" -> testSpawnExitHook2.assembly().path.toString
)

object testJarWriter extends JavaModule
object testJarReader extends JavaModule
object testJarExit extends JavaModule
object testSpawnExitHook extends ScalaModule{
def scalaVersion = OsJvmModule.this.scalaVersion()
def moduleDeps = Seq(OsJvmModule.this)
}
object testSpawnExitHook2 extends JavaModule
}

object nohometest extends ScalaTests with OsLibTestModule
}

Expand Down
162 changes: 145 additions & 17 deletions os/src/ProcessOps.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package os

import java.util.concurrent.{ArrayBlockingQueue, Semaphore, TimeUnit}
import collection.JavaConverters._
import scala.annotation.tailrec
import java.lang.ProcessBuilder.Redirect
import os.SubProcess.InputStream
import java.io.IOException
import java.util.concurrent.LinkedBlockingQueue
import ProcessOps._
import scala.util.Try

object call {

Expand All @@ -28,7 +25,8 @@ object call {
timeout: Long = -1,
check: Boolean = true,
propagateEnv: Boolean = true,
timeoutGracePeriod: Long = 100
shutdownGracePeriod: Long = 100,
shutdownHook: Boolean = true
): CommandResult = {
os.proc(cmd).call(
cwd = cwd,
Expand All @@ -40,7 +38,38 @@ object call {
timeout = timeout,
check = check,
propagateEnv = propagateEnv,
timeoutGracePeriod = timeoutGracePeriod
shutdownGracePeriod = shutdownGracePeriod,
shutdownHook = shutdownHook
)
}
def apply(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this one is only for bin-compat, so we should deprecate it for removal.

cmd: Shellable,
env: Map[String, String],
// Make sure `cwd` only comes after `env`, so `os.call("foo", path)` is a compile error
// since the correct syntax is `os.call(("foo", path))`
cwd: Path,
stdin: ProcessInput,
stdout: ProcessOutput,
stderr: ProcessOutput,
mergeErrIntoOut: Boolean,
timeout: Long,
check: Boolean,
propagateEnv: Boolean,
timeoutGracePeriod: Long
): CommandResult = {
call(
cmd = cmd,
cwd = cwd,
env = env,
stdin = stdin,
stdout = stdout,
stderr = stderr,
mergeErrIntoOut = mergeErrIntoOut,
timeout = timeout,
check = check,
propagateEnv = propagateEnv,
shutdownGracePeriod = timeoutGracePeriod,
shutdownHook = false
)
}
}
Expand All @@ -59,7 +88,9 @@ object spawn {
stdout: ProcessOutput = Pipe,
stderr: ProcessOutput = os.Inherit,
mergeErrIntoOut: Boolean = false,
propagateEnv: Boolean = true
propagateEnv: Boolean = true,
shutdownGracePeriod: Long = 100,
shutdownHook: Boolean = true
): SubProcess = {
os.proc(cmd).spawn(
cwd = cwd,
Expand All @@ -68,7 +99,34 @@ object spawn {
stdout = stdout,
stderr = stderr,
mergeErrIntoOut = mergeErrIntoOut,
propagateEnv = propagateEnv
propagateEnv = propagateEnv,
shutdownGracePeriod = shutdownGracePeriod,
shutdownHook = shutdownHook
)
}
def apply(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this only provided for bin-compat? Shouldn't we deprecate it therefore?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've had issues with @deprecated things being accidentally resolved here before, but will add a comment like the other forwarders

cmd: Shellable,
// Make sure `cwd` only comes after `env`, so `os.spawn("foo", path)` is a compile error
// since the correct syntax is `os.spawn(("foo", path))`
env: Map[String, String],
cwd: Path,
stdin: ProcessInput,
stdout: ProcessOutput,
stderr: ProcessOutput,
mergeErrIntoOut: Boolean,
propagateEnv: Boolean
): SubProcess = {
spawn(
cmd = cmd,
cwd = cwd,
env = env,
stdin = stdin,
stdout = stdout,
stderr = stderr,
mergeErrIntoOut = mergeErrIntoOut,
propagateEnv = propagateEnv,
shutdownGracePeriod = 100,
shutdownHook = false
)
}
}
Expand Down Expand Up @@ -119,7 +177,7 @@ case class proc(command: Shellable*) {
* fails with a non-zero exit code
* @param propagateEnv disable this to avoid passing in this parent process's
* environment variables to the subprocess
* @param timeoutGracePeriod if the timeout is enabled, how long in milliseconds for the
* @param shutdownGracePeriod if the timeout is enabled, how long in milliseconds for the
* subprocess to gracefully terminate before attempting to
* forcibly kill it
* (-1 for no kill, 0 for always kill immediately)
Expand All @@ -138,7 +196,8 @@ case class proc(command: Shellable*) {
check: Boolean = true,
propagateEnv: Boolean = true,
// this cannot be next to `timeout` as this will introduce a bin-compat break (default arguments are numbered in the bytecode)
timeoutGracePeriod: Long = 100
shutdownGracePeriod: Long = 100,
shutdownHook: Boolean = true
): CommandResult = {

val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]]
Expand All @@ -159,7 +218,7 @@ case class proc(command: Shellable*) {
propagateEnv
)

sub.join(timeout, timeoutGracePeriod)
sub.join(timeout, shutdownGracePeriod)

val chunksSeq = chunks.iterator.asScala.toIndexedSeq
val res = CommandResult(commandChunks, sub.exitCode(), chunksSeq)
Expand Down Expand Up @@ -188,7 +247,32 @@ case class proc(command: Shellable*) {
timeout,
check,
propagateEnv,
timeoutGracePeriod = 100
shutdownGracePeriod = 100
)

private[os] def call(
cwd: Path,
env: Map[String, String],
stdin: ProcessInput,
stdout: ProcessOutput,
stderr: ProcessOutput,
mergeErrIntoOut: Boolean,
timeout: Long,
check: Boolean,
propagateEnv: Boolean,
timeoutGracePeriod: Long
): CommandResult = call(
cwd,
env,
stdin,
stdout,
stderr,
mergeErrIntoOut,
timeout,
check,
propagateEnv,
timeoutGracePeriod,
shutdownHook = false
)

/**
Expand All @@ -208,7 +292,9 @@ case class proc(command: Shellable*) {
stdout: ProcessOutput = Pipe,
stderr: ProcessOutput = os.Inherit,
mergeErrIntoOut: Boolean = false,
propagateEnv: Boolean = true
propagateEnv: Boolean = true,
shutdownGracePeriod: Long = 100,
shutdownHook: Boolean = true
): SubProcess = {

val cmdChunks = commandChunks
Expand All @@ -230,19 +316,61 @@ case class proc(command: Shellable*) {
propagateEnv
)

lazy val shutdownHookThread =
if (!shutdownHook) None
else Some(new Thread("subprocess-shutdown-hook") {
override def run(): Unit = proc.destroy(shutdownGracePeriod)
})

lazy val shutdownHookMonitorThread = shutdownHookThread.map(t =>
new Thread("subprocess-shutdown-hook-monitor") {
override def run(): Unit = {
while (proc.wrapped.isAlive) Thread.sleep(1)
try Runtime.getRuntime().removeShutdownHook(t)
catch { case e: Throwable => /*do nothing*/ }
}
}
)

shutdownHookThread.foreach(Runtime.getRuntime().addShutdownHook)

lazy val proc: SubProcess = new SubProcess(
builder.start(),
resolvedStdin.processInput(proc.stdin).map(new Thread(_, commandStr + " stdin thread")),
resolvedStdout.processOutput(proc.stdout).map(new Thread(_, commandStr + " stdout thread")),
resolvedStderr.processOutput(proc.stderr).map(new Thread(_, commandStr + " stderr thread"))
resolvedStderr.processOutput(proc.stderr).map(new Thread(_, commandStr + " stderr thread")),
shutdownGracePeriod = shutdownGracePeriod,
shutdownHookMonitorThread = shutdownHookMonitorThread
)

shutdownHookMonitorThread.foreach(_.start())

proc.inputPumperThread.foreach(_.start())
proc.outputPumperThread.foreach(_.start())
proc.errorPumperThread.foreach(_.start())
proc
}

def spawn(
cwd: Path,
env: Map[String, String],
stdin: ProcessInput,
stdout: ProcessOutput,
stderr: ProcessOutput,
mergeErrIntoOut: Boolean,
propagateEnv: Boolean
): SubProcess = spawn(
cwd = cwd,
env = env,
stdin = stdin,
stdout = stdout,
stderr = stderr,
mergeErrIntoOut = mergeErrIntoOut,
propagateEnv = propagateEnv,
shutdownGracePeriod = 100,
shutdownHook = false
)

/**
* Pipes the output of this process into the input of the [[next]] process. Returns a
* [[ProcGroup]] containing both processes, which you can then either execute or
Expand Down Expand Up @@ -295,7 +423,7 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
* will be caught and handled by killing the writing process. This behaviour
* is consistent with handlers of SIGPIPE signals in most programs
* supporting interruptable piping. Disabled by default on Windows.
* @param timeoutGracePeriod if the timeout is enabled, how long in milliseconds for the
* @param shutdownGracePeriod if the timeout is enabled, how long in milliseconds for the
* subprocess to gracefully terminate before attempting to
* forcibly kill it
* (-1 for no kill, 0 for always kill immediately)
Expand All @@ -316,7 +444,7 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
pipefail: Boolean = true,
handleBrokenPipe: Boolean = !isWindows,
// this cannot be next to `timeout` as this will introduce a bin-compat break (default arguments are numbered in the bytecode)
timeoutGracePeriod: Long = 100
shutdownGracePeriod: Long = 100
): CommandResult = {
val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]]

Expand All @@ -337,7 +465,7 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
pipefail
)

sub.join(timeout, timeoutGracePeriod)
sub.join(timeout, shutdownGracePeriod)

val chunksSeq = chunks.iterator.asScala.toIndexedSeq
val res =
Expand Down Expand Up @@ -370,7 +498,7 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
propagateEnv,
pipefail,
handleBrokenPipe,
timeoutGracePeriod = 100
shutdownGracePeriod = 100
)

/**
Expand Down
49 changes: 45 additions & 4 deletions os/src/SubProcess.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,23 @@ class SubProcess(
val wrapped: java.lang.Process,
val inputPumperThread: Option[Thread],
val outputPumperThread: Option[Thread],
val errorPumperThread: Option[Thread]
val errorPumperThread: Option[Thread],
val shutdownGracePeriod: Long,
val shutdownHookMonitorThread: Option[Thread]
) extends ProcessLike {
def this(
wrapped: java.lang.Process,
inputPumperThread: Option[Thread],
outputPumperThread: Option[Thread],
errorPumperThread: Option[Thread]
) = this(
wrapped,
inputPumperThread,
outputPumperThread,
errorPumperThread,
100,
None
)
val stdin: SubProcess.InputStream = new SubProcess.InputStream(wrapped.getOutputStream)
val stdout: SubProcess.OutputStream = new SubProcess.OutputStream(wrapped.getInputStream)
val stderr: SubProcess.OutputStream = new SubProcess.OutputStream(wrapped.getErrorStream)
Expand All @@ -128,12 +143,38 @@ class SubProcess(
/**
* Attempt to destroy the subprocess (gently), via the underlying JVM APIs
*/
def destroy(): Unit = wrapped.destroy()
@deprecated("Use destroy(shutdownGracePeriod = Long.MaxValue)")
def destroy(): Unit = destroy(shutdownGracePeriod = Long.MaxValue)

/**
* Force-destroys the subprocess, via the underlying JVM APIs
* Destroys the subprocess, via the underlying JVM APIs, with configurable levels of
* aggressiveness:
*
* @param async set this to `true` if you do not want to wait on the subprocess exiting
* @param shutdownGracePeriod use this to override the default wait time for the subprocess
* to gracefully exit before destroying it forcibly. Defaults to the `shutdownGracePeriod`
* that was used to spawned the process, but can be set to 0
* (i.e. force exit immediately) or Long.MaxValue (i.e. never force exit)
* or anything in between. Typically defaults to 100 milliseconds
*/
def destroyForcibly(): Unit = wrapped.destroyForcibly()
def destroy(
shutdownGracePeriod: Long = this.shutdownGracePeriod,
async: Boolean = false
): Unit = {
wrapped.destroy()
if (!async) {
val now = System.currentTimeMillis()

while (wrapped.isAlive && System.currentTimeMillis() - now < shutdownGracePeriod) {
Thread.sleep(1)
}

if (wrapped.isAlive) wrapped.destroyForcibly()
}
}

@deprecated("Use destroy(shutdownGracePeriod = 0)")
def destroyForcibly(): Unit = destroy(shutdownGracePeriod = 0)

/**
* Alias for [[destroy]]
Expand Down
Loading