Deep Deterministic Policy Gradient
算法思想
? 在介绍DDPG之前,先来介绍几个概念:确定性策略是指某个状态执行的动作是一定的;随机性策略是指在某个状态得到的是执行动作的概率,可能执行不同的动作。确定性策略是和随机策略相对而言的,对于某一些动作集合来说,它可能是连续值,或者非常高维的离散值,这样动作的空间维度极大。如果我们使用随机策略,即像DQN一样研究它所有的可能动作的概率,并计算各个可能的动作的价值的话,那需要的样本量是非常大才可行的。于是有人就想出使用确定性策略来简化这个问题。
? 深度确定性策略梯度(DDPG)是基于Actor-Critic、off-pollicy,针对连续行为的策略学习方法,可以在连续空间中进行操作。可以直接以原始像素作为输入。
? 随着DQN的流行,使用神经网络来对action-value function进行估计非常常见,但是DQN只能解决离散以及低维的动作空间,但是真实的物理环境中,大部分的动作空间都是连续且高维的,在处理时,DQN是不适合的,因为它依赖于在每一次最优迭代中寻找最大化action-value function(表现为在Q神经网络中输出每个动作的值函数),针对连续动作空间,DQN没有办法输出每个动作的动作值函数。解决连续问题的一个很简单的思路是将动作离散化,但是这个存在的一个问题是维度爆炸,随着自由度的提高,动作数量是指数增长的。并且直接的离散化会丢失action domain里的结构信息,这些信息可能对于解决问题是非常重要的。
? 对于某些问题,这种带有神经函数approximator的actor-critic方法的简单应用是不稳定的。DQN以一种稳定鲁棒的方式学习到function approximator,主要由于两个方面的创新:1、使用回放缓冲区(replay buffer)中的样本对网络进行训练,以最小化样本之间的相关性。2、使用目标Q网络对网络进行训练,以便在备份期间提供一致的目标。
? DDPG该方法的一个关键特征是其简单性:它只需要一个简单的AC体系结构和学习算法,只有很少的“moving parts”,使得它易于实现和扩展到更困难的问题和更大的网络。
DDPG Innovation
? DDPG从当前网络到目标Q网络的复制和DQN不一样。在DQN中,我们是直接把将当前Q网络的参数复制到目标Q网络,即
ω
′
=
ω
{\omega'} = {\omega}
ω′=ω DDPG这里没有使用这种硬更新,而是使用了软更新,即每次参数只更新一点点,即:
ω
′
?
γ
ω
+
(
1
?
γ
)
ω
′
(1)
{\omega'} {\longleftarrow} {\gamma\omega}+(1-{\gamma}){\omega'} \tag{1}
ω′?γω+(1?γ)ω′(1)
θ
′
?
γ
θ
+
(
1
?
γ
)
θ
′
(2)
{\theta'} {\longleftarrow} {\gamma\theta}+(1-{\gamma}){\theta'} \tag{2}
θ′?γθ+(1?γ)θ′(2)
? 其中
γ
{\gamma}
γ 是更新系数,一般取的比较小,比如0.1或者0.01这样的值。
? 这意味着target value受到约束,这可能会减慢学习的速率,但这会极大提高学习的稳定性。缓解了学习action-value function相对不稳定的问题,与监督学习的情况更接近。
? 当从低维特征向量观察中学习时,观察可能有不同的物理单位(例如位置与速度)并且范围可跨越环境而变化。这使得人们难以对网络进行有效的学习,并且可能很难找到超越环境的泛化的超参数,因此解决方法是手动缩放特征。使用batch normalization
? 在连续动作空间中学习的一个主要挑战是探索。DDPG离线策略算法的优势是我们可以将探索独立与学习算法之上。
? 这里使用的是Ornstein-Uhlenbeck方法
伪代码
算法实现
import tensorflow as tf
import numpy as np
import gym
import os
MAX_EPISODES = 100
MAX_EP_STEPS = 200
LR_A = 0.001
LR_C = 0.002
GAMMA = 0.9
TAU = 0.01
MEMORY_CAPACITY = 10000
BATCH_SIZE = 32
RENDER = False
ALG_NAME = 'DDPG'
ENV_NAME = 'Pendulum-v0'
File = 'DDPG1_1'
class DDPG(object):
def __init__(self, a_dim, s_dim, a_bound, ):
self.memory = np.zeros((MEMORY_CAPACITY, s_dim * 2 + a_dim + 1), dtype=np.float32)
self.pointer = 0
self.action_dim, self.state_dim, self.a_bound = a_dim, s_dim, a_bound,
self.actor_model = self.build_actor()
self.critic_model = self.build_critic()
self.target_actor = self.build_actor()
self.target_critic = self.build_critic()
self.update_target(self.target_actor.variables, self.actor_model.variables, TAU)
self.update_target(self.target_critic.variables, self.critic_model.variables, TAU)
self.optimizer_actor = tf.keras.optimizers.Adam(LR_A)
self.optimizer_critic = tf.keras.optimizers.Adam(LR_C)
def choose_action(self, s):
return self.actor_model(s[np.newaxis, :])[0]
def learn(self):
self.update_target(self.target_actor.variables, self.actor_model.variables, TAU)
self.update_target(self.target_critic.variables, self.critic_model.variables, TAU)
indices = np.random.choice(MEMORY_CAPACITY, size=BATCH_SIZE)
bt = self.memory[indices, :]
bs = bt[:, :self.state_dim]
ba = bt[:, self.state_dim: self.state_dim + self.action_dim]
br = bt[:, -self.state_dim - 1: -self.state_dim]
bs_ = bt[:, -self.state_dim:]
with tf.GradientTape() as tape:
action = self.actor_model(bs)
critic = self.critic_model([bs, action])
a_loss = -tf.reduce_mean(critic)
actor_grad = tape.gradient(a_loss, self.actor_model.trainable_variables)
self.optimizer_actor.apply_gradients(zip(actor_grad, self.actor_model.trainable_variables))
with tf.GradientTape() as tape:
q_target = br + GAMMA * self.target_critic([bs_, self.target_actor(bs_)])
td_error = tf.compat.v1.losses.mean_squared_error(labels=q_target, predictions=self.critic_model([bs, ba]))
critic_grad = tape.gradient(td_error, self.critic_model.trainable_variables)
self.optimizer_critic.apply_gradients(zip(critic_grad, self.critic_model.trainable_variables))
def update_target(self, target_weights, weights, tau):
for (a, b) in zip(target_weights, weights):
a.assign(b * tau + a * (1 - tau))
def store_transition(self, s, a, r, s_):
transition = np.hstack((s, a, [r], s_))
index = self.pointer % MEMORY_CAPACITY
self.memory[index, :] = transition
self.pointer += 1
def build_actor(self):
input = tf.keras.layers.Input(shape=[None, self.state_dim])
layer1 = tf.keras.layers.Dense(units=30, activation=tf.nn.relu)(input)
layer2 = tf.keras.layers.Dense(units=self.action_dim, activation=tf.nn.tanh)(layer1)
output = tf.multiply(layer2, self.a_bound)
model = tf.keras.Model(input, output)
return model
def build_critic(self):
state_input = tf.keras.layers.Input(shape=(self.state_dim))
state_out = tf.keras.layers.Dense(30, use_bias=False)(state_input)
action_input = tf.keras.layers.Input(shape=(self.action_dim))
action_out = tf.keras.layers.Dense(30, use_bias=True)(action_input)
out = tf.nn.relu(state_out + action_out)
out = tf.keras.layers.Dense(1)(out)
model = tf.keras.Model([state_input, action_input], out)
return model
def saveModel(self):
path = os.path.join('model', '_'.join([File, ALG_NAME, ENV_NAME]))
if not os.path.exists(path):
os.makedirs(path)
self.actor_model.save_weights(os.path.join(path, 'actor.tf'), save_format='tf')
self.critic_model.save_weights(os.path.join(path, 'critic.tf'), save_format='tf')
print('Saved weights.')
def loadModel(self):
path = os.path.join('model', '_'.join([File, ALG_NAME, ENV_NAME]))
if os.path.exists(path):
print('Load DDPG Network parametets ...')
self.actor_model.load_weights(os.path.join(path, 'actor.tf'))
self.critic_model.load_weights(os.path.join(path, 'critic.tf'))
print('Load weights!')
else:
print("No model file find, please train model first...")
if __name__ == "__main__":
env = gym.make(ENV_NAME)
env = env.unwrapped
env.seed(1)
s_dim = env.observation_space.shape[0]
a_dim = env.action_space.shape[0]
a_bound = env.action_space.high
ddpg = DDPG(a_dim, s_dim, a_bound)
var = 3
for episode in range(MAX_EPISODES):
s = env.reset()
ep_reward = 0
for j in range(MAX_EP_STEPS):
if RENDER:
env.render()
a = ddpg.choose_action(s)
a = np.clip(np.random.normal(a, var), -2, 2)
s_, r, done, info = env.step(a)
ddpg.store_transition(s, a, r / 10, s_)
if ddpg.pointer > MEMORY_CAPACITY:
var *= .9995
ddpg.learn()
s = s_
ep_reward += r
if j == MAX_EP_STEPS - 1:
print('Episode:', episode, ' Reward: %i' % int(ep_reward), 'Explore: %.2f' % var, )
break
ddpg.saveModel()
附录
- Ornstein-Uhlenbeck(奥恩斯坦-乌伦贝克)
- 高斯噪声
- 贝尔曼方程
参考文献
|