From eda65d2493ecec1c7ad41b4f02d323ccdb0ad34c Mon Sep 17 00:00:00 2001 From: ShvetankPrakash Date: Sat, 21 Oct 2023 09:18:41 +0000 Subject: [PATCH] Skeleton files with base impl for OpenLane-ArchGym integration --- arch_gym/envs/OpenLaneEnv.py | 135 ++++++++++++ arch_gym/envs/OpenLane_wrapper.py | 210 +++++++++++++++++++ arch_gym/envs/envHelpers.py | 18 ++ configs/sims/OpenLane_config.py | 1 + sims/OpenLane/train_randomwalker_OpenLane.py | 127 +++++++++++ 5 files changed, 491 insertions(+) create mode 100644 arch_gym/envs/OpenLaneEnv.py create mode 100644 arch_gym/envs/OpenLane_wrapper.py create mode 100644 configs/sims/OpenLane_config.py create mode 100644 sims/OpenLane/train_randomwalker_OpenLane.py diff --git a/arch_gym/envs/OpenLaneEnv.py b/arch_gym/envs/OpenLaneEnv.py new file mode 100644 index 00000000..b6f62e3b --- /dev/null +++ b/arch_gym/envs/OpenLaneEnv.py @@ -0,0 +1,135 @@ +import gym + +class OpenLaneEnv(gym.Env): + """OpenLane Gym Environment.""" + def __init__(self , + rtl_top : str = "Top" , + reward_formulation: str = "POWER", + reward_scaling : bool = False , + max_steps : int = 1 , + num_agents : int = 1 , + rl_form = None , + rl_algo = None ): + + # Call base class constructor + super(OpenLaneEnv, self).__init__() + + # Assign basic class variables + self.rtl_top = rtl_top # name of RTL top module & Verilog dir + self.reward_formulation = reward_formulation + self.reward_scaling = reward_scaling + self.max_steps = max_steps + self.num_agents = num_agents + self.rl_form = rl_form + self.rl_algo = rl_algo + self.curr_step = 0 + self.observation = None + self.reward = None + self.done = False + self.info = {} # We do not currently make use of this metadata, but return it to DM Env Wrapper + + # Construct action and observation spaces for environment + # TODO: Revisit definitions below based on params selected + self.action_space = gym.spaces.Dict({ + "pdk" : gym.spaces.Discrete(2), + "synthesis": gym.spaces.Discrete(2), + "floorplan": gym.spaces.Discrete(2), + "placement": gym.spaces.Discrete(2), + "cts" : gym.spaces.Discrete(2), + "route" : gym.spaces.Discrete(2) + }) + self.observation_space = gym.spaces.Dict({ + "power" : gym.spaces.Box(low=0, high=1e10, shape=(1,)), + "performance" : gym.spaces.Box(low=0, high=1e10, shape=(1,)), + "area" : gym.spaces.Box(low=0, high=1e10, shape=(1,)) + }) + + # Reset environment upon construction + self.reset() + + def reset(self): + self.curr_step = 0 + return self.observation_space.sample() # Return random sample from observation space on reset + + def step(self, action): + """ Step in the Gym environment with the action. + + Returns: + observation, reward, done, info. + """ + if (self.curr_step == self.max_steps): + print("Max number of steps reached: episode complete.") + self.observation = self.reset() # set curr_step back to zero & return random sample from observation space + self.reward = self.calculate_reward(self.observation) + self.done = True + self.info = {} + else: + self.observation = self.run_OpenLane(action) + self.reward = self.calculate_reward(self.observation) + self.done = False + self.info = {} + self.curr_step = self.curr_step + 1 + + return self.observation, self.reward, self.done, self.info + + def run_OpenLane(self, action): + """ Run OpenLane RTL to GDS flow with parameters specified by agent. + + Returns: + observation: PPA of design. + """ + # reward_action = action["reward_formulation"] + pdk_action = action["pdk"] + synth_actions = action["synthesis"] + fp_actions = action["floorplan"] + place_action = action["placement"] + cts_action = action["cts"] + route_action = action["route"] + + # TODO: Invoke OpenLane with params above + + return self.get_observation() + + def get_observation(self): + """ Gets observation (i.e. PPA) from OpenLane physically implemented design. + + Returns: + observation: PPA of design. + """ + + # TODO: Get PPA from OpenLane logs + observation = {} + observation["power"] = 0 + observation["performance"] = 0 + observation["area"] = 0 + + return self.observation_space.sample() # return random sample for now + + def calculate_reward(self, observation): + """ Calculates the reward for the agent based on reward_formulation metric (i.e. power, performance, and/or area). + + Returns: + reward. + """ + power = observation["power"] + performance = observation["performance"] + area = observation["area"] + + if self.reward_formulation == "POWER": + # TODO: Normalize + return power + elif self.reward_formulation == "PERFORMANCE": + # TODO: Normalize + return performance + elif self.reward_formulation == "AREA": + # TODO: Normalize + return area + elif self.reward_formulation == "ALL": + # TODO: Normalize + return power + performance + area + + def render(self): + pass + + def close(self): + pass diff --git a/arch_gym/envs/OpenLane_wrapper.py b/arch_gym/envs/OpenLane_wrapper.py new file mode 100644 index 00000000..ee5c54ec --- /dev/null +++ b/arch_gym/envs/OpenLane_wrapper.py @@ -0,0 +1,210 @@ +# Standard imports for every respective environment wrapper +import numpy as np +import tree +import dm_env +import gym +from gym import spaces +from acme import specs +from acme import types +from acme import wrappers +from typing import Any, Dict, Optional +from OpenLaneEnv import OpenLaneEnv + +class OpenLaneEnvWrapper(dm_env.Environment): + """Deepmind Environment wrapper for OpenAI Gym environments.""" + + def __init__(self, environment: gym.Env): + self._environment = environment + self._reset_next_step = True + self._last_info = None + + # Convert Gym observation & action spaces into Deepmind's acme specs + self._observation_spec = _convert_to_spec(self._environment.observation_space, name="observation") + self._action_spec = _convert_to_spec(self._environment.action_space , name="action" ) + + def reset(self) -> dm_env.TimeStep: + """Resets the episode.""" + self._reset_next_step = False + self._last_info = None # reset the diagnostic info + observation = self._environment.reset() # reset the underlying Gym environment + return dm_env.restart(observation) # returns initial dm_env.TimeStep at restart using obs + + def step(self, action: types.NestedArray) -> dm_env.TimeStep: + """Steps in the environment.""" + if self._reset_next_step: + return self.reset() + + # Step in the Gym environment with the action + observation, reward, done, info = self._environment.step(action) + + # Set corresponding class variables returned from step + self._reset_next_step = done + self._last_info = info + + # Convert the reward type to conform to structure specified by self.reward_spec(), respecting scalar or array property. + reward = tree.map_structure( + lambda x, t: ( + t.dtype.type(x) + if np.isscalar(x) else np.asarray(x, dtype=t.dtype)), + reward, + self.reward_spec()) + + # If episode complete, return appropriate cause (i.e., timesteps max limit reached or natural episode end) + if done: + truncated = info.get("TimeLimit.truncated", False) + if truncated: + # Episode concluded because max timesteps reached + return dm_env.truncation(reward, observation) + else: + # Episode concluded + return dm_env.termination(reward, observation) + + # Episode continuing, provide agent with reward + obs returned from applied action when step taken in environment + return dm_env.transition(reward, observation) + + def observation_spec(self) -> types.NestedSpec: + """Retrieve the specification for environment observations. + + Returns: + types.NestedSpec: Specification detailing the format of observations. + """ + return self._observation_spec + + def action_spec(self) -> types.NestedSpec: + """Retrieve the specification for valid agent actions. + + Returns: + types.NestedSpec: Specification detailing the format of actions. + """ + return self._action_spec + + def reward_spec(self): + """Retrieve the specification for environment rewards. + + Returns: + specs.Array: Specification detailing the format of rewards. + """ + return specs.Array(shape=(), dtype=float, name='reward') + + def get_info(self) -> Optional[Dict[str, Any]]: + """Returns the last info returned from env.step(action). + + Returns: + info: Dictionary of diagnostic information from the last environment step. + """ + return self._last_info + + @property + def environment(self) -> gym.Env: + """Return the wrapped Gym environment. + + Returns: + gym.Env: The underlying environment. + """ + return self._environment + + def __getattr__(self, name: str): + """Retrieve attributes from the wrapped environment. + + Args: + name (str): Name of the attribute to retrieve. + + Returns: + Any: The value of the attribute from the wrapped environment. + + Raises: + AttributeError: If attempting to access a private attribute. + """ + if name.startswith('__'): + raise AttributeError( + f"Attempted to get missing private attribute '{name}'") + return getattr(self._environment, name) + + def close(self): + """Close and clean up the wrapped environment.""" + self._environment.close() + + +def _convert_to_spec(space: gym.Space, + name: Optional[str] = None) -> types.NestedSpec: + """Converts an OpenAI Gym space to a dm_env spec or nested structure of specs. + Box, MultiBinary and MultiDiscrete Gym spaces are converted to BoundedArray + specs. Discrete OpenAI spaces are converted to DiscreteArray specs. Tuple and + Dict spaces are recursively converted to tuples and dictionaries of specs. + Args: + space: The Gym space to convert. + name: Optional name to apply to all return spec(s). + Returns: + A dm_env spec or nested structure of specs, corresponding to the input + space. + """ + if isinstance(space, spaces.Discrete): + return specs.DiscreteArray(num_values=space.n, dtype=space.dtype, name=name) + + elif isinstance(space, spaces.Box): + return specs.BoundedArray( + shape=space.shape, + dtype=space.dtype, + minimum=space.low, + maximum=space.high, + name=name) + + elif isinstance(space, spaces.MultiBinary): + return specs.BoundedArray( + shape=space.shape, + dtype=space.dtype, + minimum=0.0, + maximum=1.0, + name=name) + + elif isinstance(space, spaces.MultiDiscrete): + return specs.BoundedArray( + shape=space.shape, + dtype=space.dtype, + minimum=np.zeros(space.shape), + maximum=space.nvec - 1, + name=name) + + elif isinstance(space, spaces.Tuple): + return tuple(_convert_to_spec(s, name) for s in space.spaces) + + elif isinstance(space, spaces.Dict): + return { + key: _convert_to_spec(value, key) + for key, value in space.spaces.items() + } + + else: + raise ValueError('Unexpected gym space: {}'.format(space)) + + +def make_OpenLaneEnvironment( + seed : int = 12345 , + rtl_top : str = "Top" , + reward_formulation: str = "POWER", + reward_scaling : bool = False , + max_steps : int = 1 , + num_agents : int = 1 , + rl_form = None , + rl_algo = None + ) -> dm_env.Environment: + + """Returns instance of OpenLane Gym Environment wrapped around DM Environment.""" + environment = OpenLaneEnvWrapper( + OpenLaneEnv( + rtl_top = rtl_top , + reward_formulation = reward_formulation, + reward_scaling = reward_scaling , + max_steps = max_steps , + num_agents = num_agents , + rl_form = rl_form , + rl_algo = rl_algo + ) + ) + + # Set obs, reward, and action spec precision of the environment and clipping if needed + environment = wrappers.SinglePrecisionWrapper(environment) + if(rl_form == 'sa' or rl_form == 'tdm'): + environment = wrappers.CanonicalSpecWrapper(environment, clip=True) + + return environment diff --git a/arch_gym/envs/envHelpers.py b/arch_gym/envs/envHelpers.py index 179d81cc..c2a9aed8 100644 --- a/arch_gym/envs/envHelpers.py +++ b/arch_gym/envs/envHelpers.py @@ -1362,6 +1362,24 @@ def generate_aco_maestro_config(self, yaml_file, params_dict): write_ok = False return write_ok + def get_OpenLane_action_space(self): + # TODO: Define space of actions more clearly than empty lists + action_space = { + "pdk" : [], + "synthesis": [], + "floorplan": [], + "placement": [], + "cts" : [], + "route" : [] + } + return action_space + + def get_OpenLane_random_action(self): + action_space = self.get_OpenLane_action_space() + # TODO: Assign random actions + return action_space + + # For testing if __name__ == "__main__": print("Hello!") diff --git a/configs/sims/OpenLane_config.py b/configs/sims/OpenLane_config.py new file mode 100644 index 00000000..46409041 --- /dev/null +++ b/configs/sims/OpenLane_config.py @@ -0,0 +1 @@ +# TODO diff --git a/sims/OpenLane/train_randomwalker_OpenLane.py b/sims/OpenLane/train_randomwalker_OpenLane.py new file mode 100644 index 00000000..c1ed7048 --- /dev/null +++ b/sims/OpenLane/train_randomwalker_OpenLane.py @@ -0,0 +1,127 @@ +import os +import sys + +from absl import flags +from absl import app +from absl import logging + +os.sys.path.insert(0, os.path.abspath('../../')) +from configs.sims import OpenLane_config +from arch_gym.envs.envHelpers import helpers +from arch_gym.envs import OpenLane_wrapper +import envlogger +import numpy as np +import pandas as pd + +# User-defined flags to run training script with +flags.DEFINE_string ('rtl_top' , 'Top' , 'RTL Design for physical implementation.' ) +flags.DEFINE_string ('reward_formulation' , 'POWER' , 'Metric for optimization.' ) +flags.DEFINE_integer('max_steps' , 100 , 'Number of training steps.' ) +flags.DEFINE_integer('num_episodes' , 2 , 'Number of training episodes.' ) +flags.DEFINE_bool ('use_envlogger' , False , 'Use envlogger to log the trajectory data.' ) +flags.DEFINE_string ('experiment_summary_data_dir_path' , '.' , 'Path to location where to save data from launched experiment.' ) +flags.DEFINE_string ('experiment_csv_data_dir_name' , 'randomwalk_csv_data' , 'Name of directory to store experiment log data in csv format.' ) +flags.DEFINE_string ('experiment_trajectory_data_dir_name', 'randomwalk_trajectory_data', 'Name of directory to store experiment trajectory data in envlogger format from DeepMind.') +FLAGS = flags.FLAGS + + +def logging_logistics(): + # Create a log file name for this training run / experiment based on hyperparams + experiment_name = f"{FLAGS.rtl_top}_design_{FLAGS.max_steps}_steps_{FLAGS.num_episodes}_episodes" + csv_data_dir_path = os.path.join(FLAGS.experiment_summary_data_dir_path, + FLAGS.experiment_csv_data_dir_name, + FLAGS.reward_formulation, + experiment_name) + trajectory_data_dir_path = os.path.join(FLAGS.experiment_summary_data_dir_path, + FLAGS.experiment_trajectory_data_dir_name, + FLAGS.reward_formulation, + experiment_name) + if not os.path.exists(csv_data_dir_path): + os.makedirs(csv_data_dir_path) + if FLAGS.use_envlogger: + if not os.path.exists(trajectory_data_dir_path): + os.makedirs(trajectory_data_dir_path) + + return csv_data_dir_path, trajectory_data_dir_path + + +def log_fitness_to_csv(csv_data_dir_path, fitness_hist): + """Logs fitness history to csv files. + + Args: + csv_data_dir_path (str): path to where data will be stored in csv format + fitness_hist (dict): dictionary containing the fitness history + """ + # Log reward data separately to see how reward improves over time easily + reward_df = pd.DataFrame([fitness_hist['reward']]) + reward_csv_path = os.path.join(csv_data_dir_path, "fitness.csv") + reward_df.to_csv(reward_csv_path, index=False, header=False, mode='a') + + # Log all fitness data (action, obs, reward) in different file to see agent trajectory over time + trajectory_df = pd.DataFrame([fitness_hist]) + trajectory_csv_path = os.path.join(csv_data_dir_path, "trajectory.csv") + trajectory_df.to_csv(trajectory_csv_path, index=False, header=False, mode='a') + + +def wrap_in_envlogger(env, trajectory_data_dir_name): + """Wraps the environment in envlogger. + + Args: + env : OpenLane gym environment + trajectory_data_dir_name (str): path to the directory where the trajectory data will be logged by envlogger + """ + metadata = { + "agent_type": "Random Walker", + "max_steps" : FLAGS.max_steps, + "env_type" : type(env).__name__, + } + + logging.info('Wrapping OpenLane environment with EnvironmentLogger...') + env = envlogger.EnvLogger(env, + data_directory=trajectory_data_dir_name, + max_episodes_per_file=1000, + metadata=metadata) + logging.info('Done wrapping OpenLane environment with EnvironmentLogger.') + return env + + +def main(_): + # Instantiate OpenLane Environment + OpenLane_env = OpenLane_wrapper.make_OpenLaneEnvironment(rtl_top=FLAGS.rtl_top, reward_formulation=FLAGS.reward_formulation, max_steps=FLAGS.max_steps) + + # Setup directories for logging data from experiment in generic csv format and trajectory data in envlogger format (if desired) + csv_data_dir_path, trajectory_data_dir_path = logging_logistics() + + # Wrap environment in envlogger if logging trajectory data in envlogger format + if FLAGS.use_envlogger: + OpenLane_env = wrap_in_envlogger(OpenLane_env, trajectory_data_dir_path) + + # Create a helper that will randomly generate actions for our random walker agent + OpenLane_helper = helpers() + + # Run training + for episode in range(FLAGS.num_episodes): + print(f"Beginning episode #{episode + 1}.") # episode num is zero indexed + + for step in range(FLAGS.max_steps): + # Random walker agent: generate random action (choices for OpenLane parameters) + action = OpenLane_helper.get_OpenLane_random_action() + + # Take a step in the environment using this action and retreive reward + observation + _, reward, _, observation = OpenLane_env.step(action) # TimeStep object returned from .step() which is tuple of (step_type, reward, discount, observation) + + # Log the fitness of this action with its reward and observation + fitness_hist = {} + fitness_hist['action'] = action + fitness_hist['obs'] = observation + fitness_hist['reward'] = reward if reward is not None else [-1.0] # None returned for reward at start of each episode; log as -1 instead of None. + log_fitness_to_csv(csv_data_dir_path, fitness_hist) + + print(f"Ending episode #{episode + 1}.") # episode num is zero indexed + + print(f"Training complete: {FLAGS.num_episodes} episodes finished.") + + return + +if __name__ == "__main__": + app.run(main)