Skip to content

Commit

Permalink
Merge pull request #171 from djw8605/hepscore2
Browse files Browse the repository at this point in the history
Adding HEPScore support to the apel reporting
  • Loading branch information
matyasselmeci authored Feb 9, 2024
2 parents da5d5a0 + 8b8f3ec commit c5e8eee
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 28 deletions.
2 changes: 1 addition & 1 deletion opensciencegrid/gracc-apel/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ ARG BASE_OS

# install dependencies
RUN yum -y install python3 python3-pip && \
pip3 install elasticsearch-dsl && \
pip3 install elasticsearch-dsl requests && \
pip3 install argo-ams-library
RUN yum -y install http://rpm-repo.argo.grnet.gr/ARGO/devel/centos7/python-argo-ams-library-0.5.5-20210415071520.ff0c536.$BASE_OS.noarch.rpm

Expand Down
117 changes: 90 additions & 27 deletions opensciencegrid/gracc-apel/apel_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
import operator
import sys
import os
import requests
import json
from statistics import mean
from math import isclose


#logging.basicConfig(level=logging.WARN)
Expand All @@ -26,6 +30,42 @@
MAXSZ=2**30
MISSING='__MISSING__'

resource_group_map = None

def get_hs23_portion(resource_group) -> float:
    """
    Look up the HEPScore23 portion for an OSG site, downloading and caching
    the whole resource-group map from Topology on first call.
    :param resource_group: The Topology resource group name.
    :return: The mean HEPScore23Percentage over the group's resources that
        report one, or 0.0 if the group is unknown or none report it.
    :raises RuntimeError: if the Topology summary cannot be downloaded.
    """
    global resource_group_map
    if resource_group_map is None:
        # Download the map from Topology (one HTTP request per process);
        # the timeout keeps the report job from hanging forever.
        resp = requests.get(
            "https://topology.opensciencegrid.org/api/resource_group_summary",
            timeout=60)
        if resp.status_code != 200:
            # RuntimeError is a subclass of Exception, so any existing
            # "except Exception" handlers still catch it.
            raise RuntimeError("Error downloading resource group summary from Topology: {}".format(resp.status_code))

        # Parse the JSON response into {group name: mean HS23 percentage}
        resource_group_map = {}
        for group_name, group in resp.json().items():
            hep_spec_percentages = [
                float(res['WLCGInformation']['HEPScore23Percentage'])
                # "or {}" guards resources with no WLCGInformation block
                for res in group["Resources"]['Resource']
                if 'HEPScore23Percentage' in (res.get('WLCGInformation') or {})
            ]
            if hep_spec_percentages:
                resource_group_map[group_name] = mean(hep_spec_percentages)
            else:
                # Groups with no reporting resources count as 0% HEPScore23
                resource_group_map[group_name] = 0.0

    return resource_group_map.get(resource_group, 0.0)




def add_bkt_metrics(bkt):
bkt = bkt.metric('NormalFactor','terms', field='OIM_WLCGAPELNormalFactor')
bkt = bkt.metric('CpuDuration_system', 'sum', field='CpuDuration_system')
Expand Down Expand Up @@ -71,7 +111,7 @@ def gracc_query_apel(year, month):
return response

# Fixed entries:
fixed_header = "APEL-summary-job-message: v0.3"
fixed_header = "APEL-summary-job-message: v0.4"
fixed_separator = "%%"
fixed_infrastructure = "Gratia-OSG"
fixed_nodecount = 1
Expand Down Expand Up @@ -183,32 +223,57 @@ def add_record(recs, vo, site, cores, dn, bkt):

recs[rk] += rec

def print_header(output_file=sys.stdout):
    """Write the fixed APEL summary-message header line to *output_file*."""
    print(fixed_header, file=output_file)

def print_rk_recr(year, month, rk, rec, output_file=sys.stdout):
    """
    Write the APEL summary record(s) for one (site, vo, dn, cores) key.

    Each record is split across one or two SubmitHost pseudo-hosts
    ("hepspec-hosts" and "hepscore-hosts") according to the resource
    group's HEPScore23 percentage from Topology, apportioning durations
    and job counts between the two benchmark metrics.

    :param year: report year
    :param month: report month (1-12)
    :param rk: record key with .site, .vo, .cores and .dn attributes
    :param rec: aggregated record with .mintime, .maxtime, .walldur,
        .cpudur, .nf (normalization factor) and .njobs attributes
    :param output_file: destination stream (defaults to stdout)
    """
    if rk.dn == "N/A":
        dn = "generic %s user" % rk.vo
    else:
        dn = rk.dn

    # With no hs23 portion, the submit host is just "hepspec-hosts";
    # with a hs23 portion, it's both "hepspec-hosts" and "hepscore-hosts".
    hs23_portion = get_hs23_portion(rk.site)
    host_portions = [("hepspec-hosts", "hepspec", 1.0 - hs23_portion)]
    if not isclose(hs23_portion, 0.0):
        host_portions.append(("hepscore-hosts", "HEPscore23", hs23_portion))

    def write(*line):
        # Helper so every record line goes to the selected output stream.
        print(*line, file=output_file)

    for submit_host, metric_name, portion in host_portions:
        write("Site:", rk.site)
        write("SubmitHost:", submit_host)
        write("VO:", rk.vo)
        write("EarliestEndTime:", rec.mintime)
        write("LatestEndTime:", rec.maxtime + 60*60*24 - 1)
        write("Month:", "%02d" % month)
        write("Year:", year)
        write("Infrastructure:", fixed_infrastructure)
        write("GlobalUserName:", dn)
        write("Processors:", rk.cores)
        write("NodeCount:", fixed_nodecount)
        write("WallDuration:", int(rec.walldur * portion))
        write("CpuDuration:", int(rec.cpudur * portion))
        write("NormalisedWallDuration:", "{" + metric_name + ": " + str(int(rec.walldur * rec.nf * portion)) + "}")
        write("NormalisedCpuDuration:", "{" + metric_name + ": " + str(int(rec.cpudur * rec.nf * portion)) + "}")
        # NOTE(review): int() truncates, so the per-host job counts may not
        # sum to rec.njobs — this matches the existing expected output.
        write("NumberOfJobs:", int(rec.njobs * portion))
        write(fixed_separator)

def bkt_key_lower(bkt):
    """Case-insensitive sort key for an aggregation bucket: its key, lowercased."""
    key = bkt.key
    return key.lower()
Expand All @@ -235,16 +300,15 @@ def main():
print("usage: %s [YEAR MONTH]" % os.path.basename(__file__), file=sys.stderr)
sys.exit(0)

orig_stdout = sys.stdout
outfile = "%02d_%d.apel" % (month, year)
sys.stdout = open(outfile, "w")
outfile_name = "%02d_%d.apel" % (month, year)
outfile = open(outfile_name, "w")

resp = gracc_query_apel(year, month)
aggs = resp.aggregations

recs = autodict()

print_header()
print_header(outfile)
for cores_bkt in sorted_buckets(aggs.Cores):
cores = cores_bkt.key
for vo_bkt in sorted_buckets(cores_bkt.VO):
Expand All @@ -262,10 +326,9 @@ def main():
add_record(recs, vo, site, cores, dn, site_bkt)

for rk,rec in sorted(recs.items()):
print_rk_recr(year, month, rk, rec)
print_rk_recr(year, month, rk, rec, outfile)

sys.stdout = orig_stdout
print("wrote: %s" % outfile)
print("wrote: %s" % outfile_name)

# Script entry point: build and write the monthly APEL report file.
if __name__ == '__main__':
    main()
Expand Down
116 changes: 116 additions & 0 deletions opensciencegrid/gracc-apel/test_apel_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""
Testing the apel_report.py script
"""

import unittest
from io import StringIO
import apel_report


class TestApelReport(unittest.TestCase):
    """
    Test the apel_report.py script
    """

    @staticmethod
    def parse_reports(reports: str) -> list[dict]:
        """
        Parse the reports string into a list of dicts, one per
        "%%"-separated record. Example report:
        Site: Nebraska
        SubmitHost: hepspec-hosts
        VO: cms
        EarliestEndTime: 0
        LatestEndTime: 86499
        Month: 05
        Year: 2023
        Infrastructure: Gratia-OSG
        GlobalUserName: cms
        Processors: 1
        NodeCount: 1
        WallDuration: 90
        CpuDuration: 90
        NormalisedWallDuration: 90
        NormalisedCpuDuration: 90
        NumberOfJobs: 0
        %%
        """
        # First split by the separator, %%
        report_chunks = reports.strip().split("%%")
        parsed = []

        # Each line is in the form of <key>: <value>
        for report in report_chunks:
            report_dict = {}
            for line in report.split("\n"):
                if line.strip() == "":
                    continue
                # Split on the FIRST colon only, so values that themselves
                # contain colons (DNs, timestamps) are not truncated.
                kv = line.split(":", 1)
                try:
                    report_dict[kv[0].strip()] = kv[1].strip()
                except IndexError:
                    print("Failure on line:", line)
                    raise
            parsed.append(report_dict)

        return parsed

    def test_print_rk_recr(self):
        """
        Test the print_rk_recr function
        """
        # Import here (not at module scope of this method's caller) so the
        # class can still be loaded/skipped where apel_report's third-party
        # dependencies are unavailable.
        try:
            import apel_report
        except ImportError:
            self.skipTest("apel_report is not importable")

        # Minimal stand-ins for the record key and aggregated record
        class rk:
            site = "Nebraska"
            vo = "cms"
            cores = 1
            dn = "cms"

        class rec:
            mintime = 0
            maxtime = 100
            walldur = 100
            cpudur = 100
            nf = 1
            njobs = 1

        # Capture the report output in a StringIO instead of stdout,
        # since print_rk_recr uses print() on its output_file argument.
        to_write = StringIO()
        apel_report.print_rk_recr(2023, 5, rk, rec, to_write)

        # Now check the output
        reports = self.parse_reports(to_write.getvalue())

        self.assertEqual("Nebraska", reports[0]['Site'])
        self.assertEqual("hepspec-hosts", reports[0]['SubmitHost'])
        self.assertEqual("hepscore-hosts", reports[1]['SubmitHost'])



# Allow running the tests directly with "python test_apel_report.py".
if __name__ == '__main__':
    unittest.main()

0 comments on commit c5e8eee

Please sign in to comment.