Skip to content

Commit

Permalink
add wait state to cron workflow (#434)
Browse files Browse the repository at this point in the history
* add wait state to cron workflow

* add possibility to keep waiting

* move unfinished children check code to state execution

* move unfinished children check to workflow instance service, add tests and fix a bug in the query
  • Loading branch information
efonsell authored Feb 19, 2021
1 parent 22c9171 commit 11c4c97
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 9 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- Control retrying and logging of an exception thrown by a state method via `WorkflowSettings` (replaces deprecated `WorkflowState.isRetryAllowed(...)`).
- Control logging and sleeping after exceptions in `WorkflowDispatcher`.
- Control logging and sleeping after a failure to save workflow instance state.
- Support in `CronWorkflow` to wait for created child workflow instances to finish before scheduling the next work.

**Details**
- `nflow-engine`
Expand All @@ -30,6 +31,8 @@
- Control which log level is used to log the exception.
- Control whether the stack trace of the exception is logged or not.
- Control how long the `WorkflowStateProcessor` should sleep before retrying.
- Support in `CronWorkflow` to wait for child workflow instances created in `doWork` state method to finish before scheduling the next work. Return `NextAction.moveToStateAfter(waitForWorkToFinish, ...)` with some fail-safe waiting time instead of `NextAction.moveToState(schedule, ...)` to avoid immediate re-scheduling. When child workflows finish, they will wake up the parent workflow automatically, if it is still in the waiting state. Default implementation will check if any child workflows are still running, and keep waiting until they are all finished. Override `CronWorkflow.waitForWorkToFinishImpl` for custom logic.
- Add `hasUnfinishedChildWorkflows` helper in `StateExecution` and `WorkflowInstanceService` to check if the workflow instance has any child workflow instances with any other status than `WorkflowInstanceStatus.finished`.
- `nflow-rest-api-common`, `nflow-rest-api-jax-rs`, `nflow-rest-api-spring-web`
- `UpdateWorkflowInstanceRequest.businessKey` field was added to support updating workflow instance business key via REST API.
- Added support for new query parameters `stateVariableKey` and `stateVariableValue` to `GET /v1/workflow-instance` to limit search query by state variable name and key. Only the latest value of the state variable of the workflow instance is used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,4 +309,8 @@ public void handleFailure(AbstractWorkflowDefinition<?> definition, String failu
}
}

@Override
public boolean hasUnfinishedChildWorkflows() {
return workflowInstanceService.hasUnfinishedChildWorkflows(instance.id);
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package io.nflow.engine.service;

import static java.util.Collections.emptySet;
import static java.util.EnumSet.complementOf;
import static org.slf4j.LoggerFactory.getLogger;
import static org.springframework.util.StringUtils.isEmpty;

import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand All @@ -24,6 +26,7 @@
import io.nflow.engine.workflow.definition.AbstractWorkflowDefinition;
import io.nflow.engine.workflow.instance.QueryWorkflowInstances;
import io.nflow.engine.workflow.instance.WorkflowInstance;
import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus;
import io.nflow.engine.workflow.instance.WorkflowInstanceAction;
import io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType;

Expand All @@ -34,6 +37,8 @@
public class WorkflowInstanceService {

private static final Logger logger = getLogger(WorkflowInstanceService.class);
private static final WorkflowInstanceStatus[] UNFINISHED_STATUSES = complementOf(EnumSet.of(WorkflowInstanceStatus.finished))
.toArray(new WorkflowInstanceStatus[0]);

private final WorkflowDefinitionService workflowDefinitionService;
private final WorkflowInstanceDao workflowInstanceDao;
Expand Down Expand Up @@ -181,4 +186,17 @@ private AbstractWorkflowDefinition<?> getDefinition(Long workflowInstanceId) {
return workflowDefinitionService.getWorkflowDefinition(workflowInstanceDao.getWorkflowInstanceType(workflowInstanceId));
}

/**
* Return true if this workflow instance has unfinished child workflow instances.
*
* @param workflowInstanceId
* The parent workflow instance id.
*
* @return True if the workflow instance has unfinished child workflow instances, false otherwise.
*/
public boolean hasUnfinishedChildWorkflows(long workflowInstanceId) {
QueryWorkflowInstances unfinishedChildren = new QueryWorkflowInstances.Builder().addStatuses(UNFINISHED_STATUSES)
.setParentWorkflowId(workflowInstanceId).build();
return !listWorkflowInstances(unfinishedChildren).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
import static io.nflow.engine.workflow.curated.CronWorkflow.State.failed;
import static io.nflow.engine.workflow.curated.CronWorkflow.State.handleFailure;
import static io.nflow.engine.workflow.curated.CronWorkflow.State.schedule;
import static io.nflow.engine.workflow.curated.CronWorkflow.State.waitForWorkToFinish;
import static io.nflow.engine.workflow.definition.NextAction.moveToState;
import static io.nflow.engine.workflow.definition.NextAction.moveToStateAfter;
import static io.nflow.engine.workflow.definition.NextAction.retryAfter;
import static io.nflow.engine.workflow.definition.WorkflowSettings.Builder.oncePerDay;
import static io.nflow.engine.workflow.definition.WorkflowStateType.manual;
import static io.nflow.engine.workflow.definition.WorkflowStateType.normal;
import static io.nflow.engine.workflow.definition.WorkflowStateType.start;
import static org.joda.time.Instant.now;
import static io.nflow.engine.workflow.definition.WorkflowStateType.wait;
import static org.joda.time.DateTime.now;
import static org.joda.time.Period.days;
import static org.slf4j.LoggerFactory.getLogger;

Expand Down Expand Up @@ -44,6 +47,7 @@ public abstract class CronWorkflow extends WorkflowDefinition<State> {
public enum State implements io.nflow.engine.workflow.definition.WorkflowState {
schedule(start, "Schedule work to be done according to the cron state variable"), //
doWork(normal, "Execute the actual work"), //
waitForWorkToFinish(wait, "Wait for work to finish"), //
handleFailure(normal, "Handle failure and decide if workflow should be re-scheduled or stopped"), //
failed(manual, "Processing failed, waiting for manual actions");

Expand Down Expand Up @@ -86,6 +90,8 @@ protected CronWorkflow(String type, WorkflowSettings settings) {
super(type, schedule, handleFailure, settings);
permit(schedule, doWork);
permit(doWork, schedule);
permit(doWork, waitForWorkToFinish);
permit(waitForWorkToFinish, schedule);
permit(handleFailure, schedule, failed);
}

Expand All @@ -97,7 +103,10 @@ protected CronWorkflow(String type, WorkflowSettings settings) {
* <pre>
* public NextAction doWork(StateExecution execution) {
* // do the work here
* return NextAction.moveToState(schedule, "Work done");
* if (rescheduleImmediately) {
* return NextAction.moveToState(schedule, "Work done");
* }
* return NextAction.moveToStateAfter(waitForWorkToFinish, DateTime.now().plusHours(hoursToWait), "Waiting for work to finish");
* }
* </pre>
*
Expand Down Expand Up @@ -159,4 +168,36 @@ protected boolean handleFailureImpl(StateExecution execution) {
logger.error("Cron workflow {} / {} work failed", getType(), execution.getWorkflowInstanceId());
return true;
}

/**
* Calls {@link #waitForWorkToFinishImpl} to check if this instance should wait for work to finish or move to schedule state.
*
* @param execution
* The workflow execution context.
* @return Action to retry later or move to schedule state.
*/
public NextAction waitForWorkToFinish(StateExecution execution) {
DateTime waitUntil = waitForWorkToFinishImpl(execution);
if (waitUntil == null) {
return moveToState(schedule, "Work finished, rescheduling");
}
return retryAfter(waitUntil, "Waiting for work to finish");
}

/**
* Returns null to move to schedule state immediately if there are no incompleted child workflows, or current time plus 1 hour
* to check again later. Override for custom logic.
*
* @param execution
* The workflow execution context.
* @return Time when check should be retried. Null to go to schedule state immediately.
*/
protected DateTime waitForWorkToFinishImpl(StateExecution execution) {
if (execution.hasUnfinishedChildWorkflows()) {
logger.info("Unfinished child workflow found, waiting before scheduling next work.");
return now().plusHours(1);
}
logger.info("No unfinished child workflows found, scheduling next work.");
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,10 @@ public interface StateExecution {
*/
Optional<Long> getParentId();

/**
* Return true if this workflow instance has unfinished child workflow instances.
*
* @return True if unfinished child workflow instances are found, false otherwise.
*/
boolean hasUnfinishedChildWorkflows();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.inProgress;
import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.externalChange;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -12,7 +13,9 @@
import static org.joda.time.DateTimeUtils.setCurrentMillisFixed;
import static org.joda.time.DateTimeUtils.setCurrentMillisSystem;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
Expand All @@ -23,6 +26,7 @@
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand All @@ -42,6 +46,7 @@
import io.nflow.engine.workflow.definition.WorkflowDefinition;
import io.nflow.engine.workflow.instance.QueryWorkflowInstances;
import io.nflow.engine.workflow.instance.WorkflowInstance;
import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus;
import io.nflow.engine.workflow.instance.WorkflowInstanceAction;
import io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType;

Expand Down Expand Up @@ -157,7 +162,7 @@ public void wakeUpWorkflowInstance() {

@Test
public void listWorkflowInstances() {
List<WorkflowInstance> result = asList(constructWorkflowInstanceBuilder().build());
List<WorkflowInstance> result = asList(constructWorkflowInstanceBuilder().build());
QueryWorkflowInstances query = mock(QueryWorkflowInstances.class);
when(workflowInstanceDao.queryWorkflowInstances(query)).thenReturn(result);
assertEquals(result, service.listWorkflowInstances(query));
Expand Down Expand Up @@ -207,4 +212,30 @@ public void resetSignalDoesNotQueryWorkflowDefinition() {
verify(workflowDefinitions, never()).getWorkflowDefinition(anyString());
}

@Test
public void hasUnfinishedChildWorkflowsReturnsFalseWhenInstanceHasNoUnfinishedChildren() {
List<WorkflowInstance> result = emptyList();
when(workflowInstanceDao.queryWorkflowInstances(queryCapture.capture())).thenReturn(result);

assertFalse(service.hasUnfinishedChildWorkflows(42));
QueryWorkflowInstances query = queryCapture.getValue();
assertThat(query.parentWorkflowId, is(42L));
EnumSet.complementOf(EnumSet.of(WorkflowInstanceStatus.finished)).stream()
.forEach(status -> assertTrue(query.statuses.contains(status)));
assertFalse(query.statuses.contains(WorkflowInstanceStatus.finished));
}

@Test
public void hasUnfinishedChildWorkflowsReturnsTrueWhenInstanceHasUnfinishedChild() {
WorkflowInstance unfinished = constructWorkflowInstanceBuilder().build();
List<WorkflowInstance> result = asList(unfinished);
when(workflowInstanceDao.queryWorkflowInstances(queryCapture.capture())).thenReturn(result);

assertTrue(service.hasUnfinishedChildWorkflows(42));
QueryWorkflowInstances query = queryCapture.getValue();
assertThat(query.parentWorkflowId, is(42L));
EnumSet.complementOf(EnumSet.of(WorkflowInstanceStatus.finished)).stream()
.forEach(status -> assertTrue(query.statuses.contains(status)));
assertFalse(query.statuses.contains(WorkflowInstanceStatus.finished));
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.nflow.tests.demo.workflow;

import static io.nflow.engine.workflow.curated.CronWorkflow.State.schedule;
import static io.nflow.engine.workflow.definition.NextAction.moveToState;
import static io.nflow.engine.workflow.curated.CronWorkflow.State.waitForWorkToFinish;
import static io.nflow.engine.workflow.definition.NextAction.moveToStateAfter;
import static io.nflow.tests.demo.workflow.DemoWorkflow.DEMO_WORKFLOW_TYPE;
import static org.joda.time.DateTime.now;
import static org.joda.time.Period.hours;

import org.joda.time.DateTime;
Expand All @@ -11,6 +13,7 @@
import io.nflow.engine.workflow.definition.NextAction;
import io.nflow.engine.workflow.definition.StateExecution;
import io.nflow.engine.workflow.definition.WorkflowSettings.Builder;
import io.nflow.engine.workflow.instance.WorkflowInstance;

@Component
public class TestCronWorkflow extends CronWorkflow {
Expand All @@ -20,8 +23,10 @@ protected TestCronWorkflow() {
super(TYPE, new Builder().setHistoryDeletableAfter(hours(1)).setDeleteHistoryCondition(() -> true).build());
}

public NextAction doWork(@SuppressWarnings("unused") StateExecution execution) {
return moveToState(schedule, "ok");
public NextAction doWork(StateExecution execution) {
WorkflowInstance childWorkflow = new WorkflowInstance.Builder().setType(DEMO_WORKFLOW_TYPE).build();
execution.addChildWorkflows(childWorkflow);
return moveToStateAfter(waitForWorkToFinish, now().plusMinutes(1), "Work delegated to child workflow");
}

@Override
Expand All @@ -33,4 +38,9 @@ protected DateTime getNextActivationTime(StateExecution execution, String cron)
protected boolean handleFailureImpl(StateExecution execution) {
return super.handleFailureImpl(execution);
}

@Override
protected DateTime waitForWorkToFinishImpl(StateExecution execution) {
return super.waitForWorkToFinishImpl(execution);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,19 @@ public void startCronWorkflow() {

@Test
@Order(2)
public void letItRunFor5Seconds() throws InterruptedException {
SECONDS.sleep(5);
public void letItRunForTenSeconds() throws InterruptedException {
SECONDS.sleep(10);
}

@Test
@Order(3)
public void verifyItHasRunPeriodically() {
List<Action> actions = getWorkflowInstance(resp.id).actions;
long scheduleActions = actions.stream().filter(a -> CronWorkflow.State.schedule.name().equals(a.state)).count();
long waitActions = actions.stream().filter(a -> CronWorkflow.State.waitForWorkToFinish.name().equals(a.state)).count();
long doWorkActions = actions.stream().filter(a -> CronWorkflow.State.doWork.name().equals(a.state)).count();
assertThat(scheduleActions, is(greaterThanOrEqualTo(1L)));
assertThat(waitActions, is(greaterThanOrEqualTo(1L)));
assertThat(doWorkActions, is(greaterThanOrEqualTo(1L)));
}

Expand Down

0 comments on commit 11c4c97

Please sign in to comment.