From 87b21319e80abe8b89f7800e45cd1ad48d6e1355 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Guti=C3=A9rrez=20Hermosillo=20Muriedas=2C=20Juan=20Pedro?=
Date: Fri, 14 Feb 2025 10:18:42 +0100
Subject: [PATCH 1/7] fix: Another attempt at fixing this weird issue

---
 .pre-commit-config.yaml        | 10 +++++-----
 perun/backend/__init__.py      | 18 ++++++++++++++++++
 perun/backend/backend.py       |  5 +++++
 perun/backend/nvml.py          |  3 ++-
 perun/backend/rocmsmi.py       |  3 ++-
 perun/core.py                  | 24 +++++++++---------------
 perun/monitoring/monitor.py    |  2 +-
 perun/monitoring/subprocess.py | 20 ++++++++++++++++++--
 8 files changed, 60 insertions(+), 25 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 861bf35..2fd77f1 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,9 +1,4 @@
 repos:
-  - repo: https://github.com/commitizen-tools/commitizen
-    rev: v4.2.1
-    hooks:
-      - id: commitizen
-        stages: [commit-msg]
   - repo: https://github.com/pre-commit/mirrors-mypy
     rev: v1.15.0 # Use the sha / tag you want to point at
     hooks:
@@ -37,3 +32,8 @@ repos:
   - id: "validate-cff"
     args:
       - "--verbose"
+  - repo: https://github.com/commitizen-tools/commitizen
+    rev: v4.2.1
+    hooks:
+      - id: commitizen
+        stages: [commit-msg]
diff --git a/perun/backend/__init__.py b/perun/backend/__init__.py
index bbc4de9..adca1b2 100644
--- a/perun/backend/__init__.py
+++ b/perun/backend/__init__.py
@@ -1 +1,19 @@
 """Backend module."""
+
+from typing import Dict, Type
+
+from .backend import Backend
+from .nvml import NVMLBackend
+from .powercap_rapl import PowercapRAPLBackend
+from .psutil import PSUTILBackend
+from .rocmsmi import ROCMBackend
+from .util import getBackendMetadata, getHostMetadata
+
+available_backends: Dict[str, Type[Backend]] = {
+    "NVMLBackend": NVMLBackend,
+    "PowercapRAPLBackend": PowercapRAPLBackend,
+    "PSUTILBackend": PSUTILBackend,
+    "ROCMBackend": ROCMBackend,
+}
+
+__all__ = ["getBackendMetadata", "getHostMetadata"]
diff --git a/perun/backend/backend.py b/perun/backend/backend.py
index 707063d..8994aca 100644
--- a/perun/backend/backend.py
+++ b/perun/backend/backend.py
@@ -26,6 +26,11 @@ def __init__(self) -> None:
         self._metadata: Dict = {}
         log.info(f"Initialized {self.name} backend")
 
+    def __del__(self):
+        """Backend cleanup method."""
+        log.debug("Deleting backend.")
+        self.close()
+
     @property
     def metadata(self) -> Dict:
         """Return backend metadata."""
diff --git a/perun/backend/nvml.py b/perun/backend/nvml.py
index f848eac..97a3ec4 100644
--- a/perun/backend/nvml.py
+++ b/perun/backend/nvml.py
@@ -44,7 +44,8 @@ def setup(self):
 
     def close(self):
         """Backend shutdown code."""
-        self.pynvml.nvmlShutdown()
+        if hasattr(self, "pynvml"):
+            self.pynvml.nvmlShutdown()
 
     def availableSensors(self) -> Dict[str, Tuple]:
         """Return string ids of visible devices.
diff --git a/perun/backend/rocmsmi.py b/perun/backend/rocmsmi.py
index 031046e..921d68e 100644
--- a/perun/backend/rocmsmi.py
+++ b/perun/backend/rocmsmi.py
@@ -44,7 +44,8 @@ def setup(self):
 
     def close(self):
         """Backend cleanup."""
-        self.amdsmi.amdsmi_shut_down()
+        if hasattr(self, "amdsmi"):
+            self.amdsmi.amdsmi_shut_down()
 
     def availableSensors(self) -> Dict[str, Tuple]:
         """Return string ids of visible devices.
diff --git a/perun/core.py b/perun/core.py
index 55f20d9..998a501 100644
--- a/perun/core.py
+++ b/perun/core.py
@@ -9,15 +9,15 @@
 from configparser import ConfigParser
 from datetime import datetime
 from pathlib import Path
-from typing import Any, Callable, Dict, List, Optional, Tuple, Type
+from typing import Any, Callable, Dict, List, Optional, Tuple
 
 from perun import __version__
-from perun.backend.backend import Backend
-from perun.backend.nvml import NVMLBackend
-from perun.backend.powercap_rapl import PowercapRAPLBackend
-from perun.backend.psutil import PSUTILBackend
-from perun.backend.rocmsmi import ROCMBackend
-from perun.backend.util import getBackendMetadata, getHostMetadata
+from perun.backend import (
+    Backend,
+    available_backends,
+    getBackendMetadata,
+    getHostMetadata,
+)
 from perun.comm import Comm
 from perun.configuration import sanitize_config
 from perun.coordination import assignSensors, getHostRankDict
@@ -104,15 +104,9 @@ def backends(self) -> Dict[str, Backend]:
         """
         if not self._backends:
             self._backends = {}
-            classList: Dict[str, Type[Backend]] = {
-                "PowercapRAPL": PowercapRAPLBackend,
-                "NVML": NVMLBackend,
-                "PSUTIL": PSUTILBackend,
-                "ROCM": ROCMBackend,
-            }
-            for name, backend in classList.items():
+            for name, backend_class in available_backends.items():
                 try:
-                    backend_instance = backend()
+                    backend_instance = backend_class()
                     self._backends[backend_instance.id] = backend_instance
                 except ImportError as ie:
                     log.info(f"Missing dependencies for backend {name}")
diff --git a/perun/monitoring/monitor.py b/perun/monitoring/monitor.py
index d38c5ca..3e00668 100644
--- a/perun/monitoring/monitor.py
+++ b/perun/monitoring/monitor.py
@@ -96,6 +96,7 @@ def __init__(
         self._l_assigned_sensors = l_assigned_sensors
         self._config = config
         self.status = MonitorStatus.SETUP
+        multiprocessing.set_start_method("spawn")
         self._reset_subprocess_handlers()
 
     def _reset_subprocess_handlers(self) -> None:
@@ -197,7 +198,6 @@ def _run_python_app(
             args=[
                 self.queue,
                 self._comm.Get_rank(),
-                self._backends,
                 self._l_assigned_sensors,
                 self._config,
                 self.sp_ready_event,
diff --git a/perun/monitoring/subprocess.py b/perun/monitoring/subprocess.py
index 2c82cbf..5ce347b 100644
--- a/perun/monitoring/subprocess.py
+++ b/perun/monitoring/subprocess.py
@@ -9,7 +9,7 @@
 
 import numpy as np
 
-from perun.backend.backend import Backend
+from perun.backend import Backend, available_backends
 from perun.data_model.data import DataNode, NodeType, RawData
 from perun.data_model.measurement_type import Magnitude, MetricMetaData, Unit
 from perun.data_model.sensor import DeviceType, Sensor
@@ -78,6 +78,7 @@ def _monitoringLoop(
 
     delta = (time.time_ns() - timesteps[-1]) * 1e-9
     while not stopCondition(delta):
+        log.debug("Loop")
         timesteps.append(time.time_ns())
         for idx, device in enumerate(lSensors):
             rawValues[idx].append(device.read())
@@ -167,7 +168,6 @@ def createNode(
 def perunSubprocess(
     queue: Queue,
     rank: int,
-    backends: Dict[str, Backend],
     l_assigned_sensors: Dict[str, Tuple],
     perunConfig: ConfigParser,
     sp_ready_event,
@@ -197,6 +197,18 @@
         Sampling period in seconds
     """
     log.debug(f"Rank {rank}: Subprocess: Entered perunSubprocess")
+    backends: Dict[str, Backend] = {}
+    for name, backend_class in available_backends.items():
+        try:
+            backend_instance = backend_class()
+            backends[backend_instance.id] = backend_instance
+        except ImportError as ie:
+            log.info(f"Missing dependencies for backend {name}")
+            log.info(ie)
+        except Exception as e:
+            log.info(f"Unknown error loading dependency {name}")
+            log.info(e)
+    log.debug("Initialized backends.")
     (
         timesteps,
         t_metadata,
@@ -227,4 +239,8 @@
     # This should send a single processed node for the current computational node
     queue.put(hostNode, block=True)
     log.info(f"Rank {rank}: Subprocess: Sent data")
+    # Close backends
+    for backend in backends:
+        log.debug(f"Closing backend {backend}")
+        del backend
     return hostNode

From b6b1cfc4dc498925d66aa39034fbfe1029f17af7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Guti=C3=A9rrez=20Hermosillo=20Muriedas=2C=20Juan=20Pedro?=
Date: Fri, 14 Feb 2025 10:32:21 +0100
Subject: [PATCH 2/7] fix: no double initialization of mp start method

---
 perun/monitoring/monitor.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/perun/monitoring/monitor.py b/perun/monitoring/monitor.py
index 3e00668..361a214 100644
--- a/perun/monitoring/monitor.py
+++ b/perun/monitoring/monitor.py
@@ -49,6 +49,9 @@ class MonitorStatus(enum.Enum):
     SUCCESS = enum.auto()
 
 
+PERUN_MP_START_METHOD = "spawn"
+
+
 class PerunMonitor:
     """
     The PerunMonitor class is responsible for monitoring the application and collecting data.
@@ -96,7 +99,9 @@ def __init__(
         self._l_assigned_sensors = l_assigned_sensors
         self._config = config
         self.status = MonitorStatus.SETUP
-        multiprocessing.set_start_method("spawn")
+
+        if multiprocessing.get_start_method() != PERUN_MP_START_METHOD:
+            multiprocessing.set_start_method(PERUN_MP_START_METHOD)
         self._reset_subprocess_handlers()
 
     def _reset_subprocess_handlers(self) -> None:

From 21204f4eef620b52cd779243fd3811aaa3f49154 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Guti=C3=A9rrez=20Hermosillo=20Muriedas=2C=20Juan=20Pedro?=
Date: Fri, 14 Feb 2025 10:34:05 +0100
Subject: [PATCH 3/7] fix: removed double pre-commit run

---
 .pre-commit-config.yaml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 2fd77f1..f462131 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,3 +1,5 @@
+default_stages:
+  - pre-commit
 repos:
   - repo: https://github.com/pre-commit/mirrors-mypy
     rev: v1.15.0 # Use the sha / tag you want to point at

From 01068e1f5dd35730f427ce8129f8d0fe1fbc4fb4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Guti=C3=A9rrez=20Hermosillo=20Muriedas=2C=20Juan=20Pedro?=
Date: Fri, 14 Feb 2025 10:39:58 +0100
Subject: [PATCH 4/7] fix: catching the exception

---
 perun/monitoring/monitor.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/perun/monitoring/monitor.py b/perun/monitoring/monitor.py
index 361a214..80809d0 100644
--- a/perun/monitoring/monitor.py
+++ b/perun/monitoring/monitor.py
@@ -49,7 +49,7 @@ class MonitorStatus(enum.Enum):
     SUCCESS = enum.auto()
 
 
-PERUN_MP_START_METHOD = "spawn"
+PERUN_MP_START_METHOD: str = "spawn"
 
 
 class PerunMonitor:
@@ -100,8 +100,13 @@ def __init__(
         self._config = config
         self.status = MonitorStatus.SETUP
 
+        log.debug(f"MP Start method: {multiprocessing.get_start_method()}")
         if multiprocessing.get_start_method() != PERUN_MP_START_METHOD:
-            multiprocessing.set_start_method(PERUN_MP_START_METHOD)
+            try:
+                multiprocessing.set_start_method(PERUN_MP_START_METHOD)
+            except Exception as e:
+                log.warning(e)
+
         self._reset_subprocess_handlers()
 
     def _reset_subprocess_handlers(self) -> None:

From 05265c15140ceeaef8bd4eeedadad3e1c3ca0a9d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Guti=C3=A9rrez=20Hermosillo=20Muriedas=2C=20Juan=20Pedro?=
Date: Fri, 14 Feb 2025 10:40:04 +0100
Subject: [PATCH 5/7] fix: catching the exception

---
 perun/monitoring/monitor.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/perun/monitoring/monitor.py b/perun/monitoring/monitor.py
index 80809d0..bd68107 100644
--- a/perun/monitoring/monitor.py
+++ b/perun/monitoring/monitor.py
@@ -105,6 +105,7 @@ def __init__(
             try:
                 multiprocessing.set_start_method(PERUN_MP_START_METHOD)
             except Exception as e:
+                print(e)
                 log.warning(e)
 
         self._reset_subprocess_handlers()

From 40ae7c86054ca6c90ed4a6c0d502db2b8556dffb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Guti=C3=A9rrez=20Hermosillo=20Muriedas=2C=20Juan=20Pedro?=
Date: Fri, 14 Feb 2025 11:38:08 +0100
Subject: [PATCH 6/7] fix: catching nvml not initialized exception when
 shutting down

---
 perun/backend/nvml.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/perun/backend/nvml.py b/perun/backend/nvml.py
index 97a3ec4..77c6e90 100644
--- a/perun/backend/nvml.py
+++ b/perun/backend/nvml.py
@@ -45,7 +45,10 @@ def setup(self):
     def close(self):
         """Backend shutdown code."""
         if hasattr(self, "pynvml"):
-            self.pynvml.nvmlShutdown()
+            try:
+                self.pynvml.nvmlShutdown()
+            except Exception as e:
+                log.warning(e)
 
     def availableSensors(self) -> Dict[str, Tuple]:
         """Return string ids of visible devices.

From 70ce6ae82b3e23c4f91c309c13015f9f39d96315 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Guti=C3=A9rrez=20Hermosillo=20Muriedas=2C=20Juan=20Pedro?=
Date: Sat, 15 Feb 2025 22:19:37 +0100
Subject: [PATCH 7/7] feat: new and improved monitoring

---
 perun/core.py                  |   7 +-
 perun/monitoring/monitor.py    | 182 +++++++++++++++++----------------
 perun/monitoring/subprocess.py |  66 +++++++-----
 3 files changed, 138 insertions(+), 117 deletions(-)

diff --git a/perun/core.py b/perun/core.py
index 998a501..3e2c878 100644
--- a/perun/core.py
+++ b/perun/core.py
@@ -352,11 +352,6 @@ def monitor_application(
                 log.error(
                     f"Rank {self.comm.Get_rank()}: Failed to start run {i}, saving previous runs (if any), and exiting."
                 )
-                self._monitor.status = MonitorStatus.PROCESSING
-                # Ideally this should just retry to run the application again, hopping for the perunSubprocess to work, but this is not working as expected, because of heat's incrementalSVD, so we will just exit out of the loop for now. This should be fixed in the future.
-                # This should still save the data from the previous run, so it should be fine.
-
-                # continue
                 break
 
             if self.comm.Get_rank() == 0 and runNode:
@@ -370,6 +365,8 @@ def monitor_application(
 
             i += 1
 
+        self._monitor.close()
+
         # Get app node data if it exists
         if self.comm.Get_rank() == 0 and len(multirun_nodes) > 0:
             multirun_node = self._process_multirun(multirun_nodes)
diff --git a/perun/monitoring/monitor.py b/perun/monitoring/monitor.py
index bd68107..d9d9a98 100644
--- a/perun/monitoring/monitor.py
+++ b/perun/monitoring/monitor.py
@@ -3,14 +3,14 @@
 import enum
 import logging
 import multiprocessing
-import pprint as pp
 import time
 from configparser import ConfigParser
 from multiprocessing import Event, Process, Queue
-from multiprocessing.synchronize import Event as EventClass
 from subprocess import Popen
 from typing import Any, Dict, List, Optional, Tuple
 
+import numpy as np
+
 from perun.backend.backend import Backend
 from perun.comm import Comm
 from perun.data_model.data import DataNode, LocalRegions, NodeType
@@ -35,10 +35,10 @@ class MonitorStatus(enum.Enum):
     PERUN_ERROR: An error occurred in the Perun system.
     MPI_ERROR: An error occurred in the MPI system.
     FILE_NOT_FOUND: The required file was not found.
-    SUCCESS: The monitor completed successfully.
""" SETUP = enum.auto() + READY = enum.auto() RUNNING = enum.auto() PROCESSING = enum.auto() SCRIPT_ERROR = enum.auto() @@ -46,7 +46,7 @@ class MonitorStatus(enum.Enum): SP_ERROR = enum.auto() MPI_ERROR = enum.auto() FILE_NOT_FOUND = enum.auto() - SUCCESS = enum.auto() + CLOSED = enum.auto PERUN_MP_START_METHOD: str = "spawn" @@ -108,16 +108,80 @@ def __init__( print(e) log.warning(e) + self.sp_ready_event = Event() + self.start_event = Event() + self.stop_event = Event() + self.close_event = Event() + self.queue: Optional[Queue] = None + self.perunSP: Optional[Process] = None + self._reset_subprocess_handlers() + if len(self._l_assigned_sensors.keys()) > 0: + self._create_subprocess() + else: + self.sp_ready_event.set() # + + self._check_subprocess_health() + + def close(self): + """Close the monitor.""" + self._close_subprocess() + self.status = MonitorStatus.CLOSED def _reset_subprocess_handlers(self) -> None: """Reset subprocess handlers.""" - self.sp_ready_event: Optional[EventClass] = None - self.start_event: Optional[EventClass] = None - self.stop_event: Optional[EventClass] = None - self.queue: Optional[Queue] = None - self.perunSP: Optional[Process] = None + def _create_subprocess(self): + self.queue = Queue() + self.perunSP = Process( + target=perunSubprocess, + args=[ + self.queue, + self._comm.Get_rank(), + self._l_assigned_sensors, + self._config, + self.sp_ready_event, + self.start_event, + self.stop_event, + self.close_event, + self._config.getfloat("monitor", "sampling_period"), + ], + ) + log.info(f"Rank {self._comm.Get_rank()}: Starting monitoring subprocess") + self.perunSP.start() + log.info(f"Rank {self._comm.Get_rank()}: Started monitoring subprocess") + log.debug(f"Rank {self._comm.Get_rank()}: Alive: {self.perunSP.is_alive()}") + log.debug(f"Rank {self._comm.Get_rank()}: SP PID: {self.perunSP.pid}") + log.debug( + f"Rank {self._comm.Get_rank()}: SP Exit Code: {self.perunSP.exitcode}" + ) + log.info(f"Rank {self._comm.Get_rank()}: Monitoring subprocess started") + + def _check_subprocess_health(self): + event_set = self.sp_ready_event.wait(30) # type: ignore + if self.perunSP and not event_set: + log.error( + f"Rank {self._comm.Get_rank()}: Children: {multiprocessing.active_children()}" + ) + log.error( + f"Rank {self._comm.Get_rank()}: Monitoring subprocess did not start in time" + ) + log.error(f"Rank {self._comm.Get_rank()}: Alive: {self.perunSP.is_alive()}") + log.error(f"Rank {self._comm.Get_rank()}: SP PID: {self.perunSP.pid}") + log.error(f"Rank {self._comm.Get_rank()}: SP Exit Code: {self.perunSP}") + self.status = MonitorStatus.SP_ERROR + self._close_subprocess() + + log.info(f"Rank {self._comm.Get_rank()}: Waiting for everyones status") + self.all_status = self._comm.allgather(self.status) + if MonitorStatus.SP_ERROR in self.all_status: + log.error(f"Rank {self._comm.Get_rank()}: Stopping run") + log.error( + f"Rank {self._comm.Get_rank()}: Children: {multiprocessing.active_children()}" + ) + + self.status = MonitorStatus.SP_ERROR + self.status = MonitorStatus.READY def run_application( self, @@ -158,18 +222,22 @@ def run_application( """ log.info(f"Rank {self._comm.Get_rank()}: _run_application") + if self.status != MonitorStatus.READY: + raise SystemError("Not ready for monitoring, exiting!") + if record: if self._app.is_binary: return self._run_binary_app(run_id) else: + self._comm.barrier() return self._run_python_app(run_id) else: try: self.status = MonitorStatus.RUNNING result = self._app.run() - self.status = MonitorStatus.PROCESSING + self.status = 
MonitorStatus.READY except SystemExit: - self.status = MonitorStatus.PROCESSING + self.status = MonitorStatus.SCRIPT_ERROR log.warning( "The application exited using exit(), quit() or sys.exit(). This is not the recommended way to exit an application, as it complicates the data collection process. Please refactor your code." ) @@ -187,74 +255,6 @@ def run_application( def _run_python_app( self, run_id: str ) -> Tuple[MonitorStatus, Optional[DataNode], Any]: - # 1) Get sensor configuration - self.sp_ready_event = Event() - self.start_event = Event() - self.stop_event = Event() - - self.queue = None - self.perunSP = None - - # 2) If assigned devices, create subprocess - if len(self._l_assigned_sensors.keys()) > 0: - log.debug( - f"Rank {self._comm.Get_rank()} - Local Backendens : {pp.pformat(self._l_assigned_sensors)}" - ) - self.queue = Queue() - log.info( - f"Rank {self._comm.Get_rank()}: {self.queue}, {self._backends}, {self._l_assigned_sensors}, {self._config}, {self.sp_ready_event}, {self.start_event}, {self.stop_event}, {self._config.getfloat('monitor', 'sampling_period')}" - ) - self.perunSP = Process( - target=perunSubprocess, - args=[ - self.queue, - self._comm.Get_rank(), - self._l_assigned_sensors, - self._config, - self.sp_ready_event, - self.start_event, - self.stop_event, - self._config.getfloat("monitor", "sampling_period"), - ], - ) - log.info(f"Rank {self._comm.Get_rank()}: Starting monitoring subprocess") - self.perunSP.start() - log.debug(f"Rank {self._comm.Get_rank()}: Alive: {self.perunSP.is_alive()}") - log.debug(f"Rank {self._comm.Get_rank()}: SP PID: {self.perunSP.pid}") - log.debug( - f"Rank {self._comm.Get_rank()}: SP Exit Code: {self.perunSP.exitcode}" - ) - log.info(f"Rank {self._comm.Get_rank()}: Monitoring subprocess started") - else: - self.sp_ready_event.set() # type: ignore - - event_set = self.sp_ready_event.wait(30) # type: ignore - if self.perunSP and not event_set: - log.error( - f"Rank {self._comm.Get_rank()}: Children: {multiprocessing.active_children()}" - ) - log.error( - f"Rank {self._comm.Get_rank()}: Monitoring subprocess did not start in time" - ) - log.error(f"Rank {self._comm.Get_rank()}: Alive: {self.perunSP.is_alive()}") - log.error(f"Rank {self._comm.Get_rank()}: SP PID: {self.perunSP.pid}") - log.error(f"Rank {self._comm.Get_rank()}: SP Exit Code: {self.perunSP}") - self.status = MonitorStatus.SP_ERROR - self._close_subprocess() - - log.info(f"Rank {self._comm.Get_rank()}: Waiting for everyones status") - self.all_status = self._comm.allgather(self.status) - if MonitorStatus.SP_ERROR in self.all_status: - log.error(f"Rank {self._comm.Get_rank()}: Stopping run") - log.error( - f"Rank {self._comm.Get_rank()}: Children: {multiprocessing.active_children()}" - ) - - self.status = MonitorStatus.SP_ERROR - self._reset_subprocess_handlers() - - return self.status, None, None - # 3) Start application log.info(f"Rank {self._comm.Get_rank()}: Starting App") self.local_regions = LocalRegions() @@ -264,9 +264,12 @@ def _run_python_app( try: app_result = self._app.run() except SystemExit: + self.status = MonitorStatus.SCRIPT_ERROR log.info( "The application exited using exit(), quit() or sys.exit(). This is not the recommended way to exit an application, as it complicates the data collection process. Please refactor your code." 
             )
+            recoveredNodes = self._handle_failed_run()
+            return self.status, recoveredNodes, None
 
         except Exception as e:
             self.status = MonitorStatus.SCRIPT_ERROR
@@ -283,28 +286,33 @@
             recoveredNodes = self._handle_failed_run()
             return self.status, recoveredNodes, None
 
+        self.stop_event.set()  # type: ignore
         self.status = MonitorStatus.PROCESSING
         # run_stoptime = datetime.utcnow()
         log.info(f"Rank {self._comm.Get_rank()}: App Stopped")
-        self.stop_event.set()  # type: ignore
+        node = self._process_single_run(run_id, starttime_ns)
 
         # 4) App finished, stop subrocess and get data
-        return self.status, self._process_single_run(run_id, starttime_ns), app_result
+        self.status = MonitorStatus.READY
+        return self.status, node, app_result
 
     def _run_binary_app(
         self, run_id: str
     ) -> Tuple[MonitorStatus, Optional[DataNode], Any]:
         # 1) Prepare sensors
         (
-            timesteps,
             t_metadata,
-            rawValues,
             lSensors,
         ) = prepSensors(self._backends, self._l_assigned_sensors)
         log.debug(f"SP: backends -- {self._backends}")
         log.debug(f"SP: l_sensor_config -- {self._l_assigned_sensors}")
         log.debug(f"Rank {self._comm.Get_rank()}: perunSP lSensors: {lSensors}")
+        timesteps: List[int] = []
+        rawValues: List[List[np.number]] = []
+        for _ in lSensors:
+            rawValues.append([])
+
         sampling_period = self._config.getfloat("monitor", "sampling_period")
 
         # 2) Start monitoring process
@@ -339,7 +347,7 @@
         runNode = DataNode(id=run_id, type=NodeType.RUN, nodes={hostNode.id: hostNode})
         runNode.addRegionData(globalRegions, starttime_ns)
 
-        return MonitorStatus.SUCCESS, runNode, None
+        return MonitorStatus.READY, runNode, None
 
     def _handle_failed_run(self) -> Optional[DataNode]:
         availableRanks = self._comm.check_available_ranks()
@@ -387,10 +395,8 @@
             log.info(f"Rank {self._comm.Get_rank()}: Collecting queue data.")
             nodeData = self.queue.get(block=True)
             log.info(f"Rank {self._comm.Get_rank()}: Closing subprocess.")
-            self._close_subprocess()
         else:
             nodeData = None
-            self._reset_subprocess_handlers()
 
         log.info(f"Rank {self._comm.Get_rank()}: Gathering data.")
 
@@ -427,8 +433,10 @@
 
     def _close_subprocess(self) -> None:
         """Close the subprocess."""
+        self.close_event.set()
         if self.perunSP and self.queue:
             self.perunSP.join(30)
+            log.debug(f"SP exit code {self.perunSP.exitcode}")
             if self.perunSP.exitcode is None:
                 log.warning(
                     f"Rank {self._comm.Get_rank()}: Monitoring subprocess did not close in time, terminating."
@@ -443,5 +451,3 @@
             self.queue.close()
             self.queue = None
         log.info(f"Rank {self._comm.Get_rank()}: Monitoring subprocess closed")
-
-        self._reset_subprocess_handlers()
diff --git a/perun/monitoring/subprocess.py b/perun/monitoring/subprocess.py
index 5ce347b..314e5fe 100644
--- a/perun/monitoring/subprocess.py
+++ b/perun/monitoring/subprocess.py
@@ -20,7 +20,7 @@
 
 def prepSensors(
     backends: Dict[str, Backend], l_assigned_sensors: Dict[str, Tuple]
-) -> Tuple[List[int], MetricMetaData, List[List[np.number]], List[Sensor]]:
+) -> Tuple[MetricMetaData, List[Sensor]]:
     """
     Prepare sensors for monitoring.
 
@@ -50,7 +50,6 @@ def prepSensors(
         if len(sensor_ids) > 0:
             lSensors += backend.getSensors(sensor_ids)
 
-    timesteps: List[int] = []
     t_metadata = MetricMetaData(
         Unit.SECOND,
         Magnitude.ONE,
@@ -59,11 +58,8 @@ def prepSensors(
         np.finfo("float32").max,
         np.float32(-1),
     )
-    rawValues: List[List[np.number]] = []
-    for _ in lSensors:
-        rawValues.append([])
 
-    return timesteps, t_metadata, rawValues, lSensors
+    return t_metadata, lSensors
 
 
 def _monitoringLoop(
@@ -78,7 +74,6 @@ def _monitoringLoop(
 
     delta = (time.time_ns() - timesteps[-1]) * 1e-9
     while not stopCondition(delta):
-        log.debug("Loop")
         timesteps.append(time.time_ns())
         for idx, device in enumerate(lSensors):
             rawValues[idx].append(device.read())
@@ -173,6 +168,7 @@ def perunSubprocess(
     sp_ready_event,
     start_event,
     stop_event,
+    close_event,
     sampling_period: float,
 ):
     """Parallel function that samples energy values from hardware libraries.
@@ -210,37 +206,59 @@
             log.info(e)
     log.debug("Initialized backends.")
     (
-        timesteps,
         t_metadata,
-        rawValues,
         lSensors,
     ) = prepSensors(backends, l_assigned_sensors)
+
+    # Reset
+    timesteps: List[int] = []
+    rawValues: List[List[np.number]] = []
+    for _ in lSensors:
+        rawValues.append([])
     log.debug(f"SP: backends -- {backends}")
     log.debug(f"SP: l_sensor_config -- {l_assigned_sensors}")
    log.debug(f"Rank {rank}: perunSP lSensors: {lSensors}")
 
     # Monitoring process ready
+    monitoring = True
     sp_ready_event.set()
 
-    # Waiting for main process to send the signal
-    start_event.wait()
-    _monitoringLoop(
-        lSensors,
-        timesteps,
-        rawValues,
-        lambda delta: stop_event.wait(sampling_period - delta),
-    )
+    while monitoring:
+        if start_event.is_set():
+            start_event.clear()
+            _monitoringLoop(
+                lSensors,
+                timesteps,
+                rawValues,
+                lambda delta: stop_event.wait(sampling_period - delta),
+            )
+            stop_event.clear()
+
+            log.info(f"Rank {rank}: Subprocess: Stop event received.")
+            hostNode = createNode(
+                timesteps, t_metadata, rawValues, lSensors, perunConfig
+            )
 
-    log.info(f"Rank {rank}: Subprocess: Stop event received.")
-    hostNode = createNode(timesteps, t_metadata, rawValues, lSensors, perunConfig)
+            processDataNode(hostNode, perunConfig)
 
-    processDataNode(hostNode, perunConfig)
+            # This should send a single processed node for the current computational node
+            queue.put(hostNode, block=True)
+            log.info(f"Rank {rank}: Subprocess: Sent data")
+
+            # Reset
+            timesteps = []
+            rawValues = []
+            for _ in lSensors:
+                rawValues.append([])
+        elif close_event.is_set():
+            monitoring = False
+        else:
+            time.sleep(sampling_period / 2)
+
+    log.info("Close event received.")
 
-    # This should send a single processed node for the current computational node
-    queue.put(hostNode, block=True)
-    log.info(f"Rank {rank}: Subprocess: Sent data")
     # Close backends
     for backend in backends:
         log.debug(f"Closing backend {backend}")
         del backend
-    return hostNode
+    return 0