Skip to content

Commit

Permalink
Feature/rcs/application dependencies (#96)
Browse files Browse the repository at this point in the history
* add application depends_on declaration

* generalise dependency monitor

* integrate application dependencies
  • Loading branch information
blueskyjunkie authored Nov 3, 2021
1 parent 39fe7e7 commit d12acf4
Show file tree
Hide file tree
Showing 11 changed files with 428 additions and 176 deletions.
62 changes: 34 additions & 28 deletions foodx_devops_tools/deploy_me/_dependency_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import click

from foodx_devops_tools.pipeline_config import (
DependencyDeclarations,
IterationContext,
SingularFrameDefinition,
)

from ._exceptions import DeploymentTerminatedError
Expand Down Expand Up @@ -45,27 +45,31 @@ def _generate_dependency_contexts(
return dependency_context


async def _confirm_dependency_frame_status(
async def _confirm_dependency_entity_status(
dependency_names: typing.Set[str],
frame_status: DeploymentStatus,
entity_status: DeploymentStatus,
this_context: IterationContext,
status_monitor_sleep_seconds: float,
) -> typing.Set[str]:
"""Check that dependency frames are present in frame status."""
"""
Check that dependency entities are present in entity status.
Entities here are either frames or applications.
"""

async def report_missing_dependencies(
this_state: DeploymentState.ResultType,
console_report: bool,
) -> str:
nonlocal dependency_names, frame_status, this_context
nonlocal dependency_names, entity_status, this_context

message = "dependency frames not in frame status, {0}, {1}".format(
message = "dependency not in entity status, {0}, {1}".format(
this_context, str(dependency_names)
)
log.warning(message)
if console_report:
click.echo(click.style(message, fg="yellow"))
await frame_status.write(
await entity_status.write(
str(this_context),
this_state,
message=message,
Expand All @@ -84,13 +88,13 @@ async def report_missing_dependencies(
not_found = True
attempt_count = 0
while not_found and (attempt_count < STATUS_KEY_MAX_RETRIES):
frame_contexts = await frame_status.names()
if all([x in frame_contexts for x in dependency_contexts]):
entity_contexts = await entity_status.names()
if all([x in entity_contexts for x in dependency_contexts]):
not_found = False
log.debug(
"all dependencies found in status, {0}, {1}, "
"({2} retries)".format(
frame_contexts, dependency_contexts, attempt_count
entity_contexts, dependency_contexts, attempt_count
)
)
else:
Expand All @@ -109,43 +113,43 @@ async def report_missing_dependencies(
return dependency_contexts


async def process_dependencies(
async def wait_for_dependencies(
iteration_context: IterationContext,
frame_data: SingularFrameDefinition,
frame_status: DeploymentStatus,
dependency_data: DependencyDeclarations,
entity_status: DeploymentStatus,
) -> None:
"""
Wait for dependency frame status to succeed or fail.
Wait for dependency entity status to succeed or fail.
A timeout occurs if the specified duration is exceeded.
Args:
iteration_context: Current deployment hierarchy object.
frame_data: Frame configuration data.
frame_status: Status reporting object for frames.
dependency_data: Entity dependency data.
entity_status: Status reporting object for entities.
Raises:
DeploymentTerminatedError: If any dependencies don't complete or the
maximum wait duration is exceeded.
"""

async def cancel_deployment() -> None:
nonlocal frame_data, frame_status, iteration_context
nonlocal dependency_data, entity_status, iteration_context

message = "cancelled due to dependency failure, {0}, {1}".format(
iteration_context, str(frame_data.depends_on)
iteration_context, str(dependency_data)
)
log.error(message)
click.echo(click.style(message, fg="red"))
await frame_status.write(
await entity_status.write(
str(iteration_context),
DeploymentState.ResultType.cancelled,
message=message,
)
raise DeploymentTerminatedError(message)

async def report_success() -> None:
nonlocal frame_status, iteration_context
nonlocal entity_status, iteration_context

message = (
"dependencies completed. proceeding with deployment, {0}".format(
Expand All @@ -154,16 +158,16 @@ async def report_success() -> None:
)
log.info(message)
click.echo(click.style(message, fg="cyan"))
await frame_status.write(
await entity_status.write(
str(iteration_context), DeploymentState.ResultType.in_progress
)

# if there are no dependencies just skip dependency processing.
if frame_data.depends_on:
dependency_names = set(frame_data.depends_on)
dependency_contexts = await _confirm_dependency_frame_status(
if dependency_data:
dependency_names = set(dependency_data)
dependency_contexts = await _confirm_dependency_entity_status(
dependency_names,
frame_status,
entity_status,
iteration_context,
STATUS_KEY_RETRY_SLEEP_SECONDS,
)
Expand All @@ -176,20 +180,20 @@ async def report_success() -> None:
try:
await asyncio.gather(
*[
frame_status.wait_for_completion(x)
entity_status.wait_for_completion(x)
for x in dependency_contexts
]
)

dependency_status = [
await frame_status.read(x) for x in dependency_contexts
await entity_status.read(x) for x in dependency_contexts
]
if all_success(dependency_status):
await report_success()
elif any_completed_dirty(dependency_status):
await cancel_deployment()
except asyncio.TimeoutError:
await frame_status.write(
await entity_status.write(
str(iteration_context),
DeploymentState.ResultType.cancelled,
message=message,
Expand All @@ -201,3 +205,5 @@ async def report_success() -> None:
log.error(message)
click.echo(click.style(message, fg="red"))
raise DeploymentTerminatedError(message)
else:
log.debug("Skipping empty dependencies for status")
28 changes: 19 additions & 9 deletions foodx_devops_tools/deploy_me/_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,21 @@
deploy as deploy_resource_group,
)
from foodx_devops_tools.pipeline_config import (
ApplicationDefinition,
ApplicationDeploymentSteps,
ApplicationStepDelay,
FlattenedDeployment,
PipelineConfiguration,
SingularFrameDefinition,
)
from foodx_devops_tools.pipeline_config.frames import (
ApplicationDeploymentDefinition,
ApplicationStepDeploymentDefinition,
)
from foodx_devops_tools.pipeline_config.puff_map import PuffMapPaths
from foodx_devops_tools.puff import PuffError
from foodx_devops_tools.utilities.templates import prepare_deployment_files

from ._dependency_monitor import process_dependencies
from ._dependency_monitor import wait_for_dependencies
from ._exceptions import DeploymentError
from ._state import PipelineCliOptions
from ._status import DeploymentState, DeploymentStatus, all_success
Expand Down Expand Up @@ -166,7 +167,7 @@ def _construct_override_parameters(


async def _do_step_deployment(
this_step: ApplicationDeploymentDefinition,
this_step: ApplicationStepDeploymentDefinition,
deployment_data: FlattenedDeployment,
puff_parameter_paths: PuffMapPaths,
this_context: str,
Expand Down Expand Up @@ -242,7 +243,7 @@ async def _do_step_deployment(


async def _deploy_step(
this_step: ApplicationDeploymentDefinition,
this_step: ApplicationStepDeploymentDefinition,
deployment_data: FlattenedDeployment,
puff_parameter_data: PuffMapPaths,
this_context: str,
Expand Down Expand Up @@ -285,7 +286,7 @@ async def _do_application_deployment(
][deployment_data.context.azure_subscription_name]

for this_step in application_data:
if isinstance(this_step, ApplicationDeploymentDefinition):
if isinstance(this_step, ApplicationStepDeploymentDefinition):
await _deploy_step(
this_step,
deployment_data,
Expand Down Expand Up @@ -333,7 +334,7 @@ async def _do_application_deployment(


async def deploy_application(
application_data: ApplicationDeploymentSteps,
application_data: ApplicationDefinition,
deployment_data: FlattenedDeployment,
application_status: DeploymentStatus,
enable_validation: bool,
Expand All @@ -349,6 +350,15 @@ async def deploy_application(
log.info(message)
click.echo(message)
await application_status.initialize(this_context)

await wait_for_dependencies(
deployment_data.data.iteration_context,
application_data.depends_on
if application_data.depends_on
else list(),
application_status,
)

await application_status.write(
this_context, DeploymentState.ResultType.in_progress
)
Expand All @@ -368,7 +378,7 @@ async def deploy_application(
else:
await _do_application_deployment(
this_context,
application_data,
application_data.steps,
deployment_data,
application_status,
enable_validation,
Expand Down Expand Up @@ -406,9 +416,9 @@ async def _do_frame_deployment(
)
application_status.start_monitor()

await process_dependencies(
await wait_for_dependencies(
deployment_data.data.iteration_context,
frame_data,
frame_data.depends_on if frame_data.depends_on else list(),
frame_status,
)

Expand Down
2 changes: 2 additions & 0 deletions foodx_devops_tools/pipeline_config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
from .clients import ClientsDefinition, load_clients # noqa: F401
from .deployments import DeploymentsDefinition, load_deployments # noqa: F401
from .frames import ( # noqa: F401
ApplicationDefinition,
ApplicationDeploymentSteps,
ApplicationStepDelay,
DependencyDeclarations,
FramesDefinition,
SingularFrameDefinition,
StructuredName,
Expand Down
72 changes: 65 additions & 7 deletions foodx_devops_tools/pipeline_config/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ApplicationStepDelay(pydantic.BaseModel):
delay_seconds: int


class ApplicationDeploymentDefinition(pydantic.BaseModel):
class ApplicationStepDeploymentDefinition(pydantic.BaseModel):
"""Application resource group deployment definition."""

mode: DeploymentMode
Expand All @@ -61,10 +61,18 @@ class ApplicationDeploymentDefinition(pydantic.BaseModel):


ApplicationDeploymentSteps = typing.List[
typing.Union[ApplicationDeploymentDefinition, ApplicationStepDelay]
typing.Union[ApplicationStepDeploymentDefinition, ApplicationStepDelay]
]

ApplicationDeclarations = typing.Dict[str, ApplicationDeploymentSteps]

class ApplicationDefinition(pydantic.BaseModel):
"""A single application definition."""

depends_on: typing.Optional[DependencyDeclarations]
steps: ApplicationDeploymentSteps


ApplicationDeclarations = typing.Dict[str, ApplicationDefinition]


class SingularFrameDefinition(pydantic.BaseModel):
Expand All @@ -76,6 +84,52 @@ class SingularFrameDefinition(pydantic.BaseModel):
depends_on: typing.Optional[DependencyDeclarations]
triggers: typing.Optional[TriggersDefinition]

@pydantic.validator("applications")
def check_applications(
cls: pydantic.BaseModel, applications_candidate: dict
) -> dict:
"""Validate ``applications`` field."""
application_names: list = list(applications_candidate.keys())
if any([not x for x in application_names]):
message = "Empty {0} names prohibited".format(ENTITY_SINGULAR)
# log the message here because pydantic exception handling
# masks the true exception that caused a validation failure.
log.error(message)
raise ValueError(message)
if len(set(application_names)) != len(application_names):
message = "Duplicate {0} names prohibited".format(ENTITY_SINGULAR)
# log the message here because pydantic exception handling
# masks the true exception that caused a validation failure.
log.error(message)
raise ValueError(message)

return applications_candidate

@pydantic.root_validator
def check_dependencies(
cls: pydantic.BaseModel, applications_candidate: dict
) -> dict:
"""
Validate frame dependencies.
Frames must only declare dependencies on other frames.
"""
applications: dict = applications_candidate.get("applications", dict())
frame_names: set = set(applications.keys())
for name, item in applications.items():
if item.depends_on and (
any([x not in frame_names for x in item.depends_on])
):
message = "Unknown application dependency, {0}".format(
str(item.depends_on)
)
# log the message here because pydantic exception handling
# masks the true exception that caused a validation failure.
log.error(message)
raise ValueError(message)

return applications_candidate


FrameDeclarations = typing.Dict[str, SingularFrameDefinition]

Expand Down Expand Up @@ -154,8 +208,10 @@ def arm_file_paths(self: U) -> StructuredPathCollection:
) in frame_data.applications.items():
app_structure = copy.deepcopy(frame_structure)
app_structure.append(application_name)
for this_step in application_data:
if isinstance(this_step, ApplicationDeploymentDefinition):
for this_step in application_data.steps:
if isinstance(
this_step, ApplicationStepDeploymentDefinition
):
step_structure = copy.deepcopy(app_structure)
step_structure.append(this_step.name)

Expand Down Expand Up @@ -193,8 +249,10 @@ def puff_file_paths(self: U) -> StructuredPathCollection:
) in frame_data.applications.items():
app_structure = copy.deepcopy(frame_structure)
app_structure.append(application_name)
for this_step in application_data:
if isinstance(this_step, ApplicationDeploymentDefinition):
for this_step in application_data.steps:
if isinstance(
this_step, ApplicationStepDeploymentDefinition
):
step_structure = copy.deepcopy(app_structure)
step_structure.append(this_step.name)

Expand Down
2 changes: 1 addition & 1 deletion foodx_devops_tools/pipeline_config/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ def check_frames_puff_map(
for subscription_data in state_data.values():
frame_step_names = {
x.name
for x in application_data
for x in application_data.steps
if hasattr(x, "name")
}
if frame_step_names != set(subscription_data.keys()):
Expand Down
Loading

0 comments on commit d12acf4

Please sign in to comment.