diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2ce0bc0..b903b2e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,6 +32,11 @@ jobs: version: v1.61.0 args: --config ./.golangci.yaml + - name: Install TinyGo + run: | + wget https://github.com/tinygo-org/tinygo/releases/download/v0.35.0/tinygo_0.35.0_amd64.deb + sudo dpkg -i tinygo_0.35.0_amd64.deb + - name: Build proxy run: | - make all + make all -j $(nproc) diff --git a/Makefile b/Makefile index 0bd41d5..d34bb3c 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ BUILD_DIR = build TIME=$(shell date -u '+%Y-%m-%dT%H:%M:%SZ') VERSION ?= $(shell git describe --abbrev=0 --tags 2>/dev/null || echo 'v0.0.0') COMMIT ?= $(shell git rev-parse HEAD) -EXAMPLES = addition long-addition +EXAMPLES = addition compute hello-world SERVICES = manager proplet cli proxy define compile_service @@ -25,8 +25,8 @@ $(SERVICES): install: $(foreach f,$(wildcard $(BUILD_DIR)/*[!.wasm]),cp $(f) $(patsubst $(BUILD_DIR)/%,$(GOBIN)/propeller-%,$(f));) -.PHONY: all $(SERVICES) -all: $(SERVICES) +.PHONY: all $(SERVICES) $(EXAMPLES) +all: $(SERVICES) $(EXAMPLES) clean: rm -rf build diff --git a/cmd/proplet/main.go b/cmd/proplet/main.go index 2f1a09e..33be8cc 100644 --- a/cmd/proplet/main.go +++ b/cmd/proplet/main.go @@ -11,6 +11,7 @@ import ( "github.com/absmach/magistrala/pkg/server" "github.com/absmach/propeller/pkg/mqtt" "github.com/absmach/propeller/proplet" + "github.com/absmach/propeller/proplet/runtimes" "github.com/caarlos0/env/v11" "github.com/google/uuid" "golang.org/x/sync/errgroup" @@ -19,15 +20,16 @@ import ( const svcName = "proplet" type config struct { - LogLevel string `env:"PROPLET_LOG_LEVEL" envDefault:"info"` - InstanceID string `env:"PROPLET_INSTANCE_ID"` - MQTTAddress string `env:"PROPLET_MQTT_ADDRESS" envDefault:"tcp://localhost:1883"` - MQTTTimeout time.Duration `env:"PROPLET_MQTT_TIMEOUT" envDefault:"30s"` - MQTTQoS byte `env:"PROPLET_MQTT_QOS" envDefault:"2"` - LivelinessInterval time.Duration `env:"PROPLET_LIVELINESS_INTERVAL" envDefault:"10s"` - ChannelID string `env:"PROPLET_CHANNEL_ID,notEmpty"` - ThingID string `env:"PROPLET_THING_ID,notEmpty"` - ThingKey string `env:"PROPLET_THING_KEY,notEmpty"` + LogLevel string `env:"PROPLET_LOG_LEVEL" envDefault:"info"` + InstanceID string `env:"PROPLET_INSTANCE_ID"` + MQTTAddress string `env:"PROPLET_MQTT_ADDRESS" envDefault:"tcp://localhost:1883"` + MQTTTimeout time.Duration `env:"PROPLET_MQTT_TIMEOUT" envDefault:"30s"` + MQTTQoS byte `env:"PROPLET_MQTT_QOS" envDefault:"2"` + LivelinessInterval time.Duration `env:"PROPLET_LIVELINESS_INTERVAL" envDefault:"10s"` + ChannelID string `env:"PROPLET_CHANNEL_ID,notEmpty"` + ThingID string `env:"PROPLET_THING_ID,notEmpty"` + ThingKey string `env:"PROPLET_THING_KEY,notEmpty"` + ExternalWasmRuntime string `env:"PROPLET_EXTERNAL_WASM_RUNTIME" envDefault:""` } func main() { @@ -59,9 +61,16 @@ func main() { return } - wazero := proplet.NewWazeroRuntime(logger, mqttPubSub, cfg.ChannelID) - service, err := proplet.NewService(ctx, cfg.ChannelID, cfg.ThingID, cfg.ThingKey, cfg.LivelinessInterval, mqttPubSub, logger, wazero) + var runtime proplet.Runtime + switch cfg.ExternalWasmRuntime != "" { + case true: + runtime = runtimes.NewHostRuntime(logger, mqttPubSub, cfg.ChannelID, cfg.ExternalWasmRuntime) + default: + runtime = runtimes.NewWazeroRuntime(logger, mqttPubSub, cfg.ChannelID) + } + + service, err := proplet.NewService(ctx, cfg.ChannelID, cfg.ThingID, cfg.ThingKey, cfg.LivelinessInterval, mqttPubSub, logger, runtime) if err != nil { logger.Error("failed to initialize service", slog.Any("error", err)) diff --git a/examples/compute/compute.go b/examples/compute/compute.go new file mode 100644 index 0000000..d8fd2af --- /dev/null +++ b/examples/compute/compute.go @@ -0,0 +1,27 @@ +package main + +import "math" + +//export compute +func compute(n uint32) uint32 { + var result uint32 + + for i := range n { + for j := range n { + for k := range n { + for l := range n { + for m := range n { + // Do some meaningless but CPU-intensive math + result += uint32(math.Pow(float64(i*j*k*l*m), 2)) % 10 + } + } + } + } + } + + return result +} + +// main is required for the `wasi` target, even if it isn't used. +// See https://wazero.io/languages/tinygo/#why-do-i-have-to-define-main +func main() {} diff --git a/examples/hello-world/hello-world.go b/examples/hello-world/hello-world.go new file mode 100644 index 0000000..b1b14d0 --- /dev/null +++ b/examples/hello-world/hello-world.go @@ -0,0 +1,7 @@ +package main + +import "fmt" + +func main() { + fmt.Println("Hello World!") +} diff --git a/examples/long-addition/long-addition.go b/examples/long-addition/long-addition.go deleted file mode 100644 index cf9ff05..0000000 --- a/examples/long-addition/long-addition.go +++ /dev/null @@ -1,13 +0,0 @@ -package main - -import "time" - -//export add -func add(x, y uint32) uint32 { - time.Sleep(time.Minute) - return x + y -} - -// main is required for the `wasi` target, even if it isn't used. -// See https://wazero.io/languages/tinygo/#why-do-i-have-to-define-main -func main() {} diff --git a/go.mod b/go.mod index 37009f0..21dd315 100644 --- a/go.mod +++ b/go.mod @@ -32,12 +32,13 @@ require ( github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gorilla/websocket v1.5.3 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/klauspost/compress v1.17.11 // indirect - github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.61.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect @@ -47,15 +48,13 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.33.0 // indirect go.opentelemetry.io/otel/metric v1.33.0 // indirect go.opentelemetry.io/otel/sdk v1.33.0 // indirect - go.opentelemetry.io/proto/otlp v1.4.0 // indirect - golang.org/x/net v0.32.0 // indirect - golang.org/x/sys v0.28.0 // indirect + go.opentelemetry.io/proto/otlp v1.5.0 // indirect + golang.org/x/net v0.34.0 // indirect + golang.org/x/sys v0.29.0 // indirect golang.org/x/text v0.21.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20241216192217-9240e9c98484 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect - google.golang.org/grpc v1.69.0 // indirect - google.golang.org/protobuf v1.36.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422 // indirect + google.golang.org/grpc v1.69.2 // indirect + google.golang.org/protobuf v1.36.2 // indirect oras.land/oras-go/v2 v2.5.0 ) - -require github.com/opencontainers/go-digest v1.0.0 // indirect diff --git a/go.sum b/go.sum index 3e47db0..a7c2ef0 100644 --- a/go.sum +++ b/go.sum @@ -42,8 +42,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0 h1:TmHmbvxPmaegwhDubVz0lICL0J5Ka2vwTzhoePEXsGE= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0/go.mod h1:qztMSjm835F2bXf+5HKAPIS5qsmQDqZna/PgVt4rWtI= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ= github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f h1:7LYC+Yfkj3CTRcShK0KOL/w6iTiKyqqBA9a41Wnggw8= github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f/go.mod h1:pFlLw2CfqZiIBOx6BuCeRLCrfxBJipTY0nIOF/VbGcI= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -54,6 +54,8 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= +github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= @@ -100,26 +102,26 @@ go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4Jjx go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s= go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck= -go.opentelemetry.io/proto/otlp v1.4.0 h1:TA9WRvW6zMwP+Ssb6fLoUIuirti1gGbP28GcKG1jgeg= -go.opentelemetry.io/proto/otlp v1.4.0/go.mod h1:PPBWZIP98o2ElSqI35IHfu7hIhSwvc5N38Jw8pXuGFY= -golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI= -golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs= +go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= +go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -google.golang.org/genproto/googleapis/api v0.0.0-20241216192217-9240e9c98484 h1:ChAdCYNQFDk5fYvFZMywKLIijG7TC2m1C2CMEu11G3o= -google.golang.org/genproto/googleapis/api v0.0.0-20241216192217-9240e9c98484/go.mod h1:KRUmxRI4JmbpAm8gcZM4Jsffi859fo5LQjILwuqj9z8= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 h1:Z7FRVJPSMaHQxD0uXU8WdgFh8PseLM8Q8NzhnpMrBhQ= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA= -google.golang.org/grpc v1.69.0 h1:quSiOM1GJPmPH5XtU+BCoVXcDVJJAzNcoyfC2cCjGkI= -google.golang.org/grpc v1.69.0/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= -google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ= -google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422 h1:GVIKPyP/kLIyVOgOnTwFOrvQaQUzOzGMCxgFUOEmm24= +google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422/go.mod h1:b6h1vNKhxaSoEI+5jc3PJUCustfli/mRab7295pY7rw= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422 h1:3UsHvIr4Wc2aW4brOaSCmcxh9ksica6fHEr8P1XhkYw= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422/go.mod h1:3ENsm/5D1mzDyhpzeRi1NR784I0BcofWBoSc5QqqMK4= +google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= +google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= +google.golang.org/protobuf v1.36.2 h1:R8FeyR1/eLmkutZOM5CWghmo5itiG9z0ktFlTVLuTmU= +google.golang.org/protobuf v1.36.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/manager/api/endpoint.go b/manager/api/endpoint.go index 6f469f5..d524b54 100644 --- a/manager/api/endpoint.go +++ b/manager/api/endpoint.go @@ -122,9 +122,6 @@ func updateTaskEndpoint(svc manager.Service) endpoint.Endpoint { if !ok { return taskResponse{}, errors.Join(apiutil.ErrValidation, pkgerrors.ErrInvalidData) } - if err := req.validate(); err != nil { - return taskResponse{}, errors.Join(apiutil.ErrValidation, err) - } task, err := svc.UpdateTask(ctx, req.Task) if err != nil { diff --git a/manager/service.go b/manager/service.go index 658b0db..36a5743 100644 --- a/manager/service.go +++ b/manager/service.go @@ -332,28 +332,19 @@ func (svc *service) updateResultsHandler(ctx context.Context, msg map[string]int return errors.New("task id is empty") } - results, ok := msg["results"].([]interface{}) - if !ok { - return errors.New("invalid results") - } - data := make([]uint64, len(results)) - for i := range results { - r, ok := results[i].(float64) - if !ok { - return errors.New("invalid result") - } - data[i] = uint64(r) - } - t, err := svc.GetTask(ctx, taskID) if err != nil { return err } - t.Results = data + t.Results = msg["results"] t.State = task.Completed t.UpdatedAt = time.Now() t.FinishTime = time.Now() + if errMsg, ok := msg["error"].(string); ok { + t.Error = errMsg + } + if err := svc.tasksDB.Update(ctx, t.ID, t); err != nil { return err } diff --git a/proplet/requests.go b/proplet/requests.go index 48a05b4..e3b2e35 100644 --- a/proplet/requests.go +++ b/proplet/requests.go @@ -6,6 +6,7 @@ import ( type startRequest struct { ID string + CLIArgs []string FunctionName string WasmFile []byte imageURL string diff --git a/proplet/runtime.go b/proplet/runtime.go new file mode 100644 index 0000000..8a11ddb --- /dev/null +++ b/proplet/runtime.go @@ -0,0 +1,12 @@ +package proplet + +import ( + "context" +) + +var ResultsTopic = "channels/%s/messages/control/proplet/results" + +type Runtime interface { + StartApp(ctx context.Context, wasmBinary []byte, cliArgs []string, id, functionName string, args ...uint64) error + StopApp(ctx context.Context, id string) error +} diff --git a/proplet/runtimes/host.go b/proplet/runtimes/host.go new file mode 100644 index 0000000..ab2bc2a --- /dev/null +++ b/proplet/runtimes/host.go @@ -0,0 +1,98 @@ +package runtimes + +import ( + "bytes" + "context" + "fmt" + "log/slog" + "os" + "os/exec" + "path/filepath" + "strconv" + + "github.com/absmach/propeller/pkg/mqtt" + "github.com/absmach/propeller/proplet" +) + +type hostRuntime struct { + pubsub mqtt.PubSub + channelID string + logger *slog.Logger + wasmRuntime string +} + +func NewHostRuntime(logger *slog.Logger, pubsub mqtt.PubSub, channelID, wasmRuntime string) proplet.Runtime { + return &hostRuntime{ + pubsub: pubsub, + channelID: channelID, + logger: logger, + wasmRuntime: wasmRuntime, + } +} + +func (w *hostRuntime) StartApp(ctx context.Context, wasmBinary []byte, cliArgs []string, id, functionName string, args ...uint64) error { + currentDir, err := os.Getwd() + if err != nil { + return fmt.Errorf("error getting current directory: %w", err) + } + f, err := os.Create(filepath.Join(currentDir, id+".wasm")) + if err != nil { + return fmt.Errorf("error creating file: %w", err) + } + + if _, err = f.Write(wasmBinary); err != nil { + return fmt.Errorf("error writing to file: %w", err) + } + if err := f.Close(); err != nil { + return fmt.Errorf("error closing file: %w", err) + } + + cliArgs = append(cliArgs, filepath.Join(currentDir, id+".wasm")) + for i := range args { + cliArgs = append(cliArgs, strconv.FormatUint(args[i], 10)) + } + cmd := exec.Command(w.wasmRuntime, cliArgs...) + results := bytes.Buffer{} + cmd.Stdout = &results + + if err := cmd.Start(); err != nil { + return fmt.Errorf("error starting command: %w", err) + } + + go func(fileName string) { + var payload map[string]interface{} + + if err := cmd.Wait(); err != nil { + w.logger.Error("failed to wait for command", slog.String("id", id), slog.String("error", err.Error())) + payload = map[string]interface{}{ + "task_id": id, + "error": err.Error(), + "results": results.String(), + } + } else { + payload = map[string]interface{}{ + "task_id": id, + "results": results.String(), + } + } + + topic := fmt.Sprintf(proplet.ResultsTopic, w.channelID) + if err := w.pubsub.Publish(ctx, topic, payload); err != nil { + w.logger.Error("failed to publish results", slog.String("id", id), slog.String("error", err.Error())) + + return + } + + if err := os.Remove(fileName); err != nil { + w.logger.Error("failed to remove file", slog.String("fileName", fileName), slog.String("error", err.Error())) + } + + w.logger.Info("Finished running app", slog.String("id", id)) + }(filepath.Join(currentDir, id+".wasm")) + + return nil +} + +func (w *hostRuntime) StopApp(ctx context.Context, id string) error { + return nil +} diff --git a/proplet/wasm.go b/proplet/runtimes/wazero.go similarity index 84% rename from proplet/wasm.go rename to proplet/runtimes/wazero.go index 1b9abc3..fe1f1d9 100644 --- a/proplet/wasm.go +++ b/proplet/runtimes/wazero.go @@ -1,4 +1,4 @@ -package proplet +package runtimes import ( "context" @@ -8,17 +8,11 @@ import ( "sync" "github.com/absmach/propeller/pkg/mqtt" + "github.com/absmach/propeller/proplet" "github.com/tetratelabs/wazero" "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" ) -var resultsTopic = "channels/%s/messages/control/proplet/results" - -type Runtime interface { - StartApp(ctx context.Context, wasmBinary []byte, id, functionName string, args ...uint64) error - StopApp(ctx context.Context, id string) error -} - type wazeroRuntime struct { mutex sync.Mutex runtimes map[string]wazero.Runtime @@ -27,7 +21,7 @@ type wazeroRuntime struct { logger *slog.Logger } -func NewWazeroRuntime(logger *slog.Logger, pubsub mqtt.PubSub, channelID string) Runtime { +func NewWazeroRuntime(logger *slog.Logger, pubsub mqtt.PubSub, channelID string) proplet.Runtime { return &wazeroRuntime{ runtimes: make(map[string]wazero.Runtime), pubsub: pubsub, @@ -36,7 +30,7 @@ func NewWazeroRuntime(logger *slog.Logger, pubsub mqtt.PubSub, channelID string) } } -func (w *wazeroRuntime) StartApp(ctx context.Context, wasmBinary []byte, id, functionName string, args ...uint64) error { +func (w *wazeroRuntime) StartApp(ctx context.Context, wasmBinary []byte, cliArgs []string, id, functionName string, args ...uint64) error { r := wazero.NewRuntime(ctx) w.mutex.Lock() @@ -74,7 +68,7 @@ func (w *wazeroRuntime) StartApp(ctx context.Context, wasmBinary []byte, id, fun "results": results, } - topic := fmt.Sprintf(resultsTopic, w.channelID) + topic := fmt.Sprintf(proplet.ResultsTopic, w.channelID) if err := w.pubsub.Publish(ctx, topic, payload); err != nil { w.logger.Error("failed to publish results", slog.String("id", id), slog.String("error", err.Error())) diff --git a/proplet/service.go b/proplet/service.go index 13b3d7b..efddabd 100644 --- a/proplet/service.go +++ b/proplet/service.go @@ -139,6 +139,7 @@ func (p *PropletService) handleStartCommand(ctx context.Context) func(topic stri req := startRequest{ ID: payload.ID, + CLIArgs: payload.CLIArgs, FunctionName: payload.Name, WasmFile: payload.File, imageURL: payload.ImageURL, @@ -151,7 +152,7 @@ func (p *PropletService) handleStartCommand(ctx context.Context) func(topic stri p.logger.Info("Received start command", slog.String("app_name", req.FunctionName)) if req.WasmFile != nil { - if err := p.runtime.StartApp(ctx, req.WasmFile, req.ID, req.FunctionName, req.Params...); err != nil { + if err := p.runtime.StartApp(ctx, req.WasmFile, req.CLIArgs, req.ID, req.FunctionName, req.Params...); err != nil { return err } @@ -178,7 +179,7 @@ func (p *PropletService) handleStartCommand(ctx context.Context) func(topic stri if exists && receivedChunks == metadata.TotalChunks { p.logger.Info("All chunks received, deploying app", slog.String("app_name", req.imageURL)) wasmBinary := assembleChunks(p.chunks[req.imageURL]) - if err := p.runtime.StartApp(ctx, wasmBinary, req.ID, req.FunctionName, req.Params...); err != nil { + if err := p.runtime.StartApp(ctx, wasmBinary, req.CLIArgs, req.ID, req.FunctionName, req.Params...); err != nil { p.logger.Error("Failed to start app", slog.String("app_name", req.imageURL), slog.Any("error", err)) } diff --git a/task/task.go b/task/task.go index fbac322..c834b4f 100644 --- a/task/task.go +++ b/task/task.go @@ -35,8 +35,10 @@ type Task struct { State State `json:"state"` ImageURL string `json:"image_url,omitempty"` File []byte `json:"file,omitempty"` + CLIArgs []string `json:"cli_args"` Inputs []uint64 `json:"inputs,omitempty"` - Results []uint64 `json:"results,omitempty"` + Results any `json:"results,omitempty"` + Error string `json:"error,omitempty"` StartTime time.Time `json:"start_time"` FinishTime time.Time `json:"finish_time"` CreatedAt time.Time `json:"created_at"`