Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jrasell committed Dec 31, 2024
1 parent 618135e commit 623211a
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 20 deletions.
48 changes: 28 additions & 20 deletions drivers/docker/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package docker
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"sync"
Expand Down Expand Up @@ -95,7 +96,6 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte

ticker, cancel := helper.NewSafeTicker(interval)
defer cancel()
var stats *containerapi.Stats

for {
select {
Expand All @@ -104,25 +104,9 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte
case <-h.doneCh:
return
case <-ticker.C:
// we need to use the streaming stats API here because our calculation for
// CPU usage depends on having the values from the previous read, which are
// not available in one-shot. This streaming stats can be reused over time,
// but require synchronization, which restricts the interval for the metrics.
statsReader, err := h.dockerClient.ContainerStats(ctx, h.containerID, true)
if err != nil && err != io.EOF {
h.logger.Debug("error collecting stats from container", "error", err)
return
}

err = json.NewDecoder(statsReader.Body).Decode(&stats)
statsReader.Body.Close()
if err != nil && err != io.EOF {
h.logger.Error("error decoding stats data from container", "error", err)
return
}

if stats == nil {
h.logger.Error("error decoding stats data: stats were nil")
stats, err := h.collectDockerStats(ctx)
if err != nil {
h.logger.Error("error collecting stats from container", "error", err)
return
}

Expand All @@ -131,3 +115,27 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte
}
}
}

// collectDockerStats performs the stats collection from the Docker API. It is
// split into its own function for the purpose of aiding testing.
func (h *taskHandle) collectDockerStats(ctx context.Context) (*containerapi.Stats, error) {

var stats *containerapi.Stats

statsReader, err := h.dockerClient.ContainerStats(ctx, h.containerID, false)
if err != nil && err != io.EOF {
return nil, fmt.Errorf("failed to collect stats: %w", err)
}

err = json.NewDecoder(statsReader.Body).Decode(&stats)
_ = statsReader.Body.Close()
if err != nil && err != io.EOF {
return nil, fmt.Errorf("failed to decode stats: %w", err)
}

if stats == nil {
return nil, errors.New("error decoding stats data: stats were nil")
}

return stats, nil
}
45 changes: 45 additions & 0 deletions drivers/docker/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
package docker

import (
"context"
"runtime"
"sync"
"testing"
"time"

containerapi "github.com/docker/docker/api/types/container"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/lib/cpustats"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/drivers/docker/util"
"github.com/shoenig/test/must"
)
Expand Down Expand Up @@ -112,3 +115,45 @@ func TestDriver_DockerUsageSender(t *testing.T) {
destCh.close()
destCh.send(res)
}

func Test_taskHandle_collectDockerStats(t *testing.T) {
ci.Parallel(t)
testutil.DockerCompatible(t)

// Start a Docker container and wait for it to be running, so we can
// guarantee stats generation.
driverCfg, dockerTaskConfig, _ := dockerTask(t)

must.NoError(t, driverCfg.EncodeConcreteDriverConfig(dockerTaskConfig))

_, driverHarness, handle, cleanup := dockerSetup(t, driverCfg, nil)
defer cleanup()
must.NoError(t, driverHarness.WaitUntilStarted(driverCfg.ID, 5*time.Second))

// Generate a context, so the test doesn't hang on Docker problems and
// execute a single collection of the stats.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

dockerStats, err := handle.collectDockerStats(ctx)
must.NoError(t, err)
must.NotNil(t, dockerStats)

// Ensure all the stats we use for calculating CPU percentages within
// DockerStatsToTaskResourceUsage are present and non-zero.
must.Greater(t, 0, dockerStats.CPUStats.CPUUsage.TotalUsage)
must.Greater(t, 0, dockerStats.CPUStats.SystemUsage)
must.Greater(t, 0, dockerStats.CPUStats.CPUUsage.UsageInKernelmode)
must.Greater(t, 0, dockerStats.CPUStats.CPUUsage.TotalUsage)
must.Greater(t, 0, dockerStats.CPUStats.CPUUsage.UsageInUsermode)

must.Greater(t, 0, dockerStats.PreCPUStats.CPUUsage.TotalUsage)
must.Greater(t, 0, dockerStats.PreCPUStats.SystemUsage)
must.Greater(t, 0, dockerStats.PreCPUStats.CPUUsage.UsageInKernelmode)
must.Greater(t, 0, dockerStats.PreCPUStats.CPUUsage.TotalUsage)
must.Greater(t, 0, dockerStats.PreCPUStats.CPUUsage.UsageInUsermode)

// Ensure the core memory stats are present and where desired, non-zero.
must.Greater(t, 0, dockerStats.MemoryStats.Usage)
must.MapContainsKey(t, dockerStats.MemoryStats.Stats, "file_mapped")
}

0 comments on commit 623211a

Please sign in to comment.