Skip to content

Commit

Permalink
Cleanup workflow history based on WorkflowSettings (#277)
Browse files Browse the repository at this point in the history
* Cleanup workflow history based on WorkflowSettings
* WorkflowSettings.historyDeletableAfter -> .historyDeletableAfterHours
* Use Random when considering workflow history cleanup
  • Loading branch information
eputtone authored and gmokki committed Jan 3, 2019
1 parent f1a59d0 commit 8e02b13
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
**Details**
- Popup notifications from workflow instance updates, network and authentication issues in Explorer
- Upgraded Spring to version 5.1.3.RELEASE
- Workflow instance history (actions, states) cleanup as part of state processing, configurable through `WorkflowSettings`

## 5.2.0 (2018-11-20)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -872,4 +872,20 @@ public String getWorkflowInstanceType(Integer workflowInstanceId) {
return jdbc.queryForObject("select type from nflow_workflow where id = ?", String.class, workflowInstanceId);
}

@Transactional
public int deleteWorkflowInstanceHistory(Integer workflowInstanceId, Integer historyDeletableAfterHours) {
MapSqlParameterSource params = new MapSqlParameterSource();
params.addValue("workflowId", workflowInstanceId);
params.addValue("deleteUpToTime", sqlVariants.toTimestampObject(now().minusHours(historyDeletableAfterHours)));
Integer maxActionId = namedJdbc.queryForObject(
"select max(id) from nflow_workflow_action where workflow_id = :workflowId and " +
sqlVariants.dateLtEqDiff("execution_end", ":deleteUpToTime"), params, Integer.class);
int deletedActions = 0;
if (maxActionId != null) {
params.addValue("maxActionId", maxActionId);
namedJdbc.update("delete from nflow_workflow_state where workflow_id = :workflowId and action_id <= :maxActionId", params);
deletedActions = namedJdbc.update("delete from nflow_workflow_action where workflow_id = :workflowId and id <= :maxActionId", params);
}
return deletedActions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.joda.time.DateTime;
import org.joda.time.Duration;
Expand Down Expand Up @@ -74,6 +75,7 @@ class WorkflowStateProcessor implements Runnable {
private final Map<Integer, WorkflowStateProcessor> processingInstances;
private long startTimeSeconds;
private Thread thread;
private Random rnd = new Random();

WorkflowStateProcessor(int instanceId, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions,
WorkflowInstanceService workflowInstances, WorkflowInstanceDao workflowInstanceDao,
Expand Down Expand Up @@ -151,6 +153,7 @@ private void runImpl() {
processAfterFailureListeners(listenerContext, execution.getThrown());
} else {
processAfterListeners(listenerContext);
optionallyCleanupWorkflowInstanceHistory(definition.getSettings());
}
subsequentStateExecutions = busyLoopPrevention(state, settings, subsequentStateExecutions, execution);
instance = saveWorkflowInstanceState(execution, instance, definition, actionBuilder);
Expand Down Expand Up @@ -299,6 +302,17 @@ private NextAction processWithListeners(ListenerContext listenerContext, Workflo
return new SkippedStateHandler(nextAction, instance, definition, execution, state).processState();
}

private void optionallyCleanupWorkflowInstanceHistory(WorkflowSettings settings) {
if (settings.historyDeletableAfterHours != null && roughlyEveryTenthTime()) {
logger.debug("Cleaning workflow history older than {} hours", settings.historyDeletableAfterHours);
workflowInstanceDao.deleteWorkflowInstanceHistory(instanceId, settings.historyDeletableAfterHours);
}
}

private boolean roughlyEveryTenthTime() {
return rnd.nextInt(10) == 0;
}

static class ExecutorListenerChain implements ListenerChain {
private final Iterator<WorkflowExecutorListener> chain;

Expand Down Expand Up @@ -543,4 +557,8 @@ private StringBuilder getStackTraceAsString() {
}
return sb;
}

void setRandom(Random rnd) {
this.rnd = rnd;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public class WorkflowSettings extends ModelObject {
* Maximum number of subsequent state executions before forcing a short transition delay, per state.
*/
public final Map<WorkflowState, Integer> maxSubsequentStateExecutionsPerState;
/**
* Delay after which workflow instance history (actions, states) can be deleted from database by nFlow.
* Unit is hours.
*/
public final Integer historyDeletableAfterHours;

WorkflowSettings(Builder builder) {
this.minErrorTransitionDelay = builder.minErrorTransitionDelay;
Expand All @@ -58,6 +63,7 @@ public class WorkflowSettings extends ModelObject {
this.maxRetries = builder.maxRetries;
this.maxSubsequentStateExecutions = builder.maxSubsequentStateExecutions;
this.maxSubsequentStateExecutionsPerState = new HashMap<>(builder.maxSubsequentStateExecutionsPerState);
this.historyDeletableAfterHours = builder.historyDeletableAfterHours;
}

/**
Expand All @@ -72,6 +78,7 @@ public static class Builder {
int maxRetries = 17;
int maxSubsequentStateExecutions = 100;
Map<WorkflowState, Integer> maxSubsequentStateExecutionsPerState = new HashMap<>();
Integer historyDeletableAfterHours;

/**
* Set the maximum delay on execution retry after an error.
Expand Down Expand Up @@ -159,6 +166,19 @@ public Builder setMaxSubsequentStateExecutions(WorkflowState state, int maxSubse
return this;
}

/**
* Set the delay after which workflow history (actions, states) can be deleted from the database by nFlow.
* The default value (<code>null</code>) indicates that history is not deletable.
*
* @param historyDeletableAfterHours
* Delay in hours.
* @return this.
*/
public Builder setHistoryDeletableAfterHours(Integer historyDeletableAfterHours) {
this.historyDeletableAfterHours = historyDeletableAfterHours;
return this;
}

/**
* Create workflow settings object.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ public void insertingSubWorkflowWorks() {
int parentWorkflowId = dao.insertWorkflowInstance(i1);
assertThat(parentWorkflowId, not(equalTo(-1)));

int parentActionId = addWorkflowAction(parentWorkflowId, i1, started);
int parentActionId = addWorkflowAction(parentWorkflowId, i1, started, started.plusMillis(100));
WorkflowInstance createdInstance = dao.getWorkflowInstance(parentWorkflowId, EnumSet.allOf(WorkflowInstanceInclude.class),
null);
checkSameWorkflowInfo(i1, createdInstance);
Expand Down Expand Up @@ -758,7 +758,7 @@ public void wakeUpWorkflowExternallyWorksWithEmptyExpectedStates() {

assertThat(createdWorkflow.nextActivation, equalTo(scheduled));

int parentActionId = addWorkflowAction(parentWorkflowId, i1, now);
int parentActionId = addWorkflowAction(parentWorkflowId, i1, now, now.plusMillis(100));
assertThat(parentActionId, not(equalTo(-1)));

int subWorkflowId = addSubWorkflow(parentWorkflowId, parentActionId);
Expand All @@ -783,7 +783,7 @@ public void wakeUpWorkflowExternallyWorksWithExpectedStates() {

assertThat(createdWorkflow.nextActivation, equalTo(scheduled));

int parentActionId = addWorkflowAction(parentWorkflowId, i1, now);
int parentActionId = addWorkflowAction(parentWorkflowId, i1, now, now.plusMillis(100));
assertThat(parentActionId, not(equalTo(-1)));

int subWorkflowId = addSubWorkflow(parentWorkflowId, parentActionId);
Expand All @@ -808,7 +808,7 @@ public void wakeUpWorkflowExternallyDoesNotWakeUpWorkflowInUnexpectedState() {

assertThat(createdWorkflow.nextActivation, equalTo(scheduled));

int parentActionId = addWorkflowAction(parentWorkflowId, i1, now);
int parentActionId = addWorkflowAction(parentWorkflowId, i1, now, now.plusMillis(100));
assertThat(parentActionId, not(equalTo(-1)));

int subWorkflowId = addSubWorkflow(parentWorkflowId, parentActionId);
Expand Down Expand Up @@ -903,6 +903,22 @@ public void clearingSignalInsertsAction() {
assertThat(dao.getSignal(instanceId), is(Optional.empty()));
}

@Test
public void deleteExpiredWorkflowHistory() {
WorkflowInstance i1 = constructWorkflowInstanceBuilder().build();
int workflowId = dao.insertWorkflowInstance(i1);
addWorkflowAction(workflowId, new WorkflowInstance.Builder(i1).putStateVariable("deletedVariable", "deletedValue").build(), now(), now());
addWorkflowAction(workflowId, new WorkflowInstance.Builder(i1).putStateVariable("preservedVariable", "preservedValue").build(), now(), now().plusDays(1));

assertThat(dao.deleteWorkflowInstanceHistory(workflowId, 0), equalTo(1));

i1 = dao.getWorkflowInstance(workflowId, EnumSet.allOf(WorkflowInstanceInclude.class), null);
assertThat(i1.getStateVariable("requestData"), equalTo("{ \"parameter\": \"abc\" }"));
assertThat(i1.getStateVariable("deletedVariable"), is(nullValue()));
assertThat(i1.getStateVariable("preservedVariable"), equalTo("preservedValue"));
assertThat(i1.actions.size(), equalTo(1));
}

private static void checkSameWorkflowInfo(WorkflowInstance i1, WorkflowInstance i2) {
assertThat(i1.type, equalTo(i2.type));
assertThat(i1.executorId, equalTo(i2.executorId));
Expand All @@ -924,9 +940,9 @@ private static void checkSameWorkflowInfo(WorkflowInstance i1, WorkflowInstance
assertThat(i1.started, equalTo(i2.started));
}

private int addWorkflowAction(int workflowId, final WorkflowInstance instance, DateTime started) {
private int addWorkflowAction(int workflowId, final WorkflowInstance instance, DateTime started, DateTime ended) {
final WorkflowInstanceAction action = new WorkflowInstanceAction.Builder().setExecutionStart(started).setExecutorId(42)
.setExecutionEnd(started.plusMillis(100)).setRetryNo(1).setType(stateExecution).setState("test")
.setExecutionEnd(ended).setRetryNo(1).setType(stateExecution).setState("test")
.setStateText("state text")
.setWorkflowInstanceId(workflowId).build();
int actionId = transaction.execute(new TransactionCallback<Integer>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
Expand All @@ -43,9 +44,11 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.stream.IntStream;

import org.hamcrest.Description;
import org.hamcrest.Matcher;
Expand Down Expand Up @@ -85,6 +88,7 @@
import io.nflow.engine.workflow.definition.TestWorkflow;
import io.nflow.engine.workflow.definition.WorkflowDefinition;
import io.nflow.engine.workflow.definition.WorkflowDefinitionTest.TestDefinition;
import io.nflow.engine.workflow.definition.WorkflowSettings;
import io.nflow.engine.workflow.definition.WorkflowState;
import io.nflow.engine.workflow.definition.WorkflowStateType;
import io.nflow.engine.workflow.instance.WorkflowInstance;
Expand Down Expand Up @@ -120,6 +124,9 @@ public class WorkflowStateProcessorTest extends BaseNflowTest {
@Mock
StateExecutionImpl executionMock;

@Mock
Random rnd;

@Captor
ArgumentCaptor<WorkflowInstance> update;

Expand Down Expand Up @@ -175,6 +182,8 @@ public void setup() {
filterChain(listener1);
filterChain(listener2);
when(executionMock.getRetries()).thenReturn(testWorkflowDef.getSettings().maxRetries);
when(rnd.nextInt(anyInt())).thenReturn(0);
executor.setRandom(rnd);
}

@After
Expand Down Expand Up @@ -852,6 +861,35 @@ public void saveStateRetryAfterFailedPersistence() throws InterruptedException {
verify(workflowInstanceDao, atLeast(2)).updateWorkflowInstanceAfterExecution(any(), any(), any(), any(), anyBoolean());
}

@Test
public void cleanupWorkflowInstanceHistoryNotExecutedRoughlyNineTimesOfTen() {
WorkflowInstance instance = executingInstanceBuilder().setType("execute-test").setState("start").build();
when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance);
IntStream.range(1, 10).forEach(i -> {
when(rnd.nextInt(anyInt())).thenReturn(i);
executor.run();
verify(workflowInstanceDao, never()).deleteWorkflowInstanceHistory(any(), any());
});
}

@Test
public void cleanupWorkflowInstanceHistoryNotExecutedBasedOnSettings() {
WorkflowInstance instance = executingInstanceBuilder().setType("simple-test").setState("start").build();
when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance);
executor.run();

verify(workflowInstanceDao, never()).deleteWorkflowInstanceHistory(any(), any());
}

@Test
public void cleanupWorkflowInstanceHistoryExecutedRoughlyOneTimeofTen() {
WorkflowInstance instance = executingInstanceBuilder().setType("execute-test").setState("start").build();
when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance);
executor.run();

verify(workflowInstanceDao).deleteWorkflowInstanceHistory(instance.id, executeWf.getSettings().historyDeletableAfterHours);
}

public static class Pojo {
public String field;
public boolean test;
Expand All @@ -863,7 +901,7 @@ public static class ExecuteTestWorkflow extends
WorkflowDefinition<ExecuteTestWorkflow.State> {

protected ExecuteTestWorkflow() {
super("test", State.start, State.error);
super("test", State.start, State.error, new WorkflowSettings.Builder().setHistoryDeletableAfterHours(1).build());
permit(State.start, State.process, State.error);
permit(State.process, State.done, State.error);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.core.IsNull.nullValue;
import static org.joda.time.DateTimeUtils.currentTimeMillis;
import static org.junit.Assert.assertThat;

Expand Down Expand Up @@ -34,6 +35,7 @@ public void verifyConstantDefaultValues() {
long delta = s.getShortTransitionActivation().getMillis() - currentTimeMillis() - 30000;
assertThat(delta, greaterThanOrEqualTo(-1000L));
assertThat(delta, lessThanOrEqualTo(0L));
assertThat(s.historyDeletableAfterHours, is(nullValue()));
}

@Test
Expand Down

0 comments on commit 8e02b13

Please sign in to comment.