dynamic host volumes: volume fingerprinting #24613

Merged: 13 commits, Dec 9, 2024
2 changes: 2 additions & 0 deletions api/nodes.go
@@ -515,6 +515,8 @@ type DriverInfo struct {
 
 // HostVolumeInfo is used to return metadata about a given HostVolume.
 type HostVolumeInfo struct {
+	//ID string
+	//Name string
 	Path     string
 	ReadOnly bool
 }
6 changes: 5 additions & 1 deletion client/client.go
@@ -34,7 +34,7 @@
 	"github.com/hashicorp/nomad/client/dynamicplugins"
 	"github.com/hashicorp/nomad/client/fingerprint"
 	"github.com/hashicorp/nomad/client/hoststats"
 	hvm "github.com/hashicorp/nomad/client/hostvolumemanager"
 	cinterfaces "github.com/hashicorp/nomad/client/interfaces"
 	"github.com/hashicorp/nomad/client/lib/cgroupslib"
 	"github.com/hashicorp/nomad/client/lib/numalib"
@@ -411,6 +411,7 @@
 		c.updateNodeFromDriver,
 		c.updateNodeFromDevices,
 		c.updateNodeFromCSI,
+		c.updateNodeFromHostVol,
 	)
 
 	// Initialize the server manager
@@ -535,8 +536,10 @@
 	c.devicemanager = devManager
 	c.pluginManagers.RegisterAndRun(devManager)
 
-	c.hostVolumeManager, err = hvm.NewHostVolumeManager(logger,
+	// TODO: config to align with the others?
+	c.hostVolumeManager = hvm.NewHostVolumeManager(logger,
 		c.stateDB, hostVolumeRequestTimeout,
+		c.batchNodeUpdates.updateNodeFromHostVolume,
 		cfg.HostVolumePluginDir,
 		cfg.AllocMountsDir)
 	if err != nil {
@@ -545,6 +548,7 @@
 		// because something needs to be fixed by a cluster admin.
 		return nil, err
 	}
+	c.pluginManagers.RegisterAndRun(c.hostVolumeManager)
 
 	// Set up the service registration wrapper using the Consul and Nomad
 	// implementations. The Nomad implementation is only ever used on the
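Two pieces of wiring happen above: `c.updateNodeFromHostVol` joins the batched node-update callbacks alongside the driver, device, and CSI updaters, and the manager is registered with `c.pluginManagers` so the client treats it like any other fingerprinting plugin manager. The sketch below illustrates the batching idea only; the types and method bodies are invented stand-ins, not Nomad's actual `batchNodeUpdates` implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// Stand-in for structs.ClientHostVolumeConfig.
type VolConfig struct{ ID, Name, Path string }

// nodeUpdater mirrors the shape of the callback handed to the manager:
// it pushes one named volume config (nil means "remove") at the node.
type nodeUpdater func(name string, vol *VolConfig)

// batcher queues updates until the node is ready, then replays them
// through the real updater in one batch.
type batcher struct {
	mu      sync.Mutex
	pending map[string]*VolConfig
}

func (b *batcher) updateNodeFromHostVolume(name string, vol *VolConfig) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending[name] = vol
}

func (b *batcher) batchNodeUpdates(apply nodeUpdater) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for name, vol := range b.pending {
		apply(name, vol)
	}
	b.pending = map[string]*VolConfig{}
}

func main() {
	b := &batcher{pending: map[string]*VolConfig{}}
	// Volume events can arrive before the node is registered...
	b.updateNodeFromHostVolume("vol-a", &VolConfig{ID: "id-a", Name: "vol-a", Path: "/opt/vols/id-a"})
	// ...and are applied together once it is.
	b.batchNodeUpdates(func(name string, vol *VolConfig) {
		fmt.Printf("fingerprint update: %s -> %+v\n", name, vol)
	})
}
```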
2 changes: 1 addition & 1 deletion client/host_volume_endpoint.go
@@ -50,7 +50,7 @@ func (v *HostVolume) Delete(
 	ctx, cancelFn := v.requestContext()
 	defer cancelFn()
 
-	_, err := v.c.hostVolumeManager.Delete(ctx, req) // db TODO(1.10.0): cresp is empty... why return it?
+	_, err := v.c.hostVolumeManager.Delete(ctx, req)
 	if err != nil {
 		v.c.logger.Error("failed to delete host volume", "ID", req.ID, "error", err)
 		return err
31 changes: 23 additions & 8 deletions client/host_volume_endpoint_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/hashicorp/nomad/client/state"
 	cstructs "github.com/hashicorp/nomad/client/structs"
 	"github.com/hashicorp/nomad/helper/testlog"
+	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/shoenig/test/must"
 )
@@ -26,16 +27,18 @@ func TestHostVolume(t *testing.T) {
 	client.stateDB = memdb
 
 	tmp := t.TempDir()
-	var err error
+	manager := hvm.NewHostVolumeManager(testlog.HCLogger(t),
+		client.stateDB, time.Second, client.updateNodeFromHostVol, "/no/ext/plugins", tmp)
+	client.hostVolumeManager = manager
 	expectDir := filepath.Join(tmp, "test-vol-id")
-	client.hostVolumeManager, err = hvm.NewHostVolumeManager(testlog.HCLogger(t),
-		client.stateDB, time.Second, "/no/ext/plugins", tmp)
-	must.NoError(t, err)
 
 	t.Run("happy", func(t *testing.T) {
 
 		/* create */
 
 		req := &cstructs.ClientHostVolumeCreateRequest{
-			ID:       "test-vol-id",
+			Name:     "test-vol-name",
+			ID:       "test-vol-id",
 			PluginID: "mkdir", // real plugin really makes a dir
 		}
 		var resp cstructs.ClientHostVolumeCreateResponse
@@ -56,8 +59,19 @@ func TestHostVolume(t *testing.T) {
 			CreateReq: req,
 		}
 		must.Eq(t, expectState, vols[0])
+		// and should be fingerprinted
+		must.Eq(t, hvm.VolumeMap{
+			req.Name: {
+				ID:   req.ID,
+				Name: req.Name,
+				Path: expectDir,
+			},
+		}, client.Node().HostVolumes)
 
 		/* delete */
 
 		delReq := &cstructs.ClientHostVolumeDeleteRequest{
+			Name:     "test-vol-name",
 			ID:       "test-vol-id",
 			PluginID: "mkdir",
 			HostPath: expectDir,
@@ -72,6 +86,8 @@ func TestHostVolume(t *testing.T) {
 		vols, err = memdb.GetDynamicHostVolumes()
 		must.NoError(t, err)
 		must.Len(t, 0, vols)
+		// and the fingerprint, too
+		must.Eq(t, map[string]*structs.ClientHostVolumeConfig{}, client.Node().HostVolumes)
 	})
 
 	t.Run("missing plugin", func(t *testing.T) {
@@ -92,9 +108,8 @@ func TestHostVolume(t *testing.T) {
 
 	t.Run("error from plugin", func(t *testing.T) {
 		// "mkdir" plugin can't create a directory within a file
-		client.hostVolumeManager, err = hvm.NewHostVolumeManager(testlog.HCLogger(t),
-			client.stateDB, time.Second, "/no/ext/plugins", "host_volume_endpoint_test.go")
-		must.NoError(t, err)
+		client.hostVolumeManager = hvm.NewHostVolumeManager(testlog.HCLogger(t),
+			client.stateDB, time.Second, client.updateNodeFromHostVol, "/no/ext/plugins", "host_volume_endpoint_test.go")
 
 		req := &cstructs.ClientHostVolumeCreateRequest{
 			ID:       "test-vol-id",
19 changes: 15 additions & 4 deletions client/hostvolumemanager/host_volume_plugin.go
@@ -59,17 +59,28 @@ func (p *HostVolumePluginMkdir) Create(_ context.Context,
 		"path", path)
 	log.Debug("running plugin")
 
+	resp := &HostVolumePluginCreateResponse{
+		Path:      path,
+		SizeBytes: 0,
+	}
+
+	if _, err := os.Stat(path); err == nil {
+		// already exists
+		return resp, nil
+	} else if !os.IsNotExist(err) {
+		// doesn't exist, but some other path error
+		log.Debug("error with plugin", "error", err)
+		return nil, err
+	}
+
 	err := os.Mkdir(path, 0o700)
 	if err != nil {
 		log.Debug("error with plugin", "error", err)
 		return nil, err
 	}
 
 	log.Debug("plugin ran successfully")
-	return &HostVolumePluginCreateResponse{
-		Path:      path,
-		SizeBytes: 0,
-	}, nil
+	return resp, nil
 }
 
 func (p *HostVolumePluginMkdir) Delete(_ context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error {
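The change above makes the `mkdir` plugin's `Create` idempotent: stat the path, return the canned response when the directory already exists, surface any other stat error, and only then fall through to `os.Mkdir`. That matters because restore re-runs `Create` for every volume in state on each client start. A standalone sketch of the same pattern (stdlib only; the path is made up):

```go
package main

import (
	"fmt"
	"os"
)

// ensureDir mirrors the stat-then-mkdir pattern above: succeed without
// side effects when the directory already exists, surface any other
// stat error, and otherwise create the directory.
func ensureDir(path string) error {
	if _, err := os.Stat(path); err == nil {
		return nil // already exists; nothing to do
	} else if !os.IsNotExist(err) {
		return err // the exists-check itself failed (permissions, bad parent, ...)
	}
	return os.Mkdir(path, 0o700)
}

func main() {
	// Running twice demonstrates idempotency: the second call is a no-op.
	for i := 0; i < 2; i++ {
		if err := ensureDir("/tmp/example-vol"); err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println("ok")
	}
}
```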
115 changes: 96 additions & 19 deletions client/hostvolumemanager/host_volumes.go
@@ -7,20 +7,40 @@
 	"context"
 	"errors"
 	"path/filepath"
+	"sync"
 	"time"
 
 	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/go-multierror"
 	cstructs "github.com/hashicorp/nomad/client/structs"
 	"github.com/hashicorp/nomad/helper"
+	"github.com/hashicorp/nomad/nomad/structs"
 )
 
 var (
 	ErrPluginNotExists     = errors.New("no such plugin")
 	ErrPluginNotExecutable = errors.New("plugin not executable")
+	ErrUpdateTimeout       = errors.New("timeout updating client node")
 )
 
-type HostVolumeStateManager interface {
+type VolumeMap map[string]*structs.ClientHostVolumeConfig
+
+// UpdateVolumeMap returns true if it changes the volume map.
+func UpdateVolumeMap(volumes VolumeMap, name string, vol *structs.ClientHostVolumeConfig) (changed bool) {
+	cur, ok := volumes[name]
+	if !ok || !cur.Equal(vol) { // TODO: revisit
+		changed = true
+		if vol == nil {
+			delete(volumes, name)
+		} else {
+			volumes[vol.Name] = vol
+		}
+	}
+	return changed
+}
+
+type HostVolumeStateManager interface { // TODO: interface.go
 	PutDynamicHostVolume(*cstructs.HostVolumeState) error
 	GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error)
 	DeleteDynamicHostVolume(string) error
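`UpdateVolumeMap` looks an entry up under `name` but upserts it under `vol.Name`, and deleting an entry that was never present still reports `changed`; the `// TODO: revisit` presumably covers these edges. A runnable sketch of its semantics, using an invented stand-in for `structs.ClientHostVolumeConfig`:

```go
package main

import "fmt"

// Stand-in for structs.ClientHostVolumeConfig; the real type has more
// fields and its own Equal method.
type VolConfig struct{ ID, Name, Path string }

func (c *VolConfig) Equal(o *VolConfig) bool {
	if c == nil || o == nil {
		return c == o
	}
	return *c == *o
}

type VolumeMap map[string]*VolConfig

// Same shape as UpdateVolumeMap above: nil deletes, anything else
// upserts, and the return value reports whether the map changed.
func UpdateVolumeMap(volumes VolumeMap, name string, vol *VolConfig) (changed bool) {
	cur, ok := volumes[name]
	if !ok || !cur.Equal(vol) {
		changed = true
		if vol == nil {
			delete(volumes, name)
		} else {
			volumes[vol.Name] = vol
		}
	}
	return changed
}

func main() {
	m := VolumeMap{}
	v := &VolConfig{ID: "id-1", Name: "vol", Path: "/opt/vols/id-1"}
	fmt.Println(UpdateVolumeMap(m, "vol", v))   // true: inserted
	fmt.Println(UpdateVolumeMap(m, "vol", v))   // false: identical, no change
	fmt.Println(UpdateVolumeMap(m, "vol", nil)) // true: deleted
	fmt.Println(UpdateVolumeMap(m, "vol", nil)) // true again: the !ok branch
	// reports a change even though the entry was already gone
}
```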
@@ -31,37 +51,54 @@
 	sharedMountDir string
 	stateMgr       HostVolumeStateManager
 
+	createTimeout time.Duration
+
+	updateNodeVols HostVolumeNodeUpdater
+
 	log hclog.Logger
 }
 
 func NewHostVolumeManager(logger hclog.Logger,
 	state HostVolumeStateManager, restoreTimeout time.Duration,
-	pluginDir, sharedMountDir string) (*HostVolumeManager, error) {
+	nodeUpdater HostVolumeNodeUpdater,
+	pluginDir, sharedMountDir string) *HostVolumeManager {
 
 	log := logger.Named("host_volume_mgr")
 
-	// db TODO(1.10.0): how do we define the external mounter plugins? plugin configs?
-	hvm := &HostVolumeManager{
+	// db TODO(1.10.0): document plugin config options
+	return &HostVolumeManager{
 		pluginDir:      pluginDir,
 		sharedMountDir: sharedMountDir,
 		stateMgr:       state,
+		updateNodeVols: nodeUpdater,
+		createTimeout:  restoreTimeout,
 		log:            log,
 	}
-
-	if err := hvm.restoreState(state, restoreTimeout); err != nil {
-		return nil, err
-	}
-
-	return hvm, nil
 }
+
+func genVolConfig(req *cstructs.ClientHostVolumeCreateRequest, resp *HostVolumePluginCreateResponse) *structs.ClientHostVolumeConfig {
+	if req == nil || resp == nil {
+		return nil
+	}
+	return &structs.ClientHostVolumeConfig{
+		Name:     req.Name,
+		Path:     resp.Path,
+		ReadOnly: false, // db TODO: add to request? (??)
+		ID:       req.ID,
+	}
+}

-func (hvm *HostVolumeManager) restoreState(state HostVolumeStateManager, timeout time.Duration) error {
-	vols, err := state.GetDynamicHostVolumes()
+func (hvm *HostVolumeManager) restoreFromState(ctx context.Context) (VolumeMap, error) {
+	vols, err := hvm.stateMgr.GetDynamicHostVolumes()
 	if err != nil {
-		return err
+		return nil, err
 	}
 
+	volumes := make(VolumeMap)
+	var mut sync.Mutex
+
 	if len(vols) == 0 {
-		return nil // nothing to do
+		return volumes, nil // nothing to do
 	}
 
 	// re-"create" the volumes - plugins have the best knowledge of their
@@ -76,17 +113,20 @@
 			return err
 		}
 
-		ctx, cancel := context.WithTimeout(context.Background(), timeout)
-		defer cancel()
-		if _, err := plug.Create(ctx, vol.CreateReq); err != nil {
+		resp, err := plug.Create(ctx, vol.CreateReq)
+		if err != nil {
 			// plugin execution errors are only logged
 			hvm.log.Error("failed to restore", "plugin_id", vol.CreateReq.PluginID, "volume_id", vol.ID, "error", err)
+			return nil
 		}
+		mut.Lock()
+		volumes[vol.CreateReq.Name] = genVolConfig(vol.CreateReq, resp)
+		mut.Unlock()
 		return nil
 	})
 	}
 	mErr := group.Wait()
-	return helper.FlattenMultierror(mErr.ErrorOrNil())
+	return volumes, helper.FlattenMultierror(mErr.ErrorOrNil())
 }
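`restoreFromState` now returns the rebuilt `VolumeMap` alongside the flattened plugin errors, and the new `mut` mutex is what makes the concurrent `group.Go` writers safe, since Go maps must not be mutated concurrently. A reduced sketch of the same fan-out-and-collect shape, substituting `golang.org/x/sync/errgroup` for the multierror group used in the PR:

```go
package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// Reduced model of restoreFromState's fan-out: one goroutine per stored
// volume, with a mutex guarding the shared result map. The "create" here
// is a placeholder for the plugin call.
func restore(ids []string) (map[string]string, error) {
	volumes := make(map[string]string)
	var mut sync.Mutex
	var group errgroup.Group

	for _, id := range ids {
		id := id // per-iteration copy (needed before Go 1.22)
		group.Go(func() error {
			path := "/opt/vols/" + id // stand-in for plug.Create
			mut.Lock()
			volumes[id] = path
			mut.Unlock()
			return nil
		})
	}
	// group.Wait returns only the first error; the PR instead collects
	// them all and flattens a multierror.
	return volumes, group.Wait()
}

func main() {
	vols, err := restore([]string{"a", "b", "c"})
	fmt.Println(len(vols), err) // 3 <nil>
}
```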

 func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, error) {
@@ -107,6 +147,8 @@
 func (hvm *HostVolumeManager) Create(ctx context.Context,
 	req *cstructs.ClientHostVolumeCreateRequest) (*cstructs.ClientHostVolumeCreateResponse, error) {
 
+	// TODO: make hvm stateful, hold a map of creates, check it, and bail if identical: req.Equal(old)
+
 	plug, err := hvm.getPlugin(req.PluginID)
 	if err != nil {
 		return nil, err
@@ -139,9 +181,11 @@
 		return nil, helper.FlattenMultierror(err)
 	}
 
-	// db TODO(1.10.0): now we need to add the volume to the node fingerprint!
+	hvm.updateNodeVols(req.Name, genVolConfig(req, pluginResp))
+
 	resp := &cstructs.ClientHostVolumeCreateResponse{
+		VolumeName:    req.Name,
 		VolumeID:      req.ID,
 		HostPath:      pluginResp.Path,
 		CapacityBytes: pluginResp.SizeBytes,
 	}
@@ -162,12 +206,45 @@
 		return nil, err
 	}
 
-	resp := &cstructs.ClientHostVolumeDeleteResponse{}
-
 	if err := hvm.stateMgr.DeleteDynamicHostVolume(req.ID); err != nil {
 		hvm.log.Error("failed to delete volume in state", "volume_id", req.ID, "error", err)
 		return nil, err // bail so a user may retry
 	}
 
+	hvm.updateNodeVols(req.Name, nil)
+
+	resp := &cstructs.ClientHostVolumeDeleteResponse{
+		VolumeName: req.Name,
+		VolumeID:   req.ID,
+	}
+
 	return resp, nil
 }

+/* implement client.PluginManager interface */
+
+func (hvm *HostVolumeManager) Run() {
+	return // nothing to do here.
+}
+
+func (hvm *HostVolumeManager) Shutdown() {
+	return // again, nothing to do.
+}
+
+// PluginType is misleading, because this fingerprint is for *volumes*
+func (hvm *HostVolumeManager) PluginType() string {
+	return "host_volume" // TODO: const?
+}
+
+func (hvm *HostVolumeManager) WaitForFirstFingerprint(ctx context.Context) <-chan struct{} {
+	ctx, cancel := context.WithTimeout(context.Background(), hvm.createTimeout)
+	defer cancel()
+	volumes, err := hvm.restoreFromState(ctx)
+	if err != nil {
+		hvm.log.Error("failed to restore state", "error", err)
+		return ctx.Done()
+	}
+	for name, vol := range volumes {
+		hvm.updateNodeVols(name, vol)
+	}
+	return ctx.Done()
+}
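`WaitForFirstFingerprint` is what lets the manager satisfy the client's `PluginManager` interface as a fingerprinter. Note that the incoming `ctx` parameter is immediately shadowed by a `createTimeout`-bounded context, the restore runs synchronously, and the deferred `cancel` closes `ctx.Done()` as the method returns, which is what wakes anyone blocked on the returned channel. A toy model of that contract (all names here are stand-ins):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitForFirstFingerprint mimics the pattern above: run the restore
// synchronously under a timeout-bounded context, then return a channel
// that the deferred cancel closes on the way out.
func waitForFirstFingerprint(restore func(context.Context)) <-chan struct{} {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel() // closes ctx.Done() as we return, waking any waiter
	restore(ctx)
	return ctx.Done()
}

func main() {
	done := waitForFirstFingerprint(func(ctx context.Context) {
		// restore volumes here, honoring ctx's deadline
	})
	<-done // returns promptly: the restore already ran synchronously
	fmt.Println("first host volume fingerprint attempted")
}
```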