From c9801b720923cf45471c7d10dd1a502460a8700f Mon Sep 17 00:00:00 2001 From: Nathan Shreve Date: Wed, 14 Aug 2024 18:48:32 -0400 Subject: [PATCH] Improved multiprocessing efficiency in script --- .../profiling_utils/parse_lookahead_data.py | 105 +++++++++++------- 1 file changed, 65 insertions(+), 40 deletions(-) diff --git a/vtr_flow/scripts/profiling_utils/parse_lookahead_data.py b/vtr_flow/scripts/profiling_utils/parse_lookahead_data.py index eaed2fc29e5..3a49c641c1c 100755 --- a/vtr_flow/scripts/profiling_utils/parse_lookahead_data.py +++ b/vtr_flow/scripts/profiling_utils/parse_lookahead_data.py @@ -12,10 +12,10 @@ import shutil import sys import argparse +from collections import deque from enum import Enum from pathlib import Path -from concurrent.futures import ThreadPoolExecutor -from multiprocessing import Lock +from multiprocessing import Lock, Process import pandas as pd from sklearn.metrics import mean_squared_error import matplotlib.pyplot as plt @@ -23,6 +23,8 @@ from colour import Color import seaborn as sns +lock = Lock() # Used for multiprocessing + # pylint: disable=too-many-instance-attributes # pylint: disable=too-few-public-methods @@ -41,8 +43,7 @@ def __init__(self, no_replace: bool, should_print: bool, percent_error_threshold: float, - exclusions: dict, - pool: ThreadPoolExecutor): + exclusions: dict): # Output directory self.output_dir = "./vtr_flow/tasks/lookahead_verifier_output" # The graph types (pie, heatmap, bar, scatter) that will be created @@ -106,9 +107,8 @@ def __init__(self, "test name" ] - # Lock and Pool for multithreading - self.lock = Lock() - self.pool = pool + # Processes list for multiprocessing + self.processes = [] def check_valid_component(comp: str): @@ -132,21 +132,21 @@ def check_valid_df(df: pd.DataFrame, gv: GlobalVars): raise Exception("IncompleteDataFrame") -def make_dir(directory: str, clean: bool, gv: GlobalVars): +def make_dir(directory: str, clean: bool): """Create a directory""" - gv.lock.acquire() + lock.acquire() if os.path.exists(directory): if clean: shutil.rmtree(directory) else: - gv.lock.release() + lock.release() return os.makedirs(directory) - gv.lock.release() + lock.release() def column_file_name(column_name: str) -> str: @@ -227,7 +227,7 @@ def __init__( os.path.join(self.__directory, "proportion_under_threshold"), ] for directory in self.__sub_dirs: - make_dir(directory, False, gv) + make_dir(directory, False) self.__test_name = test_name @@ -336,7 +336,7 @@ def make_scatter_plot( if gv.no_replace and os.path.exists(os.path.join(curr_dir, file_name)): return - make_dir(curr_dir, False, gv) + make_dir(curr_dir, False) # Determine colors for legend num_colors = self.__df[legend_column].nunique() + 1 @@ -427,7 +427,11 @@ def make_standard_scatter_plots(self, test_name_plot: bool, gv: GlobalVars): if first_it and col == "iteration no.": continue - gv.pool.submit(self.make_scatter_plot, comp, plot_type, col, first_it, gv) + gv.processes.append((self.make_scatter_plot, (comp, + plot_type, + col, + first_it, + gv))) # pylint: disable=too-many-locals def make_bar_graph(self, @@ -499,7 +503,7 @@ def make_bar_graph(self, if gv.no_replace and os.path.exists(os.path.join(curr_dir, file_name)): return - make_dir(curr_dir, False, gv) + make_dir(curr_dir, False) # Get DF with average error for each "type" encountered in column avg_error = {} @@ -548,7 +552,11 @@ def make_standard_bar_graphs(self, test_name_plot: bool, gv: GlobalVars): for col in columns: for use_abs in [True, False]: for first_it in [True, False]: - gv.pool.submit(self.make_bar_graph, comp, col, use_abs, first_it, gv) + gv.processes.append((self.make_bar_graph, (comp, + col, + use_abs, + first_it, + gv))) # pylint: disable=too-many-locals def make_heatmap( @@ -636,7 +644,7 @@ def make_heatmap( if gv.no_replace and os.path.exists(os.path.join(curr_dir, file_name)): return - make_dir(curr_dir, False, gv) + make_dir(curr_dir, False) # Get DF with average error for each "coordinate" in the heatmap df_avgs = pd.DataFrame(columns=[x_column, y_column, scale_column]) @@ -683,20 +691,18 @@ def make_standard_heatmaps(self, gv: GlobalVars): for comp in gv.components: for first_it in [True, False]: for use_abs in [True, False]: - gv.pool.submit(self.make_heatmap, - comp, - "sink cluster tile width", - "sink cluster tile height", - first_it, - use_abs, - gv) - gv.pool.submit(self.make_heatmap, - comp, - "delta x", - "delta y", - first_it, - use_abs, - gv) + gv.processes.append((self.make_heatmap, (comp, + "sink cluster tile width", + "sink cluster tile height", + first_it, + use_abs, + gv))) + gv.processes.append((self.make_heatmap, (comp, + "delta x", + "delta y", + first_it, + use_abs, + gv))) # pylint: disable=too-many-locals def make_pie_chart( @@ -764,7 +770,7 @@ def make_pie_chart( if gv.no_replace and os.path.exists(os.path.join(curr_dir, file_name)): return - make_dir(curr_dir, False, gv) + make_dir(curr_dir, False) # Constrict DF to columns whose error is under threshold curr_df = curr_df[curr_df[f"{comp} % error"] < gv.percent_error_threshold] @@ -821,7 +827,11 @@ def make_standard_pie_charts(self, test_name_plot: bool, gv: GlobalVars): for col in columns: for first_it in [True, False]: for weighted in [True, False]: - gv.pool.submit(self.make_pie_chart, comp, col, first_it, weighted, gv) + gv.processes.append((self.make_pie_chart, (comp, + col, + first_it, + weighted, + gv))) def make_standard_plots(self, test_name_plot: bool, gv: GlobalVars): """ @@ -961,7 +971,7 @@ def make_csv(df_out: pd.DataFrame, file_name: str): gv.csv_data and (not os.path.exists(os.path.join(directory, "data.csv")) or not gv.no_replace) ): - gv.pool.submit(make_csv, df, os.path.join(directory, "data.csv")) + gv.processes.append((make_csv, (df, os.path.join(directory, "data.csv")))) if gv.should_print: print("Created ", os.path.join(directory, "data.csv"), sep="") @@ -1111,7 +1121,6 @@ def parse_args(): ) args = parser.parse_args() - pool = ThreadPoolExecutor(max_workers=args.j) if args.all: graph_types = ["bar", "scatter", "heatmap", "pie"] @@ -1162,7 +1171,6 @@ def parse_args(): args.print, args.threshold, {}, - pool ) for excl in args.exclude: @@ -1202,7 +1210,7 @@ def create_complete_outputs( if len(args.dir_app) > 0: results_folder_path += f"{args.dir_app[0]}" - make_dir(results_folder_path, False, gv) + make_dir(results_folder_path, False) df_complete = df_complete.reset_index(drop=True) @@ -1228,7 +1236,7 @@ def create_benchmark_outputs( results_folder = os.path.join(output_folder, "__all__") results_folder_path = os.path.join(gv.output_dir, results_folder) - make_dir(results_folder_path, False, gv) + make_dir(results_folder_path, False) df_benchmark = df_benchmark.reset_index(drop=True) @@ -1270,7 +1278,7 @@ def create_circuit_outputs( results_folder = os.path.join(output_folder, test_name) results_folder_path = os.path.join(gv.output_dir, results_folder) - make_dir(results_folder_path, False, gv) + make_dir(results_folder_path, False) # Read csv with lookahead data (or, a csv previously created by this script) df = pd.read_csv(file_path) df = df.reset_index(drop=True) @@ -1329,7 +1337,7 @@ def create_circuit_outputs( def main(): args, gv = parse_args() - make_dir(gv.output_dir, False, gv) + make_dir(gv.output_dir, False) # The DF containing info across all given csv files df_complete = pd.DataFrame(columns=gv.column_names) @@ -1371,6 +1379,23 @@ def main(): if args.collect != "": create_complete_outputs(args, df_complete, gv) + # Create output graphs simultaneously + # This is the best way I have found to do this that avoids having a while + # loop continually checking if any processes have finished while using an + # entire CPU. Pool, concurrent.futures.ProcessPoolExecutor, and + # concurrent.futures.ThreadPoolExecutor seem to have a lot of overhead. + # This loop assumes that the graph-creating functions take approximately + # the same amount of time, which is not the case. + q = deque() + for func, params in gv.processes: + while len(q) >= args.j: + proc = q.popleft() + proc.join() + + proc = Process(target=func, args=params) + proc.start() + q.append(proc) + if __name__ == "__main__": main()