Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid race condition in max running processes #675

Merged
merged 7 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/aap_eda/services/activation/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ class ActivationManagerError(Exception):
"""Base class for exceptions for the ActivationManager."""


class MaxRunningProcessesError(ActivationManagerError):
"""Exception raised when the maximum number processes is reached."""


class ActivationStartError(ActivationManagerError):
"""Exception raised when an activation fails to start."""

Expand Down
39 changes: 32 additions & 7 deletions src/aap_eda/services/activation/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1033,14 +1033,25 @@ def _create_activation_instance(self):
if hasattr(self.db_instance, "git_hash")
else ""
)
try:
args = {
"name": self.db_instance.name,
"status": ActivationStatus.STARTING,
"git_hash": git_hash,
}
args[f"{self.db_instance_type}"] = self.db_instance

if not self.check_new_process_allowed(
self.db_instance_type,
self.db_instance.id,
):
msg = (
"Failed to create rulebook process. "
"Reason: Max running processes reached. "
"Waiting for a free slot."
)
self._set_activation_status(ActivationStatus.PENDING, msg)
raise exceptions.MaxRunningProcessesError
args = {
"name": self.db_instance.name,
"status": ActivationStatus.STARTING,
"git_hash": git_hash,
}
args[f"{self.db_instance_type}"] = self.db_instance
try:
models.RulebookProcess.objects.create(**args)
except IntegrityError as exc:
msg = (
Expand All @@ -1060,3 +1071,17 @@ def _get_container_request(self) -> ContainerRequest:
)
LOGGER.exception(msg)
raise exceptions.ActivationManagerError(msg)

@staticmethod
def check_new_process_allowed(parent_type: str, parent_id: int) -> bool:
"""Check if a new process is allowed."""
num_running_processes = models.RulebookProcess.objects.filter(
status__in=[ActivationStatus.RUNNING, ActivationStatus.STARTING],
).count()
if num_running_processes >= settings.MAX_RUNNING_ACTIVATIONS:
LOGGER.info(
"No capacity to start a new rulebook process. "
f"{parent_type} {parent_id} is postponed",
)
return False
return True
28 changes: 9 additions & 19 deletions src/aap_eda/tasks/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import logging
from typing import Union

from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist

import aap_eda.tasks.activation_request_queue as requests_queue
Expand All @@ -27,6 +26,7 @@
)
from aap_eda.core.models import Activation, ActivationRequestQueue, EventStream
from aap_eda.core.tasking import unique_enqueue
from aap_eda.services.activation import exceptions
from aap_eda.services.activation.manager import ActivationManager

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -90,8 +90,12 @@ def _run_request(
f"{process_parent.id}",
)
start_commands = [ActivationRequest.START, ActivationRequest.AUTO_START]
if request.request in start_commands and not _can_start_new_process(
process_parent,
if (
request.request in start_commands
and not ActivationManager.check_new_process_allowed(
process_parent_type,
process_parent.id,
)
):
return False

Expand All @@ -107,6 +111,8 @@ def _run_request(
manager.restart()
elif request.request == ActivationRequest.DELETE:
manager.delete()
except exceptions.MaxRunningProcessesError:
return False
except Exception as e:
LOGGER.exception(
f"Failed to process request {request.request} for "
Expand All @@ -115,22 +121,6 @@ def _run_request(
return True


def _can_start_new_process(
process_parent: Union[Activation, EventStream]
) -> bool:
num_running_processes = models.RulebookProcess.objects.filter(
status__in=[ActivationStatus.RUNNING, ActivationStatus.STARTING],
).count()
if num_running_processes >= settings.MAX_RUNNING_ACTIVATIONS:
process_parent_type = type(process_parent).__name__
LOGGER.info(
"No capacity to start a new rulebook process. "
f"{process_parent_type} {process_parent.id} is postponed",
)
return False
return True


def _make_user_request(
process_parent_type: ProcessParentType,
id: int,
Expand Down
38 changes: 38 additions & 0 deletions tests/integration/services/activation/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,28 @@ def basic_activation(
)


@pytest.fixture
def new_activation_with_instance(
default_user: models.User,
default_decision_environment: models.DecisionEnvironment,
default_rulebook: models.Rulebook,
) -> models.Activation:
"""Return an activation with an instance."""
activation = models.Activation.objects.create(
name="new_activation_with_instance",
user=default_user,
decision_environment=default_decision_environment,
rulebook=default_rulebook,
# rulebook_rulesets is populated by the serializer
rulebook_rulesets=default_rulebook.rulesets,
)
models.RulebookProcess.objects.create(
activation=activation,
status=ActivationStatus.RUNNING,
)
return activation


@pytest.fixture
def container_engine_mock() -> MagicMock:
return create_autospec(ContainerEngine, instance=True)
Expand Down Expand Up @@ -554,3 +576,19 @@ def test_delete_with_exception(
assert (
models.Activation.objects.filter(id=running_activation.id).count() == 0
)


@pytest.mark.django_db
def test_start_max_running_activations(
new_activation_with_instance: models.Activation,
basic_activation: models.Activation,
settings: SettingsWrapper,
eda_caplog: LogCaptureFixture,
):
"""Test start verb when max running activations is reached."""
apply_settings(settings, MAX_RUNNING_ACTIVATIONS=1)
activation_manager = ActivationManager(basic_activation)

with pytest.raises(exceptions.MaxRunningProcessesError):
activation_manager.start()
assert "No capacity to start a new rulebook process" in eda_caplog.text
81 changes: 74 additions & 7 deletions tests/integration/tasks/test_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,58 @@
from django.conf import settings

import aap_eda.tasks.activation_request_queue as queue
import aap_eda.tasks.orchestrator as orchestrator
from aap_eda.core import models
from aap_eda.core.enums import (
ActivationRequest,
ActivationStatus,
ProcessParentType,
)
from aap_eda.tasks import orchestrator


@pytest.fixture
def default_rulebook() -> models.Rulebook:
"""Return a default rulebook."""
rulesets = """
---
- name: Hello World
hosts: all
sources:
- ansible.eda.range:
limit: 5
rules:
- name: Say Hello
condition: event.i == 1
action:
debug:
msg: "Hello World!"

"""
return models.Rulebook.objects.create(
name="test-rulebook",
rulesets=rulesets,
)


@pytest.fixture()
def activation():
def activation(default_rulebook):
user = models.User.objects.create_user(
username="luke.skywalker",
first_name="Luke",
last_name="Skywalker",
email="[email protected]",
password="secret",
)
decision_environment = models.DecisionEnvironment.objects.create(
name="test-decision-environment",
image_url="localhost:14000/test-image-url",
)
return models.Activation.objects.create(
name="test1",
user=user,
decision_environment=decision_environment,
rulebook=default_rulebook,
rulebook_rulesets=default_rulebook.rulesets,
)


Expand Down Expand Up @@ -106,17 +137,15 @@ def test_manage_request(manager_mock, activation, verb):


@pytest.mark.django_db
@mock.patch("aap_eda.tasks.orchestrator.ActivationManager")
def test_manage_not_start(manager_mock, activation, max_running_processes):
@mock.patch.object(orchestrator.ActivationManager, "start", autospec=True)
def test_manage_not_start(start_mock, activation, max_running_processes):
queue.push(
ProcessParentType.ACTIVATION, activation.id, ActivationRequest.START
)
manager_instance_mock = mock.Mock()
manager_mock.return_value = manager_instance_mock

orchestrator._manage(ProcessParentType.ACTIVATION, activation.id)

manager_instance_mock.start.assert_not_called()
start_mock.assert_not_called()
assert (
len(queue.peek_all(ProcessParentType.ACTIVATION, activation.id)) == 1
)
Expand Down Expand Up @@ -194,3 +223,41 @@ def test_monitor_rulebook_processes(
orchestrator.monitor_rulebook_processes()

enqueue_mock.assert_has_calls(call_args, any_order=True)


original_start_method = orchestrator.ActivationManager.start


@pytest.mark.django_db
@mock.patch.object(orchestrator.ActivationManager, "start", autospec=True)
def test_max_running_activation_after_start_job(
start_mock,
activation,
max_running_processes,
):
"""Check if the max running processes error is handled correctly
when the limit is reached after the request is started."""

def side_effect(*args, **kwargs):
# Recreate the process and run the original start method
instance = args[0]
models.RulebookProcess.objects.create(
name="running",
activation=max_running_processes[0].activation,
status=ActivationStatus.RUNNING,
)
original_start_method(instance, *args[1:], **kwargs)

start_mock.side_effect = side_effect

max_running_processes[0].delete()

queue.push(
ProcessParentType.ACTIVATION, activation.id, ActivationRequest.START
)
orchestrator._manage(ProcessParentType.ACTIVATION, activation.id)
assert start_mock.call_count == 1
running_processes = models.RulebookProcess.objects.filter(
Alex-Izquierdo marked this conversation as resolved.
Show resolved Hide resolved
status__in=[ActivationStatus.STARTING, ActivationStatus.RUNNING]
).count()
assert running_processes == settings.MAX_RUNNING_ACTIVATIONS
Loading