From 4fdd631571333a29596af03ffe1b3a4ebcbc4129 Mon Sep 17 00:00:00 2001 From: zenghsh3 Date: Fri, 25 Feb 2022 13:37:01 +0800 Subject: [PATCH] distributed maddpg --- benchmark/torch/maddpg/actor.py | 86 ++++++++++++++ benchmark/torch/maddpg/simple_agent.py | 22 ++-- benchmark/torch/maddpg/train.py | 157 ++++++++++++------------- 3 files changed, 171 insertions(+), 94 deletions(-) create mode 100644 benchmark/torch/maddpg/actor.py diff --git a/benchmark/torch/maddpg/actor.py b/benchmark/torch/maddpg/actor.py new file mode 100644 index 000000000..05916d7e3 --- /dev/null +++ b/benchmark/torch/maddpg/actor.py @@ -0,0 +1,86 @@ +import parl +from simple_model import MAModel +from simple_agent import MAAgent +from parl.algorithms import MADDPG +from parl.env.multiagent_simple_env import MAenv +from parl.utils import logger + +CRITIC_LR = 0.01 # learning rate for the critic model +ACTOR_LR = 0.01 # learning rate of the actor model +GAMMA = 0.95 # reward discount factor +TAU = 0.01 # soft update +BATCH_SIZE = 1024 +MAX_STEP_PER_EPISODE = 25 # maximum step per episode + +@parl.remote_class(wait=False) +class Actor(object): + def __init__(self, args): + env = MAenv(args.env) + + from gym import spaces + from multiagent.multi_discrete import MultiDiscrete + for space in env.action_space: + assert (isinstance(space, spaces.Discrete) + or isinstance(space, MultiDiscrete)) + + critic_in_dim = sum(env.obs_shape_n) + sum(env.act_shape_n) + logger.info('critic_in_dim: {}'.format(critic_in_dim)) + + agents = [] + for i in range(env.n): + model = MAModel(env.obs_shape_n[i], env.act_shape_n[i], critic_in_dim) + algorithm = MADDPG( + model, + agent_index=i, + act_space=env.action_space, + gamma=GAMMA, + tau=TAU, + critic_lr=CRITIC_LR, + actor_lr=ACTOR_LR) + agent = MAAgent( + algorithm, + agent_index=i, + obs_dim_n=env.obs_shape_n, + act_dim_n=env.act_shape_n, + batch_size=BATCH_SIZE, + speedup=(not args.restore)) + agents.append(agent) + + self.env = env + self.agents = agents + + def set_weights(self, weights): + for i, agent in enumerate(self.agents): + agent.set_weights(weights[i]) + + def run_episode(self): + obs_n = self.env.reset() + total_reward = 0 + agents_reward = [0 for _ in range(self.env.n)] + steps = 0 + + experience = [] + while True: + steps += 1 + action_n = [agent.predict(obs) for agent, obs in zip(self.agents, obs_n)] + next_obs_n, reward_n, done_n, _ = self.env.step(action_n) + done = all(done_n) + terminal = (steps >= MAX_STEP_PER_EPISODE) + + # store experience + experience.append((obs_n, action_n, reward_n, + next_obs_n, done_n)) + + # compute reward of every agent + obs_n = next_obs_n + for i, reward in enumerate(reward_n): + total_reward += reward + agents_reward[i] += reward + + # check the end of an episode + if done or terminal: + break + + return experience, total_reward, agents_reward, steps + + diff --git a/benchmark/torch/maddpg/simple_agent.py b/benchmark/torch/maddpg/simple_agent.py index e43d444f7..4d88fb8a9 100644 --- a/benchmark/torch/maddpg/simple_agent.py +++ b/benchmark/torch/maddpg/simple_agent.py @@ -15,8 +15,6 @@ import parl import torch import numpy as np -from parl.utils import ReplayMemory -from parl.utils import machine_info, get_gpu_count class MAAgent(parl.Agent): @@ -39,12 +37,7 @@ def __init__(self, self.speedup = speedup self.n = len(act_dim_n) - self.memory_size = int(1e5) self.min_memory_size = batch_size * 25 # batch_size * args.max_episode_len - self.rpm = ReplayMemory( - max_size=self.memory_size, - obs_dim=self.obs_dim_n[agent_index], - 
act_dim=self.act_dim_n[agent_index]) self.global_train_step = 0 self.device = torch.device("cuda" if torch.cuda. is_available() else "cpu") @@ -62,7 +55,7 @@ def predict(self, obs, use_target_model=False): act_numpy = act.detach().cpu().numpy().flatten() return act_numpy - def learn(self, agents): + def learn(self, agents, rpms): """ sample batch, compute q_target and train """ self.global_train_step += 1 @@ -70,8 +63,10 @@ def learn(self, agents): # only update parameter every 100 steps if self.global_train_step % 100 != 0: return 0.0 + + rpm = rpms[self.agent_index] - if self.rpm.size() <= self.min_memory_size: + if rpm.size() <= self.min_memory_size: return 0.0 batch_obs_n = [] @@ -79,14 +74,14 @@ def learn(self, agents): batch_obs_next_n = [] # sample batch - rpm_sample_index = self.rpm.make_index(self.batch_size) + rpm_sample_index = rpm.make_index(self.batch_size) for i in range(self.n): batch_obs, batch_act, _, batch_obs_next, _ \ - = agents[i].rpm.sample_batch_by_index(rpm_sample_index) + = rpms[i].sample_batch_by_index(rpm_sample_index) batch_obs_n.append(batch_obs) batch_act_n.append(batch_act) batch_obs_next_n.append(batch_obs_next) - _, _, batch_rew, _, batch_isOver = self.rpm.sample_batch_by_index( + _, _, batch_rew, _, batch_isOver = rpm.sample_batch_by_index( rpm_sample_index) batch_obs_n = [ torch.FloatTensor(obs).to(self.device) for obs in batch_obs_n @@ -117,6 +112,3 @@ def learn(self, agents): critic_cost = critic_cost.cpu().detach().numpy() return critic_cost - - def add_experience(self, obs, act, reward, next_obs, terminal): - self.rpm.append(obs, act, reward, next_obs, terminal) diff --git a/benchmark/torch/maddpg/train.py b/benchmark/torch/maddpg/train.py index a3f175ecc..5608cd98f 100644 --- a/benchmark/torch/maddpg/train.py +++ b/benchmark/torch/maddpg/train.py @@ -16,11 +16,15 @@ import time import argparse import numpy as np +import parl from simple_model import MAModel from simple_agent import MAAgent from parl.algorithms import MADDPG from parl.env.multiagent_simple_env import MAenv from parl.utils import logger, summary +from parl.utils import ReplayMemory + +from actor import Actor CRITIC_LR = 0.01 # learning rate for the critic model ACTOR_LR = 0.01 # learning rate of the actor model @@ -32,51 +36,6 @@ STAT_RATE = 1000 # statistical interval of save model or count reward -def run_episode(env, agents): - obs_n = env.reset() - total_reward = 0 - agents_reward = [0 for _ in range(env.n)] - steps = 0 - while True: - steps += 1 - action_n = [agent.predict(obs) for agent, obs in zip(agents, obs_n)] - next_obs_n, reward_n, done_n, _ = env.step(action_n) - done = all(done_n) - terminal = (steps >= MAX_STEP_PER_EPISODE) - - # store experience - for i, agent in enumerate(agents): - agent.add_experience(obs_n[i], action_n[i], reward_n[i], - next_obs_n[i], done_n[i]) - - # compute reward of every agent - obs_n = next_obs_n - for i, reward in enumerate(reward_n): - total_reward += reward - agents_reward[i] += reward - - # check the end of an episode - if done or terminal: - break - - # show animation - if args.show: - time.sleep(0.1) - env.render() - - # show model effect without training - if args.restore and args.show: - continue - - # learn policy - for i, agent in enumerate(agents): - critic_loss = agent.learn(agents) - if critic_loss != 0.0: - summary.add_scalar('critic_loss_%d' % i, critic_loss, - agent.global_train_step) - - return total_reward, agents_reward, steps - def train_agent(): env = MAenv(args.env) @@ -140,43 +99,75 @@ def train_agent(): 'model file {} 
does not exits'.format(model_file)) agents[i].restore(model_file) + rpms = [] + memory_size = int(1e5) + for i in range(len(agents)): + rpm = ReplayMemory( + max_size=memory_size, + obs_dim=env.obs_shape_n[i], + act_dim=env.act_shape_n[i]) + rpms.append(rpm) + + parl.connect(args.master_address) + remote_actors = [Actor(args) for _ in range(args.actor_num)] + t_start = time.time() logger.info('Starting...') while total_episodes <= MAX_EPISODES: - # run an episode - ep_reward, ep_agent_rewards, steps = run_episode(env, agents) - summary.add_scalar('train_reward/episode', ep_reward, total_episodes) - summary.add_scalar('train_reward/step', ep_reward, total_steps) - if args.show: - print('episode {}, reward {}, agents rewards {}, steps {}'.format( - total_episodes, ep_reward, ep_agent_rewards, steps)) - - # Record reward - total_steps += steps - total_episodes += 1 - episode_rewards.append(ep_reward) - for i in range(env.n): - agent_rewards[i].append(ep_agent_rewards[i]) - - # Keep track of final episode reward - if total_episodes % STAT_RATE == 0: - mean_episode_reward = round( - np.mean(episode_rewards[-STAT_RATE:]), 3) - final_ep_ag_rewards = [] # agent rewards for training curve - for rew in agent_rewards: - final_ep_ag_rewards.append(round(np.mean(rew[-STAT_RATE:]), 2)) - use_time = round(time.time() - t_start, 3) - logger.info( - 'Steps: {}, Episodes: {}, Mean episode reward: {}, mean agents rewards {}, Time: {}' - .format(total_steps, total_episodes, mean_episode_reward, - final_ep_ag_rewards, use_time)) - t_start = time.time() - summary.add_scalar('mean_episode_reward/episode', - mean_episode_reward, total_episodes) - summary.add_scalar('mean_episode_reward/step', mean_episode_reward, - total_steps) - summary.add_scalar('use_time/1000episode', use_time, - total_episodes) + latest_weights = [agent.get_weights() for agent in agents] + for remote_actor in remote_actors: + remote_actor.set_weights(latest_weights) + result_object_id = [remote_actor.run_episode() for remote_actor in remote_actors] + for future_object in result_object_id: + experience, ep_reward, ep_agent_rewards, steps = future_object.get() + + summary.add_scalar('train_reward/episode', ep_reward, total_episodes) + summary.add_scalar('train_reward/step', ep_reward, total_steps) + if args.show: + print('episode {}, reward {}, agents rewards {}, steps {}'.format( + total_episodes, ep_reward, ep_agent_rewards, steps)) + + # add experience + for obs_n, action_n, reward_n, next_obs_n, done_n in experience: + for i, rpm in enumerate(rpms): + rpm.append(obs_n[i], action_n[i], reward_n[i], + next_obs_n[i], done_n[i]) + + # train/sample rate + for _ in range(len(experience)): + # learn policy + for i, agent in enumerate(agents): + critic_loss = agent.learn(agents, rpms) + if critic_loss != 0.0: + summary.add_scalar('critic_loss_%d' % i, critic_loss, + agent.global_train_step) + + # Record reward + total_steps += steps + total_episodes += 1 + episode_rewards.append(ep_reward) + for i in range(env.n): + agent_rewards[i].append(ep_agent_rewards[i]) + + # Keep track of final episode reward + if total_episodes % STAT_RATE == 0: + mean_episode_reward = round( + np.mean(episode_rewards[-STAT_RATE:]), 3) + final_ep_ag_rewards = [] # agent rewards for training curve + for rew in agent_rewards: + final_ep_ag_rewards.append(round(np.mean(rew[-STAT_RATE:]), 2)) + use_time = round(time.time() - t_start, 3) + logger.info( + 'Steps: {}, Episodes: {}, Mean episode reward: {}, mean agents rewards {}, Time: {}' + .format(total_steps, total_episodes, 
mean_episode_reward, + final_ep_ag_rewards, use_time)) + t_start = time.time() + summary.add_scalar('mean_episode_reward/episode', + mean_episode_reward, total_episodes) + summary.add_scalar('mean_episode_reward/step', mean_episode_reward, + total_steps) + summary.add_scalar('use_time/1000episode', use_time, + total_episodes) # save model if not args.restore: @@ -208,6 +199,14 @@ def train_agent(): type=str, default='./model', help='directory for saving model') + parser.add_argument( + '--master_address', + type=str, + default='localhost:8010') + parser.add_argument( + '--actor_num', + type=int, + default=1) args = parser.parse_args() logger.set_dir('./train_log/' + str(args.env))
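
Note on the remote-actor pattern used above: decorating Actor with parl.remote_class(wait=False) runs each instance in a separate process on an xparl cluster, and every method call returns a future object whose result is fetched later with .get(). Below is a minimal, self-contained sketch of that pattern, assuming a local xparl cluster has been started beforehand (e.g. with "xparl start --port 8010 --cpu_num 4", matching the default --master_address of localhost:8010); the Counter class is illustrative only and is not part of this patch.

import parl


@parl.remote_class(wait=False)
class Counter(object):
    def __init__(self):
        self.value = 0

    def step(self):
        self.value += 1
        return self.value


# Connect to the xparl master first; this mirrors parl.connect(args.master_address) in train.py.
parl.connect('localhost:8010')
actors = [Counter() for _ in range(2)]

# With wait=False, each remote call returns immediately with a future object;
# .get() blocks until the remote result is ready, just as train.py does with
# the results of run_episode() from each remote actor.
futures = [actor.step() for actor in actors]
print([future.get() for future in futures])  # -> [1, 1]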