IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> parl实现强化学习并行训练 -> 正文阅读

[Python知识库]parl实现强化学习并行训练

上一节使用parl实现了并行计算
使用parl实现并行计算
但是这是建立在项目里面不包含除了主文件以外的文件夹的情况下。那么怎样实现强化学习并行训练呢,包括实现多机并行训练。
parl实现强化学习并行训练
要实现强行学习并行训练,那么需要分发文件。
文件分发是分布式并行计算的重要功能。它负责把用户的代码还有配置文件分发到不同的机器上,让所有的机器都运行同样的代码进行并行计算。默认情况下,XPARL分发主文件(例如: main.py )所在目录下所有 .py 结尾文件。但是有时候用户需要分发一些特定的文件,比如模型文件、配置文件、子目录下的Python代码(用于import的子模块)。为了满足这个需求,parl.connect 提供了接口,用户可直接指定需要分发的文件或代码。
示例:?
文件目录结构如下,我们想分发policy文件夹中的 .py 文件。 我们可以在 connect 的时候传入想要分发的文件到 distributed_files 参数中,该参数支持正则表达式。
.
├── main.py
└── policy
├── agent.py
├── config.ini
└── init.py
parl.connect(“localhost:8004”, distributed_files=[’./policy/.py’, './policy/.ini’])

具体实现实例如下:

# -*- coding: UTF-8 -*-

import numpy as np
import parl
from Agent.DoNothingAgent import DoNothingAgent
from utilize.form_action import *
from utilize.collect_data import collection_data
from utilize.rangion_file import collect_pv, plt_pv
from Agent.RandomAgent import RandomAgent
from Environment.base_env import Environment
from utilize.settings import settings
from Agent.model import DDPGModel
from parl.utils import logger, summary, ReplayMemory
import os
import matplotlib.pyplot as plt
from parl.utils.logger import info as infos
from Agent.agent_DDPG import Agent_DDPG
from yml_creator import main
from parl.env.vector_env import VectorEnv
ACTOR_LR = 1e-4  # Actor网络的 learning rate
CRITIC_LR = 1e-4  # Critic网络的 learning rate
GAMMA = 0.999  # reward 的衰减因子
TAU = 0.001  # 软更新的系数
MEMORY_SIZE = int(1e5)  # 经验池大小
MEMORY_WARMUP_SIZE = MEMORY_SIZE // 20  # 预存一部分经验之后再开始训练
BATCH_SIZE = 10
REWARD_SCALE = 10  # reward 缩放系数(使得s,a,r,s‘在一个数量级上)
NOISE = 0.05  # 动作噪声方差
TRAIN_EPISODE = 1000  # 训练的总episode数
show_every = 20
max_timestep = 288  # 最大时间步数
from time import time

collection_datas_obs = collection_data()
collection_datas_reward = collection_data()
collection_datas_act = collection_data()
collection_p = []
collection_v = []
import paddle
#paddle.set_device('cpu')


# obs = env.reset()
obs_dim = 3272  # obs_array.shape[0]
act_dim = 108

rpm = ReplayMemory(
    max_size=MEMORY_SIZE, obs_dim=obs_dim, act_dim=act_dim)
# 使用PARL框架创建agent
model = DDPGModel(obs_dim, act_dim)
algorithm = parl.algorithms.DDPG(
    model, gamma=GAMMA, tau=TAU, actor_lr=ACTOR_LR, critic_lr=CRITIC_LR)
my_agent = Agent_DDPG(algorithm, act_dim)
# 加载模型
load_path = './DDPG_model_every.ckpt'
if os.path.exists(load_path):
    my_agent.restore('./save_files/grid_model_every.pdparams')

# 训练一个episode

@parl.remote_class
class Trian_evaluate(object):

    def create_env(self):
        self.envs = []
        for _ in range(5):
            env = Environment(settings, "EPRIReward")
            self.envs.append(env)
        self.vector_env = VectorEnv(self.envs)

        return self.vector_env

    def run_task(self):
        #my_agent = self.agent
        #rpm = self.rpm
        max_reward = -np.inf
        env = Environment(settings, "EPRIReward")
        #env  = self.create_env()
        max_episode = TRAIN_EPISODE  # 回合数
        # 创建经验池
        # rpm = ReplayMemory(MEMORY_SIZE)
        print("sss")
        # 往经验池中预存数据
        rpm_file = "./save_files/rpm.npz"
        rpm_path = "./save_files/rpm"
        if not os.path.exists(rpm_file):
            while rpm.size() < MEMORY_WARMUP_SIZE:
                self.run_episode(my_agent, env, rpm)
            rpm.save(rpm_path)
        else:
            rpm.load(rpm_file)

        # 开始训练
        episode = 0
        total_rewards = []
        test_rewards = []
        threads = []
        while episode < max_episode:
            # print('------ episode ', episode)
            # env = Environment(settings, "EPRIReward")
            # print('------ reset ')
            # obs = env.reset()
            # reward = 0.0
            # done = False
            total_reward = self.run_episode(my_agent, env, rpm, episode=episode, max_episode=max_episode)
            episode += 1
            if episode % show_every == 0:
                test_reward = self.evaluate(env, my_agent)
                total_rewards.append(total_reward)
                test_rewards.append(test_reward)
                # 保存模型
                save_path_every = './DDPG_model_every.ckpt'
                my_agent.save(save_path_every)
                paddle.save(my_agent.alg.model.state_dict(), './save_files/grid_model_every.pdparams')
                if np.mean(test_rewards) > max_reward:
                    print("Reward is rising.....save model!!")
                    max_reward = np.mean(test_rewards)
                    # 保存模型
                    save_path = './DDPG_model.ckpt'
                    my_agent.save(save_path)
                    paddle.save(my_agent.alg.model.state_dict(), './save_files/grid_model.pdparams')

                metric = 'Episode:{}/{},Train total rewards:{},Test total rewards:{}'.format(episode, max_episode,
                                                                                          np.mean(total_rewards),
                                                                                          np.mean(test_rewards))
                return metric
        # pv = collect_pv(collection_p,collection_v)
        # plt_pv(pv)
        #plt.plot(total_rewards, label='Train Rewards')
        #plt.plot(test_rewards, label="Test Rewards")
        #plt.legend()
        #plt.savefig('rewards.png', dpi=300)
        #plt.show()

    def yml_creator(self):
        main()

    def run_episode(self, agent, env, rpm, episode=None, max_episode=None):
        obs = env.reset()
        sample_id = env.sample_idx
        # obs_array = agent.obs_class_to_numpy(obs)
        # collection_datas_obs.collect_and_get_mean_std(obs_array, "obs")  # 得到mean和std
        total_reward = 0
        done = False
        critic_losses, actor_losses = [], []
        # while not done:
        for timestep in range(max_timestep):

            sample_id = sample_id + 1
            if episode is not None:
                action_ = agent.act(obs, sample_id,episode)
            else:
                action_ = agent.act(obs, sample_id)
            adjust_gen_p = action_[:54]
            adjust_gen_v = action_[54:]
            action = form_action(adjust_gen_p, adjust_gen_v)

            # 增加探索扰动, 输出限制在 [-1.0, 1.0] 范围内
            # print(env.sample_idx)
            next_obs, reward, done, info = env.step(action)
            # if info == {}:
            # collection_p.append(adjust_gen_p)
            # collection_v.append(adjust_gen_v)
            # print("sss",next_obs_array[:6])
            # action = [action]  # 方便存入replaymemory  (改动的地方)
            obs_array = agent.obs_class_to_numpy(obs)
            next_obs_array = agent.obs_class_to_numpy(next_obs)

            # 是否收集数据,更新mean和std值
            collect = False
            if len(collection_datas_reward) < 10000 and collect:
                collection_datas_obs.collect_and_get_mean_std(obs_array, "obs")  # 得到mean和std
                collection_datas_act.collect_and_get_mean_std(action_, 'act')
                collection_datas_reward.collect_and_get_mean_std(REWARD_SCALE * reward, 'reward')
            rpm.append(obs_array, action_, REWARD_SCALE * reward, next_obs_array, done)

            if rpm.size() >= MEMORY_WARMUP_SIZE:
                (batch_obs, batch_action, batch_reward, batch_next_obs,
                 batch_done) = rpm.sample_batch(BATCH_SIZE)
                critic_loss, actor_loss = agent.learn(batch_obs, batch_action, batch_reward, batch_next_obs, batch_done)
                critic_losses.append(critic_loss.numpy()[0])
                actor_losses.append(actor_loss.numpy()[0])

            obs = next_obs
            total_reward += reward
            #if episode is not None:
                #infos(
                    #"Train...episode:{}/{},timestep:{},reward:{},toatal reward:{},critic_loss:{},actor_loss:{},存储数据大小:{},info:{}".format(
                        #episode + 1, max_episode, timestep,
                        #reward, total_reward, np.mean(critic_losses), np.mean(actor_losses),
                        #len(collection_datas_reward), info))

            if done or timestep >= 288:
                break

        return total_reward

    # 评估 agent
    def evaluate(self, env, agent):

        obs = env.reset()
        sample_id = env.sample_idx
        total_reward = 0
        done = False

        # while not done:
        for timestep in range(max_timestep):
            sample_id = sample_id + 1
            _, action_ = agent.predict(obs, sample_id)

            adjust_gen_p = action_[:54]
            adjust_gen_v = action_[54:]

            action = form_action(adjust_gen_p, adjust_gen_v)

            next_obs, reward, done, info = env.step(action)

            obs = next_obs
            total_reward += reward
            infos("Evaluate...,timestep:{},reward:{},toatal reward:{}".format(timestep, reward, total_reward))
            if done or timestep >= 288:
                break
        return total_reward




import paddle
import threading
if __name__ == "__main__":
    #parl.connect("localhost:8010")
    parl.connect("192.168.8.104:8010", distributed_files=['./Environment/*.py',"./Reward/*.py",
                                                      "./utilize/*.py","./utilize/exceptions/*.py",
                                                      "./utilize/parameters/*.yml","./model_jm/*.model",
                                                      "./Observation/*.py","./Agent/*.py","./*.so",
                                                      "./*.so.2.0.0","./*.so.*","./save_files/*.txt", "./data/*.csv"])
    threads = []
    for _ in range(11):
        a = Trian_evaluate()
        th = threading.Thread(target=a.run_task)
        #print(a.run_task())
        th.start()
        threads.append(th)
    for th in threads:
        th.join()

如果需要实现多机并行训练,那么首先需要保证所有机器连接在同一个局域网内。
在一台机器开启了一个CPU集群后,接着使用命令把其他机器的CPU加入集群。
启动集群后,就可以直接使用集群了,如果CPU资源不够用,你可以在任何时候和任何机器(包括本机或其他机器)上,通过执行 xparl connect 命令把更多CPU资源加入到集群中。

xparl connect --address localhost:6006

它会启动一个工作节点(worker),并把当前机器的CPU资源加入到 --address 指定的master集群。worker默认会把当前机器所有的可用的CPU资源加入到集群中,如果你需要指定加入的CPU数量,也可以在上述命令上加入选项 --cpu_num [CPU_NUM] 。

请注意 xparl connect 可以在任何时候用于添加更多的CPUs到集群。
参考:parl 官网

  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2021-10-12 23:23:18  更:2021-10-12 23:23:45 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/28 21:43:30-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计