diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c49fe35d..87f1b5893 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,13 +1,16 @@ ## 7.2.1-SNAPSHOT (future release) **Highlights** +- `nflow-engine` + - Optimize fetching workflow instance with large history with many modified state variables. - `nflow-rest-api-spring-web` and `nflow-netty` - Change REST API calls to use a dedicated thread pool for all blocking database operations to avoid blocking the netty EventLoop thread. **Details** - - `nflow-engine` + - Fix SQL performance / memory issue when getting single workflow instance with action state variables, when the instance has lots of actions with lots of state variables. The old code fetched all state variables of all actions of the instance, the new code only fetches the state variables for the actions that will be returned. + - Fix instantiation of @StateVar(instantiateIfNotExists=true) Mutable - the result was incorrectly a Mutable wrapped in Mutable. - Fix potential resource leaks - Dependency updates: - spring 5.2.8 @@ -266,7 +269,7 @@ **Details** - `nflow-engine` - Add `priority` two byte integer to the `nflow_workflow` table. When the dispatcher chooses from many available scheduled workflow instances it primarily (unfairly) picks the workflow instances with the largest priority values, and for workflows with the same priority, the ones with oldest `next_activation` timestamp. Priority defaults to 0 and can also be negative. Default priority value for the new workflow instances can be set per workflow definition (`WorkflowSettings.Builder.setDefaultPriority`), and overridden per workflow instance (`WorkflowInstance.Builder.setPriority`). Requires database migration, see database update scripts for details. - - Separate workflow definition scanning from `WorkflowDefinitionService` by introducing `WorkflowDefinitionSpringBeanScanner` and `WorkflowDefinitionClassNameScanner`. This allows breaking the circular dependency when a workflow definition uses `WorkflowInstanceService` (which depends on `WorkflowDefinitionService`, which depended on all workflow definitions). This enabled using constructor injection in all nFlow classes. + - Separate workflow definition scanning from `WorkflowDefinitionService` by introducing `WorkflowDefinitionSpringBeanScanner` and `WorkflowDefinitionClassNameScanner`. This allows breaking the circular dependency when a workflow definition uses `WorkflowInstanceService` (which depends on `WorkflowDefinitionService`, which depended on all workflow definitions). This enabled using constructor injection in all nFlow classes. - Add `disableMariaDbDriver` to default MySQL JDBC URL so that in case there are both MySQL and MariaDB JDBC drivers in the classpath then MariaDB will not steal the MySQL URL. - Add support for `nflow.db.mariadb` profile. - Update database indices to match current workflow instance polling code. @@ -408,8 +411,8 @@ This release introduced issue #306 which may cause OutOfMemory errors while fetc - jetty 9.4.15.v20190215 - h2 1.4.199 - Fix workflow history cleanup to keep the actions that hold the latest values of state variables -- nFlow Explorer: Custom content to workflow definition and workflow instance pages. -- nFlow Explorer: Executors page to use standard time formatting in tooltips +- nFlow Explorer: Custom content to workflow definition and workflow instance pages. +- nFlow Explorer: Executors page to use standard time formatting in tooltips - nFlow netty: Add support for registering Spring ApplicationListeners - nFlow jetty: Replace deprecated NCSARequestLog with CustomRequestLog - Fix `WorkflowLifecycle.stop()` blocking forever if `nflow.autostart=false` and `WorkflowLifecycle.start()` not called @@ -496,7 +499,7 @@ This release introduced issue #306 which may cause OutOfMemory errors while fetc - `nflow-jetty` now serves all paths under `/nflow/*`. The new paths are as follows: - /nflow/api/v1 -> API v1 (was: /api/nflow/v1) - /nflow/api/swagger.json -> Swagger config (was: /api/swagger.json) - - /nflow/ui -> nFlow statics assets + - /nflow/ui -> nFlow statics assets - /nflow/ui/explorer -> nFlow UI (was: /explorer) - /nflow/ui/doc -> Swagger UI (was: /doc) - /nflow/metrics -> metrics and health checks (was: /metrics) @@ -644,7 +647,7 @@ This release introduced issue #306 which may cause OutOfMemory errors while fetc - fixed: workflow instance recovery functionality (broken by version 2.0.0) - fixed: Oracle database schema - nflow-rest-api: - - **_breaking change:_** Prefixed operation paths by "/nflow" (e.g. /v1/statistics -> /nflow/v1/statistics) + - **_breaking change:_** Prefixed operation paths by "/nflow" (e.g. /v1/statistics -> /nflow/v1/statistics) - Support for Jersey JAX-RS implementation - **_breaking change:_** Moved exception mappers to nflow-jetty (BadRequestExceptionMapper, CustomValidationExceptionMapper, NotFoundExceptionMapper) - Improved Swagger documentation diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index c13f31e8b..aa8d20651 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -182,11 +182,10 @@ private long insertWorkflowInstanceWithCte(WorkflowInstance instance) { try { StringBuilder sqlb = new StringBuilder(256); sqlb.append("with wf as (").append(insertWorkflowInstanceSql()).append(" returning id)"); - Object[] instanceValues = new Object[] { instance.type, instance.priority, - instance.parentWorkflowId, instance.parentActionId, instance.businessKey, instance.externalId, - executorInfo.getExecutorGroup(), instance.status.name(), instance.state, - abbreviate(instance.stateText, getInstanceStateTextLength()), toTimestamp(instance.nextActivation), - instance.signal.orElse(null) }; + Object[] instanceValues = new Object[] { instance.type, instance.priority, instance.parentWorkflowId, + instance.parentActionId, instance.businessKey, instance.externalId, executorInfo.getExecutorGroup(), + instance.status.name(), instance.state, abbreviate(instance.stateText, getInstanceStateTextLength()), + toTimestamp(instance.nextActivation), instance.signal.orElse(null) }; int pos = instanceValues.length; Object[] args = Arrays.copyOf(instanceValues, pos + instance.stateVariables.size() * 2); for (Entry variable : instance.stateVariables.entrySet()) { @@ -523,9 +522,9 @@ private void fillState(final WorkflowInstance instance) { jdbc.query("select outside.state_key, outside.state_value from nflow_workflow_state outside inner join " + "(select workflow_id, max(action_id) action_id, state_key from nflow_workflow_state where workflow_id = ? group by workflow_id, state_key) inside " + "on outside.workflow_id = inside.workflow_id and outside.action_id = inside.action_id and outside.state_key = inside.state_key", - rs -> { - instance.stateVariables.put(rs.getString(1), rs.getString(2)); - }, instance.id); + rs -> { + instance.stateVariables.put(rs.getString(1), rs.getString(2)); + }, instance.id); instance.originalStateVariables.putAll(instance.stateVariables); } @@ -557,8 +556,8 @@ private List pollNextWorkflowInstanceIdsWithUpdateReturning(int batchSize) private List pollNextWorkflowInstanceIdsWithTransaction(final int batchSize) { String sql = sqlVariants.limit("select id, modified from nflow_workflow " + whereConditionForInstanceUpdate(), batchSize); - List instances = transaction.execute(tx -> jdbc.query(sql, - (rs, rowNum) -> new OptimisticLockKey(rs.getLong("id"), sqlVariants.getTimestamp(rs, "modified")))); + List instances = transaction.execute( + tx -> jdbc.query(sql, (rs, rowNum) -> new OptimisticLockKey(rs.getLong("id"), sqlVariants.getTimestamp(rs, "modified")))); if (instances.isEmpty()) { return emptyList(); } @@ -588,8 +587,7 @@ private List updateNextWorkflowInstancesWithMultipleUpdates(List updateNextWorkflowInstancesWithBatchUpdate(List instances) { String sql = updateInstanceForExecutionQuery() + " where id = ? and modified = ? and executor_id is null"; List batchArgs = instances.stream() - .map(instance -> new Object[] { instance.id, sqlVariants.tuneTimestampForDb(instance.modified) }) - .collect(toList()); + .map(instance -> new Object[] { instance.id, sqlVariants.tuneTimestampForDb(instance.modified) }).collect(toList()); int[] updateStatuses = jdbc.batchUpdate(sql, batchArgs); List ids = new ArrayList<>(instances.size()); for (int i = 0; i < updateStatuses.length; ++i) { @@ -669,7 +667,7 @@ public Stream queryWorkflowInstancesAsStream(QueryWorkflowInst sql = sqlVariants.limit(sql, getMaxResults(query.maxResults)); Stream ret = namedJdbc.query(sql, params, new WorkflowInstanceRowMapper()).stream() - .map(WorkflowInstance.Builder::build); + .map(WorkflowInstance.Builder::build); if (query.includeCurrentStateVariables) { ret = ret.peek(instance -> fillState(instance)); } @@ -698,13 +696,22 @@ private long getMaxResults(Long maxResults) { return min(maxResults, workflowInstanceQueryMaxResults); } - private void fillActions(WorkflowInstance instance, boolean includeStateVariables, Long maxActions) { - Map> actionStates = includeStateVariables ? fetchActionStateVariables(instance) - : EMPTY_ACTION_STATE_MAP; - String sql = sqlVariants.limit( - "select nflow_workflow_action.* from nflow_workflow_action where workflow_id = ? order by id desc", - getMaxActions(maxActions)); - instance.actions.addAll(jdbc.query(sql, new WorkflowInstanceActionRowMapper(sqlVariants, actionStates), instance.id)); + private void fillActions(WorkflowInstance instance, boolean includeStateVariables, Long requestedMaxActions) { + long maxActions = getMaxActions(requestedMaxActions); + String sql = sqlVariants + .limit("select nflow_workflow_action.* from nflow_workflow_action where workflow_id = ? order by id desc", maxActions); + List actionBuilders = jdbc.query(sql, new WorkflowInstanceActionRowMapper(sqlVariants), + instance.id); + if (includeStateVariables) { + Map> actionStates = fetchActionStateVariables(instance, actionBuilders.size(), maxActions); + actionBuilders.forEach(builder -> { + Map actionState = actionStates.get(builder.getId()); + if (actionState != null) { + builder.setUpdatedStateVariables(actionState); + } + }); + } + actionBuilders.stream().map(WorkflowInstanceAction.Builder::build).forEach(instance.actions::add); } private long getMaxActions(Long maxActions) { @@ -714,9 +721,17 @@ private long getMaxActions(Long maxActions) { return min(maxActions, workflowInstanceQueryMaxActions); } - private Map> fetchActionStateVariables(WorkflowInstance instance) { - return jdbc.query("select * from nflow_workflow_state where workflow_id = ? order by action_id, state_key asc", - new WorkflowActionStateRowMapper(), instance.id); + private Map> fetchActionStateVariables(WorkflowInstance instance, long actions, long maxActions) { + String sql; + if (actions < maxActions) { + sql = "select * from nflow_workflow_state where workflow_id = ? order by action_id, state_key asc"; + } else { + sql = "select nflow_workflow_state.* from (" + + sqlVariants.limit("select id from nflow_workflow_action where workflow_id = ? order by id desc", maxActions) + + ") action_id inner join nflow_workflow_state on action_id.id = nflow_workflow_state.action_id " + + "order by nflow_workflow_state.action_id, nflow_workflow_state.state_key asc"; + } + return jdbc.query(sql, new WorkflowActionStateRowMapper(), instance.id); } @Transactional(propagation = MANDATORY) @@ -780,30 +795,25 @@ public WorkflowInstance.Builder mapRow(ResultSet rs, int rowNum) throws SQLExcep } } - static class WorkflowInstanceActionRowMapper implements RowMapper { + static class WorkflowInstanceActionRowMapper implements RowMapper { private final SQLVariants sqlVariants; - private final Map> actionStates; - public WorkflowInstanceActionRowMapper(SQLVariants sqlVariants, Map> actionStates) { + public WorkflowInstanceActionRowMapper(SQLVariants sqlVariants) { this.sqlVariants = sqlVariants; - this.actionStates = actionStates; } @Override - public WorkflowInstanceAction mapRow(ResultSet rs, int rowNum) throws SQLException { - long actionId = rs.getLong("id"); - Map actionState = actionStates.getOrDefault(actionId, emptyMap()); + public WorkflowInstanceAction.Builder mapRow(ResultSet rs, int rowNum) throws SQLException { return new WorkflowInstanceAction.Builder() // - .setId(actionId) // + .setId(rs.getLong("id")) // .setWorkflowInstanceId(rs.getLong("workflow_id")) // .setExecutorId(rs.getInt("executor_id")) // .setType(WorkflowActionType.valueOf(rs.getString("type"))) // .setState(rs.getString("state")) // .setStateText(rs.getString("state_text")) // - .setUpdatedStateVariables(actionState) // .setRetryNo(rs.getInt("retry_no")) // .setExecutionStart(sqlVariants.getDateTime(rs, "execution_start")) // - .setExecutionEnd(sqlVariants.getDateTime(rs, "execution_end")).build(); + .setExecutionEnd(sqlVariants.getDateTime(rs, "execution_end")); } } @@ -816,11 +826,7 @@ public Map> extractData(ResultSet rs) throws SQLExcept long actionId = rs.getLong("action_id"); String stateKey = rs.getString("state_key"); String stateValue = rs.getString("state_value"); - if (!actionStates.containsKey(actionId)) { - actionStates.put(actionId, new LinkedHashMap()); - } - Map stateMap = actionStates.get(actionId); - stateMap.put(stateKey, stateValue); + actionStates.computeIfAbsent(actionId, k -> new LinkedHashMap<>()).put(stateKey, stateValue); } return actionStates; } diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/WorkflowDefinitionScanner.java b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/WorkflowDefinitionScanner.java index 45aa023e8..6554e5ea6 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/WorkflowDefinitionScanner.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/WorkflowDefinitionScanner.java @@ -69,6 +69,7 @@ public Map getStateMethods(Class definition) { if (Mutable.class.isAssignableFrom(clazz)) { ParameterizedType pType = (ParameterizedType) type; type = pType.getActualTypeArguments()[0]; + clazz = (Class) type; readOnly = false; mutable = true; } diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/Mutable.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/Mutable.java index 7a7c28848..20c90e036 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/Mutable.java +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/Mutable.java @@ -1,5 +1,7 @@ package io.nflow.engine.workflow.definition; +import java.util.Objects; + /** * Wrapper class to provide mutable object for immutable value. * @@ -53,4 +55,21 @@ public T getVal() { public String toString() { return String.valueOf(val); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Mutable mutable = (Mutable) o; + return Objects.equals(val, mutable.val); + } + + @Override + public int hashCode() { + return Objects.hash(val); + } } diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstanceAction.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstanceAction.java index 0177fd533..dba730bce 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstanceAction.java +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstanceAction.java @@ -161,6 +161,14 @@ public Builder setId(long id) { return this; } + /** + * Get the action id (generated by database) + * @return The action id. + */ + public long getId() { + return id; + } + /** * Set the workflow instance identifier. * @param workflowInstanceId The workflow instance identifier. diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java index dee500250..3afe1c18a 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java @@ -42,7 +42,6 @@ import java.sql.SQLException; import java.sql.Timestamp; import java.util.ArrayList; -import java.util.Collections; import java.util.EnumSet; import java.util.LinkedHashMap; import java.util.List; @@ -867,10 +866,10 @@ public void recoverWorkflowInstancesFromDeadNodesSetsExecutorIdToNullAndStatusTo String status = jdbc.queryForObject("select status from nflow_workflow where id = ?", String.class, id); assertThat(status, is(inProgress.name())); - List actions = jdbc.query("select * from nflow_workflow_action where workflow_id = ?", - new WorkflowInstanceActionRowMapper(sqlVariant, Collections.emptyMap()), id); + List actions = jdbc.query("select * from nflow_workflow_action where workflow_id = ?", + new WorkflowInstanceActionRowMapper(sqlVariant), id); assertThat(actions.size(), is(1)); - WorkflowInstanceAction workflowInstanceAction = actions.get(0); + WorkflowInstanceAction workflowInstanceAction = actions.get(0).build(); assertThat(workflowInstanceAction.executorId, is(executorDao.getExecutorId())); assertThat(workflowInstanceAction.type, is(recovery)); assertThat(workflowInstanceAction.stateText, is("Recovered")); @@ -881,7 +880,7 @@ public void recoverWorkflowInstancesFromDeadNodesSetsExecutorIdToNullAndStatusTo assertThat(executorId, is(nullValue())); actions = jdbc.query("select * from nflow_workflow_action where workflow_id = ?", - new WorkflowInstanceActionRowMapper(sqlVariant, Collections.emptyMap()), id); + new WorkflowInstanceActionRowMapper(sqlVariant), id); assertThat(actions.size(), is(1)); assertThat(workflowInstanceAction.executorId, is(executorDao.getExecutorId())); assertThat(workflowInstanceAction.type, is(recovery)); diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/ObjectStringMapperTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/ObjectStringMapperTest.java new file mode 100644 index 000000000..094122f88 --- /dev/null +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/ObjectStringMapperTest.java @@ -0,0 +1,79 @@ +package io.nflow.engine.internal.workflow; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.when; + +import java.lang.reflect.Type; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.nflow.engine.internal.workflow.WorkflowStateMethod.StateParameter; +import io.nflow.engine.workflow.definition.Mutable; +import io.nflow.engine.workflow.definition.StateExecution; + +@ExtendWith(MockitoExtension.class) +class ObjectStringMapperTest { + + private final ObjectStringMapper mapper = new ObjectStringMapper(new ObjectMapper()); + + @Mock + StateExecution execution; + + @Test + public void stringRoundTrip() { + verifyRoundTrip(String.class, "test☃"); + } + + @Test + public void longRoundTrip() { + verifyRoundTrip(Long.class, -1L); + } + + private void verifyRoundTrip(Type type, Object value) { + String data = mapper.convertFromObject(null, value); + Object result = mapper.convertToObject(type, null, data); + assertThat(result, is(value)); + } + + @Test + public void methodStringParam() { + verifyParam(String.class, "test€", "test€"); + } + + @Test + public void methodLongParam() { + verifyParam(Long.class, "42", 42L); + } + + @Test + public void methodPrimitiveIntParam() { + verifyParam(Integer.TYPE, "128", 0, 0, 128); + } + + @Test + public void methodMutableLongParam() { + verifyParam(Long.class, "42", 0L, new Mutable<>(0L), new Mutable<>(42L)); + verifyParam(Long.class, "42", null, new Mutable<>(null), new Mutable<>(42L)); + } + + private void verifyParam(Type type, String strVal, Object expectedVal) { + verifyParam(type, strVal, null, null, expectedVal); + } + + private void verifyParam(Type type, String strVal, Object defaultVal, Object expectedDefautlVal, Object expectedVal) { + boolean mutable = expectedDefautlVal instanceof Mutable; + WorkflowStateMethod method = new WorkflowStateMethod(null, new StateParameter("null", type, defaultVal, false, mutable), + new StateParameter("key", type, defaultVal, false, mutable)); + when(execution.getVariable("null")).thenReturn(null); + when(execution.getVariable("key")).thenReturn(strVal); + Object[] vars = mapper.createArguments(execution, method); + assertThat(vars[1], is(expectedDefautlVal)); + assertThat(vars[2], is(expectedVal)); + } +} diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/WorkflowDefinitionScannerTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/WorkflowDefinitionScannerTest.java index 5acb0ceb9..cd1ac6adf 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/WorkflowDefinitionScannerTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/WorkflowDefinitionScannerTest.java @@ -7,6 +7,7 @@ import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -67,6 +68,9 @@ public void mutableStateParamSetsMutableFlag() { Map methods = scanner.getStateMethods(MutableParamWorkflow.class); assertThat(methods.keySet(), hasItemsOf(asList("start", "end"))); assertThat(methods.get("end").params[0], stateParam("paramKey", String.class, false, true)); + assertThat(methods.get("end").params[0].nullValue, nullValue()); + assertThat(methods.get("end").params[1], stateParam("longKey", Long.class, false, true)); + assertThat(methods.get("end").params[1].nullValue, is(0L)); } @Test @@ -191,17 +195,20 @@ public void readOnlyStateVarFlagSetsFlagInStateParameter() { assertThat(methods.get("end").params[0], stateParam("paramKey", String.class, true, false)); } - private CustomMatcher stateParam(final String key, final Type type, final boolean readOnly, final boolean mutable) { - return new CustomMatcher("") { + private CustomMatcher stateParam(final String key, final Type type, final boolean readOnly, + final boolean mutable) { + return new CustomMatcher("") { @Override public boolean matches(Object item) { StateParameter p = (StateParameter) item; - return Objects.equals(key, p.key) && Objects.equals(type, p.type) - && Objects.equals(readOnly, p.readOnly) && Objects.equals(mutable, p.mutable); - + return Objects.equals(key, p.key) // + && Objects.equals(type, p.type) // + && Objects.equals(readOnly, p.readOnly) // + && Objects.equals(mutable, p.mutable); } }; } + public static enum ScannerState implements WorkflowState{ start(WorkflowStateType.start), end(WorkflowStateType.end); @@ -261,7 +268,7 @@ public MutableParamWorkflow() { super("mutableParam", ScannerState.start, ScannerState.end); } public NextAction start(StateExecution exec) { return null; } - public NextAction end(StateExecution exec, @StateVar("paramKey") Mutable param) { return null; } + public NextAction end(StateExecution exec, @StateVar("paramKey") Mutable param, @StateVar(value="longKey", instantiateIfNotExists=true) Mutable param2) { return null; } } public static class InitiateParameterWorkflow extends WorkflowDefinition { diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/ActionStateVariableWorkflow.java b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/ActionStateVariableWorkflow.java new file mode 100644 index 000000000..dea71bfba --- /dev/null +++ b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/ActionStateVariableWorkflow.java @@ -0,0 +1,61 @@ +package io.nflow.tests.demo.workflow; + +import static io.nflow.engine.workflow.definition.NextAction.moveToState; +import static io.nflow.engine.workflow.definition.NextAction.stopInState; +import static io.nflow.engine.workflow.definition.WorkflowStateType.end; +import static io.nflow.engine.workflow.definition.WorkflowStateType.manual; +import static io.nflow.engine.workflow.definition.WorkflowStateType.start; +import static io.nflow.tests.demo.workflow.ActionStateVariableWorkflow.State.done; +import static io.nflow.tests.demo.workflow.ActionStateVariableWorkflow.State.error; +import static io.nflow.tests.demo.workflow.ActionStateVariableWorkflow.State.setVariable; + +import org.springframework.stereotype.Component; + +import io.nflow.engine.workflow.definition.Mutable; +import io.nflow.engine.workflow.definition.NextAction; +import io.nflow.engine.workflow.definition.StateExecution; +import io.nflow.engine.workflow.definition.StateVar; +import io.nflow.engine.workflow.definition.WorkflowDefinition; +import io.nflow.engine.workflow.definition.WorkflowSettings; +import io.nflow.engine.workflow.definition.WorkflowStateType; + +@Component +public class ActionStateVariableWorkflow extends WorkflowDefinition { + + public static final String WORKFLOW_TYPE = "actionStateVariableWorkflow"; + public static final int MAX_STATE_VAR_VALUE = 10; + private static final String STATE_VAR = "stateVar"; + + public static enum State implements io.nflow.engine.workflow.definition.WorkflowState { + setVariable(start), done(end), error(manual); + + private final WorkflowStateType type; + + private State(WorkflowStateType type) { + this.type = type; + } + + @Override + public WorkflowStateType getType() { + return type; + } + } + + public ActionStateVariableWorkflow() { + super(WORKFLOW_TYPE, setVariable, error, new WorkflowSettings.Builder().setMinErrorTransitionDelay(0) + .setMaxErrorTransitionDelay(0).setShortTransitionDelay(0).setMaxRetries(3).build()); + setDescription("Workflow for testing action state variables"); + permit(setVariable, setVariable); + permit(setVariable, done); + } + + public NextAction setVariable(StateExecution execution, + @StateVar(value = STATE_VAR, instantiateIfNotExists = true) Mutable val) { + if (val.getVal() >= MAX_STATE_VAR_VALUE) { + execution.setCreateAction(false); + return stopInState(done, "Done"); + } + val.setVal(val.getVal() + 1); + return moveToState(setVariable, "Continue"); + } +} diff --git a/nflow-tests/src/test/java/io/nflow/tests/ActionStateVariablesTest.java b/nflow-tests/src/test/java/io/nflow/tests/ActionStateVariablesTest.java new file mode 100644 index 000000000..4067fecdf --- /dev/null +++ b/nflow-tests/src/test/java/io/nflow/tests/ActionStateVariablesTest.java @@ -0,0 +1,75 @@ +package io.nflow.tests; + +import static java.lang.Thread.sleep; +import static java.time.Duration.ofSeconds; +import static java.util.UUID.randomUUID; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; + +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; +import org.springframework.context.annotation.Bean; + +import io.nflow.rest.v1.msg.Action; +import io.nflow.rest.v1.msg.CreateWorkflowInstanceRequest; +import io.nflow.rest.v1.msg.CreateWorkflowInstanceResponse; +import io.nflow.rest.v1.msg.ListWorkflowInstanceResponse; +import io.nflow.tests.demo.workflow.ActionStateVariableWorkflow; +import io.nflow.tests.extension.NflowServerConfig; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +public class ActionStateVariablesTest extends AbstractNflowTest { + + public static NflowServerConfig server = new NflowServerConfig.Builder().springContextClass(TestConfiguration.class).build(); + private static CreateWorkflowInstanceRequest createRequest; + private static CreateWorkflowInstanceResponse createResponse; + + public ActionStateVariablesTest() { + super(server); + } + + static class TestConfiguration { + @Bean + public ActionStateVariableWorkflow actionStateVariableWorkflow() { + return new ActionStateVariableWorkflow(); + } + } + + @Test + @Order(1) + public void createWorkflow() { + createRequest = new CreateWorkflowInstanceRequest(); + createRequest.type = ActionStateVariableWorkflow.WORKFLOW_TYPE; + createRequest.externalId = randomUUID().toString(); + createResponse = assertTimeoutPreemptively(ofSeconds(5), () -> createWorkflowInstance(createRequest)); + assertThat(createResponse.id, notNullValue()); + } + + @Test + @Order(2) + public void checkActionStateVariables() { + int maxActions = 5; + ListWorkflowInstanceResponse response = assertTimeoutPreemptively(ofSeconds(5), () -> { + ListWorkflowInstanceResponse wf = null; + do { + sleep(200); + wf = getInstanceIdResource(createResponse.id) // + .query("include", "actions,actionStateVariables") // + .query("maxActions", maxActions) // + .get(ListWorkflowInstanceResponse.class); + } while (wf == null || !"done".equals(wf.state)); + return wf; + }); + + assertThat(response.actions, hasSize(maxActions)); + for (int i = 0; i < maxActions; i++) { + Action action = response.actions.get(i); + assertEquals(ActionStateVariableWorkflow.MAX_STATE_VAR_VALUE - i, action.updatedStateVariables.get("stateVar")); + } + } +}