Skip to content

Commit

Permalink
Introduce TexeraWorkflowCompilingService for Workflow Compilation (#2796
Browse files Browse the repository at this point in the history
)

This PR adds a standalone web service called
`TexeraWorkflowCompilingService`.

## The purpose of having a standalone compilation service
Separate Texera workflow compilation from workflow execution.

## The configuration of this service

### 1. Web Server Config
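All configuration items live in
`core/amber/src/main/resources/texera-compiling-service-web-config.yml`
(the file name loaded by `TexeraWorkflowCompilingService.main`). By
default, the server listens on port 9090, with the admin connector on 9091.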

### 2. Application Config
This service shares the same application configuration file, i.e.
`core/amber/src/main/resources/application.conf`. To ensure that a JWT
token is recognized by both `TexeraWebApplication` and
`TexeraWorkflowCompilingService`, `user-sys.jwt.256-bit-secret` must NOT
be set to random; it must be set to a fixed 256-bit string.

## The Endpoint provided by `TexeraWorkflowCompilingService`
A single endpoint is provided by this web application:
```
POST    /api/texera/compilation/{wid}
```
- request format: the serialized workflow json string
- response:
- physicalPlan: Option[PhysicalPlan] // the physical plan of the
workflow; None if the compilation failed
- operatorInputSchemas: Map[String, List[Option[List[Attribute]]]]. //
from `OpId` to list of `Schema`(List of attributes) indexed by port id
- operatorErrors: Map[String, String] // from `OpId` to error of this
operator
   
## Future TODO after this PR

1. Use this web service while the GUI is editing a workflow, and maintain
the physical plan in the GUI.
2. Change all the deployment scripts, including `terminate-daemon.sh`,
`deploy-daemon.sh` and `Dockerfile` to launch & terminate this web
service
3. Clean up the code in
`core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCompiler.scala`
and
`core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala`
  • Loading branch information
bobbai00 authored and PurelyBlank committed Dec 4, 2024
1 parent effcccf commit 4446179
Show file tree
Hide file tree
Showing 11 changed files with 316 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Dropwizard configuration for the workflow-compiling web service.
# Nesting matters in YAML: connectors and the request log belong to `server`,
# and each `- type:` entry owns the keys indented beneath it.
server:
  # modify applicationContextPath if you want the root path to be the name of the application
  # for example, set it to /twitter, then the url will become texera.ics.uci.edu:port/twitter
  applicationContextPath: /
  applicationConnectors:
    - type: http
      port: 9090
  adminConnectors:
    - type: http
      port: 9091
  requestLog:
    type: classic
    timeZone: UTC
    appenders:
      - type: console
      - type: file
        currentLogFilename: ../log/access.log
        threshold: ALL
        queueSize: 512
        discardingThreshold: 0
        archive: true
        archivedLogFilenamePattern: ../log/access-%d{yyyy-MM-dd}.log.gz
        archivedFileCount: 7
        bufferSize: 8KiB
        immediateFlush: true
logging:
  level: INFO
  loggers:
    "io.dropwizard": INFO
  appenders:
    - type: console
      logFormat: "[%date{ISO8601}] [%level] [%logger] [%thread] - %msg %n"
    - type: file
      currentLogFilename: ../log/texera-workflow-compiling-service.log
      threshold: ALL
      queueSize: 512
      discardingThreshold: 0
      archive: false
      timeZone: UTC
      logFormat: "[%date{ISO8601}] [%level] [%logger] [%thread] - %msg %n"
      bufferSize: 8KiB
      immediateFlush: true
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package edu.uci.ics.amber.engine.architecture.deploysemantics

import akka.actor.Deploy
import akka.remote.RemoteScope
import com.fasterxml.jackson.annotation.{JsonIgnore, JsonIgnoreProperties}
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.engine.architecture.common.AkkaActorService
import edu.uci.ics.amber.engine.architecture.controller.execution.OperatorExecution
Expand Down Expand Up @@ -157,6 +158,17 @@ object PhysicalOp {
}
}

// @JsonIgnore is not working when directly annotated to fields of a case class
// https://stackoverflow.com/questions/40482904/jsonignore-doesnt-work-in-scala-case-class
@JsonIgnoreProperties(
Array(
"opExecInitInfo", // function type, ignore it
"derivePartition", // function type, ignore it
"inputPorts", // may contain very long stacktrace, ignore it
"outputPorts", // same reason with above
"propagateSchema" // function type, so ignore it
)
)
case class PhysicalOp(
// the identifier of this PhysicalOp
id: PhysicalOpIdentity,
Expand Down Expand Up @@ -202,7 +214,6 @@ case class PhysicalOp(
/**
* Helper functions related to compile-time operations
*/

// An operator counts as a source exactly when it declares no input ports.
def isSourceOperator: Boolean = inputPorts.isEmpty
Expand All @@ -223,6 +234,7 @@ case class PhysicalOp(
}
}

@JsonIgnore // this is needed to prevent the serialization issue
def getPythonCode: String = {
val (code, _) =
opExecInitInfo.asInstanceOf[OpExecInitInfoWithCode].codeGen(0, 0)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package edu.uci.ics.texera.web

import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.github.toastshaman.dropwizard.auth.jwt.JwtAuthFilter
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.engine.common.AmberConfig
import edu.uci.ics.texera.Utils
import edu.uci.ics.texera.web.TexeraWebApplication.parseArgs
import edu.uci.ics.texera.web.auth.JwtAuth.jwtConsumer
import edu.uci.ics.texera.web.auth.{
GuestAuthFilter,
SessionUser,
UserAuthenticator,
UserRoleAuthorizer
}
import edu.uci.ics.texera.web.resource.WorkflowCompilationResource
import io.dropwizard.auth.{AuthDynamicFeature, AuthValueFactoryProvider}
import io.dropwizard.setup.{Bootstrap, Environment}
import org.glassfish.jersey.server.filter.RolesAllowedDynamicFeature

/**
  * Command-line entry point: starts the workflow-compiling service in Dropwizard
  * "server" mode using the YAML config found under the amber home directory.
  */
object TexeraWorkflowCompilingService {
  def main(args: Array[String]): Unit = {
    // NOTE(review): the parsed options are never read in this method; the call is kept
    // unchanged because parseArgs is defined elsewhere and may validate its input.
    val argMap = parseArgs(args)

    // <amberHome>/src/main/resources/texera-compiling-service-web-config.yml
    val configSegments =
      Seq("src", "main", "resources", "texera-compiling-service-web-config.yml")
    val configFile =
      configSegments.foldLeft(Utils.amberHomePath)((dir, segment) => dir.resolve(segment))

    val service = new TexeraWorkflowCompilingService()
    service.run("server", configFile.toString)
  }
}

/**
  * Dropwizard application hosting the workflow compilation endpoint.
  *
  * Jersey resources are mounted under `/api/texera`. Depending on
  * `AmberConfig.isUserSystemEnabled`, either a JWT-based auth filter or the guest
  * auth filter is registered, followed by the `SessionUser` binder and the
  * roles-allowed feature.
  */
class TexeraWorkflowCompilingService
    extends io.dropwizard.Application[TexeraWorkflowCompilingServiceConfiguration]
    with LazyLogging {

  /** Registers the Scala module on Dropwizard's default object mapper. */
  override def initialize(
      bootstrap: Bootstrap[TexeraWorkflowCompilingServiceConfiguration]
  ): Unit = {
    val defaultMapper = bootstrap.getObjectMapper
    defaultMapper.registerModule(DefaultScalaModule)
  }

  /** Wires the URL pattern, the compilation resource and the auth layer into Jersey. */
  override def run(
      configuration: TexeraWorkflowCompilingServiceConfiguration,
      environment: Environment
  ): Unit = {
    val jersey = environment.jersey

    // serve backend at /api/texera
    jersey.setUrlPattern("/api/texera/*")

    // register the compilation endpoint
    jersey.register(classOf[WorkflowCompilationResource])

    // Add JWT Auth layer (without session); fall back to guest auth when the
    // user system is switched off
    val authFeature =
      if (AmberConfig.isUserSystemEnabled) {
        val jwtFilter = new JwtAuthFilter.Builder[SessionUser]()
          .setJwtConsumer(jwtConsumer)
          .setRealm("realm")
          .setPrefix("Bearer")
          .setAuthenticator(UserAuthenticator)
          .setAuthorizer(UserRoleAuthorizer)
          .buildAuthFilter()
        new AuthDynamicFeature(jwtFilter)
      } else {
        val guestFilter =
          new GuestAuthFilter.Builder().setAuthorizer(UserRoleAuthorizer).buildAuthFilter()
        new AuthDynamicFeature(guestFilter)
      }
    jersey.register(authFeature)

    // lets resource methods receive the authenticated SessionUser, and enforces @RolesAllowed
    val sessionUserBinder =
      new AuthValueFactoryProvider.Binder[SessionUser](classOf[SessionUser])
    jersey.register(sessionUserBinder)
    jersey.register(classOf[RolesAllowedDynamicFeature])
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package edu.uci.ics.texera.web;

import io.dropwizard.Configuration;

/**
 * Configuration type used by {@code TexeraWorkflowCompilingService}.
 *
 * <p>It declares no fields of its own, so only the settings inherited from
 * Dropwizard's {@link Configuration} are available.
 */
public class TexeraWorkflowCompilingServiceConfiguration extends Configuration {
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@ import javax.annotation.security.RolesAllowed
import javax.ws.rs._
import javax.ws.rs.core.MediaType

/**
* The SchemaPropagation functionality will be included by the standalone compiling service
*/
@Deprecated
@Consumes(Array(MediaType.APPLICATION_JSON))
@Produces(Array(MediaType.APPLICATION_JSON))
@Path("/queryplan")
class SchemaPropagationResource extends LazyLogging {

@Deprecated
@POST
@Path("/autocomplete/{wid}")
@RolesAllowed(Array("REGULAR", "ADMIN"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package edu.uci.ics.texera.web.resource

import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.engine.common.virtualidentity.WorkflowIdentity
import edu.uci.ics.texera.Utils
import edu.uci.ics.texera.web.model.websocket.request.LogicalPlanPojo
import edu.uci.ics.texera.workflow.common.WorkflowContext
import edu.uci.ics.texera.workflow.common.tuple.schema.Attribute
import edu.uci.ics.texera.workflow.common.workflow.{PhysicalPlan, WorkflowCompiler}
import org.jooq.types.UInteger

import javax.annotation.security.RolesAllowed
import javax.ws.rs.{Consumes, POST, Path, PathParam, Produces}
import javax.ws.rs.core.MediaType

/**
  * Response body of the workflow compilation endpoint.
  *
  * @param physicalPlan         the compiled physical plan, if one was produced
  * @param operatorInputSchemas for each operator id, one entry per input schema reported by the
  *                             compiler; an entry is None when that schema is absent, otherwise
  *                             it holds the schema's attributes
  * @param operatorErrors       for each operator id, the string form of the error reported for it
  */
case class WorkflowCompilationResponse(
physicalPlan: Option[PhysicalPlan],
operatorInputSchemas: Map[String, List[Option[List[Attribute]]]],
operatorErrors: Map[String, String]
)

/**
  * REST resource exposing workflow compilation at `/compilation/{wid}`.
  * Callers must hold the REGULAR or ADMIN role.
  */
@Consumes(Array(MediaType.APPLICATION_JSON))
@Produces(Array(MediaType.APPLICATION_JSON))
@RolesAllowed(Array("REGULAR", "ADMIN"))
@Path("/compilation")
class WorkflowCompilationResource extends LazyLogging {

  /**
    * Compiles the posted workflow and reports the outcome.
    *
    * @param workflowStr the request body, deserialized as a [[LogicalPlanPojo]]
    * @param wid         the workflow id taken from the URL path
    * @return the physical plan (if any), the per-operator input schemas and the
    *         per-operator errors produced by [[WorkflowCompiler]]
    */
  @POST
  @Path("/{wid}")
  def compileWorkflow(
      workflowStr: String,
      @PathParam("wid") wid: UInteger
  ): WorkflowCompilationResponse = {
    val logicalPlanPojo = Utils.objectMapper.readValue(workflowStr, classOf[LogicalPlanPojo])

    // UInteger is a java.lang.Number: read the value directly instead of
    // formatting it as a string and parsing it back
    val context = new WorkflowContext(
      workflowId = WorkflowIdentity(wid.longValue())
    )

    // compile the pojo using WorkflowCompiler
    val workflowCompilationResult =
      new WorkflowCompiler(context).compile(logicalPlanPojo)

    // re-key both maps by the plain operator id string for the response
    WorkflowCompilationResponse(
      physicalPlan = workflowCompilationResult.physicalPlan,
      operatorInputSchemas = workflowCompilationResult.operatorIdToInputSchemas.map {
        case (operatorIdentity, schemas) =>
          // keep missing schemas as None; expose present ones as their attribute list
          val attributes = schemas.map(schemaOpt => schemaOpt.map(_.attributes))
          operatorIdentity.id -> attributes
      },
      operatorErrors = workflowCompilationResult.operatorIdToError.map {
        case (operatorIdentity, error) => operatorIdentity.id -> error.toString
      }
    )
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class WorkflowWebsocketResource extends LazyLogging {
sessionState.send(modifyLogicResponse)
}
case editingTimeCompilationRequest: EditingTimeCompilationRequest =>
// TODO: remove this after separating the workflow compiler as a standalone service
val stateStore = if (executionStateOpt.isDefined) {
val currentState =
executionStateOpt.get.executionStateStore.metadataStore.getState.state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,17 @@ object PropertyNameConstants { // logical plan property names
final val RESULT_ATTRIBUTE_NAME = "resultAttribute"
final val SPAN_LIST_NAME = "spanListName"
final val TABLE_NAME = "tableName"

// physical plan property names
final val WORKFLOW_ID = "workflowID"
final val EXECUTION_ID = "executionID"
final val PARALLELIZABLE = "parallelizable"
final val LOCATION_PREFERENCE = "locationPreference"
final val PARTITION_REQUIREMENT = "partitionRequirement"
// derivePartition is a function type that cannot be serialized
final val INPUT_PORTS = "inputPorts"
final val OUTPUT_PORTS = "outputPorts"
// propagateSchema is a function type that cannot be serialized
final val IS_ONE_TO_MANY_OP = "isOneToManyOp"
final val SUGGESTED_WORKER_NUM = "suggestedWorkerNum"
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package edu.uci.ics.texera.workflow.common.workflow

import com.fasterxml.jackson.annotation.JsonIgnore
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.engine.architecture.deploysemantics.PhysicalOp
import edu.uci.ics.amber.engine.common.VirtualIdentityUtils
Expand All @@ -19,7 +20,6 @@ import org.jgrapht.util.SupplierUtil
import scala.jdk.CollectionConverters.{IteratorHasAsScala, ListHasAsScala, SetHasAsScala}

object PhysicalPlan {

def apply(context: WorkflowContext, logicalPlan: LogicalPlan): PhysicalPlan = {

var physicalPlan = PhysicalPlan(operators = Set.empty, links = Set.empty)
Expand Down Expand Up @@ -249,6 +249,7 @@ case class PhysicalPlan(
/**
  * Returns a copy of this plan with the same operators but without any of the
  * links reported by getDependeeLinks.
  */
@JsonIgnore // this is needed to prevent the serialization issue
def getDependeeLinksRemovedDAG: PhysicalPlan = {
  val remainingLinks = links.diff(getDependeeLinks)
  copy(links = remainingLinks)
}
Expand Down
Loading

0 comments on commit 4446179

Please sign in to comment.