[WIP] distributed maddpg #795

Open
wants to merge 1 commit into base: develop
86 changes: 86 additions & 0 deletions benchmark/torch/maddpg/actor.py
@@ -0,0 +1,86 @@
+import parl
+from simple_model import MAModel
+from simple_agent import MAAgent
+from parl.algorithms import MADDPG
+from parl.env.multiagent_simple_env import MAenv
+from parl.utils import logger
+
+CRITIC_LR = 0.01 # learning rate for the critic model
+ACTOR_LR = 0.01 # learning rate of the actor model
+GAMMA = 0.95 # reward discount factor
+TAU = 0.01 # soft update
+BATCH_SIZE = 1024
+MAX_STEP_PER_EPISODE = 25 # maximum step per episode
+
+
+@parl.remote_class(wait=False)
+class Actor(object):
+    def __init__(self, args):
+        env = MAenv(args.env)
+
+        from gym import spaces
+        from multiagent.multi_discrete import MultiDiscrete
+        for space in env.action_space:
+            assert (isinstance(space, spaces.Discrete)
+                    or isinstance(space, MultiDiscrete))
+
+        critic_in_dim = sum(env.obs_shape_n) + sum(env.act_shape_n)
+        logger.info('critic_in_dim: {}'.format(critic_in_dim))
+
+        agents = []
+        for i in range(env.n):
+            model = MAModel(env.obs_shape_n[i], env.act_shape_n[i], critic_in_dim)
+            algorithm = MADDPG(
+                model,
+                agent_index=i,
+                act_space=env.action_space,
+                gamma=GAMMA,
+                tau=TAU,
+                critic_lr=CRITIC_LR,
+                actor_lr=ACTOR_LR)
+            agent = MAAgent(
+                algorithm,
+                agent_index=i,
+                obs_dim_n=env.obs_shape_n,
+                act_dim_n=env.act_shape_n,
+                batch_size=BATCH_SIZE,
+                speedup=(not args.restore))
+            agents.append(agent)
+
+        self.env = env
+        self.agents = agents
+
+    def set_weights(self, weights):
+        for i, agent in enumerate(self.agents):
+            agent.set_weights(weights[i])
+
+    def run_episode(self):
+        obs_n = self.env.reset()
+        total_reward = 0
+        agents_reward = [0 for _ in range(self.env.n)]
+        steps = 0
+
+        experience = []
+        while True:
+            steps += 1
+            action_n = [agent.predict(obs) for agent, obs in zip(self.agents, obs_n)]
+            next_obs_n, reward_n, done_n, _ = self.env.step(action_n)
+            done = all(done_n)
+            terminal = (steps >= MAX_STEP_PER_EPISODE)
+
+            # store experience
+            experience.append((obs_n, action_n, reward_n,
+                               next_obs_n, done_n))
+
+            # compute reward of every agent
+            obs_n = next_obs_n
+            for i, reward in enumerate(reward_n):
+                total_reward += reward
+                agents_reward[i] += reward
+
+            # check the end of an episode
+            if done or terminal:
+                break
+
+        return experience, total_reward, agents_reward, steps
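
Note on how this remote actor is driven: the @parl.remote_class(wait=False) decorator makes every method call on an Actor instance return a future object that is resolved later with .get(), and instances can only be created after parl.connect() has attached the process to a running xparl cluster (started separately, for example with: xparl start --port 8010). Below is a minimal driver sketch under those assumptions; the scenario name and the argparse.Namespace fields are illustrative, chosen to match what Actor.__init__ reads (args.env and args.restore).

import argparse
import parl
from actor import Actor

# Illustrative arguments; Actor.__init__ only reads `env` and `restore`.
args = argparse.Namespace(env='simple_speaker_listener', restore=False)

parl.connect('localhost:8010')            # attach to a running xparl cluster
actors = [Actor(args) for _ in range(2)]  # instantiated on remote workers

# wait=False: run_episode() returns a future immediately, so both actors
# roll out their episodes in parallel; .get() blocks until each finishes.
futures = [actor.run_episode() for actor in actors]
for future in futures:
    experience, ep_reward, agents_reward, steps = future.get()
    print('episode reward {:.2f} collected in {} steps'.format(ep_reward, steps))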


22 changes: 7 additions & 15 deletions benchmark/torch/maddpg/simple_agent.py
@@ -15,8 +15,6 @@
 import parl
 import torch
 import numpy as np
-from parl.utils import ReplayMemory
-from parl.utils import machine_info, get_gpu_count


 class MAAgent(parl.Agent):
@@ -39,12 +37,7 @@ def __init__(self,
         self.speedup = speedup
         self.n = len(act_dim_n)

-        self.memory_size = int(1e5)
         self.min_memory_size = batch_size * 25 # batch_size * args.max_episode_len
-        self.rpm = ReplayMemory(
-            max_size=self.memory_size,
-            obs_dim=self.obs_dim_n[agent_index],
-            act_dim=self.act_dim_n[agent_index])
         self.global_train_step = 0
         self.device = torch.device("cuda" if torch.cuda.
                                    is_available() else "cpu")
@@ -62,31 +55,33 @@ def predict(self, obs, use_target_model=False):
         act_numpy = act.detach().cpu().numpy().flatten()
         return act_numpy

-    def learn(self, agents):
+    def learn(self, agents, rpms):
         """ sample batch, compute q_target and train
         """
         self.global_train_step += 1

         # only update parameter every 100 steps
         if self.global_train_step % 100 != 0:
             return 0.0

+        rpm = rpms[self.agent_index]
+
-        if self.rpm.size() <= self.min_memory_size:
+        if rpm.size() <= self.min_memory_size:
             return 0.0

         batch_obs_n = []
         batch_act_n = []
         batch_obs_next_n = []

         # sample batch
-        rpm_sample_index = self.rpm.make_index(self.batch_size)
+        rpm_sample_index = rpm.make_index(self.batch_size)
         for i in range(self.n):
             batch_obs, batch_act, _, batch_obs_next, _ \
-                = agents[i].rpm.sample_batch_by_index(rpm_sample_index)
+                = rpms[i].sample_batch_by_index(rpm_sample_index)
             batch_obs_n.append(batch_obs)
             batch_act_n.append(batch_act)
             batch_obs_next_n.append(batch_obs_next)
-        _, _, batch_rew, _, batch_isOver = self.rpm.sample_batch_by_index(
+        _, _, batch_rew, _, batch_isOver = rpm.sample_batch_by_index(
             rpm_sample_index)
         batch_obs_n = [
             torch.FloatTensor(obs).to(self.device) for obs in batch_obs_n
@@ -117,6 +112,3 @@ def learn(self, agents):
         critic_cost = critic_cost.cpu().detach().numpy()

         return critic_cost
-
-    def add_experience(self, obs, act, reward, next_obs, terminal):
-        self.rpm.append(obs, act, reward, next_obs, terminal)
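
The substantive change above is that an agent no longer owns a private replay buffer: learn() now receives the full list rpms, draws one set of sample indices from its own buffer, and reads every other agent's buffer at those same indices, so the joint observation-action batches seen by the centralized critic stay time-aligned. A small self-contained sketch of that pattern with parl.utils.ReplayMemory (buffer size, dimensions, and batch size are illustrative):

import numpy as np
from parl.utils import ReplayMemory

obs_dims, act_dims = [3, 5], [2, 2]   # two agents with illustrative shapes
rpms = [
    ReplayMemory(max_size=1000, obs_dim=o, act_dim=a)
    for o, a in zip(obs_dims, act_dims)
]

# Buffers are appended in lockstep, so index t refers to the same
# environment step in every buffer.
for t in range(100):
    for i, rpm in enumerate(rpms):
        rpm.append(
            np.random.rand(obs_dims[i]), np.random.rand(act_dims[i]),
            float(t), np.random.rand(obs_dims[i]), False)

# One agent draws the indices; all buffers are read at those indices,
# keeping the per-agent batches aligned for the centralized critic.
idx = rpms[0].make_index(32)
batches = [rpm.sample_batch_by_index(idx) for rpm in rpms]
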
157 changes: 78 additions & 79 deletions benchmark/torch/maddpg/train.py
@@ -16,11 +16,15 @@
 import time
 import argparse
 import numpy as np
+import parl
 from simple_model import MAModel
 from simple_agent import MAAgent
 from parl.algorithms import MADDPG
 from parl.env.multiagent_simple_env import MAenv
 from parl.utils import logger, summary
+from parl.utils import ReplayMemory
+
+from actor import Actor

 CRITIC_LR = 0.01 # learning rate for the critic model
 ACTOR_LR = 0.01 # learning rate of the actor model
@@ -32,51 +36,6 @@
 STAT_RATE = 1000 # statistical interval of save model or count reward


-def run_episode(env, agents):
-    obs_n = env.reset()
-    total_reward = 0
-    agents_reward = [0 for _ in range(env.n)]
-    steps = 0
-    while True:
-        steps += 1
-        action_n = [agent.predict(obs) for agent, obs in zip(agents, obs_n)]
-        next_obs_n, reward_n, done_n, _ = env.step(action_n)
-        done = all(done_n)
-        terminal = (steps >= MAX_STEP_PER_EPISODE)
-
-        # store experience
-        for i, agent in enumerate(agents):
-            agent.add_experience(obs_n[i], action_n[i], reward_n[i],
-                                 next_obs_n[i], done_n[i])
-
-        # compute reward of every agent
-        obs_n = next_obs_n
-        for i, reward in enumerate(reward_n):
-            total_reward += reward
-            agents_reward[i] += reward
-
-        # check the end of an episode
-        if done or terminal:
-            break
-
-        # show animation
-        if args.show:
-            time.sleep(0.1)
-            env.render()
-
-        # show model effect without training
-        if args.restore and args.show:
-            continue
-
-        # learn policy
-        for i, agent in enumerate(agents):
-            critic_loss = agent.learn(agents)
-            if critic_loss != 0.0:
-                summary.add_scalar('critic_loss_%d' % i, critic_loss,
-                                   agent.global_train_step)
-
-    return total_reward, agents_reward, steps
-

 def train_agent():
     env = MAenv(args.env)
@@ -140,43 +99,75 @@ def train_agent():
                     'model file {} does not exits'.format(model_file))
             agents[i].restore(model_file)

+    rpms = []
+    memory_size = int(1e5)
+    for i in range(len(agents)):
+        rpm = ReplayMemory(
+            max_size=memory_size,
+            obs_dim=env.obs_shape_n[i],
+            act_dim=env.act_shape_n[i])
+        rpms.append(rpm)
+
+    parl.connect(args.master_address)
+    remote_actors = [Actor(args) for _ in range(args.actor_num)]
+
     t_start = time.time()
     logger.info('Starting...')
     while total_episodes <= MAX_EPISODES:
-        # run an episode
-        ep_reward, ep_agent_rewards, steps = run_episode(env, agents)
-        summary.add_scalar('train_reward/episode', ep_reward, total_episodes)
-        summary.add_scalar('train_reward/step', ep_reward, total_steps)
-        if args.show:
-            print('episode {}, reward {}, agents rewards {}, steps {}'.format(
-                total_episodes, ep_reward, ep_agent_rewards, steps))
-
-        # Record reward
-        total_steps += steps
-        total_episodes += 1
-        episode_rewards.append(ep_reward)
-        for i in range(env.n):
-            agent_rewards[i].append(ep_agent_rewards[i])
-
-        # Keep track of final episode reward
-        if total_episodes % STAT_RATE == 0:
-            mean_episode_reward = round(
-                np.mean(episode_rewards[-STAT_RATE:]), 3)
-            final_ep_ag_rewards = [] # agent rewards for training curve
-            for rew in agent_rewards:
-                final_ep_ag_rewards.append(round(np.mean(rew[-STAT_RATE:]), 2))
-            use_time = round(time.time() - t_start, 3)
-            logger.info(
-                'Steps: {}, Episodes: {}, Mean episode reward: {}, mean agents rewards {}, Time: {}'
-                .format(total_steps, total_episodes, mean_episode_reward,
-                        final_ep_ag_rewards, use_time))
-            t_start = time.time()
-            summary.add_scalar('mean_episode_reward/episode',
-                               mean_episode_reward, total_episodes)
-            summary.add_scalar('mean_episode_reward/step', mean_episode_reward,
-                               total_steps)
-            summary.add_scalar('use_time/1000episode', use_time,
-                               total_episodes)
+        latest_weights = [agent.get_weights() for agent in agents]
+        for remote_actor in remote_actors:
+            remote_actor.set_weights(latest_weights)
+        result_object_id = [remote_actor.run_episode() for remote_actor in remote_actors]
+        for future_object in result_object_id:
+            experience, ep_reward, ep_agent_rewards, steps = future_object.get()
+
+            summary.add_scalar('train_reward/episode', ep_reward, total_episodes)
+            summary.add_scalar('train_reward/step', ep_reward, total_steps)
+            if args.show:
+                print('episode {}, reward {}, agents rewards {}, steps {}'.format(
+                    total_episodes, ep_reward, ep_agent_rewards, steps))
+
+            # add experience
+            for obs_n, action_n, reward_n, next_obs_n, done_n in experience:
+                for i, rpm in enumerate(rpms):
+                    rpm.append(obs_n[i], action_n[i], reward_n[i],
+                               next_obs_n[i], done_n[i])
+
+            # train/sample rate
+            for _ in range(len(experience)):
+                # learn policy
+                for i, agent in enumerate(agents):
+                    critic_loss = agent.learn(agents, rpms)
+                    if critic_loss != 0.0:
+                        summary.add_scalar('critic_loss_%d' % i, critic_loss,
+                                           agent.global_train_step)
+
+            # Record reward
+            total_steps += steps
+            total_episodes += 1
+            episode_rewards.append(ep_reward)
+            for i in range(env.n):
+                agent_rewards[i].append(ep_agent_rewards[i])
+
+            # Keep track of final episode reward
+            if total_episodes % STAT_RATE == 0:
+                mean_episode_reward = round(
+                    np.mean(episode_rewards[-STAT_RATE:]), 3)
+                final_ep_ag_rewards = [] # agent rewards for training curve
+                for rew in agent_rewards:
+                    final_ep_ag_rewards.append(round(np.mean(rew[-STAT_RATE:]), 2))
+                use_time = round(time.time() - t_start, 3)
+                logger.info(
+                    'Steps: {}, Episodes: {}, Mean episode reward: {}, mean agents rewards {}, Time: {}'
+                    .format(total_steps, total_episodes, mean_episode_reward,
+                            final_ep_ag_rewards, use_time))
+                t_start = time.time()
+                summary.add_scalar('mean_episode_reward/episode',
+                                   mean_episode_reward, total_episodes)
+                summary.add_scalar('mean_episode_reward/step', mean_episode_reward,
+                                   total_steps)
+                summary.add_scalar('use_time/1000episode', use_time,
+                                   total_episodes)

         # save model
         if not args.restore:
@@ -208,6 +199,14 @@ def train_agent():
         type=str,
         default='./model',
         help='directory for saving model')
+    parser.add_argument(
+        '--master_address',
+        type=str,
+        default='localhost:8010')
+    parser.add_argument(
+        '--actor_num',
+        type=int,
+        default=1)

     args = parser.parse_args()
     logger.set_dir('./train_log/' + str(args.env))
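
A closing note on the rewritten loop: because Actor is declared with wait=False, the comprehension that calls run_episode() only dispatches work, and the learner blocks per actor at future_object.get(); all actor_num rollouts therefore run in parallel against the same snapshot of the learner's weights pushed at the top of the iteration. A compressed sketch of one such collection round; the helper name collect_round is hypothetical, everything else mirrors train_agent() above.

def collect_round(agents, remote_actors, rpms):
    # Broadcast the current per-agent weights; every remote actor rolls out
    # one episode with this same (slightly stale) policy.
    latest_weights = [agent.get_weights() for agent in agents]
    for remote_actor in remote_actors:
        remote_actor.set_weights(latest_weights)

    # Non-blocking dispatch: each call returns a future object.
    futures = [remote_actor.run_episode() for remote_actor in remote_actors]

    # Resolve the futures and fan the joint transitions out into the
    # per-agent replay buffers, exactly as the while loop above does.
    episode_stats = []
    for future in futures:
        experience, ep_reward, ep_agent_rewards, steps = future.get()
        for obs_n, action_n, reward_n, next_obs_n, done_n in experience:
            for i, rpm in enumerate(rpms):
                rpm.append(obs_n[i], action_n[i], reward_n[i],
                           next_obs_n[i], done_n[i])
        episode_stats.append((ep_reward, ep_agent_rewards, steps))
    return episode_stats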