From 74d50b0b49c4e7e1dc61472eb32aa057662d4ba2 Mon Sep 17 00:00:00 2001 From: Colt McNealy Date: Sat, 17 Feb 2024 23:12:57 -0800 Subject: [PATCH] progress --- .../core/noderun/NodeFailureException.java | 25 + .../getable/core/noderun/NodeRunModel.java | 501 +++++++++++------- .../model/getable/core/wfrun/SubNodeRun.java | 76 ++- .../getable/core/wfrun/ThreadRunModel.java | 107 ++-- .../model/getable/core/wfrun/WfRunModel.java | 48 +- .../subnoderun/ExternalEventRunModel.java | 2 +- .../wfrun/subnoderun/SleepNodeRunModel.java | 2 +- .../subnoderun/WaitForThreadsRunModel.java | 2 +- 8 files changed, 458 insertions(+), 305 deletions(-) create mode 100644 server/src/main/java/io/littlehorse/common/model/getable/core/noderun/NodeFailureException.java diff --git a/server/src/main/java/io/littlehorse/common/model/getable/core/noderun/NodeFailureException.java b/server/src/main/java/io/littlehorse/common/model/getable/core/noderun/NodeFailureException.java new file mode 100644 index 0000000000..e357fea33e --- /dev/null +++ b/server/src/main/java/io/littlehorse/common/model/getable/core/noderun/NodeFailureException.java @@ -0,0 +1,25 @@ +package io.littlehorse.common.model.getable.core.noderun; + +import io.littlehorse.common.model.getable.core.wfrun.failure.FailureModel; +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * This class is thrown by the NodeRunModel class when trying to advance. It wraps the + * FailureModel class, and is handled by the ThreadRunModel. We are doing this to remove + * the ThreadRunModel#fail() method, since the NodeRunModel should not + */ +@Getter +@AllArgsConstructor +public class NodeFailureException extends Exception { + + /** + * The LittleHorse Workflow Failure that was thrown. 
+ */ + private FailureModel failure; + + @Override + public String getMessage() { + return failure.getMessage(); + } +} diff --git a/server/src/main/java/io/littlehorse/common/model/getable/core/noderun/NodeRunModel.java b/server/src/main/java/io/littlehorse/common/model/getable/core/noderun/NodeRunModel.java index 32e4769497..b3cf763d36 100644 --- a/server/src/main/java/io/littlehorse/common/model/getable/core/noderun/NodeRunModel.java +++ b/server/src/main/java/io/littlehorse/common/model/getable/core/noderun/NodeRunModel.java @@ -18,6 +18,7 @@ import io.littlehorse.common.model.getable.core.wfrun.subnoderun.TaskNodeRunModel; import io.littlehorse.common.model.getable.core.wfrun.subnoderun.UserTaskNodeRunModel; import io.littlehorse.common.model.getable.core.wfrun.subnoderun.WaitForThreadsRunModel; +import io.littlehorse.common.model.getable.global.wfspec.WfSpecModel; import io.littlehorse.common.model.getable.global.wfspec.node.NodeModel; import io.littlehorse.common.model.getable.objectId.NodeRunIdModel; import io.littlehorse.common.model.getable.objectId.WfSpecIdModel; @@ -25,6 +26,7 @@ import io.littlehorse.common.util.LHUtil; import io.littlehorse.sdk.common.proto.Failure; import io.littlehorse.sdk.common.proto.LHStatus; +import io.littlehorse.sdk.common.proto.Node; import io.littlehorse.sdk.common.proto.Node.NodeCase; import io.littlehorse.sdk.common.proto.NodeRun; import io.littlehorse.sdk.common.proto.NodeRun.NodeTypeCase; @@ -34,13 +36,14 @@ import io.littlehorse.server.streams.topology.core.ProcessorExecutionContext; import java.util.ArrayList; import java.util.Date; -import java.util.HashMap; import java.util.List; import java.util.Optional; import lombok.AccessLevel; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; + +import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.tuple.Pair; @Getter @@ -51,107 +54,43 @@ public class NodeRunModel extends CoreGetable { private NodeRunIdModel id; 
private WfSpecIdModel wfSpecId; private String threadSpecName; - - public LHStatus status; - - public Date arrivalTime; - public Date endTime; - - public String nodeName; - - public String errorMessage; - - public List failures = new ArrayList<>(); - - public ExternalEventRunModel externalEventRun; - public TaskNodeRunModel taskRun; - public NodeTypeCase type; - public ExitRunModel exitRun; - public EntrypointRunModel entrypointRun; - public StartThreadRunModel startThreadRun; + private LHStatus status; + private Date arrivalTime; + private Date endTime; + private String nodeName; + private String errorMessage; + private List failures = new ArrayList<>(); + private List failureHandlerIds = new ArrayList<>(); + + private NodeTypeCase type; + private ExternalEventRunModel externalEventRun; + private TaskNodeRunModel taskRun; + private ExitRunModel exitRun; + private EntrypointRunModel entrypointRun; + private StartThreadRunModel startThreadRun; private StartMultipleThreadsRunModel startMultipleThreadsRun; - public WaitForThreadsRunModel waitThreadsRun; - public SleepNodeRunModel sleepNodeRun; - public UserTaskNodeRunModel userTaskRun; + private WaitForThreadsRunModel waitThreadsRun; + private SleepNodeRunModel sleepNodeRun; + private UserTaskNodeRunModel userTaskRun; - public List failureHandlerIds = new ArrayList<>(); private ExecutionContext executionContext; - public NodeRunModel() {} - - public NodeRunModel(ProcessorExecutionContext processorContext) { - this.executionContext = processorContext; - } - - public Object getEntrypointRunForJacksonOnly() { - if (entrypointRun != null) { - return new HashMap<>(); - } - return null; - } - @Getter(AccessLevel.NONE) @Setter(AccessLevel.NONE) - private ThreadRunModel threadRunModelDoNotUseMe; + // Use `NodeRunModel#getThreadRun()`, as this field is lazy-loaded. 
+ private ThreadRunModel threadRunDoNotUseMe; - public ThreadRunModel getThreadRun() { - if (threadRunModelDoNotUseMe == null) { - ProcessorExecutionContext processorContext = - executionContext.castOnSupport(ProcessorExecutionContext.class); - WfRunModel wfRunModel = processorContext.getableManager().get(id.getWfRunId()); - threadRunModelDoNotUseMe = wfRunModel.getThreadRun(id.getThreadRunNumber()); - } - return threadRunModelDoNotUseMe; - } - - public int getThreadRunNumber() { - return id.getThreadRunNumber(); - } - - public void setThreadRun(ThreadRunModel threadRunModel) { - threadRunModelDoNotUseMe = threadRunModel; - } - - public FailureModel getLatestFailure() { - if (failures.size() == 0) return null; - return failures.get(failures.size() - 1); - } + public NodeRunModel() {} - public NodeRunIdModel getObjectId() { - return id; + public NodeRunModel(ProcessorExecutionContext processorContext) { + this.executionContext = processorContext; } + @Override public Class getProtoBaseClass() { return NodeRun.class; } - public Date getCreatedAt() { - return arrivalTime; - } - - @Override - public List>> getIndexConfigurations() { - return List.of(new GetableIndex( - List.of( - Pair.of("status", GetableIndex.ValueType.SINGLE), - Pair.of("type", GetableIndex.ValueType.SINGLE)), - Optional.of(TagStorageType.LOCAL))); - } - - @Override - public List getIndexValues(String key, Optional tagStorageType) { - switch (key) { - case "status" -> { - return List.of(new IndexedField(key, this.getStatus().toString(), TagStorageType.LOCAL)); - } - case "type" -> { - return List.of(new IndexedField(key, this.getType().toString(), TagStorageType.LOCAL)); - } - } - log.warn("Tried to get value for unknown index field {}", key); - return List.of(); - } - @Override public void initFrom(Message p, ExecutionContext context) { NodeRun proto = (NodeRun) p; @@ -213,67 +152,35 @@ public void initFrom(Message p, ExecutionContext context) { getSubNodeRun().setNodeRun(this); } - public 
SubNodeRun getSubNodeRun() { - switch (type) { - case TASK: - return taskRun; - case EXTERNAL_EVENT: - return externalEventRun; - case ENTRYPOINT: - return entrypointRun; - case EXIT: - return exitRun; - case WAIT_THREADS: - return waitThreadsRun; - case START_THREAD: - return startThreadRun; - case SLEEP: - return sleepNodeRun; - case USER_TASK: - return userTaskRun; - case START_MULTIPLE_THREADS: - return startMultipleThreadsRun; - case NODETYPE_NOT_SET: - } - throw new RuntimeException("Not possible"); + @Override + public Date getCreatedAt() { + return arrivalTime; } - public void setSubNodeRun(SubNodeRun snr) { - Class cls = snr.getClass(); - if (cls.equals(TaskNodeRunModel.class)) { - type = NodeTypeCase.TASK; - taskRun = (TaskNodeRunModel) snr; - } else if (cls.equals(EntrypointRunModel.class)) { - type = NodeTypeCase.ENTRYPOINT; - entrypointRun = (EntrypointRunModel) snr; - } else if (cls.equals(ExitRunModel.class)) { - type = NodeTypeCase.EXIT; - exitRun = (ExitRunModel) snr; - } else if (cls.equals(ExternalEventRunModel.class)) { - type = NodeTypeCase.EXTERNAL_EVENT; - externalEventRun = (ExternalEventRunModel) snr; - } else if (cls.equals(StartThreadRunModel.class)) { - type = NodeTypeCase.START_THREAD; - startThreadRun = (StartThreadRunModel) snr; - } else if (cls.equals(WaitForThreadsRunModel.class)) { - type = NodeTypeCase.WAIT_THREADS; - waitThreadsRun = (WaitForThreadsRunModel) snr; - } else if (cls.equals(SleepNodeRunModel.class)) { - type = NodeTypeCase.SLEEP; - sleepNodeRun = (SleepNodeRunModel) snr; - } else if (cls.equals(UserTaskNodeRunModel.class)) { - type = NodeTypeCase.USER_TASK; - userTaskRun = (UserTaskNodeRunModel) snr; - } else if (cls.equals(StartMultipleThreadsRunModel.class)) { - type = NodeTypeCase.START_MULTIPLE_THREADS; - startMultipleThreadsRun = (StartMultipleThreadsRunModel) snr; - } else { - throw new RuntimeException("Didn't recognize " + snr.getClass()); - } + @Override + public List>> getIndexConfigurations() { + return 
List.of(new GetableIndex( + List.of( + Pair.of("status", GetableIndex.ValueType.SINGLE), + Pair.of("type", GetableIndex.ValueType.SINGLE)), + Optional.of(TagStorageType.LOCAL))); + } - snr.nodeRun = this; + @Override + public List getIndexValues(String key, Optional tagStorageType) { + switch (key) { + case "status" -> { + return List.of(new IndexedField(key, this.getStatus().toString(), TagStorageType.LOCAL)); + } + case "type" -> { + return List.of(new IndexedField(key, this.getType().toString(), TagStorageType.LOCAL)); + } + } + log.warn("Tried to get value for unknown index field {}", key); + return List.of(); } + @Override public NodeRun.Builder toProto() { NodeRun.Builder out = NodeRun.newBuilder() .setId(id.toProto()) @@ -327,6 +234,131 @@ public NodeRun.Builder toProto() { return out; } + @Override + public NodeRunIdModel getObjectId() { + return id; + } + + /** + * A SubNodeRun is the sub-field of a NodeRun. This method returns the appropriate one + * from this NodeRun. + * @return the SubNodeRun for this NodeRun. + */ + public SubNodeRun getSubNodeRun() { + switch (type) { + case TASK: + return taskRun; + case EXTERNAL_EVENT: + return externalEventRun; + case ENTRYPOINT: + return entrypointRun; + case EXIT: + return exitRun; + case WAIT_THREADS: + return waitThreadsRun; + case START_THREAD: + return startThreadRun; + case SLEEP: + return sleepNodeRun; + case USER_TASK: + return userTaskRun; + case START_MULTIPLE_THREADS: + return startMultipleThreadsRun; + case NODETYPE_NOT_SET: + } + throw new RuntimeException("Not possible"); + } + + /** + * Sets the SubNodeRun of this NodeRun. This will also set the type of the NodeRun. + * + * Called during initialization; eg. when the ThreadRunModel activates a new Node on the ThreadRun. + * @param subNodeRun is the SubNodeRun to assign for this NodeRunModel. 
+ */ + public void setSubNodeRun(SubNodeRun subNodeRun) { + Class cls = subNodeRun.getClass(); + if (cls.equals(TaskNodeRunModel.class)) { + type = NodeTypeCase.TASK; + taskRun = (TaskNodeRunModel) subNodeRun; + } else if (cls.equals(EntrypointRunModel.class)) { + type = NodeTypeCase.ENTRYPOINT; + entrypointRun = (EntrypointRunModel) subNodeRun; + } else if (cls.equals(ExitRunModel.class)) { + type = NodeTypeCase.EXIT; + exitRun = (ExitRunModel) subNodeRun; + } else if (cls.equals(ExternalEventRunModel.class)) { + type = NodeTypeCase.EXTERNAL_EVENT; + externalEventRun = (ExternalEventRunModel) subNodeRun; + } else if (cls.equals(StartThreadRunModel.class)) { + type = NodeTypeCase.START_THREAD; + startThreadRun = (StartThreadRunModel) subNodeRun; + } else if (cls.equals(WaitForThreadsRunModel.class)) { + type = NodeTypeCase.WAIT_THREADS; + waitThreadsRun = (WaitForThreadsRunModel) subNodeRun; + } else if (cls.equals(SleepNodeRunModel.class)) { + type = NodeTypeCase.SLEEP; + sleepNodeRun = (SleepNodeRunModel) subNodeRun; + } else if (cls.equals(UserTaskNodeRunModel.class)) { + type = NodeTypeCase.USER_TASK; + userTaskRun = (UserTaskNodeRunModel) subNodeRun; + } else if (cls.equals(StartMultipleThreadsRunModel.class)) { + type = NodeTypeCase.START_MULTIPLE_THREADS; + startMultipleThreadsRun = (StartMultipleThreadsRunModel) subNodeRun; + } else { + throw new RuntimeException("Didn't recognize " + subNodeRun.getClass()); + } + + subNodeRun.nodeRun = this; + } + + /** + * Returns the ThreadRunModel representing the ThreadRun that the NodeRun for this NodeRunModel + * is a part of. + * + * Requires a ProcessorExecutionContext; meaning that this should only be called from within the + * CommandProcessor execution context. + * @return the ThreadRunModel for the ThreadRun that this NodeRunModel's NodeRun belongs to. 
+ */ + public ThreadRunModel getThreadRun() { + if (threadRunDoNotUseMe == null) { + ProcessorExecutionContext processorContext = + executionContext.castOnSupport(ProcessorExecutionContext.class); + WfRunModel wfRunModel = processorContext.getableManager().get(id.getWfRunId()); + threadRunDoNotUseMe = wfRunModel.getThreadRun(id.getThreadRunNumber()); + } + return threadRunDoNotUseMe; + } + + /** + * Returns the Id of the ThreadRun that this NodeRunModel's NodeRun belongs to. + * @return the ID of the ThreadRun that this NodeRunModel's NodeRun belongs to. + */ + public int getThreadRunNumber() { + return id.getThreadRunNumber(); + } + + /** + * Called on initialization/building of a NodeRunModel. + * @param threadRunModel is the ThreadRunModel. + */ + public void setThreadRun(ThreadRunModel threadRunModel) { + threadRunDoNotUseMe = threadRunModel; + } + + /** + * A NodeRun in LittleHorse can have zero or more Failures. This method returns the one + * that was added to this NodeRun most recently. + * @return the last FailureModel in this NodeRun's list of Failures, or null if there are none. + */ + public FailureModel getLatestFailure() { + if (failures.size() == 0) return null; + return failures.get(failures.size() - 1); + } + + /** + * Returns whether the NodeRun is making progress; i.e. it's starting/running/halting. + * @return if the NodeRun is in progress. + */ public boolean isInProgress() { switch (status) { case STARTING: @@ -342,86 +374,149 @@ public boolean isInProgress() { return false; } - public boolean isCompletedOrRecoveredFromFailure() { - if (status == LHStatus.COMPLETED) { - return true; - } - - if (status == LHStatus.ERROR || status == LHStatus.EXCEPTION) { - return failures.stream().allMatch(failure -> failure.isProperlyHandled()); - } - - return false; - } - + /** + * Returns the Node from the ThreadSpec that this NodeRun is running. + */ public NodeModel getNode() { return getThreadRun().getThreadSpecModel().nodes.get(nodeName); } + /** + * Returns the type of the Node. + * @return the type of the Node. 
+ */ public NodeCase getNodeType() { return getNode().type; } - /* - * Returns whether it's currently safe to start an interrupt thread on this - * NodeRun. The default answer is, "Is the node currently Running? If so, then - * nope, else yes". However, + /** + * Checks if the processing performed by this NodeRunModel is completed. If so, then the ThreadRunModel + * has permission to advance the ThreadRun past this NodeRunModel's NodeRun. Otherwise, the ThreadRunModel + * must continue waiting at this NodeRunModel. + * + * This method may mutate the state of the NodeRun. + * @return true if processing is completed and the ThreadRun may advance past this NodeRun; false otherwise. */ - public boolean canBeInterrupted() { - return getSubNodeRun().canBeInterrupted(); + public boolean checkIfProcessingCompleted() throws NodeFailureException { + return getSubNodeRun().checkIfProcessingCompleted(); } /** - * Tries to advance the NodeRun, and returns true if the status of something - * changes. + * In LittleHorse, a NodeRun may return an output. For example, a TASK NodeRun's output is the VariableValue + * returned by the Task Method invoked during the TaskRun. An EXTERNAL_EVENT NodeRun's output is the content + * of the ExternalEvent. + * + * Not all NodeRun types return an output though; for example, WAIT_FOR_THREADS *currently* does NOT return + * an output. + * @precondition the NodeRunModel should already be completed or recovered from failure. + * @return the output from this NodeRunModel's NodeRun, if such output exists. 
*/ - public boolean advanceIfPossible(Date time) { - if (isCompletedOrRecoveredFromFailure()) { - getThreadRun().advanceFrom(getNode()); - return true; - } else { - return getSubNodeRun().advanceIfPossible(time); + public Optional getOutput() { + if (status != LHStatus.COMPLETED) { + throw new IllegalStateException("Cannot get output from a non-completed NodeRun"); } - } - - public void complete(VariableValueModel output, Date time) { - endTime = time; - status = LHStatus.COMPLETED; - getThreadRun().completeCurrentNode(output, time); - } - - public void fail(FailureModel failure, Date time) { - this.failures.add(failure); - endTime = time; - status = failure.getStatus(); - errorMessage = failure.message; - getThreadRun().fail(failure, time); - } - - public void failWithoutGrace(FailureModel failure, Date time) { - this.failures.add(failure); - endTime = time; - status = failure.getStatus(); - errorMessage = failure.message; - getThreadRun().failWithoutGrace(failure, time); - } - public void maybeUnHalt() { - setStatus(LHStatus.RUNNING); + return getSubNodeRun().getOutput(); } - public void maybeHalt() { + /** + * Halts this NodeRun if possible; otherwise starts the halting process and sets the status + * to HALTING. Returns true if the NodeRun is successfully HALTED. + * @return true if the NodeRun is successfully HALTED; else false. + */ + public boolean maybeHalt() { if (!isInProgress()) { - log.trace("no need to halt; nodeRun not running"); - return; + // If the NodeRun is already completed, failed, or halted, then we're done (: + return true; } - if (!canBeInterrupted()) { + + if (getSubNodeRun().maybeHalt()) { + status = LHStatus.HALTED; + return true; + } else { status = LHStatus.HALTING; - log.trace("NodeRun can't be interrupted now, moving to HALTING not HALTED"); - return; + return false; } + } - status = LHStatus.HALTED; - getSubNodeRun().halt(); + // /** + // * Notes that the NodeRun is completed and at what time, and sets status appropriately. 
+ // * Called by the ThreadRunModel. Does not affect the processing + // */ + // public void completeOrFail(Date time) { + // this.endTime = time; + // if (getLatestFailure() != null) { + // this.status = getLatestFailure().getStatus(); + // } else { + // this.status = LHStatus.COMPLETED; + // } + // } + + /** + * Returns the WfSpecModel for the WfSpec that this NodeRunModel's NodeRun belongs to. Note + * that in the case of a WfSpec Version Migration, this might be different than the return + * value of getWfRun().getWfSpec(). For example, if we call nodeRun.getWfSpec() on an old + * NodeRun after a WfSpec Version Migration has already occurred, the version of the WfSpec + * returned could be older than the version of the WfSpec returned by wfRun.getWfSpec(). + * + * Can only be called in the CommandProcessor execution context. + * @return the WfSpecModel for the WfSpec of this NodeRunModel's NodeRun + */ + public WfSpecModel getWfSpec() { + ProcessorExecutionContext ctx = executionContext.castOnSupport(ProcessorExecutionContext.class); + return ctx.service().getWfSpec(wfSpecId); + } + + /** + * Evaluates the outgoing edge, maybe mutates variables, and returns the next Node that the ThreadRun + * should go to. + * + * If the NodeRun had a failure, then VariableMutations do NOT happen (this is part of our public API + * behavior. See comments in issue #656 on GitHub.). + * + * If the evaluation of outgoing edges fails, or the variable mutations fail, then this method adds a + * Failure to the NodeRun (responsibility of the NodeRunModel) and also throws a NodeFailureException (so + * that the ThreadRunModel can react appropriately). + * + * For several good reasons, Outgoing Edges are a property of the Node proto, not the ThreadSpec proto. + * Furthermore, Variable Mutations are a property of the Outgoing Edges (and by extension, also the Node). 
+ * This means that the following are all responsibilities of the NodeRun: + * - Choosing the next Node to go to (evaluating outgoing edges) + * - Telling the ThreadRunModel to mutate the variables. + * + * If either of those things fails, then the resulting `Failure` is a property of the NodeRun itself. + * + * EXIT Nodes do NOT have OutgoingEdges, so this method should not be called on the NodeRunModel for + * an EXIT NodeRun. + * @precondition the NodeRun succeeded OR its failures were all properly handled. + * @postcondition if the NodeRun was successful, variable mutations on the activated edge are executed. + * @return the Node that the ThreadSpecModel should advance to next. + * @throws NodeFailureException if evaluation of outgoing edges fails or if variable mutations fail. + */ + public NodeModel evaluateOutgoingEdgesAndMaybeMutateVariables() throws NodeFailureException { + throw new NotImplementedException(); } + + // @Deprecated(forRemoval = true) + // public void fail(FailureModel failure, Date time) { + // this.failures.add(failure); + // endTime = time; + // status = failure.getStatus(); + // errorMessage = failure.message; + // getThreadRun().fail(failure, time); + // } + + // @Deprecated(forRemoval = true) + // public void failWithoutGrace(FailureModel failure, Date time) { + // this.failures.add(failure); + // endTime = time; + // status = failure.getStatus(); + // errorMessage = failure.message; + // getThreadRun().failWithoutGrace(failure, time); + // } + + // @Deprecated(forRemoval = true) + // public void maybeUnHalt() { + // setStatus(LHStatus.RUNNING); + // } } diff --git a/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/SubNodeRun.java b/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/SubNodeRun.java index 9669b5e937..4ce5714233 100644 --- a/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/SubNodeRun.java +++ 
b/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/SubNodeRun.java @@ -2,10 +2,14 @@ import com.google.protobuf.Message; import io.littlehorse.common.LHSerializable; +import io.littlehorse.common.model.getable.core.noderun.NodeFailureException; import io.littlehorse.common.model.getable.core.noderun.NodeRunModel; +import io.littlehorse.common.model.getable.core.variable.VariableValueModel; import io.littlehorse.common.model.getable.global.wfspec.WfSpecModel; import io.littlehorse.common.model.getable.global.wfspec.node.NodeModel; import java.util.Date; +import java.util.Optional; + import lombok.Getter; import lombok.Setter; @@ -15,39 +19,83 @@ public abstract class SubNodeRun extends LHSerializable { public NodeRunModel nodeRun; - /* - * Tries to move forward. Returns true if the status of something in the noderun changed. That means - * the WfRunModel#advance() method will try to call advance again on everything. + /** + * Optionally perform any initial setup/scheduling of stuff when the ThreadRun first arrives at this + * NodeRun. This is only to be called once. + * @param time the time at which the NodeRun was arrived at. */ - public abstract boolean advanceIfPossible(Date time); - public abstract void arrive(Date time); - /* - * The default is that we can't interrupt a node that's making active progress, - * the clearest example being that when a Task Worker is working on a TaskRun - * we have to wait for the response to come back before it's safe to initialize - * an interrupt thread. + /** + * Returns the output of the NodeRun. Can only be called after completion. Requires the CommandProcessor + * execution context. + * @return the output of the SubNodeRun if any output exists. + */ + public abstract Optional getOutput(); + + /** + * Checks if the processing of this SubNodeRun has been completed, and returns true if so. 
This method can + * alter the state of the SubNodeRunModel and its dependents but should NOT alter the state of the + * parent NodeRunModel. + * + * Requires the Command Processor execution context. + * @return true if the processing for this SubNodeRun has been completed; false otherwise. + * @throws NodeFailureException if the SubNodeRun throws a failure. */ - public boolean canBeInterrupted() { + public abstract boolean checkIfProcessingCompleted() throws NodeFailureException; + + /** + * Maybe halt the SubNodeRun. This is a default implementation which can be overridden by implementations + * of the SubNodeRun class. The default behavior is to only halt if the NodeRun is not in progress. + * + * Default behavior does not modify any state; however, SubNodeRun implementations are free to override + * this behavior. For example, in the future we may want a HALTED UserTaskRun to change the status of + * the UserTaskRun from `ASSIGNED` to `ASSIGNED_BUT_HALTED` or something like that... + * @return true if the SubNodeRun was successfully halted. + */ + public boolean maybeHalt() { return !nodeRun.isInProgress(); } + /** + * Can be overridden by SubNodeRun implementations to "un-halt" a SubNodeRun. Called by NodeRunModel. + */ + public void unHalt() { + // Nothing to do in default case. + } + + /** + * Called during initialization. Sets the parent NodeRun. + * @param nodeRunModel is the NodeRunModel for this subNodeRun. + */ public void setNodeRun(NodeRunModel nodeRunModel) { this.nodeRun = nodeRunModel; } + /** + * Returns the WfSpec that this NodeRunModel's NodeRun belongs to. NOTE: during the case + * of WfSpec Version Migration, we need to check the NodeRun's actual WfSpecId, rather than + * blindly returning the WfRun's WfSpec, because different NodeRuns in a long-running WfRun + * can belong to different WfSpecs after a migration has occurred. + * @return the WfSpecModel for the WfSpec that this NodeRun belongs to. 
+ */ public WfSpecModel getWfSpec() { - return getWfRun().getWfSpec(); + return nodeRun.getWfSpec(); } + /** + * Returns the WfRunModel for the WfRun that this SubNodeRun belongs to. + * @return the WfRunModel for this SubNodeRun + */ public WfRunModel getWfRun() { return nodeRun.getThreadRun().getWfRun(); } + /** + * Returns the NodeModel for the Node in the WfSpec that this NodeRun represents. + * @return the Node from the WfSpec. + */ public NodeModel getNode() { return nodeRun.getNode(); } - - public void halt() {} } diff --git a/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/ThreadRunModel.java b/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/ThreadRunModel.java index cf4e8aa487..a740125788 100644 --- a/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/ThreadRunModel.java +++ b/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/ThreadRunModel.java @@ -458,7 +458,7 @@ public boolean updateStatus() { return false; } } else if (status == LHStatus.HALTING) { - if (getCurrentNodeRun().canBeInterrupted()) { + if (getCurrentNodeRun().canBeHalted()) { setStatus(LHStatus.HALTED); return true; } else { @@ -477,7 +477,7 @@ public void setStatus(LHStatus status) { */ public boolean canBeInterrupted() { - if (getCurrentNodeRun().canBeInterrupted()) return true; + if (getCurrentNodeRun().canBeHalted()) return true; for (int childId : childThreadIds) { if (wfRun.getThreadRun(childId).isRunning()) { @@ -514,7 +514,7 @@ public boolean advance(Date eventTime) { } else if (status == LHStatus.HALTING) { log.trace("Tried to advance HALTING thread, checking if halted yet."); - if (currentNodeRunModel.canBeInterrupted()) { + if (currentNodeRunModel.canBeHalted()) { setStatus(LHStatus.HALTED); log.trace("Moving thread to HALTED"); return true; @@ -542,6 +542,7 @@ public boolean advance(Date eventTime) { * @param failure is the Failure that was raised. * @param time when the Failure occurred. 
*/ + @Deprecated(forRemoval = true) public void fail(FailureModel failure, Date time) { // First determine if the node that was failed has a relevant exception // handler attached. @@ -686,67 +687,45 @@ public void complete(Date time) { wfRun.handleThreadStatus(number, new Date(), status); } - /* - * Callback used by the NodeRunModel when the NodeRun is completed, to notify this ThreadRunModel - * that it's time to advance the ThreadRun. - * - * This callback makes the ThreadRun advance to the next Node in the WfSpec (thus creating a new - * NodeRun) and it tells the WfRun to try to advance everything. - */ - public void completeCurrentNode(VariableValueModel output, Date eventTime) { - NodeRunModel crn = getCurrentNodeRun(); - crn.status = LHStatus.COMPLETED; - - if (status == LHStatus.RUNNING) { - // If we got here, then we're good. - advanceFrom(getCurrentNode(), output); - } - getWfRun().advance(eventTime); - } - - public void advanceFrom(NodeModel curNode) { - advanceFrom(curNode, null); - } - - public void advanceFrom(NodeModel curNode, VariableValueModel output) { - if (curNode.getSubNode().getClass().equals(ExitNodeModel.class)) { - return; - } - NodeModel nextNode = null; - for (EdgeModel e : curNode.outgoingEdges) { - try { - if (evaluateEdge(e)) { - nextNode = e.getSinkNode(); - if (output != null) { - mutateVariables(output, e.getVariableMutations()); - } - break; - } - } catch (LHVarSubError exn) { - log.debug("Failing threadrun due to VarSubError {} {}", wfRun.getId(), currentNodePosition, exn); - getCurrentNodeRun() - .fail( - new FailureModel( - "Failed evaluating outgoing edge: " + exn.getMessage(), - LHConstants.VAR_MUTATION_ERROR), - new Date()); - return; - } - } - if (nextNode == null) { - // TODO: Later versions should validate wfSpec's so that this is not possible; however, it may - // always require some runtime checks. - getCurrentNodeRun() - .fail( - new FailureModel( - "WfSpec was invalid. 
There were no activated outgoing edges" - + " from a non-exit node.", - LHConstants.INTERNAL_ERROR), - new Date()); - } else { - activateNode(nextNode); - } - } + // public void advanceFrom(NodeModel curNode) { + // if (curNode.getSubNode().getClass().equals(ExitNodeModel.class)) { + // return; + // } + // NodeModel nextNode = null; + // for (EdgeModel e : curNode.outgoingEdges) { + // try { + // if (evaluateEdge(e)) { + // nextNode = e.getSinkNode(); + // if (output != null) { + // mutateVariables(output, e.getVariableMutations()); + // } + // break; + // } + // } catch (LHVarSubError exn) { + // log.debug("Failing threadrun due to VarSubError {} {}", wfRun.getId(), currentNodePosition, exn); + // getCurrentNodeRun() + // .fail( + // new FailureModel( + // "Failed evaluating outgoing edge: " + exn.getMessage(), + // LHConstants.VAR_MUTATION_ERROR), + // new Date()); + // return; + // } + // } + // if (nextNode == null) { + // // TODO: Later versions should validate wfSpec's so that this is not possible; however, it may + // // always require some runtime checks. + // getCurrentNodeRun() + // .fail( + // new FailureModel( + // "WfSpec was invalid. 
There were no activated outgoing edges" + // + " from a non-exit node.", + // LHConstants.INTERNAL_ERROR), + // new Date()); + // } else { + // activateNode(nextNode); + // } + // } public void activateNode(NodeModel node) { Date arrivalTime = new Date(); diff --git a/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/WfRunModel.java b/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/WfRunModel.java index 0268cad2d8..471262af08 100644 --- a/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/WfRunModel.java +++ b/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/WfRunModel.java @@ -50,6 +50,8 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; + +import lombok.AccessLevel; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -69,7 +71,11 @@ public class WfRunModel extends CoreGetable { public Date startTime; public Date endTime; - private List threadRuns = new ArrayList<>(); + + // Using this directly is dangerous; better to use `WfRunModel#getThreadRun()`. 
+ @Getter(AccessLevel.NONE) + private List threadRunsDoNotUseMe = new ArrayList<>(); + public List pendingInterrupts = new ArrayList<>(); public List pendingFailures = new ArrayList<>(); private ExecutionContext executionContext; @@ -145,7 +151,7 @@ public void setWfSpec(WfSpecModel spec) { } public ThreadRunModel getThreadRun(int threadRunNumber) { - return threadRuns.stream() + return threadRunsDoNotUseMe.stream() .filter(thread -> thread.getNumber() == threadRunNumber) .findFirst() .orElse(null); @@ -166,7 +172,7 @@ public void initFrom(Message p, ExecutionContext context) { for (ThreadRun trpb : proto.getThreadRunsList()) { ThreadRunModel thr = ThreadRunModel.fromProto(trpb, context); thr.wfRun = this; - threadRuns.add(thr); + threadRunsDoNotUseMe.add(thr); } for (PendingInterrupt pipb : proto.getPendingInterruptsList()) { pendingInterrupts.add(PendingInterruptModel.fromProto(pipb, context)); @@ -208,7 +214,7 @@ public WfRun.Builder toProto() { out.setEndTime(LHUtil.fromDate(endTime)); } - for (ThreadRunModel threadRunModel : threadRuns) { + for (ThreadRunModel threadRunModel : threadRunsDoNotUseMe) { out.addThreadRuns(threadRunModel.toProto()); } @@ -264,7 +270,7 @@ public ThreadRunModel startThread( thread.wfRun = this; thread.type = type; - threadRuns.add(thread); + threadRunsDoNotUseMe.add(thread); thread.validateVariablesAndStart(variables); return thread; @@ -387,37 +393,37 @@ private boolean startXnHandlers(Date time) { public void advance(Date time) { boolean statusChanged = false; // Update status and then advance - for (ThreadRunModel thread : threadRuns) { + for (ThreadRunModel thread : threadRunsDoNotUseMe) { statusChanged = thread.updateStatus() || statusChanged; } boolean xnHandlersStarted = startXnHandlersAndInterrupts(time); statusChanged = xnHandlersStarted || statusChanged; - for (int i = threadRuns.size() - 1; i >= 0; i--) { - ThreadRunModel thread = threadRuns.get(i); + for (int i = threadRunsDoNotUseMe.size() - 1; i >= 0; i--) { + 
ThreadRunModel thread = threadRunsDoNotUseMe.get(i); statusChanged = thread.advance(time) || statusChanged; } - for (int i = threadRuns.size() - 1; i >= 0; i--) { - ThreadRunModel thread = threadRuns.get(i); + for (int i = threadRunsDoNotUseMe.size() - 1; i >= 0; i--) { + ThreadRunModel thread = threadRunsDoNotUseMe.get(i); statusChanged = thread.updateStatus() || statusChanged; } while (statusChanged) { startXnHandlersAndInterrupts(time); statusChanged = false; - for (int i = threadRuns.size() - 1; i >= 0; i--) { - ThreadRunModel thread = threadRuns.get(i); + for (int i = threadRunsDoNotUseMe.size() - 1; i >= 0; i--) { + ThreadRunModel thread = threadRunsDoNotUseMe.get(i); statusChanged = thread.advance(time) || statusChanged; } - for (int i = threadRuns.size() - 1; i >= 0; i--) { - ThreadRunModel thread = threadRuns.get(i); + for (int i = threadRunsDoNotUseMe.size() - 1; i >= 0; i--) { + ThreadRunModel thread = threadRunsDoNotUseMe.get(i); statusChanged = thread.updateStatus() || statusChanged; } } // Now we remove any old threadruns that we don't want anymore. 
- for (int i = threadRuns.size() - 1; i >= 0; i--) { - ThreadRunModel thread = threadRuns.get(i); + for (int i = threadRunsDoNotUseMe.size() - 1; i >= 0; i--) { + ThreadRunModel thread = threadRunsDoNotUseMe.get(i); ThreadSpecModel spec = thread.getThreadSpecModel(); if (spec.getRetentionPolicy() != null) { if (spec.getRetentionPolicy().shouldGcThreadRun(thread)) { @@ -438,7 +444,7 @@ private void removeThreadRun(ThreadRunModel thread) { parent.childThreadIds.removeIf(childId -> childId.equals(thread.getNumber())); } - threadRuns.removeIf(candidate -> candidate.getNumber() == thread.getNumber()); + threadRunsDoNotUseMe.removeIf(candidate -> candidate.getNumber() == thread.getNumber()); } public void processExtEvtTimeout(ExternalEventTimeoutModel timeout) { @@ -455,14 +461,14 @@ public void failDueToWfSpecDeletion() { public void processExternalEvent(ExternalEventModel event) { // TODO LH-303: maybe if the event has a `threadRunNumber` and // `nodeRunPosition` set, it should do some validation here? 
- for (ThreadRunModel thread : threadRuns) { + for (ThreadRunModel thread : threadRunsDoNotUseMe) { thread.processExternalEvent(event); } advance(event.getCreatedAt()); } public void processStopRequest(StopWfRunRequestModel req) { - if (req.threadRunNumber >= threadRuns.size() || req.threadRunNumber < 0) { + if (req.threadRunNumber >= threadRunsDoNotUseMe.size() || req.threadRunNumber < 0) { throw new LHApiException(Status.INVALID_ARGUMENT, "Tried to stop a non-existent thread id."); } @@ -485,7 +491,7 @@ public void stop(ThreadRunModel thread, ThreadHaltReasonModel threadHaltReason) } public void processResumeRequest(ResumeWfRunRequestModel req) { - if (req.threadRunNumber >= threadRuns.size() || req.threadRunNumber < 0) { + if (req.threadRunNumber >= threadRunsDoNotUseMe.size() || req.threadRunNumber < 0) { throw new LHApiException(Status.INVALID_ARGUMENT, "Tried to resume a non-existent thread id."); } @@ -503,7 +509,7 @@ public void processResumeRequest(ResumeWfRunRequestModel req) { public void processSleepNodeMatured(SleepNodeMaturedModel req, Date time) throws LHValidationError { int threadRunNumber = req.getNodeRunId().getThreadRunNumber(); int nodeRunPosition = req.getNodeRunId().getPosition(); - if (threadRunNumber >= threadRuns.size() || threadRunNumber < 0) { + if (threadRunNumber >= threadRunsDoNotUseMe.size() || threadRunNumber < 0) { throw new LHValidationError(null, "Reference to nonexistent thread."); } diff --git a/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/subnoderun/ExternalEventRunModel.java b/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/subnoderun/ExternalEventRunModel.java index 10dbadc256..ce5543fd03 100644 --- a/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/subnoderun/ExternalEventRunModel.java +++ b/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/subnoderun/ExternalEventRunModel.java @@ -122,7 +122,7 @@ public boolean advanceIfPossible(Date time) { * 
happen. */ @Override - public boolean canBeInterrupted() { + public boolean canBeHalted() { return true; } diff --git a/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/subnoderun/SleepNodeRunModel.java b/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/subnoderun/SleepNodeRunModel.java index ef7f45f605..dbfc2e2965 100644 --- a/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/subnoderun/SleepNodeRunModel.java +++ b/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/subnoderun/SleepNodeRunModel.java @@ -89,7 +89,7 @@ public void processSleepNodeMatured(SleepNodeMaturedModel evt) { } @Override - public boolean canBeInterrupted() { + public boolean canBeHalted() { return true; } } diff --git a/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/subnoderun/WaitForThreadsRunModel.java b/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/subnoderun/WaitForThreadsRunModel.java index 7c1d526694..890b533baf 100644 --- a/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/subnoderun/WaitForThreadsRunModel.java +++ b/server/src/main/java/io/littlehorse/common/model/getable/core/wfrun/subnoderun/WaitForThreadsRunModel.java @@ -74,7 +74,7 @@ public static WaitForThreadsRunModel fromProto(WaitForThreadsRun p, ExecutionCon } @Override - public boolean canBeInterrupted() { + public boolean canBeHalted() { return true; }