Skeleton files with base impl for OpenLane-ArchGym integration
ShvetankPrakash committed Oct 21, 2023
1 parent 2cce688 commit eda65d2
Showing 5 changed files with 491 additions and 0 deletions.
135 changes: 135 additions & 0 deletions arch_gym/envs/OpenLaneEnv.py
@@ -0,0 +1,135 @@
import gym

class OpenLaneEnv(gym.Env):
"""OpenLane Gym Environment."""
def __init__(self ,
rtl_top : str = "Top" ,
reward_formulation: str = "POWER",
reward_scaling : bool = False ,
max_steps : int = 1 ,
num_agents : int = 1 ,
rl_form = None ,
rl_algo = None ):

# Call base class constructor
super(OpenLaneEnv, self).__init__()

# Assign basic class variables
self.rtl_top = rtl_top # name of RTL top module & Verilog dir
self.reward_formulation = reward_formulation
self.reward_scaling = reward_scaling
self.max_steps = max_steps
self.num_agents = num_agents
self.rl_form = rl_form
self.rl_algo = rl_algo
self.curr_step = 0
self.observation = None
self.reward = None
self.done = False
self.info = {} # We do not currently make use of this metadata, but return it to DM Env Wrapper

# Construct action and observation spaces for environment
# TODO: Revisit definitions below based on params selected
self.action_space = gym.spaces.Dict({
"pdk" : gym.spaces.Discrete(2),
"synthesis": gym.spaces.Discrete(2),
"floorplan": gym.spaces.Discrete(2),
"placement": gym.spaces.Discrete(2),
"cts" : gym.spaces.Discrete(2),
"route" : gym.spaces.Discrete(2)
})
self.observation_space = gym.spaces.Dict({
"power" : gym.spaces.Box(low=0, high=1e10, shape=(1,)),
"performance" : gym.spaces.Box(low=0, high=1e10, shape=(1,)),
"area" : gym.spaces.Box(low=0, high=1e10, shape=(1,))
})

# Reset environment upon construction
self.reset()

    def reset(self):
        self.curr_step = 0
        self.done = False
        self.observation = self.observation_space.sample()  # random sample from the observation space until OpenLane is wired up
        return self.observation

def step(self, action):
""" Step in the Gym environment with the action.
Returns:
observation, reward, done, info.
"""
        if self.curr_step == self.max_steps:
            print("Max number of steps reached: episode complete.")
            self.observation = self.reset()  # sets curr_step back to zero & returns a random sample from the observation space
            self.reward = self.calculate_reward(self.observation)
            self.done = True
            self.info = {}
        else:
            self.observation = self.run_OpenLane(action)
            self.reward = self.calculate_reward(self.observation)
            self.done = False
            self.info = {}
            self.curr_step += 1  # only count steps that actually ran the flow

return self.observation, self.reward, self.done, self.info

def run_OpenLane(self, action):
""" Run OpenLane RTL to GDS flow with parameters specified by agent.
Returns:
observation: PPA of design.
"""
# reward_action = action["reward_formulation"]
pdk_action = action["pdk"]
synth_actions = action["synthesis"]
fp_actions = action["floorplan"]
place_action = action["placement"]
cts_action = action["cts"]
route_action = action["route"]

# TODO: Invoke OpenLane with params above

return self.get_observation()

def get_observation(self):
""" Gets observation (i.e. PPA) from OpenLane physically implemented design.
Returns:
observation: PPA of design.
"""

# TODO: Get PPA from OpenLane logs
observation = {}
observation["power"] = 0
observation["performance"] = 0
observation["area"] = 0

return self.observation_space.sample() # return random sample for now

def calculate_reward(self, observation):
""" Calculates the reward for the agent based on reward_formulation metric (i.e. power, performance, and/or area).
Returns:
reward.
"""
power = observation["power"]
performance = observation["performance"]
area = observation["area"]

if self.reward_formulation == "POWER":
# TODO: Normalize
return power
elif self.reward_formulation == "PERFORMANCE":
# TODO: Normalize
return performance
elif self.reward_formulation == "AREA":
# TODO: Normalize
return area
elif self.reward_formulation == "ALL":
# TODO: Normalize
return power + performance + area

def render(self):
pass

def close(self):
pass
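As a quick sanity check of the skeleton above, the environment can already be exercised end to end with its random placeholder observations. This is a minimal sketch, not part of the commit; it assumes it is run from arch_gym/envs so the flat import resolves:

from OpenLaneEnv import OpenLaneEnv

env = OpenLaneEnv(rtl_top="Top", reward_formulation="POWER", max_steps=1)
obs = env.reset()                    # random sample from the observation space for now
action = env.action_space.sample()   # random choice for each of the six flow-stage knobs
obs, reward, done, info = env.step(action)
print(reward, done)                  # under "POWER", the reward is simply the sampled power value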
210 changes: 210 additions & 0 deletions arch_gym/envs/OpenLane_wrapper.py
@@ -0,0 +1,210 @@
# Standard imports for every respective environment wrapper
import numpy as np
import tree
import dm_env
import gym
from gym import spaces
from acme import specs
from acme import types
from acme import wrappers
from typing import Any, Dict, Optional
from OpenLaneEnv import OpenLaneEnv

class OpenLaneEnvWrapper(dm_env.Environment):
"""Deepmind Environment wrapper for OpenAI Gym environments."""

def __init__(self, environment: gym.Env):
self._environment = environment
self._reset_next_step = True
self._last_info = None

# Convert Gym observation & action spaces into Deepmind's acme specs
self._observation_spec = _convert_to_spec(self._environment.observation_space, name="observation")
self._action_spec = _convert_to_spec(self._environment.action_space , name="action" )

def reset(self) -> dm_env.TimeStep:
"""Resets the episode."""
self._reset_next_step = False
self._last_info = None # reset the diagnostic info
observation = self._environment.reset() # reset the underlying Gym environment
return dm_env.restart(observation) # returns initial dm_env.TimeStep at restart using obs

def step(self, action: types.NestedArray) -> dm_env.TimeStep:
"""Steps in the environment."""
if self._reset_next_step:
return self.reset()

# Step in the Gym environment with the action
observation, reward, done, info = self._environment.step(action)

# Set corresponding class variables returned from step
self._reset_next_step = done
self._last_info = info

# Convert the reward type to conform to structure specified by self.reward_spec(), respecting scalar or array property.
reward = tree.map_structure(
lambda x, t: (
t.dtype.type(x)
if np.isscalar(x) else np.asarray(x, dtype=t.dtype)),
reward,
self.reward_spec())

# If episode complete, return appropriate cause (i.e., timesteps max limit reached or natural episode end)
if done:
truncated = info.get("TimeLimit.truncated", False)
if truncated:
# Episode concluded because max timesteps reached
return dm_env.truncation(reward, observation)
else:
# Episode concluded
return dm_env.termination(reward, observation)

# Episode continuing, provide agent with reward + obs returned from applied action when step taken in environment
return dm_env.transition(reward, observation)

def observation_spec(self) -> types.NestedSpec:
"""Retrieve the specification for environment observations.
Returns:
types.NestedSpec: Specification detailing the format of observations.
"""
return self._observation_spec

def action_spec(self) -> types.NestedSpec:
"""Retrieve the specification for valid agent actions.
Returns:
types.NestedSpec: Specification detailing the format of actions.
"""
return self._action_spec

def reward_spec(self):
"""Retrieve the specification for environment rewards.
Returns:
specs.Array: Specification detailing the format of rewards.
"""
return specs.Array(shape=(), dtype=float, name='reward')

def get_info(self) -> Optional[Dict[str, Any]]:
"""Returns the last info returned from env.step(action).
Returns:
info: Dictionary of diagnostic information from the last environment step.
"""
return self._last_info

@property
def environment(self) -> gym.Env:
"""Return the wrapped Gym environment.
Returns:
gym.Env: The underlying environment.
"""
return self._environment

def __getattr__(self, name: str):
"""Retrieve attributes from the wrapped environment.
Args:
name (str): Name of the attribute to retrieve.
Returns:
Any: The value of the attribute from the wrapped environment.
Raises:
AttributeError: If attempting to access a private attribute.
"""
if name.startswith('__'):
raise AttributeError(
f"Attempted to get missing private attribute '{name}'")
return getattr(self._environment, name)

def close(self):
"""Close and clean up the wrapped environment."""
self._environment.close()


def _convert_to_spec(space: gym.Space,
name: Optional[str] = None) -> types.NestedSpec:
"""Converts an OpenAI Gym space to a dm_env spec or nested structure of specs.
Box, MultiBinary and MultiDiscrete Gym spaces are converted to BoundedArray
specs. Discrete OpenAI spaces are converted to DiscreteArray specs. Tuple and
Dict spaces are recursively converted to tuples and dictionaries of specs.
Args:
space: The Gym space to convert.
name: Optional name to apply to all return spec(s).
Returns:
A dm_env spec or nested structure of specs, corresponding to the input
space.
"""
if isinstance(space, spaces.Discrete):
return specs.DiscreteArray(num_values=space.n, dtype=space.dtype, name=name)

elif isinstance(space, spaces.Box):
return specs.BoundedArray(
shape=space.shape,
dtype=space.dtype,
minimum=space.low,
maximum=space.high,
name=name)

elif isinstance(space, spaces.MultiBinary):
return specs.BoundedArray(
shape=space.shape,
dtype=space.dtype,
minimum=0.0,
maximum=1.0,
name=name)

elif isinstance(space, spaces.MultiDiscrete):
return specs.BoundedArray(
shape=space.shape,
dtype=space.dtype,
minimum=np.zeros(space.shape),
maximum=space.nvec - 1,
name=name)

elif isinstance(space, spaces.Tuple):
return tuple(_convert_to_spec(s, name) for s in space.spaces)

elif isinstance(space, spaces.Dict):
return {
key: _convert_to_spec(value, key)
for key, value in space.spaces.items()
}

else:
raise ValueError('Unexpected gym space: {}'.format(space))


def make_OpenLaneEnvironment(
seed : int = 12345 ,
rtl_top : str = "Top" ,
reward_formulation: str = "POWER",
reward_scaling : bool = False ,
max_steps : int = 1 ,
num_agents : int = 1 ,
rl_form = None ,
rl_algo = None
) -> dm_env.Environment:

"""Returns instance of OpenLane Gym Environment wrapped around DM Environment."""
environment = OpenLaneEnvWrapper(
OpenLaneEnv(
rtl_top = rtl_top ,
reward_formulation = reward_formulation,
reward_scaling = reward_scaling ,
max_steps = max_steps ,
num_agents = num_agents ,
rl_form = rl_form ,
rl_algo = rl_algo
)
)

    # Cast the environment's observation/reward/action specs to single precision; clip actions to the canonical spec for SA/TDM agents
    environment = wrappers.SinglePrecisionWrapper(environment)
    if rl_form in ('sa', 'tdm'):
        environment = wrappers.CanonicalSpecWrapper(environment, clip=True)

return environment
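A rough usage sketch for the factory above (again not part of the commit; it assumes acme and dm_env are installed and that the script runs from arch_gym/envs). The fixed action simply picks option 0 for every flow stage, which is valid for the Discrete(2) knobs declared in OpenLaneEnv:

import numpy as np
from OpenLane_wrapper import make_OpenLaneEnvironment

env = make_OpenLaneEnvironment(reward_formulation="POWER", max_steps=1)
timestep = env.reset()                       # dm_env.TimeStep with step_type FIRST
while not timestep.last():
    # Trivial fixed action: option 0 for every flow stage
    action = {stage: np.int64(0) for stage in
              ("pdk", "synthesis", "floorplan", "placement", "cts", "route")}
    timestep = env.step(action)
    print(timestep.step_type, timestep.reward)
env.close()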
18 changes: 18 additions & 0 deletions arch_gym/envs/envHelpers.py
@@ -1362,6 +1362,24 @@ def generate_aco_maestro_config(self, yaml_file, params_dict):
write_ok = False
return write_ok

def get_OpenLane_action_space(self):
# TODO: Define space of actions more clearly than empty lists
action_space = {
"pdk" : [],
"synthesis": [],
"floorplan": [],
"placement": [],
"cts" : [],
"route" : []
}
return action_space

def get_OpenLane_random_action(self):
action_space = self.get_OpenLane_action_space()
# TODO: Assign random actions
return action_space


# For testing
if __name__ == "__main__":
print("Hello!")
1 change: 1 addition & 0 deletions configs/sims/OpenLane_config.py
@@ -0,0 +1 @@
# TODO
