diff --git a/lib/charms/hpc_libs/v0/juju_systemd_notices.py b/lib/charms/operator_libs_linux/v0/juju_systemd_notices.py similarity index 53% rename from lib/charms/hpc_libs/v0/juju_systemd_notices.py rename to lib/charms/operator_libs_linux/v0/juju_systemd_notices.py index 831f64e..08157c9 100644 --- a/lib/charms/hpc_libs/v0/juju_systemd_notices.py +++ b/lib/charms/operator_libs_linux/v0/juju_systemd_notices.py @@ -13,7 +13,71 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Systemd notices daemon for emitting Juju events.""" +"""Library for utilizing systemd to observe and emit notices when services change state. + +This library provides both the public API for observing systemd services from within +charmed operators, and utilities for running a minimal juju-systemd-notices daemon that watches +observed services running on the machine. The juju-systemd-notices daemon watches observed +services using DBus; it observes messages received on the DBus message bus and evaluates the +contents of the messages to determine if a service state-change event must be emitted. + +## How to use within a charmed operator (machine only) + +Here is an example of subscribing a charmed operator to observe the state of an internal +systemd service and handle events based on the current emitted state: + +```python +from charms.operator_libs_linux.v0.juju_systemd_notices import ( + ServiceStartedEvent, + ServiceStoppedEvent, + SystemdNotices, +) + + +class ApplicationCharm(CharmBase): + # Application charm that needs to observe the state of an internal service. + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + # Register services with charm. This adds the events to observe. + self._systemd_notices = SystemdNotices(self, ["slurmd"]) + self.framework.observe(self.on.install, self._on_install) + self.framework.observe(self.on.stop, self._on_stop) + self.framework.observe(self.on.service_slurmd_started, self._on_slurmd_started) + self.framework.observe(self.on.service_slurmd_stopped, self._on_slurmd_stopped) + + def _on_install(self, _: InstallEvent) -> None: + # Subscribe the charmed operator to the services on the machine. + # .subscribe() configures the notices hooks and starts the juju-systemd-notices daemon. + # The juju-systemd-notices daemon is per unit. This means that the unit name will be + # meshed into the service name. E.g. juju-systemd-notices becomes + # juju-{unit_name}-{unit_number}-systemd-notices. + self._systemd_notices.subscribe() + + def _on_start(self, _: StartEvent) -> None: + # This will trigger the juju-systemd-notices daemon to + # emit a `service-slurmd-started` event. + systemd.service_start("slurmd") + + def _on_stop(self, _: StopEvent) -> None: + # To stop the juju-systemd-notices service running in the background. + # .stop() also disables the juju-systemd-notices so that it does not + # start back up if the underlying machine is rebooted. + self._systemd_notices.stop() + + def _on_slurmd_started(self, _: ServiceStartedEvent) -> None: + self.unit.status = ActiveStatus() + time.sleep(60) + + # This will trigger the juju-systemd-notices daemon to + # emit a `service-slurmd-stopped` event. + systemd.service_stop("slurmd") + + def _on_slurmd_stopped(self, _: ServiceStoppedEvent) -> None: + self.unit.status = BlockedStatus("slurmd not running") +``` +""" __all__ = ["ServiceStartedEvent", "ServiceStoppedEvent", "SystemdNotices"] @@ -24,11 +88,9 @@ import re import signal import subprocess -import sys import textwrap from pathlib import Path -from types import MappingProxyType -from typing import Any, List, Union +from typing import List from dbus_fast.aio import MessageBus from dbus_fast.constants import BusType, MessageType @@ -37,51 +99,53 @@ from ops.charm import CharmBase from ops.framework import EventBase -LIBID = "HPCTEAMUSEONLY" +# The unique Charmhub library identifier, never change it. +LIBID = "2bb6ecd037e64c899033113abab02e01" + +# Increment this major API version when introducing breaking changes. LIBAPI = 0 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version. LIBPATCH = 1 + +# juju-systemd-notices charm library dependencies. +# Charm library dependencies are installed when the consuming charm is packed. PYDEPS = ["dbus-fast>=1.90.2"] _logger = logging.getLogger(__name__) _juju_unit = None _service_states = {} _service_hook_regex_filter = re.compile(r"service-(?P[\w\\:-]*)-(?:started|stopped)") -_DBUS_CHAR_MAPPINGS = MappingProxyType( - { - "_40": "@", - "_2e": ".", - "_5f": "_", - "_2d": "-", - "_5c": "\\", - } -) +_DBUS_CHAR_MAPPINGS = { + "_5f": "_", # _ must be first since char mappings contain _. + "_40": "@", + "_2e": ".", + "_2d": "-", + "_5c": "\\", +} def _systemctl(*args) -> None: """Control systemd by via executed `systemctl ...` commands. - + Raises: subprocess.CalledProcessError: Raised if systemctl command fails. """ cmd = ["systemctl", *args] - _logger.debug(f"systemd: Executing command {cmd}") + _logger.debug("systemd: Executing command %s", cmd) try: subprocess.check_output(cmd) except subprocess.CalledProcessError as e: - _logger.error(f"systemctl command failed: {e}") - raise e + _logger.error("systemctl command failed: %s", e) + raise _daemon_reload = functools.partial(_systemctl, "daemon-reload") -_daemon_reload.__doc__ = "Reload systemd manager configuration." _start_service = functools.partial(_systemctl, "start") -_start_service.__doc__ = "Start systemd service unit." _stop_service = functools.partial(_systemctl, "stop") -_stop_service.__doc__ = "Stop systemd service unit." _enable_service = functools.partial(_systemctl, "enable") -_enable_service.__doc__ = "Enable systemd service." _disable_service = functools.partial(_systemctl, "disable") -_disable_service.__doc__ = "Disable systemd service." class ServiceStartedEvent(EventBase): @@ -93,31 +157,36 @@ class ServiceStoppedEvent(EventBase): class SystemdNotices: + """Observe systemd services on your machine base.""" - def __init__(self, charm: CharmBase, services: Union[str, List[str]]) -> None: - """Instantiate systemd notices service""" + def __init__(self, charm: CharmBase, services: List[str]) -> None: + """Instantiate systemd notices service.""" self._charm = charm - self._services = [services] if isinstance(services, str) else services - unit_name = self._charm.unit.name.replace('/', '-') + self._services = services + unit_name = self._charm.unit.name.replace("/", "-") self._service_file = Path(f"/etc/systemd/system/juju-{unit_name}-systemd-notices.service") - _logger.debug(f"Attaching systemd notice events to charm {self._charm.__class__.__name__}") + _logger.debug( + "Attaching systemd notice events to charm %s", self._charm.__class__.__name__ + ) for service in self._services: self._charm.on.define_event(f"service_{service}_started", ServiceStartedEvent) self._charm.on.define_event(f"service_{service}_stopped", ServiceStoppedEvent) def subscribe(self) -> None: """Subscribe charmed operator to observe status of systemd services.""" - _logger.debug(f"Generating systemd notice hooks for {self._services}") - for service in self._services: - for hook in {"started", "stopped"}: - hook = Path(f"hooks/service-{service}-{hook}") - if not hook.exists(): - hook.symlink_to(f"{Path.cwd()}/dispatch") - - _logger.debug(f"Starting {self._service_file.name} daemon") + _logger.debug("Generating systemd notice hooks for %s", self._services) + start_hooks = [Path(f"hooks/service-{service}-started") for service in self._services] + stop_hooks = [Path(f"hooks/service-{service}-stopped") for service in self._services] + for hook in start_hooks + stop_hooks: + if hook.exists(): + _logger.debug("Hook %s already exists. Skipping...", hook.name) + else: + hook.symlink_to(self._charm.framework.charm_dir / "dispatch") + + _logger.debug("Starting %s daemon", self._service_file.name) if self._service_file.exists(): - _logger.debug(f"Overwriting existing service file {self._service_file.name}") + _logger.debug("Overwriting existing service file %s", self._service_file.name) self._service_file.write_text( textwrap.dedent( f""" @@ -128,8 +197,8 @@ def subscribe(self) -> None: [Service] Type=simple Restart=always - WorkingDirectory={Path.cwd()} - Environment="PYTHONPATH={Path.cwd() / "venv"}" + WorkingDirectory={self._charm.framework.charm_dir} + Environment="PYTHONPATH={self._charm.framework.charm_dir / "venv"}" ExecStart=/usr/bin/python3 {__file__} {self._charm.unit.name} [Install] @@ -137,13 +206,13 @@ def subscribe(self) -> None: """ ).strip() ) - _logger.debug(f"Service file {self._service_file.name} written. Reloading systemd") + _logger.debug("Service file %s written. Reloading systemd", self._service_file.name) _daemon_reload() # Notices daemon is enabled so that the service will start even after machine reboot. # This functionality is needed in the event that a charm is rebooted to apply updates. _enable_service(self._service_file.name) _start_service(self._service_file.name) - _logger.debug(f"Started {self._service_file.name} daemon") + _logger.debug("Started %s daemon", self._service_file.name) def stop(self) -> None: """Stop charmed operator from observing the status of subscribed services.""" @@ -188,7 +257,7 @@ def _dbus_path_to_name(path: str) -> str: def _systemd_unit_changed(msg: Message) -> bool: - """Callback for systemd unit changes on the DBus bus. + """Send Juju notification if systemd unit state changes on the DBus bus. Invoked when a PropertiesChanged event occurs on an org.freedesktop.systemd1.Unit object across the dbus. These events are sent whenever a unit changes state, including @@ -201,27 +270,30 @@ def _systemd_unit_changed(msg: Message) -> bool: True if the event is processed. False if otherwise. """ _logger.debug( - f"Received message: path: {msg.path}, interface: {msg.interface}, member: {msg.member}" + "Received message: path: %s, interface: %s, member: %s", + msg.path, + msg.interface, + msg.member, ) service = _dbus_path_to_name(msg.path) properties = msg.body[1] - if 'ActiveState' not in properties: + if "ActiveState" not in properties: return False global _service_states if service not in _service_states: - _logger.debug(f"Dropping event for unwatched service: {service}") + _logger.debug("Dropping event for unwatched service: %s", service) return False - curr_state = properties['ActiveState'].value + curr_state = properties["ActiveState"].value prev_state = _service_states[service] # Drop transitioning and duplicate events if curr_state.endswith("ing") or curr_state == prev_state: - _logger.debug(f"Dropping event - service: {service}, state: {curr_state}") + _logger.debug("Dropping event - service: %s, state: %s", service, curr_state) return False _service_states[service] = curr_state - _logger.debug(f"Service {service} changed state to {curr_state}") + _logger.debug("Service %s changed state to %s", service, curr_state) # Run the hook in a separate thread so the dbus notifications aren't # blocked from being received. asyncio.create_task(_send_juju_notification(service, curr_state)) @@ -229,38 +301,32 @@ def _systemd_unit_changed(msg: Message) -> bool: async def _send_juju_notification(service: str, state: str) -> None: - """Invokes a Juju hook to notify that a service state has changed. + """Invoke a Juju hook to notify an operator that a service state has changed. Args: service: The name of the service which has changed state. state: The state of the service. """ if service.endswith(".service"): - service = service[0:-len(".service")] - if state == "active": - event_name = "started" - else: - event_name = "stopped" + service = service[0:-len(".service")] # fmt: skip + + event_name = "started" if state == "active" else "stopped" hook = f"service-{service}-{event_name}" - cmd = [ - "/usr/bin/juju-exec", - _juju_unit, - f"hooks/{hook}" - ] - - _logger.debug(f"Invoking hook {hook} with command: {' '.join(cmd)}") - process = await asyncio.create_subprocess_exec(*cmd, ) + cmd = ["/usr/bin/juju-exec", _juju_unit, f"hooks/{hook}"] + + _logger.debug("Invoking hook %s with command: %s", hook, " ".join(cmd)) + process = await asyncio.create_subprocess_exec(*cmd) await process.wait() if process.returncode: _logger.error( - f"Hook command '{' '.join(cmd)}' failed with returncode {process.returncode}" + "Hook command '%s' failed with returncode %s", " ".join(cmd), process.returncode ) else: - _logger.info(f"Hook command '{' '.join(cmd)}' succeeded.") + _logger.info("Hook command '%s' succeeded.", " ".join(cmd)) -async def _get_state(bus: MessageBus, service: str) -> str: - """Retrieves the current state of the specified service. +async def _get_service_state(bus: MessageBus, service: str) -> str: + """Report the current state of a service. Args: bus: The message bus to query on. @@ -271,11 +337,11 @@ async def _get_state(bus: MessageBus, service: str) -> str: """ obj_path = _name_to_dbus_path(service) try: - _logger.debug(f"Retrieving state for service {service} at object path: {obj_path}") + _logger.debug("Retrieving state for service %s at object path: %s", service, obj_path) introspection = await bus.introspect("org.freedesktop.systemd1", obj_path) proxy = bus.get_proxy_object("org.freedesktop.systemd1", obj_path, introspection) - properties = proxy.get_interface('org.freedesktop.DBus.Properties') - state = await properties.call_get('org.freedesktop.systemd1.Unit', 'ActiveState') # noqa + properties = proxy.get_interface("org.freedesktop.DBus.Properties") + state = await properties.call_get("org.freedesktop.systemd1.Unit", "ActiveState") # noqa return state.value except DBusError: # This will be thrown if the unit specified does not currently exist, @@ -284,7 +350,7 @@ async def _get_state(bus: MessageBus, service: str) -> str: async def _async_load_services() -> None: - """Loads the services from hooks for the unit. + """Load names of services to observe from legacy Juju hooks. Parses the hook names found in the charm hooks directory and determines if this is one of the services that the charm is interested in observing. @@ -298,20 +364,21 @@ async def _async_load_services() -> None: will be queried from systemd to determine it's initial state. """ global _juju_unit - hooks_dir = Path(f"{Path.cwd()}/hooks") - _logger.info(f"Loading services from hooks in {hooks_dir}") + hooks_dir = Path.cwd() / "hooks" + _logger.info("Loading services from hooks in %s", hooks_dir) if not hooks_dir.exists(): - _logger.warning(f"Hooks dir {hooks_dir} does not exist.") + _logger.warning("Hooks dir %s does not exist.", hooks_dir) return watched_services = [] # Get service-{service}-(started|stopped) hooks defined by the charm. - for hook in filter(lambda p: _service_hook_regex_filter.match(p.name), hooks_dir.iterdir()): + for hook in hooks_dir.iterdir(): match = _service_hook_regex_filter.match(hook.name) - watched_services.append(match.group("service")) + if match: + watched_services.append(match.group("service")) - _logger.info(f"Services from hooks are {watched_services}") + _logger.info("Services from hooks are %s", watched_services) if not watched_services: return @@ -320,19 +387,17 @@ async def _async_load_services() -> None: # Loop through all the services and be sure that a new watcher is # started for new ones. for service in watched_services: - # The .service suffix is not necessary and will cause lookup - # failures of the service unit when readying the watcher. - if not service.endswith(".service"): - service = f"{service}.service" - + # The .service suffix is necessary and will cause lookup failures of the + # service unit when readying the watcher if absent from the service name. + service = f"{service}.service" if service not in _service_states: - state = await _get_state(bus, service) - _logger.debug(f"Adding service '{service}' with initial state: {state}") + state = await _get_service_state(bus, service) + _logger.debug("Adding service '%s' with initial state: %s", service, state) _service_states[service] = state -def _load_services(loop: asyncio.AbstractEventLoop) -> None: - """Synchronous method for _async_load_services. +def _load_services(loop: asyncio.AbstractEventLoop) -> None: # pragma: no cover + """Load services synchronously using _async_load_services. This is a synchronous form of the _load_services method. This is called from a signal handler which cannot take coroutines, thus this method will schedule a @@ -357,36 +422,53 @@ async def _juju_systemd_notices_daemon() -> None: loop = asyncio.get_event_loop() loop.add_signal_handler(signal.SIGINT, stop_event.set) loop.add_signal_handler(signal.SIGTERM, stop_event.set) - loop.add_signal_handler(signal.SIGHUP, lambda loop=loop: _load_services(loop)) + # The event loop must be early bound to the lambda, otherwise the event loop + # will not exist within the lambda when the SIGHUP signal is received + # by the running notices daemon. + loop.add_signal_handler( # pragma: no branch + signal.SIGHUP, lambda loop=loop: _load_services(loop) + ) sysbus = await MessageBus(bus_type=BusType.SYSTEM).connect() await _async_load_services() - reply = await sysbus.call(Message( - destination='org.freedesktop.DBus', - path='/org/freedesktop/DBus', - interface='org.freedesktop.DBus', - member='AddMatch', - signature='s', - body=["path_namespace='/org/freedesktop/systemd1/unit',type='signal'," - "interface='org.freedesktop.DBus.Properties'"], - serial=sysbus.next_serial(), - )) + reply = await sysbus.call( + Message( + destination="org.freedesktop.DBus", + path="/org/freedesktop/DBus", + interface="org.freedesktop.DBus", + member="AddMatch", + signature="s", + body=[ + "path_namespace='/org/freedesktop/systemd1/unit',type='signal'," + "interface='org.freedesktop.DBus.Properties'" + ], + serial=sysbus.next_serial(), + ) + ) assert reply.message_type == MessageType.METHOD_RETURN sysbus.add_message_handler(_systemd_unit_changed) await stop_event.wait() -def _start_notices_service(): - """Main entry point to start the Juju systemd notices daemon from the shell. +def _main(): + """Invoke the Juju systemd notices daemon. - This method is invoked when this file is executed as a script by systemd. + This method is used to start the Juju systemd notices daemon when + juju_systemd_notices.py is executed as a script, not imported as a module. + + Raises: + argparse.ArgumentError: Raised if unit argument is absent. """ parser = argparse.ArgumentParser() parser.add_argument("-d", "--debug", action="store_true") parser.add_argument("unit", type=str) args = parser.parse_args() + # Intentionally set as global. + global _juju_unit + _juju_unit = args.unit + console_handler = logging.StreamHandler() if args.debug: _logger.setLevel(logging.DEBUG) @@ -394,18 +476,11 @@ def _start_notices_service(): else: _logger.setLevel(logging.INFO) console_handler.setLevel(logging.DEBUG) - _logger.addHandler(console_handler) - - # Intentionally set as global. - global _juju_unit - _juju_unit = args.unit - if not _juju_unit: - parser.print_usage() - sys.exit(2) + _logger.addHandler(console_handler) _logger.info("Starting juju systemd notices service") asyncio.run(_juju_systemd_notices_daemon()) if __name__ == "__main__": # pragma: nocover - _start_notices_service() + _main() diff --git a/src/charm.py b/src/charm.py index 3020ae8..3a84d37 100755 --- a/src/charm.py +++ b/src/charm.py @@ -8,7 +8,7 @@ from pathlib import Path from charms.fluentbit.v0.fluentbit import FluentbitClient -from charms.hpc_libs.v0.juju_systemd_notices import ( +from charms.operator_libs_linux.v0.juju_systemd_notices import ( ServiceStartedEvent, ServiceStoppedEvent, SystemdNotices, diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 3e10719..8b70fd0 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -58,7 +58,7 @@ def test_install_fail(self, defer) -> None: @patch("ops.model.Resources.fetch") @patch("utils.slurmd.override_default") @patch("utils.slurmd.override_service") - @patch("charms.hpc_libs.v0.juju_systemd_notices.SystemdNotices.subscribe") + @patch("charms.operator_libs_linux.v0.juju_systemd_notices.SystemdNotices.subscribe") @patch("ops.framework.EventBase.defer") def test_install_success(self, defer, *_) -> None: """Test install success behavior."""