2022/4/7
DQN代码复现
一、经验回放
把所有样本存放在一起,随机抽取其中的样本进行训练。
- 去除了序列决策的样本关联
- 让样本可以重复利用
代码实现
import collections
import random
import numpy as np
class ReplayMemory:
def __init__(self, max_size):
self.buffer = collections.deque(maxlen=max_size)
def append(self, exp):
self.buffer.append(exp)
def sample(self, batch_size):
mini_batch = random.sample(self.buffer, batch_size)
obs_batch, action_batch, reward_batch, next_obs_batch, done_batch = [], [], [], [], []
for experience in mini_batch:
s, a, r, s_p, done = experience
obs_batch.append(s)
action_batch.append(a)
reward_batch.append(r)
next_obs_batch.append(s_p)
done_batch.append(done)
return np.array(obs_batch), np.array(action_batch), np.array(reward_batch), np.array(next_obs_batch), np.array(done_batch)
def __len__(self):
return len(self.buffer)
二、固定Q目标
采用两个网络进行训练。
- 解决了算法更新不平稳
三、Model
import numpy as np
from parl import layers
class Model:
def __init__(self, act_dim=1):
hid1_size = 128
hid2_size = 128
self.fc1 = layers.fc(size=hid1_size, act='relu')
self.fc2 = layers.fc(size=hid2_size, act='relu')
self.fc3 = layers.fc(size=act_dim, act=None)
def value(self, obs, act, num):
total = np.array([obs, act, num])
h1 = self.fc1(total)
h2 = self.fc2(h1)
Q = self.fc3(h2)
return Q
四、矩阵处理
import numpy as np
from numpy import random
def is_connect(mat,x1,y1,x2,y2):
if(mat[x1][y1]==mat[x2][y2] and ((x1==x2 and abs(y1-y2)<=1) or (abs(x1-x2)==1and abs(y1-y2)<=1))) :
return 1
else:
return 2
def all_selsect(mat,new,x,y):
for i in range(5):
for j in range(5):
if (is_connect(mat, x, y, i, j) == 1):
new[x*5+y][i * 5 + j] = 1
return new
def iter_paths(adj, min_length=2, path=None):
if not path:
for start_node in range(len(adj)):
yield from iter_paths(adj, min_length, [start_node])
else:
if len(path) >= min_length:
yield path
if path[-1] in path[:-1]:
return
current_node = path[-1]
for next_node in range(len(adj[current_node])):
if adj[current_node][next_node] == 1:
yield from iter_paths(adj, min_length, path + [next_node])
def all_paths(m):
list = []
for i in range(len(m)):
if (len(set(m[i])) == len(m[i])):
list.append(m[i])
return list
def max_path(list):
l = -1
k = -1
for i in range(len(list)):
if (len(list[i]) >= l):
k = i
l = len(list[i])
return list[k]
def path_transformation(path):
ll = []
for i in range(len(path)):
ll.append((int(path[i] / 5), path[i] % 5))
return ll
def zero_pad(list, matrix):
for i in range(len(list)-1):
matrix[list[i][0]][list[i][1]] = 0
matrix[list[-1][0]][list[-1][1]]= 2 * matrix[list[-1][0]][list[-1][1]]
return matrix
def matrix_min(martix):
k = np.max(martix)
for i in range(martix.shape[0]):
for j in range(martix.shape[1]):
if(martix[i][j]<k and martix[i][j] !=0):
k = martix[i][j]
return k
def all_action(martix):
max = np.max(martix)
min = matrix_min(martix)
list = []
for i in range(int(np.log2(max))):
if(i>=np.log2(min)-1):
list.append(2**(i+1))
return list
2022/4/8
1 动作空间
我们2048的游戏场景中,动作空间是一个大规模离散动作空间,但其中有很多的是无效动作,同时存在一些动作对当前局面的影响是相似的情况,因此我们可以采取一定的方法缩小动作空间,例如使用knn对所有的有效动作进行聚类,聚成10个类,那么我们最后的动作空间规模就缩小到10。
1.1 聚类具体过程
-
穷举出当前状态所有动作。 -
去除无效动作。 -
找出每一个动作的特征,例如:可连接数目,单调性等。 -
使用k-means进行聚类。
1.2 代码实现
找出所有有效动作
def all_action(martix, n):
max = np.max(martix)
min = matrix_min(martix)
list1 = []
for i in range(int(np.log2(max))):
if(i>=np.log2(min)-1):
list1.append(2**(i+1))
my_list = list(itertools.permutations(list1, n))
return my_list
K-Means聚类 待完成…
2022/4/9
1. 讨论
今天和潘超凡师兄讨论了一下,发下自己思路走偏了。。。做了几天无用功。
1.1 讨论结果
我们采用DQN的目标是怎样在最短的时间内让游戏达到死局。因此我们用此时的DQN网络去给一个动作打分,它的分越低,说明它让游戏达到死局所需的步数就越长,所以我们可以根据需要,去选择不同分数的动作,让游戏在我们要求的步数内到达死局。 DQN建模
- 输入:状态+动作
- 输出:Q值
- 奖励:局面评估函数的得分结果
- 目标:在最短时间内让游戏最难
1.2 下一步工作
- 复现DQN代码,重点是怎么把状态和动作同时作为输入得到Q值,师兄建议可以了解一下多模态网络。
- 局面评估函数
|