From c772de586f6bc3db610d837c5d42b636d76c34c2 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Tue, 19 Nov 2024 14:07:30 -0500 Subject: [PATCH 1/2] drivers: update ordering of events in StartTask to fix executor leak If an error occurs before a task is started, but after this executor process is created, the executor must be explicity stopped. This change updates the logic in StartTask so launching the task happens immediately after creating the executor. --- drivers/exec/driver.go | 16 +++++++++------- drivers/java/driver.go | 16 +++++++++------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/drivers/exec/driver.go b/drivers/exec/driver.go index 70d2b43c2c2..5317550fa62 100644 --- a/drivers/exec/driver.go +++ b/drivers/exec/driver.go @@ -492,13 +492,6 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive Compute: d.compute, } - exec, pluginClient, err := executor.CreateExecutor( - d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID), - d.nomadConfig, executorConfig) - if err != nil { - return nil, nil, fmt.Errorf("failed to create executor: %v", err) - } - user := cfg.User if cfg.DNS != nil { dnsMount, err := resolvconf.GenerateDNSMount(cfg.TaskDir().Dir, cfg.DNS) @@ -516,6 +509,15 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive } d.logger.Debug("task capabilities", "capabilities", caps) + // If any error scenarios occur in StartTask and after creating the executor process, + // the executor process must be killed. + exec, pluginClient, err := executor.CreateExecutor( + d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID), + d.nomadConfig, executorConfig) + if err != nil { + return nil, nil, fmt.Errorf("failed to create executor: %v", err) + } + execCmd := &executor.ExecCommand{ Cmd: driverConfig.Command, Args: driverConfig.Args, diff --git a/drivers/java/driver.go b/drivers/java/driver.go index 99575d77853..6123701c431 100644 --- a/drivers/java/driver.go +++ b/drivers/java/driver.go @@ -467,13 +467,6 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive Compute: d.nomadConfig.Topology.Compute(), } - exec, pluginClient, err := executor.CreateExecutor( - d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID), - d.nomadConfig, executorConfig) - if err != nil { - return nil, nil, fmt.Errorf("failed to create executor: %v", err) - } - user := cfg.User if user == "" { user = "nobody" @@ -495,6 +488,15 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive } d.logger.Debug("task capabilities", "capabilities", caps) + // If any error scenarios occur in StartTask and after creating the executor process, + // the executor process must be killed. + exec, pluginClient, err := executor.CreateExecutor( + d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID), + d.nomadConfig, executorConfig) + if err != nil { + return nil, nil, fmt.Errorf("failed to create executor: %v", err) + } + execCmd := &executor.ExecCommand{ Cmd: absPath, Args: args, From c690e0574c9d6a315b9f91fb23ec62bb6eb997ac Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Wed, 20 Nov 2024 09:44:28 -0500 Subject: [PATCH 2/2] defer executor cleanup func --- .changelog/24495.txt | 3 ++ drivers/exec/driver.go | 14 ++++---- drivers/exec/driver_test.go | 64 ++++++++++++++++++++++++++++++++++++- drivers/java/driver.go | 14 ++++---- 4 files changed, 82 insertions(+), 13 deletions(-) create mode 100644 .changelog/24495.txt diff --git a/.changelog/24495.txt b/.changelog/24495.txt new file mode 100644 index 00000000000..1aa1cd0e974 --- /dev/null +++ b/.changelog/24495.txt @@ -0,0 +1,3 @@ +```release-note:bug +drivers: fix executor leak when drivers error starting tasks +``` diff --git a/drivers/exec/driver.go b/drivers/exec/driver.go index 5317550fa62..0c507309e5c 100644 --- a/drivers/exec/driver.go +++ b/drivers/exec/driver.go @@ -456,7 +456,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { return nil } -func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) { +func (d *Driver) StartTask(cfg *drivers.TaskConfig) (handle *drivers.TaskHandle, network *drivers.DriverNetwork, err error) { if _, ok := d.tasks.Get(cfg.ID); ok { return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID) } @@ -481,7 +481,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive } d.logger.Info("starting task", "driver_cfg", hclog.Fmt("%+v", driverConfig)) - handle := drivers.NewTaskHandle(taskHandleVersion) + handle = drivers.NewTaskHandle(taskHandleVersion) handle.Config = cfg pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out") @@ -509,14 +509,18 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive } d.logger.Debug("task capabilities", "capabilities", caps) - // If any error scenarios occur in StartTask and after creating the executor process, - // the executor process must be killed. exec, pluginClient, err := executor.CreateExecutor( d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID), d.nomadConfig, executorConfig) if err != nil { return nil, nil, fmt.Errorf("failed to create executor: %v", err) } + // prevent leaking executor in error scenarios + defer func() { + if err != nil { + pluginClient.Kill() + } + }() execCmd := &executor.ExecCommand{ Cmd: driverConfig.Command, @@ -540,7 +544,6 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive ps, err := exec.Launch(execCmd) if err != nil { - pluginClient.Kill() return nil, nil, fmt.Errorf("failed to launch command with executor: %v", err) } @@ -564,7 +567,6 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive if err := handle.SetDriverState(&driverState); err != nil { d.logger.Error("failed to start task, error setting driver state", "error", err) _ = exec.Shutdown("", 0) - pluginClient.Kill() return nil, nil, fmt.Errorf("failed to set driver state: %v", err) } diff --git a/drivers/exec/driver_test.go b/drivers/exec/driver_test.go index 9b1d86bb41d..7183ad8bfbc 100644 --- a/drivers/exec/driver_test.go +++ b/drivers/exec/driver_test.go @@ -35,6 +35,7 @@ import ( "github.com/hashicorp/nomad/testutil" "github.com/shoenig/test/must" "github.com/stretchr/testify/require" + "golang.org/x/sys/unix" ) type mockIDValidator struct{} @@ -347,9 +348,70 @@ func TestExecDriver_StartWaitRecover(t *testing.T) { require.NoError(t, harness.DestroyTask(task.ID, true)) } +func TestExecDriver_NoOrphanedExecutor(t *testing.T) { + ci.Parallel(t) + ctestutils.ExecCompatible(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + d := newExecDriverTest(t, ctx) + harness := dtestutil.NewDriverHarness(t, d) + defer harness.Kill() + + config := &Config{ + NoPivotRoot: false, + DefaultModePID: executor.IsolationModePrivate, + DefaultModeIPC: executor.IsolationModePrivate, + } + + var data []byte + must.NoError(t, base.MsgPackEncode(&data, config)) + baseConfig := &base.Config{ + PluginConfig: data, + AgentConfig: &base.AgentConfig{ + Driver: &base.ClientDriverConfig{ + Topology: d.(*Driver).nomadConfig.Topology, + }, + }, + } + must.NoError(t, harness.SetConfig(baseConfig)) + + allocID := uuid.Generate() + taskName := "test" + task := &drivers.TaskConfig{ + AllocID: allocID, + ID: uuid.Generate(), + Name: taskName, + Resources: testResources(allocID, taskName), + } + + cleanup := harness.MkAllocDir(task, true) + defer cleanup() + + taskConfig := map[string]interface{}{} + taskConfig["command"] = "force-an-error" + must.NoError(t, task.EncodeConcreteDriverConfig(&taskConfig)) + + _, _, err := harness.StartTask(task) + must.Error(t, err) + defer harness.DestroyTask(task.ID, true) + + testPid := unix.Getpid() + tids, err := os.ReadDir(fmt.Sprintf("/proc/%d/task", testPid)) + must.NoError(t, err) + for _, tid := range tids { + children, err := os.ReadFile(fmt.Sprintf("/proc/%d/task/%s/children", testPid, tid.Name())) + must.NoError(t, err) + + pids := strings.Fields(string(children)) + must.Eq(t, 0, len(pids)) + } +} + // TestExecDriver_NoOrphans asserts that when the main // task dies, the orphans in the PID namespaces are killed by the kernel -func TestExecDriver_NoOrphans(t *testing.T) { +func TestExecDriver_NoOrphanedTasks(t *testing.T) { ci.Parallel(t) ctestutils.ExecCompatible(t) diff --git a/drivers/java/driver.go b/drivers/java/driver.go index 6123701c431..d918b06a69a 100644 --- a/drivers/java/driver.go +++ b/drivers/java/driver.go @@ -429,7 +429,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { return nil } -func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) { +func (d *Driver) StartTask(cfg *drivers.TaskConfig) (handle *drivers.TaskHandle, network *drivers.DriverNetwork, err error) { if _, ok := d.tasks.Get(cfg.ID); ok { return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID) } @@ -456,7 +456,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive d.logger.Info("starting java task", "driver_cfg", hclog.Fmt("%+v", driverConfig), "args", args) - handle := drivers.NewTaskHandle(taskHandleVersion) + handle = drivers.NewTaskHandle(taskHandleVersion) handle.Config = cfg pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out") @@ -488,14 +488,18 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive } d.logger.Debug("task capabilities", "capabilities", caps) - // If any error scenarios occur in StartTask and after creating the executor process, - // the executor process must be killed. exec, pluginClient, err := executor.CreateExecutor( d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID), d.nomadConfig, executorConfig) if err != nil { return nil, nil, fmt.Errorf("failed to create executor: %v", err) } + // prevent leaking executor in error scenarios + defer func() { + if err != nil { + pluginClient.Kill() + } + }() execCmd := &executor.ExecCommand{ Cmd: absPath, @@ -518,7 +522,6 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive ps, err := exec.Launch(execCmd) if err != nil { - pluginClient.Kill() return nil, nil, fmt.Errorf("failed to launch command with executor: %v", err) } @@ -542,7 +545,6 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive if err := handle.SetDriverState(&driverState); err != nil { d.logger.Error("failed to start task, error setting driver state", "error", err) exec.Shutdown("", 0) - pluginClient.Kill() return nil, nil, fmt.Errorf("failed to set driver state: %v", err) }