From 2184a6bad3e2a031730da0623696b5f7265ebf8a Mon Sep 17 00:00:00 2001 From: Stephen Lien Harrell Date: Mon, 4 Mar 2024 07:14:50 -0600 Subject: [PATCH] Update sync_timedb --- tacc_stats/dbload/sync_timedb.py | 285 ++++++++++++++++++++++++++----- 1 file changed, 241 insertions(+), 44 deletions(-) diff --git a/tacc_stats/dbload/sync_timedb.py b/tacc_stats/dbload/sync_timedb.py index 757b2b0..6beabbd 100755 --- a/tacc_stats/dbload/sync_timedb.py +++ b/tacc_stats/dbload/sync_timedb.py @@ -5,17 +5,22 @@ from multiprocessing import Pool, get_context from datetime import datetime, timedelta, date import time, string +import subprocess +import pytz +import tarfile +import random -#import pandas #pandas.set_option('display.max_rows', 100) from pandas import DataFrame, to_datetime, Timedelta, Timestamp, concat from tacc_stats.analysis.gen.utils import read_sql from tacc_stats import cfg - #import pandas - #pandas.set_option('display.max_rows', 100) +tgz_archive_dir = "/tacc_stats_site/ls6/tgz_archives/" +thread_count = 2 + +debug = True CONNECTION = "dbname={0} user=postgres port=5432".format(cfg.dbname) @@ -35,7 +40,8 @@ intel_skx_imc_eventmap = {0x400304 : "CAS_READS,W=48", 0x400c04 : "CAS_WRITES,W=48", 0x400b01 : "ACT_COUNT,W=48", 0x400102 : "PRE_COUNT_MISS,W=48"} -exclude_typs = ["ib", "ib_sw", "intel_skx_cha", "proc", "ps", "sysv_shm", "tmpfs", "vfs"] +exclude_types = ["ib", "ib_sw", "intel_skx_cha", "ps", "sysv_shm", "tmpfs", "vfs"] +#exclude_types = ["ib", "ib_sw", "intel_skx_cha", "proc", "ps", "sysv_shm", "tmpfs", "vfs"] query_create_hostdata_table = """CREATE TABLE IF NOT EXISTS host_data ( time TIMESTAMPTZ NOT NULL, @@ -60,8 +66,21 @@ (timescaledb.compress, timescaledb.compress_orderby = 'time DESC', timescaledb.compress_segmentby = 'host,jid,type,event'); SELECT add_compression_policy('host_data', INTERVAL '12h', if_not_exists => true);""" + +query_create_process_table = """CREATE TABLE IF NOT EXISTS proc_data ( +jid VARCHAR(32) NOT NULL, +host VARCHAR(64), +proc VARCHAR(512), +UNIQUE(jid, host, proc) +);""" + +query_create_process_index = "CREATE INDEX ON proc_data (jid);" + + conn = psycopg2.connect(CONNECTION) -print(conn.server_version) +if debug: + print("Postgresql server version: " + str(conn.server_version)) + with conn.cursor() as cur: # This should only be used for testing and debugging purposes @@ -70,46 +89,56 @@ #cur.execute(query_create_hostdata_table) #cur.execute(query_create_hostdata_hypertable) #cur.execute(query_create_compression) - cur.execute("SELECT pg_size_pretty(pg_database_size('{0}'));".format(cfg.dbname)) - for x in cur.fetchall(): - print("Database Size:", x[0]) - - cur.execute("SELECT chunk_name,before_compression_total_bytes/(1024*1024*1024),after_compression_total_bytes/(1024*1024*1024) FROM chunk_compression_stats('host_data');") - for x in cur.fetchall(): - try: print("{0} Size: {1:8.1f} {2:8.1f}".format(*x)) - except: pass - - cur.execute("SELECT chunk_name,range_start,range_end FROM timescaledb_information.chunks WHERE hypertable_name = 'host_data';") + cur.execute(query_create_process_table) + cur.execute(query_create_process_index) + if debug: + cur.execute("SELECT pg_size_pretty(pg_database_size('{0}'));".format(cfg.dbname)) + for x in cur.fetchall(): + print("Database Size:", x[0]) + + cur.execute("SELECT chunk_name,before_compression_total_bytes/(1024*1024*1024),after_compression_total_bytes/(1024*1024*1024) FROM chunk_compression_stats('host_data');") + for x in cur.fetchall(): + try: print("{0} Size: {1:8.1f} {2:8.1f}".format(*x)) + except: pass + else: + print("Reading Chunk Data") + + all_compressed_chunks = [] + cur.execute("SELECT chunk_name, range_start,range_end,is_compressed,chunk_schema FROM timescaledb_information.chunks WHERE hypertable_name = 'host_data';") for x in cur.fetchall(): try: - print("{0} Range: {1} -> {2}".format(*x)) + all_compressed_chunks.append(x) + if debug: + print("{0} Range: {1} -> {2}".format(*x)) except: pass conn.commit() conn.close() + # This routine will read the file until a timestamp is read that is not in the database. It then reads in the rest of the file. -def process(stats_file): +def add_stats_file_to_db(stats_file): hostname, create_time = stats_file.split('/')[-2:] try: fdate = datetime.fromtimestamp(int(create_time)) - except: return stats_file + except: return(stats_file, False) - fdate = datetime.fromtimestamp(int(create_time)) sql = "select distinct(time) from host_data where host = '{0}' and time >= '{1}'::timestamp - interval '24h' and time < '{1}'::timestamp + interval '48h' order by time;".format(hostname, fdate) + conn = psycopg2.connect(CONNECTION) - - times = [int(float(t.timestamp())) for t in read_sql(sql, conn)["time"].tolist()] + + times = [float(t.timestamp()) for t in read_sql(sql, conn)["time"].tolist()] + itimes = [int(t) for t in times] #if len(times) > 0 and max(times) > time.time() - 600: return stats_file - #print(times) + with open(stats_file, 'r') as fd: lines = fd.readlines() # start reading stats data from file at first - 1 missing time start_idx = -1 last_idx = 0 - + need_archival=True first_ts = True for i, line in enumerate(lines): if not line[0]: continue @@ -118,25 +147,29 @@ def process(stats_file): first_ts = False continue t, jid, host = line.split() - if int(float(t)) not in times: + + if (float(t) not in times) and (int(float(t)) not in itimes): start_idx = last_idx + need_archival=False break last_idx = i - if start_idx == -1: return stats_file + if start_idx == -1: return((stats_file, need_archival)) schema = {} stats = [] + proc_stats = [] #process stats insert = False start = time.time() try: for i, line in enumerate(lines): if not line[0]: continue + if jid == '-': continue if line[0].isalpha() and insert: typ, dev, vals = line.split(maxsplit = 2) vals = vals.split() - if typ in exclude_typs: continue + if typ in exclude_types: continue # Mapping hardware counters to events if typ == "amd64_pmc" or typ == "amd64_df" or typ == "intel_8pmc3" or typ == "intel_skx_imc": @@ -168,6 +201,10 @@ def process(stats_file): for idx in sorted(rm_idx, reverse = True): del vals[idx] vals = dict(zip(schema_mod, vals)) + elif typ == "proc": + proc_name=(line.split()[1]).split('/')[0] + proc_stats += [ { **tags2, "proc": proc_name } ] + continue else: # Software counters are not programmable and do not require mapping vals = dict(zip(schema[typ], vals)) @@ -195,18 +232,25 @@ def process(stats_file): t, jid, host = line.split() insert = True tags = { "time" : float(t), "host" : host, "jid" : jid } + tags2 = {"jid": jid, "host" : host} elif line[0] == '!': label, events = line.split(maxsplit = 1) typ, events = label[1:], events.split() schema[typ] = events - except: + except Exception as e: + print("error: process data failed: ", str(e)) print("Possibly corrupt file: %s" % stats_file) - return(stats_file) + return((stats_file, False)) + + unique_entries = set(tuple(d.items()) for d in proc_stats) + # Convert set of tuples back to a list of dictionaries + proc_stats = [dict(entry) for entry in unique_entries] + proc_stats = DataFrame.from_records(proc_stats) stats = DataFrame.from_records(stats) if stats.empty: - return(stats_file) + return((stats_file, False)) # Always drop the first timestamp. For new file this is just first timestamp (at random rotate time). # For update from existing file this is timestamp already in database. @@ -229,24 +273,145 @@ def process(stats_file): stats["time"] = to_datetime(stats["time"], unit = 's').dt.tz_localize('UTC').dt.tz_convert('US/Central') # drop rows from first timestamp - stats = stats.dropna() + stats=stats.dropna() #junjie debug print("processing time for {0} {1:.1f}s".format(stats_file, time.time() - start)) # bulk insertion using pgcopy sqltime = time.time() + + + mgr2 = CopyManager(conn, 'proc_data', proc_stats.columns) + try: + mgr2.copy(proc_stats.values.tolist()) + except Exception as e: + if debug: + print("error in mrg2.copy: " , str(e)) + conn.rollback() + copy_data_to_pgsql_individually(conn, proc_stats, 'proc_data') + else: + conn.commit() + + mgr = CopyManager(conn, 'host_data', stats.columns) try: mgr.copy(stats.values.tolist()) except Exception as e: - print("error: mgr.copy failed: ", str(e)) - conn.close() - return stats_file + if debug: + print("error in mrg.copy: " , str(e)) + conn.rollback() + copy_data_to_pgsql_individually(conn, stats, 'host_data') + else: + conn.commit() - conn.commit() #print("sql insert time for {0} {1:.1f}s".format(stats_file, time.time() - sqltime)) conn.close() - return stats_file + + return((stats_file, need_archival)) + + +def copy_data_to_pgsql_individually(conn, data, table): + # Decompress chunks if needed + a_day = timedelta(days=1) + if table is 'host_data': + file_date = to_datetime(data["time"].values[0]).replace(tzinfo=pytz.timezone('US/Central')) + day_before_date = file_date - a_day + day_after_date = file_date + a_day + for i, chunk_data in enumerate(all_compressed_chunks): + chunk_name, chunk_start_date, chunk_end_date, is_compressed, chunk_schema = chunk_data + + # decompress previous, current, and next chunk in case of overlap. + if is_compressed and ( + (chunk_start_date <= file_date <= chunk_end_date) or + (chunk_start_date <= day_before_date <= chunk_end_date) or + (chunk_start_date <= day_after_date <= chunk_end_date)): + + with conn.cursor() as curs: + try: + curs.execute("SELECT decompress_chunk('%s', true);" % (all_compressed_chunks[i][4] + "." + all_compressed_chunks[i][0])) + if debug: + print("Chunk decompressed:" + str(curs.fetchall())) + except Exception as e: + print("error in decompressing chunks: " , str(e)) + conn.rollback() + sys.exit(1) + else: + conn.commit() + + with conn.cursor() as curs: + for row in data.values.tolist(): + + sql_columns = ','.join(['"%s"' % value for value in data.columns.values]) + + sql_insert = 'INSERT INTO "%s" (%s) VALUES ' % (table, sql_columns) + sql_insert = sql_insert + "(" + ','.join(["%s" for i in row]) + ");" + + try: + curs.execute(sql_insert, row) + except Exception as e: + if debug: + print("error in single insert: " , str(e), "while executing", str(sql_insert)) + conn.rollback() + else: + conn.commit() + + +def archive_stats_files(archive_info): + archive_fname, stats_files = archive_info + archive_tar_fname = archive_fname[:-3] + if os.path.exists(archive_fname): + print(subprocess.check_output(['/usr/bin/gunzip', '-v', archive_fname])) + +# currently_archived_files = [] +# if os.path.exists(archive_tar_fname): +# try: +# currently_archived_files = subprocess.check_output(['/usr/bin/tar', 'tf', archive_tar_fname]).split() +# except Exception as e: +# if debug: +# print("Error when checking for exisiting tarfile: " , str(e)) + + existing_archive_file = {} + if os.path.exists(archive_tar_fname): + + try: + with tarfile.open(archive_tar_fname, 'r') as archive_tarfile: + existing_archive_tarinfo = archive_tarfile.getmembers() + + for tar_member_data in existing_archive_tarinfo: + existing_archive_file[tar_member_data.name] = tar_member_data.size + + except Exception as e: + pass + + for stats_fname_path in stats_files: + fname_parts = stats_fname_path.split('/') + + if ((stats_fname_path[1:] in existing_archive_file.keys()) and + (tarfile.open('/tmp/test.tar', 'w').gettarinfo(stats_fname_path).size == existing_archive_file[stats_fname_path[1:]])): + + print("file %s found in archive, skipping" % stats_fname_path) + continue + + print(subprocess.check_output(['/usr/bin/tar', 'ufv', archive_tar_fname, stats_fname_path])) + print("Archived:" % stats_fname_path) + + + ### VERIFY TAR AND DELETE DATA IF IT IS ARCHIVED AND HAS THE SAME FILE SIZE + with tarfile.open(archive_tar_fname, 'r') as archive_tarfile: + existing_archive_tarinfo = archive_tarfile.getmembers() + for tar_member_data in existing_archive_tarinfo: + existing_archive_file[tar_member_data.name] = tar_member_data.size + + for stats_fname_path in stats_files: + if ((stats_fname_path[1:] in existing_archive_file.keys()) and + (tarfile.open('/tmp/test.tar', 'w').gettarinfo(stats_fname_path).size == + existing_archive_file[stats_fname_path[1:]])): + print("removing stats file:" + stats_fname_path) + os.remove(stats_fname_path) + + + print(subprocess.check_output(['/usr/bin/gzip', '-8', '-v', archive_tar_fname])) + if __name__ == '__main__': @@ -258,7 +423,7 @@ def process(stats_file): try: startdate = datetime.strptime(sys.argv[1], "%Y-%m-%d") except: - startdate = datetime.combine(datetime.today(), datetime.min.time()) + startdate = datetime.combine(datetime.today(), datetime.min.time()) - timedelta(days = 3) try: enddate = datetime.strptime(sys.argv[2], "%Y-%m-%d") except: @@ -277,27 +442,59 @@ def process(stats_file): directory = cfg.archive_dir stats_files = [] + ar_file_mapping = {} for entry in os.scandir(directory): - if entry.is_file() or not entry.name.startswith("c"): continue + if entry.is_file() or not (entry.name.startswith("c") or entry.name.startswith("v")): continue for stats_file in os.scandir(entry.path): if startdate == 'all': stats_files += [stats_file.path] continue if not stats_file.is_file() or stats_file.name.startswith('.'): continue if stats_file.name.startswith("current"): continue + fdate=None try: - fdate = datetime.fromtimestamp(int(stats_file.name)) - except: continue + ### different ways to define the date of the file: use timestamp or use the time of the last piece of data + # based on filename + name_fdate = datetime.fromtimestamp(int(stats_file.name)) + + # timestamp of rabbitmq modify + mtime_fdate = datetime.fromtimestamp(int(os.path.getmtime(stats_file))) + + fdate=mtime_fdate + except Exception as e: + print("error in obtaining timestamp of raw data files: ", str(e)) + continue if fdate <= startdate - timedelta(days = 1) or fdate > enddate: continue stats_files += [stats_file.path] print("Number of host stats files to process = ", len(stats_files)) - - with Pool(processes = 2) as pool: - for i in pool.imap_unordered(process, stats_files): - print("[{0:.1f}%] completed".format(100*stats_files.index(i)/len(stats_files)), end = "\r") + files_to_be_archived = [] + with Pool(processes = thread_count) as pool: + for stats_fname, need_archival in pool.imap_unordered(add_stats_file_to_db, stats_files): + if need_archival: files_to_be_archived.append(stats_fname) + print("[{0:.1f}%] completed".format(100*stats_files.index(stats_fname)/len(stats_files)), end = "\r") pool.terminate() print("loading time", time.time() - start) + + for stats_fname in files_to_be_archived: + stats_start = open(stats_fname, 'r').readlines(8192) # grab first 8k bytes + archive_fname = '' + for line in stats_start: + if line[0].isdigit(): + t, jid, host = line.split() + archive_fname = os.path.join(tgz_archive_dir, datetime.fromtimestamp(float(t)).strftime("%Y-%m-%d.tar.gz")) + break + if not archive_fname: + print("Unable to find first timestamp in %s, skipping archiving" % stats_fname) + continue + if archive_fname not in ar_file_mapping: + ar_file_mapping[archive_fname] = [] + ar_file_mapping[archive_fname].append(stats_fname) - + + with Pool(processes = thread_count) as pool: + for stats_files_archived in pool.imap_unordered(archive_stats_files, list(ar_file_mapping.items())): + print("[{0:.1f}%] completed".format(100*stats_files.index(stats_fname)/len(stats_files)), end = "\r") + #pass + pool.terminate()