Competition page: https://www.datafountain.cn/competitions/549
repo:https://github.com/opendilab/Gobigger-Explore/
Getting Started
Competition Description
This competition uses Go-Bigger as the game environment. Go-Bigger is a multi-player, team-based competitive game; see the Go-Bigger documentation for more details. In the game, each participating team controls one in-game team (made up of several players). Participants control that team and its players by submitting an agent, and cooperate within the team to earn a higher score and thus a higher ranking in the game.
Game Details
In each match, a participating team controls one in-game team and plays against the other three teams. The goal is to eat other units and grow larger: whenever your ball is bigger than an enemy ball, colliding with it eats the enemy and increases your size. However, a larger size also means slower movement, so decisions must balance the two. A few points to note:
- Each match lasts 10 minutes (3000 env steps).
- The map size is 1000*1000.
- The game mode is FFA (Free For All), meaning every participating team plays against all the others.
- In a match, each team has 3 controllable players, so the submitted agent must control all 3 players on its team. Teamwork lets your team earn a higher score.
- The competition maintains a ladder system for rating the submitted agents.
Other Environment Settings
Food balls
At the start of a match, 2000 food balls are spawned on the map. Every 2 seconds, 30 new food balls are generated, up to a cap of 2500. Each food ball is a circle of radius 2 and stays on the map until it is eaten; eating one increases your size.
Thorns balls (spikes)
Similar to food balls, 15 thorns balls are spawned when the map is initialized. Every 2 seconds, 2 new thorns balls are generated, up to a cap of 20. Each thorns ball has a radius between 12 and 20. These balls are relatively large and can only be eaten once you reach a certain size; after eating one, your ball bursts into several smaller balls that move faster and then gradually merge back into a single ball larger than before.
Player Skills
Once an agent's ball is larger than size 10, it can use the split and eject skills.
Split
Each agent can split into at most 16 clone balls. Each clone ball has a speed cap based on its own weight: the heavier the ball, the lower its speed cap. Each clone ball's radius ranges from 3 to 100. After a split, a clone ball must wait through a cooldown period (20 seconds) before it can merge again with the player's other clone balls.
Spores
Every agent ball or clone ball can eject spores, and a spore can be eaten by any ball. All spore balls share the same radius, 3 by default. Ejecting a spore shrinks the ball, swallowing a spore grows it, and spores can push thorns balls.
Tips
- Splitting increases movement speed, so you can collect map resources faster.
- If you meet an enemy while split, you can eject spores to quickly concentrate mass back onto your largest ball.
- You can use spores to push a thorns ball into an enemy, forcing them to split and turning the fight around.
- Splitting lets you quickly catch up with small enemy balls.
Submission Format
The final .tar.gz submission, once extracted, should have the following directory structure:
- my_submission
| - __init__.py
| - requirements.txt
| - my_submission.py
| - supplements/
| - checkpoints or other materials
- __init__.py should be an empty file.
- requirements.txt lists the packages your program needs; note that some packages are already installed on the evaluation server, so do not install them again.
- my_submission.py implements the key task: given an observation, output actions according to your own policy. Here is a code example:
from gobigger.agents import BotAgent
class BotSubmission(BaseSubmission):
def __init__(self, team_name, player_names):
super(BotSubmission, self).__init__(team_name, player_names)
self.agents = {}
for player_name in self.player_names:
self.agents[player_name] = BotAgent(name=player_name)
def get_actions(self, obs):
global_state, player_states = obs
actions = {}
for player_name, agent in self.agents.items():
action = agent.step(player_states[player_name])
actions[player_name] = action
return actions
As you can see, the only function we must implement is get_actions, which takes the observation and returns the predicted actions; note that we control more than one agent. The competition passes team_name and player_names to every submission as constructor arguments: team_name is the name of the team controlled by this submission (it differs from match to match), and player_names are the names of all agents controlled on the current team. During evaluation, the competition calls get_actions() on every frame with obs as input (similar to the tutorial) to obtain the submission's actions for that frame.
We can run the test.py file under submission_example directly to check whether the submission meets the requirements. Typically you copy your submission files there, test them, and then submit. After the run finishes, a .tar.gz file is generated that can be uploaded directly to the official site. Note: .tar.gz is a common packaging format on Linux; if packaging fails on Windows, you can install 7z and package it manually.
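Alternatively, if you prefer to package from Python instead of a command-line tool (for example on Windows), a minimal sketch using only the standard tarfile module looks like this; the directory name follows the structure above and this is not the official test.py flow:
import tarfile

# Create my_submission.tar.gz from the my_submission/ directory described above.
with tarfile.open('my_submission.tar.gz', 'w:gz') as tar:
    tar.add('my_submission')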
For the scoring method, see https://github.com/opendilab/GoBigger-Challenge-2021/blob/main/evaluation_zh.md
Observation Input and Action Output
After we upload our submission, it is evaluated by how the actions returned from get_actions for each agent perform in the environment. The observation obs received by this function consists of two dicts:
global_state, player_state = obs
global_state contains the map size, total match time, elapsed time, the leaderboard, and so on. Specifically:
Note: the farther apart teammates are, the larger the area of the map shown in the view.
{
'border': [map_width, map_height],
'total_time': match_time,
'last_time': last_time,
'leaderboard': {
team_name: team_size
}
}
player_state contains the player's channel information (feature layers), the position of the player's view, all units appearing within that view, and the name of the team the player belongs to. Specifically:
{
player_name: {
'feature_layers': list(numpy.ndarray),
'rectangle': [left_top_x, left_top_y, right_bottom_x, right_bottom_y],
'overlap': {
'food': [[position.x, position.y, radius], ...],
'thorns': [[position.x, position.y, radius], ...],
'spore': [[position.x, position.y, radius], ...],
            'clone': [[position.x, position.y, radius, player_name, team_name], ...],
},
'team_name': team_name,
}
}
Here, feature_layers is essentially an image containing the positions of the units on the map, which reduces the preprocessing of overlap needed during training. It has 15 channels: the first 12 channels encode each player of each team within the view, channel 13 contains all food balls in the view, channel 14 contains the spores in the view, and channel 15 contains the thorns balls in the view.
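As a small illustration (assuming feature_layers is the list of equally sized 2D arrays described above), the layers can be stacked into one array for a CNN encoder:
import numpy as np

def stack_feature_layers(player_obs):
    # Stack the 15 single-channel layers into one (15, H, W) array.
    # Channel layout per the description above: 0-11 players, 12 food, 13 spores, 14 thorns.
    return np.stack(player_obs['feature_layers'], axis=0)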
overlap provides the position and radius of every ball; for player balls it additionally provides the agent name and team name.
To make training easier, GoBigger also offers a global-vision interface for training: pass with_all_vision=True to obtain the global view. In this mode all players share the same view, so to reduce the amount of data transferred, the global-view information is only given in the first player's information dict.
{
'0': {
'feature_layers': list(numpy.ndarray),
'rectangle': None,
'overlap': {
'food': [{'position': position, 'radius': radius}, ...],
'thorns': [{'position': position, 'radius': radius}, ...],
'spore': [{'position': position, 'radius': radius}, ...],
'clone': [{'position': position, 'radius': radius, 'player': player_name, 'team': team_name}, ...],
},
'team_name': team_name,
},
'1': {
'feature_layers': None,
'rectangle': None,
'overlap': None,
'team_name': team_name,
},
'2': {
'feature_layers': None,
'rectangle': None,
'overlap': None,
'team_name': team_name,
},
'3': {
'feature_layers': None,
'rectangle': None,
'overlap': None,
'team_name': team_name,
},
}
Note that the competition only provides each submission with the information of its own agents. For example, if the team controlled by the current submission has two agents, A and B, while agent C belongs to another team, then this submission only receives the information of agents A and B, never of agent C.
The actions returned by get_actions form a dict mapping each agent name to its action:
{
player_a: actions_a,
player_b: actions_b,
player_c: actions_c
}
Each of these actions is a three-dimensional vector:
action = [x, y, action_type]
Here x and y are the movement velocity components along the two axes of the plane, and action_type selects the skill: -1 means no skill; 0 means eject a spore in the movement direction while moving in the given direction; 1 means split while moving in the given direction; 2 means stop and gather the split balls; 3 means eject a spore without changing direction; 4 means split without changing direction.
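For example, a returned action dict might look like this (the player names are placeholders):
actions = {
    'player_0': [0.7, 0.7, 1],   # move toward the upper right and split
    'player_1': [1.0, 0.0, -1],  # move right, no skill
    'player_2': [0.0, 1.0, 0],   # move up and eject a spore in that direction
}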
Running the Baseline
Observation space
- In the baseline, the number of food balls, spores, etc. changes constantly. For simplicity, only 200 units' information is kept: units beyond 200 are dropped, and fewer than 200 are zero-padded (see the sketch after this list).
- Since the 2D field of view keeps changing, the 2D image is rescaled inside the environment to a fixed, uniform size.
- Coordinates are expressed as relative coordinates; otherwise training becomes unstable.
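A minimal sketch of the pad-or-truncate step (not the baseline's exact code; the 200-unit cap and the [x, y, radius] row layout follow the description above):
import numpy as np

def pad_or_truncate(units, max_num=200, dim=3):
    # Keep at most max_num rows such as [x, y, radius]; zero-pad when there are fewer.
    out = np.zeros((max_num, dim), dtype=np.float32)
    units = np.asarray(units, dtype=np.float32)[:max_num]
    if len(units) > 0:
        out[:len(units)] = units
    return out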
Action space
This environment has a hybrid action space (action type + action parameters). We can discretize it in a fairly crude way: x and y are discretized into the four directions up, down, left and right.
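Combined with the skill types this gives a small discrete action set. The sketch below maps a flat discrete index back to the raw [x, y, action_type] triple; the 4 directions x 4 action types split matches the baseline config's action_type_shape=16, but the exact encoding here is only an illustrative assumption:
import math

def discrete_to_raw_action(idx, angle_split_num=4, action_type_num=4):
    # idx in [0, angle_split_num * action_type_num): pick a direction and a skill.
    direction, action_type = idx // action_type_num, idx % action_type_num
    theta = 2 * math.pi * direction / angle_split_num  # right, up, left, down
    return [math.cos(theta), math.sin(theta), action_type]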
Reward function
What this environment ultimately compares is which team has the larger total size/mass, so we can use the difference in the whole team's size between two consecutive frames as the team's shared reward. The sum of the per-step rewards serves as the episode reward, and the team with the largest episode reward wins. We also need to clip the reward so that it does not become too large.
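A sketch of this per-step team reward; the clip to [-1, 1] matches the simple baseline environment listed later:
import numpy as np

def team_reward(last_team_size, cur_team_size):
    # Change in the team's total size this step, clipped to keep the scale bounded.
    return float(np.clip(cur_team_size - last_team_size, -1, 1))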
Algorithm choice
Since we have turned the action space into a discrete one, we can train our agents with the classic DQN algorithm plus a multi-modal encoder network.
Network architecture
First, the 2D image is fed into the model and turned into a feature vector after a few layers. Second, we need to model the relations between units; since each unit is represented as a vector, we can reuse the encoder structure from the Transformer and average the encoder outputs over the unit dimension to obtain the final unit feature vector. Third, the global information is simply a long vector, so a multi-layer perceptron built from fully connected layers suffices. These three parts are concatenated into one long vector and passed through a series of fully connected layers; Dueling DQN and Double DQN are also added to the algorithm.
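A compact sketch of the three-branch fusion described above; shapes and layer sizes are illustrative only, and the actual network used by the baseline is listed in full in the model file further below:
import torch
import torch.nn as nn

class FusionSketch(nn.Module):
    def __init__(self, img_dim=128, unit_dim=128, scalar_dim=128, action_num=16):
        super().__init__()
        self.head = nn.Sequential(
            nn.Linear(img_dim + unit_dim + scalar_dim, 128),
            nn.ReLU(inplace=True),
            nn.Linear(128, action_num),
        )

    def forward(self, img_feat, unit_feats, scalar_feat):
        # img_feat: (B, img_dim) from a CNN; unit_feats: (B, N, unit_dim) from a
        # Transformer encoder; scalar_feat: (B, scalar_dim) from an MLP.
        unit_feat = unit_feats.mean(dim=1)                      # average over units
        fused = torch.cat([img_feat, unit_feat, scalar_feat], dim=-1)
        return self.head(fused)                                 # Q-values per action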
Training pipeline
The environment involves multiple agents, so this belongs to the field of multi-agent reinforcement learning. Here, however, the design is simplified to Independent Q-Learning + Self-Play for the training pipeline.
We can therefore use the IQL algorithm from the baseline: each player corresponds to one DQN policy, every policy is optimized with the shared team reward, and all players' DQN policies share one set of network parameters. With this setup the whole optimization process can be implemented in a highly parallel way, and even different teams can be trained with the same model and parameters.
Hardware requirements: 32 GB of RAM or more, and a GPU at least as good as a GTX 1060 (6 GB).
First we need to install the DI-engine library; we use the algorithms it provides to train on this environment.
DI-engine is a general decision-intelligence platform. It supports most common deep reinforcement learning algorithms such as DQN, PPO and SAC, as well as algorithms from many research subfields, e.g. QMIX for multi-agent RL, GAIL for inverse RL and RND for exploration. All currently supported algorithms and their performance are described in the algorithm overview.
Installation commands:
git clone https://github.com/opendilab/DI-engine
cd YOUR_PATH/DI-engine/
pip install -e . --user
Then simply run the file Gobigger-Explore/my_submission/entry/gobigger_vsbot_baseline_simple_main.py. After training finishes, run the test file as before to generate an archive and submit it.
View TensorBoard:
tensorboard --bind_all --logdir <path to the log folder>
Intermediate
Game customization
GoBigger gathers all configurable parameters in gobigger/server/server_default_config.py. By modifying the corresponding parameters you can build all kinds of environments, and you can also design your own proxy environment for quickly validating algorithms, for example by changing the number of teams and agents or the map size.
Reference docs: https://gobigger.readthedocs.io/zh_CN/latest/advanced/cfg_intro.html
You can also freely specify the opening scene of a game, including the number, positions and sizes of the various balls in that scene, which makes it possible to learn specific sub-tasks.
Custom opening scenes are configured through the init config passed to the server: the custom_init field sets the details of the food balls, thorns balls, spore balls and the players' clone balls separately. For food, thorns and spore balls we expose position and radius; for a player's clone ball you must also specify the owning player name and team name. Note that player and team names must follow the game's convention: for example, in a game with 4 teams of 3 players each, valid player names are the strings '0' to '11' and valid team names are the strings '0' to '3'. Any player that is not configured will spawn at a random position with the minimum radius.
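A hedged sketch of what such a config might look like; the sub-field names and entry format inside custom_init are assumptions made here for illustration, so check them against the configuration docs linked above:
from gobigger.server import Server

# Hypothetical custom opening: one food ball, one thorns ball and one clone ball
# owned by player '0' of team '0'. Unlisted players spawn randomly at minimum radius.
config = dict(
    custom_init=dict(
        food=[dict(position=[100, 100], radius=2)],
        thorns=[dict(position=[500, 500], radius=15)],
        spore=[],
        clone=[dict(position=[300, 300], radius=5, player='0', team='0')],
    ),
)
server = Server(config)
server.reset()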
We can also reload past matches in order to explore better solutions; the provided reload interface serves exactly this purpose. Several fields in the init config passed to the server need to be set:
save_bin=False,
load_bin=False,
load_bin_path='',
load_bin_frame_num = 'all',
as shown above. To use match reloading, set save_bin to True in the first match, so that the match information is saved under the user-provided save_path with a .pkl suffix. Suppose that after this match finishes we obtain an information file named /path/to/d11355d8-4c4e-11ec-8503-b49691521104.pkl. If in the second match we want to load the first 300 action frames of the first match, we can configure:
load_bin=True,
load_bin_path='/path/to/d11355d8-4c4e-11ec-8503-b49691521104.pkl',
load_bin_frame_num = 300,
Then, when server.reset() is called, the first 300 frames from the first match's file are loaded and the game is paused at the end of frame 300. From there you can keep calling step or obs.
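A sketch of wiring these fields into a server config; the Server constructor usage follows the data-generation script later in this article, and other config fields are omitted:
from gobigger.server import Server

replay_cfg = dict(
    load_bin=True,
    load_bin_path='/path/to/d11355d8-4c4e-11ec-8503-b49691521104.pkl',
    load_bin_frame_num=300,   # replay only the first 300 action frames
)
server = Server(replay_cfg)
server.reset()                # the match is now positioned at the end of frame 300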
To avoid a long loading time when reloading a match, we can also save the full state of a single frame. For example:
First, at the 301st step, pass the save_frame_full_path argument with its value set to the path to save to (it must be a file path, including the file name), as shown below.
server.step(actions=actions, save_frame_full_path='./frame_test.pkl')
After this step finishes, we can find the saved file, which contains the information of all units in that frame. To reproduce that frame, a new field is added to the init config passed to the server:
jump_to_frame_file = '',
This field takes the path (including the file name) of the file we just saved. If it is set to '', no jump is performed; otherwise the server's initial state is set to the state of that frame.
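For completeness, restoring from that single saved frame only requires this one field (again a sketch, with other config fields omitted):
from gobigger.server import Server

server = Server(dict(jump_to_frame_file='./frame_test.pkl'))
server.reset()   # the server starts from the state stored in frame_test.pkl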
GoBigger also provides smaller environments with specific settings, which are convenient for validating algorithms and for training:
https://gobigger.readthedocs.io/zh_CN/latest/advanced/hyper.html
Training and Evaluation Code
Config file
This file stores all the parameter settings used for training; the other files read their parameters from here, which makes them easy to manage and modify.
from easydict import EasyDict
gobigger_config = dict(
exp_name='gobigger_baseline_v030',
env=dict(
collector_env_num=8,
evaluator_env_num=3,
n_evaluator_episode=3,
stop_value=1e10,
team_num=4,
player_num_per_team=3,
match_time=60*10,
map_height=1000,
map_width=1000,
spatial=False,
speed = False,
all_vision = False,
manager=dict(shared_memory=False, ),
),
policy=dict(
cuda=True,
on_policy=False,
priority=False,
priority_IS_weight=False,
model=dict(
scalar_shape=5,
food_shape=2,
food_relation_shape=150,
thorn_relation_shape=12,
clone_shape=17,
clone_relation_shape=12,
hidden_shape=128,
encode_shape=32,
action_type_shape=16,
),
learn=dict(
update_per_collect=8,
batch_size=128,
learning_rate=0.001,
target_theta=0.005,
discount_factor=0.99,
ignore_done=False,
learner=dict(
hook=dict(save_ckpt_after_iter=1000)),
),
collect=dict(n_sample=512, unroll_len=1, alpha=1.0),
eval=dict(evaluator=dict(eval_freq=1000,)),
other=dict(
eps=dict(
type='exp',
start=0.95,
end=0.5,
decay=100000,
),
replay_buffer=dict(replay_buffer_size=100000, ),
),
),
)
main_config = EasyDict(gobigger_config)
gobigger_create_config = dict(
env=dict(
type='gobigger',
import_names=['dizoo.gobigger.envs.gobigger_env'],
),
env_manager=dict(type='subprocess'),
policy=dict(type='dqn'),
)
create_config = EasyDict(gobigger_create_config)
Training entry point
import os
import numpy as np
import copy
from tensorboardX import SummaryWriter
import sys
sys.path.append('..')
from ding.config import compile_config
from ding.worker import BaseLearner, BattleSampleSerialCollector, BattleInteractionSerialEvaluator, NaiveReplayBuffer
from ding.envs import SyncSubprocessEnvManager
from policy.gobigger_policy import DQNPolicy
from ding.utils import set_pkg_seed
from ding.rl_utils import get_epsilon_greedy_fn
from gobigger.agents import BotAgent
from envs import GoBiggerSimpleEnv
from model import GoBiggerHybridActionSimpleV3
from config.gobigger_no_spatial_config import main_config
import torch
class RulePolicy:
def __init__(self, team_id: int, player_num_per_team: int):
self.collect_data = False
self.team_id = team_id
self.player_num = player_num_per_team
start, end = team_id * player_num_per_team, (team_id + 1) * player_num_per_team
self.bot = [BotAgent(str(i)) for i in range(start, end)]
def forward(self, data: dict, **kwargs) -> dict:
ret = {}
for env_id in data.keys():
action = []
for bot, raw_obs in zip(self.bot, data[env_id]['collate_ignore_raw_obs']):
raw_obs['overlap']['clone'] = [[x[0], x[1], x[2], int(x[3]), int(x[4])] for x in raw_obs['overlap']['clone']]
action.append(bot.step(raw_obs))
ret[env_id] = {'action': np.array(action)}
return ret
def reset(self, data_id: list = []) -> None:
pass
def main(cfg, seed=0, max_iterations=int(1e10)):
cfg.exp_name = 'gobigger-v030'
cfg = compile_config(
cfg,
SyncSubprocessEnvManager,
DQNPolicy,
BaseLearner,
BattleSampleSerialCollector,
BattleInteractionSerialEvaluator,
NaiveReplayBuffer,
save_cfg=True
)
collector_env_num, evaluator_env_num = cfg.env.collector_env_num, cfg.env.evaluator_env_num
collector_env_cfg = copy.deepcopy(cfg.env)
collector_env_cfg.train = True
evaluator_env_cfg = copy.deepcopy(cfg.env)
evaluator_env_cfg.train = False
collector_env = SyncSubprocessEnvManager(
env_fn=[lambda: GoBiggerSimpleEnv(collector_env_cfg) for _ in range(collector_env_num)], cfg=cfg.env.manager
)
rule_evaluator_env = SyncSubprocessEnvManager(
env_fn=[lambda: GoBiggerSimpleEnv(evaluator_env_cfg) for _ in range(evaluator_env_num)], cfg=cfg.env.manager
)
collector_env.seed(seed)
rule_evaluator_env.seed(seed, dynamic_seed=False)
set_pkg_seed(seed, use_cuda=cfg.policy.cuda)
model = GoBiggerHybridActionSimpleV3(**cfg.policy.model)
policy = DQNPolicy(cfg.policy, model=model)
team_num = cfg.env.team_num
rule_collect_policy = [RulePolicy(team_id, cfg.env.player_num_per_team) for team_id in range(1, team_num)]
rule_eval_policy = [RulePolicy(team_id, cfg.env.player_num_per_team) for team_id in range(1, team_num)]
eps_cfg = cfg.policy.other.eps
epsilon_greedy = get_epsilon_greedy_fn(eps_cfg.start, eps_cfg.end, eps_cfg.decay, eps_cfg.type)
tb_logger = SummaryWriter(os.path.join('./{}/log/'.format(cfg.exp_name), 'serial'))
learner = BaseLearner(
cfg.policy.learn.learner, policy.learn_mode, tb_logger, exp_name=cfg.exp_name, instance_name='learner'
)
collector = BattleSampleSerialCollector(
cfg.policy.collect.collector,
collector_env, [policy.collect_mode] + rule_collect_policy,
tb_logger,
exp_name=cfg.exp_name
)
rule_evaluator = BattleInteractionSerialEvaluator(
cfg.policy.eval.evaluator,
rule_evaluator_env, [policy.eval_mode] + rule_eval_policy,
tb_logger,
exp_name=cfg.exp_name,
instance_name='rule_evaluator'
)
replay_buffer = NaiveReplayBuffer(cfg.policy.other.replay_buffer, exp_name=cfg.exp_name)
for _ in range(max_iterations):
if rule_evaluator.should_eval(learner.train_iter):
rule_stop_flag, rule_reward, _ = rule_evaluator.eval(
learner.save_checkpoint, learner.train_iter, collector.envstep
)
if rule_stop_flag:
break
eps = epsilon_greedy(collector.envstep)
new_data, _ = collector.collect(train_iter=learner.train_iter, policy_kwargs={'eps': eps})
replay_buffer.push(new_data[0], cur_collector_envstep=collector.envstep)
for i in range(cfg.policy.learn.update_per_collect):
train_data = replay_buffer.sample(learner.policy.get_attribute('batch_size'), learner.train_iter)
learner.train(train_data, collector.envstep)
torch.cuda.empty_cache()
if __name__ == "__main__":
main(main_config)
Model file
This file defines the neural network model's inputs, outputs and intermediate structure. Its three classes are nested one inside another: training uses GoBiggerHybridActionSimpleV3, which contains an Encoder, which in turn contains a RelationGCN. For clarity they are written as three separate classes:
import torch
import torch.nn as nn
from ding.torch_utils import MLP, get_lstm, Transformer
from ding.model import DiscreteHead
from ding.utils import list_split
class RelationGCN(nn.Module):
def __init__(
self,
hidden_shape: int,
activation=nn.ReLU(inplace=True),
) -> None:
super(RelationGCN, self).__init__()
self.act = activation
self.thorn_relation_layers = MLP(
hidden_shape, hidden_shape, hidden_shape, layer_num=1, activation=activation
)
self.clone_relation_layers = MLP(
hidden_shape, hidden_shape, hidden_shape, layer_num=1, activation=activation
)
self.agg_relation_layers = MLP(
4 * hidden_shape, hidden_shape, hidden_shape, layer_num=1, activation=activation
)
def forward(self, food_relation, thorn_relation, clone, clone_relation, thorn_mask, clone_mask):
b, t, c = clone.shape[0], thorn_relation.shape[2], clone.shape[1]
thorn_relation = self.thorn_relation_layers(thorn_relation) * thorn_mask.view(b, 1, t, 1)
thorn_relation = thorn_relation.max(2).values
clone_relation = self.clone_relation_layers(clone_relation) * clone_mask.view(b, 1, c, 1)
clone_relation = clone_relation.max(2).values
agg_relation = torch.cat([clone, food_relation, thorn_relation, clone_relation], dim=2)
clone = self.agg_relation_layers(agg_relation)
return clone
class Encoder(nn.Module):
def __init__(
self,
scalar_shape: int,
food_shape: int,
food_relation_shape: int,
thorn_relation_shape: int,
clone_shape: int,
clone_relation_shape: int,
hidden_shape: int,
encode_shape: int,
activation=nn.ReLU(inplace=True),
) -> None:
super(Encoder, self).__init__()
self.scalar_encoder = MLP(
scalar_shape, hidden_shape // 4, hidden_shape, layer_num=2, activation=activation
)
layers = []
kernel_size = [5, 3, 1]
stride = [4, 2, 1]
shape = [hidden_shape // 4, hidden_shape // 2, hidden_shape]
input_shape = food_shape
for i in range(len(kernel_size)):
layers.append(nn.Conv2d(input_shape, shape[i], kernel_size[i], stride[i], kernel_size[i] // 2))
layers.append(activation)
input_shape = shape[i]
self.food_encoder = nn.Sequential(*layers)
self.food_relation_encoder = MLP(
food_relation_shape, hidden_shape // 2, hidden_shape, layer_num=2, activation=activation
)
self.thorn_relation_encoder = MLP(
thorn_relation_shape, hidden_shape // 4, hidden_shape, layer_num=2, activation=activation
)
self.clone_encoder = MLP(
clone_shape, hidden_shape // 4, hidden_shape, layer_num=2, activation=activation
)
self.clone_relation_encoder = MLP(
clone_relation_shape, hidden_shape // 4, hidden_shape, layer_num=2, activation=activation
)
self.gcn = RelationGCN(
hidden_shape, activation=activation
)
self.agg_encoder = MLP(
3 * hidden_shape, hidden_shape, encode_shape, layer_num=2, activation=activation
)
def forward(self, scalar, food, food_relation, thorn_relation, thorn_mask, clone, clone_relation, clone_mask):
scalar = self.scalar_encoder(scalar)
food = self.food_encoder(food)
food = food.reshape(*food.shape[:2], -1).max(-1).values
food_relation = self.food_relation_encoder(food_relation)
thorn_relation = self.thorn_relation_encoder(thorn_relation)
clone = self.clone_encoder(clone)
clone_relation = self.clone_relation_encoder(clone_relation)
clone = self.gcn(food_relation, thorn_relation, clone, clone_relation, thorn_mask, clone_mask)
clone = clone.max(1).values
return self.agg_encoder(torch.cat([scalar, food, clone], dim=1))
class GoBiggerHybridActionSimpleV3(nn.Module):
r"""
Overview:
The GoBiggerHybridAction model.
Interfaces:
``__init__``, ``forward``, ``compute_encoder``, ``compute_critic``
"""
def __init__(
self,
scalar_shape: int,
food_shape: int,
food_relation_shape: int,
thorn_relation_shape: int,
clone_shape: int,
clone_relation_shape: int,
hidden_shape: int,
encode_shape: int,
action_type_shape: int,
rnn: bool = False,
activation=nn.ReLU(inplace=True),
) -> None:
super(GoBiggerHybridActionSimpleV3, self).__init__()
self.activation = activation
self.action_type_shape = action_type_shape
self.encoder = Encoder(scalar_shape, food_shape, food_relation_shape, thorn_relation_shape, clone_shape, clone_relation_shape, hidden_shape, encode_shape, activation)
self.action_type_head = DiscreteHead(32, action_type_shape, layer_num=2, activation=self.activation)
def forward(self, inputs):
scalar = inputs['scalar']
food = inputs['food']
food_relation = inputs['food_relation']
thorn_relation = inputs['thorn_relation']
thorn_mask = inputs['thorn_mask']
clone = inputs['clone']
clone_relation = inputs['clone_relation']
clone_mask = inputs['clone_mask']
fused_embedding_total = self.encoder(scalar, food, food_relation, thorn_relation, thorn_mask, clone, clone_relation, clone_mask)
B = inputs['batch']
A = inputs['player_num_per_team']
action_type_logit = self.action_type_head(fused_embedding_total)['logit']
action_type_logit = action_type_logit.reshape(B, A, *action_type_logit.shape[1:])
result = {
'logit': action_type_logit,
}
return result
Environment file
from typing import Any, List, Union, Optional, Tuple
import time
import copy
import math
from collections import OrderedDict
import cv2
import numpy as np
from ding.envs import BaseEnv, BaseEnvTimestep, BaseEnvInfo
from ding.envs.common.env_element import EnvElement, EnvElementInfo
from ding.torch_utils import to_tensor, to_ndarray, to_list
from ding.utils import ENV_REGISTRY
from gobigger.server import Server
from gobigger.render import EnvRender
from .gobigger_env import GoBiggerEnv
@ENV_REGISTRY.register('gobigger_simple',force_overwrite=True)
class GoBiggerSimpleEnv(GoBiggerEnv):
'''
feature:
- old unit id setting, self team's team id is always 0, other team's team ids are rearranged to 1, 2, ... , team_num - 1, self player's id is always 0.
- old reward setting, which is defined as clip(new team size - old team size, -1, 1)
'''
def __init__(self, cfg: dict) -> None:
self._cfg = cfg
self._player_num_per_team = cfg.player_num_per_team
self._team_num = cfg.team_num
self._player_num = self._player_num_per_team * self._team_num
self._match_time = cfg.match_time
self._map_height = cfg.map_height
self._map_width = cfg.map_width
self._spatial = cfg.spatial
self._train = cfg.train
self._last_team_size = None
self._init_flag = False
self._speed = cfg.speed
self._all_vision = cfg.all_vision
self._cfg['obs_settings'] = dict(
with_spatial=self._spatial,
with_speed=self._speed,
with_all_vision=self._all_vision)
def _unit_id(self, unit_player, unit_team, ego_player, ego_team, team_size):
return unit_id(unit_player, unit_team, ego_player, ego_team, team_size)
def _obs_transform_eval(self, obs: tuple) -> list:
player_bot_obs = copy.deepcopy(obs)
global_state, player_state = obs
player_state = OrderedDict(player_state)
total_time = global_state['total_time']
last_time = global_state['last_time']
rest_time = total_time - last_time
obs = []
for n, value in player_state.items():
left_top_x, left_top_y, right_bottom_x, right_bottom_y = value['rectangle']
center_x, center_y = (left_top_x + right_bottom_x) / 2, (left_top_y + right_bottom_y) / 2
left_margin, right_margin = left_top_x, self._map_width - right_bottom_x
top_margin, bottom_margin = left_top_y, self._map_height - right_bottom_y
scalar_obs = np.array([rest_time / 1000, left_margin / 1000, right_margin / 1000, top_margin / 1000, bottom_margin / 1000])
overlap = value['overlap']
team_id, player_id = self._unit_id(n, value['team_name'], n, value['team_name'], self._player_num_per_team)
fake_thorn = np.array([[center_x, center_y, 0]]) if not self._speed else np.array([[center_x, center_y, 0, 0, 0]])
fake_clone = np.array([[center_x, center_y, 0, team_id, player_id]]) if not self._speed else np.array([[center_x, center_y, 0, 0, 0, team_id, player_id]])
food = overlap['food'] + overlap['spore']
thorn = np.array(overlap['thorns']) if len(overlap['thorns']) > 0 else fake_thorn
clone = np.array([[*x[:-2], *self._unit_id(x[-2], x[-1], n, value['team_name'], self._player_num_per_team)] for x in overlap['clone']]) if len(overlap['clone']) > 0 else fake_clone
overlap['spore'] = [x[:3] for x in overlap['spore']]
overlap['thorns'] = [x[:3] for x in overlap['thorns']]
overlap['clone'] = [[*x[:3], int(x[-2]), int(x[-1])] for x in overlap['clone']]
food, food_relation = food_encode(clone, food, left_top_x, left_top_y, right_bottom_x, right_bottom_y, team_id, player_id)
cl_ego = np.where((clone[:,-1]==team_id) & (clone[:,-2]==player_id))
cl_ego = clone[cl_ego]
cl_other = np.where((clone[:,-1]!=team_id) | (clone[:,-2]!=player_id))
cl_other = clone[cl_other]
if cl_other.size == 0:
cl_other = np.array([[center_x, center_y, 0, team_id+1, player_id]]) if not self._speed else np.array([[center_x, center_y, 0, 0, 0, team_id+1, player_id]])
thorn_relation = relation_encode(cl_ego, thorn)
clone_relation = relation_encode(cl_ego, cl_other)
clone = clone_encode(cl_ego, speed=self._speed)
player_obs = {
'scalar': scalar_obs.astype(np.float32),
'food': food.astype(np.float32),
'food_relation': food_relation.astype(np.float32),
'thorn_relation': thorn_relation.astype(np.float32),
'clone': clone.astype(np.float32),
'clone_relation': clone_relation.astype(np.float32),
'collate_ignore_raw_obs': {'overlap': overlap},
}
obs.append(player_obs)
team_obs = []
team_obs.append(team_obs_stack(obs[:self._player_num_per_team]))
return team_obs
def _obs_transform(self, obs: tuple) -> list:
player_bot_obs = copy.deepcopy(obs)
global_state, player_state = obs
player_state = OrderedDict(player_state)
total_time = global_state['total_time']
last_time = global_state['last_time']
rest_time = total_time - last_time
obs = []
for n, value in player_state.items():
left_top_x, left_top_y, right_bottom_x, right_bottom_y = value['rectangle']
center_x, center_y = (left_top_x + right_bottom_x) / 2, (left_top_y + right_bottom_y) / 2
left_margin, right_margin = left_top_x, self._map_width - right_bottom_x
top_margin, bottom_margin = left_top_y, self._map_height - right_bottom_y
scalar_obs = np.array([rest_time / 1000, left_margin / 1000, right_margin / 1000, top_margin / 1000, bottom_margin / 1000])
overlap = value['overlap']
team_id, player_id = self._unit_id(n, value['team_name'], n, value['team_name'], self._player_num_per_team)
fake_thorn = np.array([[center_x, center_y, 0]]) if not self._speed else np.array([[center_x, center_y, 0, 0, 0]])
fake_clone = np.array([[center_x, center_y, 0, team_id, player_id]]) if not self._speed else np.array([[center_x, center_y, 0, 0, 0, team_id, player_id]])
food = overlap['food'] + overlap['spore']
thorn = np.array(overlap['thorns']) if len(overlap['thorns']) > 0 else fake_thorn
clone = np.array([[*x[:-2], *self._unit_id(x[-2], x[-1], n, value['team_name'], self._player_num_per_team)] for x in overlap['clone']]) if len(overlap['clone']) > 0 else fake_clone
overlap['spore'] = [x[:3] for x in overlap['spore']]
overlap['thorns'] = [x[:3] for x in overlap['thorns']]
overlap['clone'] = [[*x[:3], int(x[-2]), int(x[-1])] for x in overlap['clone']]
food, food_relation = food_encode(clone, food, left_top_x, left_top_y, right_bottom_x, right_bottom_y, team_id, player_id)
cl_ego = np.where((clone[:,-1]==team_id) & (clone[:,-2]==player_id))
cl_ego = clone[cl_ego]
cl_other = np.where((clone[:,-1]!=team_id) | (clone[:,-2]!=player_id))
cl_other = clone[cl_other]
if cl_other.size == 0:
cl_other = np.array([[center_x, center_y, 0, team_id+1, player_id]]) if not self._speed else np.array([[center_x, center_y, 0, 0, 0, team_id+1, player_id]])
thorn_relation = relation_encode(cl_ego, thorn)
clone_relation = relation_encode(cl_ego, cl_other)
clone = clone_encode(cl_ego, speed=self._speed)
player_obs = {
'scalar': scalar_obs.astype(np.float32),
'food': food.astype(np.float32),
'food_relation': food_relation.astype(np.float32),
'thorn_relation': thorn_relation.astype(np.float32),
'clone': clone.astype(np.float32),
'clone_relation': clone_relation.astype(np.float32),
'collate_ignore_raw_obs': {'overlap': overlap},
}
obs.append(player_obs)
team_obs = []
for i in range(self._team_num):
team_obs.append(team_obs_stack(obs[i * self._player_num_per_team: (i + 1) * self._player_num_per_team]))
return team_obs
def info(self) -> BaseEnvInfo:
T = EnvElementInfo
return BaseEnvInfo(
agent_num=self._player_num,
obs_space=T(
{
'scalar': (5, ),
'food': (2, ),
},
{
'min': 0,
'max': 1,
'dtype': np.float32,
},
),
act_space=T(
(1, ),
{
'min': 0,
'max': 16,
'dtype': int,
},
),
rew_space=T(
(1, ),
{
'min': -1000.0,
'max': 1000.0,
'dtype': np.float32,
},
),
use_wrappers=None,
)
def unit_id(unit_player, unit_team, ego_player, ego_team, team_size):
unit_player, unit_team, ego_player, ego_team = int(unit_player) % team_size, int(unit_team), int(ego_player) % team_size, int(ego_team)
if unit_team != ego_team:
player_id = unit_player
team_id = unit_team if unit_team > ego_team else unit_team + 1
else:
if unit_player != ego_player:
player_id = unit_player if unit_player > ego_player else unit_player + 1
else:
player_id = 0
team_id = 0
return [team_id, player_id]
def food_encode(clone, food, left_top_x, left_top_y, right_bottom_x, right_bottom_y, team_id, player_id):
w = (right_bottom_x - left_top_x) // 16 + 1
h = (right_bottom_y - left_top_y) // 16 + 1
food_map = np.zeros((2, h, w))
w_ = (right_bottom_x - left_top_x) // 8 + 1
h_ = (right_bottom_y - left_top_y) // 8 + 1
food_grid = [ [ [] for j in range(w_) ] for i in range(h_) ]
food_relation = np.zeros((len(clone), 7 * 7 + 1, 3))
for p in food:
x = min(max(p[0], left_top_x), right_bottom_x) - left_top_x
y = min(max(p[1], left_top_y), right_bottom_y) - left_top_y
radius = p[2]
i, j = int(y // 16), int(x // 16)
food_map[0, i, j] += radius * radius
i, j = int(y // 8), int(x // 8)
food_grid[i][j].append([(x - 8 * j) / 8, (y - 8 * i) / 8, radius])
for c_id, p in enumerate(clone):
x = min(max(p[0], left_top_x), right_bottom_x) - left_top_x
y = min(max(p[1], left_top_y), right_bottom_y) - left_top_y
radius = p[2]
i, j = int(y // 16), int(x // 16)
if int(p[3]) == team_id and int(p[4]) == player_id:
food_map[1, i, j] += radius * radius
i, j = int(y // 8), int(x // 8)
t, b, l, r = max(i - 3, 0), min(i + 4, h_), max(j - 3, 0), min(j + 4, w_)
for ii in range(t, b):
for jj in range(l, r):
for f in food_grid[ii][jj]:
food_relation[c_id][(ii - t) * 7 + jj - l][0] = f[0]
food_relation[c_id][(ii - t) * 7 + jj - l][1] = f[1]
food_relation[c_id][(ii - t) * 7 + jj - l][2] += f[2] * f[2]
food_relation[c_id][-1][0] = (x - j * 8) / 8
food_relation[c_id][-1][1] = (y - i * 8) / 8
food_relation[c_id][-1][2] = radius / 10
food_map[0, :, :] = np.sqrt(food_map[0, :, :]) / 2
food_map[1, :, :] = np.sqrt(food_map[1, :, :]) / 10
food_relation[:, :-1, 2] = np.sqrt(food_relation[:, :-1, 2]) / 2
food_relation = food_relation.reshape(len(clone), -1)
return food_map, food_relation
def clone_encode(clone, speed=False):
pos = clone[:, :2] / 100
rds = clone[:, 2:3] / 10
ids = np.zeros((len(clone), 12))
ids[np.arange(len(clone)), (clone[:, -2] * 3 + clone[:, -1]).astype(np.int64)] = 1.0
split = (clone[:, 2:3] - 10) / 10
eject = (clone[:, 2:3] - 10) / 10
if not speed:
clone = np.concatenate([pos, rds, ids, split, eject], axis=1)
else:
spd = clone[:, 3:5] / 60
clone = np.concatenate([pos, rds, ids, split, eject, spd], axis=1)
return clone
def relation_encode(point_1, point_2):
pos_rlt_1 = point_2[None,:,:2] - point_1[:,None,:2]
pos_rlt_2 = np.linalg.norm(pos_rlt_1, ord=2, axis=2, keepdims=True)
pos_rlt_3 = point_1[:,None,2:3] - pos_rlt_2
pos_rlt_4 = point_2[None,:,2:3] - pos_rlt_2
pos_rlt_5 = (2 + np.sqrt(0.5)) * point_1[:,None,2:3] - pos_rlt_2
pos_rlt_6 = (2 + np.sqrt(0.5)) * point_2[None,:,2:3] - pos_rlt_2
rds_rlt_1 = point_1[:,None,2:3] - point_2[None,:,2:3]
rds_rlt_2 = np.sqrt(0.5) * point_1[:,None,2:3] - point_2[None,:,2:3]
rds_rlt_3 = np.sqrt(0.5) * point_2[None,:,2:3] - point_1[:,None,2:3]
rds_rlt_4 = point_1[:,None,2:3].repeat(len(point_2), axis=1)
rds_rlt_5 = point_2[None,:,2:3].repeat(len(point_1), axis=0)
relation = np.concatenate([pos_rlt_1 / 100, pos_rlt_2 / 100, pos_rlt_3 / 100, pos_rlt_4 / 100, pos_rlt_5 / 100, pos_rlt_6 / 100, rds_rlt_1 / 10, rds_rlt_2 / 10, rds_rlt_3 / 10, rds_rlt_4 / 10, rds_rlt_5 / 10], axis=2)
return relation
def team_obs_stack(team_obs):
result = {}
for k in team_obs[0].keys():
result[k] = [o[k] for o in team_obs]
return result
Policy file
from typing import List, Dict, Any, Tuple
from collections import namedtuple
import copy
import torch
from ding.torch_utils import Adam, to_device
from ding.rl_utils import q_nstep_td_data, q_nstep_td_error, get_nstep_return_data, get_train_sample
from ding.model import model_wrap
from ding.utils import POLICY_REGISTRY
from ding.utils.data import default_collate, default_decollate
from ding.policy.base_policy import Policy
import torch.nn.functional as F
def gobigger_collate(data):
r"""
Arguments:
        - data (:obj:`list`): List type data, [{scalar:[player_1_scalar, player_2_scalar, ...], ...}, ...]
"""
B, player_num_per_team = len(data), len(data[0]['scalar'])
data = {k: sum([d[k] for d in data], []) for k in data[0].keys() if not k.startswith('collate_ignore')}
clone_num = max([x.shape[0] for x in data['clone']])
thorn_num = max([x.shape[1] for x in data['thorn_relation']])
food_h = max([x.shape[1] for x in data['food']])
food_w = max([x.shape[2] for x in data['food']])
data['scalar'] = torch.stack([torch.as_tensor(x) for x in data['scalar']]).float()
data['food'] = torch.stack([F.pad(torch.as_tensor(x), (0, food_w - x.shape[2], 0, food_h - x.shape[1])) for x in data['food']]).float()
data['food_relation'] = torch.stack([F.pad(torch.as_tensor(x), (0, 0, 0, clone_num - x.shape[0])) for x in data['food_relation']]).float()
data['thorn_mask'] = torch.stack([torch.arange(thorn_num) < x.shape[1] for x in data['thorn_relation']]).float()
data['thorn_relation'] = torch.stack([F.pad(torch.as_tensor(x), (0, 0, 0, thorn_num - x.shape[1], 0, clone_num - x.shape[0])) for x in data['thorn_relation']]).float()
data['clone_mask'] = torch.stack([torch.arange(clone_num) < x.shape[0] for x in data['clone']]).float()
data['clone'] = torch.stack([F.pad(torch.as_tensor(x), (0, 0, 0, clone_num - x.shape[0])) for x in data['clone']]).float()
data['clone_relation'] = torch.stack([F.pad(torch.as_tensor(x), (0, 0, 0, clone_num - x.shape[1], 0, clone_num - x.shape[0])) for x in data['clone_relation']]).float()
data['batch'] = B
data['player_num_per_team'] = player_num_per_team
return data
def default_preprocess_learn(
data: List[Any],
use_priority_IS_weight: bool = False,
use_priority: bool = False,
use_nstep: bool = False,
ignore_done: bool = False,
) -> dict:
tmp = [d['obs'] for d in data]
tmp = {k: sum([d[k] for d in tmp], []) for k in tmp[0].keys() if not k.startswith('collate_ignore')}
max_clone_num = max([x.shape[0] for x in tmp['clone']])
limit = 52
mini_bs = int(len(data)//2)
if max_clone_num > limit:
split_data1 = data[:mini_bs]
split_data2 = data[mini_bs:]
re = []
for dt in (split_data1, split_data2):
obs = [d['obs'] for d in dt]
next_obs = [d['next_obs'] for d in dt]
for i in range(len(dt)):
dt[i] = {k: v for k, v in dt[i].items() if not 'obs' in k}
dt = default_collate(dt)
dt['obs'] = gobigger_collate(obs)
dt['next_obs'] = gobigger_collate(next_obs)
if ignore_done:
dt['done'] = torch.zeros_like(dt['done']).float()
else:
dt['done'] = dt['done'].float()
if use_priority_IS_weight:
assert use_priority, "Use IS Weight correction, but Priority is not used."
if use_priority and use_priority_IS_weight:
dt['weight'] = dt['IS']
else:
dt['weight'] = dt.get('weight', None)
if use_nstep:
reward = dt['reward']
if len(reward.shape) == 1:
reward = reward.unsqueeze(1)
dt['reward'] = reward.permute(1, 0).contiguous()
re.append(dt)
return re
obs = [d['obs'] for d in data]
next_obs = [d['next_obs'] for d in data]
for i in range(len(data)):
data[i] = {k: v for k, v in data[i].items() if not 'obs' in k}
data = default_collate(data)
data['obs'] = gobigger_collate(obs)
data['next_obs'] = gobigger_collate(next_obs)
if ignore_done:
data['done'] = torch.zeros_like(data['done']).float()
else:
data['done'] = data['done'].float()
if use_priority_IS_weight:
assert use_priority, "Use IS Weight correction, but Priority is not used."
if use_priority and use_priority_IS_weight:
data['weight'] = data['IS']
else:
data['weight'] = data.get('weight', None)
if use_nstep:
reward = data['reward']
if len(reward.shape) == 1:
reward = reward.unsqueeze(1)
data['reward'] = reward.permute(1, 0).contiguous()
return data
@POLICY_REGISTRY.register('gobigger_dqn')
class DQNPolicy(Policy):
r"""
Overview:
Policy class of DQN algorithm, extended by Double DQN/Dueling DQN/PER/multi-step TD.
Config:
== ==================== ======== ============== ======================================== =======================
ID Symbol Type Default Value Description Other(Shape)
== ==================== ======== ============== ======================================== =======================
1 ``type`` str dqn | RL policy register name, refer to | This arg is optional,
| registry ``POLICY_REGISTRY`` | a placeholder
2 ``cuda`` bool False | Whether to use cuda for network | This arg can be diff-
| erent from modes
3 ``on_policy`` bool False | Whether the RL algorithm is on-policy
| or off-policy
4 ``priority`` bool False | Whether use priority(PER) | Priority sample,
| update priority
5 | ``priority_IS`` bool False | Whether use Importance Sampling Weight
| ``_weight`` | to correct biased update. If True,
| priority must be True.
6 | ``discount_`` float 0.97, | Reward's future discount factor, aka. | May be 1 when sparse
| ``factor`` [0.95, 0.999] | gamma | reward env
7 ``nstep`` int 1, | N-step reward discount sum for target
[3, 5] | q_value estimation
8 | ``learn.update`` int 3 | How many updates(iterations) to train | This args can be vary
| ``per_collect`` | after collector's one collection. Only | from envs. Bigger val
| valid in serial training | means more off-policy
9 | ``learn.multi`` bool False | whether to use multi gpu during
| ``_gpu``
10 | ``learn.batch_`` int 64 | The number of samples of an iteration
| ``size``
11 | ``learn.learning`` float 0.001 | Gradient step length of an iteration.
| ``_rate``
12 | ``learn.target_`` int 100 | Frequence of target network update. | Hard(assign) update
| ``update_freq``
13 | ``learn.ignore_`` bool False | Whether ignore done for target value | Enable it for some
| ``done`` | calculation. | fake termination env
14 ``collect.n_sample`` int [8, 128] | The number of training samples of a | It varies from
| call of collector. | different envs
15 | ``collect.unroll`` int 1 | unroll length of an iteration | In RNN, unroll_len>1
| ``_len``
16 | ``other.eps.type`` str exp | exploration rate decay type | Support ['exp',
| 'linear'].
17 | ``other.eps. float 0.95 | start value of exploration rate | [0,1]
| start``
18 | ``other.eps. float 0.1 | end value of exploration rate | [0,1]
| end``
19 | ``other.eps. int 10000 | decay length of exploration | greater than 0. set
| decay`` | decay=10000 means
| the exploration rate
| decay from start
| value to end value
| during decay length.
== ==================== ======== ============== ======================================== =======================
"""
config = dict(
type='dqn',
cuda=False,
on_policy=False,
priority=False,
priority_IS_weight=False,
discount_factor=0.97,
nstep=1,
learn=dict(
multi_gpu=False,
update_per_collect=3,
batch_size=64,
learning_rate=0.001,
target_update_freq=100,
ignore_done=False,
),
collect=dict(
unroll_len=1,
),
eval=dict(),
other=dict(
eps=dict(
type='exp',
start=0.95,
end=0.1,
decay=10000,
),
replay_buffer=dict(replay_buffer_size=10000, ),
),
)
def _init_learn(self) -> None:
"""
Overview:
Learn mode init method. Called by ``self.__init__``, initialize the optimizer, algorithm arguments, main \
and target model.
"""
self._priority = self._cfg.priority
self._priority_IS_weight = self._cfg.priority_IS_weight
self._optimizer = Adam(self._model.parameters(), lr=self._cfg.learn.learning_rate)
self._gamma = self._cfg.discount_factor
self._nstep = self._cfg.nstep
self._target_model = copy.deepcopy(self._model)
self._target_model = model_wrap(
self._target_model,
wrapper_name='target',
update_type='assign',
update_kwargs={'freq': self._cfg.learn.target_update_freq}
)
self._learn_model = model_wrap(self._model, wrapper_name='argmax_sample')
self._learn_model.reset()
self._target_model.reset()
def _forward_learn(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Overview:
Forward computation graph of learn mode(updating policy).
Arguments:
- data (:obj:`Dict[str, Any]`): Dict type data, a batch of data for training, values are torch.Tensor or \
np.ndarray or dict/list combinations.
Returns:
- info_dict (:obj:`Dict[str, Any]`): Dict type data, a info dict indicated training result, which will be \
recorded in text log and tensorboard, values are python scalar or a list of scalars.
ArgumentsKeys:
- necessary: ``obs``, ``action``, ``reward``, ``next_obs``, ``done``
- optional: ``value_gamma``, ``IS``
ReturnsKeys:
- necessary: ``cur_lr``, ``total_loss``, ``priority``
- optional: ``action_distribution``
"""
data = default_preprocess_learn(
data,
use_priority=self._priority,
use_priority_IS_weight=self._cfg.priority_IS_weight,
ignore_done=self._cfg.learn.ignore_done,
use_nstep=True
)
if isinstance(data, list):
self._optimizer.zero_grad()
for dt in data:
if self._cuda:
dt = to_device(dt, self._device)
self._learn_model.train()
self._target_model.train()
q_value = self._learn_model.forward(dt['obs'])['logit']
with torch.no_grad():
target_q_value = self._target_model.forward(dt['next_obs'])['logit']
target_q_action = self._learn_model.forward(dt['next_obs'])['action']
data_n = q_nstep_td_data(
q_value, target_q_value, dt['action'], target_q_action, dt['reward'], dt['done'], dt['weight']
)
value_gamma = dt.get('value_gamma')
loss, td_error_per_sample = q_nstep_td_error(data_n, self._gamma, nstep=self._nstep, value_gamma=value_gamma)
loss.backward()
if self._cfg.learn.multi_gpu:
self.sync_gradients(self._learn_model)
self._optimizer.step()
self._target_model.update(self._learn_model.state_dict())
return {
'cur_lr': self._optimizer.defaults['lr'],
'total_loss': loss.item(),
'q_value': q_value.mean().item(),
'priority': td_error_per_sample.abs().tolist(),
}
if self._cuda:
data = to_device(data, self._device)
self._learn_model.train()
self._target_model.train()
q_value = self._learn_model.forward(data['obs'])['logit']
with torch.no_grad():
target_q_value = self._target_model.forward(data['next_obs'])['logit']
target_q_action = self._learn_model.forward(data['next_obs'])['action']
data_n = q_nstep_td_data(
q_value, target_q_value, data['action'], target_q_action, data['reward'], data['done'], data['weight']
)
value_gamma = data.get('value_gamma')
loss, td_error_per_sample = q_nstep_td_error(data_n, self._gamma, nstep=self._nstep, value_gamma=value_gamma)
self._optimizer.zero_grad()
loss.backward()
if self._cfg.learn.multi_gpu:
self.sync_gradients(self._learn_model)
self._optimizer.step()
self._target_model.update(self._learn_model.state_dict())
return {
'cur_lr': self._optimizer.defaults['lr'],
'total_loss': loss.item(),
'q_value': q_value.mean().item(),
'priority': td_error_per_sample.abs().tolist(),
}
def _monitor_vars_learn(self) -> List[str]:
return ['cur_lr', 'total_loss', 'q_value']
def _state_dict_learn(self) -> Dict[str, Any]:
"""
Overview:
Return the state_dict of learn mode, usually including model and optimizer.
Returns:
- state_dict (:obj:`Dict[str, Any]`): the dict of current policy learn state, for saving and restoring.
"""
return {
'model': self._learn_model.state_dict(),
'target_model': self._target_model.state_dict(),
'optimizer': self._optimizer.state_dict(),
}
def _load_state_dict_learn(self, state_dict: Dict[str, Any]) -> None:
"""
Overview:
Load the state_dict variable into policy learn mode.
Arguments:
- state_dict (:obj:`Dict[str, Any]`): the dict of policy learn state saved before.
.. tip::
If you want to only load some parts of model, you can simply set the ``strict`` argument in \
load_state_dict to ``False``, or refer to ``ding.torch_utils.checkpoint_helper`` for more \
complicated operation.
"""
self._learn_model.load_state_dict(state_dict['model'])
self._target_model.load_state_dict(state_dict['target_model'])
self._optimizer.load_state_dict(state_dict['optimizer'])
def _init_collect(self) -> None:
"""
Overview:
Collect mode init method. Called by ``self.__init__``, initialize algorithm arguments and collect_model, \
enable the eps_greedy_sample for exploration.
"""
self._unroll_len = self._cfg.collect.unroll_len
self._gamma = self._cfg.discount_factor
self._nstep = self._cfg.nstep
self._collect_model = model_wrap(self._model, wrapper_name='eps_greedy_sample')
self._collect_model.reset()
def _forward_collect(self, data: Dict[int, Any], eps: float) -> Dict[int, Any]:
"""
Overview:
Forward computation graph of collect mode(collect training data), with eps_greedy for exploration.
Arguments:
- data (:obj:`Dict[str, Any]`): Dict type data, stacked env data for predicting policy_output(action), \
values are torch.Tensor or np.ndarray or dict/list combinations, keys are env_id indicated by integer.
- eps (:obj:`float`): epsilon value for exploration, which is decayed by collected env step.
Returns:
- output (:obj:`Dict[int, Any]`): The dict of predicting policy_output(action) for the interaction with \
env and the constructing of transition.
ArgumentsKeys:
- necessary: ``obs``
ReturnsKeys
- necessary: ``logit``, ``action``
"""
data_id = list(data.keys())
data = gobigger_collate(list(data.values()))
if self._cuda:
data = to_device(data, self._device)
self._collect_model.eval()
with torch.no_grad():
output = self._collect_model.forward(data, eps=eps)
if self._cuda:
output = to_device(output, 'cpu')
output = default_decollate(output)
return {i: d for i, d in zip(data_id, output)}
def _get_train_sample(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Overview:
For a given trajectory(transitions, a list of transition) data, process it into a list of sample that \
can be used for training directly. A train sample can be a processed transition(DQN with nstep TD) \
or some continuous transitions(DRQN).
Arguments:
- data (:obj:`List[Dict[str, Any]`): The trajectory data(a list of transition), each element is the same \
format as the return value of ``self._process_transition`` method.
Returns:
- samples (:obj:`dict`): The list of training samples.
.. note::
We will vectorize ``process_transition`` and ``get_train_sample`` method in the following release version. \
And the user can customize the this data processing procecure by overriding this two methods and collector \
itself.
"""
data = get_nstep_return_data(data, self._nstep, gamma=self._gamma)
return get_train_sample(data, self._unroll_len)
def _process_transition(self, obs: Any, policy_output: Dict[str, Any], timestep: namedtuple) -> Dict[str, Any]:
"""
Overview:
Generate a transition(e.g.: <s, a, s', r, d>) for this algorithm training.
Arguments:
- obs (:obj:`Any`): Env observation.
- policy_output (:obj:`Dict[str, Any]`): The output of policy collect mode(``self._forward_collect``),\
including at least ``action``.
- timestep (:obj:`namedtuple`): The output after env step(execute policy output action), including at \
least ``obs``, ``reward``, ``done``, (here obs indicates obs after env step).
Returns:
- transition (:obj:`dict`): Dict type transition data.
"""
transition = {
'obs': obs,
'next_obs': timestep.obs,
'action': policy_output['action'],
'reward': timestep.reward,
'done': timestep.done,
}
return transition
def _init_eval(self) -> None:
r"""
Overview:
Evaluate mode init method. Called by ``self.__init__``, initialize eval_model.
"""
self._eval_model = model_wrap(self._model, wrapper_name='argmax_sample')
self._eval_model.reset()
def _forward_eval(self, data: Dict[int, Any]) -> Dict[int, Any]:
"""
Overview:
Forward computation graph of eval mode(evaluate policy performance), at most cases, it is similar to \
``self._forward_collect``.
Arguments:
- data (:obj:`Dict[str, Any]`): Dict type data, stacked env data for predicting policy_output(action), \
values are torch.Tensor or np.ndarray or dict/list combinations, keys are env_id indicated by integer.
Returns:
- output (:obj:`Dict[int, Any]`): The dict of predicting action for the interaction with env.
ArgumentsKeys:
- necessary: ``obs``
ReturnsKeys
- necessary: ``action``
"""
data_id = list(data.keys())
data = gobigger_collate(list(data.values()))
if self._cuda:
data = to_device(data, self._device)
self._eval_model.eval()
with torch.no_grad():
output = self._eval_model.forward(data)
if self._cuda:
output = to_device(output, 'cpu')
output = default_decollate(output)
return {i: d for i, d in zip(data_id, output)}
def default_model(self) -> Tuple[str, List[str]]:
"""
Overview:
Return this algorithm default model setting for demonstration.
Returns:
- model_info (:obj:`Tuple[str, List[str]]`): model name and mode import_names
.. note::
The user can define and use customized network model but must obey the same inferface definition indicated \
by import_names path. For DQN, ``ding.model.template.q_learning.DQN``
"""
return 'dqn', ['ding.model.template.q_learning']
Evaluating the policy
After training finishes, run this file to evaluate the policy:
import os
import numpy as np
import copy
from tensorboardX import SummaryWriter
import sys
sys.path.append('..')
from ding.config import compile_config
from ding.worker import BaseLearner, BattleSampleSerialCollector, BattleInteractionSerialEvaluator, NaiveReplayBuffer
from ding.envs import SyncSubprocessEnvManager, BaseEnvManager
from policy.gobigger_policy import DQNPolicy
from ding.utils import set_pkg_seed
from gobigger.agents import BotAgent
from envs import GoBiggerSimpleEnv
from model import GoBiggerHybridActionSimpleV3
from config.gobigger_no_spatial_config import main_config
import torch
import argparse
class RulePolicy:
def __init__(self, team_id: int, player_num_per_team: int):
self.collect_data = False
self.team_id = team_id
self.player_num = player_num_per_team
start, end = team_id * player_num_per_team, (team_id + 1) * player_num_per_team
self.bot = [BotAgent(str(i)) for i in range(start, end)]
def forward(self, data: dict, **kwargs) -> dict:
ret = {}
for env_id in data.keys():
action = []
for bot, raw_obs in zip(self.bot, data[env_id]['collate_ignore_raw_obs']):
raw_obs['overlap']['clone'] = [[x[0], x[1], x[2], int(x[3]), int(x[4])] for x in raw_obs['overlap']['clone']]
action.append(bot.step(raw_obs))
ret[env_id] = {'action': np.array(action)}
return ret
def reset(self, data_id: list = []) -> None:
pass
def main(cfg, ckpt_path, seed=0):
cfg.exp_name = 'gobigger_vsbot_eval'
cfg.env.spatial = True
cfg.env.evaluator_env_num = 3
cfg.env.n_evaluator_episode = 3
cfg = compile_config(
cfg,
BaseEnvManager,
DQNPolicy,
BaseLearner,
BattleSampleSerialCollector,
BattleInteractionSerialEvaluator,
NaiveReplayBuffer,
save_cfg=True
)
evaluator_env_num = cfg.env.evaluator_env_num
rule_env_cfgs = []
for i in range(evaluator_env_num):
rule_env_cfg = copy.deepcopy(cfg.env)
rule_env_cfg.train = False
rule_env_cfg.save_video = True
rule_env_cfg.save_quality = 'low'
rule_env_cfg.save_path = './{}/rule'.format(cfg.exp_name)
if not os.path.exists(rule_env_cfg.save_path):
os.makedirs(rule_env_cfg.save_path)
rule_env_cfgs.append(rule_env_cfg)
rule_evaluator_env = BaseEnvManager(
env_fn=[lambda: GoBiggerSimpleEnv(x) for x in rule_env_cfgs], cfg=cfg.env.manager
)
rule_evaluator_env.seed(seed, dynamic_seed=False)
set_pkg_seed(seed, use_cuda=cfg.policy.cuda)
model = GoBiggerHybridActionSimpleV3(**cfg.policy.model)
policy = DQNPolicy(cfg.policy, model=model)
policy.eval_mode.load_state_dict(torch.load(ckpt_path))
team_num = cfg.env.team_num
rule_eval_policy = [RulePolicy(team_id, cfg.env.player_num_per_team) for team_id in range(1, team_num)]
tb_logger = SummaryWriter(os.path.join('./{}/log/'.format(cfg.exp_name), 'serial'))
rule_evaluator = BattleInteractionSerialEvaluator(
cfg.policy.eval.evaluator,
rule_evaluator_env, [policy.eval_mode] + rule_eval_policy,
tb_logger,
exp_name=cfg.exp_name,
instance_name='rule_evaluator'
)
rule_evaluator.eval()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='evaluation')
parser.add_argument('--ckpt', '-c', help='checkpoint for evaluation')
args = parser.parse_args()
main(main_config, ckpt_path = args.ckpt)
my_submission.py:
This file is the interface through which our program is run. When the uploaded submission is evaluated, get_actions is called to return the corresponding actions, which control our in-game agents against the opponents:
import random
import os
import gym
import numpy as np
import copy
import torch
import time
from ding.config import compile_config
from .policy.gobigger_policy import DQNPolicy
from .envs import GoBiggerSimpleEnv
from .model import GoBiggerHybridActionSimpleV3
from .config.gobigger_no_spatial_config import main_config
class BaseSubmission:
def __init__(self, team_name, player_names):
self.team_name = team_name
self.player_names = player_names
def get_actions(self, obs):
'''
Overview:
You must implement this function.
'''
raise NotImplementedError
class MySubmission(BaseSubmission):
def __init__(self, team_name, player_names):
super(MySubmission, self).__init__(team_name, player_names)
self.cfg = copy.deepcopy(main_config)
self.cfg = compile_config(
self.cfg,
policy=DQNPolicy,
save_cfg=False,
)
self.cfg.env.train = False
print(self.cfg)
self.root_path = os.path.abspath(os.path.dirname(__file__))
self.model = GoBiggerHybridActionSimpleV3(**self.cfg.policy.model)
self.model.load_state_dict(torch.load(os.path.join(self.root_path, 'supplements', 'ckpt_best.pth.tar'), map_location='cpu')['model'])
self.policy = DQNPolicy(self.cfg.policy, model=self.model).eval_mode
self.env = GoBiggerSimpleEnv(self.cfg.env)
def get_actions(self, obs):
obs_transform = self.env._obs_transform_eval(obs)[0]
obs_transform = {0: obs_transform}
raw_actions = self.policy.forward(obs_transform)[0]['action']
raw_actions = raw_actions.tolist()
actions = {n: GoBiggerSimpleEnv._to_raw_action(a) for n, a in zip(obs[1].keys(), raw_actions)}
return actions
Advanced
Imitation learning
For relatively complex tasks, imitation learning (i.e. pre-training) is usually performed before reinforcement learning, so as to obtain a network that is far more sensible than a random policy; the subsequent reinforcement learning then trains much faster and reaches a higher ceiling. The official baseline 4.0 performs Behavior Cloning before reinforcement learning, which speeds up training considerably.
First, run the generate_data_opensource.py file, which produces a large amount of data: running it for just one night can generate hundreds of GB (mind your disk space and scale accordingly).
import os
import sys
import importlib
import logging
import time
import argparse
from uuid import uuid1
import pickle
import multiprocessing
import random
from gobigger.server import Server
from gobigger.render import RealtimeRender, RealtimePartialRender, EnvRender
from gobigger.agents import BotAgent
logging.basicConfig(level=logging.INFO)
"""
GoBigger 离线数据集 For SL
保存内容:
1. replay 文件,包含随机种子和每一帧的动作。依靠这个文件可以复现这局游戏
2. 更详细的 obs 和 action
每场对局都会保存replay文件(以.replay结尾)和 obs & action(以.data结尾)
.replay文件结构:是一个字典,包含以下字段
seed: 对局随机数种子
actions: 对局中每个动作帧所执行的动作
agent_name: 参与对局的agent名称
leaderboard: 本次对局最终排名和分数
.data文件结构:是一个字典,包含以下字段
observation: 对局中每个动作帧获取到的obs,是最原始的obs
actions: 对局中每个动作帧所执行的动作
使用方式:
python -u generate_data.py
"""
AVAILABLE_AGENTS = ['bot', 'bot', 'bot', 'bot']
class BotSubmission:
def __init__(self, team_name, player_names):
self.team_name = team_name
self.player_names = player_names
self.agents = {}
for player_name in self.player_names:
self.agents[player_name] = BotAgent(name=player_name)
def get_actions(self, obs):
global_state, player_states = obs
actions = {}
for player_name, agent in self.agents.items():
action = agent.step(player_states[player_name])
actions[player_name] = action
return actions
class DataUtil:
def __init__(self, agent_names, save_path_prefix):
self.agent_names = agent_names
self.save_path_prefix = save_path_prefix
if not os.path.isdir(self.save_path_prefix):
os.mkdir(self.save_path_prefix)
if self.agent_names == '':
self.agent_names = random.sample(AVAILABLE_AGENTS, 4)
def launch_a_game(self, seed=None):
data_simple = {'seed': None, 'actions': [], 'agent_names': self.agent_names}
data_hard = {
'observations': [],
'actions': []
}
if seed is None:
t = str(time.time()).strip().split('.')
seed = int(t[0]+t[1])
data_simple['seed'] = seed
server = Server(dict(
team_num=4,
player_num_per_team=3,
match_time=60*10,
obs_settings=dict(
with_spatial=False,
)
), seed)
render = EnvRender(server.map_width, server.map_height)
server.set_render(render)
server.reset()
team_player_names = server.get_team_names()
team_names = list(team_player_names.keys())
agents, teams_agents_dict = self.init_agents(team_names, team_player_names)
for i in range(1000000):
obs = server.obs()
global_state, player_states = obs
actions = {}
for agent in agents:
agent_obs = [global_state, {
player_name: player_states[player_name] for player_name in agent.player_names
}]
try:
actions.update(agent.get_actions(agent_obs))
except:
fake_action = {
player_name: [None, None, -1] for player_name in agent.player_names
}
actions.update(fake_action)
finish_flag = server.step(actions=actions)
data_simple['actions'].append(actions)
data_hard['observations'].append(obs)
data_hard['actions'].append(actions)
logging.debug('{} lastime={:.3f}, leaderboard={}'.format(i, server.last_time, global_state['leaderboard']))
if finish_flag:
data_simple['leaderboard'] = global_state['leaderboard']
logging.debug('Game Over')
break
file_name = str(uuid1()) + "-" + str(seed)
replay_path = os.path.join(self.save_path_prefix, file_name+'.replay')
with open(replay_path, "wb") as f:
pickle.dump(data_simple, f)
data_path = os.path.join(self.save_path_prefix, file_name+'.data')
with open(data_path, "wb") as f:
pickle.dump(data_hard, f)
logging.info('save as: {} {}'.format(replay_path, data_path))
def init_agents(self, team_names, team_player_names):
agents = []
teams_agents_dict = {}
for index, agent_name in enumerate(self.agent_names):
agents.append(BotSubmission(team_name=team_names[index],
player_names=team_player_names[team_names[index]]))
teams_agents_dict[team_names[index]] = agent_name
return agents, teams_agents_dict
def generate_data(agent_names, save_path_prefix):
data_util = DataUtil(agent_names, save_path_prefix)
while True:
data_util.launch_a_game()
def generate_data_multi(num_worker, agent_names, save_path_prefix):
all_p = []
for i in range(0, num_worker):
try:
p = multiprocessing.Process(target=generate_data, args=(agent_names, save_path_prefix,))
p.start()
all_p.append(p)
time.sleep(1)
time.sleep(random.random())
except Exception as e:
print('!!!!!!!!!!!!!!!! {} failed, because {} !!!!!!!!!!!!!!!!!'.format(i, str(e)))
continue
for p in all_p:
p.join()
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-a', '--agent-names', type=str, default='')
parser.add_argument('-s', '--save-path-prefix', type=str, default='replays')
parser.add_argument('-n', '--num-worker', type=int, default=1)
args = parser.parse_args()
if args.agent_names != '':
args.agent_names = args.agent_names.strip().split(',')
generate_data_multi(args.num_worker, args.agent_names, args.save_path_prefix)
The generated data is saved in the replays folder. Under Linux, run the following command in that directory:
ls *.data > ../replays.txt.train
This writes the data file names into a single list file so that the training code can read them easily.
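If you are not on Linux, an equivalent Python snippet (run from inside the replays directory) would be:
import glob
import os

# Write the names of all .data files into ../replays.txt.train, one per line.
with open(os.path.join('..', 'replays.txt.train'), 'w') as f:
    for name in sorted(glob.glob('*.data')):
        f.write(name + '\n')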
Then run the following command in the sl folder to start training:
python train.py -c ./exp/sample/config.yaml
Training has two stages. The first stage is supervised imitation learning on the data we collected above, which is what train.py does. Once imitation learning is finished, load the resulting network weights into the RL agent's policy model and continue with reinforcement learning.
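A hedged sketch of that hand-over step: load the behavior-cloning checkpoint into the same GoBiggerHybridActionSimpleV3 model before building the DQN policy. The checkpoint path here is hypothetical; the 'model' key matches the SLLearner's load_ckpt code below:
import torch
from model import GoBiggerHybridActionSimpleV3
from config.gobigger_no_spatial_config import main_config

model = GoBiggerHybridActionSimpleV3(**main_config.policy.model)
state = torch.load('exp/sample/ckpts/iter_100000.pth.tar', map_location='cpu')
model.load_state_dict(state['model'], strict=False)
# Then pass `model` to DQNPolicy(cfg.policy, model=model) in the RL entry script.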
train.py code:
import sys
sys.path.append('..')
sys.path.append('.')
from easydict import EasyDict
import argparse
import yaml
from sl_learner import SLLearner
def parse_config(config_file):
with open(config_file) as f:
config = yaml.load(f, Loader=yaml.FullLoader)
config = EasyDict(config)
return config
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--config', type=str, default='')
args = parser.parse_args()
assert args.config != '', 'Please set config!'
cfg = parse_config(args.config)
sl_learner = SLLearner(cfg)
sl_learner.train()
sl_learner.py (see the train function):
import os
import sys
import random
import torch
import torch.nn.functional as F
import pickle
import time
from easydict import EasyDict
from collections import OrderedDict
import numpy as np
import copy
import logging
import datetime
import multiprocessing as mp
from tensorboardX import SummaryWriter
sys.path.append('..')
sys.path.append('.')
from data import SLDataLoader, SLShareDataLoader
from model import GoBiggerHybridActionSimpleV3
from ding.model import model_wrap
from ding.torch_utils import to_device
from utils.misc import AverageMeter, accuracy, create_logger, get_logger
logging.basicConfig(level=logging.INFO)
class SLLearner:
@staticmethod
def default_config():
cfg = dict(
use_cuda=True,
learning_rate=0.01,
milestones=[20000,40000,60000,80000],
gamma=0.8,
weight_decay=0.00005,
max_iterations=100000,
save_frequency=5000,
exp_path='exp',
exp_name='sample',
print_freq=1,
resume_ckpt='',
model=dict(
scalar_shape=5,
food_shape=2,
food_relation_shape=150,
thorn_relation_shape=12,
clone_shape=17,
clone_relation_shape=12,
hidden_shape=128,
encode_shape=32,
action_type_shape=16,
),
data=dict(
team_num=4,
player_num_per_team=3,
batch_size=40,
cache_size=120,
train_data_prefix='PATH/replays',
train_data_file='PATH/replays.txt.train',
worker_num=40,
angle_split_num=4,
action_type_num=4,
),
)
return EasyDict(cfg)
def __init__(self, cfg):
self.cfg = cfg
assert self.cfg.model.action_type_shape == self.cfg.data.angle_split_num * self.cfg.data.action_type_num
self.model = GoBiggerHybridActionSimpleV3(**self.cfg.model)
self.model = model_wrap(self.model, wrapper_name='argmax_sample')
if self.cfg.use_cuda:
self.model.cuda()
self.loader = SLShareDataLoader(self.cfg.data)
self.criterion = torch.nn.CrossEntropyLoss()
self.optimizer = torch.optim.Adam(
self.model.parameters(),
lr=self.cfg.learning_rate,
weight_decay=self.cfg.weight_decay
)
self.lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(
self.optimizer,
milestones=self.cfg.milestones,
gamma=self.cfg.gamma
)
if not os.path.isdir(self.cfg.exp_path):
os.mkdir(self.cfg.exp_path)
self.root_path = os.path.join(self.cfg.exp_path, self.cfg.exp_name)
if not os.path.isdir(self.root_path):
os.mkdir(self.root_path)
self.log_path = os.path.join(self.root_path, 'log.txt')
self.save_path = os.path.join(self.root_path, 'ckpts')
self.event_path = os.path.join(self.root_path, 'events')
if not os.path.isdir(self.save_path):
os.mkdir(self.save_path)
if not os.path.isdir(self.event_path):
os.mkdir(self.event_path)
create_logger(self.log_path, level=logging.INFO)
self.logger = get_logger(self.log_path)
self.logger.info(self.cfg)
self.tb_logger = SummaryWriter(self.event_path)
def pre_train(self):
self.meters = EasyDict()
self.meters.batch_time = AverageMeter(self.cfg.print_freq)
self.meters.forward_time = AverageMeter(self.cfg.print_freq)
self.meters.data_time = AverageMeter(self.cfg.print_freq)
self.meters.losses = AverageMeter(self.cfg.print_freq)
self.meters.top1 = AverageMeter(self.cfg.print_freq)
self.model.train()
def load_ckpt(self):
if self.cfg.resume_ckpt and os.path.isfile(self.cfg.resume_ckpt):
self.state = torch.load(self.cfg.resume_ckpt, map_location='cpu')
self.logger.info("Recovering from {}".format(self.cfg.resume_ckpt))
self.model.load_state_dict(self.state['model'], strict=False)
def train(self):
self.load_ckpt()
self.pre_train()
for i in range(self.cfg.max_iterations):
t1 = time.time()
data, label = next(self.loader)
self.lr_scheduler.step(i)
current_lr = self.lr_scheduler.get_lr()[0]
self.meters.data_time.update(time.time() - t1)
label = torch.tensor(label, dtype=torch.long)
if self.cfg.use_cuda:
data = to_device(data, 'cuda:0')
label = to_device(label, 'cuda:0').view(-1)
else:
data = to_device(data, 'cpu')
label = to_device(label, 'cpu').view(-1)
t3 = time.time()
output = self.model.forward(data)
self.meters.forward_time.update(time.time() - t3)
logit = output['logit'].view(-1, self.cfg.model.action_type_shape)
action = output['action']
loss = self.criterion(logit, label)
loss = loss.sum()
prec1 = accuracy(logit, label, topk=(1,))[0]
reduced_loss = loss.clone()
self.meters.losses.reduce_update(reduced_loss)
self.meters.top1.update(prec1)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
self.meters.batch_time.update(time.time() - t1)
if i % self.cfg.print_freq == 0:
self.tb_logger.add_scalar('loss_train', self.meters.losses.avg, i)
self.tb_logger.add_scalar('acc1_train', self.meters.top1.avg, i)
self.tb_logger.add_scalar('lr', current_lr, i)
remain_secs = (self.cfg.max_iterations - i) * self.meters.batch_time.avg
remain_time = datetime.timedelta(seconds=round(remain_secs))
finish_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()+remain_secs))
log_msg = 'Iter: [{}/{}]\t'.format(i, self.cfg.max_iterations)
log_msg += 'Time {:.3f} ({:.3f})\t'.format(self.meters.batch_time.val, self.meters.data_time.avg)
log_msg += 'Data {:.3f} ({:.3f})\t'.format(self.meters.data_time.val, self.meters.data_time.avg)
log_msg += 'Forward {:.3f} ({:.3f})\t'.format(self.meters.forward_time.val, self.meters.forward_time.avg)
log_msg += 'Loss {:.4f} ({:.4f})\t'.format(self.meters.losses.val, self.meters.losses.avg)
log_msg += 'Prec@1 {:.3f} ({:.3f})\t'.format(self.meters.top1.val.item(), self.meters.top1.avg.item())
log_msg += 'LR {:.4f}\t'.format(current_lr)
log_msg += 'Remaining Time {} ({})'.format(remain_time, finish_time)
self.logger.info(log_msg)
if i % self.cfg.save_frequency == 0:
state = {}
state['model'] = self.model.state_dict()
state['optimizer'] = self.optimizer.state_dict()
state['last_iter'] = i
ckpt_name = os.path.join(self.save_path, '{}.pth.tar'.format(i))
torch.save(state, ckpt_name)
After train.py finishes we obtain a series of checkpoint files (stored in exp/sample/ckpts). Training accuracy can be monitored with TensorBoard (point it at the exp/sample/events directory):
tensorboard --logdir <PATH>
After training, the accuracy is close to 90%.
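Each checkpoint in exp/sample/ckpts is a plain torch file holding the model weights, the optimizer state and the iteration index (see the saving code at the end of train above). A minimal sketch for inspecting one, assuming a checkpoint named 95000.pth.tar exists:

import torch

ckpt = torch.load('exp/sample/ckpts/95000.pth.tar', map_location='cpu')
print(list(ckpt.keys()))   # ['model', 'optimizer', 'last_iter']
print(ckpt['last_iter'])   # iteration at which this checkpoint was written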
Reinforcement learning
In baseline 4.0 the organizers use a method called go-explore, which speeds up training; we only need to run gobigger_vsbot_explore_main.py in the go-explore folder. This file is similar to gobigger_vsbot_baseline_simple_main.py from version 3.0 and differs only slightly in the environment file. Before starting reinforcement learning, we need to load the result of our imitation learning.
Move the trained imitation-learning checkpoint (the last one saved) into the go-explore folder and rename it to start.pth.tar. Then add the following lines to the _init_learn function in gobigger_policy.py:
if self._cfg.pretrain:
ckpt = torch.load(self._cfg.pretrain_path)
ckpt.pop('optimizer')
ckpt.pop('last_iter')
encoder_ckpt = OrderedDict()
for k,v in ckpt['model'].items():
if 'action_type_head' not in k:
encoder_ckpt[k] = v
self._learn_model.load_state_dict(encoder_ckpt,strict = False)
self._target_model.load_state_dict(encoder_ckpt,strict = False)
print("Successful to load your pretrain model",self._cfg.pretrain_path)
Also add the following import at the top of the file:
from collections import OrderedDict
Add the corresponding fields to the config file: in the policy section, set pretrain = True and pretrain_path = "start.pth.tar", as sketched below.
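A minimal sketch of the two new fields, assuming main_config is the config object already used by gobigger_vsbot_explore_main.py; the rest of the baseline policy config is omitted here:

from easydict import EasyDict

# sketch only: the two fields read by the _init_learn snippet above;
# in practice you would write them directly into the policy=dict(...) block
pretrain_fields = EasyDict(dict(
    pretrain=True,                  # enable loading of the imitation-learning checkpoint
    pretrain_path='start.pth.tar',  # the SL checkpoint renamed earlier
))
# e.g. main_config.policy.update(pretrain_fields)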
Then run gobigger_vsbot_explore_main.py; it will load the imitation-learning checkpoint and continue with reinforcement learning.
Before submitting:
After training with gobigger_vsbot_explore_main.py, my_submission.py needs to be updated as follows:
import random
import os
import gym
import numpy as np
import copy
import torch
import time
from ding.config import compile_config
from .policy.gobigger_policy import DQNPolicy
from .envs import GoBiggerExploreEnv
from .model import GoBiggerHybridActionSimpleV3
from .go_explore.config import main_config
class BaseSubmission:
def __init__(self, team_name, player_names):
self.team_name = team_name
self.player_names = player_names
def get_actions(self, obs):
raise NotImplementedError
class MySubmission(BaseSubmission):
def __init__(self, team_name, player_names):
super(MySubmission, self).__init__(team_name, player_names)
self.cfg = copy.deepcopy(main_config)
self.cfg = compile_config(
self.cfg,
policy=DQNPolicy,
save_cfg=False,
)
self.cfg.env.train = False
print(self.cfg)
self.root_path = os.path.abspath(os.path.dirname(__file__))
self.model = GoBiggerHybridActionSimpleV3(**self.cfg.policy.model)
self.model.load_state_dict(torch.load(os.path.join(self.root_path, 'supplements', 'ckpt_best.pth.tar'), map_location='cpu')['model'])
self.policy = DQNPolicy(self.cfg.policy, model=self.model).eval_mode
self.env = GoBiggerExploreEnv(self.cfg.env)
def get_actions(self, obs):
obs_transform = self.env._obs_transform_eval(obs)[0]
obs_transform = {0: obs_transform}
raw_actions = self.policy.forward(obs_transform)[0]['action']
raw_actions = raw_actions.tolist()
actions = {n: GoBiggerExploreEnv._to_raw_action(a) for n, a in zip(obs[1].keys(), raw_actions)}
return actions
We need to rename the go-explore folder to go_explore, otherwise the imports at the top of my_submission.py will fail (a hyphen is not valid in a Python module name).
We also need to add an _obs_transform_eval method to GoBiggerExploreEnv; it can simply be copied over from GoBiggerSimpleEnv.
Copy a trained model into the supplements folder under the submission root and rename it to ckpt_best.pth.tar.
Once the test script passes, the submission is ready. Note: if training gets interrupted, you may need to reduce batch_size, replay_buffer_size, and num_workers.
Possible improvements
To get better results in this competition, the following improvements can be made on top of the baseline:
1. Write a stronger hard-coded game AI. A better teacher yields a better imitation-learning model, which raises both the starting point and the ceiling of the subsequent reinforcement learning.
2. Improve the neural network used for reinforcement learning: which features are most effective as network inputs, whether continuous or discrete outputs work better, the structure of the hidden layers, and so on. This part is usually fairly complex.
3. Let the agent learn in diverse, targeted environments. The organizers provide such targeted environments, which help the agent practice specific skills.
4. Use a stronger, more robust reinforcement-learning algorithm. The baseline uses DQN and does not employ any multi-agent algorithm, so both can be replaced.
5. Design the reward function. The baseline has only a single reward signal, the mass gained by the balls. Based on your own understanding of the game you can add various other indicators; a well-designed reward often works wonders, and this is simpler and more direct than the approaches above (see the sketch after this list).
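As an illustration of point 5, a shaped reward could combine the baseline mass-difference signal with a few extra hand-designed indicators. The sketch below is purely hypothetical: the statistics team_size_delta, food_eaten and teammate_spread, as well as the weights, are placeholders you would compute and tune yourself from the observation.

# hypothetical reward shaping: all inputs are statistics you extract from obs yourself
def shaped_reward(team_size_delta, food_eaten, teammate_spread,
                  w_mass=1.0, w_food=0.1, w_spread=0.01):
    reward = w_mass * team_size_delta      # baseline signal: team mass gained this step
    reward += w_food * food_eaten          # bonus for collecting food balls
    reward -= w_spread * teammate_spread   # penalty when teammates drift too far apart
    return reward

Clipping or normalizing each term before summing usually helps keep the Q-values stable.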