diff --git a/.golangci.yaml b/.golangci.yaml index b473231..49823f9 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -2,6 +2,8 @@ run: timeout: 10m issues: + exclude-dirs: + - examples/ max-issues-per-linter: 100 max-same-issues: 100 @@ -30,6 +32,7 @@ linters: - dupl - err113 - noctx + - cyclop linters-settings: gocritic: diff --git a/Makefile b/Makefile index 5090a96..57467f8 100644 --- a/Makefile +++ b/Makefile @@ -5,6 +5,8 @@ BUILD_DIR = build TIME=$(shell date -u '+%Y-%m-%dT%H:%M:%SZ') VERSION ?= $(shell git describe --abbrev=0 --tags 2>/dev/null || echo 'v0.0.0') COMMIT ?= $(shell git rev-parse HEAD) +EXAMPLES = addition long-addition +SERVICES = manager proplet cli define compile_service CGO_ENABLED=$(CGO_ENABLED) GOOS=$(GOOS) GOARCH=$(GOARCH) \ @@ -12,17 +14,21 @@ define compile_service -X 'github.com/absmach/magistrala.BuildTime=$(TIME)' \ -X 'github.com/absmach/magistrala.Version=$(VERSION)' \ -X 'github.com/absmach/magistrala.Commit=$(COMMIT)'" \ - -o ${BUILD_DIR}/propellerd cmd/propellerd/main.go + -o ${BUILD_DIR}/$(1) cmd/$(1)/main.go endef -.PHONY: build -build: - $(call compile_service) +$(SERVICES): + $(call compile_service,$(@)) install: - cp ${BUILD_DIR}/propellerd $(GOBIN)/propellerd + for file in $(BUILD_DIR)/*; do \ + if [[ ! "$$file" =~ \.wasm$$ ]]; then \ + cp "$$file" $(GOBIN)/propeller-`basename "$$file"`; \ + fi \ + done -all: build +.PHONY: all $(SERVICES) +all: $(SERVICES) clean: rm -rf build @@ -36,6 +42,9 @@ start-magistrala: stop-magistrala: docker compose -f docker/compose.yaml down +$(EXAMPLES): + GOOS=js GOARCH=wasm tinygo build -o build/$@.wasm -target wasi examples/$@/$@.go + help: @echo "Usage: make " @echo "" diff --git a/cmd/propellerd/main.go b/cmd/cli/main.go similarity index 72% rename from cmd/propellerd/main.go rename to cmd/cli/main.go index 2b5fb15..4bb4bf0 100644 --- a/cmd/propellerd/main.go +++ b/cmd/cli/main.go @@ -10,9 +10,9 @@ import ( func main() { rootCmd := &cobra.Command{ - Use: "propellerd", - Short: "Propeller Daemon", - Long: `Propeller Daemon is a daemon that manages the lifecycle of Propeller components.`, + Use: "propeller-cli", + Short: "Propeller CLI", + Long: `Propeller CLI is a command line interface for interacting with Propeller components.`, PersistentPreRun: func(_ *cobra.Command, _ []string) { sdkConf := sdk.Config{ ManagerURL: propellerd.DefManagerURL, @@ -23,10 +23,8 @@ func main() { }, } - managerCmd := propellerd.NewManagerCmd() tasksCmd := propellerd.NewTasksCmd() - rootCmd.AddCommand(managerCmd) rootCmd.AddCommand(tasksCmd) if err := rootCmd.Execute(); err != nil { diff --git a/cmd/manager/main.go b/cmd/manager/main.go new file mode 100644 index 0000000..da6a8c5 --- /dev/null +++ b/cmd/manager/main.go @@ -0,0 +1,139 @@ +package main + +import ( + "context" + "fmt" + "log" + "log/slog" + "net/url" + "os" + "time" + + "github.com/absmach/magistrala/pkg/jaeger" + "github.com/absmach/magistrala/pkg/prometheus" + "github.com/absmach/magistrala/pkg/server" + httpserver "github.com/absmach/magistrala/pkg/server/http" + "github.com/absmach/propeller/manager" + "github.com/absmach/propeller/manager/api" + "github.com/absmach/propeller/manager/middleware" + "github.com/absmach/propeller/pkg/mqtt" + "github.com/absmach/propeller/pkg/scheduler" + "github.com/absmach/propeller/pkg/storage" + "github.com/caarlos0/env/v11" + "github.com/google/uuid" + "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" + "golang.org/x/sync/errgroup" +) + +const ( + svcName = "manager" + defHTTPPort = "7070" + envPrefixHTTP = "MANAGER_HTTP_" +) + +type config struct { + LogLevel string `env:"MANAGER_LOG_LEVEL" envDefault:"info"` + InstanceID string `env:"MANAGER_INSTANCE_ID"` + MQTTAddress string `env:"MANAGER_MQTT_ADDRESS" envDefault:"tcp://localhost:1883"` + MQTTQoS uint8 `env:"MANAGER_MQTT_QOS" envDefault:"2"` + MQTTTimeout time.Duration `env:"MANAGER_MQTT_TIMEOUT" envDefault:"30s"` + ChannelID string `env:"MANAGER_CHANNEL_ID,notEmpty"` + ThingID string `env:"MANAGER_THING_ID,notEmpty"` + ThingKey string `env:"MANAGER_THING_KEY,notEmpty"` + Server server.Config + OTELURL url.URL `env:"MANAGER_OTEL_URL"` + TraceRatio float64 `env:"MANAGER_TRACE_RATIO" envDefault:"0"` +} + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + g, ctx := errgroup.WithContext(ctx) + + cfg := config{} + if err := env.Parse(&cfg); err != nil { + log.Fatalf("failed to load configuration : %s", err.Error()) + } + + if cfg.InstanceID == "" { + cfg.InstanceID = uuid.NewString() + } + + var level slog.Level + if err := level.UnmarshalText([]byte(cfg.LogLevel)); err != nil { + log.Fatalf("failed to parse log level: %s", err.Error()) + } + logHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: level, + }) + logger := slog.New(logHandler) + slog.SetDefault(logger) + + var tp trace.TracerProvider + switch { + case cfg.OTELURL == (url.URL{}): + tp = noop.NewTracerProvider() + default: + sdktp, err := jaeger.NewProvider(ctx, svcName, cfg.OTELURL, "", cfg.TraceRatio) + if err != nil { + logger.Error("failed to initialize opentelemetry", slog.String("error", err.Error())) + + return + } + defer func() { + if err := sdktp.Shutdown(ctx); err != nil { + logger.Error("error shutting down tracer provider", slog.Any("error", err)) + } + }() + tp = sdktp + } + tracer := tp.Tracer(svcName) + + mqttPubSub, err := mqtt.NewPubSub(cfg.MQTTAddress, cfg.MQTTQoS, svcName, cfg.ThingID, cfg.ThingKey, cfg.ChannelID, cfg.MQTTTimeout, logger) + if err != nil { + logger.Error("failed to initialize mqtt pubsub", slog.String("error", err.Error())) + + return + } + + svc := manager.NewService( + storage.NewInMemoryStorage(), + storage.NewInMemoryStorage(), + storage.NewInMemoryStorage(), + scheduler.NewRoundRobin(), + mqttPubSub, + cfg.ChannelID, + logger, + ) + svc = middleware.Logging(logger, svc) + svc = middleware.Tracing(tracer, svc) + counter, latency := prometheus.MakeMetrics(svcName, "api") + svc = middleware.Metrics(counter, latency, svc) + + if err := svc.Subscribe(ctx); err != nil { + logger.Error("failed to subscribe to manager channel", slog.String("error", err.Error())) + + return + } + + httpServerConfig := server.Config{Port: defHTTPPort} + if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil { + logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err.Error())) + + return + } + + hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, logger, cfg.InstanceID), logger) + + g.Go(func() error { + return hs.Start() + }) + + g.Go(func() error { + return server.StopSignalHandler(ctx, cancel, logger, svcName, hs) + }) + + if err := g.Wait(); err != nil { + logger.Error(fmt.Sprintf("%s service exited with error: %s", svcName, err)) + } +} diff --git a/cmd/manager/start.go b/cmd/manager/start.go deleted file mode 100644 index fe31451..0000000 --- a/cmd/manager/start.go +++ /dev/null @@ -1,111 +0,0 @@ -package manager - -import ( - "context" - "fmt" - "log/slog" - "net/url" - "os" - "time" - - "github.com/absmach/magistrala/pkg/jaeger" - "github.com/absmach/magistrala/pkg/prometheus" - "github.com/absmach/magistrala/pkg/server" - httpserver "github.com/absmach/magistrala/pkg/server/http" - "github.com/absmach/propeller/manager" - "github.com/absmach/propeller/manager/api" - "github.com/absmach/propeller/manager/middleware" - "github.com/absmach/propeller/pkg/mqtt" - "github.com/absmach/propeller/pkg/scheduler" - "github.com/absmach/propeller/pkg/storage" - "go.opentelemetry.io/otel/trace" - "go.opentelemetry.io/otel/trace/noop" - "golang.org/x/sync/errgroup" -) - -const svcName = "manager" - -type Config struct { - LogLevel string - OTELURL url.URL - TraceRatio float64 - Server server.Config - InstanceID string - ChannelID string - ThingID string - ThingKey string - MQTTAddress string - MQTTQOS uint8 - MQTTTimeout time.Duration -} - -func StartManager(ctx context.Context, cancel context.CancelFunc, cfg Config) error { - g, ctx := errgroup.WithContext(ctx) - - var level slog.Level - if err := level.UnmarshalText([]byte(cfg.LogLevel)); err != nil { - return fmt.Errorf("failed to parse log level: %s", err.Error()) - } - logHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ - Level: level, - }) - logger := slog.New(logHandler) - slog.SetDefault(logger) - - var tp trace.TracerProvider - switch { - case cfg.OTELURL == (url.URL{}): - tp = noop.NewTracerProvider() - default: - sdktp, err := jaeger.NewProvider(ctx, svcName, cfg.OTELURL, "", cfg.TraceRatio) - if err != nil { - return fmt.Errorf("failed to initialize opentelemetry: %s", err.Error()) - } - defer func() { - if err := sdktp.Shutdown(ctx); err != nil { - slog.Error("error shutting down tracer provider", slog.Any("error", err)) - } - }() - tp = sdktp - } - tracer := tp.Tracer(svcName) - - mqttPubSub, err := mqtt.NewPubSub(cfg.MQTTAddress, cfg.MQTTQOS, svcName, cfg.ThingID, cfg.ThingKey, cfg.MQTTTimeout, logger) - if err != nil { - return fmt.Errorf("failed to initialize mqtt pubsub: %s", err.Error()) - } - - svc := manager.NewService( - storage.NewInMemoryStorage(), - storage.NewInMemoryStorage(), - storage.NewInMemoryStorage(), - scheduler.NewRoundRobin(), - mqttPubSub, - cfg.ChannelID, - logger, - ) - svc = middleware.Logging(logger, svc) - svc = middleware.Tracing(tracer, svc) - counter, latency := prometheus.MakeMetrics(svcName, "api") - svc = middleware.Metrics(counter, latency, svc) - - if err := svc.Subscribe(ctx); err != nil { - return fmt.Errorf("failed to subscribe to manager channel: %s", err.Error()) - } - - hs := httpserver.NewServer(ctx, cancel, svcName, cfg.Server, api.MakeHandler(svc, logger, cfg.InstanceID), logger) - - g.Go(func() error { - return hs.Start() - }) - - g.Go(func() error { - return server.StopSignalHandler(ctx, cancel, logger, svcName, hs) - }) - - if err := g.Wait(); err != nil { - logger.Error(fmt.Sprintf("%s service exited with error: %s", svcName, err)) - } - - return nil -} diff --git a/cmd/proplet/main.go b/cmd/proplet/main.go index ef241d8..c6f5f92 100644 --- a/cmd/proplet/main.go +++ b/cmd/proplet/main.go @@ -2,158 +2,121 @@ package main import ( "context" - "errors" - "flag" "fmt" "log" "log/slog" "net/http" "os" - "os/signal" - "syscall" "time" + "github.com/absmach/magistrala/pkg/server" + "github.com/absmach/propeller/pkg/mqtt" "github.com/absmach/propeller/proplet" + "github.com/caarlos0/env/v11" + "github.com/google/uuid" + "golang.org/x/sync/errgroup" ) -const registryTimeout = 30 * time.Second - -var ( - wasmFilePath string - wasmBinary []byte - logLevel slog.Level -) - -func main() { - if err := run(); err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } +const svcName = "proplet" + +type config struct { + LogLevel string `env:"PROPLET_LOG_LEVEL" envDefault:"info"` + InstanceID string `env:"PROPLET_INSTANCE_ID"` + MQTTAddress string `env:"PROPLET_MQTT_ADDRESS" envDefault:"tcp://localhost:1883"` + MQTTTimeout time.Duration `env:"PROPLET_MQTT_TIMEOUT" envDefault:"30s"` + MQTTQoS byte `env:"PROPLET_MQTT_QOS" envDefault:"2"` + LivelinessInterval time.Duration `env:"PROPLET_LIVELINESS_INTERVAL" envDefault:"10s"` + RegistryURL string `env:"PROPLET_REGISTRY_URL"` + RegistryToken string `env:"PROPLET_REGISTRY_TOKEN"` + RegistryTimeout time.Duration `env:"PROPLET_REGISTRY_TIMEOUT" envDefault:"30s"` + ChannelID string `env:"PROPLET_CHANNEL_ID,notEmpty"` + ThingID string `env:"PROPLET_THING_ID,notEmpty"` + ThingKey string `env:"PROPLET_THING_KEY,notEmpty"` } -func run() error { - flag.StringVar(&wasmFilePath, "file", "", "Path to the WASM file") - flag.Parse() - +func main() { ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - logger := configureLogger("info") - slog.SetDefault(logger) + g, ctx := errgroup.WithContext(ctx) - logger.Info("Starting Proplet service") - - go func() { - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) - sig := <-sigChan - logger.Info("Received shutdown signal", slog.String("signal", sig.String())) - cancel() - }() - - hasWASMFile := wasmFilePath != "" + cfg := config{} + if err := env.Parse(&cfg); err != nil { + log.Fatalf("failed to load configuration : %s", err.Error()) + } - cfg, err := proplet.LoadConfig("proplet/config.json", hasWASMFile) - if err != nil { - logger.Error("Failed to load configuration", slog.String("path", "proplet/config.json"), slog.Any("error", err)) + if cfg.InstanceID == "" { + cfg.InstanceID = uuid.NewString() + } - return fmt.Errorf("failed to load configuration: %w", err) + var level slog.Level + if err := level.UnmarshalText([]byte(cfg.LogLevel)); err != nil { + log.Fatalf("failed to parse log level: %s", err.Error()) } + logHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: level, + }) + logger := slog.New(logHandler) + slog.SetDefault(logger) if cfg.RegistryURL != "" { - if err := checkRegistryConnectivity(cfg.RegistryURL, logger); err != nil { - logger.Error("Failed connectivity check for Registry URL", slog.String("url", cfg.RegistryURL), slog.Any("error", err)) + if err := checkRegistryConnectivity(ctx, cfg.RegistryURL, cfg.RegistryTimeout); err != nil { + logger.Error("failed to connect to registry URL", slog.String("url", cfg.RegistryURL), slog.Any("error", err)) - return fmt.Errorf("registry connectivity check failed: %w", err) + return } - logger.Info("Registry connectivity verified", slog.String("url", cfg.RegistryURL)) - } - if hasWASMFile { - wasmBinary, err = loadWASMFile(wasmFilePath, logger) - if err != nil { - logger.Error("Failed to load WASM file", slog.String("wasm_file_path", wasmFilePath), slog.Any("error", err)) - - return fmt.Errorf("failed to load WASM file: %w", err) - } - logger.Info("WASM binary loaded at startup", slog.Int("size_bytes", len(wasmBinary))) + logger.Info("successfully connected to registry URL", slog.String("url", cfg.RegistryURL)) } - if cfg.RegistryURL == "" && wasmBinary == nil { - logger.Error("Neither a registry URL nor a WASM binary file was provided") + mqttPubSub, err := mqtt.NewPubSub(cfg.MQTTAddress, cfg.MQTTQoS, cfg.InstanceID, cfg.ThingID, cfg.ThingKey, cfg.ChannelID, cfg.MQTTTimeout, logger) + if err != nil { + logger.Error("failed to initialize mqtt client", slog.Any("error", err)) - return errors.New("missing registry URL and WASM binary file") + return } + wazero := proplet.NewWazeroRuntime(logger, mqttPubSub, cfg.ChannelID) - service, err := proplet.NewService(ctx, cfg, wasmBinary, logger) + service, err := proplet.NewService(ctx, cfg.ChannelID, cfg.ThingID, cfg.ThingKey, cfg.RegistryURL, cfg.RegistryToken, cfg.LivelinessInterval, mqttPubSub, logger, wazero) if err != nil { - logger.Error("Error initializing service", slog.Any("error", err)) + logger.Error("failed to initialize service", slog.Any("error", err)) - return fmt.Errorf("service initialization error: %w", err) + return } if err := service.Run(ctx, logger); err != nil { - logger.Error("Error running service", slog.Any("error", err)) + logger.Error("failed to run service", slog.Any("error", err)) - return fmt.Errorf("service run error: %w", err) + return } - return nil -} - -func configureLogger(level string) *slog.Logger { - if err := logLevel.UnmarshalText([]byte(level)); err != nil { - log.Printf("Invalid log level: %s. Defaulting to info.\n", level) - logLevel = slog.LevelInfo - } - - logHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ - Level: logLevel, + g.Go(func() error { + return server.StopSignalHandler(ctx, cancel, logger, svcName) }) - return slog.New(logHandler) -} - -func loadWASMFile(path string, logger *slog.Logger) ([]byte, error) { - logger.Info("Loading WASM file", slog.String("path", path)) - wasmBytes, err := os.ReadFile(path) - if err != nil { - return nil, fmt.Errorf("failed to read WASM file: %w", err) + if err := g.Wait(); err != nil { + logger.Error(fmt.Sprintf("%s service exited with error: %s", svcName, err)) } - - return wasmBytes, nil } -func checkRegistryConnectivity(registryURL string, logger *slog.Logger) error { - ctx, cancel := context.WithTimeout(context.Background(), registryTimeout) +func checkRegistryConnectivity(ctx context.Context, registryURL string, registryTimeout time.Duration) error { + ctx, cancel := context.WithTimeout(ctx, registryTimeout) defer cancel() - client := http.Client{} - - logger.Info("Checking registry connectivity", slog.String("url", registryURL)) + client := http.DefaultClient req, err := http.NewRequestWithContext(ctx, http.MethodGet, registryURL, http.NoBody) if err != nil { - logger.Error("Failed to create HTTP request", slog.String("url", registryURL), slog.Any("error", err)) - return fmt.Errorf("failed to create HTTP request: %w", err) } resp, err := client.Do(req) if err != nil { - logger.Error("Failed to connect to registry", slog.String("url", registryURL), slog.Any("error", err)) - - return fmt.Errorf("failed to connect to registry URL '%s': %w", registryURL, err) + return fmt.Errorf("failed to connect to registry URL: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - logger.Error("Registry returned unexpected status", slog.String("url", registryURL), slog.Int("status_code", resp.StatusCode)) - - return fmt.Errorf("registry URL '%s' returned status: %s", registryURL, resp.Status) + return fmt.Errorf("registry returned unexpected status: %d", resp.StatusCode) } - logger.Info("Registry connectivity verified", slog.String("url", registryURL)) - return nil } diff --git a/docker/nats/nats.conf b/docker/nats/nats.conf index 10e590c..9ff090a 100644 --- a/docker/nats/nats.conf +++ b/docker/nats/nats.conf @@ -1,5 +1,6 @@ server_name: "nats_internal_broker" -max_payload: 1MB +max_payload: 64MB +max_pending: 64MB max_connections: 1M port: $MG_NATS_PORT http_port: $MG_NATS_HTTP_PORT diff --git a/examples/addition/addition.go b/examples/addition/addition.go new file mode 100644 index 0000000..6440801 --- /dev/null +++ b/examples/addition/addition.go @@ -0,0 +1,10 @@ +package main + +//export add +func add(x, y uint32) uint32 { + return x + y +} + +// main is required for the `wasi` target, even if it isn't used. +// See https://wazero.io/languages/tinygo/#why-do-i-have-to-define-main +func main() {} diff --git a/examples/long-addition/long-addition.go b/examples/long-addition/long-addition.go new file mode 100644 index 0000000..cf9ff05 --- /dev/null +++ b/examples/long-addition/long-addition.go @@ -0,0 +1,13 @@ +package main + +import "time" + +//export add +func add(x, y uint32) uint32 { + time.Sleep(time.Minute) + return x + y +} + +// main is required for the `wasi` target, even if it isn't used. +// See https://wazero.io/languages/tinygo/#why-do-i-have-to-define-main +func main() {} diff --git a/go.mod b/go.mod index bdb794d..f7aea4b 100644 --- a/go.mod +++ b/go.mod @@ -5,18 +5,19 @@ go 1.23.0 require ( github.com/0x6flab/namegenerator v1.4.0 github.com/absmach/magistrala v0.15.1 + github.com/caarlos0/env/v11 v11.3.0 github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/fatih/color v1.18.0 - github.com/go-chi/chi/v5 v5.1.0 + github.com/go-chi/chi/v5 v5.2.0 github.com/go-kit/kit v0.13.0 github.com/google/uuid v1.6.0 github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f github.com/prometheus/client_golang v1.20.5 github.com/spf13/cobra v1.8.1 github.com/tetratelabs/wazero v1.8.2 - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0 - go.opentelemetry.io/otel v1.32.0 - go.opentelemetry.io/otel/trace v1.32.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 + go.opentelemetry.io/otel v1.33.0 + go.opentelemetry.io/otel/trace v1.33.0 golang.org/x/sync v0.10.0 ) @@ -40,18 +41,17 @@ require ( github.com/prometheus/common v0.61.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/spf13/pflag v1.0.5 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0 // indirect - go.opentelemetry.io/otel/metric v1.32.0 // indirect - go.opentelemetry.io/otel/sdk v1.32.0 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.33.0 // indirect + go.opentelemetry.io/otel/metric v1.33.0 // indirect + go.opentelemetry.io/otel/sdk v1.33.0 // indirect go.opentelemetry.io/proto/otlp v1.4.0 // indirect golang.org/x/net v0.32.0 // indirect golang.org/x/sys v0.28.0 // indirect golang.org/x/text v0.21.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 // indirect - google.golang.org/grpc v1.68.1 // indirect - google.golang.org/protobuf v1.35.2 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20241216192217-9240e9c98484 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect + google.golang.org/grpc v1.69.0 // indirect + google.golang.org/protobuf v1.36.0 // indirect ) - -require github.com/gorilla/websocket v1.5.3 // indirect diff --git a/go.sum b/go.sum index eebac91..695655c 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,10 @@ github.com/absmach/magistrala v0.15.1 h1:3Bk2hlyWcV591LxPYwlvRcyCXTfuZ1g/EkNmU+o github.com/absmach/magistrala v0.15.1/go.mod h1:9pto6xuBt/IuCtZRdEha0iDQKNQ5tyNOjLXJgUiikYk= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/caarlos0/env/v11 v11.2.2 h1:95fApNrUyueipoZN/EhA8mMxiNxrBwDa+oAZrMWl3Kg= +github.com/caarlos0/env/v11 v11.2.2/go.mod h1:JBfcdeQiBoI3Zh1QRAWfe+tpiNTmDtcCj/hHHHMx0vc= +github.com/caarlos0/env/v11 v11.3.0 h1:CVTN6W6+twFC1jHKUwsw9eOTEiFpzyJOSA2AyHa8uvw= +github.com/caarlos0/env/v11 v11.3.0/go.mod h1:Q5lYHeOsgY20CCV/R+b50Jwg2MnjySid7+3FUBz2BJw= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -19,8 +23,8 @@ github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= -github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= +github.com/go-chi/chi/v5 v5.2.0 h1:Aj1EtB0qR2Rdo2dG4O94RIU35w2lvQSj6BRA4+qwFL0= +github.com/go-chi/chi/v5 v5.2.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= github.com/go-kit/kit v0.13.0 h1:OoneCcHKHQ03LfBpoQCUfCluwd2Vt3ohz+kvbJneZAU= github.com/go-kit/kit v0.13.0/go.mod h1:phqEHMMUbyrCFCTgH48JueqrM3md2HcAZ8N3XE4FKDg= github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= @@ -76,20 +80,24 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tetratelabs/wazero v1.8.2 h1:yIgLR/b2bN31bjxwXHD8a3d+BogigR952csSDdLYEv4= github.com/tetratelabs/wazero v1.8.2/go.mod h1:yAI0XTsMBhREkM/YDAK/zNou3GoiAce1P6+rp/wQhjs= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0 h1:DheMAlT6POBP+gh8RUH19EOTnQIor5QE0uSRPtzCpSw= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0/go.mod h1:wZcGmeVO9nzP67aYSLDqXNWK87EZWhi7JWj1v7ZXf94= -go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= -go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 h1:IJFEoHiytixx8cMiVAO+GmHR6Frwu+u5Ur8njpFO6Ac= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0/go.mod h1:3rHrKNtLIoS0oZwkY2vxi+oJcwFRWdtUyRII+so45p8= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0 h1:cMyu9O88joYEaI47CnQkxO1XZdpoTF9fEnW2duIddhw= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0/go.mod h1:6Am3rn7P9TVVeXYG+wtcGE7IE1tsQ+bP3AuWcKt/gOI= -go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= -go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= -go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= -go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= -go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= -go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 h1:yd02MEjBdJkG3uabWP9apV+OuWRIXGDuJEUJbOHmCFU= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0/go.mod h1:umTcuxiv1n/s/S6/c2AT/g2CQ7u5C59sHDNmfSwgz7Q= +go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw= +go.opentelemetry.io/otel v1.33.0/go.mod h1:SUUkR6csvUQl+yjReHu5uM3EtVV7MBm5FHKRlNx4I8I= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0 h1:Vh5HayB/0HHfOQA7Ctx69E/Y/DcQSMPpKANYVMQ7fBA= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0/go.mod h1:cpgtDBaqD/6ok/UG0jT15/uKjAY8mRA53diogHBg3UI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.33.0 h1:wpMfgF8E1rkrT1Z6meFh1NDtownE9Ii3n3X2GJYjsaU= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.33.0/go.mod h1:wAy0T/dUbs468uOlkT31xjvqQgEVXv58BRFWEgn5v/0= +go.opentelemetry.io/otel/metric v1.33.0 h1:r+JOocAyeRVXD8lZpjdQjzMadVZp2M4WmQ+5WtEnklQ= +go.opentelemetry.io/otel/metric v1.33.0/go.mod h1:L9+Fyctbp6HFTddIxClbQkjtubW6O9QS3Ann/M82u6M= +go.opentelemetry.io/otel/sdk v1.33.0 h1:iax7M131HuAm9QkZotNHEfstof92xM+N8sr3uHXc2IM= +go.opentelemetry.io/otel/sdk v1.33.0/go.mod h1:A1Q5oi7/9XaMlIWzPSxLRWOI8nG3FnzHJNbiENQuihM= +go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= +go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= +go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s= +go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck= go.opentelemetry.io/proto/otlp v1.4.0 h1:TA9WRvW6zMwP+Ssb6fLoUIuirti1gGbP28GcKG1jgeg= go.opentelemetry.io/proto/otlp v1.4.0/go.mod h1:PPBWZIP98o2ElSqI35IHfu7hIhSwvc5N38Jw8pXuGFY= golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI= @@ -102,18 +110,14 @@ golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -google.golang.org/genproto/googleapis/api v0.0.0-20241206012308-a4fef0638583 h1:v+j+5gpj0FopU0KKLDGfDo9ZRRpKdi5UBrCP0f76kuY= -google.golang.org/genproto/googleapis/api v0.0.0-20241206012308-a4fef0638583/go.mod h1:jehYqy3+AhJU9ve55aNOaSml7wUXjF9x6z2LcCfpAhY= -google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 h1:CkkIfIt50+lT6NHAVoRYEyAvQGFM7xEwXUUywFvEb3Q= -google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576/go.mod h1:1R3kvZ1dtP3+4p4d3G8uJ8rFk/fWlScl38vanWACI08= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241206012308-a4fef0638583 h1:IfdSdTcLFy4lqUQrQJLkLt1PB+AsqVz6lwkWPzWEz10= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241206012308-a4fef0638583/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 h1:8ZmaLZE4XWrtU3MyClkYqqtl6Oegr3235h7jxsDyqCY= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU= -google.golang.org/grpc v1.68.1 h1:oI5oTa11+ng8r8XMMN7jAOmWfPZWbYpCFaMUTACxkM0= -google.golang.org/grpc v1.68.1/go.mod h1:+q1XYFJjShcqn0QZHvCyeR4CXPA+llXIeUIfIe00waw= -google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= -google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/genproto/googleapis/api v0.0.0-20241216192217-9240e9c98484 h1:ChAdCYNQFDk5fYvFZMywKLIijG7TC2m1C2CMEu11G3o= +google.golang.org/genproto/googleapis/api v0.0.0-20241216192217-9240e9c98484/go.mod h1:KRUmxRI4JmbpAm8gcZM4Jsffi859fo5LQjILwuqj9z8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 h1:Z7FRVJPSMaHQxD0uXU8WdgFh8PseLM8Q8NzhnpMrBhQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA= +google.golang.org/grpc v1.69.0 h1:quSiOM1GJPmPH5XtU+BCoVXcDVJJAzNcoyfC2cCjGkI= +google.golang.org/grpc v1.69.0/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= +google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ= +google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/job/job.go b/job/job.go deleted file mode 100644 index f005b11..0000000 --- a/job/job.go +++ /dev/null @@ -1,10 +0,0 @@ -package job - -import "github.com/absmach/propeller/task" - -type Job struct { - ID string - Name string - Description string - Tasks []task.Task -} diff --git a/manager/api/transport.go b/manager/api/transport.go index 19749eb..554ab9b 100644 --- a/manager/api/transport.go +++ b/manager/api/transport.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "io" "log/slog" "net/http" "strings" @@ -18,6 +19,11 @@ import ( "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) +const ( + maxFileSize = 1024 * 1024 * 100 + fileKey = "file" +) + func MakeHandler(svc manager.Service, logger *slog.Logger, instanceID string) http.Handler { mux := chi.NewRouter() @@ -68,6 +74,12 @@ func MakeHandler(svc manager.Service, logger *slog.Logger, instanceID string) ht api.EncodeResponse, opts..., ), "update-task").ServeHTTP) + r.Put("/upload", otelhttp.NewHandler(kithttp.NewServer( + updateTaskEndpoint(svc), + decodeUploadTaskFileReq, + api.EncodeResponse, + opts..., + ), "upload-task-file").ServeHTTP) r.Delete("/", otelhttp.NewHandler(kithttp.NewServer( deleteTaskEndpoint(svc), decodeEntityReq("taskID"), @@ -107,6 +119,7 @@ func decodeTaskReq(_ context.Context, r *http.Request) (interface{}, error) { if !strings.Contains(r.Header.Get("Content-Type"), api.ContentType) { return nil, errors.Join(apiutil.ErrValidation, apiutil.ErrUnsupportedContentType) } + var req taskReq if err := json.NewDecoder(r.Body).Decode(&req); err != nil { return nil, errors.Join(err, apiutil.ErrValidation) @@ -115,6 +128,30 @@ func decodeTaskReq(_ context.Context, r *http.Request) (interface{}, error) { return req, nil } +func decodeUploadTaskFileReq(_ context.Context, r *http.Request) (interface{}, error) { + var req taskReq + if err := r.ParseMultipartForm(maxFileSize); err != nil { + return nil, err + } + file, header, err := r.FormFile(fileKey) + if err != nil { + return nil, err + } + defer file.Close() + + if !strings.HasSuffix(header.Filename, ".wasm") { + return nil, errors.Join(apiutil.ErrValidation, errors.New("invalid file extension")) + } + data, err := io.ReadAll(file) + if err != nil { + return nil, err + } + req.File = data + req.Task.ID = chi.URLParam(r, "taskID") + + return req, nil +} + func decodeUpdateTaskReq(_ context.Context, r *http.Request) (interface{}, error) { if !strings.Contains(r.Header.Get("Content-Type"), api.ContentType) { return nil, errors.Join(apiutil.ErrValidation, apiutil.ErrUnsupportedContentType) diff --git a/manager/service.go b/manager/service.go index 290bb3f..658b0db 100644 --- a/manager/service.go +++ b/manager/service.go @@ -149,11 +149,26 @@ func (svc *service) ListTasks(ctx context.Context, offset, limit uint64) (task.T } func (svc *service) UpdateTask(ctx context.Context, t task.Task) (task.Task, error) { - if err := svc.tasksDB.Update(ctx, t.ID, t); err != nil { + dbT, err := svc.GetTask(ctx, t.ID) + if err != nil { return task.Task{}, err } + dbT.UpdatedAt = time.Now() + if t.Name != "" { + dbT.Name = t.Name + } + if t.Inputs != nil { + dbT.Inputs = t.Inputs + } + if t.File != nil { + dbT.File = t.File + } - return t, nil + if err := svc.tasksDB.Update(ctx, dbT.ID, dbT); err != nil { + return task.Task{}, err + } + + return dbT, nil } func (svc *service) DeleteTask(ctx context.Context, taskID string) error { @@ -166,13 +181,13 @@ func (svc *service) StartTask(ctx context.Context, taskID string) error { return err } - p, err := svc.SelectProplet(ctx, t) - if err != nil { + topic := svc.baseTopic + "/control/manager/start" + if err := svc.pubsub.Publish(ctx, topic, t); err != nil { return err } - topic := "channels/" + p.ID + "/messages/control/manager/start" - if err := svc.pubsub.Publish(ctx, topic, t); err != nil { + p, err := svc.SelectProplet(ctx, t) + if err != nil { return err } @@ -185,6 +200,13 @@ func (svc *service) StartTask(ctx context.Context, taskID string) error { return err } + t.State = task.Running + t.UpdatedAt = time.Now() + t.StartTime = time.Now() + if err := svc.tasksDB.Update(ctx, t.ID, t); err != nil { + return err + } + return nil } @@ -207,7 +229,7 @@ func (svc *service) StopTask(ctx context.Context, taskID string) error { return err } - topic := "channels/" + p.ID + "/messages/control/manager/stop" + topic := svc.baseTopic + "/control/manager/stop" if err := svc.pubsub.Publish(ctx, topic, t); err != nil { return err } @@ -244,6 +266,8 @@ func (svc *service) handle(ctx context.Context) func(topic string, msg map[strin svc.logger.InfoContext(ctx, "successfully created proplet") case svc.baseTopic + "/control/proplet/alive": return svc.updateLivenessHandler(ctx, msg) + case svc.baseTopic + "/control/proplet/results": + return svc.updateResultsHandler(ctx, msg) } return nil @@ -280,6 +304,9 @@ func (svc *service) updateLivenessHandler(ctx context.Context, msg map[string]in } p, err := svc.GetProplet(ctx, propletID) + if errors.Is(err, pkgerrors.ErrNotFound) { + return svc.createPropletHandler(ctx, msg) + } if err != nil { return err } @@ -295,3 +322,41 @@ func (svc *service) updateLivenessHandler(ctx context.Context, msg map[string]in return nil } + +func (svc *service) updateResultsHandler(ctx context.Context, msg map[string]interface{}) error { + taskID, ok := msg["task_id"].(string) + if !ok { + return errors.New("invalid task_id") + } + if taskID == "" { + return errors.New("task id is empty") + } + + results, ok := msg["results"].([]interface{}) + if !ok { + return errors.New("invalid results") + } + data := make([]uint64, len(results)) + for i := range results { + r, ok := results[i].(float64) + if !ok { + return errors.New("invalid result") + } + data[i] = uint64(r) + } + + t, err := svc.GetTask(ctx, taskID) + if err != nil { + return err + } + t.Results = data + t.State = task.Completed + t.UpdatedAt = time.Now() + t.FinishTime = time.Now() + + if err := svc.tasksDB.Update(ctx, t.ID, t); err != nil { + return err + } + + return nil +} diff --git a/node/node.go b/node/node.go deleted file mode 100644 index 0c39d2c..0000000 --- a/node/node.go +++ /dev/null @@ -1,40 +0,0 @@ -package node - -import ( - "github.com/google/uuid" -) - -type Role uint8 - -const ( - ManagerRole Role = iota - PropletRole -) - -type Node struct { - ID string `json:"id"` - Name string `json:"name"` - URL string `json:"url"` - Memory uint64 `json:"memory"` - MemoryAllocated uint64 `json:"memory_allocated"` - Disk uint64 `json:"disk"` - DiskAllocated uint64 `json:"disk_allocated"` - TaskCount uint64 `json:"task_count"` - Role Role `json:"role"` -} - -type NodePage struct { - Offset uint64 `json:"offset"` - Limit uint64 `json:"limit"` - Total uint64 `json:"total"` - Nodes []Node `json:"nodes"` -} - -func NewNode(name, url string, role Role) *Node { - return &Node{ - ID: uuid.NewString(), - Name: name, - URL: url, - Role: role, - } -} diff --git a/pkg/mqtt/pubsub.go b/pkg/mqtt/pubsub.go index 4b3cdee..ca4b415 100644 --- a/pkg/mqtt/pubsub.go +++ b/pkg/mqtt/pubsub.go @@ -12,12 +12,14 @@ import ( ) var ( - errConnect = errors.New("failed to connect to MQTT broker") errPublishTimeout = errors.New("failed to publish due to timeout reached") errSubscribeTimeout = errors.New("failed to subscribe due to timeout reached") errUnsubscribeTimeout = errors.New("failed to unsubscribe due to timeout reached") errEmptyTopic = errors.New("empty topic") errEmptyID = errors.New("empty ID") + + aliveTopicTemplate = "channels/%s/messages/control/proplet/alive" + lwtPayloadTemplate = `{"status":"offline","proplet_id":"%s","mg_channel_id":"%s"}` ) type pubsub struct { @@ -35,12 +37,12 @@ type PubSub interface { Unsubscribe(ctx context.Context, topic string) error } -func NewPubSub(url string, qos byte, id, username, password string, timeout time.Duration, logger *slog.Logger) (PubSub, error) { +func NewPubSub(url string, qos byte, id, username, password, channelID string, timeout time.Duration, logger *slog.Logger) (PubSub, error) { if id == "" { return nil, errEmptyID } - client, err := newClient(url, id, username, password, timeout) + client, err := newClient(url, id, username, password, channelID, timeout, logger) if err != nil { return nil, err } @@ -112,21 +114,49 @@ func (ps *pubsub) Close() error { return nil } -func newClient(address, id, username, password string, timeout time.Duration) (mqtt.Client, error) { +func newClient(address, id, username, password, channelID string, timeout time.Duration, logger *slog.Logger) (mqtt.Client, error) { opts := mqtt.NewClientOptions(). + AddBroker(address). + SetClientID(id). SetUsername(username). SetPassword(password). - AddBroker(address). - SetClientID(id) + SetCleanSession(true) + if channelID != "" { + topic := fmt.Sprintf(aliveTopicTemplate, channelID) + lwtPayload := fmt.Sprintf(lwtPayloadTemplate, username, channelID) + opts.SetWill(topic, lwtPayload, 0, false) + } + + opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { + args := []any{} + if err != nil { + args = append(args, slog.Any("error", err)) + } + + logger.Info("MQTT connection lost", args...) + }) + + opts.SetReconnectingHandler(func(client mqtt.Client, options *mqtt.ClientOptions) { + args := []any{} + if options != nil { + args = append(args, + slog.String("client_id", options.ClientID), + slog.String("username", options.Username), + ) + } + + logger.Info("MQTT reconnecting", args...) + }) + client := mqtt.NewClient(opts) token := client.Connect() if token.Error() != nil { - return nil, token.Error() + return nil, errors.Join(errors.New("failed to connect to MQTT broker"), token.Error()) } if ok := token.WaitTimeout(timeout); !ok { - return nil, errConnect + return nil, errors.New("timeout reached while connecting to MQTT broker") } return client, nil diff --git a/propellerd/manager.go b/propellerd/manager.go deleted file mode 100644 index fcc617a..0000000 --- a/propellerd/manager.go +++ /dev/null @@ -1,127 +0,0 @@ -package propellerd - -import ( - "context" - "log/slog" - "time" - - "github.com/absmach/magistrala/pkg/server" - "github.com/absmach/propeller/cmd/manager" - "github.com/spf13/cobra" -) - -var ( - logLevel = "info" - port = "7070" - channelID = "" - thingID = "" - thingKey = "" - mqttAddress = "tcp://localhost:1883" - mqttQOS = 2 - mqttTimeout = 30 * time.Second -) - -var managerCmd = []cobra.Command{ - { - Use: "start", - Short: "Start manager", - Long: `Start manager.`, - Run: func(cmd *cobra.Command, _ []string) { - cfg := manager.Config{ - LogLevel: logLevel, - Server: server.Config{ - Port: port, - }, - ChannelID: channelID, - ThingID: thingID, - ThingKey: thingKey, - MQTTAddress: mqttAddress, - MQTTQOS: uint8(mqttQOS), - MQTTTimeout: mqttTimeout, - } - ctx, cancel := context.WithCancel(cmd.Context()) - if err := manager.StartManager(ctx, cancel, cfg); err != nil { - slog.Error("failed to start manager", slog.String("error", err.Error())) - } - cancel() - }, - }, -} - -func NewManagerCmd() *cobra.Command { - cmd := cobra.Command{ - Use: "manager [start]", - Short: "Manager management", - Long: `Create manager for Propeller.`, - } - - for i := range managerCmd { - cmd.AddCommand(&managerCmd[i]) - } - - cmd.PersistentFlags().StringVarP( - &logLevel, - "log-level", - "l", - logLevel, - "Log level", - ) - - cmd.PersistentFlags().StringVarP( - &port, - "port", - "p", - port, - "Manager HTTP Server Port", - ) - - cmd.PersistentFlags().StringVarP( - &channelID, - "channel-id", - "c", - channelID, - "Manager Channel ID", - ) - - cmd.PersistentFlags().StringVarP( - &thingID, - "thing-id", - "t", - thingID, - "Manager Thing ID", - ) - - cmd.PersistentFlags().StringVarP( - &thingKey, - "thing-key", - "k", - thingKey, - "Thing Key", - ) - - cmd.PersistentFlags().StringVarP( - &mqttAddress, - "mqtt-address", - "m", - mqttAddress, - "MQTT Address", - ) - - cmd.PersistentFlags().IntVarP( - &mqttQOS, - "mqtt-qos", - "q", - mqttQOS, - "MQTT QOS", - ) - - cmd.PersistentFlags().DurationVarP( - &mqttTimeout, - "mqtt-timeout", - "o", - mqttTimeout, - "MQTT Timeout", - ) - - return &cmd -} diff --git a/proplet/api/requests.go b/proplet/api/requests.go deleted file mode 100644 index b9dbfa3..0000000 --- a/proplet/api/requests.go +++ /dev/null @@ -1,90 +0,0 @@ -package api - -import ( - "fmt" - - pkgerrors "github.com/absmach/propeller/pkg/errors" -) - -type StartRequest struct { - AppName string `json:"app_name"` - Params []string `json:"params"` -} - -func (r StartRequest) Validate() error { - if r.AppName == "" { - return fmt.Errorf("start request: app_name is required but missing: %w", pkgerrors.ErrMissingAppName) - } - - return nil -} - -type StopRequest struct { - AppName string `json:"app_name"` -} - -func (r StopRequest) Validate() error { - if r.AppName == "" { - return fmt.Errorf("stop request: app_name is required but missing: %w", pkgerrors.ErrMissingAppName) - } - - return nil -} - -type RPCRequest struct { - Method string `json:"method"` - Params []interface{} `json:"params"` - ID int `json:"id"` -} - -func (r RPCRequest) Validate() error { - if r.Method == "" { - return fmt.Errorf("RPC request: method is required but missing: %w", pkgerrors.ErrInvalidMethod) - } - if len(r.Params) == 0 { - return fmt.Errorf("RPC request: params are required but missing: %w", pkgerrors.ErrInvalidParams) - } - - return nil -} - -func (r RPCRequest) ParseParams() (interface{}, error) { - switch r.Method { - case "start": - if len(r.Params) < 1 { - return nil, fmt.Errorf("start method: missing required parameters: %w", pkgerrors.ErrInvalidParams) - } - appName, ok := r.Params[0].(string) - if !ok || appName == "" { - return nil, fmt.Errorf("start method: invalid app_name parameter: %w", pkgerrors.ErrInvalidParams) - } - - return StartRequest{ - AppName: appName, - Params: parseStringSlice(r.Params[1:]), - }, nil - case "stop": - if len(r.Params) < 1 { - return nil, fmt.Errorf("stop method: missing required parameters: %w", pkgerrors.ErrInvalidParams) - } - appName, ok := r.Params[0].(string) - if !ok || appName == "" { - return nil, fmt.Errorf("stop method: invalid app_name parameter: %w", pkgerrors.ErrInvalidParams) - } - - return StopRequest{AppName: appName}, nil - default: - return nil, fmt.Errorf("unknown method '%s': %w", r.Method, pkgerrors.ErrInvalidMethod) - } -} - -func parseStringSlice(params []interface{}) []string { - result := []string{} - for _, param := range params { - if str, ok := param.(string); ok { - result = append(result, str) - } - } - - return result -} diff --git a/proplet/api/responses.go b/proplet/api/responses.go deleted file mode 100644 index dba7770..0000000 --- a/proplet/api/responses.go +++ /dev/null @@ -1,73 +0,0 @@ -package api - -import ( - "fmt" - - pkgerrors "github.com/absmach/propeller/pkg/errors" -) - -type Response struct { - Status string `json:"status"` - Error string `json:"error,omitempty"` -} - -func (r Response) Validate() error { - if r.Status == "" { - return fmt.Errorf("response: status is required but missing: %w", pkgerrors.ErrMissingValue) - } - if r.Status != "success" && r.Status != "failure" { - return fmt.Errorf("response: invalid status '%s': %w", r.Status, pkgerrors.ErrInvalidStatus) - } - if r.Status == "failure" && r.Error == "" { - return fmt.Errorf("response: error message is required for failure status: %w", pkgerrors.ErrInvalidValue) - } - - return nil -} - -type RPCResponse struct { - Result string `json:"result,omitempty"` - Error string `json:"error,omitempty"` - ID int `json:"id"` -} - -func (r RPCResponse) Validate() error { - if r.ID == 0 { - return fmt.Errorf("RPC response: ID is required but missing or zero: %w", pkgerrors.ErrMissingValue) - } - if r.Error != "" && r.Result != "" { - return fmt.Errorf("RPC response: both result and error cannot be set simultaneously: %w", pkgerrors.ErrInvalidValue) - } - if r.Error == "" && r.Result == "" { - return fmt.Errorf("RPC response: result or error must be set: %w", pkgerrors.ErrMissingResult) - } - - return nil -} - -func NewSuccessResponse() Response { - return Response{ - Status: "success", - } -} - -func NewFailureResponse(err error) Response { - return Response{ - Status: "failure", - Error: err.Error(), - } -} - -func NewRPCSuccessResponse(id int, result string) RPCResponse { - return RPCResponse{ - ID: id, - Result: result, - } -} - -func NewRPCFailureResponse(id int, err error) RPCResponse { - return RPCResponse{ - ID: id, - Error: err.Error(), - } -} diff --git a/proplet/config.go b/proplet/config.go deleted file mode 100644 index aa3ebb2..0000000 --- a/proplet/config.go +++ /dev/null @@ -1,70 +0,0 @@ -package proplet - -import ( - "encoding/json" - "errors" - "fmt" - "net/url" - "os" -) - -type Config struct { - BrokerURL string `json:"broker_url"` - Password string `json:"password"` - PropletID string `json:"proplet_id"` - ChannelID string `json:"channel_id"` - RegistryURL string `json:"registry_url"` - RegistryToken string `json:"registry_token"` -} - -func LoadConfig(filepath string, hasWASMFile bool) (Config, error) { - file, err := os.Open(filepath) - if err != nil { - return Config{}, fmt.Errorf("unable to open configuration file '%s': %w", filepath, err) - } - defer file.Close() - - var config Config - decoder := json.NewDecoder(file) - if err := decoder.Decode(&config); err != nil { - return Config{}, fmt.Errorf("failed to parse configuration file '%s': %w", filepath, err) - } - - if err := config.Validate(hasWASMFile); err != nil { - return Config{}, fmt.Errorf("configuration validation failed: %w", err) - } - - return config, nil -} - -func (c Config) Validate(hasWASMFile bool) error { - if c.BrokerURL == "" { - return errors.New("broker_url is required") - } - if _, err := url.Parse(c.BrokerURL); err != nil { - return fmt.Errorf("broker_url is not a valid URL: %w", err) - } - if c.Password == "" { - return errors.New("password is required") - } - if c.PropletID == "" { - return errors.New("proplet_id is required") - } - if c.ChannelID == "" { - return errors.New("channel_id is required") - } - if hasWASMFile { - return nil - } - if c.RegistryURL == "" { - return errors.New("registry_url is required when not using a WASM file") - } - if _, err := url.Parse(c.RegistryURL); err != nil { - return fmt.Errorf("registry_url is not a valid URL: %w", err) - } - if c.RegistryToken == "" { - return errors.New("registry_token is required when not using a WASM file") - } - - return nil -} diff --git a/proplet/config.json b/proplet/config.json deleted file mode 100644 index 4717fb5..0000000 --- a/proplet/config.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "broker_url": "mqtt://localhost:1883", - "password": "example-password", - "proplet_id": "proplet-1", - "channel_id": "channel-1", - "registry_url": "", - "registry_token": "" -} diff --git a/proplet/mqtt.go b/proplet/mqtt.go deleted file mode 100644 index cbc5806..0000000 --- a/proplet/mqtt.go +++ /dev/null @@ -1,136 +0,0 @@ -package proplet - -import ( - "encoding/json" - "fmt" - "log/slog" - "time" - - pkgerrors "github.com/absmach/propeller/pkg/errors" - mqtt "github.com/eclipse/paho.mqtt.golang" -) - -const livelinessInterval = 10 * time.Second - -var ( - RegistryFailurePayload = `{"status":"failure","error":"%v"}` - RegistrySuccessPayload = `{"status":"success"}` - RegistryAckTopicTemplate = "channels/%s/messages/control/manager/registry" - lwtPayloadTemplate = `{"status":"offline","proplet_id":"%s","chan_id":"%s"}` - discoveryPayloadTemplate = `{"proplet_id":"%s","chan_id":"%s"}` - alivePayloadTemplate = `{"status":"alive","proplet_id":"%s","chan_id":"%s"}` - aliveTopicTemplate = "channels/%s/messages/control/proplet/alive" - discoveryTopicTemplate = "channels/%s/messages/control/proplet/create" - startTopicTemplate = "channels/%s/messages/control/manager/start" - stopTopicTemplate = "channels/%s/messages/control/manager/stop" - registryUpdateTopicTemplate = "channels/%s/messages/control/manager/updateRegistry" - registryResponseTopic = "channels/%s/messages/registry/server" - fetchRequestTopicTemplate = "channels/%s/messages/registry/proplet" -) - -func NewMQTTClient(config Config, logger *slog.Logger) (mqtt.Client, error) { - lwtPayload := fmt.Sprintf(lwtPayloadTemplate, config.PropletID, config.ChannelID) - if lwtPayload == "" { - return nil, fmt.Errorf("failed to prepare MQTT last will payload: %w", pkgerrors.ErrMQTTWillPayloadFailed) - } - - opts := mqtt.NewClientOptions(). - AddBroker(config.BrokerURL). - SetClientID("Proplet-"+config.PropletID). - SetUsername(config.PropletID). - SetPassword(config.Password). - SetCleanSession(true). - SetWill(aliveTopicTemplate+config.ChannelID, lwtPayloadTemplate+config.PropletID+config.ChannelID, 0, false) - - opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { - logger.Error("MQTT connection lost", slog.Any("error", err)) - }) - - opts.SetReconnectingHandler(func(client mqtt.Client, options *mqtt.ClientOptions) { - logger.Info("MQTT reconnecting") - }) - - client := mqtt.NewClient(opts) - password := client.Connect() - if password.Wait() && password.Error() != nil { - return nil, fmt.Errorf("failed to connect to MQTT broker '%s': %w", config.BrokerURL, pkgerrors.ErrMQTTConnectionFailed) - } - - logger.Info("MQTT client connected successfully", slog.String("broker_url", config.BrokerURL)) - - if err := PublishDiscovery(client, config, logger); err != nil { - return nil, fmt.Errorf("failed to publish discovery message: %w", err) - } - - go startLivelinessUpdates(client, config, logger) - - return client, nil -} - -func PublishDiscovery(client mqtt.Client, config Config, logger *slog.Logger) error { - topic := fmt.Sprintf(discoveryTopicTemplate, config.ChannelID) - payload := fmt.Sprintf(discoveryPayloadTemplate, config.PropletID, config.ChannelID) - password := client.Publish(topic, 0, false, payload) - password.Wait() - if password.Error() != nil { - return fmt.Errorf("failed to publish discovery message: %w", password.Error()) - } - - return nil -} - -func startLivelinessUpdates(client mqtt.Client, config Config, logger *slog.Logger) { - ticker := time.NewTicker(livelinessInterval) - defer ticker.Stop() - - for range ticker.C { - password := client.Publish(fmt.Sprintf(aliveTopicTemplate, config.ChannelID), 0, false, fmt.Sprintf(alivePayloadTemplate, config.PropletID, config.ChannelID)) - password.Wait() - if password.Error() != nil { - logger.Error("Failed to publish liveliness message", slog.String("topic", fmt.Sprintf(aliveTopicTemplate, config.ChannelID)), slog.Any("error", password.Error())) - } else { - logger.Info("Published liveliness message", slog.String("topic", fmt.Sprintf(aliveTopicTemplate, config.ChannelID))) - } - } -} - -func SubscribeToManagerTopics(client mqtt.Client, config Config, startHandler, stopHandler, registryHandler mqtt.MessageHandler, logger *slog.Logger) error { - if password := client.Subscribe(fmt.Sprintf(startTopicTemplate, config.ChannelID), 0, startHandler); password.Wait() && password.Error() != nil { - return fmt.Errorf("failed to subscribe to start topic: %w", password.Error()) - } - - if password := client.Subscribe(fmt.Sprintf(stopTopicTemplate, config.ChannelID), 0, stopHandler); password.Wait() && password.Error() != nil { - return fmt.Errorf("failed to subscribe to stop topic: %w", password.Error()) - } - - if password := client.Subscribe(fmt.Sprintf(registryUpdateTopicTemplate, config.ChannelID), 0, registryHandler); password.Wait() && password.Error() != nil { - return fmt.Errorf("failed to subscribe to registry update topic: %w", password.Error()) - } - - logger.Info("Subscribed to Manager topics", - slog.String("start_topic", fmt.Sprintf(startTopicTemplate, config.ChannelID)), - slog.String("stop_topic", fmt.Sprintf(stopTopicTemplate, config.ChannelID)), - slog.String("registry_update_topic", fmt.Sprintf(registryUpdateTopicTemplate, config.ChannelID))) - - return nil -} - -func SubscribeToRegistryTopic(client mqtt.Client, channelID string, handler mqtt.MessageHandler, logger *slog.Logger) error { - if password := client.Subscribe(fmt.Sprintf(registryResponseTopic, channelID), 0, handler); password.Wait() && password.Error() != nil { - return fmt.Errorf("failed to subscribe to registry topic '%s': %w", fmt.Sprintf(registryResponseTopic, channelID), password.Error()) - } - - return nil -} - -func PublishFetchRequest(client mqtt.Client, channelID, appName string, logger *slog.Logger) error { - payload, err := json.Marshal(map[string]string{"app_name": appName}) - if err != nil { - return fmt.Errorf("failed to marshal fetch request payload: %w", err) - } - if password := client.Publish(fmt.Sprintf(fetchRequestTopicTemplate, channelID), 0, false, payload); password.Wait() && password.Error() != nil { - return fmt.Errorf("failed to publish fetch request: %w", password.Error()) - } - - return nil -} diff --git a/proplet/worker.go b/proplet/proplet.go similarity index 100% rename from proplet/worker.go rename to proplet/proplet.go diff --git a/proplet/requests.go b/proplet/requests.go new file mode 100644 index 0000000..cbd0c2a --- /dev/null +++ b/proplet/requests.go @@ -0,0 +1,36 @@ +package proplet + +import "errors" + +type startRequest struct { + ID string + FunctionName string + WasmFile []byte + Params []uint64 +} + +func (r startRequest) Validate() error { + if r.ID == "" { + return errors.New("id is required") + } + if r.FunctionName == "" { + return errors.New("function name is required") + } + if r.WasmFile == nil { + return errors.New("wasm file is required") + } + + return nil +} + +type stopRequest struct { + ID string +} + +func (r stopRequest) Validate() error { + if r.ID == "" { + return errors.New("id is required") + } + + return nil +} diff --git a/proplet/service.go b/proplet/service.go index 5a8d8dc..3f83bfe 100644 --- a/proplet/service.go +++ b/proplet/service.go @@ -8,15 +8,11 @@ import ( "log" "log/slog" "net/url" - "os" "sync" "time" - pkgerrors "github.com/absmach/propeller/pkg/errors" - propletapi "github.com/absmach/propeller/proplet/api" - mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/tetratelabs/wazero" - wazeroapi "github.com/tetratelabs/wazero/api" + pkgmqtt "github.com/absmach/propeller/pkg/mqtt" + "github.com/absmach/propeller/task" ) const ( @@ -24,15 +20,32 @@ const ( pollingInterval = 5 * time.Second ) +var ( + RegistryAckTopicTemplate = "channels/%s/messages/control/manager/registry" + updateRegistryTopicTemplate = "channels/%s/messages/control/manager/update" + aliveTopicTemplate = "channels/%s/messages/control/proplet/alive" + discoveryTopicTemplate = "channels/%s/messages/control/proplet/create" + startTopicTemplate = "channels/%s/messages/control/manager/start" + stopTopicTemplate = "channels/%s/messages/control/manager/stop" + registryResponseTopic = "channels/%s/messages/registry/server" + fetchRequestTopicTemplate = "channels/%s/messages/registry/proplet" +) + type PropletService struct { - config Config - mqttClient mqtt.Client - runtime *WazeroRuntime - wasmBinary []byte - chunks map[string][][]byte - chunkMetadata map[string]*ChunkPayload - chunksMutex sync.Mutex + channelID string + thingID string + thingKey string + registryURL string + registryToken string + livelinessInterval time.Duration + pubsub pkgmqtt.PubSub + chunks map[string][][]byte + chunkMetadata map[string]*ChunkPayload + chunksMutex sync.Mutex + runtime Runtime + logger *slog.Logger } + type ChunkPayload struct { AppName string `json:"app_name"` ChunkIdx int `json:"chunk_idx"` @@ -40,121 +53,81 @@ type ChunkPayload struct { Data []byte `json:"data"` } -type WazeroRuntime struct { - runtime wazero.Runtime - modules map[string]wazeroapi.Module - mutex sync.Mutex -} - -func NewWazeroRuntime(ctx context.Context) *WazeroRuntime { - return &WazeroRuntime{ - runtime: wazero.NewRuntime(ctx), - modules: make(map[string]wazeroapi.Module), - } -} - -func (w *WazeroRuntime) StartApp(ctx context.Context, appName string, wasmBinary []byte, functionName string) (wazeroapi.Function, error) { - if appName == "" { - return nil, fmt.Errorf("start app: appName is required but missing: %w", pkgerrors.ErrMissingValue) - } - if len(wasmBinary) == 0 { - return nil, fmt.Errorf("start app: Wasm binary is empty: %w", pkgerrors.ErrInvalidValue) - } - if functionName == "" { - return nil, fmt.Errorf("start app: functionName is required but missing: %w", pkgerrors.ErrMissingValue) - } - - w.mutex.Lock() - defer w.mutex.Unlock() - - if _, exists := w.modules[appName]; exists { - return nil, fmt.Errorf("start app: app '%s' is already running: %w", appName, pkgerrors.ErrAppAlreadyRunning) +func NewService(ctx context.Context, channelID, thingID, thingKey, registryURL, registryToken string, livelinessInterval time.Duration, pubsub pkgmqtt.PubSub, logger *slog.Logger, runtime Runtime) (*PropletService, error) { + topic := fmt.Sprintf(discoveryTopicTemplate, channelID) + payload := map[string]interface{}{ + "proplet_id": thingID, + "mg_channel_id": channelID, } - - module, err := w.runtime.Instantiate(ctx, wasmBinary) - if err != nil { - return nil, fmt.Errorf("start app: failed to instantiate Wasm module for app '%s': %w", appName, pkgerrors.ErrModuleInstantiation) + if err := pubsub.Publish(ctx, topic, payload); err != nil { + return nil, errors.Join(errors.New("failed to publish discovery"), err) } - function := module.ExportedFunction(functionName) - if function == nil { - _ = module.Close(ctx) - - return nil, fmt.Errorf("start app: function '%s' not found in Wasm module for app '%s': %w", functionName, appName, pkgerrors.ErrFunctionNotFound) + p := &PropletService{ + channelID: channelID, + thingID: thingID, + thingKey: thingKey, + registryURL: registryURL, + registryToken: registryToken, + livelinessInterval: livelinessInterval, + pubsub: pubsub, + chunks: make(map[string][][]byte), + chunkMetadata: make(map[string]*ChunkPayload), + runtime: runtime, + logger: logger, } - w.modules[appName] = module + go p.startLivelinessUpdates(ctx) - return function, nil + return p, nil } -func (w *WazeroRuntime) StopApp(ctx context.Context, appName string) error { - if appName == "" { - return fmt.Errorf("stop app: appName is required but missing: %w", pkgerrors.ErrMissingValue) - } +func (p *PropletService) startLivelinessUpdates(ctx context.Context) { + ticker := time.NewTicker(p.livelinessInterval) + defer ticker.Stop() - w.mutex.Lock() - defer w.mutex.Unlock() + for { + select { + case <-ctx.Done(): + p.logger.Info("stopping liveliness updates") - module, exists := w.modules[appName] - if !exists { - return fmt.Errorf("stop app: app '%s' is not running: %w", appName, pkgerrors.ErrAppNotRunning) - } - - if err := module.Close(ctx); err != nil { - return fmt.Errorf("stop app: failed to stop app '%s': %w", appName, pkgerrors.ErrModuleStopFailed) - } + return + case <-ticker.C: + topic := fmt.Sprintf(aliveTopicTemplate, p.channelID) + payload := map[string]interface{}{ + "status": "alive", + "proplet_id": p.thingID, + "mg_channel_id": p.channelID, + } - delete(w.modules, appName) + if err := p.pubsub.Publish(ctx, topic, payload); err != nil { + p.logger.Error("failed to publish liveliness message", slog.Any("error", err)) + } - return nil + p.logger.Debug("Published liveliness message", slog.String("topic", topic)) + } + } } -func NewService(ctx context.Context, cfg Config, wasmBinary []byte, logger *slog.Logger) (*PropletService, error) { - mqttClient, err := NewMQTTClient(cfg, logger) - if err != nil { - return nil, fmt.Errorf("failed to initialize MQTT client: %w", err) +func (p *PropletService) Run(ctx context.Context, logger *slog.Logger) error { + topic := fmt.Sprintf(startTopicTemplate, p.channelID) + if err := p.pubsub.Subscribe(ctx, topic, p.handleStartCommand(ctx)); err != nil { + return fmt.Errorf("failed to subscribe to start topic: %w", err) } - runtime := NewWazeroRuntime(ctx) - - return &PropletService{ - config: cfg, - mqttClient: mqttClient, - runtime: runtime, - wasmBinary: wasmBinary, - chunks: make(map[string][][]byte), - chunkMetadata: make(map[string]*ChunkPayload), - }, nil -} + topic = fmt.Sprintf(stopTopicTemplate, p.channelID) + if err := p.pubsub.Subscribe(ctx, topic, p.handleStopCommand(ctx)); err != nil { + return fmt.Errorf("failed to subscribe to stop topic: %w", err) + } -func (p *PropletService) Run(ctx context.Context, logger *slog.Logger) error { - if err := SubscribeToManagerTopics( - p.mqttClient, - p.config, - func(client mqtt.Client, msg mqtt.Message) { - p.handleStartCommand(ctx, client, msg, logger) - }, - func(client mqtt.Client, msg mqtt.Message) { - p.handleStopCommand(ctx, client, msg, logger) - }, - func(client mqtt.Client, msg mqtt.Message) { - p.registryUpdate(ctx, client, msg, logger) - }, - logger, - ); err != nil { - return fmt.Errorf("failed to subscribe to Manager topics: %w", err) + topic = fmt.Sprintf(registryResponseTopic, p.channelID) + if err := p.pubsub.Subscribe(ctx, topic, p.handleChunk(ctx)); err != nil { + return fmt.Errorf("failed to subscribe to registry topics: %w", err) } - if err := SubscribeToRegistryTopic( - p.mqttClient, - p.config.ChannelID, - func(client mqtt.Client, msg mqtt.Message) { - p.handleChunk(ctx, client, msg) - }, - logger, - ); err != nil { - return fmt.Errorf("failed to subscribe to Registry topics: %w", err) + topic = fmt.Sprintf(updateRegistryTopicTemplate, p.channelID) + if err := p.pubsub.Subscribe(ctx, topic, p.registryUpdate(ctx)); err != nil { + return fmt.Errorf("failed to subscribe to update registry topic: %w", err) } logger.Info("Proplet service is running.") @@ -163,115 +136,125 @@ func (p *PropletService) Run(ctx context.Context, logger *slog.Logger) error { return nil } -func (p *PropletService) handleStartCommand(ctx context.Context, _ mqtt.Client, msg mqtt.Message, logger *slog.Logger) { - var req propletapi.StartRequest - if err := json.Unmarshal(msg.Payload(), &req); err != nil { - logger.Error("Invalid start command payload", slog.Any("error", err)) - - return - } - - logger.Info("Received start command", slog.String("app_name", req.AppName)) - - if p.wasmBinary != nil { - logger.Info("Using preloaded WASM binary", slog.String("app_name", req.AppName)) - function, err := p.runtime.StartApp(ctx, req.AppName, p.wasmBinary, "main") +func (p *PropletService) handleStartCommand(ctx context.Context) func(topic string, msg map[string]interface{}) error { + return func(topic string, msg map[string]interface{}) error { + data, err := json.Marshal(msg) if err != nil { - logger.Error("Failed to start app", slog.String("app_name", req.AppName), slog.Any("error", err)) - - return + return err } - _, err = function.Call(ctx) - if err != nil { - logger.Error("Error executing app", slog.String("app_name", req.AppName), slog.Any("error", err)) - } else { - logger.Info("App started successfully", slog.String("app_name", req.AppName)) + var payload task.Task + if err := json.Unmarshal(data, &payload); err != nil { + return err } - return - } + req := startRequest{ + ID: payload.ID, + FunctionName: payload.Name, + WasmFile: payload.File, + Params: payload.Inputs, + } + if err := req.Validate(); err != nil { + return err + } - if p.config.RegistryURL != "" { - err := PublishFetchRequest(p.mqttClient, p.config.ChannelID, req.AppName, logger) - if err != nil { - logger.Error("Failed to publish fetch request", slog.String("app_name", req.AppName), slog.Any("error", err)) + p.logger.Info("Received start command", slog.String("app_name", req.FunctionName)) - return + if err := p.runtime.StartApp(ctx, req.WasmFile, req.ID, req.FunctionName, req.Params...); err != nil { + return err } - go func() { - logger.Info("Waiting for chunks", slog.String("app_name", req.AppName)) + if p.registryURL != "" { + payload := map[string]interface{}{ + "app_name": req.FunctionName, + } + topic := fmt.Sprintf(fetchRequestTopicTemplate, p.channelID) + if err := p.pubsub.Publish(ctx, topic, payload); err != nil { + return err + } + + go func() { + p.logger.Info("Waiting for chunks", slog.String("app_name", req.FunctionName)) + + for { + p.chunksMutex.Lock() + metadata, exists := p.chunkMetadata[req.FunctionName] + receivedChunks := len(p.chunks[req.FunctionName]) + p.chunksMutex.Unlock() - for { - p.chunksMutex.Lock() - metadata, exists := p.chunkMetadata[req.AppName] - receivedChunks := len(p.chunks[req.AppName]) - p.chunksMutex.Unlock() + if exists && receivedChunks == metadata.TotalChunks { + p.logger.Info("All chunks received, deploying app", slog.String("app_name", req.FunctionName)) + go p.deployAndRunApp(ctx, req.FunctionName) - if exists && receivedChunks == metadata.TotalChunks { - logger.Info("All chunks received, deploying app", slog.String("app_name", req.AppName)) - go p.deployAndRunApp(ctx, req.AppName) + break + } - break + time.Sleep(pollingInterval) } + }() + } - time.Sleep(pollingInterval) - } - }() - } else { - logger.Warn("Registry URL is empty, and no binary provided", slog.String("app_name", req.AppName)) + return nil } } -func (p *PropletService) handleStopCommand(ctx context.Context, _ mqtt.Client, msg mqtt.Message, logger *slog.Logger) { - var req propletapi.StopRequest - if err := json.Unmarshal(msg.Payload(), &req); err != nil { - logger.Error("Invalid stop command payload", slog.Any("error", err)) +func (p *PropletService) handleStopCommand(ctx context.Context) func(topic string, msg map[string]interface{}) error { + return func(topic string, msg map[string]interface{}) error { + data, err := json.Marshal(msg) + if err != nil { + return err + } - return - } + var req stopRequest + if err := json.Unmarshal(data, &req); err != nil { + return err + } - logger.Info("Received stop command", slog.String("app_name", req.AppName)) + if err := req.Validate(); err != nil { + return err + } - err := p.runtime.StopApp(ctx, req.AppName) - if err != nil { - logger.Error("Failed to stop app", slog.String("app_name", req.AppName), slog.Any("error", err)) + if err := p.runtime.StopApp(ctx, req.ID); err != nil { + return err + } - return + return nil } - - logger.Info("App stopped successfully", slog.String("app_name", req.AppName)) } -func (p *PropletService) handleChunk(ctx context.Context, _ mqtt.Client, msg mqtt.Message) { - var chunk ChunkPayload - if err := json.Unmarshal(msg.Payload(), &chunk); err != nil { - log.Printf("Failed to unmarshal chunk payload: %v", err) +func (p *PropletService) handleChunk(ctx context.Context) func(topic string, msg map[string]interface{}) error { + return func(topic string, msg map[string]interface{}) error { + data, err := json.Marshal(msg) + if err != nil { + return err + } - return - } + var chunk ChunkPayload + if err := json.Unmarshal(data, &chunk); err != nil { + return err + } - if err := chunk.Validate(); err != nil { - log.Printf("Invalid chunk payload: %v\n", err) + if err := chunk.Validate(); err != nil { + return err + } - return - } + p.chunksMutex.Lock() + defer p.chunksMutex.Unlock() - p.chunksMutex.Lock() - defer p.chunksMutex.Unlock() + if _, exists := p.chunkMetadata[chunk.AppName]; !exists { + p.chunkMetadata[chunk.AppName] = &chunk + } - if _, exists := p.chunkMetadata[chunk.AppName]; !exists { - p.chunkMetadata[chunk.AppName] = &chunk - } + p.chunks[chunk.AppName] = append(p.chunks[chunk.AppName], chunk.Data) - p.chunks[chunk.AppName] = append(p.chunks[chunk.AppName], chunk.Data) + log.Printf("Received chunk %d/%d for app '%s'\n", chunk.ChunkIdx+1, chunk.TotalChunks, chunk.AppName) - log.Printf("Received chunk %d/%d for app '%s'\n", chunk.ChunkIdx+1, chunk.TotalChunks, chunk.AppName) + if len(p.chunks[chunk.AppName]) == p.chunkMetadata[chunk.AppName].TotalChunks { + log.Printf("All chunks received for app '%s'. Deploying...\n", chunk.AppName) + go p.deployAndRunApp(ctx, chunk.AppName) + } - if len(p.chunks[chunk.AppName]) == p.chunkMetadata[chunk.AppName].TotalChunks { - log.Printf("All chunks received for app '%s'. Deploying...\n", chunk.AppName) - go p.deployAndRunApp(ctx, chunk.AppName) + return nil } } @@ -283,21 +266,8 @@ func (p *PropletService) deployAndRunApp(ctx context.Context, appName string) { delete(p.chunks, appName) p.chunksMutex.Unlock() - wasmBinary := assembleChunks(chunks) - - function, err := p.runtime.StartApp(ctx, appName, wasmBinary, "main") - if err != nil { - log.Printf("Failed to start app '%s': %v\n", appName, err) - - return - } - - _, err = function.Call(ctx) - if err != nil { - log.Printf("Failed to execute app '%s': %v\n", appName, err) - - return - } + _ = ctx + _ = assembleChunks(chunks) log.Printf("App '%s' started successfully\n", appName) } @@ -333,40 +303,42 @@ func (p *PropletService) UpdateRegistry(ctx context.Context, registryURL, regist return fmt.Errorf("invalid registry URL '%s': %w", registryURL, err) } - p.config.RegistryURL = registryURL - p.config.RegistryToken = registryToken - - configData, err := json.MarshalIndent(p.config, "", " ") - if err != nil { - return fmt.Errorf("failed to serialize updated config: %w", err) - } - - if err := os.WriteFile("proplet/config.json", configData, filePermissions); err != nil { - return fmt.Errorf("failed to write updated config to file: %w", err) - } + p.registryURL = registryURL + p.registryToken = registryToken log.Printf("App Registry updated and persisted: %s\n", registryURL) return nil } -func (p *PropletService) registryUpdate(ctx context.Context, client mqtt.Client, msg mqtt.Message, logger *slog.Logger) { - var payload struct { - RegistryURL string `json:"registry_url"` - RegistryToken string `json:"registry_token"` - } - if err := json.Unmarshal(msg.Payload(), &payload); err != nil { - logger.Error("Invalid registry update payload", slog.Any("error", err)) +func (p *PropletService) registryUpdate(ctx context.Context) func(topic string, msg map[string]interface{}) error { + return func(topic string, msg map[string]interface{}) error { + data, err := json.Marshal(msg) + if err != nil { + return err + } - return - } + var payload struct { + RegistryURL string `json:"registry_url"` + RegistryToken string `json:"registry_token"` + } + if err := json.Unmarshal(data, &payload); err != nil { + return err + } + + ackTopic := fmt.Sprintf(RegistryAckTopicTemplate, p.channelID) + + if err := p.UpdateRegistry(ctx, payload.RegistryURL, payload.RegistryToken); err != nil { + if err := p.pubsub.Publish(ctx, ackTopic, map[string]interface{}{"status": "failure", "error": err.Error()}); err != nil { + p.logger.Error("Failed to publish ack message", slog.String("ack_topic", ackTopic), slog.Any("error", err)) + } + } else { + if err := p.pubsub.Publish(ctx, ackTopic, map[string]interface{}{"status": "success"}); err != nil { + p.logger.Error("Failed to publish ack message", slog.String("ack_topic", ackTopic), slog.Any("error", err)) + } + p.logger.Info("App Registry configuration updated successfully", slog.String("ack_topic", ackTopic), slog.String("registry_url", payload.RegistryURL)) + } - ackTopic := fmt.Sprintf(RegistryAckTopicTemplate, p.config.ChannelID) - if err := p.UpdateRegistry(ctx, payload.RegistryURL, payload.RegistryToken); err != nil { - client.Publish(ackTopic, 0, false, fmt.Sprintf(RegistryFailurePayload, err)) - logger.Error("Failed to update registry configuration", slog.String("ack_topic", ackTopic), slog.String("registry_url", payload.RegistryURL), slog.Any("error", err)) - } else { - client.Publish(ackTopic, 0, false, RegistrySuccessPayload) - logger.Info("App Registry configuration updated successfully", slog.String("ack_topic", ackTopic), slog.String("registry_url", payload.RegistryURL)) + return nil } } diff --git a/proplet/wasm.go b/proplet/wasm.go index 286a10d..1b9abc3 100644 --- a/proplet/wasm.go +++ b/proplet/wasm.go @@ -2,104 +2,105 @@ package proplet import ( "context" + "errors" "fmt" + "log/slog" "sync" - "github.com/absmach/propeller/task" + "github.com/absmach/propeller/pkg/mqtt" "github.com/tetratelabs/wazero" - "github.com/tetratelabs/wazero/api" "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" ) -var _ Service = (*proplet)(nil) +var resultsTopic = "channels/%s/messages/control/proplet/results" -type proplet struct { - mu sync.Mutex - Name string - DB map[string]task.Task - TaskCount int +type Runtime interface { + StartApp(ctx context.Context, wasmBinary []byte, id, functionName string, args ...uint64) error + StopApp(ctx context.Context, id string) error +} + +type wazeroRuntime struct { + mutex sync.Mutex runtimes map[string]wazero.Runtime - functions map[string]api.Function + pubsub mqtt.PubSub + channelID string + logger *slog.Logger } -func NewWasmProplet(name string) *proplet { - return &proplet{ - Name: name, - DB: make(map[string]task.Task), - TaskCount: 0, +func NewWazeroRuntime(logger *slog.Logger, pubsub mqtt.PubSub, channelID string) Runtime { + return &wazeroRuntime{ runtimes: make(map[string]wazero.Runtime), - functions: make(map[string]api.Function), + pubsub: pubsub, + channelID: channelID, + logger: logger, } } -func (w *proplet) StartTask(ctx context.Context, t task.Task) error { - w.mu.Lock() - defer w.mu.Unlock() - +func (w *wazeroRuntime) StartApp(ctx context.Context, wasmBinary []byte, id, functionName string, args ...uint64) error { r := wazero.NewRuntime(ctx) + + w.mutex.Lock() + w.runtimes[id] = r + w.mutex.Unlock() + // Instantiate WASI, which implements host functions needed for TinyGo to // implement `panic`. wasi_snapshot_preview1.MustInstantiate(ctx, r) - module, err := r.Instantiate(ctx, t.Function.File) + module, err := r.Instantiate(ctx, wasmBinary) if err != nil { - return err + return errors.Join(errors.New("failed to instantiate Wasm module"), err) } - function := module.ExportedFunction(t.Function.Name) + function := module.ExportedFunction(functionName) if function == nil { - return fmt.Errorf("function %q not found", t.Function.Name) + return errors.New("failed to find exported function") } - w.TaskCount++ - w.runtimes[t.ID] = r - w.functions[t.ID] = function - w.DB[t.ID] = t + go func() { + results, err := function.Call(ctx, args...) + if err != nil { + w.logger.Error("failed to call function", slog.String("id", id), slog.String("function", functionName), slog.String("error", err.Error())) - return nil -} + return + } -func (w *proplet) RunTask(ctx context.Context, taskID string) ([]uint64, error) { - w.mu.Lock() - defer w.mu.Unlock() + if err := w.StopApp(ctx, id); err != nil { + w.logger.Error("failed to stop app", slog.String("id", id), slog.String("error", err.Error())) + } - t, ok := w.DB[taskID] - if !ok { - return nil, fmt.Errorf("task %q not found", taskID) - } + payload := map[string]interface{}{ + "task_id": id, + "results": results, + } - function := w.functions[t.ID] + topic := fmt.Sprintf(resultsTopic, w.channelID) + if err := w.pubsub.Publish(ctx, topic, payload); err != nil { + w.logger.Error("failed to publish results", slog.String("id", id), slog.String("error", err.Error())) - result, err := function.Call(ctx, t.Function.Inputs...) - if err != nil { - return nil, err - } + return + } - r := w.runtimes[t.ID] - if err := r.Close(ctx); err != nil { - return nil, err - } + w.logger.Info("Finished running app", slog.String("id", id)) + }() - return result, nil + return nil } -func (w *proplet) StopTask(ctx context.Context, taskID string) error { - w.mu.Lock() - defer w.mu.Unlock() - - r := w.runtimes[taskID] +func (w *wazeroRuntime) StopApp(ctx context.Context, id string) error { + w.mutex.Lock() + defer w.mutex.Unlock() - return r.Close(ctx) -} + r, exists := w.runtimes[id] + if !exists { + return errors.New("there is no runtime for this id") + } -func (w *proplet) RemoveTask(_ context.Context, taskID string) error { - w.mu.Lock() - defer w.mu.Unlock() + if err := r.Close(ctx); err != nil { + return err + } - delete(w.DB, taskID) - delete(w.runtimes, taskID) - delete(w.functions, taskID) - w.TaskCount-- + delete(w.runtimes, id) return nil } diff --git a/task/task.go b/task/task.go index 0fce402..44b7a4e 100644 --- a/task/task.go +++ b/task/task.go @@ -31,17 +31,13 @@ func (s State) String() string { } } -type Function struct { - File []byte - Name string - Inputs []uint64 -} - type Task struct { ID string `json:"id"` Name string `json:"name"` State State `json:"state"` - Function Function `json:"function"` + File []byte `json:"file,omitempty"` + Inputs []uint64 `json:"inputs,omitempty"` + Results []uint64 `json:"results,omitempty"` StartTime time.Time `json:"start_time"` FinishTime time.Time `json:"finish_time"` CreatedAt time.Time `json:"created_at"` diff --git a/test.md b/test.md index 3df2948..d9a98c6 100644 --- a/test.md +++ b/test.md @@ -80,3 +80,25 @@ Publish alive message to the manager channel. This updates the proplet. ```bash mosquitto_pub -u $PROPLET_THING_ID -P $PROPLET_THING_KEY -I propeller -t channels/$MANAGER_CHANNEL_ID/messages/control/proplet/alive -h localhost -m "{\"proplet_id\": \"$PROPLET_THING_ID\"}" ``` + +To start the manager, run the following command + +```bash +export MANAGER_THING_ID="" +export MANAGER_THING_KEY="" +export PRMANAGER_CHANNEL_ID="" +export PROPLET_THING_ID="" +export PROPLET_THING_KEY="" +propeller-manager +``` + +To start the proplet, run the following command + +```bash +export MANAGER_THING_ID="" +export MANAGER_THING_KEY="" +export PROPLET_CHANNEL_ID="" +export PROPLET_THING_ID="" +export PROPLET_THING_KEY="" +propeller-proplet +```