Skip to content

Commit

Permalink
Persistent Worker: introduce resource modifiers (#764)
Browse files Browse the repository at this point in the history
  • Loading branch information
edigaryev authored Aug 7, 2024
1 parent f615946 commit 95e9f68
Show file tree
Hide file tree
Showing 13 changed files with 389 additions and 24 deletions.
40 changes: 40 additions & 0 deletions PERSISTENT-WORKERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,46 @@ On `amd64`, simply replace the `image` with `ghcr.io/cirruslabs/ubuntu-runner-am

Currently only Tart and Vetu isolations are supported for standby.

## Resource modifiers

Resource modifiers allow you to change the behavior of the underlying isolation engine when a certain amount of resources is allocated to the task.

This comes in handy when you want to pass through one or multiple physical devices such as GPUs using e.g. [Vetu](https://github.com/cirruslabs/vetu), as `vetu run` expects a path to a PCI device in its `--device` argument.

As an example, let's say you've split a single physical GPU using vGPU or SR-IOV technology as follows:

* 1 × 1/2 of GPU (PCI device `01:00.0`)
* 2 × 1/4 of GPU (PCI devices `02:00.0` and `03:00.0`)

You can then ensure that each task that runs on a Persistent Worker and asks for a `gpu` resource will get the corresponding vGPU that matches the requirements:

```yaml
token: <TOKEN>
name: "gpu-enabled-worker"
resources:
  gpu: 1
resource-modifiers:
  - match:
      gpu: 0.5
    append:
      run: ["--device", "/sys/bus/pci/devices/0000:01:00.0/,iommu=on"]
  - match:
      gpu: 0.25
    append:
      run: ["--device", "/sys/bus/pci/devices/0000:02:00.0/,iommu=on"]
  - match:
      gpu: 0.25
    append:
      run: ["--device", "/sys/bus/pci/devices/0000:03:00.0/,iommu=on"]
```

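Note that the Persistent Worker keeps track of which resource modifiers are currently in use. A modifier that is already assigned to a running task is skipped, so the same `--device` is never passed to `vetu run` by two tasks at once. If every matching modifier is already in use, the task is started without any extra `--device` argument.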

Keep this in mind when assigning `resources:` to a Persistent Worker and writing `resource-modifiers:`.

## Observability

The Persistent Worker produces some useful OpenTelemetry metrics. Metrics are scoped with the `org.cirruslabs.persistent_worker` prefix and include information about resource utilization, running tasks and the VM images used.
Expand Down
10 changes: 10 additions & 0 deletions internal/commands/worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"github.com/cirruslabs/cirrus-cli/internal/executor/endpoint"
"github.com/cirruslabs/cirrus-cli/internal/worker"
"github.com/cirruslabs/cirrus-cli/internal/worker/resourcemodifier"
"github.com/cirruslabs/cirrus-cli/internal/worker/security"
"github.com/cirruslabs/cirrus-cli/internal/worker/upstream"
"github.com/dustin/go-humanize"
Expand Down Expand Up @@ -37,6 +38,8 @@ type Config struct {
Security *security.Security `yaml:"security"`

Standby *worker.StandbyConfig `yaml:"standby"`

ResourceModifiers []*resourcemodifier.Modifier `yaml:"resource-modifiers"`
}

type ConfigLog struct {
Expand Down Expand Up @@ -215,6 +218,13 @@ func buildWorker(output io.Writer) (*worker.Worker, error) {
opts = append(opts, worker.WithStandby(standby))
}

// Configure resource modifiers
if len(config.ResourceModifiers) != 0 {
opts = append(opts, worker.WithResourceModifiersManager(
resourcemodifier.NewManager(config.ResourceModifiers...),
))
}

// Instantiate worker
return worker.New(opts...)
}
4 changes: 2 additions & 2 deletions internal/executor/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func NewFromProto(
Arguments: instance.Arguments,
}, nil
case *api.PersistentWorkerInstance:
return persistentworker.New(instance.Isolation, security.NoSecurityAllowAllVolumes(), logger)
return persistentworker.New(instance.Isolation, security.NoSecurityAllowAllVolumes(), nil, logger)
case *api.DockerBuilder:
// Ensures that we're not trying to run e.g. Windows-specific scripts on macOS
instanceOS := strings.ToLower(instance.Platform.String())
Expand All @@ -109,7 +109,7 @@ func NewFromProto(
Type: &api.Isolation_None_{
None: &api.Isolation_None{},
},
}, security.NoSecurity(), logger)
}, security.NoSecurity(), nil, logger)
case *api.MacOSInstance:
return tart.New(instance.Image, instance.User, instance.Password, 22,
instance.Cpu, instance.Memory, tart.WithLogger(logger))
Expand Down
22 changes: 15 additions & 7 deletions internal/executor/instance/persistentworker/isolation/vetu/vetu.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/cirruslabs/cirrus-cli/internal/executor/instance/runconfig"
"github.com/cirruslabs/cirrus-cli/internal/executor/platform"
"github.com/cirruslabs/cirrus-cli/internal/logger"
"github.com/cirruslabs/cirrus-cli/internal/worker/resourcemodifier"
"github.com/cirruslabs/echelon"
"github.com/getsentry/sentry-go"
"github.com/google/uuid"
Expand Down Expand Up @@ -41,6 +42,7 @@ type Vetu struct {
diskSize uint32
bridgedInterface string
hostNetworking bool
resourceModifier *resourcemodifier.Modifier

vm *VM
}
Expand All @@ -52,15 +54,17 @@ func New(
sshPort uint16,
cpu uint32,
memory uint32,
resourceModifier *resourcemodifier.Modifier,
opts ...Option,
) (*Vetu, error) {
vetu := &Vetu{
vmName: vmName,
sshUser: sshUser,
sshPassword: sshPassword,
sshPort: sshPort,
cpu: cpu,
memory: memory,
vmName: vmName,
sshUser: sshUser,
sshPassword: sshPassword,
sshPort: sshPort,
cpu: cpu,
memory: memory,
resourceModifier: resourceModifier,
}

// Apply options
Expand Down Expand Up @@ -113,7 +117,7 @@ func (vetu *Vetu) bootVM(

tmpVMName := vmNamePrefix + identToBeInjected + uuid.NewString()

vm, err := NewVMClonedFrom(ctx, vetu.vmName, tmpVMName, lazyPull, env, logger)
vm, err := NewVMClonedFrom(ctx, vetu.vmName, tmpVMName, lazyPull, env, vetu.resourceModifier, logger)
if err != nil {
return fmt.Errorf("%w: failed to create VM cloned from %q: %v", ErrFailed, vetu.vmName, err)
}
Expand Down Expand Up @@ -190,6 +194,10 @@ func (vetu *Vetu) WorkingDirectory(projectDir string, dirtyMode bool) string {
}

func (vetu *Vetu) Close(ctx context.Context) error {
if vetu.resourceModifier != nil {
defer vetu.resourceModifier.Unlock()
}

if vetu.vm == nil {
return nil
}
Expand Down
10 changes: 10 additions & 0 deletions internal/executor/instance/persistentworker/isolation/vetu/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package vetu

import (
"context"
"github.com/cirruslabs/cirrus-cli/internal/worker/resourcemodifier"
"github.com/cirruslabs/echelon"
"github.com/getsentry/sentry-go"
"strconv"
Expand All @@ -14,6 +15,8 @@ type VM struct {

env map[string]string

resourceModifier *resourcemodifier.Modifier

runningVMCtx context.Context
runningVMCtxCancel context.CancelFunc
wg sync.WaitGroup
Expand All @@ -26,13 +29,15 @@ func NewVMClonedFrom(
to string,
lazyPull bool,
env map[string]string,
resourceModifier *resourcemodifier.Modifier,
logger *echelon.Logger,
) (*VM, error) {
runningVMCtx, runningVMCtxCancel := context.WithCancel(context.Background())

vm := &VM{
ident: to,
env: env,
resourceModifier: resourceModifier,
runningVMCtx: runningVMCtx,
runningVMCtxCancel: runningVMCtxCancel,
errChan: make(chan error, 1),
Expand Down Expand Up @@ -131,6 +136,11 @@ func (vm *VM) Start(
args = append(args, "--net-host")
}

// Apply "run" resource modifier
if vm.resourceModifier != nil {
args = append(args, vm.resourceModifier.Append.Run...)
}

args = append(args, vm.ident)

stdout, stderr, err := Cmd(vm.runningVMCtx, vm.env, "run", args...)
Expand Down
19 changes: 15 additions & 4 deletions internal/executor/instance/persistentworker/persistentworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/cirruslabs/cirrus-cli/internal/executor/instance/persistentworker/isolation/tart"
"github.com/cirruslabs/cirrus-cli/internal/executor/instance/persistentworker/isolation/vetu"
"github.com/cirruslabs/cirrus-cli/internal/logger"
"github.com/cirruslabs/cirrus-cli/internal/worker/resourcemodifier"
"github.com/cirruslabs/cirrus-cli/internal/worker/security"
"github.com/cirruslabs/cirrus-cli/pkg/api"
"runtime"
Expand All @@ -18,7 +19,12 @@ import (

var ErrInvalidIsolation = errors.New("invalid isolation parameters")

func New(isolation *api.Isolation, security *security.Security, logger logger.Lightweight) (abstract.Instance, error) {
func New(
isolation *api.Isolation,
security *security.Security,
resourceModifier *resourcemodifier.Modifier,
logger logger.Lightweight,
) (abstract.Instance, error) {
if isolation == nil {
nonePolicy := security.NonePolicy()
if nonePolicy == nil {
Expand Down Expand Up @@ -67,7 +73,7 @@ func New(isolation *api.Isolation, security *security.Security, logger logger.Li
case *api.Isolation_Tart_:
return newTart(iso, security, logger)
case *api.Isolation_Vetu_:
return newVetu(iso, security, logger)
return newVetu(iso, security, resourceModifier, logger)
default:
return nil, fmt.Errorf("%w: unsupported isolation type %T", ErrInvalidIsolation, iso)
}
Expand Down Expand Up @@ -118,7 +124,12 @@ func newTart(iso *api.Isolation_Tart_, security *security.Security, logger logge
iso.Tart.Cpu, iso.Tart.Memory, opts...)
}

func newVetu(iso *api.Isolation_Vetu_, security *security.Security, logger logger.Lightweight) (*vetu.Vetu, error) {
func newVetu(
iso *api.Isolation_Vetu_,
security *security.Security,
resourceModifier *resourcemodifier.Modifier,
logger logger.Lightweight,
) (*vetu.Vetu, error) {
vetuPolicy := security.VetuPolicy()
if vetuPolicy == nil {
return nil, fmt.Errorf("%w: \"vetu\" isolation is not allowed by this Persistent Worker's "+
Expand Down Expand Up @@ -146,5 +157,5 @@ func newVetu(iso *api.Isolation_Vetu_, security *security.Security, logger logge
}

return vetu.New(iso.Vetu.Image, iso.Vetu.User, iso.Vetu.Password, uint16(iso.Vetu.Port),
iso.Vetu.Cpu, iso.Vetu.Memory, opts...)
iso.Vetu.Cpu, iso.Vetu.Memory, resourceModifier, opts...)
}
7 changes: 7 additions & 0 deletions internal/worker/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package worker

import (
"github.com/cirruslabs/cirrus-cli/internal/worker/resourcemodifier"
"github.com/cirruslabs/cirrus-cli/internal/worker/security"
"github.com/cirruslabs/cirrus-cli/internal/worker/upstream"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -43,3 +44,9 @@ func WithStandby(standby *StandbyConfig) Option {
e.standbyConfig = standby
}
}

// WithResourceModifiersManager returns an Option that stores the given
// resource modifiers manager on the Worker being configured.
func WithResourceModifiersManager(resourceModifiersManager *resourcemodifier.Manager) Option {
	assign := func(w *Worker) {
		w.resourceModifierManager = resourceModifiersManager
	}

	return assign
}
60 changes: 60 additions & 0 deletions internal/worker/resourcemodifier/resourcemodifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package resourcemodifier

import (
"sync"
)

// Modifier is a single entry of the "resource-modifiers" configuration list:
// a set of resource limits to match against and the arguments to append when
// the entry is selected.
type Modifier struct {
	// Match maps a resource name to the largest requested amount of that
	// resource this modifier still applies to.
	Match map[string]float64 `yaml:"match"`
	// Append holds the extra arguments this modifier contributes.
	Append Append `yaml:"append"`

	// The embedded mutex marks the modifier as taken: Manager.Acquire
	// try-locks it, and whoever received the modifier unlocks it when done.
	sync.Mutex
}

// Append groups the argument lists a Modifier can add.
type Append struct {
	// Run lists the arguments appended to the isolation engine's "run" command.
	Run []string `yaml:"run"`
}

// Matches reports whether the requested resources satisfy this modifier.
//
// Every resource named in Match has to be present in requested with an amount
// that does not exceed the configured value. Resources in requested that are
// not named in Match are ignored, so a modifier with an empty Match matches
// any request.
func (modifier *Modifier) Matches(requested map[string]float64) bool {
	for name, limit := range modifier.Match {
		if amount, present := requested[name]; !present || amount > limit {
			return false
		}
	}

	return true
}

// Manager hands out configured resource modifiers so that each one is held
// by at most one user at a time.
type Manager struct {
	// resourceModifiers is scanned in order by Acquire; earlier entries win.
	resourceModifiers []*Modifier

	// mtx serializes Acquire calls.
	mtx sync.Mutex
}

// NewManager creates a Manager over the given modifiers, preserving their
// order. The slice may contain nil entries (for example, produced by an empty
// item in the configuration list); Acquire ignores them.
func NewManager(resourceModifiers ...*Modifier) *Manager {
	return &Manager{
		resourceModifiers: resourceModifiers,
	}
}

// Acquire returns the first modifier that matches the requested resources
// and is not currently locked, locking it before returning. It returns nil
// when no modifier matches, when all matching modifiers are already locked,
// or when the manager itself is nil.
//
// The caller owns the returned modifier and must call Unlock on it once it is
// no longer needed, otherwise the modifier is never handed out again.
func (manager *Manager) Acquire(resources map[string]float64) *Modifier {
	// A nil manager simply has nothing to offer.
	if manager == nil {
		return nil
	}

	manager.mtx.Lock()
	defer manager.mtx.Unlock()

	for _, resourceModifier := range manager.resourceModifiers {
		// Skip holes in the list instead of dereferencing a nil pointer.
		if resourceModifier == nil {
			continue
		}

		if !resourceModifier.Matches(resources) {
			continue
		}

		// TryLock never blocks: a modifier that is in use is passed over
		// in favor of the next matching one.
		if resourceModifier.TryLock() {
			return resourceModifier
		}
	}

	return nil
}
Loading

0 comments on commit 95e9f68

Please sign in to comment.