Skip to content

Commit

Permalink
add msk probes and actions (#148)
Browse files Browse the repository at this point in the history
* chaosaws msk ready Signed-off-by: Jorge Tapicha <[email protected]>

Signed-off-by: Jorge Tapicha <[email protected]>

* add changelog and activities extends
Signed-off-by: Jorge Tapicha <[email protected]>

Signed-off-by: Jorge Tapicha <[email protected]>

* remove unnecesary empty response test

Signed-off-by: Jorge Tapicha <[email protected]>

* first corrections Pull Request

Signed-off-by: Jorge Tapicha <[email protected]>

* fix f-string without any placeholders

Signed-off-by: Jorge Tapicha <[email protected]>

---------

Signed-off-by: Jorge Tapicha <[email protected]>
  • Loading branch information
jitapichab authored Jun 10, 2024
1 parent d11e8fd commit 51c066f
Show file tree
Hide file tree
Showing 7 changed files with 275 additions and 0 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## [Unreleased][]

### Added

* MSK probes `chaosaws.msk.probes.describe_msk_cluster` `chaosaws.msk.probes.get_bootstrap_servers`
* MSK actions `chaosaws.msk.actions.reboot_msk_broker` `chaosaws.msk.actions.delete_cluster`

[Unreleased]: https://github.com/chaostoolkit-incubator/chaostoolkit-aws/compare/0.33.0...HEAD

## [0.33.0][] - 2024-02-26
Expand Down
2 changes: 2 additions & 0 deletions chaosaws/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ def load_exported_activities() -> List[DiscoveredActivities]:
activities.extend(discover_actions("chaosaws.fis.actions"))
activities.extend(discover_probes("chaosaws.s3.probes"))
activities.extend(discover_actions("chaosaws.s3.actions"))
activities.extend(discover_probes("chaosaws.msk.probes"))
activities.extend(discover_actions("chaosaws.msk.actions"))
activities.extend(
discover_activities("chaosaws.s3.controls.upload", "control")
)
Expand Down
Empty file added chaosaws/msk/__init__.py
Empty file.
49 changes: 49 additions & 0 deletions chaosaws/msk/actions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from typing import List

from chaoslib.types import Configuration, Secrets
from chaoslib.exceptions import FailedActivity

from chaosaws import aws_client, get_logger
from chaosaws.types import AWSResponse

__all__ = ["reboot_msk_broker", "delete_cluster"]

logger = get_logger()


def reboot_msk_broker(
cluster_arn: str,
broker_ids: List[str],
configuration: Configuration = None,
secrets: Secrets = None,
) -> AWSResponse:
"""
Reboot the specified brokers in an MSK cluster.
"""
client = aws_client("kafka", configuration, secrets)
logger.debug(
f"Rebooting MSK brokers: {broker_ids} in cluster {cluster_arn}"
)
try:
return client.reboot_broker(
ClusterArn=cluster_arn,
BrokerIds=broker_ids
)
except client.exceptions.NotFoundException:
raise FailedActivity("The specified cluster was not found" )


def delete_cluster(
cluster_arn: str,
configuration: Configuration = None,
secrets: Secrets = None,
) -> AWSResponse:
"""
Delete the given MSK cluster.
"""
client = aws_client("kafka", configuration, secrets)
logger.debug(f"Deleting MSK cluster: {cluster_arn}")
try:
return client.delete_cluster(ClusterArn=cluster_arn)
except client.exceptions.NotFoundException:
raise FailedActivity("The specified cluster was not found")
44 changes: 44 additions & 0 deletions chaosaws/msk/probes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import List

from chaoslib.types import Configuration, Secrets
from chaoslib.exceptions import FailedActivity

from chaosaws import aws_client, get_logger
from chaosaws.types import AWSResponse

__all__ = ["describe_msk_cluster", "get_bootstrap_servers"]

logger = get_logger()


def describe_msk_cluster(
cluster_arn: str,
configuration: Configuration = None,
secrets: Secrets = None,
) -> AWSResponse:
"""
Describe an MSK cluster.
"""
client = aws_client("kafka", configuration, secrets)
logger.debug(f"Describing MSK cluster: {cluster_arn}")
try:
return client.describe_cluster(ClusterArn=cluster_arn)
except client.exceptions.NotFoundException:
raise FailedActivity("The specified cluster was not found")


def get_bootstrap_servers(
cluster_arn: str,
configuration: Configuration = None,
secrets: Secrets = None,
) -> List[str]:
"""
Get the bootstrap servers for an MSK cluster.
"""
client = aws_client("kafka", configuration, secrets)
logger.debug(f"Getting bootstrap servers for MSK cluster: {cluster_arn}")
try:
response = client.get_bootstrap_brokers(ClusterArn=cluster_arn)
return response["BootstrapBrokerString"].split(",") if response else []
except client.exceptions.NotFoundException:
raise FailedActivity("The specified cluster was not found")
91 changes: 91 additions & 0 deletions tests/msk/test_msk_actions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from unittest.mock import MagicMock, patch
import pytest
from chaosaws.msk.actions import reboot_msk_broker, delete_cluster
from chaoslib.exceptions import FailedActivity


class NotFoundException(Exception):
def __init__(self, message="Cluster not found"):
super().__init__(f"{message}")


@patch("chaosaws.msk.actions.aws_client", autospec=True)
def test_reboot_msk_broker_success(aws_client):
client = MagicMock()
aws_client.return_value = client
cluster_arn = "arn_msk_cluster"
broker_ids = ["1"]
client.reboot_broker.return_value = {
"ResponseMetadata": {
"HTTPStatusCode": 200
}
}

response = reboot_msk_broker(cluster_arn=cluster_arn,
broker_ids=broker_ids)

client.reboot_broker.assert_called_with(ClusterArn=cluster_arn,
BrokerIds=broker_ids)
assert response["ResponseMetadata"]["HTTPStatusCode"] == 200


@patch("chaosaws.msk.actions.aws_client", autospec=True)
def test_reboot_msk_broker_not_found(aws_client):
client = MagicMock()
aws_client.return_value = client
cluster_arn = "arn_msk_cluster"
broker_ids = ["1"]

client.exceptions = MagicMock()
client.exceptions.NotFoundException = NotFoundException
client.reboot_broker.side_effect = NotFoundException(
"Cluster not found"
)

expected_error_message = "The specified cluster was not found"

with pytest.raises(FailedActivity) as exc_info:
reboot_msk_broker(cluster_arn=cluster_arn, broker_ids=broker_ids)

assert expected_error_message in str(
exc_info.value
)


@patch("chaosaws.msk.actions.aws_client", autospec=True)
def test_delete_cluster_success(aws_client):
client = MagicMock()
aws_client.return_value = client
cluster_arn = "arn_msk_cluster"
client.delete_cluster.return_value = {
"ResponseMetadata": {
"HTTPStatusCode": 200
}
}

response = delete_cluster(cluster_arn=cluster_arn)

client.delete_cluster.assert_called_with(ClusterArn=cluster_arn)
assert response["ResponseMetadata"]["HTTPStatusCode"] == 200


@patch("chaosaws.msk.actions.aws_client", autospec=True)
def test_delete_cluster_not_found(aws_client):
client = MagicMock()
aws_client.return_value = client
cluster_arn = "arn_msk_cluster"

client.exceptions = MagicMock()
client.exceptions.NotFoundException = NotFoundException
client.delete_cluster.side_effect = NotFoundException(
"Cluster not found"
)

expected_error_message = "The specified cluster was not found"

with pytest.raises(FailedActivity) as exc_info:
delete_cluster(cluster_arn=cluster_arn)

assert expected_error_message in str(
exc_info.value
)
84 changes: 84 additions & 0 deletions tests/msk/test_msk_probes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from unittest.mock import MagicMock, patch
import pytest
from chaosaws.msk.probes import describe_msk_cluster, get_bootstrap_servers
from chaoslib.exceptions import FailedActivity


class NotFoundException(Exception):
def __init__(self, message="Cluster not found"):
super().__init__(f"{message}")


@patch("chaosaws.msk.probes.aws_client", autospec=True)
def test_describe_msk_cluster_success(aws_client):
client = MagicMock()
aws_client.return_value = client
cluster_arn = "arn_msk_cluster"
expected_response = {"ClusterInfo": {"ClusterArn": cluster_arn}}

client.describe_cluster.return_value = expected_response

response = describe_msk_cluster(cluster_arn=cluster_arn)

client.describe_cluster.assert_called_with(ClusterArn=cluster_arn)
assert response == expected_response


@patch("chaosaws.msk.probes.aws_client", autospec=True)
def test_describe_msk_cluster_not_found(aws_client):
client = MagicMock()
aws_client.return_value = client
cluster_arn = "arn_msk_cluster"

client.exceptions = MagicMock()
client.exceptions.NotFoundException = NotFoundException
client.describe_cluster.side_effect = NotFoundException(
"Cluster not found"
)

expected_error_message = "The specified cluster was not found"

with pytest.raises(FailedActivity) as exc_info:
describe_msk_cluster(cluster_arn=cluster_arn)

assert expected_error_message in str(
exc_info.value
)


@patch("chaosaws.msk.probes.aws_client", autospec=True)
def test_get_bootstrap_servers_success(aws_client):
client = MagicMock()
aws_client.return_value = client
cluster_arn = "arn_msk_cluster"
bootstrap_servers = "broker1,broker2,broker3"
expected_response = {"BootstrapBrokerString": bootstrap_servers}

client.get_bootstrap_brokers.return_value = expected_response

response = get_bootstrap_servers(cluster_arn=cluster_arn)

client.get_bootstrap_brokers.assert_called_with(ClusterArn=cluster_arn)
assert response == bootstrap_servers.split(",")


@patch("chaosaws.msk.probes.aws_client", autospec=True)
def test_get_bootstrap_server_cluster_not_found(aws_client):
client = MagicMock()
aws_client.return_value = client
cluster_arn = "arn_msk_cluster"

client.exceptions = MagicMock()
client.exceptions.NotFoundException = NotFoundException
client.get_bootstrap_brokers.side_effect = NotFoundException(
"Cluster not found"
)

expected_error_message = "The specified cluster was not found"

with pytest.raises(FailedActivity) as exc_info:
get_bootstrap_servers(cluster_arn=cluster_arn)

assert expected_error_message in str(
exc_info.value
)

0 comments on commit 51c066f

Please sign in to comment.