diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e41b6f61..1b3c41ea2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ **Details** - Popup notifications from workflow instance updates, network and authentication issues in Explorer - Upgraded Spring to version 5.1.3.RELEASE +- Workflow instance history (actions, states) cleanup as part of state processing, configurable through `WorkflowSettings` ## 5.2.0 (2018-11-20) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index 8e01409a4..1a1012196 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -872,4 +872,20 @@ public String getWorkflowInstanceType(Integer workflowInstanceId) { return jdbc.queryForObject("select type from nflow_workflow where id = ?", String.class, workflowInstanceId); } + @Transactional + public int deleteWorkflowInstanceHistory(Integer workflowInstanceId, Integer historyDeletableAfterHours) { + MapSqlParameterSource params = new MapSqlParameterSource(); + params.addValue("workflowId", workflowInstanceId); + params.addValue("deleteUpToTime", sqlVariants.toTimestampObject(now().minusHours(historyDeletableAfterHours))); + Integer maxActionId = namedJdbc.queryForObject( + "select max(id) from nflow_workflow_action where workflow_id = :workflowId and " + + sqlVariants.dateLtEqDiff("execution_end", ":deleteUpToTime"), params, Integer.class); + int deletedActions = 0; + if (maxActionId != null) { + params.addValue("maxActionId", maxActionId); + namedJdbc.update("delete from nflow_workflow_state where workflow_id = :workflowId and action_id <= :maxActionId", params); + deletedActions = namedJdbc.update("delete from nflow_workflow_action where workflow_id = :workflowId and id <= :maxActionId", params); + } + return deletedActions; + } } diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java index bb6e4740c..01c6a0c02 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Random; import org.joda.time.DateTime; import org.joda.time.Duration; @@ -74,6 +75,7 @@ class WorkflowStateProcessor implements Runnable { private final Map processingInstances; private long startTimeSeconds; private Thread thread; + private Random rnd = new Random(); WorkflowStateProcessor(int instanceId, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions, WorkflowInstanceService workflowInstances, WorkflowInstanceDao workflowInstanceDao, @@ -151,6 +153,7 @@ private void runImpl() { processAfterFailureListeners(listenerContext, execution.getThrown()); } else { processAfterListeners(listenerContext); + optionallyCleanupWorkflowInstanceHistory(definition.getSettings()); } subsequentStateExecutions = busyLoopPrevention(state, settings, subsequentStateExecutions, execution); instance = saveWorkflowInstanceState(execution, instance, definition, actionBuilder); @@ -299,6 +302,17 @@ private NextAction processWithListeners(ListenerContext listenerContext, Workflo return new SkippedStateHandler(nextAction, instance, definition, execution, state).processState(); } + private void optionallyCleanupWorkflowInstanceHistory(WorkflowSettings settings) { + if (settings.historyDeletableAfterHours != null && roughlyEveryTenthTime()) { + logger.debug("Cleaning workflow history older than {} hours", settings.historyDeletableAfterHours); + workflowInstanceDao.deleteWorkflowInstanceHistory(instanceId, settings.historyDeletableAfterHours); + } + } + + private boolean roughlyEveryTenthTime() { + return rnd.nextInt(10) == 0; + } + static class ExecutorListenerChain implements ListenerChain { private final Iterator chain; @@ -543,4 +557,8 @@ private StringBuilder getStackTraceAsString() { } return sb; } + + void setRandom(Random rnd) { + this.rnd = rnd; + } } diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowSettings.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowSettings.java index 1cb70544a..b39237092 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowSettings.java +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowSettings.java @@ -49,6 +49,11 @@ public class WorkflowSettings extends ModelObject { * Maximum number of subsequent state executions before forcing a short transition delay, per state. */ public final Map maxSubsequentStateExecutionsPerState; + /** + * Delay after which workflow instance history (actions, states) can be deleted from database by nFlow. + * Unit is hours. + */ + public final Integer historyDeletableAfterHours; WorkflowSettings(Builder builder) { this.minErrorTransitionDelay = builder.minErrorTransitionDelay; @@ -58,6 +63,7 @@ public class WorkflowSettings extends ModelObject { this.maxRetries = builder.maxRetries; this.maxSubsequentStateExecutions = builder.maxSubsequentStateExecutions; this.maxSubsequentStateExecutionsPerState = new HashMap<>(builder.maxSubsequentStateExecutionsPerState); + this.historyDeletableAfterHours = builder.historyDeletableAfterHours; } /** @@ -72,6 +78,7 @@ public static class Builder { int maxRetries = 17; int maxSubsequentStateExecutions = 100; Map maxSubsequentStateExecutionsPerState = new HashMap<>(); + Integer historyDeletableAfterHours; /** * Set the maximum delay on execution retry after an error. @@ -159,6 +166,19 @@ public Builder setMaxSubsequentStateExecutions(WorkflowState state, int maxSubse return this; } + /** + * Set the delay after which workflow history (actions, states) can be deleted from the database by nFlow. + * The default value (null) indicates that history is not deletable. + * + * @param historyDeletableAfterHours + * Delay in hours. + * @return this. + */ + public Builder setHistoryDeletableAfterHours(Integer historyDeletableAfterHours) { + this.historyDeletableAfterHours = historyDeletableAfterHours; + return this; + } + /** * Create workflow settings object. * diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java index 96fab9130..601f3137e 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java @@ -727,7 +727,7 @@ public void insertingSubWorkflowWorks() { int parentWorkflowId = dao.insertWorkflowInstance(i1); assertThat(parentWorkflowId, not(equalTo(-1))); - int parentActionId = addWorkflowAction(parentWorkflowId, i1, started); + int parentActionId = addWorkflowAction(parentWorkflowId, i1, started, started.plusMillis(100)); WorkflowInstance createdInstance = dao.getWorkflowInstance(parentWorkflowId, EnumSet.allOf(WorkflowInstanceInclude.class), null); checkSameWorkflowInfo(i1, createdInstance); @@ -758,7 +758,7 @@ public void wakeUpWorkflowExternallyWorksWithEmptyExpectedStates() { assertThat(createdWorkflow.nextActivation, equalTo(scheduled)); - int parentActionId = addWorkflowAction(parentWorkflowId, i1, now); + int parentActionId = addWorkflowAction(parentWorkflowId, i1, now, now.plusMillis(100)); assertThat(parentActionId, not(equalTo(-1))); int subWorkflowId = addSubWorkflow(parentWorkflowId, parentActionId); @@ -783,7 +783,7 @@ public void wakeUpWorkflowExternallyWorksWithExpectedStates() { assertThat(createdWorkflow.nextActivation, equalTo(scheduled)); - int parentActionId = addWorkflowAction(parentWorkflowId, i1, now); + int parentActionId = addWorkflowAction(parentWorkflowId, i1, now, now.plusMillis(100)); assertThat(parentActionId, not(equalTo(-1))); int subWorkflowId = addSubWorkflow(parentWorkflowId, parentActionId); @@ -808,7 +808,7 @@ public void wakeUpWorkflowExternallyDoesNotWakeUpWorkflowInUnexpectedState() { assertThat(createdWorkflow.nextActivation, equalTo(scheduled)); - int parentActionId = addWorkflowAction(parentWorkflowId, i1, now); + int parentActionId = addWorkflowAction(parentWorkflowId, i1, now, now.plusMillis(100)); assertThat(parentActionId, not(equalTo(-1))); int subWorkflowId = addSubWorkflow(parentWorkflowId, parentActionId); @@ -903,6 +903,22 @@ public void clearingSignalInsertsAction() { assertThat(dao.getSignal(instanceId), is(Optional.empty())); } + @Test + public void deleteExpiredWorkflowHistory() { + WorkflowInstance i1 = constructWorkflowInstanceBuilder().build(); + int workflowId = dao.insertWorkflowInstance(i1); + addWorkflowAction(workflowId, new WorkflowInstance.Builder(i1).putStateVariable("deletedVariable", "deletedValue").build(), now(), now()); + addWorkflowAction(workflowId, new WorkflowInstance.Builder(i1).putStateVariable("preservedVariable", "preservedValue").build(), now(), now().plusDays(1)); + + assertThat(dao.deleteWorkflowInstanceHistory(workflowId, 0), equalTo(1)); + + i1 = dao.getWorkflowInstance(workflowId, EnumSet.allOf(WorkflowInstanceInclude.class), null); + assertThat(i1.getStateVariable("requestData"), equalTo("{ \"parameter\": \"abc\" }")); + assertThat(i1.getStateVariable("deletedVariable"), is(nullValue())); + assertThat(i1.getStateVariable("preservedVariable"), equalTo("preservedValue")); + assertThat(i1.actions.size(), equalTo(1)); + } + private static void checkSameWorkflowInfo(WorkflowInstance i1, WorkflowInstance i2) { assertThat(i1.type, equalTo(i2.type)); assertThat(i1.executorId, equalTo(i2.executorId)); @@ -924,9 +940,9 @@ private static void checkSameWorkflowInfo(WorkflowInstance i1, WorkflowInstance assertThat(i1.started, equalTo(i2.started)); } - private int addWorkflowAction(int workflowId, final WorkflowInstance instance, DateTime started) { + private int addWorkflowAction(int workflowId, final WorkflowInstance instance, DateTime started, DateTime ended) { final WorkflowInstanceAction action = new WorkflowInstanceAction.Builder().setExecutionStart(started).setExecutorId(42) - .setExecutionEnd(started.plusMillis(100)).setRetryNo(1).setType(stateExecution).setState("test") + .setExecutionEnd(ended).setRetryNo(1).setType(stateExecution).setState("test") .setStateText("state text") .setWorkflowInstanceId(workflowId).build(); int actionId = transaction.execute(new TransactionCallback() { diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java index 5dd8a1b22..872db90df 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; @@ -43,9 +44,11 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.stream.IntStream; import org.hamcrest.Description; import org.hamcrest.Matcher; @@ -85,6 +88,7 @@ import io.nflow.engine.workflow.definition.TestWorkflow; import io.nflow.engine.workflow.definition.WorkflowDefinition; import io.nflow.engine.workflow.definition.WorkflowDefinitionTest.TestDefinition; +import io.nflow.engine.workflow.definition.WorkflowSettings; import io.nflow.engine.workflow.definition.WorkflowState; import io.nflow.engine.workflow.definition.WorkflowStateType; import io.nflow.engine.workflow.instance.WorkflowInstance; @@ -120,6 +124,9 @@ public class WorkflowStateProcessorTest extends BaseNflowTest { @Mock StateExecutionImpl executionMock; + @Mock + Random rnd; + @Captor ArgumentCaptor update; @@ -175,6 +182,8 @@ public void setup() { filterChain(listener1); filterChain(listener2); when(executionMock.getRetries()).thenReturn(testWorkflowDef.getSettings().maxRetries); + when(rnd.nextInt(anyInt())).thenReturn(0); + executor.setRandom(rnd); } @After @@ -852,6 +861,35 @@ public void saveStateRetryAfterFailedPersistence() throws InterruptedException { verify(workflowInstanceDao, atLeast(2)).updateWorkflowInstanceAfterExecution(any(), any(), any(), any(), anyBoolean()); } + @Test + public void cleanupWorkflowInstanceHistoryNotExecutedRoughlyNineTimesOfTen() { + WorkflowInstance instance = executingInstanceBuilder().setType("execute-test").setState("start").build(); + when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance); + IntStream.range(1, 10).forEach(i -> { + when(rnd.nextInt(anyInt())).thenReturn(i); + executor.run(); + verify(workflowInstanceDao, never()).deleteWorkflowInstanceHistory(any(), any()); + }); + } + + @Test + public void cleanupWorkflowInstanceHistoryNotExecutedBasedOnSettings() { + WorkflowInstance instance = executingInstanceBuilder().setType("simple-test").setState("start").build(); + when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance); + executor.run(); + + verify(workflowInstanceDao, never()).deleteWorkflowInstanceHistory(any(), any()); + } + + @Test + public void cleanupWorkflowInstanceHistoryExecutedRoughlyOneTimeofTen() { + WorkflowInstance instance = executingInstanceBuilder().setType("execute-test").setState("start").build(); + when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance); + executor.run(); + + verify(workflowInstanceDao).deleteWorkflowInstanceHistory(instance.id, executeWf.getSettings().historyDeletableAfterHours); + } + public static class Pojo { public String field; public boolean test; @@ -863,7 +901,7 @@ public static class ExecuteTestWorkflow extends WorkflowDefinition { protected ExecuteTestWorkflow() { - super("test", State.start, State.error); + super("test", State.start, State.error, new WorkflowSettings.Builder().setHistoryDeletableAfterHours(1).build()); permit(State.start, State.process, State.error); permit(State.process, State.done, State.error); } diff --git a/nflow-engine/src/test/java/io/nflow/engine/workflow/definition/WorkflowSettingsTest.java b/nflow-engine/src/test/java/io/nflow/engine/workflow/definition/WorkflowSettingsTest.java index 54ec846aa..238bd4b0c 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/workflow/definition/WorkflowSettingsTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/workflow/definition/WorkflowSettingsTest.java @@ -4,6 +4,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.core.IsNull.nullValue; import static org.joda.time.DateTimeUtils.currentTimeMillis; import static org.junit.Assert.assertThat; @@ -34,6 +35,7 @@ public void verifyConstantDefaultValues() { long delta = s.getShortTransitionActivation().getMillis() - currentTimeMillis() - 30000; assertThat(delta, greaterThanOrEqualTo(-1000L)); assertThat(delta, lessThanOrEqualTo(0L)); + assertThat(s.historyDeletableAfterHours, is(nullValue())); } @Test