Skip to content

Commit

Permalink
Merge pull request #6 from aws-solutions/feature/v1.0.1
Browse files Browse the repository at this point in the history
Features for v1.0.1
  • Loading branch information
aassadza authored Oct 7, 2021
2 parents 7929652 + b64f08f commit 124079c
Show file tree
Hide file tree
Showing 18 changed files with 293 additions and 93 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.0.1] - 2021-10-01
### Added
- The solution now exports the Amazon SNS Topic ARN as `SNSTopicArn`.

### Changed
- The SNS message format will change based on the protocol used. For Amazon SQS and Email-JSON endpoints, a JSON payload
will be sent. The message sent to subscribed Email endpoints is unchanged.
- The Amazon CloudWatch dashboard deployed by the solution will be replaced with a dashboard containing the stack's
region name.

## [1.0.0] - 2021-09-23
### Added
- All files, initial version
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ After running the command, you can deploy the template:
## Collection of operational metrics
This solution collects anonymous operational metrics to help AWS improve the quality of features of the solution.
For more information, including how to disable this capability, please see the [implementation guide](https://aws.amazon.com/solutions/implementations/maintaining-personalized-experiences-with-ml).
For more information, including how to disable this capability, please see the [implementation guide](https://docs.aws.amazon.com/solutions/latest/maintaining-personalized-experiences-with-ml/collection-of-operational-metrics.html).

***

Expand Down
50 changes: 39 additions & 11 deletions source/aws_lambda/s3_event/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for #
# the specific language governing permissions and limitations under the License. #
# ######################################################################################################################

import json
import os
from typing import List

from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.metrics import MetricUnit
Expand Down Expand Up @@ -43,21 +42,50 @@ def solution_name() -> str:
return os.environ["SOLUTION_NAME"]


def send_configuration_error(errors: List[str]):
def send_configuration_error(configuration: Configuration):
errors = configuration.errors
sns = get_service_client("sns")
dataset_group = configuration.dataset_group

subject = f"{solution_name()} Notifications"

message = "There were errors detected when reading a personalization job configuration file:\n\n"
for error in errors:
logger.error(f"Personalization job configuration error: {error}")
message += f" - {error}\n"
message += "\nPlease correct these errors and upload the configuration again."
def build_default_message():
f"The personalization workflow for {configuration.dataset_group} completed with errors."

def build_json_message():
return json.dumps(
{
"datasetGroup": dataset_group,
"status": "UPDATE FAILED",
"summary": "There were errors detected when reading a personalization job configuration file",
"description": [error for error in errors],
}
)

def build_long_message():
message = "There were errors detected when reading a personalization job configuration file:\n\n"
for error in errors:
logger.error(f"Personalization job configuration error: {error}")
message += f" - {error}\n"
message += "\nPlease correct these errors and upload the configuration again."
return message

logger.error("publishing configuration error to SQS")
sns.publish(
TopicArn=topic_arn(),
Message=message,
Message=json.dumps(
{
"default": build_default_message(),
"sms": build_default_message(),
"email": build_long_message(),
"email-json": build_json_message(),
"sqs": build_json_message(),
}
),
MessageStructure="json",
Subject=subject,
)
logger.error("published configuration error to SQS")


@metrics.log_metrics
Expand Down Expand Up @@ -86,7 +114,7 @@ def lambda_handler(event, context):
configuration = Configuration()
configuration.load(config_text)
if configuration.errors:
send_configuration_error(configuration.errors)
send_configuration_error(configuration)
metrics.add_metric(
"ConfigurationsProcessedFailures", unit=MetricUnit.Count, value=1
)
Expand All @@ -98,7 +126,7 @@ def lambda_handler(event, context):
metrics.add_metric(
"ConfigurationsProcessedFailures", unit=MetricUnit.Count, value=1
)
send_configuration_error(configuration.errors)
send_configuration_error(configuration)
else:
config = configuration.config_dict
config = set_bucket(config, bucket, key)
Expand Down
3 changes: 3 additions & 0 deletions source/aws_lambda/shared/personalize_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,7 @@ class Configuration:
def __init__(self):
self._configuration_errors = []
self.config_dict = {}
self.dataset_group = "UNKNOWN"

def load(self, content: Union[Path, str]):
if isinstance(content, Path):
Expand Down Expand Up @@ -670,6 +671,8 @@ def _validate_dataset_group(self, path="datasetGroup.serviceConfig"):
)
else:
self._validate_resource(DatasetGroup(), dataset_group)
if isinstance(dataset_group, dict):
self.dataset_group = dataset_group.get("name", self.dataset_group)

def _validate_event_tracker(self, path="eventTracker.serviceConfig"):
event_tracker = jmespath.search(path, self.config_dict)
Expand Down
5 changes: 4 additions & 1 deletion source/aws_lambda/shared/scheduler/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,10 @@ def list(self) -> Generator[str, None, None]:
:return: Generator[str] of the schedules (by name)
"""
done = False
scan_kwargs = {"ProjectionExpression": TASK_PK}
scan_kwargs = {
"ProjectionExpression": "#name",
"ExpressionAttributeNames": {"#name": TASK_PK},
}
start_key = None
discovered = set()
while not done:
Expand Down
6 changes: 5 additions & 1 deletion source/aws_lambda/shared/sfn_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,11 @@ def check_status( # NOSONAR - allow higher complexity
expected_value = expected_value.lower()

# some parameters don't require checking:
if self.resource == "datasetImportJob" and expected_key == "jobName":
if self.resource == "datasetImportJob" and expected_key in {
"jobName",
"dataSource",
"roleArn",
}:
continue
if self.resource == "batchInferenceJob" and expected_key in {
"jobName",
Expand Down
33 changes: 31 additions & 2 deletions source/aws_lambda/sns_notification/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,26 @@ def __init__(self, event: Dict, context: LambdaContext):
metrics.add_metric("JobSuccess", unit=MetricUnit.Count, value=1)
self.message = self._build_success_message()

self.default = self._build_default_message()
self.sms = self._build_sms_message()
self.json = self._build_json_message()

def _build_json_message(self):
return json.dumps(
{
"datasetGroup": self.dataset_group,
"status": "UPDATE FAILED" if self.error else "UPDATE COMPLETE",
"summary": self._build_default_message(),
"description": self.message,
}
)

def _build_default_message(self) -> str:
return f"The personalization workflow for {self.dataset_group} completed {'with errors' if self.error else 'successfully'}"

def _build_sms_message(self) -> str:
return self._build_default_message()

def _build_error_message(self) -> str:
"""
Build the error message
Expand Down Expand Up @@ -116,12 +136,21 @@ def lambda_handler(event, context):
:return: None
"""
sns = get_service_client("sns")
message = MessageBuilder(event, context).message
message_builder = MessageBuilder(event, context)
subject = f"{solution_name()} Notifications"

logger.info("publishing message for event", extra={"event": event})
sns.publish(
TopicArn=topic_arn(),
Message=message,
Message=json.dumps(
{
"default": message_builder.default,
"sms": message_builder.sms,
"email": message_builder.message,
"email-json": message_builder.json,
"sqs": message_builder.json,
}
),
MessageStructure="json",
Subject=subject,
)
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,13 @@ class CDKSolution:
"""

def __init__(self, cdk_json_path: Path, qualifier="hnb659fds"):
self.qualifier = qualifier
self.context = SolutionContext(cdk_json_path=cdk_json_path)
self.synthesizer = SolutionStackSubstitions(qualifier=qualifier)
self.synthesizer = SolutionStackSubstitions(qualifier=self.qualifier)

def reset(self) -> None:
"""
Get a new synthesizer for this CDKSolution - useful for testing
:return: None
"""
self.synthesizer = SolutionStackSubstitions(qualifier=self.qualifier)
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,12 @@ def metadata(self) -> dict:
return self._metadata

def _get_metadata(self) -> dict:
parameter_groups = list(
set([parameter.group for parameter in self._parameters])
)
pgs = set()
parameter_groups = [
p.group
for p in self._parameters
if p.group not in pgs and not pgs.add(p.group)
]
metadata = {
"AWS::CloudFormation::Interface": {
"ParameterGroups": [
Expand Down
4 changes: 2 additions & 2 deletions source/cdk_solution_helper_py/requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
aws-cdk.core>=1.120.0
aws-cdk.aws_lambda>=1.120.0
aws-cdk.core>=1.123.0
aws-cdk.aws_lambda>=1.123.0
black
boto3>=1.17.49
requests>=2.24.0
Expand Down
2 changes: 1 addition & 1 deletion source/infrastructure/cdk.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"context": {
"SOLUTION_NAME": "Maintaining Personalized Experiences with Machine Learning",
"SOLUTION_ID": "SO0170",
"SOLUTION_VERSION": "1.0.0",
"SOLUTION_VERSION": "1.0.1",
"@aws-cdk/core:newStyleStackSynthesis": "true",
"@aws-cdk/core:enableStackNameDuplicates": "true",
"aws-cdk:enableDiffNoFail": "true",
Expand Down
2 changes: 1 addition & 1 deletion source/infrastructure/personalize/cloudwatch/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def __init__(
self.dashboard = cw.Dashboard(
self,
"PersonalizeDashboard",
dashboard_name=f"PersonalizeSolution-{Aws.STACK_NAME}",
dashboard_name=f"PersonalizeSolution-{Aws.STACK_NAME}-{Aws.REGION}",
period_override=cw.PeriodOverride.AUTO,
start="-PT1D",
)
Expand Down
44 changes: 25 additions & 19 deletions source/infrastructure/personalize/stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,25 @@ def __init__(
super().__init__(scope, construct_id, *args, **kwargs)

# CloudFormation Parameters
self.email = cdk.CfnParameter(
self,
id="Email",
type="String",
description="Email to notify with personalize workflow results",
default="",
max_length=50,
allowed_pattern=r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$|^$)",
constraint_description="Must be a valid email address or blank",
)
self.solutions_template_options.add_parameter(
self.email, "Email", "Solution Configuration"
)
self.email_provided = CfnCondition(
self,
"EmailProvided",
expression=Fn.condition_not(Fn.condition_equals(self.email, "")),
)

self.personalize_kms_key_arn = cdk.CfnParameter(
self,
id="PersonalizeKmsKeyArn",
Expand All @@ -88,25 +107,6 @@ def __init__(
),
)

self.email = cdk.CfnParameter(
self,
id="Email",
type="String",
description="Email to notify with personalize workflow results",
default="",
max_length=50,
allowed_pattern=r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$|^$)",
constraint_description="Must be a valid email address or blank",
)
self.solutions_template_options.add_parameter(
self.email, "Email", "Solution Configuration"
)
self.email_provided = CfnCondition(
self,
"EmailProvided",
expression=Fn.condition_not(Fn.condition_equals(self.email, "")),
)

# layers
layer_powertools = PowertoolsLayer.get_or_create(self)
layer_solutions = SolutionsLayer.get_or_create(self)
Expand Down Expand Up @@ -413,3 +413,9 @@ def __init__(
value=self.dashboard.name,
export_name=f"{Aws.STACK_NAME}-Dashboard",
)
cdk.CfnOutput(
self,
"SNSTopicArn",
value=notifications.topic.topic_arn,
export_name=f"{Aws.STACK_NAME}-SNSTopicArn",
)
7 changes: 6 additions & 1 deletion source/infrastructure/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# ######################################################################################################################


import json
from pathlib import Path

import setuptools
Expand All @@ -20,10 +21,14 @@
with open(readme_path) as fp:
long_description = fp.read()

cdk_json_path = Path(__file__).resolve().parent / "cdk.json"
cdk_json = json.loads(cdk_json_path.read_text())
VERSION = cdk_json["context"]["SOLUTION_VERSION"]


setuptools.setup(
name="infrastructure",
version="1.0.0",
version=VERSION,
description="AWS CDK stack to deploy the AWS MLOps for Amazon Personalize solution.",
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down
6 changes: 3 additions & 3 deletions source/requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
avro==1.10.2
black
boto3
aws_cdk.core>=1.120.0
aws_cdk.aws_stepfunctions_tasks>=1.120.0
aws_solutions_constructs.aws_lambda_sns>=1.120.0
aws_cdk.core>=1.123.0
aws_cdk.aws_stepfunctions_tasks>=1.123.0
aws_solutions_constructs.aws_lambda_sns>=1.123.0
requests==2.24.0
crhelper==2.0.6
cronex==0.1.3.1
Expand Down
Loading

0 comments on commit 124079c

Please sign in to comment.