Reland "[engine] Limit number of active subprocesses"

This reverts commit 0dae038.

Reason for revert: There was a potential for deadlocks in the first
version of this change. Previously, the semaphore used to limit the
number of running subprocesses was not released (allowing another
subprocess to start) until the subprocess had completed *and* the
`wait()` Starlark function had been called on the subprocess object.

This meant that it was not safe to launch a large number of subprocesses
and then call `wait()` on them in any order other than the order in
which they were started; otherwise the code risked
calling `wait()` on a subprocess that couldn't start until another
subprocess had `wait()` called on it, leading to a deadlock.

Now the semaphore gets released by a separate goroutine immediately
after the subprocess completes, even if the Starlark code hasn't called
`wait()` yet, making this class of deadlock impossible.
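
In outline, the new scheme looks like the following (a minimal sketch of
the pattern from the runtime_ctx_os.go change below; the package name,
the helper name startLimited, and the standalone semaphore are
illustrative only, and sandboxing, output buffering, and cleanup are
omitted):

    package example

    import (
        "context"
        "os/exec"
        "runtime"

        "golang.org/x/sync/semaphore"
    )

    // Bounds how many subprocesses may run at once.
    var subprocessSem = semaphore.NewWeighted(int64(runtime.NumCPU()) + 2)

    // startLimited starts cmd once a semaphore slot is free and returns a
    // channel that yields the start/wait error. The slot is released as
    // soon as the process exits, regardless of when the caller receives
    // from the channel, so waiting on subprocesses in any order cannot
    // deadlock.
    func startLimited(ctx context.Context, cmd *exec.Cmd) <-chan error {
        errs := make(chan error, 1)
        go func() {
            errs <- func() error {
                if err := subprocessSem.Acquire(ctx, 1); err != nil {
                    return err
                }
                defer subprocessSem.Release(1)
                if err := cmd.Start(); err != nil {
                    return err
                }
                return cmd.Wait()
            }()
            // Closing the channel lets both wait() and cleanup() receive.
            close(errs)
        }()
        return errs
    }

A caller can then receive from each returned channel in any order
without risking the deadlock described above.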

Original change's description:
> Revert "[engine] Limit number of active subprocesses"
>
> This reverts commit 612779b.
>
> Reason for revert: `shac check --all` is hanging in large repos:
> https://ci.chromium.org/b/8767597391211255425
>
> Original change's description:
> > [engine] Limit number of active subprocesses
> >
> > Previously the number of concurrent subprocess invocations launched by
> > `ctx.os.exec()` was unbounded, which could place a strain on the system.
> > Now there's effectively a pool of NumCPU+2 workers for running
> > subprocesses.
> >
> > `ctx.os.exec()` returns immediately, but the underlying subprocess is
> > started asynchronously.
> >
> > `ba -against main` showed no significant difference in the results of
> > the `ctx.os.exec()` benchmarks.
> >
> > Change-Id: I76e4542249783c9a503f0f927e327e9f90f8bb04
> > Reviewed-on: https://fuchsia-review.googlesource.com/c/shac-project/shac/+/867979
> > Reviewed-by: Ina Huh <[email protected]>
> > Commit-Queue: Oliver Newman <[email protected]>
>
> Change-Id: Icfd3611825b1995948c856170ddc353b7ebfb1eb
> No-Presubmit: true
> No-Tree-Checks: true
> No-Try: true
> Reviewed-on: https://fuchsia-review.googlesource.com/c/shac-project/shac/+/929633
> Fuchsia-Auto-Submit: Oliver Newman <[email protected]>
> Commit-Queue: Auto-Submit <[email protected]>
> Reviewed-by: RubberStamper 🤖 <[email protected]>

Change-Id: Iefdd7aebc04d03e60f925f136a08eebc28e5bb63
Reviewed-on: https://fuchsia-review.googlesource.com/c/shac-project/shac/+/929654
Reviewed-by: Ina Huh <[email protected]>
Fuchsia-Auto-Submit: Oliver Newman <[email protected]>
Commit-Queue: Auto-Submit <[email protected]>
orn688 authored and CQ Bot committed Oct 11, 2023
1 parent 9a096d6 commit 8c21bec
Showing 5 changed files with 66 additions and 30 deletions.
10 changes: 9 additions & 1 deletion internal/engine/run.go
@@ -40,6 +40,7 @@ import (
"go.starlark.net/resolve"
"go.starlark.net/starlark"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"google.golang.org/protobuf/encoding/prototext"
)

@@ -364,6 +365,8 @@ func runInner(ctx context.Context, o *Options, tmpdir string) error {
packages: packages,
}

subprocessSem := semaphore.NewWeighted(int64(runtime.NumCPU()) + 2)

var vars map[string]string

newState := func(scm scmCheckout, subdir string, idx int) (*shacState, error) {
@@ -404,6 +407,7 @@ func runInner(ctx context.Context, o *Options, tmpdir string) error {
sandbox: sb,
scm: scm,
subdir: subdir,
subprocessSem: subprocessSem,
tmpdir: filepath.Join(tmpdir, strconv.Itoa(idx)),
writableRoot: doc.WritableRoot,
vars: vars,
@@ -464,7 +468,7 @@ func runInner(ctx context.Context, o *Options, tmpdir string) error {
if err != nil {
return err
}
shacStates = []*shacState{state}
shacStates = append(shacStates, state)
}

// Parse the starlark files. Run everything from our errgroup.
@@ -488,6 +492,7 @@ func runInner(ctx context.Context, o *Options, tmpdir string) error {
if cb == nil {
loop = false
} else {
// Actually run the check.
eg.Go(cb)
}
case <-done:
@@ -644,6 +649,9 @@ type shacState struct {
filter CheckFilter
passthroughEnv []*PassthroughEnv

// Limits the number of concurrent subprocesses launched by ctx.os.exec().
subprocessSem *semaphore.Weighted

// Set when fail() is called. This happens only during the first phase, thus
// no mutex is needed.
failErr *failure
2 changes: 1 addition & 1 deletion internal/engine/run_test.go
@@ -2273,7 +2273,7 @@ func TestTestDataPrint(t *testing.T) {
},
{
name: "ctx-os-exec-parallel.star",
want: strings.Repeat("[//ctx-os-exec-parallel.star:27] Hello, world\n", 10),
want: strings.Repeat("[//ctx-os-exec-parallel.star:32] Hello, world\n", 1000),
},
{
name: "ctx-os-exec-relpath.star",
69 changes: 46 additions & 23 deletions internal/engine/runtime_ctx_os.go
@@ -42,6 +42,7 @@ type subprocess struct {
raiseOnFailure bool
okRetcodes []int
tempDir string
errs <-chan error

waitCalled bool
}
@@ -85,12 +86,16 @@ func (s *subprocess) wait() (starlark.Value, error) {
return nil, fmt.Errorf("wait was already called")
}
s.waitCalled = true
val, err := s.waitInner()
if err2 := s.cleanup(); err == nil {
err = err2
}
return val, err
}

defer s.cleanup()

err := s.cmd.Wait()
func (s *subprocess) waitInner() (starlark.Value, error) {
retcode := 0
if err != nil {
if err := <-s.errs; err != nil {
var errExit *exec.ExitError
if errors.As(err, &errExit) {
retcode = errExit.ExitCode()
@@ -127,16 +132,27 @@
}

func (s *subprocess) cleanup() error {
// Wait for the subprocess to launch before trying to kill it. s.startErrs
// gets closed after the subprocess starts, so even if the error has already
// been received by `wait()`, this receive will return due to the channel
// being closed.
<-s.errs
// Kill the process before doing any other cleanup steps to ensure resources
// are no longer in use.
err := s.cmd.Process.Kill()
// Kill() doesn't block until the process actually completes, so we need to
// wait before cleaning up resources.
_ = s.cmd.Wait()
// are no longer in use before cleaning them up.
var err error
// If Process is not nil then the command successfully started. If
// ProcessState is nil then the command hasn't yet completed.
if s.cmd.Process != nil && s.cmd.ProcessState == nil {
err = s.cmd.Process.Kill()
// Kill() is non-blocking, so it's necessary to wait for the process to
// exit before cleaning up resources.
_ = s.cmd.Wait()
}

if err2 := os.RemoveAll(s.tempDir); err == nil {
err = err2
}

buffers.push(s.stdout)
buffers.push(s.stderr)
s.stdout, s.stderr = nil, nil
@@ -212,15 +228,6 @@ func ctxOsExec(ctx context.Context, s *shacState, name string, args starlark.Tup
return os.RemoveAll(tempDir)
})

stdout := buffers.get()
stderr := buffers.get()

cleanupFuncs = append(cleanupFuncs, func() error {
buffers.push(stdout)
buffers.push(stderr)
return nil
})

env := map[string]string{
"PATH": os.Getenv("PATH"),
"TEMP": tempDir,
@@ -387,15 +394,29 @@ func ctxOsExec(ctx context.Context, s *shacState, name string, args starlark.Tup

cmd := s.sandbox.Command(ctx, config)

cmd.Stdin = stdin
stdout, stderr := buffers.get(), buffers.get()
// TODO(olivernewman): Also handle commands that may output non-utf-8 bytes.
cmd.Stdout = stdout
cmd.Stderr = stderr
cmd.Stdin = stdin

err = execsupport.Start(cmd)
if err != nil {
return nil, err
}
errs := make(chan error, 1)
go func() {
errs <- func() error {
if err := s.subprocessSem.Acquire(ctx, 1); err != nil {
return err
}
defer s.subprocessSem.Release(1)

if err := execsupport.Start(cmd); err != nil {
return err
}
return cmd.Wait()
}()
// Signals to subprocess.wait() that the subprocess is done, whether or
// not it was successful.
close(errs)
}()

proc := &subprocess{
cmd: cmd,
@@ -405,7 +426,9 @@ func ctxOsExec(ctx context.Context, s *shacState, name string, args starlark.Tup
raiseOnFailure: bool(argraiseOnFailure),
okRetcodes: okRetcodes,
tempDir: tempDir,
errs: errs,
}

// Only clean up now if starting the subprocess failed; otherwise it will
// get cleaned up by wait().
cleanupFuncs = cleanupFuncs[:0]
13 changes: 9 additions & 4 deletions internal/engine/testdata/print/ctx-os-exec-parallel.star
@@ -18,11 +18,16 @@ def cb(ctx):
else:
cmd = ["./hello_world.sh"]

procs = []
for _ in range(10):
procs.append(ctx.os.exec(cmd))
# Launch more parallel subprocesses than shac will actually allow to run in
# parallel, i.e. more than any realistic machine will have cores (but not
# too many, or the test will be very slow).
num_procs = 1000
procs = [ctx.os.exec(cmd) for _ in range(num_procs)]

for proc in procs:
# It should be possible to wait on the subprocesses in the reverse of the
# order in which they were started without causing a deadlock; the lock
# should be released asynchronously, not by calling wait().
for proc in reversed(procs):
res = proc.wait()
print(res.stdout.strip())

2 changes: 1 addition & 1 deletion internal/engine/version.go
@@ -26,7 +26,7 @@ var (
// Version is the current tool version.
//
// TODO(maruel): Add proper version, preferably from git tag.
Version = shacVersion{0, 1, 14}
Version = shacVersion{0, 1, 15}
)

func (v shacVersion) String() string {
