The previous section, Using PARL for parallel computation, showed how to run parallel computation with PARL, but only for a project that contains nothing besides the directory holding the main file. So how do we implement parallel reinforcement learning training with PARL, including multi-machine training?

Parallel reinforcement learning training with PARL

To train a reinforcement learning agent in parallel, files have to be distributed. File distribution is a core feature of distributed parallel computation: it ships the user's code and configuration files to the different machines so that every machine runs the same code. By default, XPARL distributes all files ending in .py in the directory that contains the main file (e.g. main.py). Sometimes, however, you need to distribute additional specific files, such as model files, configuration files, or Python code in subdirectories (submodules to be imported). For this, parl.connect provides an interface that lets you specify exactly which files or code to distribute.

Example: the directory structure is shown below, and we want to distribute the .py files in the policy folder. When calling connect, pass the files to distribute via the distributed_files parameter; this parameter supports regular expressions.

.
├── main.py
└── policy
    ├── agent.py
    ├── config.ini
    └── __init__.py

parl.connect("localhost:8004", distributed_files=['./policy/*.py', './policy/*.ini'])
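As a quick sanity check of file distribution, here is a minimal sketch (the FileChecker class and its check_files method are hypothetical, written only for illustration; the address and the policy files reuse the example above). It connects to the cluster, distributes the policy package, and asks a remote actor to report whether those files are visible on the worker it was placed on.

import os
import parl

@parl.remote_class
class FileChecker(object):
    def check_files(self, paths):
        # Executed on the worker: report which of the distributed files are visible there.
        return {p: os.path.exists(p) for p in paths}

if __name__ == "__main__":
    # Distribute the policy package in addition to the default *.py files next to main.py.
    parl.connect("localhost:8004",
                 distributed_files=['./policy/*.py', './policy/*.ini'])
    checker = FileChecker()
    print(checker.check_files(['./policy/agent.py', './policy/config.ini']))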
The complete implementation used in this project is shown below:
import numpy as np
import parl
from Agent.DoNothingAgent import DoNothingAgent
from utilize.form_action import *
from utilize.collect_data import collection_data
from utilize.rangion_file import collect_pv, plt_pv
from Agent.RandomAgent import RandomAgent
from Environment.base_env import Environment
from utilize.settings import settings
from Agent.model import DDPGModel
from parl.utils import logger, summary, ReplayMemory
import os
import matplotlib.pyplot as plt
from parl.utils.logger import info as infos
from Agent.agent_DDPG import Agent_DDPG
from yml_creator import main
from parl.env.vector_env import VectorEnv
ACTOR_LR = 1e-4
CRITIC_LR = 1e-4
GAMMA = 0.999
TAU = 0.001
MEMORY_SIZE = int(1e5)
MEMORY_WARMUP_SIZE = MEMORY_SIZE // 20
BATCH_SIZE = 10
REWARD_SCALE = 10
NOISE = 0.05
TRAIN_EPISODE = 1000
show_every = 20
max_timestep = 288
from time import time
collection_datas_obs = collection_data()
collection_datas_reward = collection_data()
collection_datas_act = collection_data()
collection_p = []
collection_v = []
import paddle
obs_dim = 3272
act_dim = 108
rpm = ReplayMemory(max_size=MEMORY_SIZE, obs_dim=obs_dim, act_dim=act_dim)
model = DDPGModel(obs_dim, act_dim)
algorithm = parl.algorithms.DDPG(model, gamma=GAMMA, tau=TAU, actor_lr=ACTOR_LR, critic_lr=CRITIC_LR)
my_agent = Agent_DDPG(algorithm, act_dim)
load_path = './DDPG_model_every.ckpt'
if os.path.exists(load_path):
    my_agent.restore('./save_files/grid_model_every.pdparams')
@parl.remote_class
class Trian_evaluate(object):

    def create_env(self):
        # Build a vectorized environment from five independent environment instances.
        self.envs = []
        for _ in range(5):
            env = Environment(settings, "EPRIReward")
            self.envs.append(env)
        self.vector_env = VectorEnv(self.envs)
        return self.vector_env
    def run_task(self):
        max_reward = -np.inf
        env = Environment(settings, "EPRIReward")
        max_episode = TRAIN_EPISODE
        print("sss")
        rpm_file = "./save_files/rpm.npz"
        rpm_path = "./save_files/rpm"
        # Warm up the replay memory, or reload a previously saved one.
        if not os.path.exists(rpm_file):
            while rpm.size() < MEMORY_WARMUP_SIZE:
                self.run_episode(my_agent, env, rpm)
            rpm.save(rpm_path)
        else:
            rpm.load(rpm_file)
        episode = 0
        total_rewards = []
        test_rewards = []
        threads = []
        while episode < max_episode:
            total_reward = self.run_episode(my_agent, env, rpm, episode=episode, max_episode=max_episode)
            episode += 1
            if episode % show_every == 0:
                test_reward = self.evaluate(env, my_agent)
                total_rewards.append(total_reward)
                test_rewards.append(test_reward)
                # Always keep the most recent checkpoint.
                save_path_every = './DDPG_model_every.ckpt'
                my_agent.save(save_path_every)
                paddle.save(my_agent.alg.model.state_dict(), './save_files/grid_model_every.pdparams')
                # Keep a separate copy whenever the average test reward improves.
                if np.mean(test_rewards) > max_reward:
                    print("Reward is rising.....save model!!")
                    max_reward = np.mean(test_rewards)
                    save_path = './DDPG_model.ckpt'
                    my_agent.save(save_path)
                    paddle.save(my_agent.alg.model.state_dict(), './save_files/grid_model.pdparams')
        metric = 'Episode:{}/{},Train total rewards:{},Test total rewards:{}'.format(
            episode, max_episode, np.mean(total_rewards), np.mean(test_rewards))
        return metric
    def yml_creator(self):
        main()
    def run_episode(self, agent, env, rpm, episode=None, max_episode=None):
        obs = env.reset()
        sample_id = env.sample_idx
        total_reward = 0
        done = False
        critic_losses, actor_losses = [], []
        for timestep in range(max_timestep):
            sample_id = sample_id + 1
            if episode is not None:
                action_ = agent.act(obs, sample_id, episode)
            else:
                action_ = agent.act(obs, sample_id)
            # Split the flat action vector into active-power and voltage adjustments.
            adjust_gen_p = action_[:54]
            adjust_gen_v = action_[54:]
            action = form_action(adjust_gen_p, adjust_gen_v)
            next_obs, reward, done, info = env.step(action)
            obs_array = agent.obs_class_to_numpy(obs)
            next_obs_array = agent.obs_class_to_numpy(next_obs)
            # collect is hard-coded to False, so the statistics-collection branch is disabled.
            collect = False
            if len(collection_datas_reward) < 10000 and collect:
                collection_datas_obs.collect_and_get_mean_std(obs_array, "obs")
                collection_datas_act.collect_and_get_mean_std(action_, 'act')
                collection_datas_reward.collect_and_get_mean_std(REWARD_SCALE * reward, 'reward')
            rpm.append(obs_array, action_, REWARD_SCALE * reward, next_obs_array, done)
            # Only start learning once the replay memory has been warmed up.
            if rpm.size() >= MEMORY_WARMUP_SIZE:
                (batch_obs, batch_action, batch_reward, batch_next_obs,
                 batch_done) = rpm.sample_batch(BATCH_SIZE)
                critic_loss, actor_loss = agent.learn(batch_obs, batch_action, batch_reward, batch_next_obs, batch_done)
                critic_losses.append(critic_loss.numpy()[0])
                actor_losses.append(actor_loss.numpy()[0])
            obs = next_obs
            total_reward += reward
            if done or timestep >= 288:
                break
        return total_reward
    def evaluate(self, env, agent):
        obs = env.reset()
        sample_id = env.sample_idx
        total_reward = 0
        done = False
        for timestep in range(max_timestep):
            sample_id = sample_id + 1
            _, action_ = agent.predict(obs, sample_id)
            adjust_gen_p = action_[:54]
            adjust_gen_v = action_[54:]
            action = form_action(adjust_gen_p, adjust_gen_v)
            next_obs, reward, done, info = env.step(action)
            obs = next_obs
            total_reward += reward
            infos("Evaluate...,timestep:{},reward:{},total reward:{}".format(timestep, reward, total_reward))
            if done or timestep >= 288:
                break
        return total_reward
import paddle
import threading

if __name__ == "__main__":
    # Connect to the cluster master and distribute every file the remote workers need.
    parl.connect("192.168.8.104:8010", distributed_files=['./Environment/*.py', "./Reward/*.py",
                                                          "./utilize/*.py", "./utilize/exceptions/*.py",
                                                          "./utilize/parameters/*.yml", "./model_jm/*.model",
                                                          "./Observation/*.py", "./Agent/*.py", "./*.so",
                                                          "./*.so.2.0.0", "./*.so.*", "./save_files/*.txt", "./data/*.csv"])
    threads = []
    for _ in range(11):
        a = Trian_evaluate()
        th = threading.Thread(target=a.run_task)
        th.start()
        threads.append(th)
    for th in threads:
        th.join()
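A note on the pattern in __main__: in PARL, each instance of a @parl.remote_class is scheduled onto one CPU of the xparl cluster, and calling its methods blocks until the remote call returns. The script therefore wraps each run_task call in a Python thread: the 11 threads in the main process do no training themselves, they simply wait for the 11 remote actors that train in parallel on the cluster.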
To run multi-machine parallel training, first make sure that all machines are connected to the same local network. After starting a CPU cluster on one machine, use a command to add the CPUs of the other machines to that cluster. Once the cluster is up it can be used directly; if CPU resources are not sufficient, you can add more at any time, from any machine (the master itself or any other machine), by running the xparl connect command.
xparl connect --address localhost:6006
This starts a worker and adds the current machine's CPU resources to the master cluster specified by --address. By default the worker contributes all of the machine's available CPUs; if you want to add only a specific number of CPUs, append the --cpu_num [CPU_NUM] option to the command above.
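Putting the multi-machine workflow together, a typical session looks roughly like this (the port and IP reuse the values from the training script above; the --cpu_num value is only an example):

# On the machine that will act as master: start the cluster
xparl start --port 8010 --cpu_num 4

# On every other machine in the same LAN: contribute its CPUs to the cluster
xparl connect --address 192.168.8.104:8010

# On any machine: check the cluster status (total and vacant CPUs)
xparl status

# Finally, run the training script; parl.connect in __main__ points at the master
python main.py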
Note that xparl connect can be run at any time to add more CPUs to the cluster. Reference: the official PARL documentation.