diff --git a/CHANGELOG.md b/CHANGELOG.md index 049e626..f459bdb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,11 @@ ## [Unreleased][] +### Added + +* MSK probes `chaosaws.msk.probes.describe_msk_cluster` `chaosaws.msk.probes.get_bootstrap_servers` +* MSK actions `chaosaws.msk.actions.reboot_msk_broker` `chaosaws.msk.actions.delete_cluster` + [Unreleased]: https://github.com/chaostoolkit-incubator/chaostoolkit-aws/compare/0.33.0...HEAD ## [0.33.0][] - 2024-02-26 diff --git a/chaosaws/__init__.py b/chaosaws/__init__.py index c57e82d..3e13d6e 100644 --- a/chaosaws/__init__.py +++ b/chaosaws/__init__.py @@ -307,6 +307,8 @@ def load_exported_activities() -> List[DiscoveredActivities]: activities.extend(discover_actions("chaosaws.fis.actions")) activities.extend(discover_probes("chaosaws.s3.probes")) activities.extend(discover_actions("chaosaws.s3.actions")) + activities.extend(discover_probes("chaosaws.msk.probes")) + activities.extend(discover_actions("chaosaws.msk.actions")) activities.extend( discover_activities("chaosaws.s3.controls.upload", "control") ) diff --git a/chaosaws/msk/__init__.py b/chaosaws/msk/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/chaosaws/msk/actions.py b/chaosaws/msk/actions.py new file mode 100644 index 0000000..a364781 --- /dev/null +++ b/chaosaws/msk/actions.py @@ -0,0 +1,49 @@ +from typing import List + +from chaoslib.types import Configuration, Secrets +from chaoslib.exceptions import FailedActivity + +from chaosaws import aws_client, get_logger +from chaosaws.types import AWSResponse + +__all__ = ["reboot_msk_broker", "delete_cluster"] + +logger = get_logger() + + +def reboot_msk_broker( + cluster_arn: str, + broker_ids: List[str], + configuration: Configuration = None, + secrets: Secrets = None, +) -> AWSResponse: + """ + Reboot the specified brokers in an MSK cluster. + """ + client = aws_client("kafka", configuration, secrets) + logger.debug( + f"Rebooting MSK brokers: {broker_ids} in cluster {cluster_arn}" + ) + try: + return client.reboot_broker( + ClusterArn=cluster_arn, + BrokerIds=broker_ids + ) + except client.exceptions.NotFoundException: + raise FailedActivity("The specified cluster was not found" ) + + +def delete_cluster( + cluster_arn: str, + configuration: Configuration = None, + secrets: Secrets = None, +) -> AWSResponse: + """ + Delete the given MSK cluster. + """ + client = aws_client("kafka", configuration, secrets) + logger.debug(f"Deleting MSK cluster: {cluster_arn}") + try: + return client.delete_cluster(ClusterArn=cluster_arn) + except client.exceptions.NotFoundException: + raise FailedActivity("The specified cluster was not found") \ No newline at end of file diff --git a/chaosaws/msk/probes.py b/chaosaws/msk/probes.py new file mode 100644 index 0000000..d1b3d7d --- /dev/null +++ b/chaosaws/msk/probes.py @@ -0,0 +1,44 @@ +from typing import List + +from chaoslib.types import Configuration, Secrets +from chaoslib.exceptions import FailedActivity + +from chaosaws import aws_client, get_logger +from chaosaws.types import AWSResponse + +__all__ = ["describe_msk_cluster", "get_bootstrap_servers"] + +logger = get_logger() + + +def describe_msk_cluster( + cluster_arn: str, + configuration: Configuration = None, + secrets: Secrets = None, +) -> AWSResponse: + """ + Describe an MSK cluster. + """ + client = aws_client("kafka", configuration, secrets) + logger.debug(f"Describing MSK cluster: {cluster_arn}") + try: + return client.describe_cluster(ClusterArn=cluster_arn) + except client.exceptions.NotFoundException: + raise FailedActivity("The specified cluster was not found") + + +def get_bootstrap_servers( + cluster_arn: str, + configuration: Configuration = None, + secrets: Secrets = None, +) -> List[str]: + """ + Get the bootstrap servers for an MSK cluster. + """ + client = aws_client("kafka", configuration, secrets) + logger.debug(f"Getting bootstrap servers for MSK cluster: {cluster_arn}") + try: + response = client.get_bootstrap_brokers(ClusterArn=cluster_arn) + return response["BootstrapBrokerString"].split(",") if response else [] + except client.exceptions.NotFoundException: + raise FailedActivity("The specified cluster was not found") diff --git a/tests/msk/test_msk_actions.py b/tests/msk/test_msk_actions.py new file mode 100644 index 0000000..ffdf1fa --- /dev/null +++ b/tests/msk/test_msk_actions.py @@ -0,0 +1,91 @@ +from unittest.mock import MagicMock, patch +import pytest +from chaosaws.msk.actions import reboot_msk_broker, delete_cluster +from chaoslib.exceptions import FailedActivity + + +class NotFoundException(Exception): + def __init__(self, message="Cluster not found"): + super().__init__(f"{message}") + + +@patch("chaosaws.msk.actions.aws_client", autospec=True) +def test_reboot_msk_broker_success(aws_client): + client = MagicMock() + aws_client.return_value = client + cluster_arn = "arn_msk_cluster" + broker_ids = ["1"] + client.reboot_broker.return_value = { + "ResponseMetadata": { + "HTTPStatusCode": 200 + } + } + + response = reboot_msk_broker(cluster_arn=cluster_arn, + broker_ids=broker_ids) + + client.reboot_broker.assert_called_with(ClusterArn=cluster_arn, + BrokerIds=broker_ids) + assert response["ResponseMetadata"]["HTTPStatusCode"] == 200 + + +@patch("chaosaws.msk.actions.aws_client", autospec=True) +def test_reboot_msk_broker_not_found(aws_client): + client = MagicMock() + aws_client.return_value = client + cluster_arn = "arn_msk_cluster" + broker_ids = ["1"] + + client.exceptions = MagicMock() + client.exceptions.NotFoundException = NotFoundException + client.reboot_broker.side_effect = NotFoundException( + "Cluster not found" + ) + + expected_error_message = "The specified cluster was not found" + + with pytest.raises(FailedActivity) as exc_info: + reboot_msk_broker(cluster_arn=cluster_arn, broker_ids=broker_ids) + + assert expected_error_message in str( + exc_info.value + ) + + +@patch("chaosaws.msk.actions.aws_client", autospec=True) +def test_delete_cluster_success(aws_client): + client = MagicMock() + aws_client.return_value = client + cluster_arn = "arn_msk_cluster" + client.delete_cluster.return_value = { + "ResponseMetadata": { + "HTTPStatusCode": 200 + } + } + + response = delete_cluster(cluster_arn=cluster_arn) + + client.delete_cluster.assert_called_with(ClusterArn=cluster_arn) + assert response["ResponseMetadata"]["HTTPStatusCode"] == 200 + + +@patch("chaosaws.msk.actions.aws_client", autospec=True) +def test_delete_cluster_not_found(aws_client): + client = MagicMock() + aws_client.return_value = client + cluster_arn = "arn_msk_cluster" + + client.exceptions = MagicMock() + client.exceptions.NotFoundException = NotFoundException + client.delete_cluster.side_effect = NotFoundException( + "Cluster not found" + ) + + expected_error_message = "The specified cluster was not found" + + with pytest.raises(FailedActivity) as exc_info: + delete_cluster(cluster_arn=cluster_arn) + + assert expected_error_message in str( + exc_info.value + ) \ No newline at end of file diff --git a/tests/msk/test_msk_probes.py b/tests/msk/test_msk_probes.py new file mode 100644 index 0000000..8ed3e4c --- /dev/null +++ b/tests/msk/test_msk_probes.py @@ -0,0 +1,84 @@ +from unittest.mock import MagicMock, patch +import pytest +from chaosaws.msk.probes import describe_msk_cluster, get_bootstrap_servers +from chaoslib.exceptions import FailedActivity + + +class NotFoundException(Exception): + def __init__(self, message="Cluster not found"): + super().__init__(f"{message}") + + +@patch("chaosaws.msk.probes.aws_client", autospec=True) +def test_describe_msk_cluster_success(aws_client): + client = MagicMock() + aws_client.return_value = client + cluster_arn = "arn_msk_cluster" + expected_response = {"ClusterInfo": {"ClusterArn": cluster_arn}} + + client.describe_cluster.return_value = expected_response + + response = describe_msk_cluster(cluster_arn=cluster_arn) + + client.describe_cluster.assert_called_with(ClusterArn=cluster_arn) + assert response == expected_response + + +@patch("chaosaws.msk.probes.aws_client", autospec=True) +def test_describe_msk_cluster_not_found(aws_client): + client = MagicMock() + aws_client.return_value = client + cluster_arn = "arn_msk_cluster" + + client.exceptions = MagicMock() + client.exceptions.NotFoundException = NotFoundException + client.describe_cluster.side_effect = NotFoundException( + "Cluster not found" + ) + + expected_error_message = "The specified cluster was not found" + + with pytest.raises(FailedActivity) as exc_info: + describe_msk_cluster(cluster_arn=cluster_arn) + + assert expected_error_message in str( + exc_info.value + ) + + +@patch("chaosaws.msk.probes.aws_client", autospec=True) +def test_get_bootstrap_servers_success(aws_client): + client = MagicMock() + aws_client.return_value = client + cluster_arn = "arn_msk_cluster" + bootstrap_servers = "broker1,broker2,broker3" + expected_response = {"BootstrapBrokerString": bootstrap_servers} + + client.get_bootstrap_brokers.return_value = expected_response + + response = get_bootstrap_servers(cluster_arn=cluster_arn) + + client.get_bootstrap_brokers.assert_called_with(ClusterArn=cluster_arn) + assert response == bootstrap_servers.split(",") + + +@patch("chaosaws.msk.probes.aws_client", autospec=True) +def test_get_bootstrap_server_cluster_not_found(aws_client): + client = MagicMock() + aws_client.return_value = client + cluster_arn = "arn_msk_cluster" + + client.exceptions = MagicMock() + client.exceptions.NotFoundException = NotFoundException + client.get_bootstrap_brokers.side_effect = NotFoundException( + "Cluster not found" + ) + + expected_error_message = "The specified cluster was not found" + + with pytest.raises(FailedActivity) as exc_info: + get_bootstrap_servers(cluster_arn=cluster_arn) + + assert expected_error_message in str( + exc_info.value + )