diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 81854d70..bce4c9b7 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -45,6 +45,7 @@ from autosubmit.job.job_common import StatisticsSnippetR, StatisticsSnippetEmpty from autosubmit.job.job_common import Status, Type, increase_wallclock_by_chunk from autosubmit.job.job_utils import get_job_package_code, get_split_size_unit, get_split_size +from autosubmit.job.metrics_processor import UserMetricProcessor from autosubmit.platforms.paramiko_submitter import ParamikoSubmitter from autosubmit.platforms.platform import Platform from autosubmitconfigparser.config.basicconfig import BasicConfig @@ -734,7 +735,7 @@ def is_serial(self): return str(self.processors) == '1' or str(self.processors) == '' @property - def platform(self): + def platform(self) -> Platform: """ Returns the platform to be used by the job. Chooses between serial and parallel platforms @@ -1408,7 +1409,14 @@ def update_status(self, as_conf: AutosubmitConfig, failed_file: bool = False) -> else: self.platform.add_job_to_log_recover(self) - # TODO Read and store metrics here + # Read and store metrics here + try: + metric_processor = UserMetricProcessor(as_conf, self) + metrics_specs = metric_processor.read_metrics_specs() + metric_processor.process_metrics_specs(metrics_specs) + except Exception as exc: + # Warn if metrics are not processed + Log.warning(f"Error processing metrics for job {self.name}: {exc}") return self.status diff --git a/autosubmit/job/metrics_processor.py b/autosubmit/job/metrics_processor.py index 514330c8..78ac63f7 100644 --- a/autosubmit/job/metrics_processor.py +++ b/autosubmit/job/metrics_processor.py @@ -3,11 +3,17 @@ import json import copy from pathlib import Path -from typing import Any, Dict, List, Optional -from autosubmit.job.job import Job +import traceback +from typing import TYPE_CHECKING, Any, Dict, List, Optional from autosubmitconfigparser.config.configcommon import AutosubmitConfig from
log.log import Log +if TYPE_CHECKING: + # Avoid circular imports + from autosubmit.job.job import Job + +MAX_FILE_SIZE = 4 * 1024 * 1024 # Default 4MB max file size + class MetricSpecSelectorType(Enum): TEXT = "TEXT" @@ -79,14 +85,20 @@ def load(data: Dict[str, Any]) -> "MetricSpec": class UserMetricProcessor: - def __init__(self, as_conf: AutosubmitConfig, job: Job): + def __init__(self, as_conf: AutosubmitConfig, job: "Job"): self.as_conf = as_conf self.job = job def read_metrics_specs(self) -> List[MetricSpec]: - raw_metrics: List[Dict[str, Any]] = self.as_conf.normalize_parameters_keys( - self.as_conf.get_section([self.job.section, "METRICS"]) - ) + try: + # Log.warning(str(self.as_conf.get_section(["JOBS", self.job.section, "METRICS"]))) + raw_metrics: List[Dict[str, Any]] = self.as_conf.get_section(["JOBS", self.job.section, "METRICS"]) + + # Normalize the parameters keys + raw_metrics = [self.as_conf.normalize_parameters_keys(metric) for metric in raw_metrics] + except Exception: + Log.debug(traceback.format_exc()) + raise ValueError("Invalid or missing metrics section") metrics_specs: List[MetricSpec] = [] for raw_metric in raw_metrics: @@ -138,11 +150,13 @@ def store_metric(self, metric_name: str, metric_value: Any): """ Store the metric value in the database """ - self.job.name - raise NotImplementedError("store_metric method must be implemented") + with open("metrics.txt", "w") as f: + f.write(f"{self.job.name} {metric_name}: {metric_value}\n") def process_metrics_specs(self, metrics_specs: List[MetricSpec]): - """ """ + """ + Process the metrics specs of the job + """ metrics_by_path_selector_type = self._group_metrics_by_path_selector_type( metrics_specs @@ -150,8 +164,15 @@ def process_metrics_specs(self, metrics_specs: List[MetricSpec]): # For each file path, read the content of the file for path, metrics_by_selector_type in metrics_by_path_selector_type.items(): - with open(path, "r") as f: - content = f.read() + # Read the file from remote 
platform, it will replace the decoding errors. + try: + content = self.job.platform.read_file(path, max_size=MAX_FILE_SIZE) + Log.debug(f"Read file {path}: {content}") + content = content.decode(errors="replace") + except Exception: + Log.debug(traceback.format_exc()) + Log.warning(f"Error reading metric file at {path}") + continue # Process the content based on the selector type diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index ba3ec7bb..fcb1862c 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -19,6 +19,8 @@ import locale import os from pathlib import Path +import traceback +from typing import Union from xml.dom.minidom import parseString import subprocess from matplotlib.patches import PathPatch @@ -324,3 +326,27 @@ def check_completed_files(self, sections: str = None) -> str: return self._ssh_output else: return None + + def get_file_size(self, src: str) -> Union[int, None]: + """ + Get file size in bytes + :param src: file path + """ + try: + return Path(src).stat().st_size + except Exception: + Log.debug(traceback.format_exc()) + return None + + def read_file(self, src: str, max_size: int = None) -> Union[bytes, None]: + """ + Read file content as bytes. If max_size is set, only the first max_size bytes are read. 
+ :param src: file path + :param max_size: maximum size to read + """ + try: + with open(src, "rb") as f: + return f.read(max_size) + except Exception: + Log.debug(traceback.format_exc()) + return None diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index d94ace83..36a62136 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -5,7 +5,8 @@ import sys import socket import os -from typing import List, TYPE_CHECKING +import traceback +from typing import List, TYPE_CHECKING, Union import paramiko import datetime import select @@ -1459,6 +1460,32 @@ def check_absolute_file_exists(self, src): return False except: return False + + def get_file_size(self, src: str) -> Union[int, None]: + """ + Get file size in bytes + :param src: file path + """ + try: + return self._ftpChannel.stat(src).st_size + except Exception: + Log.debug(traceback.format_exc()) + return None + + def read_file(self, src: str, max_size: int = None) -> Union[bytes, None]: + """ + Read file content as bytes. If max_size is set, only the first max_size bytes are read. + :param src: file path + :param max_size: maximum size to read + """ + try: + with self._ftpChannel.file(src, "r") as file: + return file.read(size=max_size) + except Exception: + Log.debug(traceback.format_exc()) + return None + + class ParamikoPlatformException(Exception): """ Exception raised from HPC queues diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 5b93811a..78addf46 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -1005,3 +1005,19 @@ def recover_platform_job_logs(self) -> None: self.closeConnection() Log.info(f"{identifier} Exiting.") _exit(0) # Exit userspace after manually closing ssh sockets, recommended for child processes, the queue() and shared signals should be in charge of the main process. 
+ + def get_file_size(self, src: str) -> Union[int, None]: + """ + Get file size in bytes + :param src: file path + """ + raise NotImplementedError + + def read_file(self, src: str, max_size: int = None) -> Union[bytes, None]: + """ + Read file content as bytes. If max_size is set, only the first max_size bytes are read. + :param src: file path + :param max_size: maximum size to read + """ + raise NotImplementedError + \ No newline at end of file diff --git a/test/unit/test_user_metrics.py b/test/unit/test_user_metrics.py index 3633e6f1..2f2e6027 100644 --- a/test/unit/test_user_metrics.py +++ b/test/unit/test_user_metrics.py @@ -154,8 +154,7 @@ def test_read_metrics_specs(): as_conf = MagicMock() job = MagicMock() - as_conf.get_section.return_value = None - as_conf.normalize_parameters_keys.return_value = [ + as_conf.get_section.return_value = [ {"NAME": "metric1", "PATH": "/path/to/", "FILENAME": "file1"}, { "NAME": "invalid metric", @@ -167,6 +166,7 @@ def test_read_metrics_specs(): "SELECTOR": {"TYPE": "JSON", "KEY": "key1.key2.key3"}, }, ] + as_conf.normalize_parameters_keys = lambda x: x # Do the read test user_metric_processor = UserMetricProcessor(as_conf, job)