Skip to content


update random walker
Browse files Browse the repository at this point in the history
  • Loading branch information
AditiR-42 committed Nov 8, 2023
1 parent b057706 commit aef2a49
Showing 1 changed file with 50 additions and 176 deletions.
226 changes: 50 additions & 176 deletions sims/AstraSim/
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
import json
import time
import random
import pandas as pd
import numpy as np
import envlogger
from arch_gym.envs import AstraSimWrapper, AstraSimEnv
from arch_gym.envs.envHelpers import helpers
from configs import arch_gym_configs
import os
import sys
import pickle
Expand All @@ -9,161 +18,22 @@
os.sys.path.insert(0, os.path.abspath('../../'))
os.sys.path.insert(0, os.path.abspath('../../arch_gym'))

from configs import arch_gym_configs

from arch_gym.envs.envHelpers import helpers
from arch_gym.envs import AstraSimWrapper, AstraSimEnv
import envlogger
import numpy as np
import pandas as pd
import random
import time
import json

# define workload in

flags.DEFINE_string('workload', 'resnet18', 'Which AstraSim workload to run?')
flags.DEFINE_integer('num_steps', 50, 'Number of training steps.')
flags.DEFINE_integer('num_episodes', 1, 'Number of training episodes.')
flags.DEFINE_bool('use_envlogger', True, 'Use envlogger to log the data.')
'Directory to save the dataset.')
flags.DEFINE_bool('use_envlogger', True, 'Use envlogger to log the data.')
'Directory to save the dataset.')
flags.DEFINE_string('summary_dir', ".", 'Directory to save the dataset.')
flags.DEFINE_string('reward_formulation', 'latency', 'Which reward formulation to use?')
flags.DEFINE_string('reward_formulation', 'latency',
'Which reward formulation to use?')

def write_network(dimension):
def rand_dim_helper(dim, vals):
return [random.choice(vals) for _ in range(dim)]

links_count = {"Ring": 2, "FullyConnected": 7, "Switch": 1}

def rand_num_helper(dim, min, max):
return [random.randint(min, max) for _ in range(dim)]

def rand_float_helper(dim, min, max):
return [round(random.uniform(min, max), 1) for _ in range(dim)]

network = {
"topology-name": random.choice(["Hierarchical"]),
"topologies-per-dim": rand_dim_helper(dimension, ["Ring", "FullyConnected", "Switch"]),
# "dimension-type": rand_dim_helper(dimension, ["T", "N", "P"]),
"dimension-type": rand_dim_helper(dimension, ["N"]),
# "dimensions-count": (int, 1, 5, 2),
"dimensions-count": dimension,
"units-count": rand_num_helper(dimension, 2, 1024),
"links-count": rand_num_helper(dimension, 1, 10),
"link-latency": rand_num_helper(dimension, 1, 1000),
"link-bandwidth": rand_float_helper(dimension, 0.00001, 100000),
"nic-latency": rand_num_helper(dimension, 0, 1000),
"router-latency": rand_num_helper(dimension, 0, 1000),
"hbm-latency": rand_num_helper(dimension, 1, 1),
"hbm-bandwidth": rand_num_helper(dimension, 1, 1),
"hbm-scale": rand_num_helper(dimension, 0, 0),

return network

def write_system(dimension):
def implementation_helper(dim, val):
if val in ["oneRing", "oneDirect"]:
return val
value = ""
for _ in range(dim):
value += val + "_"
return value[:-1]

system = {
"scheduling-policy": random.choice(["LIFO", "FIFO"]),
"endpoint-delay": random.randint(1, 1000),
"active-chunks-per-dimension": random.randint(1, 32),
# whenever dataset splits is high, it takes a long time to run
"preferred-dataset-splits": random.randint(16, 1024),
"boost-mode": 1,
"all-reduce-implementation": implementation_helper(dimension, random.choice(["ring", "direct", "doubleBinaryTree", "oneRing", "oneDirect", "hierarchicalRing", "halvingDoubling", "oneHalvingDoubling"])),
"all-gather-implementation": implementation_helper(dimension, random.choice(["ring", "direct", "doubleBinaryTree", "oneRing", "oneDirect", "hierarchicalRing", "halvingDoubling", "oneHalvingDoubling"])),
"reduce-scatter-implementation": implementation_helper(dimension, random.choice(["ring", "direct", "doubleBinaryTree", "oneRing", "oneDirect", "hierarchicalRing", "halvingDoubling", "oneHalvingDoubling"])),
"all-to-all-implementation": implementation_helper(dimension, random.choice(["ring", "direct", "doubleBinaryTree", "oneRing", "oneDirect", "hierarchicalRing", "halvingDoubling", "oneHalvingDoubling"])),

"collective-optimization": random.choice(["baseline", "localBWAware"]),
"intra-dimension-scheduling": random.choice(["FIFO", "SCF"]),
"inter-dimension-scheduling": random.choice(["baseline", "themis"])
return system

def write_workload():
value = ""
# randomize workload type
workload_type = random.choice(["DATA\n", "HYBRID_TRANSFORMER\n", "HYBRID_DLRM\n", "MICRO\n"])
# randomize number of DNN layers
layers_count = random.randint(1, 50)
if workload_type == "MICRO\n":
layers_count = 1
value += workload_type

value += str(layers_count) + "\n"
# configure each layer
for i in range(layers_count):
# layer name and reserved variable
value += "layer" + str(i) + "\t-1\t"
# forward pass compute time
forward_time = str(random.randint(1, 42000000)) + "\t"
# forward_time = str(random.randint(1, 4200)) + "\t"
if workload_type == "MICRO\n":
forward_time = "5\t"
value += forward_time

# forward pass communication type
forward_type = random.choice(["ALLREDUCE", "ALLGATHER", "ALLTOALL", "NONE"]) + "\t"
if workload_type == "MICRO\n":
forward_type = "NONE\t"
value += forward_type
# forward pass communication size
forward_size = "0\t" if forward_type == "NONE\t" else str(random.randint(0, 70000000)) + "\t"
value += forward_size

# input grad compute time
grad_time = str(random.randint(1, 42000000)) + " "

if workload_type == "MICRO\n":
grad_time = "5\t"
value += grad_time
# input grad communication type
grad_type = random.choice(["ALLREDUCE", "ALLGATHER", "ALLTOALL", "NONE"]) + "\t"
if workload_type == "MICRO\n":
grad_type = "NONE\t"
value += grad_type
# input grad communication size
grad_size = "0\t" if grad_type == "NONE\t" else str(random.randint(0, 70000000)) + "\t"
value += grad_size

# weight grad compute time
weight_time = str(random.randint(1, 42000000)) + "\t"
# weight_time = str(random.randint(1, 4200)) + "\t"
if workload_type == "MICRO\n":
weight_time = "5\t"
value += weight_time
# weight grad communication type
weight_type = random.choice(["ALLREDUCE", "ALLGATHER", "ALLTOALL", "NONE"]) + "\t"
value += weight_type
# weight grad communication size
weight_size = "0\t" if weight_type == "NONE\t" else str(random.randint(0, 70000000)) + "\t"

value += weight_size
# delay per entire weight/input/output update after the collective is finished
value += str(random.randint(5, 5000)) + "\n"
# value += str(random.randint(5, 50)) + "\n"

return {"value": value}

# parses the network file
# def parse_network(network_file):
# with open(network_file) as f:
Expand All @@ -187,6 +57,8 @@ def write_workload():
intra-dimension-scheduling: FIFO
inter-dimension-scheduling: baseline

def parse_system(system_file, action_dict):
# parse system_file (above is the content) into dict
action_dict['system'] = {}
Expand All @@ -199,8 +71,7 @@ def parse_system(system_file, action_dict):

# def parse_workload(workload_file):

# parses knobs that we want to experiment with
def parse_knobs(knobs_spec):
Expand All @@ -217,9 +88,8 @@ def parse_knobs(knobs_spec):
# Access the dictionaries


# action_type = specify 'network' or 'system
Expand All @@ -229,25 +99,27 @@ def generate_random_actions(action_dict, system_knob, network_knob):
for dict_type, dict_name in dicts:
for knob in dict_type.keys():
if isinstance(dict_type[knob], set):
action_dict[dict_name][knob] = random.choice(list(dict_type[knob]))
action_dict[dict_name][knob] = random.choice(
action_dict[dict_name][knob] = random.randint(dict_type[knob][1], dict_type[knob][2])

action_dict[dict_name][knob] = random.randint(
dict_type[knob][1], dict_type[knob][2])

return action_dict

def log_results_to_csv(filename, fitness_dict):
df = pd.DataFrame([fitness_dict['reward']])
csvfile = os.path.join(filename, "rewards.csv")
df.to_csv(csvfile, index=False, header=False, mode='a')
df = pd.DataFrame([fitness_dict['reward']])
csvfile = os.path.join(filename, "rewards.csv")
df.to_csv(csvfile, index=False, header=False, mode='a')

df = pd.DataFrame([fitness_dict['action']])
csvfile = os.path.join(filename, "actions.csv")
df.to_csv(csvfile, index=False, header=False, mode='a')
df = pd.DataFrame([fitness_dict['action']])
csvfile = os.path.join(filename, "actions.csv")
df.to_csv(csvfile, index=False, header=False, mode='a')

df = pd.DataFrame([fitness_dict['obs']])
csvfile = os.path.join(filename, "observations.csv")
df.to_csv(csvfile, index=False, header=False, mode='a')
df = pd.DataFrame([fitness_dict['obs']])
csvfile = os.path.join(filename, "observations.csv")
df.to_csv(csvfile, index=False, header=False, mode='a')

# Random walker then random walker, else use other
Expand Down Expand Up @@ -285,27 +157,29 @@ def main(_):

network_file = "4d_ring_fc_ring_switch.json"
system_file = os.path.join(systems_folder, "4d_ring_fc_ring_switch_baseline.txt")
system_file = os.path.join(
systems_folder, "4d_ring_fc_ring_switch_baseline.txt")
workload_file = "all_reduce/allreduce_0.65.txt"

exe_path = os.path.join(proj_root_path, "")
network_config = os.path.join(proj_root_path, "general_network.json")
system_config = os.path.join(proj_root_path, "general_system.txt")
workload_config = os.path.join(proj_root_path, "general_workload.txt")

env = AstraSimWrapper.make_astraSim_env(rl_form='random_walker')
# env = AstraSimEnv.AstraSimEnv(rl_form='random_walker')

astrasim_helper = helpers()

# experiment name
exp_name = str(FLAGS.workload)+"_num_steps_" + str(FLAGS.num_steps) + "_num_episodes_" + str(FLAGS.num_episodes)
# experiment name
exp_name = str(FLAGS.workload)+"_num_steps_" + \
str(FLAGS.num_steps) + "_num_episodes_" + str(FLAGS.num_episodes)
# append logs to base path
log_path = os.path.join(FLAGS.summary_dir, 'random_walker_logs', FLAGS.reward_formulation, exp_name)
log_path = os.path.join(
FLAGS.summary_dir, 'random_walker_logs', FLAGS.reward_formulation, exp_name)
# get the current working directory and append the exp name
traject_dir = os.path.join(FLAGS.summary_dir, FLAGS.traject_dir, FLAGS.reward_formulation, exp_name)
traject_dir = os.path.join(
FLAGS.summary_dir, FLAGS.traject_dir, FLAGS.reward_formulation, exp_name)
# check if log_path exists else create it
if not os.path.exists(log_path):
Expand All @@ -321,15 +195,15 @@ def main(_):
start = time.time()

step_results = {}

# INITIATE action dict
action_dict = {}

# if path exists, use path, else parse the sub-dict
action_dict['network'] = {"path": network_file}
action_dict['workload'] = {"path": workload_file}
# TODO: parse system

# TODO: parse system
parse_system(system_file, action_dict)

# TODO: parse knobs (all variables to change in action_dict)
Expand All @@ -343,15 +217,16 @@ def main(_):
# every step of the current training
for step in range(FLAGS.num_steps):
# pass into generate_random_actions(dimension, knobs)
action_dict = generate_random_actions(action_dict, system_knob, network_knob)
action_dict = generate_random_actions(
action_dict, system_knob, network_knob)

# with open("general_workload.txt", 'w') as file:
# file.write(action["workload"]["value"])

# step_result wrapped in TimeStep object
step_result = env.step(action_dict)
step_type, reward, discount, observation = step_result

step_results['reward'] = [reward]
step_results['action'] = action_dict
step_results['obs'] = observation
Expand All @@ -360,7 +235,7 @@ def main(_):
best_reward = reward
best_observation = observation
best_actions = action_dict

log_results_to_csv(log_path, step_results)

end = time.time()
Expand All @@ -372,6 +247,5 @@ def main(_):
print("Total Useful Steps: ", env.useful_counter)

if __name__ == '__main__':

0 comments on commit aef2a49

Please sign in to comment.