Skip to content

Commit

Permalink
dynamic host volumes: volume fingerprinting (#24613)
Browse files — browse the repository at this point in the history
and expand the demo a bit
  • Loading branch information
gulducat authored and tgross committed Dec 13, 2024
1 parent 58903b2 commit d579dae
Show file tree
Hide file tree
Showing 22 changed files with 540 additions and 80 deletions.
2 changes: 2 additions & 0 deletions api/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,8 @@ type DriverInfo struct {
// HostVolumeInfo holds per-volume metadata for a host volume: a path, a
// read-only flag, and (for dynamic host volumes only) an ID.
type HostVolumeInfo struct {
// Path is the volume's path.
// NOTE(review): presumably the location on the node's filesystem — confirm.
Path string
// ReadOnly is the volume's read-only flag.
ReadOnly bool
// ID is set for dynamic host volumes only.
ID string
}

// HostNetworkInfo is used to return metadata about a given HostNetwork
Expand Down
19 changes: 9 additions & 10 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
c.updateNodeFromDriver,
c.updateNodeFromDevices,
c.updateNodeFromCSI,
c.updateNodeFromHostVol,
)

// Initialize the server manager
Expand Down Expand Up @@ -535,16 +536,14 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
c.devicemanager = devManager
c.pluginManagers.RegisterAndRun(devManager)

c.hostVolumeManager, err = hvm.NewHostVolumeManager(logger,
c.stateDB, hostVolumeRequestTimeout,
cfg.HostVolumePluginDir,
cfg.AllocMountsDir)
if err != nil {
// NewHostVolumeManager will only err if it fails to read state store,
// or if one or more required plugins do not exist, so halt the client
// because something needs to be fixed by a cluster admin.
return nil, err
}
// set up dynamic host volume manager
c.hostVolumeManager = hvm.NewHostVolumeManager(logger, hvm.Config{
PluginDir: cfg.HostVolumePluginDir,
SharedMountDir: cfg.AllocMountsDir,
StateMgr: c.stateDB,
UpdateNodeVols: c.batchNodeUpdates.updateNodeFromHostVolume,
})
c.pluginManagers.RegisterAndRun(c.hostVolumeManager)

// Set up the service registration wrapper using the Consul and Nomad
// implementations. The Nomad implementation is only ever used on the
Expand Down
2 changes: 1 addition & 1 deletion client/host_volume_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (v *HostVolume) Delete(
ctx, cancelFn := v.requestContext()
defer cancelFn()

_, err := v.c.hostVolumeManager.Delete(ctx, req) // db TODO(1.10.0): cresp is empty... why return it?
_, err := v.c.hostVolumeManager.Delete(ctx, req)
if err != nil {
v.c.logger.Error("failed to delete host volume", "ID", req.ID, "error", err)
return err
Expand Down
40 changes: 31 additions & 9 deletions client/host_volume_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ package client
import (
"path/filepath"
"testing"
"time"

"github.com/hashicorp/nomad/ci"
hvm "github.com/hashicorp/nomad/client/hostvolumemanager"
"github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
)

Expand All @@ -26,16 +26,22 @@ func TestHostVolume(t *testing.T) {
client.stateDB = memdb

tmp := t.TempDir()
var err error
manager := hvm.NewHostVolumeManager(testlog.HCLogger(t), hvm.Config{
StateMgr: client.stateDB,
UpdateNodeVols: client.updateNodeFromHostVol,
PluginDir: "/no/ext/plugins",
SharedMountDir: tmp,
})
client.hostVolumeManager = manager
expectDir := filepath.Join(tmp, "test-vol-id")
client.hostVolumeManager, err = hvm.NewHostVolumeManager(testlog.HCLogger(t),
client.stateDB, time.Second, "/no/ext/plugins", tmp)
must.NoError(t, err)

t.Run("happy", func(t *testing.T) {

/* create */

req := &cstructs.ClientHostVolumeCreateRequest{
ID: "test-vol-id",
Name: "test-vol-name",
ID: "test-vol-id",
PluginID: "mkdir", // real plugin really makes a dir
}
var resp cstructs.ClientHostVolumeCreateResponse
Expand All @@ -56,8 +62,19 @@ func TestHostVolume(t *testing.T) {
CreateReq: req,
}
must.Eq(t, expectState, vols[0])
// and should be fingerprinted
must.Eq(t, hvm.VolumeMap{
req.Name: {
ID: req.ID,
Name: req.Name,
Path: expectDir,
},
}, client.Node().HostVolumes)

/* delete */

delReq := &cstructs.ClientHostVolumeDeleteRequest{
Name: "test-vol-name",
ID: "test-vol-id",
PluginID: "mkdir",
HostPath: expectDir,
Expand All @@ -72,6 +89,8 @@ func TestHostVolume(t *testing.T) {
vols, err = memdb.GetDynamicHostVolumes()
must.NoError(t, err)
must.Len(t, 0, vols)
// and the fingerprint, too
must.Eq(t, map[string]*structs.ClientHostVolumeConfig{}, client.Node().HostVolumes)
})

t.Run("missing plugin", func(t *testing.T) {
Expand All @@ -92,9 +111,12 @@ func TestHostVolume(t *testing.T) {

t.Run("error from plugin", func(t *testing.T) {
// "mkdir" plugin can't create a directory within a file
client.hostVolumeManager, err = hvm.NewHostVolumeManager(testlog.HCLogger(t),
client.stateDB, time.Second, "/no/ext/plugins", "host_volume_endpoint_test.go")
must.NoError(t, err)
client.hostVolumeManager = hvm.NewHostVolumeManager(testlog.HCLogger(t), hvm.Config{
StateMgr: client.stateDB,
UpdateNodeVols: client.updateNodeFromHostVol,
PluginDir: "/no/ext/plugins",
SharedMountDir: "host_volume_endpoint_test.go",
})

req := &cstructs.ClientHostVolumeCreateRequest{
ID: "test-vol-id",
Expand Down
19 changes: 15 additions & 4 deletions client/hostvolumemanager/host_volume_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,28 @@ func (p *HostVolumePluginMkdir) Create(_ context.Context,
"path", path)
log.Debug("running plugin")

resp := &HostVolumePluginCreateResponse{
Path: path,
SizeBytes: 0,
}

if _, err := os.Stat(path); err == nil {
// already exists
return resp, nil
} else if !os.IsNotExist(err) {
// stat failed for some reason other than the path not existing
log.Debug("error with plugin", "error", err)
return nil, err
}

err := os.Mkdir(path, 0o700)
if err != nil {
log.Debug("error with plugin", "error", err)
return nil, err
}

log.Debug("plugin ran successfully")
return &HostVolumePluginCreateResponse{
Path: path,
SizeBytes: 0,
}, nil
return resp, nil
}

func (p *HostVolumePluginMkdir) Delete(_ context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error {
Expand Down
97 changes: 67 additions & 30 deletions client/hostvolumemanager/host_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
"context"
"errors"
"path/filepath"
"time"
"sync"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)

var (
Expand All @@ -26,42 +27,68 @@ type HostVolumeStateManager interface {
DeleteDynamicHostVolume(string) error
}

// Config carries the settings passed to NewHostVolumeManager: the plugin and
// mount directories, the client state manager, and the node-update callback.
type Config struct {
// PluginDir is where external plugins may be found.
PluginDir string

// SharedMountDir is where plugins should place the directory
// that will later become a volume HostPath
SharedMountDir string

// StateMgr manages client state to restore on agent restarts.
StateMgr HostVolumeStateManager

// UpdateNodeVols is run to update the node when a volume is created
// or deleted.
UpdateNodeVols HostVolumeNodeUpdater
}

// HostVolumeManager holds what its methods need to create, delete, and
// restore dynamic host volumes: the plugin directory, the shared mount
// directory, the client state manager, the node-update callback, and a logger.
type HostVolumeManager struct {
pluginDir string
sharedMountDir string
stateMgr HostVolumeStateManager

// NOTE(review): `log` appears twice in this view (here and after
// updateNodeVols). This looks like a removed and an added diff line rendered
// together; confirm against the real file, which can hold only one.
log hclog.Logger
updateNodeVols HostVolumeNodeUpdater
log hclog.Logger
}

func NewHostVolumeManager(logger hclog.Logger,
state HostVolumeStateManager, restoreTimeout time.Duration,
pluginDir, sharedMountDir string) (*HostVolumeManager, error) {

log := logger.Named("host_volume_mgr")

// db TODO(1.10.0): how do we define the external mounter plugins? plugin configs?
hvm := &HostVolumeManager{
pluginDir: pluginDir,
sharedMountDir: sharedMountDir,
stateMgr: state,
log: log,
func NewHostVolumeManager(logger hclog.Logger, config Config) *HostVolumeManager {
// db TODO(1.10.0): document plugin config options
return &HostVolumeManager{
pluginDir: config.PluginDir,
sharedMountDir: config.SharedMountDir,
stateMgr: config.StateMgr,
updateNodeVols: config.UpdateNodeVols,
log: logger.Named("host_volume_manager"),
}
}

if err := hvm.restoreState(state, restoreTimeout); err != nil {
return nil, err
// genVolConfig builds the structs.ClientHostVolumeConfig for a volume, taking
// Name and ID from the create request and Path from the plugin's create
// response. It returns nil if either argument is nil.
func genVolConfig(req *cstructs.ClientHostVolumeCreateRequest, resp *HostVolumePluginCreateResponse) *structs.ClientHostVolumeConfig {
if req == nil || resp == nil {
return nil
}
return &structs.ClientHostVolumeConfig{
Name: req.Name,
ID: req.ID,
Path: resp.Path,

// dynamic volumes, like CSI, have more robust `capabilities`,
// so we always set ReadOnly to false, and let the scheduler
// decide when to ignore this and check capabilities instead.
ReadOnly: false,
}

// NOTE(review): the line below is unreachable and refers to `hvm`, which is
// not declared in this function. It appears to be a removed line from the
// old NewHostVolumeManager rendered into this diff view — confirm.
return hvm, nil
}

func (hvm *HostVolumeManager) restoreState(state HostVolumeStateManager, timeout time.Duration) error {
vols, err := state.GetDynamicHostVolumes()
func (hvm *HostVolumeManager) restoreFromState(ctx context.Context) (VolumeMap, error) {
vols, err := hvm.stateMgr.GetDynamicHostVolumes()
if err != nil {
return err
return nil, err
}

volumes := make(VolumeMap)
var mut sync.Mutex

if len(vols) == 0 {
return nil // nothing to do
return volumes, nil // nothing to do
}

// re-"create" the volumes - plugins have the best knowledge of their
Expand All @@ -76,17 +103,20 @@ func (hvm *HostVolumeManager) restoreState(state HostVolumeStateManager, timeout
return err
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if _, err := plug.Create(ctx, vol.CreateReq); err != nil {
resp, err := plug.Create(ctx, vol.CreateReq)
if err != nil {
// plugin execution errors are only logged
hvm.log.Error("failed to restore", "plugin_id", vol.CreateReq.PluginID, "volume_id", vol.ID, "error", err)
return nil
}
mut.Lock()
volumes[vol.CreateReq.Name] = genVolConfig(vol.CreateReq, resp)
mut.Unlock()
return nil
})
}
mErr := group.Wait()
return helper.FlattenMultierror(mErr.ErrorOrNil())
return volumes, helper.FlattenMultierror(mErr.ErrorOrNil())
}

func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, error) {
Expand Down Expand Up @@ -139,9 +169,11 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,
return nil, helper.FlattenMultierror(err)
}

// db TODO(1.10.0): now we need to add the volume to the node fingerprint!
hvm.updateNodeVols(req.Name, genVolConfig(req, pluginResp))

resp := &cstructs.ClientHostVolumeCreateResponse{
VolumeName: req.Name,
VolumeID: req.ID,
HostPath: pluginResp.Path,
CapacityBytes: pluginResp.SizeBytes,
}
Expand All @@ -162,12 +194,17 @@ func (hvm *HostVolumeManager) Delete(ctx context.Context,
return nil, err
}

resp := &cstructs.ClientHostVolumeDeleteResponse{}

if err := hvm.stateMgr.DeleteDynamicHostVolume(req.ID); err != nil {
hvm.log.Error("failed to delete volume in state", "volume_id", req.ID, "error", err)
return nil, err // bail so a user may retry
}

hvm.updateNodeVols(req.Name, nil)

resp := &cstructs.ClientHostVolumeDeleteResponse{
VolumeName: req.Name,
VolumeID: req.ID,
}

return resp, nil
}
Loading

0 comments on commit d579dae

Please sign in to comment.