Skip to content

Commit

Permalink
fix sql perf / memory issue (#415)
Browse files Browse the repository at this point in the history
* fix sql perf / memory issue

* fetch all states if found actions < maxActions

* fix tests to compile

* fix h2 sql syntax error and add test

* avoid creating temp list

* Fix @StateVar(instantiateIfNotExists=true) Mutable<Type> instantiation

* fix formatting and typo and jdk8 compatibility

Co-authored-by: Edvard Fonsell <[email protected]>
Co-authored-by: Mikko Tiihonen <[email protected]>
  • Loading branch information
3 people authored Dec 22, 2020
1 parent b1ad83b commit b1dfaa9
Show file tree
Hide file tree
Showing 10 changed files with 313 additions and 55 deletions.
15 changes: 9 additions & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
## 7.2.1-SNAPSHOT (future release)

**Highlights**
- `nflow-engine`
- Optimize fetching workflow instance with large history with many modified state variables.

- `nflow-rest-api-spring-web` and `nflow-netty`
- Change REST API calls to use a dedicated thread pool for all blocking database operations to avoid blocking the netty EventLoop thread.

**Details**

- `nflow-engine`
- Fix SQL performance / memory issue when getting single workflow instance with action state variables, when the instance has lots of actions with lots of state variables. The old code fetched all state variables of all actions of the instance, the new code only fetches the state variables for the actions that will be returned.
- Fix instantiation of @StateVar(instantiateIfNotExists=true) Mutable<Type> - the result was incorrectly a Mutable wrapped in Mutable.
- Fix potential resource leaks
- Dependency updates:
- spring 5.2.8
Expand Down Expand Up @@ -266,7 +269,7 @@
**Details**
- `nflow-engine`
- Add `priority` two byte integer to the `nflow_workflow` table. When the dispatcher chooses from many available scheduled workflow instances it primarily (unfairly) picks the workflow instances with the largest priority values, and for workflows with the same priority, the ones with oldest `next_activation` timestamp. Priority defaults to 0 and can also be negative. Default priority value for the new workflow instances can be set per workflow definition (`WorkflowSettings.Builder.setDefaultPriority`), and overridden per workflow instance (`WorkflowInstance.Builder.setPriority`). Requires database migration, see database update scripts for details.
- Separate workflow definition scanning from `WorkflowDefinitionService` by introducing `WorkflowDefinitionSpringBeanScanner` and `WorkflowDefinitionClassNameScanner`. This allows breaking the circular dependency when a workflow definition uses `WorkflowInstanceService` (which depends on `WorkflowDefinitionService`, which depended on all workflow definitions). This enabled using constructor injection in all nFlow classes.
- Separate workflow definition scanning from `WorkflowDefinitionService` by introducing `WorkflowDefinitionSpringBeanScanner` and `WorkflowDefinitionClassNameScanner`. This allows breaking the circular dependency when a workflow definition uses `WorkflowInstanceService` (which depends on `WorkflowDefinitionService`, which depended on all workflow definitions). This enabled using constructor injection in all nFlow classes.
- Add `disableMariaDbDriver` to default MySQL JDBC URL so that in case there are both MySQL and MariaDB JDBC drivers in the classpath then MariaDB will not steal the MySQL URL.
- Add support for `nflow.db.mariadb` profile.
- Update database indices to match current workflow instance polling code.
Expand Down Expand Up @@ -408,8 +411,8 @@ This release introduced issue #306 which may cause OutOfMemory errors while fetc
- jetty 9.4.15.v20190215
- h2 1.4.199
- Fix workflow history cleanup to keep the actions that hold the latest values of state variables
- nFlow Explorer: Custom content to workflow definition and workflow instance pages.
- nFlow Explorer: Executors page to use standard time formatting in tooltips
- nFlow Explorer: Custom content to workflow definition and workflow instance pages.
- nFlow Explorer: Executors page to use standard time formatting in tooltips
- nFlow netty: Add support for registering Spring ApplicationListeners
- nFlow jetty: Replace deprecated NCSARequestLog with CustomRequestLog
- Fix `WorkflowLifecycle.stop()` blocking forever if `nflow.autostart=false` and `WorkflowLifecycle.start()` not called
Expand Down Expand Up @@ -496,7 +499,7 @@ This release introduced issue #306 which may cause OutOfMemory errors while fetc
- `nflow-jetty` now serves all paths under `/nflow/*`. The new paths are as follows:
- /nflow/api/v1 -> API v1 (was: /api/nflow/v1)
- /nflow/api/swagger.json -> Swagger config (was: /api/swagger.json)
- /nflow/ui -> nFlow statics assets
- /nflow/ui -> nFlow statics assets
- /nflow/ui/explorer -> nFlow UI (was: /explorer)
- /nflow/ui/doc -> Swagger UI (was: /doc)
- /nflow/metrics -> metrics and health checks (was: /metrics)
Expand Down Expand Up @@ -644,7 +647,7 @@ This release introduced issue #306 which may cause OutOfMemory errors while fetc
- fixed: workflow instance recovery functionality (broken by version 2.0.0)
- fixed: Oracle database schema
- nflow-rest-api:
- **_breaking change:_** Prefixed operation paths by "/nflow" (e.g. /v1/statistics -> /nflow/v1/statistics)
- **_breaking change:_** Prefixed operation paths by "/nflow" (e.g. /v1/statistics -> /nflow/v1/statistics)
- Support for Jersey JAX-RS implementation
- **_breaking change:_** Moved exception mappers to nflow-jetty (BadRequestExceptionMapper, CustomValidationExceptionMapper, NotFoundExceptionMapper)
- Improved Swagger documentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,10 @@ private long insertWorkflowInstanceWithCte(WorkflowInstance instance) {
try {
StringBuilder sqlb = new StringBuilder(256);
sqlb.append("with wf as (").append(insertWorkflowInstanceSql()).append(" returning id)");
Object[] instanceValues = new Object[] { instance.type, instance.priority,
instance.parentWorkflowId, instance.parentActionId, instance.businessKey, instance.externalId,
executorInfo.getExecutorGroup(), instance.status.name(), instance.state,
abbreviate(instance.stateText, getInstanceStateTextLength()), toTimestamp(instance.nextActivation),
instance.signal.orElse(null) };
Object[] instanceValues = new Object[] { instance.type, instance.priority, instance.parentWorkflowId,
instance.parentActionId, instance.businessKey, instance.externalId, executorInfo.getExecutorGroup(),
instance.status.name(), instance.state, abbreviate(instance.stateText, getInstanceStateTextLength()),
toTimestamp(instance.nextActivation), instance.signal.orElse(null) };
int pos = instanceValues.length;
Object[] args = Arrays.copyOf(instanceValues, pos + instance.stateVariables.size() * 2);
for (Entry<String, String> variable : instance.stateVariables.entrySet()) {
Expand Down Expand Up @@ -523,9 +522,9 @@ private void fillState(final WorkflowInstance instance) {
jdbc.query("select outside.state_key, outside.state_value from nflow_workflow_state outside inner join "
+ "(select workflow_id, max(action_id) action_id, state_key from nflow_workflow_state where workflow_id = ? group by workflow_id, state_key) inside "
+ "on outside.workflow_id = inside.workflow_id and outside.action_id = inside.action_id and outside.state_key = inside.state_key",
rs -> {
instance.stateVariables.put(rs.getString(1), rs.getString(2));
}, instance.id);
rs -> {
instance.stateVariables.put(rs.getString(1), rs.getString(2));
}, instance.id);
instance.originalStateVariables.putAll(instance.stateVariables);
}

Expand Down Expand Up @@ -557,8 +556,8 @@ private List<Long> pollNextWorkflowInstanceIdsWithUpdateReturning(int batchSize)

private List<Long> pollNextWorkflowInstanceIdsWithTransaction(final int batchSize) {
String sql = sqlVariants.limit("select id, modified from nflow_workflow " + whereConditionForInstanceUpdate(), batchSize);
List<OptimisticLockKey> instances = transaction.execute(tx -> jdbc.query(sql,
(rs, rowNum) -> new OptimisticLockKey(rs.getLong("id"), sqlVariants.getTimestamp(rs, "modified"))));
List<OptimisticLockKey> instances = transaction.execute(
tx -> jdbc.query(sql, (rs, rowNum) -> new OptimisticLockKey(rs.getLong("id"), sqlVariants.getTimestamp(rs, "modified"))));
if (instances.isEmpty()) {
return emptyList();
}
Expand Down Expand Up @@ -588,8 +587,7 @@ private List<Long> updateNextWorkflowInstancesWithMultipleUpdates(List<Optimisti
private List<Long> updateNextWorkflowInstancesWithBatchUpdate(List<OptimisticLockKey> instances) {
String sql = updateInstanceForExecutionQuery() + " where id = ? and modified = ? and executor_id is null";
List<Object[]> batchArgs = instances.stream()
.map(instance -> new Object[] { instance.id, sqlVariants.tuneTimestampForDb(instance.modified) })
.collect(toList());
.map(instance -> new Object[] { instance.id, sqlVariants.tuneTimestampForDb(instance.modified) }).collect(toList());
int[] updateStatuses = jdbc.batchUpdate(sql, batchArgs);
List<Long> ids = new ArrayList<>(instances.size());
for (int i = 0; i < updateStatuses.length; ++i) {
Expand Down Expand Up @@ -669,7 +667,7 @@ public Stream<WorkflowInstance> queryWorkflowInstancesAsStream(QueryWorkflowInst
sql = sqlVariants.limit(sql, getMaxResults(query.maxResults));

Stream<WorkflowInstance> ret = namedJdbc.query(sql, params, new WorkflowInstanceRowMapper()).stream()
.map(WorkflowInstance.Builder::build);
.map(WorkflowInstance.Builder::build);
if (query.includeCurrentStateVariables) {
ret = ret.peek(instance -> fillState(instance));
}
Expand Down Expand Up @@ -698,13 +696,22 @@ private long getMaxResults(Long maxResults) {
return min(maxResults, workflowInstanceQueryMaxResults);
}

private void fillActions(WorkflowInstance instance, boolean includeStateVariables, Long maxActions) {
Map<Long, Map<String, String>> actionStates = includeStateVariables ? fetchActionStateVariables(instance)
: EMPTY_ACTION_STATE_MAP;
String sql = sqlVariants.limit(
"select nflow_workflow_action.* from nflow_workflow_action where workflow_id = ? order by id desc",
getMaxActions(maxActions));
instance.actions.addAll(jdbc.query(sql, new WorkflowInstanceActionRowMapper(sqlVariants, actionStates), instance.id));
private void fillActions(WorkflowInstance instance, boolean includeStateVariables, Long requestedMaxActions) {
long maxActions = getMaxActions(requestedMaxActions);
String sql = sqlVariants
.limit("select nflow_workflow_action.* from nflow_workflow_action where workflow_id = ? order by id desc", maxActions);
List<WorkflowInstanceAction.Builder> actionBuilders = jdbc.query(sql, new WorkflowInstanceActionRowMapper(sqlVariants),
instance.id);
if (includeStateVariables) {
Map<Long, Map<String, String>> actionStates = fetchActionStateVariables(instance, actionBuilders.size(), maxActions);
actionBuilders.forEach(builder -> {
Map<String, String> actionState = actionStates.get(builder.getId());
if (actionState != null) {
builder.setUpdatedStateVariables(actionState);
}
});
}
actionBuilders.stream().map(WorkflowInstanceAction.Builder::build).forEach(instance.actions::add);
}

private long getMaxActions(Long maxActions) {
Expand All @@ -714,9 +721,17 @@ private long getMaxActions(Long maxActions) {
return min(maxActions, workflowInstanceQueryMaxActions);
}

private Map<Long, Map<String, String>> fetchActionStateVariables(WorkflowInstance instance) {
return jdbc.query("select * from nflow_workflow_state where workflow_id = ? order by action_id, state_key asc",
new WorkflowActionStateRowMapper(), instance.id);
private Map<Long, Map<String, String>> fetchActionStateVariables(WorkflowInstance instance, long actions, long maxActions) {
String sql;
if (actions < maxActions) {
sql = "select * from nflow_workflow_state where workflow_id = ? order by action_id, state_key asc";
} else {
sql = "select nflow_workflow_state.* from ("
+ sqlVariants.limit("select id from nflow_workflow_action where workflow_id = ? order by id desc", maxActions)
+ ") action_id inner join nflow_workflow_state on action_id.id = nflow_workflow_state.action_id "
+ "order by nflow_workflow_state.action_id, nflow_workflow_state.state_key asc";
}
return jdbc.query(sql, new WorkflowActionStateRowMapper(), instance.id);
}

@Transactional(propagation = MANDATORY)
Expand Down Expand Up @@ -780,30 +795,25 @@ public WorkflowInstance.Builder mapRow(ResultSet rs, int rowNum) throws SQLExcep
}
}

static class WorkflowInstanceActionRowMapper implements RowMapper<WorkflowInstanceAction> {
static class WorkflowInstanceActionRowMapper implements RowMapper<WorkflowInstanceAction.Builder> {
private final SQLVariants sqlVariants;
private final Map<Long, Map<String, String>> actionStates;

public WorkflowInstanceActionRowMapper(SQLVariants sqlVariants, Map<Long, Map<String, String>> actionStates) {
public WorkflowInstanceActionRowMapper(SQLVariants sqlVariants) {
this.sqlVariants = sqlVariants;
this.actionStates = actionStates;
}

@Override
public WorkflowInstanceAction mapRow(ResultSet rs, int rowNum) throws SQLException {
long actionId = rs.getLong("id");
Map<String, String> actionState = actionStates.getOrDefault(actionId, emptyMap());
public WorkflowInstanceAction.Builder mapRow(ResultSet rs, int rowNum) throws SQLException {
return new WorkflowInstanceAction.Builder() //
.setId(actionId) //
.setId(rs.getLong("id")) //
.setWorkflowInstanceId(rs.getLong("workflow_id")) //
.setExecutorId(rs.getInt("executor_id")) //
.setType(WorkflowActionType.valueOf(rs.getString("type"))) //
.setState(rs.getString("state")) //
.setStateText(rs.getString("state_text")) //
.setUpdatedStateVariables(actionState) //
.setRetryNo(rs.getInt("retry_no")) //
.setExecutionStart(sqlVariants.getDateTime(rs, "execution_start")) //
.setExecutionEnd(sqlVariants.getDateTime(rs, "execution_end")).build();
.setExecutionEnd(sqlVariants.getDateTime(rs, "execution_end"));
}
}

Expand All @@ -816,11 +826,7 @@ public Map<Long, Map<String, String>> extractData(ResultSet rs) throws SQLExcept
long actionId = rs.getLong("action_id");
String stateKey = rs.getString("state_key");
String stateValue = rs.getString("state_value");
if (!actionStates.containsKey(actionId)) {
actionStates.put(actionId, new LinkedHashMap<String, String>());
}
Map<String, String> stateMap = actionStates.get(actionId);
stateMap.put(stateKey, stateValue);
actionStates.computeIfAbsent(actionId, k -> new LinkedHashMap<>()).put(stateKey, stateValue);
}
return actionStates;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public Map<String, WorkflowStateMethod> getStateMethods(Class<?> definition) {
if (Mutable.class.isAssignableFrom(clazz)) {
ParameterizedType pType = (ParameterizedType) type;
type = pType.getActualTypeArguments()[0];
clazz = (Class<?>) type;
readOnly = false;
mutable = true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.nflow.engine.workflow.definition;

import java.util.Objects;

/**
* Wrapper class to provide mutable object for immutable value.
*
Expand Down Expand Up @@ -53,4 +55,21 @@ public T getVal() {
public String toString() {
return String.valueOf(val);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Mutable<?> mutable = (Mutable<?>) o;
return Objects.equals(val, mutable.val);
}

@Override
public int hashCode() {
return Objects.hash(val);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ public Builder setId(long id) {
return this;
}

/**
* Get the action id (generated by database)
* @return The action id.
*/
public long getId() {
return id;
}

/**
* Set the workflow instance identifier.
* @param workflowInstanceId The workflow instance identifier.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -867,10 +866,10 @@ public void recoverWorkflowInstancesFromDeadNodesSetsExecutorIdToNullAndStatusTo
String status = jdbc.queryForObject("select status from nflow_workflow where id = ?", String.class, id);
assertThat(status, is(inProgress.name()));

List<WorkflowInstanceAction> actions = jdbc.query("select * from nflow_workflow_action where workflow_id = ?",
new WorkflowInstanceActionRowMapper(sqlVariant, Collections.emptyMap()), id);
List<WorkflowInstanceAction.Builder> actions = jdbc.query("select * from nflow_workflow_action where workflow_id = ?",
new WorkflowInstanceActionRowMapper(sqlVariant), id);
assertThat(actions.size(), is(1));
WorkflowInstanceAction workflowInstanceAction = actions.get(0);
WorkflowInstanceAction workflowInstanceAction = actions.get(0).build();
assertThat(workflowInstanceAction.executorId, is(executorDao.getExecutorId()));
assertThat(workflowInstanceAction.type, is(recovery));
assertThat(workflowInstanceAction.stateText, is("Recovered"));
Expand All @@ -881,7 +880,7 @@ public void recoverWorkflowInstancesFromDeadNodesSetsExecutorIdToNullAndStatusTo
assertThat(executorId, is(nullValue()));

actions = jdbc.query("select * from nflow_workflow_action where workflow_id = ?",
new WorkflowInstanceActionRowMapper(sqlVariant, Collections.emptyMap()), id);
new WorkflowInstanceActionRowMapper(sqlVariant), id);
assertThat(actions.size(), is(1));
assertThat(workflowInstanceAction.executorId, is(executorDao.getExecutorId()));
assertThat(workflowInstanceAction.type, is(recovery));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.nflow.engine.internal.workflow;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.when;

import java.lang.reflect.Type;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import com.fasterxml.jackson.databind.ObjectMapper;

import io.nflow.engine.internal.workflow.WorkflowStateMethod.StateParameter;
import io.nflow.engine.workflow.definition.Mutable;
import io.nflow.engine.workflow.definition.StateExecution;

@ExtendWith(MockitoExtension.class)
class ObjectStringMapperTest {

private final ObjectStringMapper mapper = new ObjectStringMapper(new ObjectMapper());

@Mock
StateExecution execution;

@Test
public void stringRoundTrip() {
verifyRoundTrip(String.class, "test☃");
}

@Test
public void longRoundTrip() {
verifyRoundTrip(Long.class, -1L);
}

private void verifyRoundTrip(Type type, Object value) {
String data = mapper.convertFromObject(null, value);
Object result = mapper.convertToObject(type, null, data);
assertThat(result, is(value));
}

@Test
public void methodStringParam() {
verifyParam(String.class, "test€", "test€");
}

@Test
public void methodLongParam() {
verifyParam(Long.class, "42", 42L);
}

@Test
public void methodPrimitiveIntParam() {
verifyParam(Integer.TYPE, "128", 0, 0, 128);
}

@Test
public void methodMutableLongParam() {
verifyParam(Long.class, "42", 0L, new Mutable<>(0L), new Mutable<>(42L));
verifyParam(Long.class, "42", null, new Mutable<>(null), new Mutable<>(42L));
}

private void verifyParam(Type type, String strVal, Object expectedVal) {
verifyParam(type, strVal, null, null, expectedVal);
}

private void verifyParam(Type type, String strVal, Object defaultVal, Object expectedDefautlVal, Object expectedVal) {
boolean mutable = expectedDefautlVal instanceof Mutable;
WorkflowStateMethod method = new WorkflowStateMethod(null, new StateParameter("null", type, defaultVal, false, mutable),
new StateParameter("key", type, defaultVal, false, mutable));
when(execution.getVariable("null")).thenReturn(null);
when(execution.getVariable("key")).thenReturn(strVal);
Object[] vars = mapper.createArguments(execution, method);
assertThat(vars[1], is(expectedDefautlVal));
assertThat(vars[2], is(expectedVal));
}
}
Loading

0 comments on commit b1dfaa9

Please sign in to comment.