Skip to content

Commit

Permalink
feat: enable external wasm runtime
Browse files Browse the repository at this point in the history
Signed-off-by: Rodney Osodo <[email protected]>
  • Loading branch information
rodneyosodo committed Jan 10, 2025
1 parent d2abc2a commit 562b80e
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 42 deletions.
21 changes: 11 additions & 10 deletions cmd/proplet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ import (
const svcName = "proplet"

type config struct {
LogLevel string `env:"PROPLET_LOG_LEVEL" envDefault:"info"`
InstanceID string `env:"PROPLET_INSTANCE_ID"`
MQTTAddress string `env:"PROPLET_MQTT_ADDRESS" envDefault:"tcp://localhost:1883"`
MQTTTimeout time.Duration `env:"PROPLET_MQTT_TIMEOUT" envDefault:"30s"`
MQTTQoS byte `env:"PROPLET_MQTT_QOS" envDefault:"2"`
LivelinessInterval time.Duration `env:"PROPLET_LIVELINESS_INTERVAL" envDefault:"10s"`
ChannelID string `env:"PROPLET_CHANNEL_ID,notEmpty"`
ThingID string `env:"PROPLET_THING_ID,notEmpty"`
ThingKey string `env:"PROPLET_THING_KEY,notEmpty"`
LogLevel string `env:"PROPLET_LOG_LEVEL" envDefault:"info"`
InstanceID string `env:"PROPLET_INSTANCE_ID"`
MQTTAddress string `env:"PROPLET_MQTT_ADDRESS" envDefault:"tcp://localhost:1883"`
MQTTTimeout time.Duration `env:"PROPLET_MQTT_TIMEOUT" envDefault:"30s"`
MQTTQoS byte `env:"PROPLET_MQTT_QOS" envDefault:"2"`
LivelinessInterval time.Duration `env:"PROPLET_LIVELINESS_INTERVAL" envDefault:"10s"`
ChannelID string `env:"PROPLET_CHANNEL_ID,notEmpty"`
ThingID string `env:"PROPLET_THING_ID,notEmpty"`
ThingKey string `env:"PROPLET_THING_KEY,notEmpty"`
ExternalWasmRuntime string `env:"PROPLET_EXTERNAL_WASM_RUNTIME" envDefault:""`
}

func main() {
Expand Down Expand Up @@ -59,7 +60,7 @@ func main() {

return
}
wazero := proplet.NewWazeroRuntime(logger, mqttPubSub, cfg.ChannelID)
wazero := proplet.NewWazeroRuntime(logger, mqttPubSub, cfg.ChannelID, cfg.ExternalWasmRuntime)

service, err := proplet.NewService(ctx, cfg.ChannelID, cfg.ThingID, cfg.ThingKey, cfg.LivelinessInterval, mqttPubSub, logger, wazero)
if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions manager/api/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,6 @@ func updateTaskEndpoint(svc manager.Service) endpoint.Endpoint {
if !ok {
return taskResponse{}, errors.Join(apiutil.ErrValidation, pkgerrors.ErrInvalidData)
}
if err := req.validate(); err != nil {
return taskResponse{}, errors.Join(apiutil.ErrValidation, err)
}

task, err := svc.UpdateTask(ctx, req.Task)
if err != nil {
Expand Down
15 changes: 1 addition & 14 deletions manager/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,24 +332,11 @@ func (svc *service) updateResultsHandler(ctx context.Context, msg map[string]int
return errors.New("task id is empty")
}

results, ok := msg["results"].([]interface{})
if !ok {
return errors.New("invalid results")
}
data := make([]uint64, len(results))
for i := range results {
r, ok := results[i].(float64)
if !ok {
return errors.New("invalid result")
}
data[i] = uint64(r)
}

t, err := svc.GetTask(ctx, taskID)
if err != nil {
return err
}
t.Results = data
t.Results = msg["results"]
t.State = task.Completed
t.UpdatedAt = time.Now()
t.FinishTime = time.Now()
Expand Down
1 change: 1 addition & 0 deletions proplet/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

type startRequest struct {
ID string
CLIArgs []string
FunctionName string
WasmFile []byte
imageURL string
Expand Down
5 changes: 3 additions & 2 deletions proplet/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func (p *PropletService) handleStartCommand(ctx context.Context) func(topic stri

req := startRequest{
ID: payload.ID,
CLIArgs: payload.CLIArgs,
FunctionName: payload.Name,
WasmFile: payload.File,
imageURL: payload.ImageURL,
Expand All @@ -151,7 +152,7 @@ func (p *PropletService) handleStartCommand(ctx context.Context) func(topic stri
p.logger.Info("Received start command", slog.String("app_name", req.FunctionName))

if req.WasmFile != nil {
if err := p.runtime.StartApp(ctx, req.WasmFile, req.ID, req.FunctionName, req.Params...); err != nil {
if err := p.runtime.StartApp(ctx, req.WasmFile, req.CLIArgs, req.ID, req.FunctionName, req.Params...); err != nil {
return err
}

Expand All @@ -178,7 +179,7 @@ func (p *PropletService) handleStartCommand(ctx context.Context) func(topic stri
if exists && receivedChunks == metadata.TotalChunks {
p.logger.Info("All chunks received, deploying app", slog.String("app_name", req.imageURL))
wasmBinary := assembleChunks(p.chunks[req.imageURL])
if err := p.runtime.StartApp(ctx, wasmBinary, req.ID, req.FunctionName, req.Params...); err != nil {
if err := p.runtime.StartApp(ctx, wasmBinary, req.CLIArgs, req.ID, req.FunctionName, req.Params...); err != nil {
p.logger.Error("Failed to start app", slog.String("app_name", req.imageURL), slog.Any("error", err))
}

Expand Down
88 changes: 76 additions & 12 deletions proplet/wasm.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package proplet

import (
"bytes"
"context"
"errors"
"fmt"
"log/slog"
"os"
"os/exec"
"strconv"
"sync"

"github.com/absmach/propeller/pkg/mqtt"
Expand All @@ -15,28 +19,34 @@ import (
var resultsTopic = "channels/%s/messages/control/proplet/results"

type Runtime interface {
StartApp(ctx context.Context, wasmBinary []byte, id, functionName string, args ...uint64) error
StartApp(ctx context.Context, wasmBinary []byte, cliArgs []string, id, functionName string, args ...uint64) error
StopApp(ctx context.Context, id string) error
}

type wazeroRuntime struct {
mutex sync.Mutex
runtimes map[string]wazero.Runtime
pubsub mqtt.PubSub
channelID string
logger *slog.Logger
mutex sync.Mutex
runtimes map[string]wazero.Runtime
pubsub mqtt.PubSub
channelID string
logger *slog.Logger
hostWasmRuntime string
}

func NewWazeroRuntime(logger *slog.Logger, pubsub mqtt.PubSub, channelID string) Runtime {
func NewWazeroRuntime(logger *slog.Logger, pubsub mqtt.PubSub, channelID, hostWasmRuntime string) Runtime {
return &wazeroRuntime{
runtimes: make(map[string]wazero.Runtime),
pubsub: pubsub,
channelID: channelID,
logger: logger,
runtimes: make(map[string]wazero.Runtime),
pubsub: pubsub,
channelID: channelID,
logger: logger,
hostWasmRuntime: hostWasmRuntime,
}
}

func (w *wazeroRuntime) StartApp(ctx context.Context, wasmBinary []byte, id, functionName string, args ...uint64) error {
func (w *wazeroRuntime) StartApp(ctx context.Context, wasmBinary []byte, cliArgs []string, id, functionName string, args ...uint64) error {
if w.hostWasmRuntime != "" {
return w.runOnHostRuntime(ctx, wasmBinary, cliArgs, id, args...)
}

r := wazero.NewRuntime(ctx)

w.mutex.Lock()
Expand Down Expand Up @@ -104,3 +114,57 @@ func (w *wazeroRuntime) StopApp(ctx context.Context, id string) error {

return nil
}

func (w *wazeroRuntime) runOnHostRuntime(ctx context.Context, wasmBinary []byte, cliArgs []string, id string, args ...uint64) error {
currentDir, err := os.Getwd()
if err != nil {
return fmt.Errorf("error getting current directory: %w", err)
}
f, err := os.Create(fmt.Sprintf("%s/%s.wasm", currentDir, id))
if err != nil {
return fmt.Errorf("error creating file: %w", err)
}

if _, err = f.Write(wasmBinary); err != nil {
return fmt.Errorf("error writing to file: %w", err)
}
f.Close()

cliArgs = append(cliArgs, fmt.Sprintf("%s/%s.wasm", currentDir, id))
for i := range args {
cliArgs = append(cliArgs, strconv.FormatUint(args[i], 10))
}
cmd := exec.Command(w.hostWasmRuntime, cliArgs...)
results := bytes.Buffer{}
cmd.Stdout = &results

if err := cmd.Start(); err != nil {
return fmt.Errorf("error starting command: %w", err)
}

go func(fileName string) {
if err := cmd.Wait(); err != nil {
w.logger.Error("failed to wait for command", slog.String("id", id), slog.String("error", err.Error()))
}

payload := map[string]interface{}{
"task_id": id,
"results": results.String(),
}

topic := fmt.Sprintf(resultsTopic, w.channelID)
if err := w.pubsub.Publish(ctx, topic, payload); err != nil {
w.logger.Error("failed to publish results", slog.String("id", id), slog.String("error", err.Error()))

return
}

if err := os.Remove(fileName); err != nil {
w.logger.Error("failed to remove file", slog.String("fileName", fileName), slog.String("error", err.Error()))
}

w.logger.Info("Finished running app", slog.String("id", id))
}(fmt.Sprintf("%s/%s.wasm", currentDir, id))

return nil
}
3 changes: 2 additions & 1 deletion task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ type Task struct {
State State `json:"state"`
ImageURL string `json:"image_url,omitempty"`
File []byte `json:"file,omitempty"`
CLIArgs []string `json:"cli_args"`
Inputs []uint64 `json:"inputs,omitempty"`
Results []uint64 `json:"results,omitempty"`
Results any `json:"results,omitempty"`
StartTime time.Time `json:"start_time"`
FinishTime time.Time `json:"finish_time"`
CreatedAt time.Time `json:"created_at"`
Expand Down

0 comments on commit 562b80e

Please sign in to comment.