diff --git a/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/engine_functions/create_engine.go b/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/engine_functions/create_engine.go index 9b9d975aa5..c054eac180 100644 --- a/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/engine_functions/create_engine.go +++ b/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/engine_functions/create_engine.go @@ -140,7 +140,7 @@ func CreateEngine( if err != nil { return nil, stacktrace.Propagate( err, - "An error occurred creating the Enclave Manager UI's http port spec object using number '%v' and protocol '%v'", + "An error occurred creating the REST API server's http port spec object using number '%v' and protocol '%v'", restAPIPort, consts.EngineTransportProtocol.String(), ) diff --git a/engine/server/engine/main.go b/engine/server/engine/main.go index 7f0bf4e7f5..2193a81e57 100644 --- a/engine/server/engine/main.go +++ b/engine/server/engine/main.go @@ -72,10 +72,15 @@ const ( shouldFlushMetricsClientQueueOnEachEvent = false - restAPIPortAddr uint16 = 9779 + restAPIPortAddr uint16 = 977 //TODO: pass this parameter9 + restAPIHostIP string = "0.0.0.0" streamerPoolSize = 1000 streamerExpirationTime = time.Hour * 2 + + pathToEnclaveSpecs = "/api/specs/enclave" + pathToEngineSpecs = "/api/specs/engine" + pathToWebsocketSpecs = "/api/specs/websocket" ) // Nil indicates that the KurtosisBackend should not operate in API container mode, which is appropriate here @@ -243,15 +248,22 @@ func runMain() error { } }() - go restApiServer( - ctx, - serverArgs, - enclaveManager, - perWeekLogsDatabaseClient, - perFileLogsDatabaseClient, - logFileManager, - metricsClient, - ) + go func() { + err := restApiServer( + ctx, + serverArgs, + enclaveManager, + perWeekLogsDatabaseClient, + perFileLogsDatabaseClient, + logFileManager, + metricsClient, + ) + if err != nil { + logrus.Fatal("The REST API server is down, exiting!", err) + fmt.Fprintln(logrus.StandardLogger().Out, err) + os.Exit(failureExitCode) + } + }() engineConnectServer := server.NewEngineConnectServerService( serverArgs.ImageVersionTag, @@ -373,48 +385,27 @@ func restApiServer( perFileLogsDatabaseClient centralized_logs.LogsDatabaseClient, logFileManager *log_file_manager.LogFileManager, metricsClient metrics_client.MetricsClient, -) { +) error { asyncStarlarkLogs := streaming.NewStreamerPool[*kurtosis_core_rpc_api_bindings.StarlarkRunResponseLine](streamerPoolSize, streamerExpirationTime) + defer asyncStarlarkLogs.Clean() logrus.Info("Running REST API server...") // This is how you set up a basic Echo router - e := echo.New() - // Log all requests - e.Use(echomiddleware.Logger()) + echoRouter := echo.New() + echoRouter.Use(echomiddleware.Logger()) // ============================== Engine Management API ====================================== - // OpenAPI schema. - swagger_engine, err := engineApi.GetSwagger() - if err != nil { - fmt.Fprintf(os.Stderr, "Error loading swagger spec\n: %s", err) - os.Exit(1) - } - server.ServeSwaggerUI(e, "/api/specs/engine", server.NewSwaggerUIConfig(swagger_engine)) - - // Use our validation middleware to check all requests against the - // e.Use(middleware.OapiRequestValidator(swagger_engine)) - - // We now register our runtime above as the handler for the interface engineRuntime := restApi.EngineRuntime{ ImageVersionTag: serverArgs.ImageVersionTag, EnclaveManager: enclave_manager, LogFileManager: logFileManager, MetricsClient: metricsClient, } - engineApi.RegisterHandlers(e, engineApi.NewStrictHandler(engineRuntime, nil)) + engineApi.RegisterHandlers(echoRouter, engineApi.NewStrictHandler(engineRuntime, nil)) // ============================== Logging API ====================================== - - swagger_websocket, err := loggingApi.GetSwagger() - if err != nil { - fmt.Fprintf(os.Stderr, "Error loading swagger spec\n: %s", err) - os.Exit(1) - } - server.ServeSwaggerUI(e, "/api/specs/websocket", server.NewSwaggerUIConfig(swagger_websocket)) - // Use our validation middleware to check all requests against the - // e.Use(middleware.OapiRequestValidator(swagger_websocket)) webSocketRuntime := restApi.WebSocketRuntime{ ImageVersionTag: serverArgs.ImageVersionTag, EnclaveManager: enclave_manager, @@ -426,29 +417,43 @@ func restApiServer( MetricsClient: metricsClient, AsyncStarlarkLogs: asyncStarlarkLogs, } - // TODO(edgar) add logging Close() - // defer webSocketRuntime.ShutDown() - loggingApi.RegisterHandlers(e, webSocketRuntime) + loggingApi.RegisterHandlers(echoRouter, webSocketRuntime) // ============================== Engine Management API ====================================== - // OpenAPI schema. - swagger_enclave, err := enclaveApi.GetSwagger() + enclaveRuntime, err := restApi.NewEnclaveRuntime(ctx, *enclave_manager, asyncStarlarkLogs, false) if err != nil { - fmt.Fprintf(os.Stderr, "Error loading swagger spec\n: %s", err) - os.Exit(1) + stacktrace.Propagate(err, "Failed to initialize %T", enclaveRuntime) + return err } - server.ServeSwaggerUI(e, "/api/specs/enclave", server.NewSwaggerUIConfig(swagger_enclave)) + enclaveApi.RegisterHandlers(echoRouter, enclaveApi.NewStrictHandler(enclaveRuntime, nil)) - // Use our validation middleware to check all requests against the - // e.Use(middleware.OapiRequestValidator(swagger_enclave)) - enclaveRuntime, err := restApi.NewEnclaveRuntime(ctx, *enclave_manager, asyncStarlarkLogs, false) + // ============================== Serve OpenAPI specs ====================================== + // TODO (edgar) Move Spec service to Web Server + // ========================================================================================= + swaggerEngine, err := engineApi.GetSwagger() + if err != nil { + // Log and skip since this is non-essential + logrus.Errorf("Error loading swagger spec: %v", err) + } else { + server.ServeSwaggerUI(echoRouter, pathToEngineSpecs, server.NewSwaggerUIConfig(swaggerEngine)) + } + + swaggerEnclave, err := enclaveApi.GetSwagger() + if err != nil { + // Log and skip since this is non-essential + logrus.Errorf("Error loading swagger spec: %v", err) + } else { + server.ServeSwaggerUI(echoRouter, pathToEnclaveSpecs, server.NewSwaggerUIConfig(swaggerEnclave)) + } + + swaggerWebsocket, err := loggingApi.GetSwagger() if err != nil { - // TODO(edgar) fix error handling + // Log and skip since this is non-essential + logrus.Errorf("Error loading swagger spec: %v", err) + } else { + server.ServeSwaggerUI(echoRouter, pathToWebsocketSpecs, server.NewSwaggerUIConfig(swaggerWebsocket)) } - // TODO(edgar) add enclave Close() - // defer enclaveRuntime.ShutDown() - enclaveApi.RegisterHandlers(e, enclaveApi.NewStrictHandler(enclaveRuntime, nil)) // ============================== Start Server ====================================== - e.Logger.Fatal(e.Start(net.JoinHostPort("0.0.0.0", fmt.Sprint(restAPIPortAddr)))) + return echoRouter.Start(net.JoinHostPort(restAPIHostIP, fmt.Sprint(restAPIPortAddr))) } diff --git a/engine/server/engine/server/websocket_api_handler.go b/engine/server/engine/server/websocket_api_handler.go index 55c4f09dfd..25444e8526 100644 --- a/engine/server/engine/server/websocket_api_handler.go +++ b/engine/server/engine/server/websocket_api_handler.go @@ -44,7 +44,9 @@ type WebSocketRuntime struct { LogFileManager *log_file_manager.LogFileManager - MetricsClient metrics_client.MetricsClient + MetricsClient metrics_client.MetricsClient + + // Pool of Starlark log streamers create by package/script runs AsyncStarlarkLogs streaming.StreamerPool[*rpc_api.StarlarkRunResponseLine] } diff --git a/engine/server/engine/streaming/streaming_pool_manager.go b/engine/server/engine/streaming/streaming_pool_manager.go index d3c7f1f0f7..01e625c54d 100644 --- a/engine/server/engine/streaming/streaming_pool_manager.go +++ b/engine/server/engine/streaming/streaming_pool_manager.go @@ -77,3 +77,7 @@ func (streamerPool StreamerPool[T]) Consume(uuid StreamerUUID, consumer func(*ku return true, nil } + +func (streamerPool StreamerPool[T]) Clean() { + streamerPool.pool.Purge() +}