From 623211a0ff3dabfc553d91f7f14afa91962b49db Mon Sep 17 00:00:00 2001 From: James Rasell Date: Tue, 31 Dec 2024 09:37:51 +0000 Subject: [PATCH] WIP --- drivers/docker/stats.go | 48 +++++++++++++++++++++--------------- drivers/docker/stats_test.go | 45 +++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 20 deletions(-) diff --git a/drivers/docker/stats.go b/drivers/docker/stats.go index a139e599819..4fc6e58179c 100644 --- a/drivers/docker/stats.go +++ b/drivers/docker/stats.go @@ -6,6 +6,7 @@ package docker import ( "context" "encoding/json" + "errors" "fmt" "io" "sync" @@ -95,7 +96,6 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte ticker, cancel := helper.NewSafeTicker(interval) defer cancel() - var stats *containerapi.Stats for { select { @@ -104,25 +104,9 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte case <-h.doneCh: return case <-ticker.C: - // we need to use the streaming stats API here because our calculation for - // CPU usage depends on having the values from the previous read, which are - // not available in one-shot. This streaming stats can be reused over time, - // but require synchronization, which restricts the interval for the metrics. - statsReader, err := h.dockerClient.ContainerStats(ctx, h.containerID, true) - if err != nil && err != io.EOF { - h.logger.Debug("error collecting stats from container", "error", err) - return - } - - err = json.NewDecoder(statsReader.Body).Decode(&stats) - statsReader.Body.Close() - if err != nil && err != io.EOF { - h.logger.Error("error decoding stats data from container", "error", err) - return - } - - if stats == nil { - h.logger.Error("error decoding stats data: stats were nil") + stats, err := h.collectDockerStats(ctx) + if err != nil { + h.logger.Error("error collecting stats from container", "error", err) return } @@ -131,3 +115,27 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte } } } + +// collectDockerStats performs the stats collection from the Docker API. It is +// split into its own function for the purpose of aiding testing. +func (h *taskHandle) collectDockerStats(ctx context.Context) (*containerapi.Stats, error) { + + var stats *containerapi.Stats + + statsReader, err := h.dockerClient.ContainerStats(ctx, h.containerID, false) + if err != nil && err != io.EOF { + return nil, fmt.Errorf("failed to collect stats: %w", err) + } + + err = json.NewDecoder(statsReader.Body).Decode(&stats) + _ = statsReader.Body.Close() + if err != nil && err != io.EOF { + return nil, fmt.Errorf("failed to decode stats: %w", err) + } + + if stats == nil { + return nil, errors.New("error decoding stats data: stats were nil") + } + + return stats, nil +} diff --git a/drivers/docker/stats_test.go b/drivers/docker/stats_test.go index d1147e585cc..9a656174505 100644 --- a/drivers/docker/stats_test.go +++ b/drivers/docker/stats_test.go @@ -4,14 +4,17 @@ package docker import ( + "context" "runtime" "sync" "testing" + "time" containerapi "github.com/docker/docker/api/types/container" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/lib/cpustats" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/client/testutil" "github.com/hashicorp/nomad/drivers/docker/util" "github.com/shoenig/test/must" ) @@ -112,3 +115,45 @@ func TestDriver_DockerUsageSender(t *testing.T) { destCh.close() destCh.send(res) } + +func Test_taskHandle_collectDockerStats(t *testing.T) { + ci.Parallel(t) + testutil.DockerCompatible(t) + + // Start a Docker container and wait for it to be running, so we can + // guarantee stats generation. + driverCfg, dockerTaskConfig, _ := dockerTask(t) + + must.NoError(t, driverCfg.EncodeConcreteDriverConfig(dockerTaskConfig)) + + _, driverHarness, handle, cleanup := dockerSetup(t, driverCfg, nil) + defer cleanup() + must.NoError(t, driverHarness.WaitUntilStarted(driverCfg.ID, 5*time.Second)) + + // Generate a context, so the test doesn't hang on Docker problems and + // execute a single collection of the stats. + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + dockerStats, err := handle.collectDockerStats(ctx) + must.NoError(t, err) + must.NotNil(t, dockerStats) + + // Ensure all the stats we use for calculating CPU percentages within + // DockerStatsToTaskResourceUsage are present and non-zero. + must.Greater(t, 0, dockerStats.CPUStats.CPUUsage.TotalUsage) + must.Greater(t, 0, dockerStats.CPUStats.SystemUsage) + must.Greater(t, 0, dockerStats.CPUStats.CPUUsage.UsageInKernelmode) + must.Greater(t, 0, dockerStats.CPUStats.CPUUsage.TotalUsage) + must.Greater(t, 0, dockerStats.CPUStats.CPUUsage.UsageInUsermode) + + must.Greater(t, 0, dockerStats.PreCPUStats.CPUUsage.TotalUsage) + must.Greater(t, 0, dockerStats.PreCPUStats.SystemUsage) + must.Greater(t, 0, dockerStats.PreCPUStats.CPUUsage.UsageInKernelmode) + must.Greater(t, 0, dockerStats.PreCPUStats.CPUUsage.TotalUsage) + must.Greater(t, 0, dockerStats.PreCPUStats.CPUUsage.UsageInUsermode) + + // Ensure the core memory stats are present and where desired, non-zero. + must.Greater(t, 0, dockerStats.MemoryStats.Usage) + must.MapContainsKey(t, dockerStats.MemoryStats.Stats, "file_mapped") +}