diff --git a/arch_gym/envs/AstraSimEnv.py b/arch_gym/envs/AstraSimEnv.py index 737c5b25..10d0156d 100644 --- a/arch_gym/envs/AstraSimEnv.py +++ b/arch_gym/envs/AstraSimEnv.py @@ -8,6 +8,8 @@ import csv import random +from envHelpers import helpers + settings_file_path = os.path.realpath(__file__) settings_dir_path = os.path.dirname(settings_file_path) proj_root_path = os.path.join(settings_dir_path, '..', '..') @@ -16,18 +18,25 @@ # astra-sim environment class AstraSimEnv(gym.Env): - def __init__(self, rl_form="random_walker", max_steps=5, num_agents=1, reward_formulation="None", reward_scaling=1): - # action space = set of all possible actions. Space.sample() returns a random action - self.action_space = gym.spaces.Discrete(16) - # observation space = set of all possible observations - self.observation_space = gym.spaces.Discrete(1) + def __init__(self, rl_form="sa1", max_steps=5, num_agents=1, reward_formulation="None", reward_scaling=1,): + self.rl_form = rl_form + + if self.rl_form == 'sa1': + # action space = set of all possible actions. Space.sample() returns a random action + # observation space = set of all possible observations + self.observation_space = gym.spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32) # box is an array of shape len + self.action_space = gym.spaces.Box(low=0, high=1, shape=(4,), dtype=np.float32) + self.helpers = helpers() + else: + self.observation_space = gym.spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32) + self.action_space = gym.spaces.Discrete(16) + + # set parameters self.max_steps = max_steps self.counter = 0 self.useful_counter = 0 - - self.rl_form = rl_form self.num_agents = num_agents self.reward_formulation = reward_formulation self.reward_scaling = reward_scaling @@ -49,9 +58,15 @@ def __init__(self, rl_form="random_walker", max_steps=5, num_agents=1, reward_fo self.networks_folder = os.path.join(sim_path, "astrasim-archgym/dse/archgen_v1_knobs/templates/network") self.workloads_folder = os.path.join(sim_path, "astrasim-archgym/themis/inputs/workload") + # Config does not matter self.network_config = os.path.join(self.networks_folder, "3d_fc_ring_switch.json") - self.workload_config = os.path.join(sim_path, "realworld_workloads/transformer_1t_fused_only_t.txt") + self.workload_config = os.path.join(self.workloads_folder, "all_reduce/allreduce_0.65.txt") + self.astrasim_archgym = os.path.join(sim_path, "astrasim-archgym") + self.systems_folder = os.path.join(self.astrasim_archgym, "themis/inputs/system") + self.network_file = "4d_ring_fc_ring_switch.json" + self.system_file = os.path.join(self.systems_folder, "4d_ring_fc_ring_switch_baseline.txt") + self.workload_file = "all_reduce/allreduce_0.65.txt" print("_____________________*****************************_____________________") @@ -60,6 +75,7 @@ def __init__(self, rl_form="random_walker", max_steps=5, num_agents=1, reward_fo # reset function def reset(self): + self.counter = 0 # get results folder path results_folder_path = os.path.join(sim_path, "results", "run_general") @@ -72,7 +88,13 @@ def reset(self): csv_files = os.path.join(results_folder_path, csv_files) if os.path.exists(csv_files): os.remove(csv_files) - return + + # TODO: + obs = np.zeros(self.observation_space.shape) + + return obs + + # parses a result csv file and stores it in a dictionary def parse_result(self, file_name): @@ -113,13 +135,45 @@ def calculate_reward(self, observations): print(sum) return 1 / (sum ** 0.5) + + # parse system_file (above is the content) into dict + def parse_system(self, system_file, action_dict): + action_dict['system'] = {} + with open(system_file, 'r') as file: + lines = file.readlines() + + for line in lines: + key, value = line.strip().split(': ') + action_dict['system'][key] = value + # give it one action: one set of parameters from json file def step(self, action_dict): - # write the three config files - # with open(self.network_config, "w") as outfile: - # outfile.write(json.dumps(action_dict['network'], indent=4)) - print(action_dict) + if not isinstance(action_dict, dict): + with open(settings_dir_path + "/AstraSimRL_2.csv", 'a') as f: + writer = csv.writer(f) + writer.writerow(action_dict) + + print("STEP: action_dict is a list") + action_dict_decoded = {} + action_dict_decoded['network'] = {"path": self.network_file} + action_dict_decoded['workload'] = {"path": self.workload_file} + + # parse system: initial values + self.parse_system(self.system_file, action_dict_decoded) + + # returning an + action_decoded = self.helpers.action_decoder_ga_astraSim(action_dict) + + # change all variables decoded into action_dict + for sect in action_decoded: + for key in action_decoded[sect]: + action_dict_decoded[sect][key] = action_decoded[sect][key] + + action_dict = action_dict_decoded + + + if "path" in action_dict["network"]: self.network_config = action_dict["network"]["path"] @@ -135,12 +189,7 @@ def step(self, action_dict): # the action is actually the parsed parameter files print("Step: " + str(self.counter)) - if (self.counter == self.max_steps): - self.done = True - print("Maximum steps reached") - self.reset() - else: - self.counter += 1 + self.counter += 1 # start subrpocess to run the simulation # $1: network, $2: system, $3: workload @@ -174,6 +223,12 @@ def step(self, action_dict): sample_all_reduce_dimension_utilization = self.parse_result(sim_path + '/results/run_general/sample_all_reduce_dimension_utilization.csv') + if (self.counter == self.max_steps): + self.done = True + print("Maximum steps reached") + self.reset() + + # test if the csv files exist (if they don't, the config files are invalid) if ((len(backend_dim_info) == 0 or len(backend_end_to_end) == 0 or len(detailed) == 0 or len(end_to_end) == 0 or @@ -181,50 +236,33 @@ def step(self, action_dict): # set reward to be extremely negative reward = float("-inf") print("reward: ", reward) - return [[], reward, self.done, {"useful_counter": self.useful_counter}, self.state] + return [], reward, self.done, {"useful_counter": self.useful_counter}, self.state else: # only recording the first line because apparently they are all the same? TODO - self.observations = [ - backend_end_to_end["CommsTime"][0], + observations = [ + float(backend_end_to_end["CommsTime"][0]) # end_to_end["fwd compute"][0], # end_to_end["wg compute"][0], # end_to_end["ig compute"][0], # end_to_end["total exposed comm"][0] ] - reward = self.calculate_reward(self.observations) - print("reward: ", reward) - print("observations: ", self.observations) + + reward = self.calculate_reward(observations) + + print("reward: ", reward) + + # reshape observations with shape of observation space + observations = np.reshape(observations, self.observation_space.shape) self.useful_counter += 1 - return [self.observations, reward, self.done, {"useful_counter": self.useful_counter}, self.state] + return observations, reward, self.done, {"useful_counter": self.useful_counter}, self.state if __name__ == "__main__": print("Testing AstraSimEnv") - env = AstraSimEnv(rl_form='random_walker', + env = AstraSimEnv(rl_form='sa1', max_steps=10, num_agents=1, reward_formulation='reward_formulation_1', reward_scaling=1) - - - - - - - """ - Everytime rest happens: - - zero out the observation - - 3/24: - Communication Time (unit: microseconds) - Time breakdowns (forward pass, weight gradient, input gradient) - Exposed communication - - - 3/31: - Catch errors by giving it high negative reward. This way we can test the range. - - - """ diff --git a/arch_gym/envs/AstraSimWrapper.py b/arch_gym/envs/AstraSimWrapper.py index 02ef73d9..de42ce3b 100644 --- a/arch_gym/envs/AstraSimWrapper.py +++ b/arch_gym/envs/AstraSimWrapper.py @@ -13,7 +13,7 @@ # limitations under the License. """Wraps an OpenAI Gym environment to be used as a dm_env environment.""" -import sys, os +import sys from typing import Any, Dict, List, Optional from acme import specs @@ -25,8 +25,8 @@ import numpy as np import tree -os.sys.path.insert(0, os.path.abspath('../../')) -from arch_gym.envs.AstraSimEnv import AstraSimEnv +from AstraSimEnv import AstraSimEnv +from envHelpers import helpers # dm = deepmind class AstraSimEnvWrapper(dm_env.Environment): @@ -41,6 +41,7 @@ def __init__(self, environment: gym.Env, self._environment = environment self._reset_next_step = True self._last_info = None + self.helper = helpers() self.env_wrapper_sel = env_wrapper_sel # set useful counter @@ -182,12 +183,12 @@ def _convert_to_spec(space: gym.Space, else: raise ValueError('Unexpected gym space: {}'.format(space)) -def make_astraSim_env(seed: int = 12345, - rl_form = 'macme', +def make_astraSim_env(seed: int = 12234, + rl_form = 'sa1', reward_formulation = 'power', reward_scaling = 'false', - max_steps: int = 100, - num_agents: int = 10) -> dm_env.Environment: + max_steps: int = 1, + num_agents: int = 1) -> dm_env.Environment: """Returns DRAMSys environment.""" print("[DEBUG][Seed]", seed) print("[DEBUG][RL Form]", rl_form) @@ -205,7 +206,8 @@ def make_astraSim_env(seed: int = 12345, ), env_wrapper_sel = rl_form ) + environment = wrappers.SinglePrecisionWrapper(environment) - if(rl_form == 'sa' or rl_form == 'tdm'): - environment = wrappers.CanonicalSpecWrapper(environment, clip=True) + if(rl_form == 'sa1' or rl_form == 'tdm'): + environment = wrappers.CanonicalSpecWrapper(environment, clip=False) return environment \ No newline at end of file diff --git a/arch_gym/envs/envHelpers.py b/arch_gym/envs/envHelpers.py index d6836057..784fb789 100644 --- a/arch_gym/envs/envHelpers.py +++ b/arch_gym/envs/envHelpers.py @@ -797,10 +797,10 @@ def action_decoder_ga_astraSim(self, act_encoded): interDimension_mapper = {0: "baseline", 1: "themis"} # Modified system parameters - act_decoded["system"]["scheduling-policy"] = schedulePolicy_mapper[int(act_encoded[0])] - act_decoded["system"]["collective-optimization"] = collectiveOptimization_mapper[int(act_encoded[1])] - act_decoded["system"]["intra-dimension-scheduling"] = intraDimension_mapper[int(act_encoded[2])] - act_decoded["system"]["inter-dimension-scheduling"] = interDimension_mapper[int(act_encoded[3])] + act_decoded["system"]["scheduling-policy"] = schedulePolicy_mapper[int(round(act_encoded[0]))] + act_decoded["system"]["collective-optimization"] = collectiveOptimization_mapper[int(round(act_encoded[1]))] + act_decoded["system"]["intra-dimension-scheduling"] = intraDimension_mapper[int(round(act_encoded[2]))] + act_decoded["system"]["inter-dimension-scheduling"] = interDimension_mapper[int(round(act_encoded[3]))] return act_decoded diff --git a/sims/AstraSim/AstraSimRL.csv b/sims/AstraSim/AstraSimRL.csv new file mode 100644 index 00000000..4624ae37 --- /dev/null +++ b/sims/AstraSim/AstraSimRL.csv @@ -0,0 +1,5 @@ +0.17908713,0.44089648,0.83359694,0.2673431 +0.5,0.5,0.5,0.5 +0.0,0.0,0.0,1.0 +0.0,0.0,0.0,1.0 +0.17908713,0.44089648,0.83359694,0.2673431 diff --git a/sims/AstraSim/astrasim-archgym b/sims/AstraSim/astrasim-archgym new file mode 160000 index 00000000..2ff6b732 --- /dev/null +++ b/sims/AstraSim/astrasim-archgym @@ -0,0 +1 @@ +Subproject commit 2ff6b7325d0e21229124f1101a7d2941f434267c diff --git a/sims/AstraSim/bo_logs/metadata.riegeli b/sims/AstraSim/bo_logs/metadata.riegeli new file mode 100644 index 00000000..1eb1858e Binary files /dev/null and b/sims/AstraSim/bo_logs/metadata.riegeli differ diff --git a/sims/AstraSim/exp_config.ini b/sims/AstraSim/exp_config.ini new file mode 100644 index 00000000..b75f3bc8 --- /dev/null +++ b/sims/AstraSim/exp_config.ini @@ -0,0 +1,7 @@ +[experiment_configuration] +exp_name = resnet18_random_state_2_num_iter_16 +trajectory_dir = ./bo_trajectories/power/resnet18_random_state_2_num_iter_16 +log_dir = ./bo_logs/power/resnet18_random_state_2_num_iter_16 +reward_formulation = power +use_envlogger = True + diff --git a/sims/AstraSim/general_workload.txt b/sims/AstraSim/general_workload.txt new file mode 100644 index 00000000..c95a1f2a --- /dev/null +++ b/sims/AstraSim/general_workload.txt @@ -0,0 +1,20 @@ +HYBRID_TRANSFORMER_FWD_IN_BCKWD model_parallel_NPU_group: 128 checkpoints: 2 0 9 checkpoint_initiates: 2 17 8 +18 +Q1 -1 2343750 NONE 0 2343750 ALLREDUCE 805306368 2343750 ALLREDUCE 240316416 10 +K1 -1 2343750 NONE 0 2343750 NONE 0 2343750 NONE 0 10 +V1 -1 2343750 NONE 0 2343750 NONE 0 2343750 NONE 0 10 +QK1 -1 97656 NONE 0 97656 NONE 0 97656 NONE 0 10 +softmax1 -1 97656 NONE 0 97656 NONE 0 97656 NONE 0 10 +concat1 -1 2343750 ALLREDUCE 805306368 2343750 ALLGATHER 6291456 2343750 NONE 0 10 +X1W1b1 -1 9375000 NONE 0 9375000 ALLREDUCE 805306368 9375000 NONE 0 10 +X1W2b2 -1 9375000 ALLREDUCE 805306368 9375000 NONE 0 9375000 NONE 0 10 +layerNorm1 -1 12207 NONE 0 12207 NONE 0 12207 NONE 0 10 +Q2 -1 2343750 NONE 0 2343750 ALLREDUCE 805306368 2343750 NONE 0 10 +K2 -1 2343750 NONE 0 2343750 NONE 0 2343750 NONE 0 10 +V2 -1 2343750 NONE 0 2343750 NONE 0 2343750 NONE 0 10 +QK2 -1 97656 NONE 0 97656 NONE 0 97656 NONE 0 10 +softmax2 -1 97656 NONE 0 97656 NONE 0 97656 NONE 0 10 +concat2 -1 2343750 ALLREDUCE 805306368 2343750 ALLGATHER 6291456 2343750 NONE 0 10 +X2W1b1 -1 9375000 NONE 0 9375000 ALLREDUCE 805306368 9375000 NONE 0 10 +X2W2b2 -1 9375000 ALLREDUCE 805306368 9375000 NONE 0 9375000 NONE 0 10 +layerNorm2 -1 12207 NONE 0 12207 NONE 0 12207 NONE 0 10 \ No newline at end of file diff --git a/sims/AstraSim/gridSearchAstraSim.py b/sims/AstraSim/gridSearchAstraSim.py new file mode 100644 index 00000000..9c63a6eb --- /dev/null +++ b/sims/AstraSim/gridSearchAstraSim.py @@ -0,0 +1,159 @@ +import os +import sys +import pickle + +from absl import app + +os.sys.path.insert(0, os.path.abspath('../../')) +os.sys.path.insert(0, os.path.abspath('../../arch_gym')) + +from configs import arch_gym_configs + +from arch_gym.envs.envHelpers import helpers +from arch_gym.envs import AstraSimWrapper, AstraSimEnv +import envlogger +import numpy as np +import pandas as pd +import random +import time +import json + +# systems: parse from file into json into generate_random_actions +""" +system_file content: +scheduling-policy: LIFO +endpoint-delay: 1 +active-chunks-per-dimension: 1 +preferred-dataset-splits: 64 +boost-mode: 1 +all-reduce-implementation: direct_ring_halvingDoubling +all-gather-implementation: direct_ring_halvingDoubling +reduce-scatter-implementation: direct_ring_halvingDoubling +all-to-all-implementation: direct_direct_direct +collective-optimization: localBWAware +intra-dimension-scheduling: FIFO +inter-dimension-scheduling: baseline +""" +def parse_system(system_file, action_dict): + # parse system_file (above is the content) into dict + action_dict['system'] = {} + with open(system_file, 'r') as file: + lines = file.readlines() + + for line in lines: + key, value = line.strip().split(': ') + action_dict['system'][key] = value + +# parses knobs that we want to experiment with +def parse_knobs(knobs_spec): + SYSTEM_KNOBS = {} + NETWORK_KNOBS = {} + + with open(knobs_spec, 'r') as file: + file_contents = file.read() + parsed_dicts = {} + + # Evaluate the file contents and store the dictionaries in the parsed_dicts dictionary + exec(file_contents, parsed_dicts) + + # Access the dictionaries + SYSTEM_KNOBS = parsed_dicts['SYSTEM_KNOBS'] + NETWORK_KNOBS = parsed_dicts['NETWORK_KNOBS'] + + return SYSTEM_KNOBS, NETWORK_KNOBS + + +# action_type = specify 'network' or 'system +# new_params = parsed knobs from experiment file +def generate_random_actions(action_dict, system_knob, network_knob, args): + dicts = [(system_knob, 'system'), (network_knob, 'network')] + for dict_type, dict_name in dicts: + i = 0 + for knob in dict_type.keys(): + if isinstance(dict_type[knob], set): + action_dict[dict_name][knob] = list(dict_type[knob])[args[i]] + i += 1 + + return action_dict + + +def main(_): + settings_file_path = os.path.realpath(__file__) + settings_dir_path = os.path.dirname(settings_file_path) + proj_root_path = os.path.abspath(settings_dir_path) + + astrasim_archgym = os.path.join(proj_root_path, "astrasim-archgym") + + # TODO: V1 SPEC: + archgen_v1_knobs = os.path.join(astrasim_archgym, "dse/archgen_v1_knobs") + knobs_spec = os.path.join(archgen_v1_knobs, "archgen_v1_knobs_spec.py") + networks_folder = os.path.join(archgen_v1_knobs, "templates/network") + systems_folder = os.path.join(astrasim_archgym, "themis/inputs/system") + workloads_folder = os.path.join(astrasim_archgym, "themis/inputs/workload") + + # DEFINE NETWORK AND SYSTEM AND WORKLOAD + network_file = "4d_ring_fc_ring_switch.json" + system_file = os.path.join(systems_folder, "4d_ring_fc_ring_switch_baseline.txt") + workload_file = "all_reduce/allreduce_0.65.txt" + + env = AstraSimWrapper.make_astraSim_env(rl_form='random_walker') + # env = AstraSimEnv.AstraSimEnv(rl_form='random_walker') + + astrasim_helper = helpers() + + start = time.time() + + step_results = {} + + all_results = [] + best_reward, best_observation, best_actions = 0.0, 0.0, {} + + for sp in [0, 1]: + for co in [0, 1]: + for intra in [0, 1]: + for inter in [0, 1]: + # INITIATE action dict + action_dict = {} + args = [sp, co, intra, inter] + + # if path exists, use path, else parse the sub-dict + action_dict['network'] = {"path": network_file} + action_dict['workload'] = {"path": workload_file} + + # TODO: parse system + parse_system(system_file, action_dict) + + # TODO: parse knobs (all variables to change in action_dict) + system_knob, network_knob = parse_knobs(knobs_spec) + + # pass into generate_random_actions(dimension, knobs) + action_dict = generate_random_actions(action_dict, system_knob, network_knob, args) + + # with open("general_workload.txt", 'w') as file: + # file.write(action["workload"]["value"]) + + # step_result wrapped in TimeStep object + step_result = env.step(action_dict) + step_type, reward, discount, observation = step_result + + step_results['reward'] = [reward] + step_results['action'] = action_dict + step_results['obs'] = observation + + all_results.append((reward, observation)) + + if reward and reward > best_reward: + best_reward = reward + best_observation = observation + best_actions = action_dict + + end = time.time() + + print("Best Reward: ", best_reward) + print("Best Observation: ", best_observation) + print("Best Parameters: ", best_actions) + print("All Results: ", all_results) + + +if __name__ == '__main__': + app.run(main) diff --git a/sims/AstraSim/restructure.py b/sims/AstraSim/restructure.py deleted file mode 100644 index f17e131e..00000000 --- a/sims/AstraSim/restructure.py +++ /dev/null @@ -1,32 +0,0 @@ -import os -import csv - -def read_csv_column(file_path): - with open(file_path, 'r', newline='') as csvfile: - reader = csv.reader(csvfile) - column_data = [row if row else "NA" for row in reader] - return column_data - -def merge_columns(file1_path, file2_path, output_file_path): - column1_data = read_csv_column(file1_path) - column2_data = read_csv_column(file2_path) - - # Ensure both columns have the same length - if len(column1_data) != len(column2_data): - raise ValueError("The columns must have the same length.") - # Merge the columns into a list of dictionaries - merged_data = [{'x': column1_data[i], 'y': column2_data[i]} for i in range(len(column1_data))] - - # Write the merged data to a new CSV file - with open(output_file_path, 'w', newline='') as outfile: - fieldnames = ['x', 'y'] - writer = csv.DictWriter(outfile, fieldnames=fieldnames) - writer.writeheader() - writer.writerows(merged_data) - -if __name__ == "__main__": - log_path = "random_walker_logs/latency/resnet18_num_steps_6_num_episodes_1/" - file1_path = os.path.join(log_path, "actions.csv") - file2_path = os.path.join(log_path, "observations.csv") - output_file_path = os.path.join(log_path, "merged.csv") - merge_columns(file1_path, file2_path, output_file_path) diff --git a/sims/AstraSim/run_3dfrs.sh b/sims/AstraSim/run_3dfrs.sh new file mode 100755 index 00000000..657eb6f8 --- /dev/null +++ b/sims/AstraSim/run_3dfrs.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +SCRIPT_DIR=$(dirname "$(realpath $0)") +BINARY="${SCRIPT_DIR:?}"/astrasim-archgym/astra-sim/build/astra_analytical/build/AnalyticalAstra/bin/AnalyticalAstra +WORKLOAD="${SCRIPT_DIR:?}"/3d_frs_baseline_allreduce065/allreduce_065.txt +SYSTEM="${SCRIPT_DIR:?}"/3d_frs_baseline_allreduce065/general_system.txt +NETWORK="${SCRIPT_DIR:?}"/3d_frs_baseline_allreduce065/3d_fc_ring_switch.json +RUN_DIR="${SCRIPT_DIR:?}"/3d_frs_baseline_allreduce065/ + +"${BINARY}" \ + --workload-configuration="${WORKLOAD}" \ + --system-configuration="${SYSTEM}" \ + --network-configuration="${NETWORK}" \ + --path="${RUN_DIR}" \ + --run-name="3d_frs_baseline_allreduce065" > ${RUN_DIR}/stdout \ No newline at end of file diff --git a/sims/AstraSim/run_general.sh b/sims/AstraSim/run_general.sh index 72b54b54..b7813d87 100755 --- a/sims/AstraSim/run_general.sh +++ b/sims/AstraSim/run_general.sh @@ -4,7 +4,6 @@ SCRIPT_DIR=$(dirname "$(realpath $0)") # Absolute paths to useful directories - BINARY="${SCRIPT_DIR:?}"/astrasim-archgym/astra-sim/build/astra_analytical/build/AnalyticalAstra/bin/AnalyticalAstra SYSTEM="${SCRIPT_DIR:?}"/general_system.txt NETWORK="${SCRIPT_DIR:?}"/astrasim-archgym/themis/inputs/network/analytical/$1 @@ -14,8 +13,6 @@ echo "SH NETWORK: ${NETWORK}" echo "SH SYSTEM: ${SYSTEM}" echo "SH WORKLOAD: ${WORKLOAD}" -# WORKLOAD="${SCRIPT_DIR:?}"/astra-sim/inputs/workload/Transformer_HybridParallel.txt # CHANGE THIS - STATS="${SCRIPT_DIR:?}"/results/run_general rm -rf "${STATS}" @@ -27,8 +24,4 @@ mkdir "${STATS}" --workload-configuration="${WORKLOAD}" \ --path="${STATS}/" \ --run-name="sample_all_reduce" \ ---num-passes=5 \ ---comm-scale=50 \ ---total-stat-rows=1 \ ---stat-row=0 diff --git a/sims/AstraSim/trainACOAstraSim.py b/sims/AstraSim/trainACOAstraSim.py index 56804567..2ea47fb2 100644 --- a/sims/AstraSim/trainACOAstraSim.py +++ b/sims/AstraSim/trainACOAstraSim.py @@ -22,7 +22,7 @@ flags.DEFINE_string('aco_log_dir', 'aco_logs', 'Directory to store logs.') flags.DEFINE_string('workload', 'stream.stl', 'Which workload to run') flags.DEFINE_string('exp_config_file', 'exp_config.ini', 'Experiment config file.') -flags.DEFINE_integer('depth', 1, 'Depth of the network.') +flags.DEFINE_integer('depth', 10, 'Depth of the network.') flags.DEFINE_string('summary_dir', '.', 'Directory to store summaries.') flags.DEFINE_string('reward_formulation', 'power', 'Reward formulation to use.') flags.DEFINE_bool('use_envlogger', True, 'Use EnvLogger to log environment data.') diff --git a/sims/AstraSim/trainGAAstraSim.py b/sims/AstraSim/trainGAAstraSim.py index 2c4a6854..b7bcf8a0 100644 --- a/sims/AstraSim/trainGAAstraSim.py +++ b/sims/AstraSim/trainGAAstraSim.py @@ -20,7 +20,7 @@ import pandas as pd import matplotlib.pyplot as plt -flags.DEFINE_integer('num_steps', 10, 'Number of training steps.') +flags.DEFINE_integer('num_steps', 20, 'Number of training steps.') flags.DEFINE_integer('num_agents', 4, 'Number of agents.') flags.DEFINE_float('prob_mutation', 0.1, 'Probability of mutation.') flags.DEFINE_string('workload','resnet18', 'ML model name') @@ -86,9 +86,9 @@ def AstraSim_optimization_function(p): workloads_folder = os.path.join(astrasim_archgym, "themis/inputs/workload") # DEFINE NETWORK AND SYSTEM AND WORKLOAD - network_file = "3d_fc_ring_switch.json" - system_file = os.path.join(systems_folder, "3d_fc_ring_switch_baseline.txt") - workload_file = "gnmt_fp16_fused.txt" + network_file = "4d_ring_fc_ring_switch.json" + system_file = os.path.join(systems_folder, "4d_ring_fc_ring_switch_baseline.txt") + workload_file = "all_reduce/allreduce_0.65.txt" # parse knobs system_knob, network_knob = parse_knobs(knobs_spec) @@ -122,7 +122,6 @@ def AstraSim_optimization_function(p): # parse system parse_system(system_file, action_dict) - action_dict_decoded = astraSim_helper.action_decoder_ga_astraSim(p) # change all variables decoded into action_dict diff --git a/sims/AstraSim/trainRandomWalkerAstraSim.py b/sims/AstraSim/trainRandomWalkerAstraSim.py index 29914f71..f6393975 100644 --- a/sims/AstraSim/trainRandomWalkerAstraSim.py +++ b/sims/AstraSim/trainRandomWalkerAstraSim.py @@ -9,7 +9,10 @@ os.sys.path.insert(0, os.path.abspath('../../')) os.sys.path.insert(0, os.path.abspath('../../arch_gym')) -from arch_gym.envs import AstraSimWrapper +from configs import arch_gym_configs + +from arch_gym.envs.envHelpers import helpers +from arch_gym.envs import AstraSimWrapper, AstraSimEnv import envlogger import numpy as np import pandas as pd @@ -20,7 +23,7 @@ # define workload in run_general.sh flags.DEFINE_string('workload', 'resnet18', 'Which AstraSim workload to run?') -flags.DEFINE_integer('num_steps', 2, 'Number of training steps.') +flags.DEFINE_integer('num_steps', 50, 'Number of training steps.') flags.DEFINE_integer('num_episodes', 1, 'Number of training episodes.') flags.DEFINE_bool('use_envlogger', True, 'Use envlogger to log the data.') flags.DEFINE_string('traject_dir', @@ -194,8 +197,6 @@ def parse_system(system_file, action_dict): key, value = line.strip().split(': ') action_dict['system'][key] = value - - # def parse_workload(workload_file): @@ -283,9 +284,9 @@ def main(_): workloads_folder = os.path.join(astrasim_archgym, "themis/inputs/workload") # DEFINE NETWORK AND SYSTEM AND WORKLOAD - network_file = "3d_fc_ring_switch.json" - system_file = os.path.join(systems_folder, "3d_fc_ring_switch_baseline.txt") - workload_file = "gnmt_fp16_fused.txt" + network_file = "4d_ring_fc_ring_switch.json" + system_file = os.path.join(systems_folder, "4d_ring_fc_ring_switch_baseline.txt") + workload_file = "all_reduce/allreduce_0.65.txt" exe_path = os.path.join(proj_root_path, "run_general.sh") @@ -297,6 +298,8 @@ def main(_): env = AstraSimWrapper.make_astraSim_env(rl_form='random_walker') # env = AstraSimEnv.AstraSimEnv(rl_form='random_walker') + astrasim_helper = helpers() + # experiment name exp_name = str(FLAGS.workload)+"_num_steps_" + str(FLAGS.num_steps) + "_num_episodes_" + str(FLAGS.num_episodes) # append logs to base path @@ -322,7 +325,7 @@ def main(_): # INITIATE action dict action_dict = {} - # TODO: load network and workloads + # if path exists, use path, else parse the sub-dict action_dict['network'] = {"path": network_file} action_dict['workload'] = {"path": workload_file} @@ -332,6 +335,8 @@ def main(_): # TODO: parse knobs (all variables to change in action_dict) system_knob, network_knob = parse_knobs(knobs_spec) + best_reward, best_observation, best_actions = 0.0, 0.0, {} + for i in range(FLAGS.num_episodes): logging.info('Episode %r', i) @@ -350,11 +355,19 @@ def main(_): step_results['reward'] = [reward] step_results['action'] = action_dict step_results['obs'] = observation + + if reward and reward > best_reward: + best_reward = reward + best_observation = observation + best_actions = action_dict log_results_to_csv(log_path, step_results) end = time.time() + print("Best Reward: ", best_reward) + print("Best Observation: ", best_observation) + print("Best Parameters: ", best_actions) print("Total Time Taken: ", end - start) print("Total Useful Steps: ", env.useful_counter) diff --git a/sims/AstraSim/trainSingleAgentAstraSim.py b/sims/AstraSim/trainSingleAgentAstraSim.py new file mode 100644 index 00000000..54cd4a1e --- /dev/null +++ b/sims/AstraSim/trainSingleAgentAstraSim.py @@ -0,0 +1,205 @@ +import os +import sys +import json +from typing import Optional + +os.sys.path.insert(0, os.path.abspath('../../')) +from configs import arch_gym_configs +os.sys.path.insert(0, os.path.abspath('../../acme/')) +# print(os.sys.path) +# sys.exit() +import envlogger +from acme.agents.jax import ppo +from acme.agents.jax import sac +from acme import wrappers +from acme import specs + +from absl import app +from absl import flags +from absl import logging +from acme.utils import lp_utils +from acme.jax import experiments +from acme.agents.jax import normalization + +from acme.utils.loggers.tf_summary import TFSummaryLogger +from acme.utils.loggers.terminal import TerminalLogger +from acme.utils.loggers.csv import CSVLogger +from acme.utils.loggers import aggregators +from acme.utils.loggers import base + +from arch_gym.envs import AstraSimWrapper + +FLAGS = flags.FLAGS + +# Workload to run for training + +_WORKLOAD = flags.DEFINE_string('workload', 'resnet18', 'Workload to run for training') + +# select which RL algorithm to use +_RL_AGO = flags.DEFINE_string('rl_algo', 'ppo', 'RL algorithm.') + +# select which RL form to use +_RL_FORM = flags.DEFINE_string('rl_form', 'sa1', 'RL form.') + +# Acceptable values for reward: power, latency, and both (both means latency & power) +_REWARD_FORM = flags.DEFINE_string('reward_form', 'both', 'Reward form.') + +# Scale reward +_REWARD_SCALE = flags.DEFINE_string('reward_scale', 'false', 'Scale reward.') + +# Hyperparameters for each RL algorithm +_NUM_STEPS = flags.DEFINE_integer('num_steps', 100, 'Number of training steps.') +_EVAL_EVERY = flags.DEFINE_integer('eval_every', 50, 'Number of evaluation steps.') +_EVAL_EPISODES = flags.DEFINE_integer('eval_episodes', 1, 'Number of evaluation episode.') +_SEED = flags.DEFINE_integer('seed', 1, 'Random seed.') +_LEARNING_RATE = flags.DEFINE_float('learning_rate', 1e-5, 'Learning rate.') + + +# Hyperparameters for PPO +_ENTROPY_COST = flags.DEFINE_float('entropy_cost', 0.1, 'Entropy cost.') +_PPO_CLIPPING_EPSILON = flags.DEFINE_float('ppo_clipping_epsilon', 0.2, 'PPO clipping epsilon.') +_CLIP_VALUE = flags.DEFINE_bool('clip_value', False, 'Clip value.') + +# Experiment setup related parameters +_SUMMARYDIR = flags.DEFINE_string('summarydir', './logs', 'Directory to save summaries.') +_ENVLOGGER_DIR = flags.DEFINE_string('envlogger_dir', 'trajectory', 'Directory to save envlogger.') +_USE_ENVLOGGER = flags.DEFINE_bool('use_envlogger', False, 'Use envlogger.') +_RUN_DISTRIBUTED = flags.DEFINE_bool( + 'run_distributed', False, 'Should an agent be executed in a ' + 'distributed way (the default is a single-threaded agent)') + +# Experimental feature to scale RL policy parameters. Ideally want to keep it same as number +# of agents used in multi-agent training. +flags.DEFINE_integer("params_scaling", 1, "Number of training steps") + +def get_directory_name(): + _EXP_NAME = 'Algo_{}_rlform_{}_num_steps_{}_seed_{}_lr_{}_entropy_{}'.format(_RL_AGO.value, _RL_FORM.value,_NUM_STEPS.value, _SEED.value, _LEARNING_RATE.value, _ENTROPY_COST.value) + + return _EXP_NAME + + +def wrap_in_envlogger(env, envlogger_dir): + metadata = { + 'agent_type': FLAGS.rl_algo, + 'rl_form': FLAGS.rl_form, + 'num_steps': FLAGS.num_steps, + 'env_type': type(env).__name__, + } + env = envlogger.EnvLogger(env, + data_directory = envlogger_dir, + metadata = metadata, + max_episodes_per_file = 1000) + return env + + +def _logger_factory(logger_label: str, steps_key: Optional[str] = None, task_instance: Optional[int]=0) -> base.Logger: + """logger factory.""" + _EXP_NAME = get_directory_name() + if logger_label == 'actor': + terminal_logger = TerminalLogger(label=logger_label, print_fn=logging.info) + summarydir = os.path.join(FLAGS.summarydir,_EXP_NAME, logger_label) + tb_logger = TFSummaryLogger(summarydir, label=logger_label, steps_key=steps_key) + csv_logger = CSVLogger(summarydir, label=logger_label) + serialize_fn = base.to_numpy + logger = aggregators.Dispatcher([terminal_logger, tb_logger, csv_logger], serialize_fn) + return logger + elif logger_label == 'learner': + terminal_logger = TerminalLogger(label=logger_label, print_fn=logging.info) + summarydir = os.path.join(FLAGS.summarydir,_EXP_NAME, logger_label) + tb_logger = TFSummaryLogger(summarydir, label=logger_label, steps_key=steps_key) + csv_logger = CSVLogger(summarydir, label=logger_label) + serialize_fn = base.to_numpy + logger = aggregators.Dispatcher([terminal_logger, tb_logger, csv_logger], serialize_fn) + return logger + elif logger_label == 'evaluator': + terminal_logger = TerminalLogger(label=logger_label, print_fn=logging.info) + summarydir = os.path.join(FLAGS.summarydir,_EXP_NAME, logger_label) + tb_logger = TFSummaryLogger(summarydir, label=logger_label, steps_key=steps_key) + csv_logger = CSVLogger(summarydir, label=logger_label) + serialize_fn = base.to_numpy + logger = aggregators.Dispatcher([terminal_logger, tb_logger, csv_logger], serialize_fn) + return logger + else: + raise ValueError( + f'Improper value for logger label. Logger_label is {logger_label}') + +def build_experiment_config(): + """Builds the experiment configuration.""" + + if(FLAGS.rl_form == 'tdm'): + env = AstraSimWrapper.make_astraSim_env( + reward_formulation = _REWARD_FORM.value, + reward_scaling = _REWARD_SCALE.value + ) + else: + env = AstraSimWrapper.make_astraSim_env( + rl_form=FLAGS.rl_form, + reward_formulation = _REWARD_FORM.value, + reward_scaling = _REWARD_SCALE.value) + if FLAGS.use_envlogger: + envlogger_dir = os.path.join(FLAGS.summarydir, get_directory_name(), FLAGS.envlogger_dir) + if(not os.path.exists(envlogger_dir)): + os.makedirs(envlogger_dir) + env = wrap_in_envlogger(env, envlogger_dir) + + env_spec = specs.make_environment_spec(env) #TODO + if FLAGS.rl_algo == 'ppo': + config = ppo.PPOConfig(entropy_cost=FLAGS.entropy_cost, + learning_rate=FLAGS.learning_rate, + ppo_clipping_epsilon=FLAGS.ppo_clipping_epsilon, + clip_value=FLAGS.clip_value, + ) + ppo_builder = ppo.PPOBuilder(config) + + if FLAGS.params_scaling > 1: + size = 32 * FLAGS.params_scaling + layer_sizes = (size, size, size) + else: + layer_sizes = (32, 32, 32) + make_eval_policy = lambda network: ppo.make_inference_fn(network, True) + + return experiments.ExperimentConfig( + builder=ppo_builder, + environment_factory=lambda seed: env, + network_factory=lambda spec: ppo.make_networks(env_spec, layer_sizes), + policy_network_factory = ppo.make_inference_fn, + eval_policy_network_factory = make_eval_policy, + seed = FLAGS.seed, + logger_factory=_logger_factory, + max_num_actor_steps=_NUM_STEPS.value) + elif FLAGS.rl_algo == 'sac': + config = sac.SACConfig( + learning_rate=FLAGS.learning_rate, + n_step=FLAGS.n_step, + ) + sac_builder = sac.builder.SACBuilder(config) + size = 32 * FLAGS.params_scaling + return experiments.ExperimentConfig( + builder = sac_builder, + environment_factory = lambda seed: env, + network_factory = lambda spec: sac.make_networks(env_spec, (size, size, size)), + seed = FLAGS.seed, + logger_factory = _logger_factory, + max_num_actor_steps = FLAGS.num_steps) + else: + raise ValueError(f'Improper value for rl_algo. rl_algo is {FLAGS.rl_algo}') + +def main(_): + + sim_config = arch_gym_configs.sim_config + config = build_experiment_config() #TODO + if FLAGS.run_distributed: + program = experiments.make_distributed_experiment( + experiment=config, num_actors=4) + lp.launch(program, xm_resources=lp_utils.make_xm_docker_resources(program)) + else: + experiments.run_experiment( + experiment=config, + eval_every=FLAGS.eval_every, + num_eval_episodes=FLAGS.eval_episodes) + +if __name__ == '__main__': + app.run(main) + + +