From 461299d852dd30bdc9894ae72c1bf9951d679931 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johana=20Sup=C3=ADkov=C3=A1?= Date: Fri, 26 Mar 2021 10:38:36 +0100 Subject: [PATCH] Suspending propagating tasks to engine - From CLI allows stopping sending waiting tasks to the engine - Added to OpenAPI --- perun-base/src/main/resources/perun-roles.yml | 5 +++ perun-cli/Perun/TasksAgent.pm | 10 ++++++ perun-cli/resumeTasksPropagation | 31 +++++++++++++++++++ perun-cli/suspendTasksPropagation | 31 +++++++++++++++++++ .../perun/core/api/TasksManager.java | 8 +++++ .../perun/core/bl/TasksManagerBl.java | 14 +++++++++ .../perun/core/blImpl/TasksManagerBlImpl.java | 16 ++++++++++ .../perun/core/entry/TasksManagerEntry.java | 10 ++++++ .../dispatcher/scheduling/TaskScheduler.java | 20 ++++++++++++ perun-openapi/openapi.yml | 14 +++++++++ .../perun/rpc/methods/TasksManagerMethod.java | 27 ++++++++++++++++ 11 files changed, 186 insertions(+) create mode 100755 perun-cli/resumeTasksPropagation create mode 100755 perun-cli/suspendTasksPropagation diff --git a/perun-base/src/main/resources/perun-roles.yml b/perun-base/src/main/resources/perun-roles.yml index a1c0b9f183..5f3b04de68 100644 --- a/perun-base/src/main/resources/perun-roles.yml +++ b/perun-base/src/main/resources/perun-roles.yml @@ -4120,6 +4120,11 @@ perun_policies: include_policies: - default_policy + suspendTasksPropagation_policy: + policy_roles: [] + include_policies: + - default_policy + getFacilityAssignedServicesForGUI_Facility_policy: policy_roles: - FACILITYADMIN: Facility diff --git a/perun-cli/Perun/TasksAgent.pm b/perun-cli/Perun/TasksAgent.pm index 8f59d09b6f..9ff975d87d 100644 --- a/perun-cli/Perun/TasksAgent.pm +++ b/perun-cli/Perun/TasksAgent.pm @@ -23,4 +23,14 @@ sub getTaskResultsForDestinations return Perun::Common::callManagerMethod('getTaskResultsForDestinations', '[]TaskResult', @_); } +sub suspendTasksPropagation +{ + return Perun::Common::callManagerMethod('suspendTasksPropagation', '', @_); +} + +sub resumeTasksPropagation +{ + return Perun::Common::callManagerMethod('resumeTasksPropagation', '', @_); +} + 1; diff --git a/perun-cli/resumeTasksPropagation b/perun-cli/resumeTasksPropagation new file mode 100755 index 0000000000..0a14e31e99 --- /dev/null +++ b/perun-cli/resumeTasksPropagation @@ -0,0 +1,31 @@ +#!/usr/bin/perl + +use strict; +use warnings; +use Getopt::Long qw(:config no_ignore_case); +use Perun::Agent; +use Perun::Common qw(printMessage); + +sub help { + return qq{ + Resumes tasks propagation to engine. + -------------------------------------- + Available options: + --batch | -b batch + --help | -h prints this help + + }; +} + +my ($batch); +GetOptions ("help|h" => sub { + print help(); + exit 0; +}, "batch|b" => \$batch) || die help(); + +my $agent = Perun::Agent->new(); +my $tasksAgent = $agent->getTasksAgent; + +$tasksAgent->resumeTasksPropagation(); + +printMessage("Tasks propagation resumed.", $batch); diff --git a/perun-cli/suspendTasksPropagation b/perun-cli/suspendTasksPropagation new file mode 100755 index 0000000000..4d116468bc --- /dev/null +++ b/perun-cli/suspendTasksPropagation @@ -0,0 +1,31 @@ +#!/usr/bin/perl + +use strict; +use warnings; +use Getopt::Long qw(:config no_ignore_case); +use Perun::Agent; +use Perun::Common qw(printMessage); + +sub help { + return qq{ + Suspends tasks propagation to engine. + -------------------------------------- + Available options: + --batch | -b batch + --help | -h prints this help + + }; +} + +my ($batch); +GetOptions ("help|h" => sub { + print help(); + exit 0; +}, "batch|b" => \$batch) || die help(); + +my $agent = Perun::Agent->new(); +my $tasksAgent = $agent->getTasksAgent; + +$tasksAgent->suspendTasksPropagation(); + +printMessage("Tasks propagation is suspended.", $batch); diff --git a/perun-core/src/main/java/cz/metacentrum/perun/core/api/TasksManager.java b/perun-core/src/main/java/cz/metacentrum/perun/core/api/TasksManager.java index f48e4af800..c9a86f7501 100644 --- a/perun-core/src/main/java/cz/metacentrum/perun/core/api/TasksManager.java +++ b/perun-core/src/main/java/cz/metacentrum/perun/core/api/TasksManager.java @@ -244,5 +244,13 @@ public interface TasksManager { */ List listAllTasksInState(PerunSession perunSession, Task.TaskStatus state) throws PrivilegeException; + /** + * Suspends tasks propagation to engine. + * + * @param perunSession + * @param suspend True to suspend propagation, false to resume propagation + * @throws PrivilegeException + */ + void suspendTasksPropagation(PerunSession perunSession, boolean suspend) throws PrivilegeException; } diff --git a/perun-core/src/main/java/cz/metacentrum/perun/core/bl/TasksManagerBl.java b/perun-core/src/main/java/cz/metacentrum/perun/core/bl/TasksManagerBl.java index e5bf250c61..487fddffa5 100644 --- a/perun-core/src/main/java/cz/metacentrum/perun/core/bl/TasksManagerBl.java +++ b/perun-core/src/main/java/cz/metacentrum/perun/core/bl/TasksManagerBl.java @@ -311,4 +311,18 @@ public interface TasksManagerBl { */ void updateTask(PerunSession sess, Task task); + /** + * Suspend propagating tasks to engine. + * + * @param sess + * @param suspend True for suspending propagation, false for resuming propagation + */ + void suspendTasksPropagation(PerunSession sess, boolean suspend); + + /** + * Check if propagating tasks to engine is suspended. + * + * @return True if suspended, false if propagating + */ + boolean isSuspendedTasksPropagation(); } diff --git a/perun-core/src/main/java/cz/metacentrum/perun/core/blImpl/TasksManagerBlImpl.java b/perun-core/src/main/java/cz/metacentrum/perun/core/blImpl/TasksManagerBlImpl.java index 7a571f2eb5..34acd4a402 100644 --- a/perun-core/src/main/java/cz/metacentrum/perun/core/blImpl/TasksManagerBlImpl.java +++ b/perun-core/src/main/java/cz/metacentrum/perun/core/blImpl/TasksManagerBlImpl.java @@ -41,6 +41,8 @@ public class TasksManagerBlImpl implements TasksManagerBl { @Autowired protected TasksManagerImplApi tasksManagerImpl; + private static boolean suspendedTasksPropagation = false; //are tasks stopped from propagating to engine + // -------------- constructors public TasksManagerBlImpl(TasksManagerImplApi tasksManagerImpl) { @@ -400,4 +402,18 @@ public void updateTask(PerunSession sess, Task task) { getTasksManagerImpl().updateTask(task); } + @Override + public void suspendTasksPropagation(PerunSession perunSession, boolean suspend) { + synchronized(TasksManagerBlImpl.class) { + suspendedTasksPropagation = suspend; + } + } + + @Override + public boolean isSuspendedTasksPropagation() { + synchronized(TasksManagerBlImpl.class) { + return suspendedTasksPropagation; + } + } + } diff --git a/perun-core/src/main/java/cz/metacentrum/perun/core/entry/TasksManagerEntry.java b/perun-core/src/main/java/cz/metacentrum/perun/core/entry/TasksManagerEntry.java index e4a4ba5988..c998af2dbc 100644 --- a/perun-core/src/main/java/cz/metacentrum/perun/core/entry/TasksManagerEntry.java +++ b/perun-core/src/main/java/cz/metacentrum/perun/core/entry/TasksManagerEntry.java @@ -262,6 +262,16 @@ public List listAllTasksInState(PerunSession perunSession, Task.TaskStatus return tasksManagerBl.listAllTasksInState(perunSession, state); } + @Override + public void suspendTasksPropagation(PerunSession perunSession, boolean suspend) throws PrivilegeException { + // Authorization + if (!AuthzResolver.authorizedInternal(perunSession, "suspendTasksPropagation_policy")) { + throw new PrivilegeException(perunSession, "suspendTasksPropagation"); + } + tasksManagerBl.suspendTasksPropagation(perunSession, suspend); + } + + public void setPerunBl(PerunBl perunBl) { this.perun = perunBl; } diff --git a/perun-dispatcher/src/main/java/cz/metacentrum/perun/dispatcher/scheduling/TaskScheduler.java b/perun-dispatcher/src/main/java/cz/metacentrum/perun/dispatcher/scheduling/TaskScheduler.java index f97736e858..1ea6b2b6de 100644 --- a/perun-dispatcher/src/main/java/cz/metacentrum/perun/dispatcher/scheduling/TaskScheduler.java +++ b/perun-dispatcher/src/main/java/cz/metacentrum/perun/dispatcher/scheduling/TaskScheduler.java @@ -148,6 +148,10 @@ public void run() { TaskSchedule schedule; while (!shouldStop()) { try { + if (tasksManagerBl.isSuspendedTasksPropagation()) { + // do not continue sending tasks to the engine until propagation is set to resume + waitForResumingPropagation(); + } schedule = getWaitingTaskSchedule(); } catch (InterruptedException e) { String message = "Thread was interrupted, cannot continue."; @@ -214,6 +218,22 @@ private TaskSchedule getWaitingTaskSchedule() throws InterruptedException { return taskSchedule; } + /** + * Method waiting for propagation of tasks to the engine to be resumed. + * Called when propagation was suspended. + * @throws InterruptedException Waiting thread was interrupted. + */ + private void waitForResumingPropagation() throws InterruptedException { + int sleepTime = 10000; + while (tasksManagerBl.isSuspendedTasksPropagation()) { + log.debug("Propagation of tasks is suspended."); + log.debug(schedulingPool.getReport()); + log.debug("WaitingTasksQueue has {} normal Tasks and {} forced Tasks.", waitingTasksQueue.size(), waitingForcedTasksQueue.size()); + Thread.sleep(sleepTime); + } + log.debug("Propagation of tasks is resumed."); + } + /** * Send Task to Engine. Called when it waited long enough in a waiting queue (listening for other changes). * diff --git a/perun-openapi/openapi.yml b/perun-openapi/openapi.yml index 10b98b0bc8..017d951355 100644 --- a/perun-openapi/openapi.yml +++ b/perun-openapi/openapi.yml @@ -15339,6 +15339,20 @@ paths: destinationName: { type: string, description: "destination name", nullable: false } destinationType: { type: string, description: "destination type", nullable: false } + /json/tasksManager/suspendTasksPropagation: + post: + tags: + - TasksManager + operationId: suspendTasksPropagation + summary: Suspends waiting tasks propagation to the engine. Does not affect already propagated tasks. + parameters: + - { name: suspend, description: "if true stops propagating waiting tasks to the engine, if false resumes propagation", schema: { type: boolean }, in: query, required: true } + responses: + '200': + $ref: '#/components/responses/VoidResponse' + default: + $ref: '#/components/responses/ExceptionResponse' + ################################################# # # # RTMessagesManager # diff --git a/perun-rpc/src/main/java/cz/metacentrum/perun/rpc/methods/TasksManagerMethod.java b/perun-rpc/src/main/java/cz/metacentrum/perun/rpc/methods/TasksManagerMethod.java index 74900baa38..471f81a4bc 100644 --- a/perun-rpc/src/main/java/cz/metacentrum/perun/rpc/methods/TasksManagerMethod.java +++ b/perun-rpc/src/main/java/cz/metacentrum/perun/rpc/methods/TasksManagerMethod.java @@ -326,6 +326,33 @@ public Void call(ApiCaller ac, Deserializer parms) throws PerunException { } return null; } + }, + + /*# + * Stops dispatcher from propagating waiting tasks to the engine. + * Tasks which were sent to the engine before won't be affected and will be finished. + */ + suspendTasksPropagation { + public Void call(ApiCaller ac, Deserializer parms) throws PerunException { + parms.stateChangingCheck(); + ac.getTasksManager().suspendTasksPropagation( + ac.getSession(), + true); + return null; + } + }, + + /*# + * Resumes dispatcher's tasks propagation to the engine. + */ + resumeTasksPropagation { + public Void call(ApiCaller ac, Deserializer parms) throws PerunException { + parms.stateChangingCheck(); + ac.getTasksManager().suspendTasksPropagation( + ac.getSession(), + false); + return null; + } }; }