Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce TexeraWorkflowCompilingService for Workflow Compilation #2796

Merged
merged 20 commits into from
Sep 6, 2024
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Dropwizard configuration for the Texera workflow compiling service.
server:
  # modify applicationContextPath if you want the root path to be the name of the application
  # for example, set it to /twitter, then the url will become texera.ics.uci.edu:port/twitter
  applicationContextPath: /
  # connector serving the application endpoints
  applicationConnectors:
    - type: http
      port: 9090
  # connector serving the Dropwizard admin endpoints
  adminConnectors:
    - type: http
      port: 9091
  # HTTP request (access) log
  requestLog:
    type: classic
    timeZone: UTC
    appenders:
      - type: console
      - type: file
        currentLogFilename: ../log/access.log
        threshold: ALL
        queueSize: 512
        discardingThreshold: 0
        archive: true
        archivedLogFilenamePattern: ../log/access-%d{yyyy-MM-dd}.log.gz
        archivedFileCount: 7
        bufferSize: 8KiB
        immediateFlush: true
# application logging
logging:
  level: INFO
  loggers:
    "io.dropwizard": INFO
  appenders:
    - type: console
      logFormat: "[%date{ISO8601}] [%level] [%logger] [%thread] - %msg %n"
    - type: file
      currentLogFilename: ../log/texera-workflow-compiling-service.log
      threshold: ALL
      queueSize: 512
      discardingThreshold: 0
      archive: false
      timeZone: UTC
      logFormat: "[%date{ISO8601}] [%level] [%logger] [%thread] - %msg %n"
      bufferSize: 8KiB
      immediateFlush: true
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package edu.uci.ics.amber.engine.architecture.deploysemantics

import akka.actor.Deploy
import akka.remote.RemoteScope
import com.fasterxml.jackson.annotation.{JsonIgnore, JsonIgnoreProperties}
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.engine.architecture.common.AkkaActorService
import edu.uci.ics.amber.engine.architecture.controller.execution.OperatorExecution
Expand Down Expand Up @@ -157,6 +158,17 @@ object PhysicalOp {
}
}

// @JsonIgnore does not take effect when placed directly on the fields of a case class,
// so the properties are excluded at class level with @JsonIgnoreProperties instead
// https://stackoverflow.com/questions/40482904/jsonignore-doesnt-work-in-scala-case-class
@JsonIgnoreProperties(
Array(
"opExecInitInfo", // function type, ignore it
"derivePartition", // function type, ignore it
"inputPorts", // may contain very long stacktrace, ignore it
"outputPorts", // same reason with above
"propagateSchema" // function type, so ignore it
)
)
case class PhysicalOp(
// the identifier of this PhysicalOp
id: PhysicalOpIdentity,
Expand Down Expand Up @@ -202,7 +214,6 @@ case class PhysicalOp(
/**
* Helper functions related to compile-time operations
*/

/** Returns true when this operator has no input ports. */
def isSourceOperator: Boolean = inputPorts.isEmpty
Expand All @@ -223,6 +234,7 @@ case class PhysicalOp(
}
}

@JsonIgnore // this is needed to prevent the serialization issue
def getPythonCode: String = {
val (code, _) =
opExecInitInfo.asInstanceOf[OpExecInitInfoWithCode].codeGen(0, 0)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package edu.uci.ics.texera.web

import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.github.toastshaman.dropwizard.auth.jwt.JwtAuthFilter
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.engine.common.AmberConfig
import edu.uci.ics.texera.Utils
import edu.uci.ics.texera.web.TexeraWebApplication.parseArgs
import edu.uci.ics.texera.web.auth.JwtAuth.jwtConsumer
import edu.uci.ics.texera.web.auth.{
GuestAuthFilter,
SessionUser,
UserAuthenticator,
UserRoleAuthorizer
}
import edu.uci.ics.texera.web.resource.WorkflowCompilationResource
import io.dropwizard.auth.{AuthDynamicFeature, AuthValueFactoryProvider}
import io.dropwizard.setup.{Bootstrap, Environment}
import org.glassfish.jersey.server.filter.RolesAllowedDynamicFeature

/**
  * Launcher for the workflow compiling service.
  */
object TexeraWorkflowCompilingService {

  /**
    * Starts the service with the Dropwizard `server` command, using the
    * `texera-compiling-service-web-config.yml` file located under
    * `src/main/resources` of the amber home path.
    *
    * @param args command-line arguments, passed to `parseArgs`
    */
  def main(args: Array[String]): Unit = {
    // NOTE(review): the parsed result is not read in this method; the call is kept
    // because parseArgs is defined elsewhere and may validate the arguments — confirm.
    val argMap = parseArgs(args)

    val service = new TexeraWorkflowCompilingService()

    val resourcesDir = Utils.amberHomePath
      .resolve("src")
      .resolve("main")
      .resolve("resources")
    val configFile = resourcesDir.resolve("texera-compiling-service-web-config.yml").toString

    service.run("server", configFile)
  }
}

/**
  * Dropwizard application that exposes the workflow compilation endpoint
  * under `/api/texera`.
  */
class TexeraWorkflowCompilingService
    extends io.dropwizard.Application[TexeraWorkflowCompilingServiceConfiguration]
    with LazyLogging {

  /**
    * Adds Scala support to the object mapper that Dropwizard provides by default.
    *
    * @param bootstrap the Dropwizard bootstrap whose object mapper is extended
    */
  override def initialize(
      bootstrap: Bootstrap[TexeraWorkflowCompilingServiceConfiguration]
  ): Unit = {
    val mapper = bootstrap.getObjectMapper
    mapper.registerModule(DefaultScalaModule)
  }

  /**
    * Configures the Jersey environment: URL pattern, the compilation resource,
    * the authentication filter and role-based authorization.
    *
    * @param configuration the service configuration (not read here)
    * @param environment   the Dropwizard environment to register components on
    */
  override def run(
      configuration: TexeraWorkflowCompilingServiceConfiguration,
      environment: Environment
  ): Unit = {
    val jersey = environment.jersey

    // the backend is served at /api/texera
    jersey.setUrlPattern("/api/texera/*")

    // the compilation endpoint
    jersey.register(classOf[WorkflowCompilationResource])

    // JWT auth (without session) when the user system is on, guest auth otherwise
    val authFeature =
      if (AmberConfig.isUserSystemEnabled) jwtAuthFeature
      else guestAuthFeature
    jersey.register(authFeature)

    jersey.register(new AuthValueFactoryProvider.Binder[SessionUser](classOf[SessionUser]))
    jersey.register(classOf[RolesAllowedDynamicFeature])
  }

  /** Builds the auth feature backed by a JWT bearer-token filter. */
  private def jwtAuthFeature: AuthDynamicFeature =
    new AuthDynamicFeature(
      new JwtAuthFilter.Builder[SessionUser]()
        .setJwtConsumer(jwtConsumer)
        .setRealm("realm")
        .setPrefix("Bearer")
        .setAuthenticator(UserAuthenticator)
        .setAuthorizer(UserRoleAuthorizer)
        .buildAuthFilter()
    )

  /** Builds the auth feature backed by the guest filter. */
  private def guestAuthFeature: AuthDynamicFeature =
    new AuthDynamicFeature(
      new GuestAuthFilter.Builder().setAuthorizer(UserRoleAuthorizer).buildAuthFilter()
    )
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package edu.uci.ics.texera.web;

import io.dropwizard.Configuration;

/**
 * Configuration type for the workflow compiling service.
 *
 * <p>Declares no settings of its own; everything comes from the Dropwizard
 * {@link Configuration} base class.
 */
public class TexeraWorkflowCompilingServiceConfiguration extends Configuration {
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@ import javax.annotation.security.RolesAllowed
import javax.ws.rs._
import javax.ws.rs.core.MediaType

/**
* The SchemaPropagation functionality will be included by the standalone compiling service
*/
@Deprecated
@Consumes(Array(MediaType.APPLICATION_JSON))
@Produces(Array(MediaType.APPLICATION_JSON))
@Path("/queryplan")
class SchemaPropagationResource extends LazyLogging {

@Deprecated
@POST
@Path("/autocomplete/{wid}")
@RolesAllowed(Array("REGULAR", "ADMIN"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package edu.uci.ics.texera.web.resource

import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.engine.common.virtualidentity.WorkflowIdentity
import edu.uci.ics.texera.Utils
import edu.uci.ics.texera.web.model.websocket.request.LogicalPlanPojo
import edu.uci.ics.texera.workflow.common.WorkflowContext
import edu.uci.ics.texera.workflow.common.tuple.schema.Attribute
import edu.uci.ics.texera.workflow.common.workflow.{PhysicalPlan, WorkflowCompiler}
import org.jooq.types.UInteger

import javax.annotation.security.RolesAllowed
import javax.ws.rs.{Consumes, POST, Path, PathParam, Produces}
import javax.ws.rs.core.MediaType

/**
  * Response body returned by the workflow compilation endpoint.
  *
  * @param physicalPlan         the physical plan produced by the compiler, if any
  * @param operatorInputSchemas for each operator id, the attribute lists of its input
  *                             schemas; an entry is `None` where the schema is absent
  * @param operatorErrors       for each operator id, the string form of its compilation error
  */
case class WorkflowCompilationResponse(
physicalPlan: Option[PhysicalPlan],
operatorInputSchemas: Map[String, List[Option[List[Attribute]]]],
operatorErrors: Map[String, String]
)

/**
  * REST resource that compiles a logical workflow plan and reports the
  * resulting physical plan, input schemas and per-operator errors.
  */
@Consumes(Array(MediaType.APPLICATION_JSON))
@Produces(Array(MediaType.APPLICATION_JSON))
@RolesAllowed(Array("REGULAR", "ADMIN"))
@Path("/compilation")
class WorkflowCompilationResource extends LazyLogging {

  /**
    * Compiles the logical plan carried in the request body.
    *
    * @param workflowStr JSON text that is deserialized into a `LogicalPlanPojo`
    * @param wid         the workflow id taken from the URL path
    * @return the compilation result converted into a `WorkflowCompilationResponse`
    */
  @POST
  @Path("/{wid}")
  def compileWorkflow(
      workflowStr: String,
      @PathParam("wid") wid: UInteger
  ): WorkflowCompilationResponse = {
    val logicalPlanPojo = Utils.objectMapper.readValue(workflowStr, classOf[LogicalPlanPojo])

    // read the unsigned id directly as a Long instead of round-tripping through a String
    val context = new WorkflowContext(
      workflowId = WorkflowIdentity(wid.longValue())
    )

    // compile the pojo using WorkflowCompiler
    val workflowCompilationResult =
      new WorkflowCompiler(context).compile(logicalPlanPojo)

    // return the result, keyed by the plain operator id string
    WorkflowCompilationResponse(
      physicalPlan = workflowCompilationResult.physicalPlan,
      operatorInputSchemas = workflowCompilationResult.operatorIdToInputSchemas.map {
        case (operatorIdentity, schemas) =>
          // an absent schema stays None; a present one is reduced to its attributes
          (operatorIdentity.id, schemas.map(_.map(_.attributes)))
      },
      operatorErrors = workflowCompilationResult.operatorIdToError.map {
        case (operatorIdentity, error) => (operatorIdentity.id, error.toString)
      }
    )
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class WorkflowWebsocketResource extends LazyLogging {
sessionState.send(modifyLogicResponse)
}
case editingTimeCompilationRequest: EditingTimeCompilationRequest =>
// TODO: remove this after separating the workflow compiler as a standalone service
val stateStore = if (executionStateOpt.isDefined) {
val currentState =
executionStateOpt.get.executionStateStore.metadataStore.getState.state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,17 @@ object PropertyNameConstants { // logical plan property names
final val RESULT_ATTRIBUTE_NAME = "resultAttribute"
final val SPAN_LIST_NAME = "spanListName"
final val TABLE_NAME = "tableName"

// physical plan property names
final val WORKFLOW_ID = "workflowID"
final val EXECUTION_ID = "executionID"
final val PARALLELIZABLE = "parallelizable"
final val LOCATION_PREFERENCE = "locationPreference"
final val PARTITION_REQUIREMENT = "partitionRequirement"
// derivePartition is a function type that cannot be serialized
final val INPUT_PORTS = "inputPorts"
final val OUTPUT_PORTS = "outputPorts"
// propagateSchema is a function type that cannot be serialized
final val IS_ONE_TO_MANY_OP = "isOneToManyOp"
final val SUGGESTED_WORKER_NUM = "suggestedWorkerNum"
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package edu.uci.ics.texera.workflow.common.workflow

import com.fasterxml.jackson.annotation.JsonIgnore
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.engine.architecture.deploysemantics.PhysicalOp
import edu.uci.ics.amber.engine.common.VirtualIdentityUtils
Expand All @@ -19,7 +20,6 @@ import org.jgrapht.util.SupplierUtil
import scala.jdk.CollectionConverters.{IteratorHasAsScala, ListHasAsScala, SetHasAsScala}

object PhysicalPlan {

def apply(context: WorkflowContext, logicalPlan: LogicalPlan): PhysicalPlan = {

var physicalPlan = PhysicalPlan(operators = Set.empty, links = Set.empty)
Expand Down Expand Up @@ -249,6 +249,7 @@ case class PhysicalPlan(
/**
  * Builds a copy of this plan that keeps the same operators but drops every
  * dependee link from the link set.
  */
@JsonIgnore // keeps this derived plan out of the serialized output
def getDependeeLinksRemovedDAG: PhysicalPlan = {
  val linksWithoutDependees = links.diff(getDependeeLinks)
  copy(operators, linksWithoutDependees)
}
Expand Down
Loading
Loading