Skip to content

Commit

Permalink
organize codes
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbai00 committed Aug 21, 2024
1 parent 1540db7 commit 5738e9d
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class WorkflowCompilationResource extends LazyLogging {
@POST
@Path("/{wid}")
@RolesAllowed(Array("REGULAR", "ADMIN"))
def suggestAutocompleteSchema(
def compileWorkflow(
workflowStr: String,
@PathParam("wid") wid: UInteger,
@Auth sessionUser: SessionUser
Expand All @@ -42,7 +42,7 @@ class WorkflowCompilationResource extends LazyLogging {
)

// compile the pojo
val workflowCompilationResult = new WorkflowCompiler(context).cleanCompile(logicalPlanPojo)
val workflowCompilationResult = new WorkflowCompiler(context).compileToPhysicalPlan(logicalPlanPojo)

// return the result
WorkflowCompilationResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class WorkflowWebsocketResource extends LazyLogging {
sessionState.send(modifyLogicResponse)
}
case editingTimeCompilationRequest: EditingTimeCompilationRequest =>
// TODO: remove this after separating the workflow compiler as a standalone service
val stateStore = if (executionStateOpt.isDefined) {
val currentState =
executionStateOpt.get.executionStateStore.metadataStore.getState.state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ class WorkflowCompiler(
context: WorkflowContext
) extends LazyLogging {

/**
* Compile the workflow to logical plan and errors(if any)
* @param logicalPlanPojo the pojo parsed from workflow str provided by user
* @return LogicalPlan, and a Map from OpId to Op's error(this map is empty if there is no error)
*/
def compileToLogicalPlan(
logicalPlanPojo: LogicalPlanPojo
): (LogicalPlan, Map[OperatorIdentity, WorkflowFatalError]) = {
Expand Down Expand Up @@ -62,11 +67,53 @@ class WorkflowCompiler(
}
(logicalPlan, opIdToError.toMap)
}

/**
* Compile a workflow to physical plan, along with the schema propagation result and error(if any)
*
* @param logicalPlanPojo the pojo parsed from workflow str provided by user
* @return WorkflowCompilationResult, containing the physical plan, input schemas per op and error per op
*/
def compileToPhysicalPlan(
logicalPlanPojo: LogicalPlanPojo
): WorkflowCompilationResult = {
val (logicalPlan, opIdToError) = compileToLogicalPlan(logicalPlanPojo)
if (opIdToError.nonEmpty) {
// encounter errors during compile pojo to logical plan,
// so directly return empty physical plan, schema map and non-empty error map
return WorkflowCompilationResult(
physicalPlan = PhysicalPlan.empty,
operatorIdToInputSchemas = Map.empty,
operatorIdToError = opIdToError
)
}
// from logical plan to physical plan
val physicalPlan = PhysicalPlan(context, logicalPlan)

// Extract physical input schemas, excluding internal ports
val physicalInputSchemas = physicalPlan.operators.map { physicalOp =>
physicalOp.id -> physicalOp.inputPorts.values
.filterNot(_._1.id.internal)
.map {
case (port, _, schema) => port.id -> schema.toOption
}
}

// Group the physical input schemas by their logical operator ID and consolidate the schemas
val opIdToInputSchemas = physicalInputSchemas
.groupBy(_._1.logicalOpId)
.view
.mapValues(_.flatMap(_._2).toList.sortBy(_._1.id).map(_._2))
.toMap

WorkflowCompilationResult(physicalPlan, opIdToInputSchemas, Map.empty)
}

def compileLogicalPlan(
logicalPlanPojo: LogicalPlanPojo,
executionStateStore: ExecutionStateStore
): LogicalPlan = {

// TODO: remove this function after separating compiler as a standalone service
val errorList = new ArrayBuffer[(OperatorIdentity, Throwable)]()
// remove previous error state
executionStateStore.metadataStore.updateState { metadataStore =>
Expand Down Expand Up @@ -109,10 +156,11 @@ class WorkflowCompiler(
opResultStorage: OpResultStorage,
executionStateStore: ExecutionStateStore
): Workflow = {
// TODO: remove this function after separating compiler as a standalone service
// generate a LogicalPlan. The logical plan is the injected with all necessary sinks
val logicalPlan = compileLogicalPlan(logicalPlanPojo, executionStateStore)

// assign the storage location to sink operators
// TODO: push the sink storage assignment directly on physical plan in workflow execution service
assignSinkStorage(
logicalPlan,
context,
Expand All @@ -129,49 +177,14 @@ class WorkflowCompiler(
)
}

def cleanCompile(
logicalPlanPojo: LogicalPlanPojo
): WorkflowCompilationResult = {
val (logicalPlan, opIdToError) = compileToLogicalPlan(logicalPlanPojo)
if (opIdToError.nonEmpty) {
// encounter error during compile the logical plan pojo to logical plan,
// so directly return empty physical plan, empty schema map and error
return WorkflowCompilationResult(
physicalPlan = PhysicalPlan.empty,
operatorIdToInputSchemas = Map.empty,
operatorIdToError = opIdToError
)
}
// the PhysicalPlan with topology expanded.
val physicalPlan = PhysicalPlan(context, logicalPlan)

// Extract physical input schemas, excluding internal ports
val physicalInputSchemas = physicalPlan.operators.map { physicalOp =>
physicalOp.id -> physicalOp.inputPorts.values
.filterNot(_._1.id.internal)
.map {
case (port, _, schema) => port.id -> schema.toOption
}
}

// Group the physical input schemas by their logical operator ID and consolidate the schemas
val opIdToInputSchemas = physicalInputSchemas
.groupBy(_._1.logicalOpId)
.view
.mapValues(_.flatMap(_._2).toList.sortBy(_._1.id).map(_._2))
.toMap

WorkflowCompilationResult(physicalPlan, opIdToInputSchemas, Map.empty)
}

private def assignSinkStorage(
logicalPlan: LogicalPlan,
context: WorkflowContext,
storage: OpResultStorage,
reuseStorageSet: Set[OperatorIdentity] = Set()
): Unit = {
// create a JSON object that holds pointers to the workflow's results in Mongo
// TODO in the future, will extract this logic from here when we need pointers to the stats storage
// TODO: move it to the execution service, and change the 1st parameter from LogicalPlan to PhysicalPlan
val resultsJSON = objectMapper.createObjectNode()
val sinksPointers = objectMapper.createArrayNode()
// assign storage to texera-managed sinks before generating exec config
Expand Down

0 comments on commit 5738e9d

Please sign in to comment.