dynamic host volumes: volume fingerprinting #24613

Merged · 13 commits · Dec 9, 2024
2 changes: 2 additions & 0 deletions api/nodes.go
@@ -517,6 +517,8 @@ type DriverInfo struct {
type HostVolumeInfo struct {
Path string
ReadOnly bool
// ID is set for dynamic host volumes only.
ID string
}

// HostNetworkInfo is used to return metadata about a given HostNetwork
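The new ID field lets API consumers tell dynamically created host volumes apart from ones defined statically in agent configuration. A minimal sketch of that check, assuming the node was fetched via the api package's Nodes().Info and that HostVolumes is the map of HostVolumeInfo shown above; the helper name is illustrative only:

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/api"
)

// describeHostVolumes prints each host volume on a node; a non-empty ID marks
// a dynamic host volume, per the field comment in the diff above.
func describeHostVolumes(node *api.Node) {
	for name, vol := range node.HostVolumes {
		if vol.ID != "" {
			fmt.Printf("%s: dynamic host volume %s at %s\n", name, vol.ID, vol.Path)
		} else {
			fmt.Printf("%s: static host volume at %s\n", name, vol.Path)
		}
	}
}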
19 changes: 9 additions & 10 deletions client/client.go
@@ -411,6 +411,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
c.updateNodeFromDriver,
c.updateNodeFromDevices,
c.updateNodeFromCSI,
c.updateNodeFromHostVol,
)

// Initialize the server manager
@@ -535,16 +536,14 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
c.devicemanager = devManager
c.pluginManagers.RegisterAndRun(devManager)

c.hostVolumeManager, err = hvm.NewHostVolumeManager(logger,
c.stateDB, hostVolumeRequestTimeout,
cfg.HostVolumePluginDir,
cfg.AllocMountsDir)
if err != nil {
// NewHostVolumeManager will only err if it fails to read state store,
// or if one or more required plugins do not exist, so halt the client
// because something needs to be fixed by a cluster admin.
return nil, err
}
// set up dynamic host volume manager
c.hostVolumeManager = hvm.NewHostVolumeManager(logger, hvm.Config{
PluginDir: cfg.HostVolumePluginDir,
SharedMountDir: cfg.AllocMountsDir,
StateMgr: c.stateDB,
UpdateNodeVols: c.batchNodeUpdates.updateNodeFromHostVolume,
})
c.pluginManagers.RegisterAndRun(c.hostVolumeManager)

// Set up the service registration wrapper using the Consul and Nomad
// implementations. The Nomad implementation is only ever used on the
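With state restore moved out of the constructor, NewHostVolumeManager no longer returns an error, and all of its dependencies arrive through the Config struct. A wiring sketch under stated assumptions: the updater signature is inferred from how updateNodeVols is invoked later in this diff, state.StateDB is assumed to satisfy the HostVolumeStateManager interface (the client's stateDB is what gets passed above), and the paths and helper name are placeholders:

package main

import (
	"github.com/hashicorp/go-hclog"
	hvm "github.com/hashicorp/nomad/client/hostvolumemanager"
	"github.com/hashicorp/nomad/client/state"
	"github.com/hashicorp/nomad/nomad/structs"
)

// newStandaloneManager wires the manager with a no-op node updater, e.g. for a
// test harness; volume restore happens when the plugin manager framework runs it.
func newStandaloneManager(logger hclog.Logger, db state.StateDB) *hvm.HostVolumeManager {
	return hvm.NewHostVolumeManager(logger, hvm.Config{
		PluginDir:      "/opt/nomad/host_volume_plugins", // placeholder path
		SharedMountDir: "/opt/nomad/alloc_mounts",        // placeholder path
		StateMgr:       db,
		UpdateNodeVols: func(name string, vol *structs.ClientHostVolumeConfig) {}, // no-op updater
	})
}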
2 changes: 1 addition & 1 deletion client/host_volume_endpoint.go
@@ -50,7 +50,7 @@ func (v *HostVolume) Delete(
ctx, cancelFn := v.requestContext()
defer cancelFn()

_, err := v.c.hostVolumeManager.Delete(ctx, req) // db TODO(1.10.0): cresp is empty... why return it?
_, err := v.c.hostVolumeManager.Delete(ctx, req)
if err != nil {
v.c.logger.Error("failed to delete host volume", "ID", req.ID, "error", err)
return err
40 changes: 31 additions & 9 deletions client/host_volume_endpoint_test.go
@@ -6,13 +6,13 @@ package client
import (
"path/filepath"
"testing"
"time"

"github.com/hashicorp/nomad/ci"
hvm "github.com/hashicorp/nomad/client/hostvolumemanager"
"github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
)

@@ -26,16 +26,22 @@ func TestHostVolume(t *testing.T) {
client.stateDB = memdb

tmp := t.TempDir()
var err error
manager := hvm.NewHostVolumeManager(testlog.HCLogger(t), hvm.Config{
StateMgr: client.stateDB,
UpdateNodeVols: client.updateNodeFromHostVol,
PluginDir: "/no/ext/plugins",
SharedMountDir: tmp,
})
client.hostVolumeManager = manager
expectDir := filepath.Join(tmp, "test-vol-id")
client.hostVolumeManager, err = hvm.NewHostVolumeManager(testlog.HCLogger(t),
client.stateDB, time.Second, "/no/ext/plugins", tmp)
must.NoError(t, err)

t.Run("happy", func(t *testing.T) {

/* create */

req := &cstructs.ClientHostVolumeCreateRequest{
ID: "test-vol-id",
Name: "test-vol-name",
ID: "test-vol-id",
PluginID: "mkdir", // real plugin really makes a dir
}
var resp cstructs.ClientHostVolumeCreateResponse
@@ -56,8 +62,19 @@ func TestHostVolume(t *testing.T) {
CreateReq: req,
}
must.Eq(t, expectState, vols[0])
// and should be fingerprinted
must.Eq(t, hvm.VolumeMap{
req.Name: {
ID: req.ID,
Name: req.Name,
Path: expectDir,
},
}, client.Node().HostVolumes)

/* delete */

delReq := &cstructs.ClientHostVolumeDeleteRequest{
Name: "test-vol-name",
ID: "test-vol-id",
PluginID: "mkdir",
HostPath: expectDir,
@@ -72,6 +89,8 @@ func TestHostVolume(t *testing.T) {
vols, err = memdb.GetDynamicHostVolumes()
must.NoError(t, err)
must.Len(t, 0, vols)
// and the fingerprint, too
must.Eq(t, map[string]*structs.ClientHostVolumeConfig{}, client.Node().HostVolumes)
})

t.Run("missing plugin", func(t *testing.T) {
@@ -92,9 +111,12 @@ func TestHostVolume(t *testing.T) {

t.Run("error from plugin", func(t *testing.T) {
// "mkdir" plugin can't create a directory within a file
client.hostVolumeManager, err = hvm.NewHostVolumeManager(testlog.HCLogger(t),
client.stateDB, time.Second, "/no/ext/plugins", "host_volume_endpoint_test.go")
must.NoError(t, err)
client.hostVolumeManager = hvm.NewHostVolumeManager(testlog.HCLogger(t), hvm.Config{
StateMgr: client.stateDB,
UpdateNodeVols: client.updateNodeFromHostVol,
PluginDir: "/no/ext/plugins",
SharedMountDir: "host_volume_endpoint_test.go",
})

req := &cstructs.ClientHostVolumeCreateRequest{
ID: "test-vol-id",
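The new assertions exercise the fingerprint directly: after Create the volume appears in client.Node().HostVolumes keyed by its name, and after Delete the entry is removed again. A small sketch of reading that fingerprint, assuming structs.Node carries the same map the test compares against; the helper is illustrative, not part of the client API:

package main

import "github.com/hashicorp/nomad/nomad/structs"

// hasHostVolume reports whether a node has fingerprinted a host volume under
// the given name, matching what the test above expects after Create and Delete.
func hasHostVolume(node *structs.Node, name string) bool {
	vol, ok := node.HostVolumes[name]
	return ok && vol != nil && vol.Path != ""
}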
19 changes: 15 additions & 4 deletions client/hostvolumemanager/host_volume_plugin.go
@@ -59,17 +59,28 @@ func (p *HostVolumePluginMkdir) Create(_ context.Context,
"path", path)
log.Debug("running plugin")

resp := &HostVolumePluginCreateResponse{
Path: path,
SizeBytes: 0,
}

if _, err := os.Stat(path); err == nil {
// already exists
return resp, nil
} else if !os.IsNotExist(err) {
// stat failed for a reason other than the path not existing
log.Debug("error with plugin", "error", err)
return nil, err
}

err := os.Mkdir(path, 0o700)
if err != nil {
log.Debug("error with plugin", "error", err)
return nil, err
}

log.Debug("plugin ran successfully")
return &HostVolumePluginCreateResponse{
Path: path,
SizeBytes: 0,
}, nil
return resp, nil
}

func (p *HostVolumePluginMkdir) Delete(_ context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error {
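The reworked Create treats an already-existing path as success, which is what lets volumes be re-created safely when the client restores state after a restart. The same stat-then-mkdir pattern in isolation, as a rough standalone sketch (the function name is illustrative, not part of the plugin API):

package main

import "os"

// ensureDir mirrors the idempotent behavior above: an existing path is treated
// as success, a missing path is created, and any other stat error is returned.
func ensureDir(path string) (string, error) {
	if _, err := os.Stat(path); err == nil {
		return path, nil // already exists
	} else if !os.IsNotExist(err) {
		return "", err // stat failed for a reason other than the path not existing
	}
	if err := os.Mkdir(path, 0o700); err != nil {
		return "", err
	}
	return path, nil
}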
97 changes: 67 additions & 30 deletions client/hostvolumemanager/host_volumes.go
@@ -7,12 +7,13 @@ import (
"context"
"errors"
"path/filepath"
"time"
"sync"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)

var (
@@ -26,42 +27,68 @@ type HostVolumeStateManager interface {
DeleteDynamicHostVolume(string) error
}

type Config struct {
// PluginDir is where external plugins may be found.
PluginDir string

// SharedMountDir is where plugins should place the directory
// that will later become a volume HostPath
SharedMountDir string

// StateMgr manages client state to restore on agent restarts.
StateMgr HostVolumeStateManager

// UpdateNodeVols is run to update the node when a volume is created
// or deleted.
UpdateNodeVols HostVolumeNodeUpdater
}

type HostVolumeManager struct {
pluginDir string
sharedMountDir string
stateMgr HostVolumeStateManager

log hclog.Logger
updateNodeVols HostVolumeNodeUpdater
log hclog.Logger
}

func NewHostVolumeManager(logger hclog.Logger,
state HostVolumeStateManager, restoreTimeout time.Duration,
pluginDir, sharedMountDir string) (*HostVolumeManager, error) {

log := logger.Named("host_volume_mgr")

// db TODO(1.10.0): how do we define the external mounter plugins? plugin configs?
hvm := &HostVolumeManager{
pluginDir: pluginDir,
sharedMountDir: sharedMountDir,
stateMgr: state,
log: log,
func NewHostVolumeManager(logger hclog.Logger, config Config) *HostVolumeManager {
// db TODO(1.10.0): document plugin config options
return &HostVolumeManager{
pluginDir: config.PluginDir,
sharedMountDir: config.SharedMountDir,
stateMgr: config.StateMgr,
updateNodeVols: config.UpdateNodeVols,
log: logger.Named("host_volume_manager"),
}
}

if err := hvm.restoreState(state, restoreTimeout); err != nil {
return nil, err
func genVolConfig(req *cstructs.ClientHostVolumeCreateRequest, resp *HostVolumePluginCreateResponse) *structs.ClientHostVolumeConfig {
if req == nil || resp == nil {
return nil
}
return &structs.ClientHostVolumeConfig{
Name: req.Name,
ID: req.ID,
Path: resp.Path,

// dynamic volumes, like CSI, have more robust `capabilities`,
// so we always set ReadOnly to false, and let the scheduler
// decide when to ignore this and check capabilities instead.
Review comment (Member) on lines +74 to +76: I think the taskrunner's volume_hook will need to handle this too, but I'll follow-up with that later.

ReadOnly: false,
}

return hvm, nil
}

func (hvm *HostVolumeManager) restoreState(state HostVolumeStateManager, timeout time.Duration) error {
vols, err := state.GetDynamicHostVolumes()
func (hvm *HostVolumeManager) restoreFromState(ctx context.Context) (VolumeMap, error) {
vols, err := hvm.stateMgr.GetDynamicHostVolumes()
if err != nil {
return err
return nil, err
}

volumes := make(VolumeMap)
var mut sync.Mutex

if len(vols) == 0 {
return nil // nothing to do
return volumes, nil // nothing to do
}

// re-"create" the volumes - plugins have the best knowledge of their
@@ -76,17 +103,20 @@ func (hvm *HostVolumeManager) restoreState(state HostVolumeStateManager, timeout
return err
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if _, err := plug.Create(ctx, vol.CreateReq); err != nil {
resp, err := plug.Create(ctx, vol.CreateReq)
if err != nil {
// plugin execution errors are only logged
hvm.log.Error("failed to restore", "plugin_id", vol.CreateReq.PluginID, "volume_id", vol.ID, "error", err)
return nil
}
mut.Lock()
volumes[vol.CreateReq.Name] = genVolConfig(vol.CreateReq, resp)
mut.Unlock()
return nil
})
}
mErr := group.Wait()
return helper.FlattenMultierror(mErr.ErrorOrNil())
return volumes, helper.FlattenMultierror(mErr.ErrorOrNil())
}

func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, error) {
@@ -139,9 +169,11 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,
return nil, helper.FlattenMultierror(err)
}

// db TODO(1.10.0): now we need to add the volume to the node fingerprint!
hvm.updateNodeVols(req.Name, genVolConfig(req, pluginResp))

resp := &cstructs.ClientHostVolumeCreateResponse{
VolumeName: req.Name,
VolumeID: req.ID,
HostPath: pluginResp.Path,
CapacityBytes: pluginResp.SizeBytes,
}
@@ -162,12 +194,17 @@ func (hvm *HostVolumeManager) Delete(ctx context.Context,
return nil, err
}

resp := &cstructs.ClientHostVolumeDeleteResponse{}

if err := hvm.stateMgr.DeleteDynamicHostVolume(req.ID); err != nil {
hvm.log.Error("failed to delete volume in state", "volume_id", req.ID, "error", err)
return nil, err // bail so a user may retry
}

hvm.updateNodeVols(req.Name, nil)

resp := &cstructs.ClientHostVolumeDeleteResponse{
VolumeName: req.Name,
VolumeID: req.ID,
}

return resp, nil
}
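restoreFromState re-runs each stored volume's plugin Create concurrently and collects the resulting client configs into a VolumeMap under a mutex, which the manager then publishes to the node fingerprint through updateNodeVols. A generic sketch of that fan-out-and-collect shape, using golang.org/x/sync/errgroup in place of whatever group helper the manager actually uses, and propagating errors rather than only logging them as the manager does; all names here are illustrative:

package main

import (
	"context"
	"sync"

	"golang.org/x/sync/errgroup"
)

// restoreAll runs one recreate call per volume ID in parallel and collects the
// resulting paths under a mutex, roughly the shape of restoreFromState above.
func restoreAll(ctx context.Context, ids []string,
	recreate func(context.Context, string) (string, error)) (map[string]string, error) {

	paths := make(map[string]string)
	var mu sync.Mutex

	g, ctx := errgroup.WithContext(ctx)
	for _, id := range ids {
		id := id // capture the loop variable for the goroutine
		g.Go(func() error {
			path, err := recreate(ctx, id)
			if err != nil {
				return err
			}
			mu.Lock()
			paths[id] = path
			mu.Unlock()
			return nil
		})
	}
	return paths, g.Wait()
}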