Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic For Loop #3214

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ message ControlRequest {
PortCompletedRequest portCompletedRequest = 9;
WorkerStateUpdatedRequest workerStateUpdatedRequest = 10;
LinkWorkersRequest linkWorkersRequest = 11;
IterationCompletedRequest iterationCompletedRequest = 12;

// request for worker
AddInputChannelRequest addInputChannelRequest = 50;
Expand Down Expand Up @@ -152,6 +153,11 @@ message PortCompletedRequest {
bool input = 2;
}

message IterationCompletedRequest {
core.ActorVirtualIdentity startWorkerId = 1 [(scalapb.field).no_box = true];
core.ActorVirtualIdentity endWorkerId = 2 [(scalapb.field).no_box = true];
}

message WorkerStateUpdatedRequest {
worker.WorkerState state = 1 [(scalapb.field).no_box = true];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ service ControllerService {
rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns (EvaluatePythonExpressionResponse);
rpc ConsoleMessageTriggered(ConsoleMessageTriggeredRequest) returns (EmptyReturn);
rpc PortCompleted(PortCompletedRequest) returns (EmptyReturn);
rpc IterationCompleted(IterationCompletedRequest) returns (EmptyReturn);
rpc StartWorkflow(EmptyRequest) returns (StartWorkflowResponse);
rpc ResumeWorkflow(EmptyRequest) returns (EmptyReturn);
rpc PauseWorkflow(EmptyRequest) returns (EmptyReturn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ service WorkerService {
rpc PrepareCheckpoint(PrepareCheckpointRequest) returns (EmptyReturn);
rpc QueryStatistics(EmptyRequest) returns (WorkerMetricsResponse);
rpc ResumeWorker(EmptyRequest) returns (WorkerStateResponse);
rpc ResumeLoop(IterationCompletedRequest) returns (EmptyReturn);
rpc RetrieveState(EmptyRequest) returns (EmptyReturn);
rpc RetryCurrentTuple(EmptyRequest) returns (EmptyReturn);
rpc StartWorker(EmptyRequest) returns (WorkerStateResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class ControllerAsyncRPCHandlerInitializer(
with AmberLogging
with LinkWorkersHandler
with WorkerExecutionCompletedHandler
with IterationCompletedHandler
with WorkerStateUpdatedHandler
with PauseHandler
with QueryWorkerStatisticsHandler
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package edu.uci.ics.amber.engine.architecture.controller.promisehandlers

import com.twitter.util.Future
import edu.uci.ics.amber.engine.architecture.controller.ControllerAsyncRPCHandlerInitializer
import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, IterationCompletedRequest}
import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.EmptyReturn


trait IterationCompletedHandler {
this: ControllerAsyncRPCHandlerInitializer =>

override def iterationCompleted(
msg: IterationCompletedRequest,
ctx: AsyncRPCContext
): Future[EmptyReturn] = {
workerInterface.resumeLoop(msg, mkContext(msg.startWorkerId))
EmptyReturn()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ class AmberFIFOChannel(val channelId: ChannelIdentity) extends AmberLogging {
}

def getPortId: PortIdentity = {
this.portId.get
if (this.portId.isEmpty) {
PortIdentity()
} else {
this.portId.get
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,24 @@ package edu.uci.ics.amber.engine.architecture.worker

import com.softwaremill.macwire.wire
import edu.uci.ics.amber.core.executor.OperatorExecutor
import edu.uci.ics.amber.core.marker.{EndOfInputChannel, StartOfInputChannel, State}
import edu.uci.ics.amber.core.tuple.{
FinalizeExecutor,
FinalizePort,
SchemaEnforceable,
Tuple,
TupleLike
}
import edu.uci.ics.amber.core.marker.{EndOfInputChannel, EndOfIteration, StartOfInputChannel, StartOfIteration, State}
import edu.uci.ics.amber.core.tuple.{FinalizeExecutor, FinalizePort, SchemaEnforceable, Tuple, TupleLike}
import edu.uci.ics.amber.engine.architecture.common.AmberProcessor
import edu.uci.ics.amber.engine.architecture.logreplay.ReplayLogManager
import edu.uci.ics.amber.engine.architecture.messaginglayer.{
InputManager,
OutputManager,
WorkerTimerService
}
import edu.uci.ics.amber.engine.architecture.messaginglayer.{InputManager, OutputManager, WorkerTimerService}
import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.ChannelMarkerType.REQUIRE_ALIGNMENT
import edu.uci.ics.amber.engine.architecture.rpc.controlcommands._
import edu.uci.ics.amber.engine.architecture.worker.WorkflowWorker.MainThreadDelegateMessage
import edu.uci.ics.amber.engine.architecture.worker.managers.SerializationManager
import edu.uci.ics.amber.engine.architecture.worker.statistics.WorkerState.{
COMPLETED,
READY,
RUNNING
}
import edu.uci.ics.amber.engine.architecture.worker.statistics.WorkerState.{COMPLETED, READY, RUNNING}
import edu.uci.ics.amber.engine.architecture.worker.statistics.WorkerStatistics
import edu.uci.ics.amber.engine.common.ambermessage._
import edu.uci.ics.amber.engine.common.statetransition.WorkerStateManager
import edu.uci.ics.amber.engine.common.virtualidentity.util.CONTROLLER
import edu.uci.ics.amber.error.ErrorUtils.{mkConsoleMessage, safely}
import edu.uci.ics.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
import edu.uci.ics.amber.core.workflow.PortIdentity
import edu.uci.ics.amber.operator.loop.{LoopEndOpExec, LoopStartOpExec}

class DataProcessor(
actorId: ActorVirtualIdentity,
Expand Down Expand Up @@ -185,6 +172,8 @@ class DataProcessor(
asyncRPCClient.mkContext(CONTROLLER)
)
case schemaEnforceable: SchemaEnforceable =>

println("data:",statisticsManager.getOutputTupleCount,schemaEnforceable)
if (outputPortOpt.isEmpty) {
statisticsManager.increaseOutputTupleCount(outputManager.getSingleOutputPortIdentity)
} else {
Expand Down Expand Up @@ -212,6 +201,7 @@ class DataProcessor(
): Unit = {
val dataProcessingStartTime = System.nanoTime()
val portId = this.inputGateway.getChannel(channelId).getPortId
println("dp:",executor, dataPayload)
dataPayload match {
case DataFrame(tuples) =>
stateManager.conditionalTransitTo(
Expand All @@ -228,6 +218,33 @@ class DataProcessor(
processInputTuple(inputManager.getNextTuple)
case MarkerFrame(marker) =>
marker match {
case StartOfIteration() =>


executor match {
case loopStartExecutor: LoopStartOpExec =>
if (loopStartExecutor.checkCondition()){
processEndOfInputChannel(portId.id)
outputManager.emitMarker(EndOfIteration(actorId))
}
else{
outputManager.finalizeOutput() // fix here
}
case _ =>
}
case EndOfIteration(startWorkerId) =>
if (executor.isInstanceOf[LoopEndOpExec]){
asyncRPCClient.controllerInterface.iterationCompleted(
IterationCompletedRequest(startWorkerId, actorId),
asyncRPCClient.mkContext(CONTROLLER)
)
}
else{
executor.reset()
processEndOfInputChannel(portId.id)
outputManager.emitMarker(EndOfIteration(startWorkerId))
}

case state: State =>
processInputState(state, portId.id)
case StartOfInputChannel() =>
Expand All @@ -243,7 +260,14 @@ class DataProcessor(
}
if (inputManager.getAllPorts.forall(portId => inputManager.isPortCompleted(portId))) {
// assuming all the output ports finalize after all input ports are finalized.
outputManager.finalizeOutput()

executor match {
case _: LoopStartOpExec =>
outputManager.emitMarker(EndOfIteration(actorId))
case _ =>
outputManager.finalizeOutput()
}

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class DataProcessorRPCHandlerInitializer(val dp: DataProcessor)
with AddPartitioningHandler
with QueryStatisticsHandler
with ResumeHandler
with ResumeLoopHandler
with StartHandler
with AssignPortHandler
with AddInputChannelHandler
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package edu.uci.ics.amber.engine.architecture.worker.promisehandlers

import com.twitter.util.Future
import edu.uci.ics.amber.core.marker.StartOfIteration
import edu.uci.ics.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, IterationCompletedRequest}
import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.EmptyReturn
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.ResumeLoopHandler.loopToSelfChannelId
import edu.uci.ics.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer
import edu.uci.ics.amber.engine.common.ambermessage.MarkerFrame

object ResumeLoopHandler {

private val loopSelf = ActorVirtualIdentity("loopSelf")
val loopToSelfChannelId: ChannelIdentity = ChannelIdentity(loopSelf, loopSelf, isControl = false)
}


trait ResumeLoopHandler {
this: DataProcessorRPCHandlerInitializer =>

override def resumeLoop(
request: IterationCompletedRequest,
ctx: AsyncRPCContext
): Future[EmptyReturn] = {
dp.processDataPayload(
loopToSelfChannelId,
MarkerFrame(StartOfIteration()))
EmptyReturn()
}

}
Binary file added core/gui/src/assets/operator_images/LoopEnd.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added core/gui/src/assets/operator_images/LoopStart.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,8 @@ trait OperatorExecutor {

def close(): Unit = {}

def reset(): Unit = {
close()
open()
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package edu.uci.ics.amber.core.marker

import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
import edu.uci.ics.amber.core.virtualidentity.ActorVirtualIdentity

import scala.collection.mutable

sealed trait Marker

final case class StartOfInputChannel() extends Marker
final case class EndOfInputChannel() extends Marker
final case class StartOfIteration() extends Marker
final case class EndOfIteration(workerId: ActorVirtualIdentity) extends Marker

final case class State(tuple: Option[Tuple] = None, passToAllDownstream: Boolean = false)
extends Marker {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@ import com.fasterxml.jackson.annotation._
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import edu.uci.ics.amber.core.executor.OperatorExecutor
import edu.uci.ics.amber.core.tuple.Schema
import edu.uci.ics.amber.core.virtualidentity.{
ExecutionIdentity,
OperatorIdentity,
WorkflowIdentity
}
import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, OperatorIdentity, WorkflowIdentity}
import edu.uci.ics.amber.core.workflow.WorkflowContext.{DEFAULT_EXECUTION_ID, DEFAULT_WORKFLOW_ID}
import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, PortIdentity}
import edu.uci.ics.amber.operator.aggregate.AggregateOpDesc
Expand All @@ -20,22 +16,15 @@ import edu.uci.ics.amber.operator.distinct.DistinctOpDesc
import edu.uci.ics.amber.operator.dummy.DummyOpDesc
import edu.uci.ics.amber.operator.filter.SpecializedFilterOpDesc
import edu.uci.ics.amber.operator.hashJoin.HashJoinOpDesc
import edu.uci.ics.amber.operator.huggingFace.{
HuggingFaceIrisLogisticRegressionOpDesc,
HuggingFaceSentimentAnalysisOpDesc,
HuggingFaceSpamSMSDetectionOpDesc,
HuggingFaceTextSummarizationOpDesc
}
import edu.uci.ics.amber.operator.huggingFace.{HuggingFaceIrisLogisticRegressionOpDesc, HuggingFaceSentimentAnalysisOpDesc, HuggingFaceSpamSMSDetectionOpDesc, HuggingFaceTextSummarizationOpDesc}
import edu.uci.ics.amber.operator.ifStatement.IfOpDesc
import edu.uci.ics.amber.operator.intersect.IntersectOpDesc
import edu.uci.ics.amber.operator.intervalJoin.IntervalJoinOpDesc
import edu.uci.ics.amber.operator.keywordSearch.KeywordSearchOpDesc
import edu.uci.ics.amber.operator.limit.LimitOpDesc
import edu.uci.ics.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc}
import edu.uci.ics.amber.operator.machineLearning.Scorer.MachineLearningScorerOpDesc
import edu.uci.ics.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{
SklearnAdvancedKNNClassifierTrainerOpDesc,
SklearnAdvancedKNNRegressorTrainerOpDesc
}
import edu.uci.ics.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{SklearnAdvancedKNNClassifierTrainerOpDesc, SklearnAdvancedKNNRegressorTrainerOpDesc}
import edu.uci.ics.amber.operator.machineLearning.sklearnAdvanced.SVCTrainer.SklearnAdvancedSVCTrainerOpDesc
import edu.uci.ics.amber.operator.machineLearning.sklearnAdvanced.SVRTrainer.SklearnAdvancedSVRTrainerOpDesc
import edu.uci.ics.amber.operator.metadata.{OPVersion, OperatorInfo, PropertyNameConstants}
Expand All @@ -45,13 +34,11 @@ import edu.uci.ics.amber.operator.regex.RegexOpDesc
import edu.uci.ics.amber.operator.reservoirsampling.ReservoirSamplingOpDesc
import edu.uci.ics.amber.operator.sentiment.SentimentAnalysisOpDesc
import edu.uci.ics.amber.operator.sklearn._
import edu.uci.ics.amber.operator.sleep.SleepOpDesc
import edu.uci.ics.amber.operator.sort.SortOpDesc
import edu.uci.ics.amber.operator.sortPartitions.SortPartitionsOpDesc
import edu.uci.ics.amber.operator.source.apis.reddit.RedditSearchSourceOpDesc
import edu.uci.ics.amber.operator.source.apis.twitter.v2.{
TwitterFullArchiveSearchSourceOpDesc,
TwitterSearchSourceOpDesc
}
import edu.uci.ics.amber.operator.source.apis.twitter.v2.{TwitterFullArchiveSearchSourceOpDesc, TwitterSearchSourceOpDesc}
import edu.uci.ics.amber.operator.source.fetcher.URLFetcherOpDesc
import edu.uci.ics.amber.operator.source.scan.FileScanSourceOpDesc
import edu.uci.ics.amber.operator.source.scan.arrow.ArrowSourceOpDesc
Expand Down Expand Up @@ -162,6 +149,9 @@ trait StateTransferFunc
new Type(value = classOf[AsterixDBSourceOpDesc], name = "AsterixDBSource"),
new Type(value = classOf[TypeCastingOpDesc], name = "TypeCasting"),
new Type(value = classOf[LimitOpDesc], name = "Limit"),
new Type(value = classOf[SleepOpDesc], name = "Sleep"),
new Type(value = classOf[LoopStartOpDesc], name = "LoopStart"),
new Type(value = classOf[LoopEndOpDesc], name = "LoopEnd"),
new Type(value = classOf[RandomKSamplingOpDesc], name = "RandomKSampling"),
new Type(value = classOf[ReservoirSamplingOpDesc], name = "ReservoirSampling"),
new Type(value = classOf[HashJoinOpDesc[String]], name = "HashJoin"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ class LimitOpExec(descString: String) extends OperatorExecutor {
private val desc: LimitOpDesc = objectMapper.readValue(descString, classOf[LimitOpDesc])
var count = 0

override def open(): Unit = {
count = 0
}

override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {

if (count < desc.limit) {
Expand All @@ -16,7 +20,5 @@ class LimitOpExec(descString: String) extends OperatorExecutor {
} else {
Iterator()
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package edu.uci.ics.amber.operator.loop

import edu.uci.ics.amber.core.executor.OpExecWithClassName
import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import edu.uci.ics.amber.core.workflow.{InputPort, OutputPort, PhysicalOp}
import edu.uci.ics.amber.operator.LogicalOp
import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}

class LoopEndOpDesc extends LogicalOp {
override def getPhysicalOp(
workflowId: WorkflowIdentity,
executionId: ExecutionIdentity
): PhysicalOp = {
PhysicalOp
.oneToOnePhysicalOp(
workflowId,
executionId,
operatorIdentifier,
OpExecWithClassName("edu.uci.ics.amber.operator.loop.LoopEndOpExec")
)
.withInputPorts(operatorInfo.inputPorts)
.withOutputPorts(operatorInfo.outputPorts)
.withSuggestedWorkerNum(1)
}

override def operatorInfo: OperatorInfo =
OperatorInfo(
"Loop End",
"Loop End",
OperatorGroupConstants.CONTROL_GROUP,
inputPorts = List(InputPort()),
outputPorts = List(OutputPort())
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package edu.uci.ics.amber.operator.loop

import edu.uci.ics.amber.core.executor.OperatorExecutor
import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike}
import scala.collection.mutable

class LoopEndOpExec extends OperatorExecutor {
private val data = new mutable.ArrayBuffer[Tuple]

override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
data.append(tuple)
Iterator.empty
}

override def onFinish(port: Int): Iterator[TupleLike] = data.iterator
}
Loading
Loading