Skip to content

Commit

Permalink
Merge pull request #16 from gboutry/feat/ensure-ha
Browse files Browse the repository at this point in the history
Implement ensure-ha
  • Loading branch information
hemanthnakkina authored Jul 30, 2024
2 parents 54d248a + 4573371 commit ab9d01b
Show file tree
Hide file tree
Showing 5 changed files with 350 additions and 3 deletions.
2 changes: 1 addition & 1 deletion actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ get-service-account:

ensure-queue-ha:
description: |
Check for queues that have insufficent memebers for high
Check for queues that have insufficent members for high
availability and, if possible, add members to them.
params:
dry-run:
Expand Down
4 changes: 4 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@ options:
default: 3
description: Minimum number of queues replicas, set to 0 to disable charm automatically managing queue replicas
type: int
auto-ha-frequency:
default: 30
description: Frequency in minutes to check for queues that need HA members added
type: int
233 changes: 232 additions & 1 deletion src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

"""RabbitMQ Operator Charm."""

import collections
import logging
import textwrap
from ipaddress import (
IPv4Address,
IPv6Address,
)
from typing import (
Dict,
List,
Union,
)
Expand All @@ -45,6 +48,7 @@
from ops.charm import (
ActionEvent,
CharmBase,
PebbleCustomNoticeEvent,
)
from ops.framework import (
EventBase,
Expand Down Expand Up @@ -83,6 +87,12 @@
SELECTOR_INDIVIDUAL = "individual"

EPMD_SERVICE = "epmd"
NOTIFIER_SERVICE = "notifier"
TIMER_NOTICE = "rabbitmq.local/timer"


class RabbitOperatorError(Exception):
"""Common base class for all RabbitMQ Operator exceptions."""


class RabbitMQOperatorCharm(CharmBase):
Expand Down Expand Up @@ -165,6 +175,15 @@ def __init__(self, *args):
self.on.get_service_account_action, self._get_service_account
)

self.framework.observe(
self.on.ensure_queue_ha_action, self._ensure_queue_ha_action
)

self.framework.observe(
self.on[RABBITMQ_CONTAINER].pebble_custom_notice,
self._on_pebble_custom_notice,
)

def _pebble_ready(self) -> bool:
"""Check whether RabbitMQ container is up and configurable."""
return self.unit.get_container(RABBITMQ_CONTAINER).can_connect()
Expand Down Expand Up @@ -208,6 +227,9 @@ def _on_config_changed(self, event: EventBase) -> None:
# Render and push configuration files
self._render_and_push_config_files()

# Render and push notifier script
notifier_changed = self._render_and_push_pebble_notifier()

# Get the rabbitmq container so we can configure/manipulate it
container = self.unit.get_container(RABBITMQ_CONTAINER)

Expand All @@ -232,6 +254,10 @@ def _on_config_changed(self, event: EventBase) -> None:
else:
logging.debug("RabbitMQ service is running")

# If the notifier script has changed, restart the notifier service
if notifier_changed:
container.restart(NOTIFIER_SERVICE)

@tenacity.retry(
wait=tenacity.wait_exponential(multiplier=1, min=4, max=10)
)
Expand Down Expand Up @@ -272,6 +298,12 @@ def _rabbitmq_layer(self) -> dict:
"user": RABBITMQ_USER,
"group": RABBITMQ_GROUP,
},
NOTIFIER_SERVICE: {
"override": "replace",
"summary": "Pebble notifier",
"command": "/usr/bin/notifier",
"startup": "enabled",
},
},
}

Expand Down Expand Up @@ -482,6 +514,27 @@ def _on_gone_away_amqp_clients(self, event) -> None:

self.peers.delete_user(username)

def _on_pebble_custom_notice(self, event: PebbleCustomNoticeEvent):
"""Handle pebble custom notice event."""
if event.notice.key == TIMER_NOTICE:
if not self.unit.is_leader():
logger.debug("Not a leader unit, nothing to do")
return
if not self._manage_queues():
logger.debug("Queue management disabled, nothing to do")
return
if not self.rabbit_running:
logger.debug("RabbitMQ not running, deferring")
event.defer()
return
if not self.peers.operator_user_created:
logger.debug("Operator user not created, deferring")
event.defer()
return
self.ensure_queue_ha()
self._on_update_status(event)
return

@property
def amqp_rel(self) -> Relation:
"""AMQP relation."""
Expand Down Expand Up @@ -808,6 +861,38 @@ def _render_and_push_rabbitmq_conf(self) -> None:
"/etc/rabbitmq/rabbitmq.conf", rabbitmq_conf, make_dirs=True
)

def _render_and_push_pebble_notifier(self) -> bool:
"""Render notifier script and push to workload container."""
auto_ha_frequency = int(self.config["auto-ha-frequency"])
if auto_ha_frequency < 1:
msg = "auto-ha-frequency must be greater than 0"
logger.error(msg)
raise RabbitOperatorError(msg)
container = self.unit.get_container(RABBITMQ_CONTAINER)
notifier = textwrap.dedent(
f"""#!/bin/bash
while true; do
echo "Next event at $(date -d '+{auto_ha_frequency} minutes')"
sleep {auto_ha_frequency*60}
echo "Notifying operator of timer event"
/charm/bin/pebble notify {TIMER_NOTICE}
done
"""
)
try:
with container.pull("/usr/bin/notifier") as stream:
content = stream.read()
except PathError:
content = None
if content == notifier:
logger.debug("Notifier script unchanged, skipping push")
return False
logger.info("Pushing new notifier script")
container.push(
"/usr/bin/notifier", notifier, make_dirs=True, permissions=0o755
)
return True

def generate_nodename(self, unit_name) -> str:
"""K8S DNS nodename for local unit."""
return (
Expand Down Expand Up @@ -853,7 +938,7 @@ def min_replicas(self) -> int:
"""The minimum number of replicas a queue should have."""
return self.config.get("minimum-replicas")

def get_undersized_queues(self) -> List[str]:
def get_undersized_queues(self) -> List[dict]:
"""Return a list of queues which have fewer members than minimum."""
api = self._get_admin_api()
undersized_queues = [
Expand Down Expand Up @@ -990,6 +1075,152 @@ def _get_service_account(self, event: ActionEvent) -> None:
logging.error(msg)
event.fail(msg)

def _add_members_to_undersized_queues(
self,
api: rabbit_extended_api.ExtendedAdminApi,
nodes: list[str],
undersized_queues: List[dict],
replicas: int,
dry_run: bool,
) -> List[str]:
"""Add members to undersized queues.
Simple algorithm to select nodes with fewest queues to add replicas on.
"""
queues = api.list_quorum_queues()

nodes_queues_count = collections.Counter({node: 0 for node in nodes})
# Get a count of how many queues each node is a member of
for queue in queues:
for member in queue["members"]:
nodes_queues_count[member] += 1

replicated_queues = []
for queue in undersized_queues:
needed_replicas = replicas - len(queue["members"])
# select node with fewest queues
sorted_count = nodes_queues_count.most_common()
node_candidates = []
for node, _ in reversed(sorted_count):
if node not in queue["members"]:
node_candidates.append(node)
if len(node_candidates) >= needed_replicas:
break
if len(node_candidates) < needed_replicas:
logger.warning(
"Not enough nodes found to replicate queue %s to HA,"
" availables nodes: %s, needed nodes %s",
queue["name"],
len(node_candidates),
needed_replicas,
)
logger.debug(
"Replicating queue %r to nodes %s, dry_run=%s",
queue["name"],
", ".join(node_candidates),
dry_run,
)
if not dry_run:
for node in node_candidates:
api.add_member(node, queue["vhost"], queue["name"])
nodes_queues_count[node] += 1
replicated_queues.append(queue["name"])
logger.info(
"Replicated %s queues to ensure HA, dry_run=%s",
len(replicated_queues),
dry_run,
)
return replicated_queues

def ensure_queue_ha(self, dry_run: bool = False) -> Dict[str, int]:
"""Ensure queue has HA.
The role of this function is to ensure that all queues are available on
at least the minimum number of replicas. If there is not enough nodes to
support the minimum number of replicas, then this function will early
exit and not replicate any queues.
Must be called on the leader.
:param dry_run: Whether to perform a dry run
"""
undersized_queues = self.get_undersized_queues()
if len(undersized_queues) < 1:
msg = "No undersized queues found"
logger.debug(msg)
return {
"undersized-queues": 0,
"replicated-queues": 0,
}

api = self._get_admin_api()
nodes = [node["name"] for node in api.list_nodes()]
min_replicas = self.min_replicas()

if len(nodes) < min_replicas:
msg = (
"Not enough nodes to ensure queue HA, availables nodes:"
f" {len(nodes)}, needed nodes {min_replicas}"
)
logger.debug(msg)
raise RabbitOperatorError(msg)

replicated_queues = self._add_members_to_undersized_queues(
api, nodes, undersized_queues, min_replicas, dry_run
)

if len(replicated_queues) > 0:
logger.debug("Rebalancing queues")
api.rebalance_queues()

return {
"undersized-queues": len(undersized_queues),
"replicated-queues": len(replicated_queues),
}

def _ensure_queue_ha_action(self, event: ActionEvent) -> None:
"""Ensure queue has HA action.
:param event: The current event
"""
if not self.unit.is_leader():
msg = "Not leader unit, unable to ensure queue HA"
logger.error(msg)
event.fail(msg)
return

if not self.rabbit_running:
msg = "RabbitMQ not running, unable to ensure queue HA"
logger.error(msg)
event.fail(msg)
return

if not self.peers.operator_user_created:
msg = "Operator user not created, unable to ensure queue HA"
logger.error(msg)
event.fail(msg)
return

if not self._manage_queues():
msg = "Queue management is disabled, unable to ensure queue HA"
logger.error(msg)
event.fail(msg)
return

dry_run = event.params.get("dry-run", False)
try:
result = self.ensure_queue_ha(dry_run=dry_run)
except RabbitOperatorError as e:
event.fail(str(e))
return
event.set_results(
{
**result,
"dry-run": dry_run,
}
)
self._on_update_status(event)


if __name__ == "__main__":
main(RabbitMQOperatorCharm)
2 changes: 1 addition & 1 deletion src/interface_rabbitmq_peers.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def on_created(self, event):
def on_broken(self, event):
"""Relation broken event handler."""
logging.debug("RabbitMQOperatorPeers on_broken")
self.on.gonewaway.emit()
self.on.goneaway.emit()

def on_departed(self, event):
"""Relation broken event handler."""
Expand Down
Loading

0 comments on commit ab9d01b

Please sign in to comment.