From 0b7ef9264646c7cb6109bd8902d7519e2774c20f Mon Sep 17 00:00:00 2001 From: Stephen Lien Harrell Date: Sun, 3 Mar 2024 22:25:40 -0600 Subject: [PATCH] sync_timescale now has archival and auto-chunk decompression for old data, now also deletes archived files - Also a few random bug fixes and improvements from Junjie. --- tacc_stats/analysis/plot/summaryplot.py | 8 ++++---- tacc_stats/dbload/sacct_gen.py | 4 +++- tacc_stats/dbload/sync_acct.py | 14 ++++++++++++++ tacc_stats/listend.py | 7 +++++-- .../machine/templates/machine/type_detail.html | 5 ++++- .../templates/tacc_stats_site/base.html | 14 +++++++------- 6 files changed, 37 insertions(+), 15 deletions(-) diff --git a/tacc_stats/analysis/plot/summaryplot.py b/tacc_stats/analysis/plot/summaryplot.py index 6925892a..6e2f0e14 100644 --- a/tacc_stats/analysis/plot/summaryplot.py +++ b/tacc_stats/analysis/plot/summaryplot.py @@ -5,10 +5,10 @@ from multiprocessing import Pool from datetime import datetime, timedelta import time, string -from pandas import DataFrame, to_datetime, Timedelta, concat +from pandas import DataFrame, to_datetime, Timedelta, concat, read_sql import pandas import numpy as np -from tacc_stats.analysis.gen.utils import read_sql, clean_dataframe +#from tacc_stats.analysis.gen.utils import read_sql, clean_dataframe from bokeh.palettes import d3 from bokeh.layouts import gridplot @@ -27,7 +27,7 @@ def plot_metric(self, df, metric, label): s = time.time() df = df[["time", "host", metric]] - df = clean_dataframe(df) + #df = clean_dataframe(df) y_range_end = 1.1*df[metric].max() if math.isnan(y_range_end): @@ -109,7 +109,7 @@ def plot(self): df["time"] = to_datetime(df["time"], utc = True) df["time"] = df["time"].dt.tz_convert('US/Central').dt.tz_localize(None) - df = clean_dataframe(df) +# df = clean_dataframe(df) plots = [] diff --git a/tacc_stats/dbload/sacct_gen.py b/tacc_stats/dbload/sacct_gen.py index f2b635e3..2f2b1df1 100755 --- a/tacc_stats/dbload/sacct_gen.py +++ b/tacc_stats/dbload/sacct_gen.py @@ -21,5 +21,7 @@ def daterange(start_date, end_date): for single_date in daterange(start_date, end_date): file_name = os.path.join(acct_path, single_date.strftime("%Y-%m-%d")) + ".txt" - os.system("/bin/sacct -a -s CA,CD,F,NF,TO -P -X -S " + single_date.strftime("%Y-%m-%d") + " -E " + (single_date + timedelta(1)).strftime("%Y-%m-%d") +" -o JobID,User,Account,Start,End,Submit,Partition,TimeLimit,JobName,State,NNodes,ReqCPUS,NodeList > " + file_name) + sacct_command = "/bin/sacct -a -s CA,CD,F,NF,TO -P -X -S " + single_date.strftime("%Y-%m-%d") + " -E " + (single_date + timedelta(1)).strftime("%Y-%m-%d") +" -o JobID,User,Account,Start,End,Submit,Partition,TimeLimit,JobName,State,NNodes,ReqCPUS,NodeList > " + file_name + print(sacct_command) + os.system(sacct_command) diff --git a/tacc_stats/dbload/sync_acct.py b/tacc_stats/dbload/sync_acct.py index 63282321..3442ede3 100755 --- a/tacc_stats/dbload/sync_acct.py +++ b/tacc_stats/dbload/sync_acct.py @@ -5,6 +5,7 @@ import psycopg2 from pgcopy import CopyManager +import pandas as pd from pandas import read_csv, to_datetime, to_timedelta, concat import hostlist @@ -54,7 +55,11 @@ def sync_acct(acct_file, date_str): print(date_str) conn = psycopg2.connect(CONNECTION) edf = read_sql("select jid from job_data where date(end_time) = '{0}' ".format(date_str), conn) + print("Total number of existing entries:", edf.shape[0]) +# Junjie: ensure job name is treated as str. + data_types = {8: str} + df = read_csv(acct_file, sep='|') df.rename(columns = {'JobID': 'jid', 'User': 'username', 'Account' : 'account', 'Start' : 'start_time', 'End' : 'end_time', 'Submit' : 'submit_time', 'Partition' : 'queue', @@ -62,6 +67,13 @@ def sync_acct(acct_file, date_str): 'NNodes' : 'nhosts', 'ReqCPUS' : 'ncores', 'NodeList' : 'host_list'}, inplace = True) df["jid"] = df["jid"].apply(str) + # Junjie: in case newer slurm gives "None" time for unstarted jobs. Older slurm prints start_time=end_time=cancelled_time. + ### >>> + df['start_time'].replace('^None$', pd.NA, inplace=True, regex=True) + df['start_time'].replace('^Unknown$', pd.NA, inplace=True, regex=True) + df['start_time'].fillna(df['end_time'], inplace=True) + #### <<< + df["start_time"] = to_datetime(df["start_time"]).dt.tz_localize('US/Central') df["end_time"] = to_datetime(df["end_time"]).dt.tz_localize('US/Central') df["submit_time"] = to_datetime(df["submit_time"]).dt.tz_localize('US/Central') @@ -74,6 +86,8 @@ def sync_acct(acct_file, date_str): df["node_hrs"] = df["nhosts"]*df["runtime"]/3600. df = df[~df["jid"].isin(edf["jid"])] + print("Total number of new entries:", df.shape[0]) + mgr = CopyManager(conn, 'job_data', df.columns) mgr.copy(df.values.tolist()) diff --git a/tacc_stats/listend.py b/tacc_stats/listend.py index 5a3fae52..fcf4979e 100755 --- a/tacc_stats/listend.py +++ b/tacc_stats/listend.py @@ -14,8 +14,11 @@ def on_message(channel, method_frame, header_frame, body): #print(body) return - if message[0] == '$': host = message.split('\n')[1].split()[1] - else: host = message.split()[2] + if message[0] == '$': + host = message.split('\n')[1].split()[1] + else: + host = message.split()[2] + #if host == "localhost.localdomain": return host_dir = os.path.join(cfg.archive_dir, host) if not os.path.exists(host_dir): diff --git a/tacc_stats/site/machine/templates/machine/type_detail.html b/tacc_stats/site/machine/templates/machine/type_detail.html index e58dc362..c7682edf 100644 --- a/tacc_stats/site/machine/templates/machine/type_detail.html +++ b/tacc_stats/site/machine/templates/machine/type_detail.html @@ -23,7 +23,10 @@

Counts Aggregated over devices and hosts

{% else %} -

Type not found

+ +

...

{% endif %} diff --git a/tacc_stats/site/tacc_stats_site/templates/tacc_stats_site/base.html b/tacc_stats/site/tacc_stats_site/templates/tacc_stats_site/base.html index 2a8d4d00..8210cde3 100644 --- a/tacc_stats/site/tacc_stats_site/templates/tacc_stats_site/base.html +++ b/tacc_stats/site/tacc_stats_site/templates/tacc_stats_site/base.html @@ -3,16 +3,16 @@ TACC Stats - - + + - - - - - + + + + +