Improved multiprocessing efficiency in script
Nathan Shreve committed Aug 15, 2024
1 parent dfab0a8 commit c9801b7
Showing 1 changed file with 65 additions and 40 deletions.
vtr_flow/scripts/profiling_utils/parse_lookahead_data.py (65 additions, 40 deletions)
@@ -12,17 +12,19 @@
 import shutil
 import sys
 import argparse
+from collections import deque
 from enum import Enum
 from pathlib import Path
-from concurrent.futures import ThreadPoolExecutor
-from multiprocessing import Lock
+from multiprocessing import Lock, Process
 import pandas as pd
 from sklearn.metrics import mean_squared_error
 import matplotlib.pyplot as plt
 import distinctipy as dp
 from colour import Color
 import seaborn as sns
 
+lock = Lock()  # Used for multiprocessing
+
 
 # pylint: disable=too-many-instance-attributes
 # pylint: disable=too-few-public-methods
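A likely reason the lock moved to module scope (an inference, not stated in the commit): a multiprocessing.Lock cannot be pickled, so keeping it as an attribute of an object handed to child processes can fail with "Lock objects should only be shared between processes through inheritance", whereas a module-level lock created at import time is inherited by children under the fork start method. A minimal sketch of the pattern; worker and its names are illustrative, not from the script.

from multiprocessing import Lock, Process

lock = Lock()  # created at import time, inherited by forked children

def worker(n: int):
    # Under the fork start method (the default on Linux), every child
    # shares the same underlying lock, so these prints never interleave.
    with lock:
        print(f"worker {n} holds the lock")

if __name__ == "__main__":
    procs = [Process(target=worker, args=(i,)) for i in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()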
@@ -41,8 +43,7 @@ def __init__(self,
                  no_replace: bool,
                  should_print: bool,
                  percent_error_threshold: float,
-                 exclusions: dict,
-                 pool: ThreadPoolExecutor):
+                 exclusions: dict):
         # Output directory
         self.output_dir = "./vtr_flow/tasks/lookahead_verifier_output"
         # The graph types (pie, heatmap, bar, scatter) that will be created
@@ -106,9 +107,8 @@ def __init__(self,
             "test name"
         ]
 
-        # Lock and Pool for multithreading
-        self.lock = Lock()
-        self.pool = pool
+        # Processes list for multiprocessing
+        self.processes = []
 
 
 def check_valid_component(comp: str):
@@ -132,21 +132,21 @@ def check_valid_df(df: pd.DataFrame, gv: GlobalVars):
         raise Exception("IncompleteDataFrame")
 
 
-def make_dir(directory: str, clean: bool, gv: GlobalVars):
+def make_dir(directory: str, clean: bool):
     """Create a directory"""
 
-    gv.lock.acquire()
+    lock.acquire()
 
     if os.path.exists(directory):
         if clean:
             shutil.rmtree(directory)
         else:
-            gv.lock.release()
+            lock.release()
             return
 
     os.makedirs(directory)
 
-    gv.lock.release()
+    lock.release()
 
 
 def column_file_name(column_name: str) -> str:
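Because make_dir must release the lock on both the early-return path and the fall-through path, the paired acquire()/release() calls can equivalently use the lock as a context manager. A minimal sketch of that variant, for illustration only (the commit keeps the explicit calls):

import os
import shutil
from multiprocessing import Lock

lock = Lock()  # module-level, as in the commit

def make_dir(directory: str, clean: bool):
    """Create a directory; with clean=True, replace an existing one."""
    with lock:  # released automatically on every exit path
        if os.path.exists(directory):
            if clean:
                shutil.rmtree(directory)
            else:
                return
        os.makedirs(directory)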
@@ -227,7 +227,7 @@ def __init__(
             os.path.join(self.__directory, "proportion_under_threshold"),
         ]
         for directory in self.__sub_dirs:
-            make_dir(directory, False, gv)
+            make_dir(directory, False)
 
         self.__test_name = test_name
 
@@ -336,7 +336,7 @@ def make_scatter_plot(
         if gv.no_replace and os.path.exists(os.path.join(curr_dir, file_name)):
             return
 
-        make_dir(curr_dir, False, gv)
+        make_dir(curr_dir, False)
 
         # Determine colors for legend
         num_colors = self.__df[legend_column].nunique() + 1
@@ -427,7 +427,11 @@ def make_standard_scatter_plots(self, test_name_plot: bool, gv: GlobalVars):
                 if first_it and col == "iteration no.":
                     continue
 
-                gv.pool.submit(self.make_scatter_plot, comp, plot_type, col, first_it, gv)
+                gv.processes.append((self.make_scatter_plot, (comp,
+                                                              plot_type,
+                                                              col,
+                                                              first_it,
+                                                              gv)))
 
     # pylint: disable=too-many-locals
     def make_bar_graph(self,
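The new pattern defers execution: instead of submitting work to an executor on the spot, each call is recorded on gv.processes as a (callable, argument-tuple) pair and only turned into a real process later in main(). A self-contained sketch of the idea; render_plot is a hypothetical stand-in for the plotting methods:

from multiprocessing import Process

def render_plot(kind: str, column: str):
    # Hypothetical stand-in for make_scatter_plot and friends.
    print(f"rendering a {kind} plot of {column!r}")

processes = []  # (callable, args) pairs, like gv.processes
for kind in ("scatter", "bar"):
    for column in ("cost", "delay"):
        processes.append((render_plot, (kind, column)))

if __name__ == "__main__":
    # Launch later, once all the work has been gathered.
    workers = [Process(target=f, args=a) for f, a in processes]
    for w in workers:
        w.start()
    for w in workers:
        w.join()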
@@ -499,7 +503,7 @@ def make_bar_graph(self,
         if gv.no_replace and os.path.exists(os.path.join(curr_dir, file_name)):
             return
 
-        make_dir(curr_dir, False, gv)
+        make_dir(curr_dir, False)
 
         # Get DF with average error for each "type" encountered in column
         avg_error = {}
@@ -548,7 +552,11 @@ def make_standard_bar_graphs(self, test_name_plot: bool, gv: GlobalVars):
         for col in columns:
             for use_abs in [True, False]:
                 for first_it in [True, False]:
-                    gv.pool.submit(self.make_bar_graph, comp, col, use_abs, first_it, gv)
+                    gv.processes.append((self.make_bar_graph, (comp,
+                                                               col,
+                                                               use_abs,
+                                                               first_it,
+                                                               gv)))
 
     # pylint: disable=too-many-locals
     def make_heatmap(
@@ -636,7 +644,7 @@ def make_heatmap(
         if gv.no_replace and os.path.exists(os.path.join(curr_dir, file_name)):
             return
 
-        make_dir(curr_dir, False, gv)
+        make_dir(curr_dir, False)
 
         # Get DF with average error for each "coordinate" in the heatmap
         df_avgs = pd.DataFrame(columns=[x_column, y_column, scale_column])
@@ -683,20 +691,18 @@ def make_standard_heatmaps(self, gv: GlobalVars):
         for comp in gv.components:
             for first_it in [True, False]:
                 for use_abs in [True, False]:
-                    gv.pool.submit(self.make_heatmap,
-                                   comp,
-                                   "sink cluster tile width",
-                                   "sink cluster tile height",
-                                   first_it,
-                                   use_abs,
-                                   gv)
-                    gv.pool.submit(self.make_heatmap,
-                                   comp,
-                                   "delta x",
-                                   "delta y",
-                                   first_it,
-                                   use_abs,
-                                   gv)
+                    gv.processes.append((self.make_heatmap, (comp,
+                                                             "sink cluster tile width",
+                                                             "sink cluster tile height",
+                                                             first_it,
+                                                             use_abs,
+                                                             gv)))
+                    gv.processes.append((self.make_heatmap, (comp,
+                                                             "delta x",
+                                                             "delta y",
+                                                             first_it,
+                                                             use_abs,
+                                                             gv)))
 
     # pylint: disable=too-many-locals
     def make_pie_chart(
@@ -764,7 +770,7 @@ def make_pie_chart(
         if gv.no_replace and os.path.exists(os.path.join(curr_dir, file_name)):
             return
 
-        make_dir(curr_dir, False, gv)
+        make_dir(curr_dir, False)
 
         # Constrict DF to columns whose error is under threshold
         curr_df = curr_df[curr_df[f"{comp} % error"] < gv.percent_error_threshold]
@@ -821,7 +827,11 @@ def make_standard_pie_charts(self, test_name_plot: bool, gv: GlobalVars):
         for col in columns:
             for first_it in [True, False]:
                 for weighted in [True, False]:
-                    gv.pool.submit(self.make_pie_chart, comp, col, first_it, weighted, gv)
+                    gv.processes.append((self.make_pie_chart, (comp,
+                                                               col,
+                                                               first_it,
+                                                               weighted,
+                                                               gv)))
 
     def make_standard_plots(self, test_name_plot: bool, gv: GlobalVars):
         """
@@ -961,7 +971,7 @@ def make_csv(df_out: pd.DataFrame, file_name: str):
         gv.csv_data and
         (not os.path.exists(os.path.join(directory, "data.csv")) or not gv.no_replace)
     ):
-        gv.pool.submit(make_csv, df, os.path.join(directory, "data.csv"))
+        gv.processes.append((make_csv, (df, os.path.join(directory, "data.csv"))))
 
     if gv.should_print:
         print("Created ", os.path.join(directory, "data.csv"), sep="")
@@ -1111,7 +1121,6 @@ def parse_args():
     )
 
     args = parser.parse_args()
-    pool = ThreadPoolExecutor(max_workers=args.j)
 
     if args.all:
         graph_types = ["bar", "scatter", "heatmap", "pie"]
@@ -1162,7 +1171,6 @@ def parse_args():
         args.print,
         args.threshold,
         {},
-        pool
     )
 
     for excl in args.exclude:
@@ -1202,7 +1210,7 @@ def create_complete_outputs(
     if len(args.dir_app) > 0:
         results_folder_path += f"{args.dir_app[0]}"
 
-    make_dir(results_folder_path, False, gv)
+    make_dir(results_folder_path, False)
 
     df_complete = df_complete.reset_index(drop=True)
 
@@ -1228,7 +1236,7 @@ def create_benchmark_outputs(
 
     results_folder = os.path.join(output_folder, "__all__")
     results_folder_path = os.path.join(gv.output_dir, results_folder)
-    make_dir(results_folder_path, False, gv)
+    make_dir(results_folder_path, False)
 
     df_benchmark = df_benchmark.reset_index(drop=True)
 
@@ -1270,7 +1278,7 @@ def create_circuit_outputs(
 
     results_folder = os.path.join(output_folder, test_name)
     results_folder_path = os.path.join(gv.output_dir, results_folder)
-    make_dir(results_folder_path, False, gv)
+    make_dir(results_folder_path, False)
     # Read csv with lookahead data (or, a csv previously created by this script)
     df = pd.read_csv(file_path)
     df = df.reset_index(drop=True)
@@ -1329,7 +1337,7 @@ def create_circuit_outputs(
 def main():
     args, gv = parse_args()
 
-    make_dir(gv.output_dir, False, gv)
+    make_dir(gv.output_dir, False)
 
     # The DF containing info across all given csv files
     df_complete = pd.DataFrame(columns=gv.column_names)
@@ -1371,6 +1379,23 @@ def main():
     if args.collect != "":
         create_complete_outputs(args, df_complete, gv)
 
+    # Create output graphs simultaneously
+    # This is the best way I have found to do this that avoids having a while
+    # loop continually checking if any processes have finished while using an
+    # entire CPU. Pool, concurrent.futures.ProcessPoolExecutor, and
+    # concurrent.futures.ThreadPoolExecutor seem to have a lot of overhead.
+    # This loop assumes that the graph-creating functions take approximately
+    # the same amount of time, which is not the case.
+    q = deque()
+    for func, params in gv.processes:
+        while len(q) >= args.j:
+            proc = q.popleft()
+            proc.join()
+
+        proc = Process(target=func, args=params)
+        proc.start()
+        q.append(proc)
+
 
 if __name__ == "__main__":
     main()
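As the commit's own comment concedes, joining the oldest process first assumes the graph-creating functions take roughly the same time; the loop as written also never explicitly joins the final batch of workers (the interpreter joins non-daemon processes at exit). A hedged sketch of one possible refinement, not part of this commit: reap whichever workers have already finished, sleep briefly rather than busy-wait, and drain the queue at the end. run_all and its names are illustrative.

import time
from collections import deque
from multiprocessing import Process

def run_all(tasks, max_workers: int):
    """tasks: an iterable of (callable, args) pairs, like gv.processes."""
    q = deque()
    for func, params in tasks:
        while len(q) >= max_workers:
            # Keep only workers still running; is_alive() also reaps
            # children that have already exited.
            alive = deque()
            while q:
                proc = q.popleft()
                if proc.is_alive():
                    alive.append(proc)
                else:
                    proc.join()
            q = alive
            if len(q) >= max_workers:
                time.sleep(0.05)  # brief nap instead of burning a CPU
        proc = Process(target=func, args=params)
        proc.start()
        q.append(proc)
    for proc in q:  # explicitly wait for the stragglers
        proc.join()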
