diff --git a/CHANGELOG.md b/CHANGELOG.md index eac6943d7..473c4be29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Update libraries for nFlow Explorer. Includes fix for morgan library security issue. - https://github.com/NitorCreations/nflow/network/alert/nflow-explorer/package-lock.json/morgan/open - Fix travis build to actually run unit tests for nflow-explorer module. +- Add possibility for an executor to temporarily stop polling for new workflow instances by invoking pause() on WorkflowLifecycle, and continue polling with resume(). ## 5.4.1 (2019-03-18) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java index 0fde69214..e0680828f 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java @@ -30,6 +30,7 @@ public class WorkflowDispatcher implements Runnable { private volatile boolean shutdownRequested; private volatile boolean running = false; + private volatile boolean paused = false; private final CountDownLatch shutdownDone = new CountDownLatch(1); private final WorkflowInstanceExecutor executor; @@ -70,27 +71,31 @@ public void run() { } running = true; while (!shutdownRequested) { - try { - executor.waitUntilQueueSizeLowerThanThreshold(executorDao.getMaxWaitUntil()); - - if (!shutdownRequested) { - if (executorDao.tick()) { - workflowInstances.recoverWorkflowInstancesFromDeadNodes(); - } - int potentiallyStuckProcessors = stateProcessorFactory.getPotentiallyStuckProcessors(); - if (potentiallyStuckProcessors > 0) { - periodicLogger.warn("{} of {} state processor threads are potentially stuck (processing longer than {} seconds)", - potentiallyStuckProcessors, executor.getThreadCount(), stuckThreadThresholdSeconds); + if (paused) { + sleep(false); + } else { + try { + executor.waitUntilQueueSizeLowerThanThreshold(executorDao.getMaxWaitUntil()); + + if (!shutdownRequested) { + if (executorDao.tick()) { + workflowInstances.recoverWorkflowInstancesFromDeadNodes(); + } + int potentiallyStuckProcessors = stateProcessorFactory.getPotentiallyStuckProcessors(); + if (potentiallyStuckProcessors > 0) { + periodicLogger.warn("{} of {} state processor threads are potentially stuck (processing longer than {} seconds)", + potentiallyStuckProcessors, executor.getThreadCount(), stuckThreadThresholdSeconds); + } + dispatch(getNextInstanceIds()); } - dispatch(getNextInstanceIds()); + } catch (PollingRaceConditionException pex) { + logger.info(pex.getMessage()); + sleep(true); + } catch (@SuppressWarnings("unused") InterruptedException dropThrough) { + } catch (Exception e) { + logger.error("Exception in executing dispatcher - retrying after sleep period (" + e.getMessage() + ")", e); + sleep(false); } - } catch (PollingRaceConditionException pex) { - logger.info(pex.getMessage()); - sleep(true); - } catch (@SuppressWarnings("unused") InterruptedException dropThrough) { - } catch (Exception e) { - logger.error("Exception in executing dispatcher - retrying after sleep period (" + e.getMessage() + ")", e); - sleep(false); } } @@ -121,6 +126,18 @@ public void shutdown() { } } + public void pause() { + paused = true; + } + + public void resume() { + paused = false; + } + + public boolean isPaused() { + return paused; + } + public boolean isRunning() { return running; } diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowLifecycle.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowLifecycle.java index 0bf105a9c..739a4a247 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowLifecycle.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowLifecycle.java @@ -57,6 +57,18 @@ public void start() { dispatcherThread.start(); } + public void pause() { + dispatcher.pause(); + } + + public void resume() { + dispatcher.resume(); + } + + public boolean isPaused() { + return dispatcher.isPaused(); + } + @Override public boolean isRunning() { return dispatcherThread.isAlive(); diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java index c037df2d7..9d5d2ccfe 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java @@ -325,6 +325,15 @@ public void finish() { runOnce(new DispatcherLogsWarning()); } + @Test + public void pauseAndResumeWorks() { + assertEquals(dispatcher.isPaused(), false); + dispatcher.pause(); + assertEquals(dispatcher.isPaused(), true); + dispatcher.resume(); + assertEquals(dispatcher.isPaused(), false); + } + void assertPoolIsShutdown(boolean isTrue) { assertEquals(isTrue, executor.executor.isShutdown()); } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowLifecycleTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowLifecycleTest.java index fde908edd..cd1c4ddd3 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowLifecycleTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowLifecycleTest.java @@ -73,6 +73,16 @@ public void stopWithCallbackStopsDispatcherThreadAndRunsCallback() { verify(callback).run(); } + @Test + public void pauseAndResumeWorks() { + lifecycle.pause(); + verify(dispatcher).pause(); + lifecycle.resume(); + verify(dispatcher).resume(); + lifecycle.isPaused(); + verify(dispatcher).isPaused(); + } + // @Test // public void isRunningReturnsDispatcherThreadStatus() { // when(dispatcherThread.isAlive()).thenReturn(true); // native method mocking would require PowerMock