Skip to content

Commit

Permalink
Reformatting using black
Browse files Browse the repository at this point in the history
  • Loading branch information
micknudsen committed May 22, 2023
1 parent 54e6fa5 commit a1d7169
Showing 1 changed file with 56 additions and 28 deletions.
84 changes: 56 additions & 28 deletions src/gwf_utilization/accounting.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,21 @@
SECONDS_PER_HOUR = 60 * SECONDS_PER_MINUTE
SECONDS_PER_DAY = 24 * SECONDS_PER_HOUR

EXPONENTS = OrderedDict([
('P', 50), ('T', 40), ('G', 30), ('M', 20), ('K', 10), ('', 0)
])
EXPONENTS = OrderedDict(
[("P", 50), ("T", 40), ("G", 30), ("M", 20), ("K", 10), ("", 0)]
)

SLURM_SACCT_COLS = (
'JobName', 'JobID', 'State', 'NCPUS', 'Elapsed', 'TotalCPU', 'Timelimit', 'ReqMem', 'MaxRSS', 'NNodes'
"JobName",
"JobID",
"State",
"NCPUS",
"Elapsed",
"TotalCPU",
"Timelimit",
"ReqMem",
"MaxRSS",
"NNodes",
)


Expand All @@ -25,14 +34,14 @@ def _seconds(time_string):
"""Converts time string on the form [[[days-]hours:]minutes:]seconds[.milliseconds] to seconds"""

# Remove milliseconds
time_string = time_string.split('.')[0]
time_string = time_string.split(".")[0]

if '-' in time_string:
days, time_string = time_string.split('-')
if "-" in time_string:
days, time_string = time_string.split("-")
else:
days = None

parts = time_string.split(':')[::-1]
parts = time_string.split(":")[::-1]

try:
seconds = parts[0]
Expand Down Expand Up @@ -66,20 +75,26 @@ def _parse_memory_string(memory_string, cores, nodes):
if not memory_string:
return 0

memory_regexp = r'([0-9]+)([KMGTP]?)([cn]?)'
memory_regexp = r"([0-9]+)([KMGTP]?)([cn]?)"
scalar, prefix, multiplier = re.match(memory_regexp, memory_string).groups()

raw_result = int(scalar) * 2 ** EXPONENTS[prefix]
if multiplier == 'c':
if multiplier == "c":
raw_result *= cores
elif multiplier == 'n':
elif multiplier == "n":
raw_result *= nodes
return raw_result


def _call_sacct(job_id, include_header=False):
proc = subprocess.Popen(
['sacct', '--format=' + ','.join(SLURM_SACCT_COLS), '--parsable2', '--jobs', job_id],
[
"sacct",
"--format=" + ",".join(SLURM_SACCT_COLS),
"--parsable2",
"--jobs",
job_id,
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE,
Expand All @@ -89,12 +104,12 @@ def _call_sacct(job_id, include_header=False):
if include_header:
return stdout
# Skip first line (the header).
return '\n'.join(stdout.split('\n')[1:])
return "\n".join(stdout.split("\n")[1:])


def _call_sacct_batch(job_ids):
if not job_ids:
return ''
return ""
result = _call_sacct(job_ids[0], include_header=True)
for job_id in job_ids[1:]:
result += _call_sacct(job_id)
Expand All @@ -103,33 +118,37 @@ def _call_sacct_batch(job_ids):

def get_jobs_from_string(sacct_output):
"""Yield jobs given a string of sacct output."""
sacct_output_rows = [line.split('|') for line in sacct_output.splitlines()]
sacct_output_rows = [line.split("|") for line in sacct_output.splitlines()]
columns = sacct_output_rows[0]
data = [row for row in sacct_output_rows if 'COMPLETED' in row and re.match(r'[0-9]+(\.batch)?$', row[1])]
data = [
row
for row in sacct_output_rows
if "COMPLETED" in row and re.match(r"[0-9]+(\.batch)?$", row[1])
]

assert tuple(columns) == SLURM_SACCT_COLS

for entry, entry_batch in _iterpairs(data):
dct = dict(zip(columns, entry))

if '_' in dct['JobID']:
if "_" in dct["JobID"]:
continue

dct_batch = dict(zip(columns, entry_batch))
assert dct_batch['JobID'] == dct['JobID'] + '.batch'
assert dct_batch["JobID"] == dct["JobID"] + ".batch"

cores = int(dct['NCPUS'])
nodes = int(dct['NNodes'])
cores = int(dct["NCPUS"])
nodes = int(dct["NNodes"])

yield Job(
name=dct['JobName'],
name=dct["JobName"],
cores=cores,
nodes=nodes,
used_walltime=_seconds(dct['Elapsed']),
allocated_time_per_core=_seconds(dct['Timelimit']),
used_cpu_time=_seconds(dct['TotalCPU']),
allocated_memory=_parse_memory_string(dct['ReqMem'], cores, nodes),
used_memory=_parse_memory_string(dct_batch['MaxRSS'], cores, nodes)
used_walltime=_seconds(dct["Elapsed"]),
allocated_time_per_core=_seconds(dct["Timelimit"]),
used_cpu_time=_seconds(dct["TotalCPU"]),
allocated_memory=_parse_memory_string(dct["ReqMem"], cores, nodes),
used_memory=_parse_memory_string(dct_batch["MaxRSS"], cores, nodes),
)


Expand Down Expand Up @@ -162,8 +181,17 @@ class Job:
Memory used by the job in bytes.
"""

def __init__(self, name, cores, nodes, allocated_time_per_core,
used_walltime, used_cpu_time, allocated_memory, used_memory):
def __init__(
self,
name,
cores,
nodes,
allocated_time_per_core,
used_walltime,
used_cpu_time,
allocated_memory,
used_memory,
):
self.name = name
self.allocated_time_per_core = allocated_time_per_core
self.used_walltime = used_walltime
Expand Down

0 comments on commit a1d7169

Please sign in to comment.