Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dynamic host volumes: plugin spec tweaks #24848

Merged
merged 3 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions client/fingerprint/dynamic_host_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,17 @@ func GetHostVolumePluginVersions(log hclog.Logger, pluginDir string) (map[string
mut := sync.Mutex{}
var wg sync.WaitGroup

for file, fullPath := range files {
for file := range files {
wg.Add(1)
go func(file, fullPath string) {
go func(file string) {
defer wg.Done()
// really should take way less than a second
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

log := log.With("plugin_id", file)

p, err := hvm.NewHostVolumePluginExternal(log, file, fullPath, "")
p, err := hvm.NewHostVolumePluginExternal(log, pluginDir, file, "")
if err != nil {
log.Warn("error getting plugin", "error", err)
return
Expand All @@ -112,7 +112,7 @@ func GetHostVolumePluginVersions(log hclog.Logger, pluginDir string) (map[string
mut.Lock()
plugins[file] = fprint.Version.String()
mut.Unlock()
}(file, fullPath)
}(file)
}

wg.Wait()
Expand Down
94 changes: 64 additions & 30 deletions client/hostvolumemanager/host_volume_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"os/exec"
"path/filepath"
"time"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
Expand All @@ -20,6 +21,19 @@ import (
"github.com/hashicorp/nomad/helper"
)

const (
// environment variables for external plugins
EnvOperation = "DHV_OPERATION"
EnvHostPath = "DHV_HOST_PATH"
EnvNodeID = "DHV_NODE_ID"
EnvVolumeName = "DHV_VOLUME_NAME"
EnvVolumeID = "DHV_VOLUME_ID"
EnvCapacityMin = "DHV_CAPACITY_MIN_BYTES"
EnvCapacityMax = "DHV_CAPACITY_MAX_BYTES"
EnvPluginDir = "DHV_PLUGIN_DIR"
EnvParameters = "DHV_PARAMETERS"
)

// HostVolumePlugin manages the lifecycle of volumes.
type HostVolumePlugin interface {
Fingerprint(ctx context.Context) (*PluginFingerprint, error)
Expand Down Expand Up @@ -121,24 +135,26 @@ var _ HostVolumePlugin = &HostVolumePluginExternal{}
// NewHostVolumePluginExternal returns an external host volume plugin
// if the specified executable exists on disk.
func NewHostVolumePluginExternal(log hclog.Logger,
id, executable, targetPath string) (*HostVolumePluginExternal, error) {
pluginDir, filename, targetPath string) (*HostVolumePluginExternal, error) {
// this should only be called with already-detected executables,
// but we'll double-check it anyway, so we can provide a tidy error message
// if it has changed between fingerprinting and execution.
executable := filepath.Join(pluginDir, filename)
f, err := os.Stat(executable)
if err != nil {
if os.IsNotExist(err) {
return nil, fmt.Errorf("%w: %q", ErrPluginNotExists, id)
return nil, fmt.Errorf("%w: %q", ErrPluginNotExists, filename)
}
return nil, err
}
if !helper.IsExecutable(f) {
return nil, fmt.Errorf("%w: %q", ErrPluginNotExecutable, id)
return nil, fmt.Errorf("%w: %q", ErrPluginNotExecutable, filename)
}
return &HostVolumePluginExternal{
ID: id,
ID: filename,
Executable: executable,
TargetPath: targetPath,
PluginDir: pluginDir,
log: log,
}, nil
}
Expand All @@ -151,22 +167,27 @@ type HostVolumePluginExternal struct {
ID string
Executable string
TargetPath string
PluginDir string

log hclog.Logger
}

// Fingerprint calls the executable with the following parameters:
// arguments: fingerprint
// environment:
// OPERATION=fingerprint
// DHV_OPERATION=fingerprint
//
// Response should be valid JSON on stdout, with a "version" key, e.g.:
// {"version": "0.0.1"}
// The version value should be a valid version number as allowed by
// version.NewVersion()
//
// Must complete within 5 seconds
func (p *HostVolumePluginExternal) Fingerprint(ctx context.Context) (*PluginFingerprint, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
cmd := exec.CommandContext(ctx, p.Executable, "fingerprint")
cmd.Env = []string{"OPERATION=fingerprint"}
cmd.Env = []string{EnvOperation + "=fingerprint"}
stdout, stderr, err := runCommand(cmd)
if err != nil {
p.log.Debug("error with plugin",
Expand All @@ -186,19 +207,23 @@ func (p *HostVolumePluginExternal) Fingerprint(ctx context.Context) (*PluginFing
// Create calls the executable with the following parameters:
// arguments: create {path to create}
// environment:
// OPERATION=create
// HOST_PATH={path to create}
// NODE_ID={Nomad node ID}
// VOLUME_NAME={name from the volume specification}
// CAPACITY_MIN_BYTES={capacity_min from the volume spec}
// CAPACITY_MAX_BYTES={capacity_max from the volume spec}
// PARAMETERS={json of parameters from the volume spec}
// DHV_OPERATION=create
// DHV_HOST_PATH={path to create}
// DHV_NODE_ID={Nomad node ID}
// DHV_VOLUME_NAME={name from the volume specification}
// DHV_VOLUME_ID={Nomad volume ID}
// DHV_CAPACITY_MIN_BYTES={capacity_min from the volume spec}
// DHV_CAPACITY_MAX_BYTES={capacity_max from the volume spec}
// DHV_PARAMETERS={json of parameters from the volume spec}
// DHV_PLUGIN_DIR={path to directory containing plugins}
//
// Response should be valid JSON on stdout with "path" and "bytes", e.g.:
// {"path": $HOST_PATH, "bytes": 50000000}
// "path" must be provided to confirm that the requested path is what was
// created by the plugin. "bytes" is the actual size of the volume created
// by the plugin; if excluded, it will default to 0.
//
// Must complete within 60 seconds (timeout on RPC)
func (p *HostVolumePluginExternal) Create(ctx context.Context,
req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) {

Expand All @@ -208,11 +233,11 @@ func (p *HostVolumePluginExternal) Create(ctx context.Context,
return nil, fmt.Errorf("error marshaling volume pramaters: %w", err)
}
envVars := []string{
"NODE_ID=" + req.NodeID,
"VOLUME_NAME=" + req.Name,
fmt.Sprintf("CAPACITY_MIN_BYTES=%d", req.RequestedCapacityMinBytes),
fmt.Sprintf("CAPACITY_MAX_BYTES=%d", req.RequestedCapacityMaxBytes),
"PARAMETERS=" + string(params),
fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
fmt.Sprintf("%s=%s", EnvVolumeName, req.Name),
fmt.Sprintf("%s=%d", EnvCapacityMin, req.RequestedCapacityMinBytes),
fmt.Sprintf("%s=%d", EnvCapacityMax, req.RequestedCapacityMaxBytes),
fmt.Sprintf("%s=%s", EnvParameters, params),
}

stdout, _, err := p.runPlugin(ctx, "create", req.ID, envVars)
Expand All @@ -228,19 +253,24 @@ func (p *HostVolumePluginExternal) Create(ctx context.Context,
// an error here after the plugin has done who-knows-what.
return nil, err
}
// TODO: validate returned host path
return &pluginResp, nil
}

// Delete calls the executable with the following parameters:
// arguments: delete {path to create}
// environment:
// OPERATION=delete
// HOST_PATH={path to create}
// NODE_ID={Nomad node ID}
// VOLUME_NAME={name from the volume specification}
// PARAMETERS={json of parameters from the volume spec}
// DHV_OPERATION=delete
// DHV_HOST_PATH={path to create}
// DHV_NODE_ID={Nomad node ID}
// DHV_VOLUME_NAME={name from the volume specification}
// DHV_VOLUME_ID={Nomad volume ID}
// DHV_PARAMETERS={json of parameters from the volume spec}
// DHV_PLUGIN_DIR={path to directory containing plugins}
//
// Response on stdout is discarded.
//
// Must complete within 60 seconds (timeout on RPC)
func (p *HostVolumePluginExternal) Delete(ctx context.Context,
req *cstructs.ClientHostVolumeDeleteRequest) error {

Expand All @@ -250,9 +280,9 @@ func (p *HostVolumePluginExternal) Delete(ctx context.Context,
return fmt.Errorf("error marshaling volume pramaters: %w", err)
}
envVars := []string{
"NODE_ID=" + req.NodeID,
"VOLUME_NAME=" + req.Name,
"PARAMETERS=" + string(params),
fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
fmt.Sprintf("%s=%s", EnvVolumeName, req.Name),
fmt.Sprintf("%s=%s", EnvParameters, params),
}

_, _, err = p.runPlugin(ctx, "delete", req.ID, envVars)
Expand All @@ -263,8 +293,10 @@ func (p *HostVolumePluginExternal) Delete(ctx context.Context,
}

// runPlugin executes the... executable with these additional env vars:
// OPERATION={op}
// HOST_PATH={p.TargetPath/volID}
// DHV_OPERATION={op}
// DHV_HOST_PATH={path to create}
// DHV_VOLUME_ID={Nomad volume ID}
// DHV_PLUGIN_DIR={path to directory containing plugins}
func (p *HostVolumePluginExternal) runPlugin(ctx context.Context,
op, volID string, env []string) (stdout, stderr []byte, err error) {

Expand All @@ -279,8 +311,10 @@ func (p *HostVolumePluginExternal) runPlugin(ctx context.Context,
cmd := exec.CommandContext(ctx, p.Executable, op, path)

cmd.Env = append([]string{
"OPERATION=" + op,
"HOST_PATH=" + path,
fmt.Sprintf("%s=%s", EnvOperation, op),
fmt.Sprintf("%s=%s", EnvHostPath, path),
fmt.Sprintf("%s=%s", EnvVolumeID, volID),
fmt.Sprintf("%s=%s", EnvPluginDir, p.PluginDir),
}, env...)

stdout, stderr, err = runCommand(cmd)
Expand Down
55 changes: 25 additions & 30 deletions client/hostvolumemanager/host_volume_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,22 +79,23 @@ func TestNewHostVolumePluginExternal(t *testing.T) {
log := testlog.HCLogger(t)
var err error

_, err = NewHostVolumePluginExternal(log, "test-id", "non-existent", "target")
_, err = NewHostVolumePluginExternal(log, ".", "non-existent", "target")
must.ErrorIs(t, err, ErrPluginNotExists)

_, err = NewHostVolumePluginExternal(log, "test-id", "host_volume_plugin_test.go", "target")
_, err = NewHostVolumePluginExternal(log, ".", "host_volume_plugin_test.go", "target")
must.ErrorIs(t, err, ErrPluginNotExecutable)

t.Run("unix", func(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("skipped because windows") // db TODO(1.10.0)
}
p, err := NewHostVolumePluginExternal(log, "test-id", "./test_fixtures/test_plugin.sh", "test-target")
p, err := NewHostVolumePluginExternal(log, "./test_fixtures", "test_plugin.sh", "test-target")
must.NoError(t, err)
must.Eq(t, &HostVolumePluginExternal{
ID: "test-id",
Executable: "./test_fixtures/test_plugin.sh",
ID: "test_plugin.sh",
Executable: "test_fixtures/test_plugin.sh",
TargetPath: "test-target",
PluginDir: "./test_fixtures",
log: log,
}, p)
})
Expand All @@ -115,65 +116,59 @@ func TestHostVolumePluginExternal(t *testing.T) {
t.Run("happy", func(t *testing.T) {

log, getLogs := logRecorder(t)
plug := &HostVolumePluginExternal{
ID: "test-external-plugin",
Executable: "./test_fixtures/test_plugin.sh",
TargetPath: tmp,
log: log,
}
plug, err := NewHostVolumePluginExternal(log, "./test_fixtures", "test_plugin.sh", tmp)
must.NoError(t, err)

// fingerprint
v, err := plug.Fingerprint(timeout(t))
must.NoError(t, err)
must.Eq(t, expectVersion, v.Version)
logged := getLogs()
must.NoError(t, err, must.Sprintf("logs: %s", logged))
must.Eq(t, expectVersion, v.Version, must.Sprintf("logs: %s", logged))

// create
resp, err := plug.Create(timeout(t),
&cstructs.ClientHostVolumeCreateRequest{
Name: "test-vol-name",
ID: volID,
NodeID: "test-node",
RequestedCapacityMinBytes: 5,
RequestedCapacityMaxBytes: 10,
Parameters: map[string]string{"key": "val"},
})
must.NoError(t, err)
logged = getLogs()
must.NoError(t, err, must.Sprintf("logs: %s", logged))

must.Eq(t, &HostVolumePluginCreateResponse{
Path: target,
SizeBytes: 5,
}, resp)
must.DirExists(t, target)
logged := getLogs()
must.StrContains(t, logged, "OPERATION=create") // stderr from `env`
must.StrContains(t, logged, `stdout="{`) // stdout from printf

// reset logger for next call
log, getLogs = logRecorder(t)
plug.log = log

// delete
err = plug.Delete(timeout(t),
&cstructs.ClientHostVolumeDeleteRequest{
Name: "test-vol-name",
ID: volID,
NodeID: "test-node",
Parameters: map[string]string{"key": "val"},
})
must.NoError(t, err)
must.DirNotExists(t, target)
logged = getLogs()
must.NoError(t, err, must.Sprintf("logs: %s", logged))
must.DirNotExists(t, target)
must.StrContains(t, logged, "OPERATION=delete") // stderr from `env`
must.StrContains(t, logged, "removed directory") // stdout from `rm -v`
})

t.Run("sad", func(t *testing.T) {

log, getLogs := logRecorder(t)
plug := &HostVolumePluginExternal{
ID: "test-external-plugin-sad",
Executable: "./test_fixtures/test_plugin_sad.sh",
TargetPath: tmp,
log: log,
}
plug, err := NewHostVolumePluginExternal(log, "./test_fixtures", "test_plugin_sad.sh", tmp)
must.NoError(t, err)

v, err := plug.Fingerprint(timeout(t))
must.EqError(t, err, `error getting version from plugin "test-external-plugin-sad": exit status 1`)
must.EqError(t, err, `error getting version from plugin "test_plugin_sad.sh": exit status 1`)
must.Nil(t, v)
logged := getLogs()
must.StrContains(t, logged, "fingerprint: sad plugin is sad")
Expand All @@ -191,7 +186,7 @@ func TestHostVolumePluginExternal(t *testing.T) {
RequestedCapacityMaxBytes: 10,
Parameters: map[string]string{"key": "val"},
})
must.EqError(t, err, `error creating volume "test-vol-id" with plugin "test-external-plugin-sad": exit status 1`)
must.EqError(t, err, `error creating volume "test-vol-id" with plugin "test_plugin_sad.sh": exit status 1`)
must.Nil(t, resp)
logged = getLogs()
must.StrContains(t, logged, "create: sad plugin is sad")
Expand All @@ -206,7 +201,7 @@ func TestHostVolumePluginExternal(t *testing.T) {
NodeID: "test-node",
Parameters: map[string]string{"key": "val"},
})
must.EqError(t, err, `error deleting volume "test-vol-id" with plugin "test-external-plugin-sad": exit status 1`)
must.EqError(t, err, `error deleting volume "test-vol-id" with plugin "test_plugin_sad.sh": exit status 1`)
logged = getLogs()
must.StrContains(t, logged, "delete: sad plugin is sad")
must.StrContains(t, logged, "delete: it tells you all about it in stderr")
Expand Down
4 changes: 1 addition & 3 deletions client/hostvolumemanager/host_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"errors"
"fmt"
"os"
"path/filepath"
"sync"

"github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -217,8 +216,7 @@ func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, error) {
return plug, nil
}
log := hvm.log.With("plugin_id", id)
path := filepath.Join(hvm.pluginDir, id)
return NewHostVolumePluginExternal(log, id, path, hvm.sharedMountDir)
return NewHostVolumePluginExternal(log, hvm.pluginDir, id, hvm.sharedMountDir)
}

// restoreFromState loads all volumes from client state and runs Create for
Expand Down
1 change: 1 addition & 0 deletions client/hostvolumemanager/host_volumes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ func logRecorder(t *testing.T) (hclog.Logger, func() string) {
return logger, func() string {
bts, err := io.ReadAll(buf)
test.NoError(t, err)
buf.Reset()
return string(bts)
}
}
Loading
Loading