From 4573371e092a06c7c65c4e677333f90473a682e0 Mon Sep 17 00:00:00 2001 From: Guillaume Boutry Date: Fri, 26 Jul 2024 16:54:52 +0200 Subject: [PATCH] Implement ensure-ha Implement ensure-ha utility both as an action and as an automated operation. Use action when queue management is disabled, or auto-ha-frequency is too long. Ensure-ha will add members to queue based on how many queues a member already has, to try to balance number of queues in larger clusters. Signed-off-by: Guillaume Boutry --- actions.yaml | 2 +- config.yaml | 4 + src/charm.py | 233 +++++++++++++++++++++++++++++++- src/interface_rabbitmq_peers.py | 2 +- tests/unit/test_charm.py | 112 +++++++++++++++ 5 files changed, 350 insertions(+), 3 deletions(-) diff --git a/actions.yaml b/actions.yaml index df79619..3afb20f 100644 --- a/actions.yaml +++ b/actions.yaml @@ -22,7 +22,7 @@ get-service-account: ensure-queue-ha: description: | - Check for queues that have insufficent memebers for high + Check for queues that have insufficient members for high availability and, if possible, add members to them. 
params: dry-run: diff --git a/config.yaml b/config.yaml index 108a5a6..9e68681 100644 --- a/config.yaml +++ b/config.yaml @@ -6,3 +6,7 @@ options: default: 3 description: Minimum number of queues replicas, set to 0 to disable charm automatically managing queue replicas type: int + auto-ha-frequency: + default: 30 + description: Frequency in minutes to check for queues that need HA members added + type: int diff --git a/src/charm.py b/src/charm.py index 5376d9b..65adb8f 100755 --- a/src/charm.py +++ b/src/charm.py @@ -17,12 +17,15 @@ """RabbitMQ Operator Charm.""" +import collections import logging +import textwrap from ipaddress import ( IPv4Address, IPv6Address, ) from typing import ( + Dict, List, Union, ) @@ -45,6 +48,7 @@ from ops.charm import ( ActionEvent, CharmBase, + PebbleCustomNoticeEvent, ) from ops.framework import ( EventBase, @@ -83,6 +87,12 @@ SELECTOR_INDIVIDUAL = "individual" EPMD_SERVICE = "epmd" +NOTIFIER_SERVICE = "notifier" +TIMER_NOTICE = "rabbitmq.local/timer" + + +class RabbitOperatorError(Exception): + """Common base class for all RabbitMQ Operator exceptions.""" class RabbitMQOperatorCharm(CharmBase): @@ -165,6 +175,15 @@ def __init__(self, *args): self.on.get_service_account_action, self._get_service_account ) + self.framework.observe( + self.on.ensure_queue_ha_action, self._ensure_queue_ha_action + ) + + self.framework.observe( + self.on[RABBITMQ_CONTAINER].pebble_custom_notice, + self._on_pebble_custom_notice, + ) + def _pebble_ready(self) -> bool: """Check whether RabbitMQ container is up and configurable.""" return self.unit.get_container(RABBITMQ_CONTAINER).can_connect() @@ -208,6 +227,9 @@ def _on_config_changed(self, event: EventBase) -> None: # Render and push configuration files self._render_and_push_config_files() + # Render and push notifier script + notifier_changed = self._render_and_push_pebble_notifier() + # Get the rabbitmq container so we can configure/manipulate it container = self.unit.get_container(RABBITMQ_CONTAINER) 
@@ -232,6 +254,10 @@ def _on_config_changed(self, event: EventBase) -> None: else: logging.debug("RabbitMQ service is running") + # If the notifier script has changed, restart the notifier service + if notifier_changed: + container.restart(NOTIFIER_SERVICE) + @tenacity.retry( wait=tenacity.wait_exponential(multiplier=1, min=4, max=10) ) @@ -272,6 +298,12 @@ def _rabbitmq_layer(self) -> dict: "user": RABBITMQ_USER, "group": RABBITMQ_GROUP, }, + NOTIFIER_SERVICE: { + "override": "replace", + "summary": "Pebble notifier", + "command": "/usr/bin/notifier", + "startup": "enabled", + }, }, } @@ -482,6 +514,27 @@ def _on_gone_away_amqp_clients(self, event) -> None: self.peers.delete_user(username) + def _on_pebble_custom_notice(self, event: PebbleCustomNoticeEvent): + """Handle pebble custom notice event.""" + if event.notice.key == TIMER_NOTICE: + if not self.unit.is_leader(): + logger.debug("Not a leader unit, nothing to do") + return + if not self._manage_queues(): + logger.debug("Queue management disabled, nothing to do") + return + if not self.rabbit_running: + logger.debug("RabbitMQ not running, deferring") + event.defer() + return + if not self.peers.operator_user_created: + logger.debug("Operator user not created, deferring") + event.defer() + return + self.ensure_queue_ha() + self._on_update_status(event) + return + @property def amqp_rel(self) -> Relation: """AMQP relation.""" @@ -808,6 +861,38 @@ def _render_and_push_rabbitmq_conf(self) -> None: "/etc/rabbitmq/rabbitmq.conf", rabbitmq_conf, make_dirs=True ) + def _render_and_push_pebble_notifier(self) -> bool: + """Render notifier script and push to workload container.""" + auto_ha_frequency = int(self.config["auto-ha-frequency"]) + if auto_ha_frequency < 1: + msg = "auto-ha-frequency must be greater than 0" + logger.error(msg) + raise RabbitOperatorError(msg) + container = self.unit.get_container(RABBITMQ_CONTAINER) + notifier = textwrap.dedent( + f"""#!/bin/bash + while true; do + echo "Next event at $(date 
-d '+{auto_ha_frequency} minutes')" + sleep {auto_ha_frequency*60} + echo "Notifying operator of timer event" + /charm/bin/pebble notify {TIMER_NOTICE} + done + """ + ) + try: + with container.pull("/usr/bin/notifier") as stream: + content = stream.read() + except PathError: + content = None + if content == notifier: + logger.debug("Notifier script unchanged, skipping push") + return False + logger.info("Pushing new notifier script") + container.push( + "/usr/bin/notifier", notifier, make_dirs=True, permissions=0o755 + ) + return True + def generate_nodename(self, unit_name) -> str: """K8S DNS nodename for local unit.""" return ( @@ -853,7 +938,7 @@ def min_replicas(self) -> int: """The minimum number of replicas a queue should have.""" return self.config.get("minimum-replicas") - def get_undersized_queues(self) -> List[str]: + def get_undersized_queues(self) -> List[dict]: """Return a list of queues which have fewer members than minimum.""" api = self._get_admin_api() undersized_queues = [ @@ -990,6 +1075,152 @@ def _get_service_account(self, event: ActionEvent) -> None: logging.error(msg) event.fail(msg) + def _add_members_to_undersized_queues( + self, + api: rabbit_extended_api.ExtendedAdminApi, + nodes: list[str], + undersized_queues: List[dict], + replicas: int, + dry_run: bool, + ) -> List[str]: + """Add members to undersized queues. + + Simple algorithm to select nodes with fewest queues to add replicas on. 
+ """ + queues = api.list_quorum_queues() + + nodes_queues_count = collections.Counter({node: 0 for node in nodes}) + # Get a count of how many queues each node is a member of + for queue in queues: + for member in queue["members"]: + nodes_queues_count[member] += 1 + + replicated_queues = [] + for queue in undersized_queues: + needed_replicas = replicas - len(queue["members"]) + # select node with fewest queues + sorted_count = nodes_queues_count.most_common() + node_candidates = [] + for node, _ in reversed(sorted_count): + if node not in queue["members"]: + node_candidates.append(node) + if len(node_candidates) >= needed_replicas: + break + if len(node_candidates) < needed_replicas: + logger.warning( + "Not enough nodes found to replicate queue %s to HA," + " availables nodes: %s, needed nodes %s", + queue["name"], + len(node_candidates), + needed_replicas, + ) + logger.debug( + "Replicating queue %r to nodes %s, dry_run=%s", + queue["name"], + ", ".join(node_candidates), + dry_run, + ) + if not dry_run: + for node in node_candidates: + api.add_member(node, queue["vhost"], queue["name"]) + nodes_queues_count[node] += 1 + replicated_queues.append(queue["name"]) + logger.info( + "Replicated %s queues to ensure HA, dry_run=%s", + len(replicated_queues), + dry_run, + ) + return replicated_queues + + def ensure_queue_ha(self, dry_run: bool = False) -> Dict[str, int]: + """Ensure queue has HA. + + The role of this function is to ensure that all queues are available on + at least the minimum number of replicas. If there is not enough nodes to + support the minimum number of replicas, then this function will early + exit and not replicate any queues. + + Must be called on the leader. 
+ + :param dry_run: Whether to perform a dry run + """ + undersized_queues = self.get_undersized_queues() + if len(undersized_queues) < 1: + msg = "No undersized queues found" + logger.debug(msg) + return { + "undersized-queues": 0, + "replicated-queues": 0, + } + + api = self._get_admin_api() + nodes = [node["name"] for node in api.list_nodes()] + min_replicas = self.min_replicas() + + if len(nodes) < min_replicas: + msg = ( + "Not enough nodes to ensure queue HA, availables nodes:" + f" {len(nodes)}, needed nodes {min_replicas}" + ) + logger.debug(msg) + raise RabbitOperatorError(msg) + + replicated_queues = self._add_members_to_undersized_queues( + api, nodes, undersized_queues, min_replicas, dry_run + ) + + if len(replicated_queues) > 0: + logger.debug("Rebalancing queues") + api.rebalance_queues() + + return { + "undersized-queues": len(undersized_queues), + "replicated-queues": len(replicated_queues), + } + + def _ensure_queue_ha_action(self, event: ActionEvent) -> None: + """Ensure queue has HA action. 
+ + :param event: The current event + """ + if not self.unit.is_leader(): + msg = "Not leader unit, unable to ensure queue HA" + logger.error(msg) + event.fail(msg) + return + + if not self.rabbit_running: + msg = "RabbitMQ not running, unable to ensure queue HA" + logger.error(msg) + event.fail(msg) + return + + if not self.peers.operator_user_created: + msg = "Operator user not created, unable to ensure queue HA" + logger.error(msg) + event.fail(msg) + return + + if not self._manage_queues(): + msg = "Queue management is disabled, unable to ensure queue HA" + logger.error(msg) + event.fail(msg) + return + + dry_run = event.params.get("dry-run", False) + try: + result = self.ensure_queue_ha(dry_run=dry_run) + except RabbitOperatorError as e: + event.fail(str(e)) + return + event.set_results( + { + **result, + "dry-run": dry_run, + } + ) + self._on_update_status(event) + if __name__ == "__main__": main(RabbitMQOperatorCharm) diff --git a/src/interface_rabbitmq_peers.py b/src/interface_rabbitmq_peers.py index 227d6bb..f607045 100644 --- a/src/interface_rabbitmq_peers.py +++ b/src/interface_rabbitmq_peers.py @@ -131,7 +131,7 @@ def on_created(self, event): def on_broken(self, event): """Relation broken event handler.""" logging.debug("RabbitMQOperatorPeers on_broken") - self.on.gonewaway.emit() + self.on.goneaway.emit() def on_departed(self, event): """Relation broken event handler.""" diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index dc48975..203a479 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -83,6 +83,12 @@ def test_rabbitmq_pebble_ready(self): "group": "rabbitmq", "requires": ["epmd"], }, + "notifier": { + "command": "/usr/bin/notifier", + "override": "replace", + "startup": "enabled", + "summary": "Pebble notifier", + }, "epmd": { "override": "replace", "summary": "Erlang EPM service", @@ -308,3 +314,109 @@ def test_delete_member_action(self): self.mock_admin_api.delete_member.assert_called_once_with( 
"rabbit@unit-1.rabbitmq-k8s-endpoints", "/", "test_queue" ) + + def test_ensure_ha_is_called_when_unit_is_leader_and_ready(self): + """Test the notifier custom notice.""" + self.harness.set_leader(True) + self.harness.set_can_connect(charm.RABBITMQ_CONTAINER, True) + peers_relation_id = self.harness.add_relation("peers", "rabbitmq-k8s") + self.harness.add_relation_unit(peers_relation_id, "rabbitmq-k8s/0") + self.harness.update_relation_data( + peers_relation_id, + self.harness.charm.app.name, + { + "operator_password": "foobar", + "operator_user_created": "rmqadmin", + "erlang_cookie": "magicsecurity", + }, + ) + self.harness.charm.ensure_queue_ha = Mock() + self.harness.pebble_notify( + charm.RABBITMQ_CONTAINER, charm.TIMER_NOTICE + ) + self.harness.charm.ensure_queue_ha.assert_called_once() + + def test_ensure_ha_is_not_called_when_unit_is_not_leader(self): + """Test the notifier custom notice when not leader.""" + self.harness.set_leader(False) + self.harness.set_can_connect(charm.RABBITMQ_CONTAINER, True) + peers_relation_id = self.harness.add_relation("peers", "rabbitmq-k8s") + self.harness.add_relation_unit(peers_relation_id, "rabbitmq-k8s/0") + self.harness.update_relation_data( + peers_relation_id, + self.harness.charm.app.name, + { + "operator_password": "foobar", + "operator_user_created": "rmqadmin", + "erlang_cookie": "magicsecurity", + }, + ) + self.harness.charm.ensure_queue_ha = Mock() + self.harness.pebble_notify( + charm.RABBITMQ_CONTAINER, charm.TIMER_NOTICE + ) + self.harness.charm.ensure_queue_ha.assert_not_called() + + def test_no_undersized_queues(self): + """Test nothing is done when no undersized queues.""" + self.mock_admin_api.list_quorum_queues.return_value = [] + nodes = ["node1", "node2", "node3"] + undersized_queues = [] + + result = self.harness.charm._add_members_to_undersized_queues( + self.mock_admin_api, nodes, undersized_queues, 3, False + ) + self.assertEqual(result, []) + + def test_not_enough_nodes_to_replicate(self): + 
"""Test that the queues are still added to existing nodes. + + Even if there are not enough nodes to replicate, the charm will add to + existing nodes if they are some available. + """ + queues = [{"name": "queue1", "members": ["node1"], "vhost": "/"}] + self.mock_admin_api.list_quorum_queues.return_value = queues + nodes = ["node1", "node2"] + undersized_queues = queues + + result = self.harness.charm._add_members_to_undersized_queues( + self.mock_admin_api, nodes, undersized_queues, 3, False + ) + self.assertEqual(result, ["queue1"]) + self.mock_admin_api.add_member.assert_called_once_with( + "node2", "/", "queue1" + ) + + def test_exact_number_of_nodes_needed(self): + """Test that the queues are added to the correct nodes. + + Order matters since the algorithm will to the node with the least + members first. + """ + self.mock_admin_api.list_quorum_queues.return_value = [ + {"name": "queue1", "members": ["node1"], "vhost": "/"}, + {"name": "queue2", "members": ["node2"], "vhost": "/"}, + { + "name": "queue3", + "members": ["node1", "node2", "node3"], + "vhost": "/", + }, + ] + nodes = ["node1", "node2", "node3"] + undersized_queues = [ + {"name": "queue1", "members": ["node1"], "vhost": "/"}, + {"name": "queue2", "members": ["node2"], "vhost": "/"}, + ] + + result = self.harness.charm._add_members_to_undersized_queues( + self.mock_admin_api, nodes, undersized_queues, 3, False + ) + self.assertEqual(result, ["queue1", "queue2"]) + self.mock_admin_api.add_member.assert_has_calls( + [ + call("node3", "/", "queue1"), + call("node2", "/", "queue1"), + call("node3", "/", "queue2"), + call("node1", "/", "queue2"), + ] + )