dynamic host volumes: client state (#24595)
store dynamic host volume creations in client state,
so they can be "restored" on agent restart. restore works
by repeating the same Create operation as the initial creation,
and expecting the plugin to be idempotent.

this is (potentially) especially important after host restarts,
which may have dropped mount points and the like.
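
The idempotency expectation is the crux of the restore path: the client does not track whether a volume still exists on disk, it simply re-runs Create and trusts the plugin to converge on the same result. A minimal sketch of that contract, in the spirit of the built-in "mkdir" plugin (hypothetical code, not the shipped implementation):

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// idempotentCreate converges on "directory exists" no matter how many
// times it runs, so re-running it on agent restart is harmless.
func idempotentCreate(mountDir, volID string) (string, error) {
	path := filepath.Join(mountDir, volID)
	// os.MkdirAll succeeds when the directory already exists, which is
	// exactly the property restore relies on.
	if err := os.MkdirAll(path, 0o700); err != nil {
		return "", err
	}
	return path, nil
}

func main() {
	p1, _ := idempotentCreate(os.TempDir(), "example-vol")
	p2, _ := idempotentCreate(os.TempDir(), "example-vol") // repeat "create": no-op
	fmt.Println(p1 == p2) // true
}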
gulducat authored Dec 3, 2024
1 parent d08bc07 commit 70bacbd
Showing 13 changed files with 323 additions and 29 deletions.
9 changes: 8 additions & 1 deletion client/client.go
@@ -535,9 +535,16 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
c.devicemanager = devManager
c.pluginManagers.RegisterAndRun(devManager)

c.hostVolumeManager = hvm.NewHostVolumeManager(logger,
c.hostVolumeManager, err = hvm.NewHostVolumeManager(logger,
c.stateDB, hostVolumeRequestTimeout,
cfg.HostVolumePluginDir,
cfg.AllocMountsDir)
if err != nil {
// NewHostVolumeManager will only err if it fails to read the state store,
// or if one or more required plugins do not exist, so halt the client
// because something needs to be fixed by a cluster admin.
return nil, err
}

// Set up the service registration wrapper using the Consul and Nomad
// implementations. The Nomad implementation is only ever used on the
29 changes: 25 additions & 4 deletions client/host_volume_endpoint_test.go
@@ -6,9 +6,11 @@ package client
import (
"path/filepath"
"testing"
"time"

"github.com/hashicorp/nomad/ci"
hvm "github.com/hashicorp/nomad/client/hostvolumemanager"
"github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/shoenig/test/must"
@@ -20,10 +22,15 @@ func TestHostVolume(t *testing.T) {
client, cleanup := TestClient(t, nil)
defer cleanup()

memdb := state.NewMemDB(testlog.HCLogger(t))
client.stateDB = memdb

tmp := t.TempDir()
var err error
expectDir := filepath.Join(tmp, "test-vol-id")
client.hostVolumeManager = hvm.NewHostVolumeManager(testlog.HCLogger(t),
"/no/ext/plugins", tmp)
client.hostVolumeManager, err = hvm.NewHostVolumeManager(testlog.HCLogger(t),
client.stateDB, time.Second, "/no/ext/plugins", tmp)
must.NoError(t, err)

t.Run("happy", func(t *testing.T) {
req := &cstructs.ClientHostVolumeCreateRequest{
@@ -40,6 +47,15 @@ func TestHostVolume(t *testing.T) {
}, resp)
// technically this is testing "mkdir" more than the RPC
must.DirExists(t, expectDir)
// ensure we saved to client state
vols, err := memdb.GetDynamicHostVolumes()
must.NoError(t, err)
must.Len(t, 1, vols)
expectState := &cstructs.HostVolumeState{
ID: req.ID,
CreateReq: req,
}
must.Eq(t, expectState, vols[0])

delReq := &cstructs.ClientHostVolumeDeleteRequest{
ID: "test-vol-id",
@@ -52,6 +68,10 @@ func TestHostVolume(t *testing.T) {
must.NotNil(t, delResp)
// again, actually testing the "mkdir" plugin
must.DirNotExists(t, expectDir)
// client state should be deleted
vols, err = memdb.GetDynamicHostVolumes()
must.NoError(t, err)
must.Len(t, 0, vols)
})

t.Run("missing plugin", func(t *testing.T) {
@@ -72,8 +92,9 @@ func TestHostVolume(t *testing.T) {

t.Run("error from plugin", func(t *testing.T) {
// "mkdir" plugin can't create a directory within a file
client.hostVolumeManager = hvm.NewHostVolumeManager(testlog.HCLogger(t),
"/no/ext/plugins", "host_volume_endpoint_test.go")
client.hostVolumeManager, err = hvm.NewHostVolumeManager(testlog.HCLogger(t),
client.stateDB, time.Second, "/no/ext/plugins", "host_volume_endpoint_test.go")
must.NoError(t, err)

req := &cstructs.ClientHostVolumeCreateRequest{
ID: "test-vol-id",
11 changes: 5 additions & 6 deletions client/hostvolumemanager/host_volume_plugin.go
@@ -29,9 +29,8 @@ type HostVolumePlugin interface {
}

type HostVolumePluginCreateResponse struct {
Path string `json:"path"`
SizeBytes int64 `json:"bytes"`
Context map[string]string `json:"context"` // metadata
Path string `json:"path"`
SizeBytes int64 `json:"bytes"`
}

const HostVolumePluginMkdirID = "mkdir"
@@ -70,7 +69,6 @@ func (p *HostVolumePluginMkdir) Create(_ context.Context,
return &HostVolumePluginCreateResponse{
Path: path,
SizeBytes: 0,
Context: map[string]string{},
}, nil
}

@@ -147,8 +145,9 @@ func (p *HostVolumePluginExternal) Version(ctx context.Context) (*version.Versio
func (p *HostVolumePluginExternal) Create(ctx context.Context,
req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) {

params, err := json.Marshal(req.Parameters) // db TODO(1.10.0): if this is nil, then PARAMETERS env will be "null"
params, err := json.Marshal(req.Parameters) // db TODO(1.10.0): document that if this is nil, the PARAMETERS env var will be "null"
if err != nil {
// this is a proper error, because users can set this in the volume spec
return nil, fmt.Errorf("error marshaling volume parameters: %w", err)
}
envVars := []string{
@@ -165,7 +164,7 @@ func (p *HostVolumePluginExternal) Create(ctx context.Context,
}

var pluginResp HostVolumePluginCreateResponse
err = json.Unmarshal(stdout, &pluginResp)
err = json.Unmarshal(stdout, &pluginResp) // db TODO(1.10.0): if this fails, the plugin may have created the volume, but Nomad will not save it
if err != nil {
return nil, err
}
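
The nil-Parameters TODO above is easy to reproduce: encoding/json marshals a nil map to the JSON literal null, so the PARAMETERS env var would be the string "null". A standalone sketch (not part of this commit) demonstrating the behavior and one possible guard:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	var params map[string]string // nil, as when a volume spec sets no parameters
	out, _ := json.Marshal(params)
	fmt.Println(string(out)) // null

	// normalizing to an empty map before marshaling yields "{}" instead
	if params == nil {
		params = map[string]string{}
	}
	out, _ = json.Marshal(params)
	fmt.Println(string(out)) // {}
}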
2 changes: 0 additions & 2 deletions client/hostvolumemanager/host_volume_plugin_test.go
@@ -45,7 +45,6 @@ func TestHostVolumePluginMkdir(t *testing.T) {
must.Eq(t, &HostVolumePluginCreateResponse{
Path: target,
SizeBytes: 0,
Context: map[string]string{},
}, resp)
must.DirExists(t, target)

@@ -115,7 +114,6 @@ func TestHostVolumePluginExternal(t *testing.T) {
must.Eq(t, &HostVolumePluginCreateResponse{
Path: target,
SizeBytes: 5,
Context: map[string]string{"key": "val"},
}, resp)
must.DirExists(t, target)
logged := getLogs()
92 changes: 85 additions & 7 deletions client/hostvolumemanager/host_volumes.go
@@ -7,32 +7,86 @@ import (
"context"
"errors"
"path/filepath"
"time"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
)

var (
ErrPluginNotExists = errors.New("no such plugin")
ErrPluginNotExecutable = errors.New("plugin not executable")
)

type HostVolumeStateManager interface {
PutDynamicHostVolume(*cstructs.HostVolumeState) error
GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error)
DeleteDynamicHostVolume(string) error
}

type HostVolumeManager struct {
pluginDir string
sharedMountDir string
stateMgr HostVolumeStateManager

log hclog.Logger
}

func NewHostVolumeManager(logger hclog.Logger, pluginDir, sharedMountDir string) *HostVolumeManager {
func NewHostVolumeManager(logger hclog.Logger,
state HostVolumeStateManager, restoreTimeout time.Duration,
pluginDir, sharedMountDir string) (*HostVolumeManager, error) {

log := logger.Named("host_volume_mgr")

// db TODO(1.10.0): how do we define the external mounter plugins? plugin configs?
return &HostVolumeManager{
log: log,
hvm := &HostVolumeManager{
pluginDir: pluginDir,
sharedMountDir: sharedMountDir,
stateMgr: state,
log: log,
}

if err := hvm.restoreState(state, restoreTimeout); err != nil {
return nil, err
}

return hvm, nil
}

func (hvm *HostVolumeManager) restoreState(state HostVolumeStateManager, timeout time.Duration) error {
vols, err := state.GetDynamicHostVolumes()
if err != nil {
return err
}
if len(vols) == 0 {
return nil // nothing to do
}

// re-"create" the volumes - plugins have the best knowledge of their
// side effects, and they must be idempotent.
group := multierror.Group{}
for _, vol := range vols {
group.Go(func() error { // db TODO(1.10.0): document that plugins must be safe to run concurrently
// missing plugins with associated volumes in state are considered
// client-stopping errors. they need to be fixed by cluster admins.
plug, err := hvm.getPlugin(vol.CreateReq.PluginID)
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if _, err := plug.Create(ctx, vol.CreateReq); err != nil {
// plugin execution errors are only logged
hvm.log.Error("failed to restore", "plugin_id", vol.CreateReq.PluginID, "volume_id", vol.ID, "error", err)
}
return nil
})
}
mErr := group.Wait()
return helper.FlattenMultierror(mErr.ErrorOrNil())
}

func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, error) {
@@ -63,14 +117,35 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,
return nil, err
}

volState := &cstructs.HostVolumeState{
ID: req.ID,
CreateReq: req,
}
if err := hvm.stateMgr.PutDynamicHostVolume(volState); err != nil {
// if we fail to write to state, delete the volume so it isn't left
// lying around without Nomad knowing about it.
hvm.log.Error("failed to save volume in state, so deleting", "volume_id", req.ID, "error", err)
delErr := plug.Delete(ctx, &cstructs.ClientHostVolumeDeleteRequest{
ID: req.ID,
PluginID: req.PluginID,
NodeID: req.NodeID,
HostPath: hvm.sharedMountDir,
Parameters: req.Parameters,
})
if delErr != nil {
hvm.log.Warn("error deleting volume after state store failure", "volume_id", req.ID, "error", delErr)
err = multierror.Append(err, delErr)
}
return nil, helper.FlattenMultierror(err)
}

// db TODO(1.10.0): now we need to add the volume to the node fingerprint!

resp := &cstructs.ClientHostVolumeCreateResponse{
HostPath: pluginResp.Path,
CapacityBytes: pluginResp.SizeBytes,
}

// db TODO(1.10.0): now we need to add it to the node fingerprint!
// db TODO(1.10.0): and save it in client state!

return resp, nil
}

@@ -89,7 +164,10 @@ func (hvm *HostVolumeManager) Delete(ctx context.Context,

resp := &cstructs.ClientHostVolumeDeleteResponse{}

// db TODO(1.10.0): save the client state!
if err := hvm.stateMgr.DeleteDynamicHostVolume(req.ID); err != nil {
hvm.log.Error("failed to delete volume in state", "volume_id", req.ID, "error", err)
return nil, err // bail so a user may retry
}

return resp, nil
}
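
HostVolumeManager depends only on the three-method HostVolumeStateManager interface, so any store can back it. For illustration, a hypothetical in-memory implementation (the tests below use the existing state.NewMemDB, and production uses BoltStateDB; this sketch is not in the commit):

package hostvolumemanager

import (
	"sync"

	cstructs "github.com/hashicorp/nomad/client/structs"
)

// memStateMgr is an illustrative, mutex-guarded HostVolumeStateManager.
type memStateMgr struct {
	mu   sync.Mutex
	vols map[string]*cstructs.HostVolumeState
}

func newMemStateMgr() *memStateMgr {
	return &memStateMgr{vols: make(map[string]*cstructs.HostVolumeState)}
}

func (m *memStateMgr) PutDynamicHostVolume(v *cstructs.HostVolumeState) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.vols[v.ID] = v
	return nil
}

func (m *memStateMgr) GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	out := make([]*cstructs.HostVolumeState, 0, len(m.vols))
	for _, v := range m.vols {
		out = append(out, v)
	}
	return out, nil
}

func (m *memStateMgr) DeleteDynamicHostVolume(id string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.vols, id)
	return nil
}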
53 changes: 53 additions & 0 deletions client/hostvolumemanager/host_volumes_test.go
@@ -0,0 +1,53 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package hostvolumemanager

import (
"path/filepath"
"testing"
"time"

cstate "github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/shoenig/test/must"
)

// db TODO(1.10.0): improve hostvolumemanager tests.

func TestNewHostVolumeManager_restoreState(t *testing.T) {
log := testlog.HCLogger(t)
vol := &cstructs.HostVolumeState{
ID: "test-vol-id",
CreateReq: &cstructs.ClientHostVolumeCreateRequest{
ID: "test-vol-id",
PluginID: "mkdir",
},
}

t.Run("happy", func(t *testing.T) {
// put our volume in state
state := cstate.NewMemDB(log)
must.NoError(t, state.PutDynamicHostVolume(vol))

// new volume manager should load it from state and run Create,
// resulting in a volume directory in this mountDir.
mountDir := t.TempDir()

_, err := NewHostVolumeManager(log, state, time.Second, "/wherever", mountDir)
must.NoError(t, err)

volPath := filepath.Join(mountDir, vol.ID)
must.DirExists(t, volPath)
})

t.Run("get error", func(t *testing.T) {
state := &cstate.ErrDB{}
_, err := NewHostVolumeManager(log, state, time.Second, "/wherever", "/wherever")
// error loading state should break the world
must.ErrorIs(t, err, cstate.ErrDBError)
})

// db TODO: test plugin error
}
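
One way the trailing TODO could be filled in, reusing the trick from the endpoint test (a mount dir that is actually a file, so the "mkdir" plugin's Create fails); a hypothetical subtest alongside the two above:

	t.Run("plugin error", func(t *testing.T) {
		state := cstate.NewMemDB(log)
		must.NoError(t, state.PutDynamicHostVolume(vol))

		// "mkdir" cannot create a directory inside a regular file,
		// so the re-run of Create during restore fails...
		_, err := NewHostVolumeManager(log, state, time.Second, "/wherever",
			"host_volumes_test.go")
		// ...but plugin execution errors during restore are only logged,
		// so manager construction still succeeds.
		must.NoError(t, err)
	})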
41 changes: 41 additions & 0 deletions client/state/db_bolt.go
@@ -138,6 +138,8 @@ var (

// nodeRegistrationKey is the key at which node registration data is stored.
nodeRegistrationKey = []byte("node_registration")

hostVolBucket = []byte("host_volumes_to_create")
)

// taskBucketName returns the bucket name for the given task name.
@@ -1048,6 +1050,45 @@ func (s *BoltStateDB) GetNodeRegistration() (*cstructs.NodeRegistration, error)
return &reg, err
}

func (s *BoltStateDB) PutDynamicHostVolume(vol *cstructs.HostVolumeState) error {
return s.db.Update(func(tx *boltdd.Tx) error {
b, err := tx.CreateBucketIfNotExists(hostVolBucket)
if err != nil {
return err
}
return b.Put([]byte(vol.ID), vol)
})
}

func (s *BoltStateDB) GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error) {
var vols []*cstructs.HostVolumeState
err := s.db.View(func(tx *boltdd.Tx) error {
b := tx.Bucket(hostVolBucket)
if b == nil {
return nil
}
return b.BoltBucket().ForEach(func(k, v []byte) error {
var vol cstructs.HostVolumeState
err := b.Get(k, &vol)
if err != nil {
return err
}
vols = append(vols, &vol)
return nil
})
})
if boltdd.IsErrNotFound(err) {
return nil, nil
}
return vols, err
}

func (s *BoltStateDB) DeleteDynamicHostVolume(id string) error {
return s.db.Update(func(tx *boltdd.Tx) error {
return tx.Bucket(hostVolBucket).Delete([]byte(id))
})
}

// init initializes metadata entries in a newly created state database.
func (s *BoltStateDB) init() error {
return s.db.Update(func(tx *boltdd.Tx) error {
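
Taken together, the three methods give the manager a simple persist/list/delete round-trip. A sketch written against the HostVolumeStateManager interface, so it works with BoltStateDB in production or NewMemDB in tests (hypothetical helper, not in the commit):

package example

import (
	"fmt"

	hvm "github.com/hashicorp/nomad/client/hostvolumemanager"
	cstructs "github.com/hashicorp/nomad/client/structs"
)

func roundTrip(db hvm.HostVolumeStateManager, req *cstructs.ClientHostVolumeCreateRequest) error {
	// persist before acknowledging the create to the server...
	vol := &cstructs.HostVolumeState{ID: req.ID, CreateReq: req}
	if err := db.PutDynamicHostVolume(vol); err != nil {
		return err
	}
	// ...so a restarted agent can list the volumes and re-run Create...
	vols, err := db.GetDynamicHostVolumes()
	if err != nil {
		return err
	}
	fmt.Printf("volumes to restore: %d\n", len(vols))
	// ...and drop the record once the volume is deleted.
	return db.DeleteDynamicHostVolume(req.ID)
}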