Skip to content

Commit

Permalink
Improve main engine function
Browse files Browse the repository at this point in the history
  • Loading branch information
lostbean committed Dec 2, 2023
1 parent cfa8a31 commit 4d0f6c7
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func CreateEngine(
if err != nil {
return nil, stacktrace.Propagate(
err,
"An error occurred creating the Enclave Manager UI's http port spec object using number '%v' and protocol '%v'",
"An error occurred creating the REST API server's http port spec object using number '%v' and protocol '%v'",
restAPIPort,
consts.EngineTransportProtocol.String(),
)
Expand Down
109 changes: 57 additions & 52 deletions engine/server/engine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,15 @@ const (

shouldFlushMetricsClientQueueOnEachEvent = false

restAPIPortAddr uint16 = 9779
restAPIPortAddr uint16 = 977 //TODO: pass this parameter9
restAPIHostIP string = "0.0.0.0"

streamerPoolSize = 1000
streamerExpirationTime = time.Hour * 2

pathToEnclaveSpecs = "/api/specs/enclave"
pathToEngineSpecs = "/api/specs/engine"
pathToWebsocketSpecs = "/api/specs/websocket"
)

// Nil indicates that the KurtosisBackend should not operate in API container mode, which is appropriate here
Expand Down Expand Up @@ -243,15 +248,22 @@ func runMain() error {
}
}()

go restApiServer(
ctx,
serverArgs,
enclaveManager,
perWeekLogsDatabaseClient,
perFileLogsDatabaseClient,
logFileManager,
metricsClient,
)
go func() {
err := restApiServer(
ctx,
serverArgs,
enclaveManager,
perWeekLogsDatabaseClient,
perFileLogsDatabaseClient,
logFileManager,
metricsClient,
)
if err != nil {
logrus.Fatal("The REST API server is down, exiting!", err)
fmt.Fprintln(logrus.StandardLogger().Out, err)
os.Exit(failureExitCode)
}
}()

engineConnectServer := server.NewEngineConnectServerService(
serverArgs.ImageVersionTag,
Expand Down Expand Up @@ -373,48 +385,27 @@ func restApiServer(
perFileLogsDatabaseClient centralized_logs.LogsDatabaseClient,
logFileManager *log_file_manager.LogFileManager,
metricsClient metrics_client.MetricsClient,
) {
) error {

asyncStarlarkLogs := streaming.NewStreamerPool[*kurtosis_core_rpc_api_bindings.StarlarkRunResponseLine](streamerPoolSize, streamerExpirationTime)
defer asyncStarlarkLogs.Clean()

logrus.Info("Running REST API server...")

// This is how you set up a basic Echo router
e := echo.New()
// Log all requests
e.Use(echomiddleware.Logger())
echoRouter := echo.New()
echoRouter.Use(echomiddleware.Logger())

// ============================== Engine Management API ======================================
// OpenAPI schema.
swagger_engine, err := engineApi.GetSwagger()
if err != nil {
fmt.Fprintf(os.Stderr, "Error loading swagger spec\n: %s", err)
os.Exit(1)
}
server.ServeSwaggerUI(e, "/api/specs/engine", server.NewSwaggerUIConfig(swagger_engine))

// Use our validation middleware to check all requests against the
// e.Use(middleware.OapiRequestValidator(swagger_engine))

// We now register our runtime above as the handler for the interface
engineRuntime := restApi.EngineRuntime{
ImageVersionTag: serverArgs.ImageVersionTag,
EnclaveManager: enclave_manager,
LogFileManager: logFileManager,
MetricsClient: metricsClient,
}
engineApi.RegisterHandlers(e, engineApi.NewStrictHandler(engineRuntime, nil))
engineApi.RegisterHandlers(echoRouter, engineApi.NewStrictHandler(engineRuntime, nil))

// ============================== Logging API ======================================

swagger_websocket, err := loggingApi.GetSwagger()
if err != nil {
fmt.Fprintf(os.Stderr, "Error loading swagger spec\n: %s", err)
os.Exit(1)
}
server.ServeSwaggerUI(e, "/api/specs/websocket", server.NewSwaggerUIConfig(swagger_websocket))
// Use our validation middleware to check all requests against the
// e.Use(middleware.OapiRequestValidator(swagger_websocket))
webSocketRuntime := restApi.WebSocketRuntime{
ImageVersionTag: serverArgs.ImageVersionTag,
EnclaveManager: enclave_manager,
Expand All @@ -426,29 +417,43 @@ func restApiServer(
MetricsClient: metricsClient,
AsyncStarlarkLogs: asyncStarlarkLogs,
}
// TODO(edgar) add logging Close()
// defer webSocketRuntime.ShutDown()
loggingApi.RegisterHandlers(e, webSocketRuntime)
loggingApi.RegisterHandlers(echoRouter, webSocketRuntime)

// ============================== Engine Management API ======================================
// OpenAPI schema.
swagger_enclave, err := enclaveApi.GetSwagger()
enclaveRuntime, err := restApi.NewEnclaveRuntime(ctx, *enclave_manager, asyncStarlarkLogs, false)
if err != nil {
fmt.Fprintf(os.Stderr, "Error loading swagger spec\n: %s", err)
os.Exit(1)
stacktrace.Propagate(err, "Failed to initialize %T", enclaveRuntime)
return err
}
server.ServeSwaggerUI(e, "/api/specs/enclave", server.NewSwaggerUIConfig(swagger_enclave))
enclaveApi.RegisterHandlers(echoRouter, enclaveApi.NewStrictHandler(enclaveRuntime, nil))

// Use our validation middleware to check all requests against the
// e.Use(middleware.OapiRequestValidator(swagger_enclave))
enclaveRuntime, err := restApi.NewEnclaveRuntime(ctx, *enclave_manager, asyncStarlarkLogs, false)
// ============================== Serve OpenAPI specs ======================================
// TODO (edgar) Move Spec service to Web Server
// =========================================================================================
swaggerEngine, err := engineApi.GetSwagger()
if err != nil {
// Log and skip since this is non-essential
logrus.Errorf("Error loading swagger spec: %v", err)
} else {
server.ServeSwaggerUI(echoRouter, pathToEngineSpecs, server.NewSwaggerUIConfig(swaggerEngine))
}

swaggerEnclave, err := enclaveApi.GetSwagger()
if err != nil {
// Log and skip since this is non-essential
logrus.Errorf("Error loading swagger spec: %v", err)
} else {
server.ServeSwaggerUI(echoRouter, pathToEnclaveSpecs, server.NewSwaggerUIConfig(swaggerEnclave))
}

swaggerWebsocket, err := loggingApi.GetSwagger()
if err != nil {
// TODO(edgar) fix error handling
// Log and skip since this is non-essential
logrus.Errorf("Error loading swagger spec: %v", err)
} else {
server.ServeSwaggerUI(echoRouter, pathToWebsocketSpecs, server.NewSwaggerUIConfig(swaggerWebsocket))
}
// TODO(edgar) add enclave Close()
// defer enclaveRuntime.ShutDown()
enclaveApi.RegisterHandlers(e, enclaveApi.NewStrictHandler(enclaveRuntime, nil))

// ============================== Start Server ======================================
e.Logger.Fatal(e.Start(net.JoinHostPort("0.0.0.0", fmt.Sprint(restAPIPortAddr))))
return echoRouter.Start(net.JoinHostPort(restAPIHostIP, fmt.Sprint(restAPIPortAddr)))
}
4 changes: 3 additions & 1 deletion engine/server/engine/server/websocket_api_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ type WebSocketRuntime struct {

LogFileManager *log_file_manager.LogFileManager

MetricsClient metrics_client.MetricsClient
MetricsClient metrics_client.MetricsClient

// Pool of Starlark log streamers create by package/script runs
AsyncStarlarkLogs streaming.StreamerPool[*rpc_api.StarlarkRunResponseLine]
}

Expand Down
4 changes: 4 additions & 0 deletions engine/server/engine/streaming/streaming_pool_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,7 @@ func (streamerPool StreamerPool[T]) Consume(uuid StreamerUUID, consumer func(*ku

return true, nil
}

func (streamerPool StreamerPool[T]) Clean() {
streamerPool.pool.Purge()
}

0 comments on commit 4d0f6c7

Please sign in to comment.