diff --git a/.github/workflows/release-edge.yaml b/.github/workflows/release-edge.yaml index 397f489..fd21b98 100644 --- a/.github/workflows/release-edge.yaml +++ b/.github/workflows/release-edge.yaml @@ -48,4 +48,4 @@ jobs: with: credentials: "${{ secrets.CHARMHUB_TOKEN }}" github-token: "${{ secrets.GITHUB_TOKEN }}" - channel: 3.9/edge + channel: 3.12/edge diff --git a/.gitignore b/.gitignore index 6af8cd9..d76b442 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ venv/ build/ *.charm +*.swp .tox venv .coverage diff --git a/actions.yaml b/actions.yaml index a159207..df79619 100644 --- a/actions.yaml +++ b/actions.yaml @@ -19,3 +19,94 @@ get-service-account: - username - vhost additionalProperties: False + +ensure-queue-ha: + description: | + Check for queues that have insufficent memebers for high + availability and, if possible, add members to them. + params: + dry-run: + type: boolean + default: false + description: | + Report on what memberships would change if the command was + run and warn about any queues that cannot be fixed. + +rebalance-quorum: + description: | + Rebalance queue leaders to that they are evenly balanced + across the cluster. + +add-member: + description: Add member to queue + params: + queue-name: + type: string + description: | + Name of queue to add the member to. + unit-name: + type: string + description: | + Juju unit name of the node to be added to the queue + vhost: + type: string + description: | + vhost that the queue resides in. + required: [queue-name, unit-name] + +delete-member: + description: Remove member from queue + params: + queue-name: + type: string + description: | + Name of queue to remove the member from. + unit-name: + type: string + description: | + Juju unit name of the node to be removed from the queue + vhost: + type: string + description: | + vhost that the queue resides in. + required: [queue-name, unit-name] + +grow: + description: | + Adds a new replica on the given node for all or a half + of matching quorum queues. + params: + selector: + type: string + description: | + Valid values are "all" or even" + unit-name: + type: string + description: | + Juju unit name of the node to have queues added + vhost-pattern: + type: string + description: | + Match vhosts to be added to the node + queue-pattern: + type: string + description: | + Match queues to be added to the node + required: [unit-name, selector] + +shrink: + description: | + Shrinks quorum queue clusters by removing any members (replicas) + on the given node. + params: + unit-name: + type: string + description: | + Juju unit name of the node to have queues added + error-only: + type: boolean + default: false + description: | + Only list queues which reported an error + required: [unit-name] + diff --git a/config.yaml b/config.yaml index 396d769..108a5a6 100644 --- a/config.yaml +++ b/config.yaml @@ -2,4 +2,7 @@ # See LICENSE file for licensing details. options: - {} + minimum-replicas: + default: 3 + description: Minimum number of queues replicas, set to 0 to disable charm automatically managing queue replicas + type: int diff --git a/metadata.yaml b/metadata.yaml index be81722..30ee772 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -17,7 +17,7 @@ resources: rabbitmq-image: type: oci-image description: OCI image for rabbitmq - upstream-source: ghcr.io/openstack-snaps/rabbitmq:3.9.13 + upstream-source: ghcr.io/openstack-snaps/rabbitmq:3.12.1 storage: rabbitmq-data: diff --git a/src/charm.py b/src/charm.py index bebf9e2..101bf00 100755 --- a/src/charm.py +++ b/src/charm.py @@ -23,11 +23,11 @@ IPv6Address, ) from typing import ( + List, Union, ) import pwgen -import rabbitmq_admin import requests import tenacity from charms.observability_libs.v1.kubernetes_service_patch import ( @@ -61,10 +61,12 @@ WaitingStatus, ) from ops.pebble import ( + ExecError, PathError, ) import interface_rabbitmq_peers +import rabbit_extended_api logger = logging.getLogger(__name__) @@ -74,6 +76,11 @@ RABBITMQ_GROUP = "rabbitmq" RABBITMQ_COOKIE_PATH = "/var/lib/rabbitmq/.erlang.cookie" +SELECTOR_ALL = "all" +SELECTOR_NONE = "none" +SELECTOR_EVEN = "even" +SELECTOR_INDIVIDUAL = "individual" + EPMD_SERVICE = "epmd" @@ -93,6 +100,12 @@ def __init__(self, *args): self.framework.observe( self.on.get_operator_info_action, self._on_get_operator_info_action ) + self.framework.observe( + self.on.add_member_action, self._on_add_member_action + ) + self.framework.observe( + self.on.delete_member_action, self._on_delete_member_action + ) self.framework.observe(self.on.update_status, self._on_update_status) # Peers self.peers = interface_rabbitmq_peers.RabbitMQOperatorPeers( @@ -102,6 +115,14 @@ def __init__(self, *args): self.peers.on.connected, self._on_peer_relation_connected, ) + self.framework.observe( + self.peers.on.ready, + self._on_peer_relation_ready, + ) + self.framework.observe( + self.peers.on.leaving, + self._on_peer_relation_leaving, + ) # AMQP Provides self.amqp_provider = RabbitMQProvides( self, "amqp", self.create_amqp_credentials @@ -242,6 +263,31 @@ def _rabbitmq_layer(self) -> dict: }, } + def _on_peer_relation_leaving( # noqa: C901 + self, event: EventBase + ) -> None: + if self.unit.is_leader(): + leaving_node = self.generate_nodename(event.nodename) + container = self.unit.get_container(RABBITMQ_CONTAINER) + logging.info(f"Removing {leaving_node} from queues") + try: + # forget_cluster_node not currently supported by HTTP API + process = container.exec( + ["rabbitmqctl", "forget_cluster_node", leaving_node], + timeout=5 * 60, + ) + output, _ = process.wait_output() + logging.info(output) + except ExecError as e: + if "The node selected is not in the cluster" in e.stderr: + logging.warning( + f"Removal of {leaving_node} failed, node not found" + ) + else: + logging.error(f"Removal of {leaving_node} failed") + logging.error(e.stdout) + logging.error(e.stderr) + # TODO: refactor this method to reduce complexity. def _on_peer_relation_connected( # noqa: C901 self, event: EventBase @@ -296,6 +342,116 @@ def _on_peer_relation_connected( # noqa: C901 self._on_update_status(event) + def get_queue_growth_selector(self, min_q_len: int, max_q_len: int): + """Select a queue growth strategy. + + Select a queue growth strategy from: + ALL: All queues add a new replica + NONE: No queues have additional replica added + EVEN: Queues with an even number of replicas have additional replica added + INDIVIDUAL: Each queue is expanded individually + + NOTE: INDIVIDUAL is expensive as an api call needs to be made + for each queue. + """ + if min_q_len == max_q_len: + if max_q_len < self.min_replicas(): + # 1 -> 2 + # 2 -> 3 + selector = SELECTOR_ALL + else: + # All queues have enough members but queues should + # not have an even number of replicas + selector = SELECTOR_EVEN + elif min_q_len > 1: + # 2->3 + # 3->3 + # 4->5 (no even queues) + selector = SELECTOR_EVEN + elif min_q_len == 1: + if max_q_len < self.min_replicas(): + # 1 -> 2 + # 2 -> 3 + selector = SELECTOR_ALL + else: + # Cannot use "even" as the queues with 1 node need expanding, + # cannot use "all" as there are queues with 3+ members + selector = SELECTOR_INDIVIDUAL + return selector + + def unit_in_cluster(self, unit: str) -> bool: + """Is unit in cluster according to rabbit api.""" + api = self._get_admin_api() + joining_node = self.generate_nodename(unit) + clustered_nodes = [n["name"] for n in api.list_nodes()] + logging.debug(f"Found cluster nodes {clustered_nodes}") + return joining_node in clustered_nodes + + def grow_queues_onto_unit(self, unit) -> None: + """Grow any undersized queues onto unit.""" + api = self._get_admin_api() + joining_node = self.generate_nodename(unit) + queue_members = [len(q["members"]) for q in api.list_queues()] + if not queue_members: + logging.debug("No queues found, queue growth skipped") + queue_members.sort() + selector = self.get_queue_growth_selector( + queue_members[0], queue_members[-1] + ) + logging.debug(f"selector: {selector}") + if selector in [SELECTOR_ALL, SELECTOR_EVEN]: + api.grow_queue(joining_node, selector) + elif selector == SELECTOR_INDIVIDUAL: + undersized_queues = self.get_undersized_queues() + for q in undersized_queues: + if joining_node not in q["members"]: + api.add_member(joining_node, q["vhost"], q["name"]) + elif selector == SELECTOR_NONE: + logging.debug("No queues need new replicas") + else: + logging.error(f"Unknown selectore type {selector}") + + def _on_peer_relation_ready(self, event: EventBase) -> None: + """Event handler on peers relation ready.""" + if not self._rabbitmq_running(): + event.defer() + return + + if not self.peers.operator_user_created: + event.defer() + return + + if not self.unit_in_cluster(event.nodename): + logging.debug(f"{event.nodename} is not in cluster yet.") + event.defer() + return + + if self.unit.is_leader(): + self.grow_queues_onto_unit(event.nodename) + api = self._get_admin_api() + api.rebalance_queues() + self._on_update_status(event) + + def _on_add_member_action(self, event) -> None: + """Handle add_member charm action.""" + api = self._get_admin_api() + api.add_member( + self.generate_nodename(event.params["unit-name"]), + event.params.get("vhost"), + event.params["queue-name"], + ) + self._on_update_status(event) + + def _on_delete_member_action(self, event) -> None: + """Handle delete_member charm action.""" + api = self._get_admin_api() + api.delete_member( + self.generate_nodename(event.params["unit-name"]), + event.params.get("vhost"), + event.params["queue-name"], + ) + self._on_update_status(event) + def _on_ready_amqp_clients(self, event) -> None: """Event handler on AMQP clients ready.""" self._on_update_status(event) @@ -490,7 +646,7 @@ def rabbit_running(self) -> bool: def _get_admin_api( self, username: str = None, password: str = None - ) -> rabbitmq_admin.AdminAPI: + ) -> rabbit_extended_api.ExtendedAdminApi: """Return an administrative API for RabbitMQ. :username: Username to access RMQ API @@ -501,7 +657,7 @@ def _get_admin_api( """ username = username or self._operator_user password = password or self._operator_password - return rabbitmq_admin.AdminAPI( + return rabbit_extended_api.ExtendedAdminApi( url=self._rabbitmq_mgmt_url, auth=(username, password) ) @@ -594,10 +750,16 @@ def _render_and_push_rabbitmq_conf(self) -> None: "/etc/rabbitmq/rabbitmq.conf", rabbitmq_conf, make_dirs=True ) + def generate_nodename(self, unit_name) -> str: + """K8S DNS nodename for local unit.""" + return ( + f"rabbit@{unit_name.replace('/', '-')}.{self.app.name}-endpoints" + ) + @property def nodename(self) -> str: """K8S DNS nodename for local unit.""" - return f"{self.unit.name.replace('/', '-')}.{self.app.name}-endpoints" + return self.generate_nodename(self.unit.name) def _render_and_push_rabbitmq_env(self) -> None: """Render and push rabbitmq-env conf. @@ -607,7 +769,7 @@ def _render_and_push_rabbitmq_env(self) -> None: container = self.unit.get_container(RABBITMQ_CONTAINER) rabbitmq_env = f""" # Sane configuration defaults for running under K8S -NODENAME=rabbit@{self.nodename} +NODENAME={self.nodename} USE_LONGNAME=true """ logger.info("Pushing new rabbitmq-env.conf") @@ -625,6 +787,24 @@ def _on_get_operator_info_action(self, event) -> None: } event.set_results(data) + def _manage_queues(self) -> bool: + """Whether the charm should manage queue membership.""" + return bool(self.config.get("minimum-replicas")) + + def min_replicas(self) -> int: + """The minimum number of replicas a queue should have.""" + return self.config.get("minimum-replicas") + + def get_undersized_queues(self) -> List[str]: + """Return a list of queues which have fewer members than minimum.""" + api = self._get_admin_api() + undersized_queues = [ + q + for q in api.list_queues() + if len(q["members"]) < self.min_replicas() + ] + return undersized_queues + def _on_update_status(self, event) -> None: """Update status. @@ -648,6 +828,15 @@ def _on_update_status(self, event) -> None: if self._stored.rabbitmq_version: self.unit.set_workload_version(self._stored.rabbitmq_version) + + if self.unit.is_leader() and self._manage_queues(): + undersized_queues = self.get_undersized_queues() + if undersized_queues: + self.unit.status = ActiveStatus( + f"WARNING: {len(undersized_queues)} Queue(s) with insufficient members" + ) + return + self.unit.status = ActiveStatus() def create_amqp_credentials( diff --git a/src/interface_rabbitmq_peers.py b/src/interface_rabbitmq_peers.py index 69716f5..a4eef25 100644 --- a/src/interface_rabbitmq_peers.py +++ b/src/interface_rabbitmq_peers.py @@ -38,7 +38,24 @@ class PeersConnectedEvent(EventBase): """ -class ReadyPeersEvent(EventBase): +class PeersNodeEvent(EventBase): + """Peer Event which stores the nodename triggering the event.""" + + def __init__(self, handle, nodename): + super().__init__(handle) + self.nodename = nodename + + def snapshot(self): + """Store event data into snapshot.""" + return {"nodename": self.nodename} + + def restore(self, snapshot): + """Restore data from snapshot into event.""" + super().restore(snapshot) + self.nodename = snapshot["nodename"] + + +class ReadyPeersEvent(PeersNodeEvent): """Event triggered when peer relation is ready for use. This event is triggered when the peer relation has been configured @@ -48,6 +65,14 @@ class ReadyPeersEvent(EventBase): """ +class PeersLeavingEvent(PeersNodeEvent): + """Event triggered when the peer unit leaves the relation. + + This event is triggered when a peer unit leaves the relation. + This is almost certainly a scale-back event. + """ + + class PeersBrokenEvent(EventBase): """Event triggered when the peer relation is destroyed. @@ -64,6 +89,7 @@ class RabbitMQOperatorPeersEvents(ObjectEvents): connected = EventSource(PeersConnectedEvent) ready = EventSource(ReadyPeersEvent) goneaway = EventSource(PeersBrokenEvent) + leaving = EventSource(PeersLeavingEvent) class RabbitMQOperatorPeers(Object): @@ -74,6 +100,7 @@ class RabbitMQOperatorPeers(Object): OPERATOR_PASSWORD = "operator_password" OPERATOR_USER_CREATED = "operator_user_created" ERLANG_COOKIE = "erlang_cookie" + NODENAME = "nodename" def __init__(self, charm, relation_name): super().__init__(charm, relation_name) @@ -84,6 +111,9 @@ def __init__(self, charm, relation_name): self.framework.observe( charm.on[relation_name].relation_changed, self.on_changed ) + self.framework.observe( + charm.on[relation_name].relation_departed, self.on_departed + ) self.framework.observe( charm.on[relation_name].relation_broken, self.on_broken ) @@ -103,11 +133,17 @@ def on_broken(self, event): logging.debug("RabbitMQOperatorPeers on_broken") self.on.gonewaway.emit() + def on_departed(self, event): + """Relation broken event handler.""" + logging.debug("RabbitMQOperatorPeers on_departed") + self.on.leaving.emit(event.departing_unit.name) + def on_changed(self, event): """Relation changed event handler.""" logging.debug("RabbitMQOperatorPeers on_changed") if self.operator_password and self.erlang_cookie: - self.on.ready.emit() + if event.unit: + self.on.ready.emit(event.unit.name) def set_operator_password(self, password: str): """Set admin operator password in relation data bag.""" @@ -133,6 +169,11 @@ def store_password(self, username: str, password: str): logging.debug(f"Storing password for {username}") self.peers_rel.data[self.peers_rel.app][username] = password + def set_nodename(self, nodename: str): + """Advertise nodename to peers.""" + logging.debug(f"Setting nodename {nodename}") + self.peers_rel.data[self.model.unit][self.NODENAME] = nodename + def retrieve_password(self, username: str) -> str: """Retrieve persisted password for provided username.""" if not self.peers_rel: diff --git a/src/rabbit_extended_api.py b/src/rabbit_extended_api.py new file mode 100644 index 0000000..8a8fd84 --- /dev/null +++ b/src/rabbit_extended_api.py @@ -0,0 +1,85 @@ +# +# Copyright 2023 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Learn more at: https://juju.is/docs/sdk + +"""Add missing functionality to rabbitmq_admin.AdminAPI.""" + +import json +import urllib + +import rabbitmq_admin + + +class ExtendedAdminApi(rabbitmq_admin.AdminAPI): + """Extend rabbitmq_admin.AdminAPI to cover missing endpoints the charm needs.""" + + def list_queues(self): + """A list of nodes in the RabbitMQ cluster.""" + return self._api_get("/api/queues") + + def get_queue(self, vhost, queue): + """A list of nodes in the RabbitMQ cluster.""" + return self._api_get( + "/api/queues/{}/{}".format( + urllib.parse.quote_plus(vhost), urllib.parse.quote_plus(queue) + ) + ) + + def rebalance_queues(self): + """Rebalance the queues leaders.""" + return self._api_post("/api/rebalance/queues") + + def grow_queue( + self, node, selector, vhost_pattern=None, queue_pattern=None + ): + """Add a member to queues. + + Which queues have the member added is decided by the selector, + vhost_pattern and queue_pattern + """ + if not vhost_pattern: + vhost_pattern = ".*" + if not queue_pattern: + queue_pattern = ".*" + data = { + "strategy": selector, + "queue_pattern": queue_pattern, + "vhost_pattern": vhost_pattern, + } + self._api_post( + "/api/queues/quorum/replicas/on/{}/grow".format(node), data=data + ) + + def add_member(self, node, vhost, queue): + """Add a member to a queue.""" + data = {"node": node} + self._api_post( + "/api/queues/quorum/{}/{}/replicas/add".format( + urllib.parse.quote_plus(vhost), urllib.parse.quote_plus(queue) + ), + data=data, + ) + + def delete_member(self, node, vhost, queue): + """Remove a member to a queue.""" + # rabbitmq_admin does not seem to handle json encoding for DELETE requests + data = json.dumps({"node": node}) + self._api_delete( + "/api/queues/quorum/{}/{}/replicas/delete".format( + urllib.parse.quote_plus(vhost), urllib.parse.quote_plus(queue) + ), + data=data, + ) diff --git a/tests/bundles/smoke.yaml b/tests/bundles/smoke.yaml index 3c245bd..6c27d92 100644 --- a/tests/bundles/smoke.yaml +++ b/tests/bundles/smoke.yaml @@ -6,4 +6,4 @@ applications: scale: 1 trust: true resources: - rabbitmq-image: ghcr.io/openstack-snaps/rabbitmq:3.9.13 + rabbitmq-image: ghcr.io/openstack-snaps/rabbitmq:3.12.1 diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 64ef556..19b94f7 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -19,6 +19,7 @@ from unittest.mock import ( MagicMock, Mock, + call, patch, ) @@ -41,10 +42,12 @@ def setUp(self, *unused): self.harness.begin() # Setup RabbitMQ API mocking - mock_admin_api = MagicMock() - mock_admin_api.overview.return_value = {"product_version": "3.19.2"} + self.mock_admin_api = MagicMock() + self.mock_admin_api.overview.return_value = { + "product_version": "3.19.2" + } self.harness.charm._get_admin_api = Mock() - self.harness.charm._get_admin_api.return_value = mock_admin_api + self.harness.charm._get_admin_api.return_value = self.mock_admin_api # network_get is not implemented in the testing harness # so mock out for now @@ -158,3 +161,147 @@ def test_update_status(self): self.assertEqual( self.harness.model.unit.status, ops.model.ActiveStatus() ) + + def test_get_queue_growth_selector(self): + """Test the method chosen to grow a queue.""" + # 1->2 + self.assertEqual( + self.harness.charm.get_queue_growth_selector(1, 1), "all" + ) + + # 1->2 + # 2->3 + self.assertEqual( + self.harness.charm.get_queue_growth_selector(1, 2), "all" + ) + + # 1->2 + # 2->3 + # 3->3 + self.assertEqual( + self.harness.charm.get_queue_growth_selector(1, 3), "individual" + ) + + # 2->3 + self.assertEqual( + self.harness.charm.get_queue_growth_selector(2, 2), "all" + ) + + # 2->3 + # 3->3 + self.assertEqual( + self.harness.charm.get_queue_growth_selector(2, 3), "even" + ) + + # 3->3 + self.assertEqual( + self.harness.charm.get_queue_growth_selector(3, 3), "even" + ) + + # 3->3 + # 4->5 + # 5->5 + self.assertEqual( + self.harness.charm.get_queue_growth_selector(3, 5), "even" + ) + + # 4->5 + self.assertEqual( + self.harness.charm.get_queue_growth_selector(4, 4), "even" + ) + + def test_generate_nodename(self): + """Test conversion of unit name to rabbit node name.""" + self.assertEqual( + self.harness.charm.generate_nodename("unit/1"), + "rabbit@unit-1.rabbitmq-k8s-endpoints", + ) + + def test_unit_in_cluster(self): + """Test check whether unit is in rabbit cluster.""" + self.mock_admin_api.list_nodes.return_value = [ + {"name": "rabbit@unit-1.rabbitmq-k8s-endpoints"} + ] + self.assertTrue(self.harness.charm.unit_in_cluster("unit/1")) + self.assertFalse(self.harness.charm.unit_in_cluster("unit/2")) + + def test_grow_queues_onto_unit(self): + """Test growing a queue onto a unit.""" + queue_one_member = { + "name": "queue1", + "vhost": "openstack", + "members": ["node1"], + } + queue_two_member = { + "name": "queue2", + "vhost": "openstack", + "members": ["node1", "node2"], + } + queue_three_member = { + "name": "queue3", + "vhost": "openstack", + "members": ["node1", "node2", "node3"], + } + self.mock_admin_api.list_queues.return_value = [queue_one_member] + self.harness.charm.grow_queues_onto_unit("unit/1") + self.mock_admin_api.grow_queue.assert_called_once_with( + "rabbit@unit-1.rabbitmq-k8s-endpoints", "all" + ) + + self.mock_admin_api.grow_queue.reset_mock() + self.mock_admin_api.list_queues.return_value = [ + queue_two_member, + queue_three_member, + ] + self.harness.charm.grow_queues_onto_unit("unit/1") + self.mock_admin_api.grow_queue.assert_called_once_with( + "rabbit@unit-1.rabbitmq-k8s-endpoints", "even" + ) + + self.mock_admin_api.grow_queue.reset_mock() + self.mock_admin_api.list_queues.return_value = [ + queue_one_member, + queue_two_member, + queue_three_member, + ] + self.harness.charm.grow_queues_onto_unit("unit/1") + self.mock_admin_api.add_member.assert_has_calls( + [ + call( + "rabbit@unit-1.rabbitmq-k8s-endpoints", + "openstack", + "queue1", + ), + call( + "rabbit@unit-1.rabbitmq-k8s-endpoints", + "openstack", + "queue2", + ), + ] + ) + + def test_add_member_action(self): + """Test actions for adding member to queue.""" + action_event = MagicMock() + action_event.params = { + "unit-name": "unit/1", + "vhost": "/", + "queue-name": "test_queue", + } + self.harness.charm._on_add_member_action(action_event) + self.mock_admin_api.add_member.assert_called_once_with( + "rabbit@unit-1.rabbitmq-k8s-endpoints", "/", "test_queue" + ) + + def test_delete_member_action(self): + """Test actions for adding member to queue.""" + action_event = MagicMock() + action_event.params = { + "unit-name": "unit/1", + "vhost": "/", + "queue-name": "test_queue", + } + self.harness.charm._on_delete_member_action(action_event) + self.mock_admin_api.delete_member.assert_called_once_with( + "rabbit@unit-1.rabbitmq-k8s-endpoints", "/", "test_queue" + )