Skip to content

Commit

Permalink
Merge pull request CESNET#3137 from Johaney-s/dev
Browse files Browse the repository at this point in the history
Suspending propagating tasks to engine
  • Loading branch information
zlamalp authored Mar 30, 2021
2 parents 20722e1 + 461299d commit be7057e
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 0 deletions.
5 changes: 5 additions & 0 deletions perun-base/src/main/resources/perun-roles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4120,6 +4120,11 @@ perun_policies:
include_policies:
- default_policy

suspendTasksPropagation_policy:
policy_roles: []
include_policies:
- default_policy

getFacilityAssignedServicesForGUI_Facility_policy:
policy_roles:
- FACILITYADMIN: Facility
Expand Down
10 changes: 10 additions & 0 deletions perun-cli/Perun/TasksAgent.pm
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,14 @@ sub getTaskResultsForDestinations
return Perun::Common::callManagerMethod('getTaskResultsForDestinations', '[]TaskResult', @_);
}

sub suspendTasksPropagation
{
return Perun::Common::callManagerMethod('suspendTasksPropagation', '', @_);
}

sub resumeTasksPropagation
{
return Perun::Common::callManagerMethod('resumeTasksPropagation', '', @_);
}

1;
31 changes: 31 additions & 0 deletions perun-cli/resumeTasksPropagation
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/usr/bin/perl

use strict;
use warnings;
use Getopt::Long qw(:config no_ignore_case);
use Perun::Agent;
use Perun::Common qw(printMessage);

sub help {
return qq{
Resumes tasks propagation to engine.
--------------------------------------
Available options:
--batch | -b batch
--help | -h prints this help
};
}

my ($batch);
GetOptions ("help|h" => sub {
print help();
exit 0;
}, "batch|b" => \$batch) || die help();

my $agent = Perun::Agent->new();
my $tasksAgent = $agent->getTasksAgent;

$tasksAgent->resumeTasksPropagation();

printMessage("Tasks propagation resumed.", $batch);
31 changes: 31 additions & 0 deletions perun-cli/suspendTasksPropagation
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/usr/bin/perl

use strict;
use warnings;
use Getopt::Long qw(:config no_ignore_case);
use Perun::Agent;
use Perun::Common qw(printMessage);

sub help {
return qq{
Suspends tasks propagation to engine.
--------------------------------------
Available options:
--batch | -b batch
--help | -h prints this help
};
}

my ($batch);
GetOptions ("help|h" => sub {
print help();
exit 0;
}, "batch|b" => \$batch) || die help();

my $agent = Perun::Agent->new();
my $tasksAgent = $agent->getTasksAgent;

$tasksAgent->suspendTasksPropagation();

printMessage("Tasks propagation is suspended.", $batch);
Original file line number Diff line number Diff line change
Expand Up @@ -244,5 +244,13 @@ public interface TasksManager {
*/
List<Task> listAllTasksInState(PerunSession perunSession, Task.TaskStatus state) throws PrivilegeException;

/**
* Suspends tasks propagation to engine.
*
* @param perunSession
* @param suspend True to suspend propagation, false to resume propagation
* @throws PrivilegeException
*/
void suspendTasksPropagation(PerunSession perunSession, boolean suspend) throws PrivilegeException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -311,4 +311,18 @@ public interface TasksManagerBl {
*/
void updateTask(PerunSession sess, Task task);

/**
* Suspend propagating tasks to engine.
*
* @param sess
* @param suspend True for suspending propagation, false for resuming propagation
*/
void suspendTasksPropagation(PerunSession sess, boolean suspend);

/**
* Check if propagating tasks to engine is suspended.
*
* @return True if suspended, false if propagating
*/
boolean isSuspendedTasksPropagation();
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class TasksManagerBlImpl implements TasksManagerBl {
@Autowired
protected TasksManagerImplApi tasksManagerImpl;

private static boolean suspendedTasksPropagation = false; //are tasks stopped from propagating to engine

// -------------- constructors

public TasksManagerBlImpl(TasksManagerImplApi tasksManagerImpl) {
Expand Down Expand Up @@ -400,4 +402,18 @@ public void updateTask(PerunSession sess, Task task) {
getTasksManagerImpl().updateTask(task);
}

@Override
public void suspendTasksPropagation(PerunSession perunSession, boolean suspend) {
synchronized(TasksManagerBlImpl.class) {
suspendedTasksPropagation = suspend;
}
}

@Override
public boolean isSuspendedTasksPropagation() {
synchronized(TasksManagerBlImpl.class) {
return suspendedTasksPropagation;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,16 @@ public List<Task> listAllTasksInState(PerunSession perunSession, Task.TaskStatus
return tasksManagerBl.listAllTasksInState(perunSession, state);
}

@Override
public void suspendTasksPropagation(PerunSession perunSession, boolean suspend) throws PrivilegeException {
// Authorization
if (!AuthzResolver.authorizedInternal(perunSession, "suspendTasksPropagation_policy")) {
throw new PrivilegeException(perunSession, "suspendTasksPropagation");
}
tasksManagerBl.suspendTasksPropagation(perunSession, suspend);
}


public void setPerunBl(PerunBl perunBl) {
this.perun = perunBl;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ public void run() {
TaskSchedule schedule;
while (!shouldStop()) {
try {
if (tasksManagerBl.isSuspendedTasksPropagation()) {
// do not continue sending tasks to the engine until propagation is set to resume
waitForResumingPropagation();
}
schedule = getWaitingTaskSchedule();
} catch (InterruptedException e) {
String message = "Thread was interrupted, cannot continue.";
Expand Down Expand Up @@ -214,6 +218,22 @@ private TaskSchedule getWaitingTaskSchedule() throws InterruptedException {
return taskSchedule;
}

/**
* Method waiting for propagation of tasks to the engine to be resumed.
* Called when propagation was suspended.
* @throws InterruptedException Waiting thread was interrupted.
*/
private void waitForResumingPropagation() throws InterruptedException {
int sleepTime = 10000;
while (tasksManagerBl.isSuspendedTasksPropagation()) {
log.debug("Propagation of tasks is suspended.");
log.debug(schedulingPool.getReport());
log.debug("WaitingTasksQueue has {} normal Tasks and {} forced Tasks.", waitingTasksQueue.size(), waitingForcedTasksQueue.size());
Thread.sleep(sleepTime);
}
log.debug("Propagation of tasks is resumed.");
}

/**
* Send Task to Engine. Called when it waited long enough in a waiting queue (listening for other changes).
*
Expand Down
14 changes: 14 additions & 0 deletions perun-openapi/openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15398,6 +15398,20 @@ paths:
destinationName: { type: string, description: "destination name", nullable: false }
destinationType: { type: string, description: "destination type", nullable: false }

/json/tasksManager/suspendTasksPropagation:
post:
tags:
- TasksManager
operationId: suspendTasksPropagation
summary: Suspends waiting tasks propagation to the engine. Does not affect already propagated tasks.
parameters:
- { name: suspend, description: "if true stops propagating waiting tasks to the engine, if false resumes propagation", schema: { type: boolean }, in: query, required: true }
responses:
'200':
$ref: '#/components/responses/VoidResponse'
default:
$ref: '#/components/responses/ExceptionResponse'

#################################################
# #
# RTMessagesManager #
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,33 @@ public Void call(ApiCaller ac, Deserializer parms) throws PerunException {
}
return null;
}
},

/*#
* Stops dispatcher from propagating waiting tasks to the engine.
* Tasks which were sent to the engine before won't be affected and will be finished.
*/
suspendTasksPropagation {
public Void call(ApiCaller ac, Deserializer parms) throws PerunException {
parms.stateChangingCheck();
ac.getTasksManager().suspendTasksPropagation(
ac.getSession(),
true);
return null;
}
},

/*#
* Resumes dispatcher's tasks propagation to the engine.
*/
resumeTasksPropagation {
public Void call(ApiCaller ac, Deserializer parms) throws PerunException {
parms.stateChangingCheck();
ac.getTasksManager().suspendTasksPropagation(
ac.getSession(),
false);
return null;
}
};

}

0 comments on commit be7057e

Please sign in to comment.