Skip to content

Commit

Permalink
Merge pull request #303 from NitorCreations/pause-and-unpause-to-work…
Browse files Browse the repository at this point in the history
…flow-lifecycle

Add pause() and resume() to WorkflowLifecycle
  • Loading branch information
ttiurani authored Apr 2, 2019
2 parents cae321c + 1f2cf7f commit 3a3867b
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- Update libraries for nFlow Explorer. Includes fix for morgan library security issue.
- https://github.com/NitorCreations/nflow/network/alert/nflow-explorer/package-lock.json/morgan/open
- Fix travis build to actually run unit tests for nflow-explorer module.
- Add possibility for an executor to temporarily stop polling for new workflow instances by invoking pause() on WorkflowLifecycle, and continue polling with resume().

## 5.4.1 (2019-03-18)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class WorkflowDispatcher implements Runnable {

private volatile boolean shutdownRequested;
private volatile boolean running = false;
private volatile boolean paused = false;
private final CountDownLatch shutdownDone = new CountDownLatch(1);

private final WorkflowInstanceExecutor executor;
Expand Down Expand Up @@ -70,27 +71,31 @@ public void run() {
}
running = true;
while (!shutdownRequested) {
try {
executor.waitUntilQueueSizeLowerThanThreshold(executorDao.getMaxWaitUntil());

if (!shutdownRequested) {
if (executorDao.tick()) {
workflowInstances.recoverWorkflowInstancesFromDeadNodes();
}
int potentiallyStuckProcessors = stateProcessorFactory.getPotentiallyStuckProcessors();
if (potentiallyStuckProcessors > 0) {
periodicLogger.warn("{} of {} state processor threads are potentially stuck (processing longer than {} seconds)",
potentiallyStuckProcessors, executor.getThreadCount(), stuckThreadThresholdSeconds);
if (paused) {
sleep(false);
} else {
try {
executor.waitUntilQueueSizeLowerThanThreshold(executorDao.getMaxWaitUntil());

if (!shutdownRequested) {
if (executorDao.tick()) {
workflowInstances.recoverWorkflowInstancesFromDeadNodes();
}
int potentiallyStuckProcessors = stateProcessorFactory.getPotentiallyStuckProcessors();
if (potentiallyStuckProcessors > 0) {
periodicLogger.warn("{} of {} state processor threads are potentially stuck (processing longer than {} seconds)",
potentiallyStuckProcessors, executor.getThreadCount(), stuckThreadThresholdSeconds);
}
dispatch(getNextInstanceIds());
}
dispatch(getNextInstanceIds());
} catch (PollingRaceConditionException pex) {
logger.info(pex.getMessage());
sleep(true);
} catch (@SuppressWarnings("unused") InterruptedException dropThrough) {
} catch (Exception e) {
logger.error("Exception in executing dispatcher - retrying after sleep period (" + e.getMessage() + ")", e);
sleep(false);
}
} catch (PollingRaceConditionException pex) {
logger.info(pex.getMessage());
sleep(true);
} catch (@SuppressWarnings("unused") InterruptedException dropThrough) {
} catch (Exception e) {
logger.error("Exception in executing dispatcher - retrying after sleep period (" + e.getMessage() + ")", e);
sleep(false);
}
}

Expand Down Expand Up @@ -121,6 +126,18 @@ public void shutdown() {
}
}

public void pause() {
paused = true;
}

public void resume() {
paused = false;
}

public boolean isPaused() {
return paused;
}

public boolean isRunning() {
return running;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@ public void start() {
dispatcherThread.start();
}

public void pause() {
dispatcher.pause();
}

public void resume() {
dispatcher.resume();
}

public boolean isPaused() {
return dispatcher.isPaused();
}

@Override
public boolean isRunning() {
return dispatcherThread.isAlive();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,15 @@ public void finish() {
runOnce(new DispatcherLogsWarning());
}

@Test
public void pauseAndResumeWorks() {
assertEquals(dispatcher.isPaused(), false);
dispatcher.pause();
assertEquals(dispatcher.isPaused(), true);
dispatcher.resume();
assertEquals(dispatcher.isPaused(), false);
}

void assertPoolIsShutdown(boolean isTrue) {
assertEquals(isTrue, executor.executor.isShutdown());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ public void stopWithCallbackStopsDispatcherThreadAndRunsCallback() {
verify(callback).run();
}

@Test
public void pauseAndResumeWorks() {
lifecycle.pause();
verify(dispatcher).pause();
lifecycle.resume();
verify(dispatcher).resume();
lifecycle.isPaused();
verify(dispatcher).isPaused();
}

// @Test
// public void isRunningReturnsDispatcherThreadStatus() {
// when(dispatcherThread.isAlive()).thenReturn(true); // native method mocking would require PowerMock
Expand Down

0 comments on commit 3a3867b

Please sign in to comment.