Forms of communication
Depending on how the agents interact with each other, multi-agent settings fall into:
- Competitive: two or more agents try to beat each other to maximize their own reward
- Collaborative: a group of agents has to work together to reach a common goal
Reinforcement learning approach
All agents share the single policy we are optimizing, but every observation is given from that agent's point of view and includes information about the positions of the other agents. The environment also has to be wrapped so that it can handle several agents at once.
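As a toy illustration of the shared-policy idea (a minimal PyTorch sketch, not tied to MAgent): one network maps each agent's own observation to that agent's action, so adding or removing agents only changes the batch size.
import torch
import torch.nn as nn

OBS_SIZE, N_ACTIONS = 16, 5

# One policy network shared by every agent: each row of the batch is one
# agent's own observation, each output row is that agent's action scores.
policy = nn.Sequential(
    nn.Linear(OBS_SIZE, 64),
    nn.ReLU(),
    nn.Linear(64, N_ACTIONS),
)

n_agents = 7                           # the number of agents can change freely
obs = torch.randn(n_agents, OBS_SIZE)  # one observation per agent, from its own point of view
actions = policy(obs).argmax(dim=1)    # one action per agent from the shared policy
print(actions.shape)                   # torch.Size([7])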
Random environment
The following script builds a random tiger-deer forest environment on top of MAgent, with both groups controlled by the built-in RandomActor:
import os
import sys
sys.path.append(os.path.join(os.getcwd(), "MAgent/python"))
import magent
from magent.builtin.rule_model import RandomActor
MAP_SIZE = 64
if __name__ == "__main__":
env = magent.GridWorld("forest", map_size=MAP_SIZE)
env.set_render_dir("render")
deer_handle, tiger_handle = env.get_handles()
models = [
RandomActor(env, deer_handle),
RandomActor(env, tiger_handle),
]
env.reset()
env.add_walls(method="random", n=MAP_SIZE * MAP_SIZE * 0.04)
env.add_agents(deer_handle, method="random", n=5)
env.add_agents(tiger_handle, method="random", n=2)
"""
Sample output of the code below:
Tiger view: (9, 9, 5), features: (20,)
Deer view: (3, 3, 5), features: (16,)
This means every tiger gets a 9x9 matrix centered on the agent, with 5 planes of information:
walls: 1 if the cell contains a wall, 0 otherwise
Group 1 (the agent's own group): 1 if the cell contains another agent from this group, 0 otherwise
Group 1 HP: the relative health of the agent in this cell
Group 2 agents: 1 if the cell contains an enemy
Group 2 HP: the relative health of the enemy, 0 if there is none
The feature part is a numeric vector with the one-hot-encoded agent ID, the last action, the last reward, and the normalized position
"""
v = env.get_view_space(tiger_handle)
r = env.get_feature_space(tiger_handle)
print("Tiger view: %s, features: %s" % (v, r))
vv = env.get_view_space(deer_handle)
rr = env.get_feature_space(deer_handle)
print("Deer view: %s, features: %s" % (vv, rr))
"""
Sample output of the loop below:
Tiger obs: (2, 9, 9, 5), (2, 20)
Deer obs: (5, 3, 3, 5), (5, 16)
0 : HP deers: [1. 1. 1. 1. 1.]
0 : HP tigers: [1. 1.]
"""
done = False
step_idx = 0
while not done:
deer_obs = env.get_observation(deer_handle)
tiger_obs = env.get_observation(tiger_handle)
if step_idx == 0:
print("Tiger obs: %s, %s" % (
tiger_obs[0].shape, tiger_obs[1].shape))
print("Deer obs: %s, %s" % (
deer_obs[0].shape, deer_obs[1].shape))
print("%d: HP deers: %s" % (
step_idx, deer_obs[0][:, 1, 1, 2]))
print("%d: HP tigers: %s" % (
step_idx, tiger_obs[0][:, 4, 4, 2]))
deer_act = models[0].infer_action(deer_obs)
tiger_act = models[1].infer_action(tiger_obs)
env.set_action(deer_handle, deer_act)
env.set_action(tiger_handle, tiger_act)
env.render()
done = env.step()
env.clear_dead()
t_reward = env.get_reward(tiger_handle)
d_reward = env.get_reward(deer_handle)
print("Rewards: deer %s, tiger %s" % (d_reward, t_reward))
step_idx += 1
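A small follow-up snippet (illustrative only, reusing env and tiger_handle from the script above, for example right after the agents are added) that indexes the five view planes of the first tiger; the plane order follows the description in the comment above:
import numpy as np

view, _ = env.get_observation(tiger_handle)
first = np.array(view)[0]        # (9, 9, 5): the first tiger's view, channels last
walls = first[:, :, 0]           # plane 0: walls
own_agents = first[:, :, 1]      # plane 1: other tigers (the agent's own group)
own_hp = first[:, :, 2]          # plane 2: their relative HP
enemy_agents = first[:, :, 3]    # plane 3: deer (the other group)
enemy_hp = first[:, :, 4]        # plane 4: deer relative HP
print("walls in view: %d, deer in view: %d" % (walls.sum(), enemy_agents.sum()))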
Deep Q-network for the tigers
Next, DQN is applied to the tiger group. All tigers share a single neural network, so they all act according to the same policy; their behavior differs only through their individual observations. A hedged sketch of what such a two-input network might look like follows; after that, we see how the MAgent environment is wrapped before turning to the training code.
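The actual model.DQNModel is not listed in this text; the sketch below is an assumption about a plausible two-input architecture: a small convolution stack over the channel-first view, concatenated with the feature vector and fed through fully connected layers that output one Q-value per action.
import torch
import torch.nn as nn

class TwoInputDQN(nn.Module):
    """Illustrative two-input DQN: (view, features) -> one Q-value per action."""
    def __init__(self, view_shape, feats_shape, n_actions):
        super().__init__()
        # view_shape is channel-first, e.g. (5, 9, 9) for the tigers
        self.conv = nn.Sequential(
            nn.Conv2d(view_shape[0], 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Flatten(),
        )
        with torch.no_grad():
            conv_out = self.conv(torch.zeros(1, *view_shape)).shape[1]
        self.fc = nn.Sequential(
            nn.Linear(conv_out + feats_shape[0], 128),
            nn.ReLU(),
            nn.Linear(128, n_actions),
        )

    def forward(self, obs):
        view_t, feats_t = obs
        return self.fc(torch.cat((self.conv(view_t), feats_t), dim=1))

# e.g. TwoInputDQN((5, 9, 9), (20,), n_actions) for the tiger group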
Wrapping the MAgent environment
class MAgentEnv(VectorEnv):
"""
Inherits from the gym.vector.vector_env.VectorEnv class, which supports synchronous and asynchronous modes.
The constructor takes the MAgent environment instance, the handle of the group to control, and reset_env_func (a function that resets MAgent to its initial state), plus the optional is_slave flag and steps_limit.
"""
def __init__(self, env: magent.GridWorld, handle,
reset_env_func: Callable[[], None],
is_slave: bool = False,
steps_limit: Optional[int] = None):
reset_env_func()
action_space = self.handle_action_space(env, handle)
observation_space = self.handle_obs_space(env, handle)
count = env.get_num(handle)
super(MAgentEnv, self).__init__(count, observation_space,
action_space)
self.action_space = self.single_action_space
self._env = env
self._handle = handle
self._reset_env_func = reset_env_func
self._is_slave = is_slave
self._steps_limit = steps_limit
self._steps_done = 0
@classmethod
def handle_action_space(cls, env: magent.GridWorld,
handle) -> gym.Space:
return spaces.Discrete(env.get_action_space(handle)[0])
"""
The observation space has two parts: the spatial view and the feature vector.
The spatial view is transposed into the channel-first layout that PyTorch expects,
then two spaces.Box instances are constructed and combined with spaces.Tuple.
"""
@classmethod
def handle_obs_space(cls, env: magent.GridWorld, handle) -> gym.Space:
v = env.get_view_space(handle)
r = env.get_feature_space(handle)
view_shape = (v[-1],) + v[:2]
view_space = spaces.Box(low=0.0, high=1.0,
shape=view_shape)
extra_space = spaces.Box(low=0.0, high=1.0, shape=r)
return spaces.Tuple((view_space, extra_space))
def reset_wait(self):
self._steps_done = 0
if not self._is_slave:
self._reset_env_func()
return self.handle_observations(self._env, self._handle)
"""
Builds observations from the current environment state:
it queries the observations, copies both components into NumPy arrays,
changes the axis order, splits both arrays along the first dimension, and converts them into a list of tuples.
Every tuple in the returned list holds the observation of one living agent in the group.
These observations are added to the replay buffer and later sampled for training,
which is why they have to be split into individual entries.
"""
@classmethod
def handle_observations(cls, env: magent.GridWorld,
handle) -> List[Tuple[np.ndarray,
np.ndarray]]:
view_obs, feats_obs = env.get_observation(handle)
entries = view_obs.shape[0]
if entries == 0:
return []
view_obs = np.array(view_obs)
feats_obs = np.array(feats_obs)
view_obs = np.moveaxis(view_obs, 3, 1)
res = []
for o_view, o_feats in zip(np.vsplit(view_obs, entries),
np.vsplit(feats_obs, entries)):
res.append((o_view[0], o_feats[0]))
return res
def step_async(self, actions):
act = np.array(actions, dtype=np.int32)
self._env.set_action(self._handle, act)
"""
Performs one simulation step, then clears the dead agents in all groups and prepares the results.
Since agents may die during an episode, the lengths of the observation and reward lists can shrink over time.
If all agents have died, the episode is over: it is reset and the fresh observations are returned.
"""
def step_wait(self):
self._steps_done += 1
if not self._is_slave:
done = self._env.step()
self._env.clear_dead()
if self._steps_limit is not None and self._steps_limit <= self._steps_done:
done = True
else:
done = False
obs = self.handle_observations(self._env, self._handle)
r = self._env.get_reward(self._handle).tolist()
dones = [done] * len(r)
if done:
obs = self.reset()
dones = [done] * self.num_envs
r = [0.0] * self.num_envs
return obs, r, dones, {}
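A usage sketch of the wrapper (my reading of the is_slave flag, based on the code above; data.config_forest is the helper used later in this text): when two groups share one GridWorld, only one wrapper should drive reset() and step(); the second is created with is_slave=True and only exposes its own group's observations and rewards.
MAP_SIZE = 64
m_env = magent.GridWorld(data.config_forest(MAP_SIZE), map_size=MAP_SIZE)
deer_handle, tiger_handle = m_env.get_handles()

def reset_env():
    m_env.reset()
    m_env.add_walls(method="random", n=int(MAP_SIZE * MAP_SIZE * 0.04))
    m_env.add_agents(deer_handle, method="random", n=50)
    m_env.add_agents(tiger_handle, method="random", n=10)

# The tiger wrapper owns the simulation step and the reset;
# the deer wrapper shares the same GridWorld but never steps it itself.
tiger_env = MAgentEnv(m_env, tiger_handle, reset_env_func=reset_env)
deer_env = MAgentEnv(m_env, deer_handle, reset_env_func=reset_env, is_slave=True)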
Training
Training is started with ./forest_tigers_dqn.py -n run_name --cuda; the tiger cooperation mode is enabled with ./forest_tigers_dqn.py -n run_name --mode double_attack --cuda.
import os
import sys
sys.path.append(os.path.join(os.getcwd(), "MAgent/python"))
import ptan
import torch
import argparse
import magent
from typing import Tuple
import ptan.ignite as ptan_ignite
from torch import optim
from types import SimpleNamespace
from lib import data, model, common
from ignite.engine import Engine
MAP_SIZE = 64
COUNT_TIGERS = 10
COUNT_DEERS = 50
WALLS_DENSITY = 0.04
PARAMS = SimpleNamespace(**{
'run_name': 'tigers',
'stop_reward': None,
'replay_size': 1000000,
'replay_initial': 100,
'target_net_sync': 1000,
'epsilon_frames': 5*10**5,
'epsilon_start': 1.0,
'epsilon_final': 0.02,
'learning_rate': 1e-4,
'gamma': 0.99,
'batch_size': 32
})
def test_model(net: model.DQNModel, device: torch.device, gw_config) -> Tuple[float, float]:
test_env = magent.GridWorld(gw_config, map_size=MAP_SIZE)
deer_handle, tiger_handle = test_env.get_handles()
def reset_env():
test_env.reset()
test_env.add_walls(method="random", n=MAP_SIZE * MAP_SIZE * WALLS_DENSITY)
test_env.add_agents(deer_handle, method="random", n=COUNT_DEERS)
test_env.add_agents(tiger_handle, method="random", n=COUNT_TIGERS)
env = data.MAgentEnv(test_env, tiger_handle, reset_env_func=reset_env)
preproc = model.MAgentPreprocessor(device)
agent = ptan.agent.DQNAgent(net, ptan.actions.ArgmaxActionSelector(), device, preprocessor=preproc)
obs = env.reset()
steps = 0
rewards = 0.0
while True:
actions = agent(obs)[0]
obs, r, dones, _ = env.step(actions)
steps += len(obs)
rewards += sum(r)
if dones[0]:
break
return rewards / COUNT_TIGERS, steps / COUNT_TIGERS
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--cuda", default=False, action='store_true', help="Enable CUDA computations")
parser.add_argument("-n", "--name", required=True, help="Run name")
parser.add_argument("--mode", default='forest', choices=['forest', 'double_attack', 'double_attack_nn'],
help="GridWorld mode, could be 'forest', 'double_attack' or 'double_attack_nn', default='forest'")
args = parser.parse_args()
config = args.mode
if args.mode == 'double_attack':
COUNT_TIGERS = 20
COUNT_DEERS = 1024
config = data.config_double_attack(MAP_SIZE)
elif args.mode == 'double_attack_nn':
COUNT_TIGERS = 20
COUNT_DEERS = 1024
config = 'double_attack'
device = torch.device("cuda" if args.cuda else "cpu")
saves_path = os.path.join("saves", args.name)
os.makedirs(saves_path, exist_ok=True)
m_env = magent.GridWorld(config, map_size=MAP_SIZE)
deer_handle, tiger_handle = m_env.get_handles()
def reset_env():
m_env.reset()
m_env.add_walls(method="random", n=MAP_SIZE * MAP_SIZE * WALLS_DENSITY)
m_env.add_agents(deer_handle, method="random", n=COUNT_DEERS)
m_env.add_agents(tiger_handle, method="random", n=COUNT_TIGERS)
env = data.MAgentEnv(m_env, tiger_handle, reset_env_func=reset_env)
if args.mode == 'double_attack_nn':
net = model.DQNNoisyModel(env.single_observation_space.spaces[0].shape,
env.single_observation_space.spaces[1].shape,
m_env.get_action_space(tiger_handle)[0]).to(device)
else:
net = model.DQNModel(env.single_observation_space.spaces[0].shape,
env.single_observation_space.spaces[1].shape,
m_env.get_action_space(tiger_handle)[0]).to(device)
tgt_net = ptan.agent.TargetNet(net)
print(net)
# noisy networks provide their own exploration, so plain argmax is enough here
if args.mode == 'double_attack_nn':
action_selector = ptan.actions.ArgmaxActionSelector()
epsilon_tracker = None
else:
action_selector = ptan.actions.EpsilonGreedyActionSelector(
epsilon=PARAMS.epsilon_start)
epsilon_tracker = common.EpsilonTracker(action_selector, PARAMS)
preproc = model.MAgentPreprocessor(device)
agent = ptan.agent.DQNAgent(net, action_selector, device, preprocessor=preproc)
exp_source = ptan.experience.ExperienceSourceFirstLast(
env, agent, PARAMS.gamma, vectorized=True)
buffer = ptan.experience.ExperienceReplayBuffer(
exp_source, PARAMS.replay_size)
optimizer = optim.Adam(net.parameters(), lr=PARAMS.learning_rate)
def process_batch(engine, batch):
res = {}
optimizer.zero_grad()
loss_v = model.calc_loss_dqn(
batch, net, tgt_net.target_model, preproc,
gamma=PARAMS.gamma, device=device)
loss_v.backward()
optimizer.step()
if epsilon_tracker is not None:
epsilon_tracker.frame(engine.state.iteration)
res['epsilon'] = action_selector.epsilon
if engine.state.iteration % PARAMS.target_net_sync == 0:
tgt_net.sync()
res['loss'] = loss_v.item()
return res
engine = Engine(process_batch)
common.setup_ignite(engine, PARAMS, exp_source, args.name,
extra_metrics=('test_reward', 'test_steps'))
best_test_reward = None
@engine.on(ptan_ignite.PeriodEvents.ITERS_10000_COMPLETED)
def test_network(engine):
net.train(False)
reward, steps = test_model(net, device, config)
net.train(True)
engine.state.metrics['test_reward'] = reward
engine.state.metrics['test_steps'] = steps
print("Test done: got %.3f reward after %.2f steps" % (
reward, steps
))
global best_test_reward
if best_test_reward is None:
best_test_reward = reward
elif best_test_reward < reward:
print("Best test reward updated %.3f <- %.3f, save model" % (
best_test_reward, reward
))
best_test_reward = reward
torch.save(net.state_dict(), os.path.join(saves_path, "best_%.3f.dat" % reward))
engine.run(common.batch_generator(buffer, PARAMS.replay_initial,
PARAMS.batch_size))
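common.EpsilonTracker is not listed in this text; what follows is a minimal sketch of the linear decay it presumably implements, driven by the epsilon_* values in PARAMS (an assumption about the helper, not its actual source):
class EpsilonTracker:
    """Linearly anneal selector.epsilon from epsilon_start to epsilon_final over epsilon_frames."""
    def __init__(self, selector, params):
        self.selector = selector
        self.params = params
        self.frame(0)

    def frame(self, frame_idx):
        eps = self.params.epsilon_start - frame_idx / self.params.epsilon_frames
        self.selector.epsilon = max(self.params.epsilon_final, eps)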
Tiger cooperation
The double_attack configuration forces the tigers to cooperate: a reward is given only when two tigers attack the same deer at the same time, and both tigers and deer additionally receive a 0.1 reward on every step:
def config_double_attack(map_size):
gw = magent.gridworld
cfg = gw.Config()
cfg.set({"map_width": map_size, "map_height": map_size})
cfg.set({"embedding_size": 10})
deer = cfg.register_agent_type("deer", {
'width': 1, 'length': 1, 'hp': 5, 'speed': 1,
'view_range': gw.CircleRange(1),
'attack_range': gw.CircleRange(0),
'step_recover': 0.2,
'kill_supply': 8,
'step_reward': 0.1,
})
tiger = cfg.register_agent_type("tiger", {
'width': 1, 'length': 1, 'hp': 10, 'speed': 1,
'view_range': gw.CircleRange(4),
'attack_range': gw.CircleRange(1),
'damage': 1, 'step_recover': -0.2,
'step_reward': 0.1,
})
deer_group = cfg.add_group(deer)
tiger_group = cfg.add_group(tiger)
a = gw.AgentSymbol(tiger_group, index='any')
b = gw.AgentSymbol(tiger_group, index='any')
c = gw.AgentSymbol(deer_group, index='any')
e1 = gw.Event(a, 'attack', c)
e2 = gw.Event(b, 'attack', c)
cfg.add_reward_rule(e1 & e2, receiver=[a, b], value=[1, 1])
return cfg
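For contrast, a non-cooperative variant could reward a single attacking tiger on its own. The sketch below is a hypothetical config_single_attack built with the same MAgent config API (it is not the config_forest helper used elsewhere in this text, which may differ in details):
def config_single_attack(map_size):
    # Hypothetical variant: one tiger attacking a deer is rewarded by itself,
    # so there is no pressure on the tigers to coordinate.
    gw = magent.gridworld
    cfg = gw.Config()
    cfg.set({"map_width": map_size, "map_height": map_size})
    deer = cfg.register_agent_type("deer", {
        'width': 1, 'length': 1, 'hp': 5, 'speed': 1,
        'view_range': gw.CircleRange(1),
        'attack_range': gw.CircleRange(0),
        'step_recover': 0.2, 'kill_supply': 8,
    })
    tiger = cfg.register_agent_type("tiger", {
        'width': 1, 'length': 1, 'hp': 10, 'speed': 1,
        'view_range': gw.CircleRange(4),
        'attack_range': gw.CircleRange(1),
        'damage': 1, 'step_recover': -0.2,
    })
    deer_group = cfg.add_group(deer)
    tiger_group = cfg.add_group(tiger)
    a = gw.AgentSymbol(tiger_group, index='any')
    c = gw.AgentSymbol(deer_group, index='any')
    cfg.add_reward_rule(gw.Event(a, 'attack', c), receiver=a, value=1)
    return cfg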
Training tigers and deer simultaneously
Two independent neural networks, replay buffers, and experience sources are set up; on every training step a batch is sampled from each replay buffer and the two networks are trained separately (a rough sketch of such a step is given right below). The script that follows the sketch loads the two trained models and plays one episode, rendering it to a directory.
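A rough sketch of what one such dual training step could look like, assuming both groups have their own net, target net, replay buffer, and optimizer built the same way as in the tiger-only script above (the deer_*/tiger_* names are illustrative):
def process_batch(engine, batch):
    # The incoming batch is ignored; instead, both buffers are sampled every
    # iteration and the two networks are optimized independently.
    # (Both buffers are assumed to be filled elsewhere, e.g. by a generator
    # calling buffer.populate(1) on each of them.)
    res = {}
    pairs = [
        ("deer", deer_net, deer_tgt_net, deer_buffer, deer_optimizer),
        ("tiger", tiger_net, tiger_tgt_net, tiger_buffer, tiger_optimizer),
    ]
    for name, net, tgt_net, buffer, optimizer in pairs:
        optimizer.zero_grad()
        b = buffer.sample(PARAMS.batch_size)
        loss_v = model.calc_loss_dqn(b, net, tgt_net.target_model, preproc,
                                     gamma=PARAMS.gamma, device=device)
        loss_v.backward()
        optimizer.step()
        res["loss_" + name] = loss_v.item()
        if engine.state.iteration % PARAMS.target_net_sync == 0:
            tgt_net.sync()
    return res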
import os
import sys
sys.path.append(os.path.join(os.getcwd(), "MAgent/python"))
import magent
import argparse
import torch
import numpy as np
from lib import model, data
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-md", "--model_deer", required=True,
help="Model file to load in deer agent")
parser.add_argument("-mt", "--model_tiger", required=True,
help="Model file to load in tiger agent")
parser.add_argument("--map-size", type=int, default=64,
help="Size of the map, default=64")
parser.add_argument("--render", default="render",
help="Directory to store renders, default=render")
parser.add_argument("--walls-density", type=float, default=0.04,
help="Density of walls, default=0.04")
parser.add_argument("--tigers", type=int, default=10,
help="Count of tigers, default=10")
parser.add_argument("--deers", type=int, default=50,
help="Count of deers, default=50")
parser.add_argument("--mode", default='forest', choices=['forest', 'double_attack'],
help="GridWorld mode, could be 'forest' or 'double_attack', default='forest'")
args = parser.parse_args()
if args.mode == 'forest':
config = data.config_forest(args.map_size)
elif args.mode == 'double_attack':
config = data.config_double_attack(args.map_size)
else:
config = None
env = magent.GridWorld(config, map_size=args.map_size)
env.set_render_dir(args.render)
deer_handle, tiger_handle = env.get_handles()
env.reset()
env.add_walls(method="random", n=args.map_size *
args.map_size *
args.walls_density)
env.add_agents(deer_handle, method="random", n=args.deers)
env.add_agents(tiger_handle, method="random", n=args.tigers)
v = env.get_view_space(tiger_handle)
v = (v[-1], ) + v[:2]
net_tiger = model.DQNModel(v, env.get_feature_space(
tiger_handle), env.get_action_space(tiger_handle)[0])
net_tiger.load_state_dict(torch.load(args.model_tiger))
print(net_tiger)
v = env.get_view_space(deer_handle)
v = (v[-1], ) + v[:2]
net_deer = model.DQNModel(v, env.get_feature_space(
deer_handle), env.get_action_space(deer_handle)[0])
net_deer.load_state_dict(torch.load(args.model_deer))
print(net_deer)
deer_total_reward = tiger_total_reward = 0.0
while True:
view_obs, feats_obs = env.get_observation(tiger_handle)
view_obs = np.array(view_obs)
feats_obs = np.array(feats_obs)
view_obs = np.moveaxis(view_obs, 3, 1)
view_t = torch.tensor(view_obs, dtype=torch.float32)
feats_t = torch.tensor(feats_obs, dtype=torch.float32)
qvals = net_tiger((view_t, feats_t))
actions = torch.max(qvals, dim=1)[1].cpu().numpy()
actions = actions.astype(np.int32)
env.set_action(tiger_handle, actions)
view_obs, feats_obs = env.get_observation(deer_handle)
view_obs = np.array(view_obs)
feats_obs = np.array(feats_obs)
view_obs = np.moveaxis(view_obs, 3, 1)
view_t = torch.tensor(view_obs, dtype=torch.float32)
feats_t = torch.tensor(feats_obs, dtype=torch.float32)
qvals = net_deer((view_t, feats_t))
actions = torch.max(qvals, dim=1)[1].cpu().numpy()
actions = actions.astype(np.int32)
env.set_action(deer_handle, actions)
done = env.step()
if done:
break
env.render()
env.clear_dead()
tiger_total_reward += env.get_reward(tiger_handle).sum()
deer_total_reward += env.get_reward(deer_handle).sum()
print("Average reward: tigers %.3f, deers %.3f" % (
tiger_total_reward / args.tigers,
deer_total_reward / args.deers
))