diff --git a/monitor/src/daemonize.c b/monitor/src/daemonize.c index 1758a8d..882c12b 100644 --- a/monitor/src/daemonize.c +++ b/monitor/src/daemonize.c @@ -1,4 +1,4 @@ -#include "daemonize.h" +//#include "daemonize.h" #include #include #include @@ -8,6 +8,10 @@ #include + +int pid_fd; +char *pid_file_name; + void daemonize() { pid_t pid = 0; diff --git a/monitor/src/daemonize.h b/monitor/src/daemonize.h index 72ebfec..98fbd20 100644 --- a/monitor/src/daemonize.h +++ b/monitor/src/daemonize.h @@ -3,8 +3,8 @@ #include -int pid_fd; -char *pid_file_name; +extern int pid_fd; +extern char *pid_file_name; void daemonize(); #endif diff --git a/monitor/src/monitor.c b/monitor/src/monitor.c index fe93787..d19842f 100644 --- a/monitor/src/monitor.c +++ b/monitor/src/monitor.c @@ -29,7 +29,7 @@ static char *server = NULL; static char *queue = "default"; static char *port = "5672"; -static char dumpfile_dir[] = "/tmp/taccstats"; +static char *dumpfile_dir = "/tmp/taccstats"; static double freq = 300; static int max_buffer_size = 300; // 25 hours static int allow_ring_buffer_overwrite = 0; diff --git a/setup.py b/setup.py index 2bedc21..26610d9 100644 --- a/setup.py +++ b/setup.py @@ -48,7 +48,7 @@ package_data = {'tacc_stats' : ['cfg.py']}, include_package_data = True, scripts = scripts, - install_requires = ['argparse','numpy', 'psycopg2-binary', 'pandas', + install_requires = ['argparse','numpy', 'psycopg2-binary', 'pandas', 'pgcopy', 'bokeh', 'django', 'python-hostlist', 'PyMySQL', 'mod_wsgi', 'mysql-connector-python', 'python-memcached', 'pika', 'mysqlclient'], platforms = 'any', diff --git a/tacc_stats/dbload/sync_timedb.py b/tacc_stats/dbload/sync_timedb.py index 757b2b0..f3568f1 100755 --- a/tacc_stats/dbload/sync_timedb.py +++ b/tacc_stats/dbload/sync_timedb.py @@ -2,19 +2,34 @@ import psycopg2 from pgcopy import CopyManager import os, sys, stat -from multiprocessing import Pool, get_context +import multiprocessing +import itertools +from multiprocessing import Pool, get_context, Lock, set_start_method + from datetime import datetime, timedelta, date import time, string +import subprocess +import pytz +import tarfile +import random -#import pandas #pandas.set_option('display.max_rows', 100) from pandas import DataFrame, to_datetime, Timedelta, Timestamp, concat from tacc_stats.analysis.gen.utils import read_sql from tacc_stats import cfg - #import pandas - #pandas.set_option('display.max_rows', 100) +# archive toggle +should_archive = True + +# debug messages +debug = False + +# Thread count for database loading and archival +thread_count = 8 + +tgz_archive_dir = "/tacc_stats_site/ls6/tgz_archives/" + CONNECTION = "dbname={0} user=postgres port=5432".format(cfg.dbname) @@ -35,81 +50,36 @@ intel_skx_imc_eventmap = {0x400304 : "CAS_READS,W=48", 0x400c04 : "CAS_WRITES,W=48", 0x400b01 : "ACT_COUNT,W=48", 0x400102 : "PRE_COUNT_MISS,W=48"} -exclude_typs = ["ib", "ib_sw", "intel_skx_cha", "proc", "ps", "sysv_shm", "tmpfs", "vfs"] - -query_create_hostdata_table = """CREATE TABLE IF NOT EXISTS host_data ( - time TIMESTAMPTZ NOT NULL, - host VARCHAR(64), - jid VARCHAR(32), - type VARCHAR(32), - event VARCHAR(64), - unit VARCHAR(16), - value real, - delta real, - arc real, - UNIQUE (time, host, type, event) - );""" - - -query_create_hostdata_hypertable = """CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE; - SELECT create_hypertable('host_data', 'time', if_not_exists => TRUE, chunk_time_interval => INTERVAL '1 day'); - CREATE INDEX ON host_data (host, time DESC); - CREATE INDEX ON host_data (jid, time DESC);""" - -query_create_compression = """ALTER TABLE host_data SET \ - (timescaledb.compress, timescaledb.compress_orderby = 'time DESC', timescaledb.compress_segmentby = 'host,jid,type,event'); - SELECT add_compression_policy('host_data', INTERVAL '12h', if_not_exists => true);""" - -conn = psycopg2.connect(CONNECTION) -print(conn.server_version) -with conn.cursor() as cur: - - # This should only be used for testing and debugging purposes - #cur.execute("DROP TABLE IF EXISTS host_data CASCADE;") - - #cur.execute(query_create_hostdata_table) - #cur.execute(query_create_hostdata_hypertable) - #cur.execute(query_create_compression) - cur.execute("SELECT pg_size_pretty(pg_database_size('{0}'));".format(cfg.dbname)) - for x in cur.fetchall(): - print("Database Size:", x[0]) - - cur.execute("SELECT chunk_name,before_compression_total_bytes/(1024*1024*1024),after_compression_total_bytes/(1024*1024*1024) FROM chunk_compression_stats('host_data');") - for x in cur.fetchall(): - try: print("{0} Size: {1:8.1f} {2:8.1f}".format(*x)) - except: pass - - cur.execute("SELECT chunk_name,range_start,range_end FROM timescaledb_information.chunks WHERE hypertable_name = 'host_data';") - for x in cur.fetchall(): - try: - print("{0} Range: {1} -> {2}".format(*x)) - except: pass - conn.commit() -conn.close() +exclude_types = ["ib", "ib_sw", "intel_skx_cha", "ps", "sysv_shm", "tmpfs", "vfs"] +#exclude_types = ["ib", "ib_sw", "intel_skx_cha", "proc", "ps", "sysv_shm", "tmpfs", "vfs"] + -# This routine will read the file until a timestamp is read that is not in the database. It then reads in the rest of the file. -def process(stats_file): + +# This routine will read the file until a timestamp is read that is not in the database. It then reads in the rest of the file. +def add_stats_file_to_db(stats_data): + stats_file, all_compressed_chunks = stats_data hostname, create_time = stats_file.split('/')[-2:] try: fdate = datetime.fromtimestamp(int(create_time)) - except: return stats_file + except: return(stats_file, False) - fdate = datetime.fromtimestamp(int(create_time)) sql = "select distinct(time) from host_data where host = '{0}' and time >= '{1}'::timestamp - interval '24h' and time < '{1}'::timestamp + interval '48h' order by time;".format(hostname, fdate) + conn = psycopg2.connect(CONNECTION) - - times = [int(float(t.timestamp())) for t in read_sql(sql, conn)["time"].tolist()] + + times = [float(t.timestamp()) for t in read_sql(sql, conn)["time"].tolist()] + itimes = [int(t) for t in times] #if len(times) > 0 and max(times) > time.time() - 600: return stats_file - #print(times) + with open(stats_file, 'r') as fd: lines = fd.readlines() # start reading stats data from file at first - 1 missing time start_idx = -1 last_idx = 0 - + need_archival=True first_ts = True for i, line in enumerate(lines): if not line[0]: continue @@ -118,25 +88,29 @@ def process(stats_file): first_ts = False continue t, jid, host = line.split() - if int(float(t)) not in times: + + if (float(t) not in times) and (int(float(t)) not in itimes): start_idx = last_idx + need_archival=False break last_idx = i - if start_idx == -1: return stats_file + if start_idx == -1: return((stats_file, need_archival)) schema = {} stats = [] + proc_stats = [] #process stats insert = False start = time.time() try: for i, line in enumerate(lines): if not line[0]: continue + if jid == '-': continue if line[0].isalpha() and insert: typ, dev, vals = line.split(maxsplit = 2) vals = vals.split() - if typ in exclude_typs: continue + if typ in exclude_types: continue # Mapping hardware counters to events if typ == "amd64_pmc" or typ == "amd64_df" or typ == "intel_8pmc3" or typ == "intel_skx_imc": @@ -168,6 +142,10 @@ def process(stats_file): for idx in sorted(rm_idx, reverse = True): del vals[idx] vals = dict(zip(schema_mod, vals)) + elif typ == "proc": + proc_name=(line.split()[1]).split('/')[0] + proc_stats += [ { **tags2, "proc": proc_name } ] + continue else: # Software counters are not programmable and do not require mapping vals = dict(zip(schema[typ], vals)) @@ -195,18 +173,25 @@ def process(stats_file): t, jid, host = line.split() insert = True tags = { "time" : float(t), "host" : host, "jid" : jid } + tags2 = {"jid": jid, "host" : host} elif line[0] == '!': label, events = line.split(maxsplit = 1) typ, events = label[1:], events.split() schema[typ] = events - except: + except Exception as e: + print("error: process data failed: ", str(e)) print("Possibly corrupt file: %s" % stats_file) - return(stats_file) + return((stats_file, False)) + + unique_entries = set(tuple(d.items()) for d in proc_stats) + # Convert set of tuples back to a list of dictionaries + proc_stats = [dict(entry) for entry in unique_entries] + proc_stats = DataFrame.from_records(proc_stats) stats = DataFrame.from_records(stats) if stats.empty: - return(stats_file) + return((stats_file, False)) # Always drop the first timestamp. For new file this is just first timestamp (at random rotate time). # For update from existing file this is timestamp already in database. @@ -229,45 +214,250 @@ def process(stats_file): stats["time"] = to_datetime(stats["time"], unit = 's').dt.tz_localize('UTC').dt.tz_convert('US/Central') # drop rows from first timestamp - stats = stats.dropna() + stats=stats.dropna() #junjie debug print("processing time for {0} {1:.1f}s".format(stats_file, time.time() - start)) # bulk insertion using pgcopy sqltime = time.time() + + + mgr2 = CopyManager(conn, 'proc_data', proc_stats.columns) + try: + mgr2.copy(proc_stats.values.tolist()) + except Exception as e: + if debug: + print("error in mrg2.copy: " , str(e)) + conn.rollback() + copy_data_to_pgsql_individually(conn, proc_stats, 'proc_data', all_compressed_chunks) + else: + conn.commit() + + mgr = CopyManager(conn, 'host_data', stats.columns) try: mgr.copy(stats.values.tolist()) except Exception as e: - print("error: mgr.copy failed: ", str(e)) - conn.close() - return stats_file + if debug: + print("error in mrg.copy: " , str(e)) + conn.rollback() + need_archival = copy_data_to_pgsql_individually(conn, stats, 'host_data', all_compressed_chunks) + else: + conn.commit() - conn.commit() #print("sql insert time for {0} {1:.1f}s".format(stats_file, time.time() - sqltime)) conn.close() - return stats_file -if __name__ == '__main__': + return((stats_file, need_archival)) + + +def copy_data_to_pgsql_individually(conn, data, table, all_compressed_chunks): + # Decompress chunks if needed + a_day = timedelta(days=1) + if table is 'host_data': + first_date = to_datetime(data["time"].values[0]).replace(tzinfo=pytz.timezone('US/Central')) + last_date = to_datetime(data["time"].values[-1]).replace(tzinfo=pytz.timezone('US/Central')) + day_before_date = first_date - a_day + day_after_date = last_date + a_day + + chunks_needing_decompression = [] + + for i, chunk_data in enumerate(all_compressed_chunks): + chunk_name, chunk_start_date, chunk_end_date, is_compressed, chunk_schema = chunk_data + + # decompress previous, current, and next chunk in case of overlap. + if is_compressed and ( + (chunk_start_date <= first_date <= chunk_end_date) or + (chunk_start_date <= last_date <= chunk_end_date) or + (chunk_start_date <= day_before_date <= chunk_end_date) or + (chunk_start_date <= day_after_date <= chunk_end_date)): + + chunks_needing_decompression.append(all_compressed_chunks[i][4] + "." + all_compressed_chunks[i][0]) + + compression_lock = Lock() + with compression_lock: + with conn.cursor() as curs: + for chunk_name in chunks_needing_decompression: + try: + curs.execute("SELECT decompress_chunk('%s', true);" % chunk_name) + if debug: + print("Chunk decompressed:" + str(curs.fetchall())) + except Exception as e: + print("error in decompressing chunks: " , str(e)) + conn.rollback() + continue + else: + conn.commit() - #while True: + + need_archival = True + unique_violations = 0 + with conn.cursor() as curs: + for row in data.values.tolist(): + + sql_columns = ','.join(['"%s"' % value for value in data.columns.values]) + + sql_insert = 'INSERT INTO "%s" (%s) VALUES ' % (table, sql_columns) + sql_insert = sql_insert + "(" + ','.join(["%s" for i in row]) + ");" + + + try: + curs.execute(sql_insert, row) + except psycopg2.errors.UniqueViolation as uv: + # count for rows that already exist. + unique_violations += 1 + conn.rollback() + except Exception as e: + print("error in single insert: ", e.pgcode, " ", str(e), "while executing", str(sql_insert)) + need_archival = False + conn.rollback() + else: + conn.commit() + if debug: + print("Existing Rows Found in DB: %s" % unique_violations) + + return need_archival + +def archive_stats_files(archive_info): + archive_fname, stats_files = archive_info + archive_tar_fname = archive_fname[:-3] + if os.path.exists(archive_fname): + print(subprocess.check_output(['/usr/bin/gunzip', '-v', archive_fname])) + + existing_archive_file = {} + if os.path.exists(archive_tar_fname): - ################################################################# + try: + with tarfile.open(archive_tar_fname, 'r') as archive_tarfile: + existing_archive_tarinfo = archive_tarfile.getmembers() + + for tar_member_data in existing_archive_tarinfo: + existing_archive_file[tar_member_data.name] = tar_member_data.size + + except Exception as e: + pass + for stats_fname_path in stats_files: + fname_parts = stats_fname_path.split('/') + + if ((stats_fname_path[1:] in existing_archive_file.keys()) and + (tarfile.open('/tmp/test.tar', 'w').gettarinfo(stats_fname_path).size == existing_archive_file[stats_fname_path[1:]])): + + print("file %s found in archive, skipping" % stats_fname_path) + continue + + print(subprocess.check_output(['/usr/bin/tar', 'ufv', archive_tar_fname, stats_fname_path]), flush=True) + print("Archived: " + stats_fname_path) + + + ### VERIFY TAR AND DELETE DATA IF IT IS ARCHIVED AND HAS THE SAME FILE SIZE + with tarfile.open(archive_tar_fname, 'r') as archive_tarfile: + existing_archive_tarinfo = archive_tarfile.getmembers() + for tar_member_data in existing_archive_tarinfo: + existing_archive_file[tar_member_data.name] = tar_member_data.size + + for stats_fname_path in stats_files: + if ((stats_fname_path[1:] in existing_archive_file.keys()) and + (tarfile.open('/tmp/test.tar', 'w').gettarinfo(stats_fname_path).size == + existing_archive_file[stats_fname_path[1:]])): + print("removing stats file:" + stats_fname_path) + os.remove(stats_fname_path) + + + print(subprocess.check_output(['/usr/bin/gzip', '-8', '-v', archive_tar_fname]), flush=True) + +def database_startup(): + + query_create_hostdata_table = """CREATE TABLE IF NOT EXISTS host_data ( + time TIMESTAMPTZ NOT NULL, + host VARCHAR(64), + jid VARCHAR(32), + type VARCHAR(32), + event VARCHAR(64), + unit VARCHAR(16), + value real, + delta real, + arc real, + UNIQUE (time, host, type, event) + );""" + + + query_create_hostdata_hypertable = """CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE; + SELECT create_hypertable('host_data', 'time', if_not_exists => TRUE, chunk_time_interval => INTERVAL '1 day'); + CREATE INDEX ON host_data (host, time DESC); + CREATE INDEX ON host_data (jid, time DESC);""" + + query_create_compression = """ALTER TABLE host_data SET \ + (timescaledb.compress, timescaledb.compress_orderby = 'time DESC', timescaledb.compress_segmentby = 'host,jid,type,event'); + SELECT add_compression_policy('host_data', INTERVAL '12h', if_not_exists => true);""" + + + query_create_process_table = """CREATE TABLE IF NOT EXISTS proc_data ( + jid VARCHAR(32) NOT NULL, + host VARCHAR(64), + proc VARCHAR(512), + UNIQUE(jid, host, proc) + );""" + + query_create_process_index = "CREATE INDEX ON proc_data (jid);" + + + conn = psycopg2.connect(CONNECTION) + if debug: + print("Postgresql server version: " + str(conn.server_version)) + + with conn.cursor() as cur: + + # This should only be used for testing and debugging purposes + #cur.execute("DROP TABLE IF EXISTS host_data CASCADE;") + + #cur.execute(query_create_hostdata_table) + #cur.execute(query_create_hostdata_hypertable) + #cur.execute(query_create_compression) + + # cur.execute(query_create_process_table) + # cur.execute(query_create_process_index) + cur.execute("SELECT pg_size_pretty(pg_database_size('{0}'));".format(cfg.dbname)) + for x in cur.fetchall(): + print("Database Size:", x[0]) + if debug: + cur.execute("SELECT chunk_name,before_compression_total_bytes/(1024*1024*1024),after_compression_total_bytes/(1024*1024*1024) FROM chunk_compression_stats('host_data');") + for x in cur.fetchall(): + try: print("{0} Size: {1:8.1f} {2:8.1f}".format(*x)) + except: pass + else: + print("Reading Chunk Data") + + all_compressed_chunks = [] + cur.execute("SELECT chunk_name, range_start,range_end,is_compressed,chunk_schema FROM timescaledb_information.chunks WHERE hypertable_name = 'host_data';") + for x in cur.fetchall(): + try: + all_compressed_chunks.append(x) + if debug: + print("{0} Range: {1} -> {2}".format(*x)) + except: pass + conn.commit() + conn.close() + return all_compressed_chunks + +if __name__ == '__main__': + + all_compressed_chunks = database_startup() + ################################################################# try: startdate = datetime.strptime(sys.argv[1], "%Y-%m-%d") except: - startdate = datetime.combine(datetime.today(), datetime.min.time()) + startdate = datetime.combine(datetime.today(), datetime.min.time()) - timedelta(days = 10) try: enddate = datetime.strptime(sys.argv[2], "%Y-%m-%d") except: - enddate = startdate + timedelta(days = 1) + enddate = startdate + timedelta(days = 10) if (len(sys.argv) > 1): if sys.argv[1] == 'all': startdate = 'all' - enddate = datetime.combine(datetime.today(), datetime.min.time()) - timedelta(days = 1) + enddate = datetime.combine(datetime.today(), datetime.min.time()) print("###Date Range of stats files to ingest: {0} -> {1}####".format(startdate, enddate)) ################################################################# @@ -277,27 +467,61 @@ def process(stats_file): directory = cfg.archive_dir stats_files = [] + ar_file_mapping = {} for entry in os.scandir(directory): - if entry.is_file() or not entry.name.startswith("c"): continue + if entry.is_file() or not (entry.name.startswith("c") or entry.name.startswith("v")): continue for stats_file in os.scandir(entry.path): if startdate == 'all': stats_files += [stats_file.path] continue if not stats_file.is_file() or stats_file.name.startswith('.'): continue if stats_file.name.startswith("current"): continue + fdate=None try: - fdate = datetime.fromtimestamp(int(stats_file.name)) - except: continue + ### different ways to define the date of the file: use timestamp or use the time of the last piece of data + # based on filename + name_fdate = datetime.fromtimestamp(int(stats_file.name)) + + # timestamp of rabbitmq modify + mtime_fdate = datetime.fromtimestamp(int(os.path.getmtime(stats_file.path))) + + fdate=mtime_fdate + except Exception as e: + print("error in obtaining timestamp of raw data files: ", str(e)) + continue if fdate <= startdate - timedelta(days = 1) or fdate > enddate: continue stats_files += [stats_file.path] print("Number of host stats files to process = ", len(stats_files)) - - with Pool(processes = 2) as pool: - for i in pool.imap_unordered(process, stats_files): - print("[{0:.1f}%] completed".format(100*stats_files.index(i)/len(stats_files)), end = "\r") - pool.terminate() + files_to_be_archived = [] + with multiprocessing.get_context('spawn').Pool(processes = thread_count) as pool: + for stats_fname, need_archival in pool.imap_unordered(add_stats_file_to_db, zip(stats_files, itertools.repeat(all_compressed_chunks))): + if should_archive and need_archival: files_to_be_archived.append(stats_fname) + print("[{0:.1f}%] completed".format(100*stats_files.index(stats_fname)/len(stats_files)), end = "\r", flush=True) print("loading time", time.time() - start) + + for stats_fname in files_to_be_archived: + stats_start = open(stats_fname, 'r').readlines(8192) # grab first 8k bytes + archive_fname = '' + for line in stats_start: + if line[0].isdigit(): + t, jid, host = line.split() + file_date = datetime.fromtimestamp(float(t)) + archive_fname = os.path.join(tgz_archive_dir, file_date.strftime("%Y-%m-%d.tar.gz")) + break + + if file_date.date == datetime.today().date: + continue + + + if not archive_fname: + print("Unable to find first timestamp in %s, skipping archiving" % stats_fname) + continue + if archive_fname not in ar_file_mapping: ar_file_mapping[archive_fname] = [] + ar_file_mapping[archive_fname].append(stats_fname) - + + with multiprocessing.get_context('spawn').Pool(processes = thread_count) as pool: + for stats_files_archived in pool.imap_unordered(archive_stats_files, list(ar_file_mapping.items())): + print("[{0:.1f}%] completed".format(100*stats_files.index(stats_fname)/len(stats_files)), end = "\r", flush=True) diff --git a/tacc_stats/rerun_import.sh b/tacc_stats/rerun_import.sh new file mode 100644 index 0000000..94fb742 --- /dev/null +++ b/tacc_stats/rerun_import.sh @@ -0,0 +1,8 @@ +for i in $(seq 30 20 900); + +do + j=$(($i-20)); +# echo `date --date "today - $i day" +\%Y-\%m-\%d` + source /home/sharrell/ls6_ts/bin/activate; /home/sharrell/ls6_ts/tacc_stats/tacc_stats/dbload/sync_timedb.py `date --date "today - $i day" +\%Y-\%m-\%d` `date --date "today - $j day" +\%Y-\%m-\%d` + echo +done