Skip to content

Commit

Permalink
integrate to workflow run
Browse files Browse the repository at this point in the history
  • Loading branch information
LuiggiTenorioK committed Jan 30, 2025
1 parent 41fa832 commit e57278d
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 16 deletions.
12 changes: 10 additions & 2 deletions autosubmit/job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from autosubmit.job.job_common import StatisticsSnippetR, StatisticsSnippetEmpty
from autosubmit.job.job_common import Status, Type, increase_wallclock_by_chunk
from autosubmit.job.job_utils import get_job_package_code, get_split_size_unit, get_split_size
from autosubmit.job.metrics_processor import UserMetricProcessor
from autosubmit.platforms.paramiko_submitter import ParamikoSubmitter
from autosubmit.platforms.platform import Platform
from autosubmitconfigparser.config.basicconfig import BasicConfig
Expand Down Expand Up @@ -734,7 +735,7 @@ def is_serial(self):
return str(self.processors) == '1' or str(self.processors) == ''

@property
def platform(self):
def platform(self) -> Platform:
"""
Returns the platform to be used by the job. Chooses between serial and parallel platforms
Expand Down Expand Up @@ -1408,7 +1409,14 @@ def update_status(self, as_conf: AutosubmitConfig, failed_file: bool = False) ->
else:
self.platform.add_job_to_log_recover(self)

# TODO Read and store metrics here
# Read and store metrics here
try:
metric_procesor = UserMetricProcessor(as_conf, self)
metrics_specs = metric_procesor.read_metrics_specs()
metric_procesor.process_metrics_specs(metrics_specs)
except Exception as exc:
# Warn if metrics are not processed
Log.warning(f"Error processing metrics for job {self.name}: {exc}")

return self.status

Expand Down
43 changes: 32 additions & 11 deletions autosubmit/job/metrics_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@
import json
import copy
from pathlib import Path
from typing import Any, Dict, List, Optional
from autosubmit.job.job import Job
import traceback
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from autosubmitconfigparser.config.configcommon import AutosubmitConfig
from log.log import Log

if TYPE_CHECKING:
# Avoid circular imports
from autosubmit.job.job import Job

MAX_FILE_SIZE = 4 * 1024 * 1024 # Default 4MB max file size


class MetricSpecSelectorType(Enum):
TEXT = "TEXT"
Expand Down Expand Up @@ -79,14 +85,20 @@ def load(data: Dict[str, Any]) -> "MetricSpec":


class UserMetricProcessor:
def __init__(self, as_conf: AutosubmitConfig, job: Job):
def __init__(self, as_conf: AutosubmitConfig, job: "Job"):
self.as_conf = as_conf
self.job = job

def read_metrics_specs(self) -> List[MetricSpec]:
raw_metrics: List[Dict[str, Any]] = self.as_conf.normalize_parameters_keys(
self.as_conf.get_section([self.job.section, "METRICS"])
)
try:
# Log.warning(str(self.as_conf.get_section(["JOBS", self.job.section, "METRICS"])))
raw_metrics: List[Dict[str, Any]] = self.as_conf.get_section(["JOBS", self.job.section, "METRICS"])

# Normalize the parameters keys
raw_metrics = [self.as_conf.normalize_parameters_keys(metric) for metric in raw_metrics]
except Exception:
Log.debug(traceback.format_exc())
raise ValueError("Invalid or missing metrics section")

metrics_specs: List[MetricSpec] = []
for raw_metric in raw_metrics:
Expand Down Expand Up @@ -138,20 +150,29 @@ def store_metric(self, metric_name: str, metric_value: Any):
"""
Store the metric value in the database
"""
self.job.name
raise NotImplementedError("store_metric method must be implemented")
with open("metrics.txt", "w") as f:
f.write(f"{self.job.name} {metric_name}: {metric_value}\n")

def process_metrics_specs(self, metrics_specs: List[MetricSpec]):
""" """
"""
Process the metrics specs of the job
"""

metrics_by_path_selector_type = self._group_metrics_by_path_selector_type(
metrics_specs
)

# For each file path, read the content of the file
for path, metrics_by_selector_type in metrics_by_path_selector_type.items():
with open(path, "r") as f:
content = f.read()
# Read the file from remote platform, it will replace the decoding errors.
try:
content = self.job.platform.read_file(path, max_size=MAX_FILE_SIZE)
Log.debug(f"Read file {path}: {content}")
content = content.decode(errors="replace")
except Exception:
Log.debug(traceback.format_exc())
Log.warning(f"Error reading metric file at {path}")
continue

# Process the content based on the selector type

Expand Down
26 changes: 26 additions & 0 deletions autosubmit/platforms/locplatform.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import locale
import os
from pathlib import Path
import traceback
from typing import Union
from xml.dom.minidom import parseString
import subprocess
from matplotlib.patches import PathPatch
Expand Down Expand Up @@ -324,3 +326,27 @@ def check_completed_files(self, sections: str = None) -> str:
return self._ssh_output
else:
return None

def get_file_size(self, src: str) -> Union[int, None]:
    """
    Get file size in bytes
    :param src: file path
    """
    size: Union[int, None] = None
    try:
        size = Path(src).stat().st_size
    except Exception:
        # Best-effort probe: log the stack trace and fall through to None.
        Log.debug(traceback.format_exc())
    return size

def read_file(self, src: str, max_size: Union[int, None] = None) -> Union[bytes, None]:
    """
    Read file content as bytes. If max_size is set, only the first max_size bytes are read.

    :param src: file path
    :param max_size: maximum number of bytes to read; ``None`` reads the whole file
    :return: the (possibly truncated) file content, or ``None`` if the file could not be read
    """
    try:
        # Binary mode: callers are responsible for decoding (they replace decode errors).
        with open(src, "rb") as f:
            return f.read(max_size)
    except Exception:
        # Best-effort read: log the traceback and report failure as None.
        Log.debug(traceback.format_exc())
        return None
29 changes: 28 additions & 1 deletion autosubmit/platforms/paramiko_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import sys
import socket
import os
from typing import List, TYPE_CHECKING
import traceback
from typing import List, TYPE_CHECKING, Union
import paramiko
import datetime
import select
Expand Down Expand Up @@ -1459,6 +1460,32 @@ def check_absolute_file_exists(self, src):
return False
except:
return False

def get_file_size(self, src: str) -> Union[int, None]:
    """
    Get file size in bytes
    :param src: file path
    """
    result: Union[int, None] = None
    try:
        result = self._ftpChannel.stat(src).st_size
    except Exception:
        # stat() over SFTP can fail (missing file, dropped connection); report None.
        Log.debug(traceback.format_exc())
    return result

def read_file(self, src: str, max_size: int = None) -> Union[bytes, None]:
    """
    Read file content as bytes. If max_size is set, only the first max_size bytes are read.
    :param src: file path
    :param max_size: maximum size to read
    """
    try:
        remote_file = self._ftpChannel.file(src, "r")
        with remote_file:
            content = remote_file.read(size=max_size)
        return content
    except Exception:
        # Swallow SFTP errors (missing file, closed channel) and report None.
        Log.debug(traceback.format_exc())
        return None


class ParamikoPlatformException(Exception):
"""
Exception raised from HPC queues
Expand Down
16 changes: 16 additions & 0 deletions autosubmit/platforms/platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -1005,3 +1005,19 @@ def recover_platform_job_logs(self) -> None:
self.closeConnection()
Log.info(f"{identifier} Exiting.")
_exit(0) # Exit userspace after manually closing ssh sockets, recommended for child processes, the queue() and shared signals should be in charge of the main process.

def get_file_size(self, src: str) -> Union[int, None]:
    """
    Get file size in bytes.

    Abstract hook of the platform interface: concrete platforms
    override this with their own implementation.

    :param src: file path
    :return: size in bytes, or None when the file cannot be inspected
    :raises NotImplementedError: always, on this base class
    """
    raise NotImplementedError

def read_file(self, src: str, max_size: int = None) -> Union[bytes, None]:
    """
    Read file content as bytes. If max_size is set, only the first max_size bytes are read.

    Abstract hook of the platform interface: concrete platforms
    override this with their own implementation.

    :param src: file path
    :param max_size: maximum size to read; None reads the whole file
    :return: file content as bytes, or None when the file cannot be read
    :raises NotImplementedError: always, on this base class
    """
    raise NotImplementedError

4 changes: 2 additions & 2 deletions test/unit/test_user_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,7 @@ def test_read_metrics_specs():
as_conf = MagicMock()
job = MagicMock()

as_conf.get_section.return_value = None
as_conf.normalize_parameters_keys.return_value = [
as_conf.get_section.return_value = [
{"NAME": "metric1", "PATH": "/path/to/", "FILENAME": "file1"},
{
"NAME": "invalid metric",
Expand All @@ -167,6 +166,7 @@ def test_read_metrics_specs():
"SELECTOR": {"TYPE": "JSON", "KEY": "key1.key2.key3"},
},
]
as_conf.normalize_parameters_keys = lambda x: x

# Do the read test
user_metric_processor = UserMetricProcessor(as_conf, job)
Expand Down

0 comments on commit e57278d

Please sign in to comment.