文章目录
前言IPPO 算法原理代码实践1. 环境与依赖库导入2. 工具函数:计算优势函数 (GAE)3. 智能体定义:策略与价值网络4. PPO 算法核心实现5. IPPO 训练函数6. 算法调用与训练
总结
前言
单智能体强化学习(Single-Agent Reinforcement Learning)假设环境是静态的,即状态转移概率和奖励函数是固定的。然而,当我们进入多智能体(Multi-Agent)的世界时,情况变得复杂起来。在多智能体系统中,多个智能体在共享环境中同时进行实时交互和学习,每个智能体的策略更新都会影响环境的动态,从而对其他智能体产生影响。
从每个独立智能体的视角来看,环境是非静态的(non-stationary)。即使在相同的状态下采取相同的动作,得到的下一个状态和奖励信号也可能因为其他智能体的行为而不断变化。这给模型的训练带来了巨大挑战,因为强化学习算法的收敛性通常依赖于环境的马尔可夫性质。
为了应对这些挑战,研究者提出了多种方法。最直接的思路是基于我们熟悉的单智能体算法进行扩展,主要分为两种范式:
完全中心化 (Fully Centralized):将所有智能体视为一个“超级智能体”,聚合所有智能体的状态和动作用于决策。这种方法可以保证环境的稳定性,但随着智能体数量的增加,会面临维度爆炸的问题,导致训练复杂度急剧上升。完全去中心化 (Fully Decentralized):每个智能体独立地在环境中学习,不考虑其他智能体的存在。这种方法的扩展性很好,但由于忽略了环境的非静态问题,算法的收敛性无法得到保证。
本文将介绍一种介于两者之间且更偏向于去中心化思想的算法——独立近端策略优化 (Independent Proximal Policy Optimization, IPPO)。IPPO 是一种简单而有效的多智能体强化学习算法,它为每个智能体配备独立的 Actor 和 Critic 网络,并使用 PPO 算法进行独立训练。
在本文中,我们将通过一个具体的 PyTorch 代码实例,深入探讨 IPPO 的实现细节。我们将使用 库中的
ma-gym 环境,这是一个多智能体战斗场景,非常适合用来演示 IPPO 的训练过程。
Combat
完整代码:下载链接
IPPO 算法原理
IPPO 算法属于完全去中心化的方法,它让每个智能体都基于自身的观测独立进行学习。其核心思想是,系统中的 N 个智能体分别拥有自己的策略和价值函数,并在训练过程中独立优化。
算法流程如下:
对于系统中的 N 个智能体,为每个智能体初始化各自的策略以及价值函数。开始训练循环(for k = 0, 1, 2…):
所有智能体在环境中交互,分别获得各自的一条轨迹数据。对每个智能体,基于当前价值函数用 GAE (Generalized Advantage Estimation) 计算优势函数的估计。对每个智能体,通过最大化 PPO-截断的目标来更新其策略。对每个智能体,通过均方误差损失函数优化其价值函数。 结束循环。
在我们的代码实践中,为了提高数据利用率和训练稳定性,我们还会引入一个关键技巧:参数共享 (Parameter Sharing)。即所有智能体共享同一套策略和价值网络参数。这种做法的前提是所有智能体都是同质的(Homogeneous),意味着它们拥有相同的状态和动作空间以及优化目标。
接下来,让我们深入代码,看看 IPPO 是如何实现的。
代码实践
1. 环境与依赖库导入
首先,我们需要导入必要的库,并设置 环境。
ma-gym 是一个基于 OpenAI Gym 的多智能体环境库。我们选择其中的
ma-gym 环境,这是一个在二维格子世界里进行的对战模拟游戏。
Combat
Combat 环境简介:
世界:一个二维网格世界。队伍:两个队伍进行对抗。动作:每个智能体的动作集包括向四周移动一格、攻击周围 3×3 范围内的其他敌对智能体,或者不采取任何行动。生命值:初始时,每个智能体有 3 点生命值。如果智能体在敌人的攻击范围内被攻击到,则会扣掉 1 点生命值,生命值掉为 0 则死亡。胜利条件:存活的队伍获胜。冷却机制:每个智能体的攻击有一轮的冷却时间。
import torch # PyTorch深度学习框架
import torch.nn as nn # 神经网络模块
import torch.optim as optim # 优化器模块
import torch.nn.functional as F # 神经网络函数模块,包含激活函数、损失函数等
import numpy as np # 数值计算库
from tqdm import tqdm # 进度条库,用于显示训练进度
import matplotlib.pyplot as plt # 绘图库
import random # 随机数生成库
import collections # 集合数据类型库,用于deque等
import sys # 系统相关功能库
import os # 操作系统接口库
import pickle # 序列化库,用于保存和加载模型
from collections import deque, namedtuple # 双端队列和命名元组
import warnings # 警告处理库
warnings.filterwarnings('ignore') # 忽略警告信息
# 添加ma-gym环境路径到系统路径
sys.path.append("./ma-gym")
# 导入Combat多智能体战斗环境
from ma_gym.envs.combat.combat import Combat
# 设置随机种子以确保实验的可重复性
def set_seed(seed=42):
"""
设置所有随机数生成器的种子
参数:
seed (int): 随机种子值,默认为42
"""
torch.manual_seed(seed) # 设置PyTorch CPU随机种子
torch.cuda.manual_seed_all(seed) # 设置PyTorch GPU随机种子
np.random.seed(seed) # 设置NumPy随机种子
random.seed(seed) # 设置Python内置random模块种子
2. 工具函数:计算优势函数 (GAE)
在 PPO 算法中,我们需要估计优势函数
A
(
s
,
a
)
A(s, a)
A(s,a),它表示在状态
s
s
s 下采取动作
a
a
a 相对于平均水平的好坏程度。我们使用泛化优势估计 (GAE) 来计算,它通过参数
l
a
m
b
d
a
\lambda
lambda 在偏差和方差之间进行权衡。
其中
d
e
l
t
a
_
t
=
r
_
t
+
g
a
m
m
a
V
(
s
_
t
+
1
)
−
V
(
s
_
t
)
\delta\_t = r\_t + \gamma V(s\_{t+1}) – V(s\_t)
delta_t=r_t+gammaV(s_t+1)−V(s_t) 是时序差分误差 (TD error)。
# 工具函数
def compute_advantage(gamma, lmbda, td_delta):
"""
使用GAE(Generalized Advantage Estimation)计算优势函数
参数:
gamma (float): 折扣因子,用于未来奖励的折扣
lmbda (float): GAE参数,控制偏差-方差权衡
td_delta (torch.Tensor): TD误差,维度为 [episode_length]
返回:
torch.Tensor: 优势值,维度为 [episode_length]
"""
td_delta = td_delta.detach().numpy() # 转换为numpy数组,维度为 [episode_length]
advantage_list = [] # 存储优势值的列表
advantage = 0.0 # 初始化优势值
# 从后向前计算优势值(反向遍历)
for delta in td_delta[::-1]: # delta为标量,表示单步TD误差
advantage = gamma * lmbda * advantage + delta # GAE公式计算
advantage_list.append(advantage) # 添加到列表,advantage为标量
advantage_list.reverse() # 恢复正确的时间顺序
return torch.tensor(np.array(advantage_list), dtype=torch.float) # 返回维度 [episode_length]
3. 智能体定义:策略与价值网络
我们采用 Actor-Critic 架构。
策略网络 (PolicyNet):Actor 部分,输入状态,输出一个动作的概率分布。价值网络 (ValueNet):Critic 部分,输入状态,输出该状态的价值估计
V
(
s
)
V(s)
V(s)。
# 策略网络定义
class PolicyNet(torch.nn.Module):
"""
策略网络(Actor网络),用于输出动作概率分布
"""
def __init__(self, state_dim, hidden_dim, action_dim):
"""
初始化策略网络
参数:
state_dim (int): 状态维度
hidden_dim (int): 隐藏层维度
action_dim (int): 动作维度
"""
super(PolicyNet, self).__init__()
self.fc1 = torch.nn.Linear(state_dim, hidden_dim) # 第一个全连接层 [state_dim] -> [hidden_dim]
self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim) # 第二个全连接层 [hidden_dim] -> [hidden_dim]
self.fc3 = torch.nn.Linear(hidden_dim, action_dim) # 输出层 [hidden_dim] -> [action_dim]
def forward(self, x):
"""
前向传播
参数:
x (torch.Tensor): 输入状态,维度为 [batch_size, state_dim]
返回:
torch.Tensor: 动作概率分布,维度为 [batch_size, action_dim]
"""
x = F.relu(self.fc2(F.relu(self.fc1(x)))) # 两层ReLU激活 [batch_size, hidden_dim]
return F.softmax(self.fc3(x), dim=1) # Softmax输出概率分布 [batch_size, action_dim]
# 价值网络定义
class ValueNet(torch.nn.Module):
"""
价值网络(Critic网络),用于估计状态价值函数
"""
def __init__(self, state_dim, hidden_dim):
"""
初始化价值网络
参数:
state_dim (int): 状态维度
hidden_dim (int): 隐藏层维度
"""
super(ValueNet, self).__init__()
self.fc1 = torch.nn.Linear(state_dim, hidden_dim) # 第一个全连接层 [state_dim] -> [hidden_dim]
self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim) # 第二个全连接层 [hidden_dim] -> [hidden_dim]
self.fc3 = torch.nn.Linear(hidden_dim, 1) # 输出层 [hidden_dim] -> [1]
def forward(self, x):
"""
前向传播
参数:
x (torch.Tensor): 输入状态,维度为 [batch_size, state_dim]
返回:
torch.Tensor: 状态价值,维度为 [batch_size, 1]
"""
x = F.relu(self.fc2(F.relu(self.fc1(x)))) # 两层ReLU激活 [batch_size, hidden_dim]
return self.fc3(x) # 输出状态价值 [batch_size, 1]
4. PPO 算法核心实现
接下来是 PPO 算法的核心类。它包含了动作选择 () 和网络更新 (
take_action) 两个主要方法。
update 方法是 PPO 的精髓,它实现了 PPO 的截断目标函数 (Clipped Surrogate Objective)。
update
PPO 的目标函数为:
# PPO算法类
class PPO:
"""
PPO算法实现,采用截断方式(Clipped Surrogate Objective)
"""
def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
lmbda, eps, gamma, device):
"""
初始化PPO智能体
参数:
state_dim (int): 状态空间维度
hidden_dim (int): 隐藏层维度
action_dim (int): 动作空间维度
actor_lr (float): Actor网络学习率
critic_lr (float): Critic网络学习率
lmbda (float): GAE参数
eps (float): PPO截断参数,控制策略更新幅度
gamma (float): 折扣因子
device (torch.device): 计算设备(CPU或GPU)
"""
# 初始化网络
self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device) # 策略网络
self.critic = ValueNet(state_dim, hidden_dim).to(device) # 价值网络
# 初始化优化器
self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr) # Actor优化器
self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr) # Critic优化器
# 保存超参数
self.gamma = gamma # 折扣因子
self.lmbda = lmbda # GAE参数
self.eps = eps # PPO截断范围参数
self.device = device # 计算设备
def take_action(self, state):
"""
根据当前状态选择动作
参数:
state (list): 当前状态,维度为 [state_dim]
返回:
int: 选择的动作索引
"""
# 将状态转换为张量并添加batch维度 [1, state_dim]
state = torch.tensor([state], dtype=torch.float).to(self.device)
# 获取动作概率分布 [1, action_dim]
probs = self.actor(state)
# 创建分类分布并采样动作
action_dist = torch.distributions.Categorical(probs) # 基于概率分布创建分类分布
action = action_dist.sample() # 采样动作,维度为 [1]
return action.item() # 返回标量动作
def update(self, transition_dict):
"""
使用经验数据更新网络参数
参数:
transition_dict (dict): 包含轨迹数据的字典
'states': 状态列表,长度为episode_length,每个元素维度为 [state_dim]
'actions': 动作列表,长度为episode_length,每个元素为标量
'rewards': 奖励列表,长度为episode_length,每个元素为标量
'next_states': 下一状态列表,长度为episode_length,每个元素维度为 [state_dim]
'dones': 完成标志列表,长度为episode_length,每个元素为布尔值
"""
# 将经验数据转换为张量
states = torch.tensor(transition_dict['states'],
dtype=torch.float).to(self.device) # 维度: [episode_length, state_dim]
actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(
self.device) # 维度: [episode_length, 1]
rewards = torch.tensor(transition_dict['rewards'],
dtype=torch.float).view(-1, 1).to(self.device) # 维度: [episode_length, 1]
next_states = torch.tensor(transition_dict['next_states'],
dtype=torch.float).to(self.device) # 维度: [episode_length, state_dim]
dones = torch.tensor(transition_dict['dones'],
dtype=torch.float).view(-1, 1).to(self.device) # 维度: [episode_length, 1]
"""
数据维度说明(以示例为准):
states.shape= torch.Size([9, 150]) # 9个时间步,每个状态150维
actions.shape= torch.Size([9, 1]) # 9个动作,每个动作1维
rewards.shape= torch.Size([9, 1]) # 9个奖励,每个奖励1维
next_states.shape= torch.Size([9, 150]) # 9个下一状态,每个状态150维
dones.shape= torch.Size([9, 1]) # 9个完成标志,每个标志1维
其中9代表每个轮次的步数(episode length)
"""
# 计算TD目标值:r + γ * V(s') * (1 - done)
td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones) # 维度: [episode_length, 1]
# 计算TD误差:TD目标 - 当前状态价值
td_delta = td_target - self.critic(states) # 维度: [episode_length, 1]
# 使用GAE计算优势函数
advantage = compute_advantage(self.gamma, self.lmbda,
td_delta.cpu()).to(self.device) # 维度: [episode_length, 1]
# 计算旧策略的对数概率(用于重要性采样)
old_log_probs = torch.log(self.actor(states).gather(1, actions)).detach() # 维度: [episode_length, 1]
# 计算新策略的对数概率
log_probs = torch.log(self.actor(states).gather(1, actions)) # 维度: [episode_length, 1]
# 计算重要性采样比率: π_new(a|s) / π_old(a|s)
ratio = torch.exp(log_probs - old_log_probs) # 维度: [episode_length, 1]
# PPO的两个代理目标
surr1 = ratio * advantage # 未截断的代理目标,维度: [episode_length, 1]
surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage # 截断的代理目标,维度: [episode_length, 1]
# Actor损失:取两个目标的最小值的负数(因为要最大化目标)
actor_loss = torch.mean(-torch.min(surr1, surr2)) # PPO损失函数,标量
# Critic损失:均方误差损失
critic_loss = torch.mean(
F.mse_loss(self.critic(states), td_target.detach())) # 标量
# 梯度清零
self.actor_optimizer.zero_grad()
self.critic_optimizer.zero_grad()
# 反向传播
actor_loss.backward()
critic_loss.backward()
# 参数更新
self.actor_optimizer.step()
self.critic_optimizer.step()
5. IPPO 训练函数
这是整个项目的核心训练逻辑。 函数实现了 IPPO 的训练循环。
train_IPPO
核心思想:
参数共享:所有智能体使用同一个 PPO 实例,这意味着它们的策略网络和价值网络是共享的。独立经验收集:在每个 episode 中,为每个智能体创建独立的轨迹存储字典 (
agent,
transition_dict_1)。独立更新:在一个 episode 结束后,使用每个智能体收集到的轨迹数据,分别调用
transition_dict_2 方法来更新共享的网络。这相当于用来自不同智能体的经验数据分两次更新同一套参数,增加了样本利用率。
agent.update()
def train_IPPO(agent, env, num_episodes):
"""
训练IPPO(Independent PPO)算法
IPPO的核心思想是使用参数共享(parameter sharing)技巧,即对所有智能体使用同一套策略参数。
这样做的好处是:
1. 训练数据更多,提高样本效率
2. 训练更稳定
3. 减少参数量和计算复杂度
前提条件:智能体必须是同质的(homogeneous),即:
- 状态空间和动作空间完全一致
- 优化目标完全一致
参数:
agent (PPO): PPO智能体实例
env: 多智能体环境实例
num_episodes (int): 训练轮数
返回:
None: 函数会显示训练过程和最终的胜率曲线
"""
win_list = [] # 存储每轮比赛的胜负结果,维度为 [total_episodes]
# 分10个阶段进行训练,每个阶段显示进度条
for i in range(10):
with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
for i_episode in range(int(num_episodes / 10)):
"""
为两个智能体分别创建轨迹存储字典
由于使用参数共享,虽然是两个智能体,但实际使用同一个策略网络
"""
# 智能体1的轨迹存储
transition_dict_1 = {
'states': [], # 状态序列,每个元素维度为 [state_dim]
'actions': [], # 动作序列,每个元素为标量
'next_states': [], # 下一状态序列,每个元素维度为 [state_dim]
'rewards': [], # 奖励序列,每个元素为标量
'dones': [] # 完成标志序列,每个元素为布尔值
}
# 智能体2的轨迹存储(结构相同)
transition_dict_2 = {
'states': [], # 状态序列,每个元素维度为 [state_dim]
'actions': [], # 动作序列,每个元素为标量
'next_states': [], # 下一状态序列,每个元素维度为 [state_dim]
'rewards': [], # 奖励序列,每个元素为标量
'dones': [] # 完成标志序列,每个元素为布尔值
}
# 重置环境,获取初始状态
s = env.reset() # 返回两个智能体的初始状态合并列表
"""
状态数据结构说明:
s是一个包含两个智能体状态的列表:
- s[0]: 智能体1观察到的状态,维度为 [150]
- s[1]: 智能体2观察到的状态,维度为 [150]
- np.array(s).shape = (2, 150),其中2是智能体数量,150是状态维度
每个状态是一个150维的浮点数向量,包含:
- 环境地图信息
- 智能体位置信息
- 敌方智能体信息
- 其他游戏相关特征
"""
terminal = False # 标记一轮比赛是否结束
# 开始一轮比赛的交互循环
while not terminal:
# 两个智能体根据各自观察到的状态选择动作
a_1 = agent.take_action(s[0]) # 智能体1的动作,返回动作索引(整数)
a_2 = agent.take_action(s[1]) # 智能体2的动作,返回动作索引(整数)
"""
动作数据结构说明:
- a_1, a_2: 整数,表示选择的动作索引
- 动作空间通常是离散的,比如:0=前进,1=后退,2=左转,3=右转,4=攻击等
"""
# 环境根据两个智能体的动作执行一步,返回新状态、奖励、完成标志和额外信息
next_s, r, done, info = env.step([a_1, a_2])
"""
环境返回值说明:
- next_s: 下一状态列表,维度为 (2, 150),包含两个智能体的下一状态
- r: 奖励列表,维度为 [2],包含两个智能体各自的奖励
- done: 完成标志列表,维度为 [2],标记各智能体是否完成任务
- info: 额外信息字典,包含游戏状态信息,如:
{'health': {0: 3, 1: 0}, 'win': False}
其中health表示各智能体的生命值,win表示是否胜利
"""
# 将经验数据存储到智能体1的轨迹中
transition_dict_1['states'].append(s[0]) # 当前状态
transition_dict_1['actions'].append(a_1) # 采取的动作
transition_dict_1['next_states'].append(next_s[0]) # 下一状态
# 奖励设计:胜利时额外奖励100,否则惩罚0.1(鼓励快速决策)
transition_dict_1['rewards'].append(
r[0] + 100 if info['win'] else r[0] - 0.1)
transition_dict_1['dones'].append(False) # 标记为未完成(在episode内部)
# 将经验数据存储到智能体2的轨迹中(结构相同)
transition_dict_2['states'].append(s[1]) # 当前状态
transition_dict_2['actions'].append(a_2) # 采取的动作
transition_dict_2['next_states'].append(next_s[1]) # 下一状态
# 奖励设计:胜利时额外奖励100,否则惩罚0.1
transition_dict_2['rewards'].append(
r[1] + 100 if info['win'] else r[1] - 0.1)
transition_dict_2['dones'].append(False) # 标记为未完成
# 更新状态
s = next_s
# 检查是否所有智能体都完成了任务
terminal = all(done) # 当done列表中所有元素都为True时,比赛结束
"""
终止条件说明:
- all(done)函数检查done列表中的所有元素
- 只有当所有智能体都标记为完成时,比赛才结束
- 比赛可能因为胜负决出、时间超限或其他游戏规则而结束
"""
# 记录本轮比赛结果(1表示胜利,0表示失败)
win_list.append(1 if info["win"] else 0)
# 使用两个智能体的轨迹数据分别更新同一个策略网络
# 这是IPPO的核心:参数共享但独立训练
agent.update(transition_dict_1) # 用智能体1的轨迹更新网络
agent.update(transition_dict_2) # 用智能体2的轨迹更新网络
# 每100轮显示一次训练进度
if (i_episode + 1) % 100 == 0:
pbar.set_postfix({
'episode': '%d' % (num_episodes / 10 * i + i_episode + 1), # 当前总轮数
'return': '%.3f' % np.mean(win_list[-100:]) # 最近100轮的平均胜率
})
pbar.update(1)
# 训练完成后,处理胜率数据用于可视化
win_array = np.array(win_list) # 转换为numpy数组,维度为 [total_episodes]
# 每100轮取一次平均胜率,用于绘制平滑的学习曲线
win_array = np.mean(win_array.reshape(-I 100), axis=1) # 维度为 [total_episodes/100]
# 创建对应的轮数序列
episodes_list = np.arange(win_array.shape[0]) * 100 # 维度为 [total_episodes/100]
# 绘制训练过程中的胜率变化曲线
plt.figure(figsize=(10, 6))
plt.plot(episodes_list, win_array, linewidth=2)
plt.xlabel('Episodes') # x轴:训练轮数
plt.ylabel('Win rate') # y轴:胜率
plt.title('IPPO on Combat') # 图标题
plt.grid(True, alpha=0.3) # 添加网格
plt.show() # 显示图像
6. 算法调用与训练
最后,我们设置超参数,实例化环境和 PPO 智能体,然后调用 函数开始训练。
train_IPPO
# 训练参数配置与环境实例化
# =============================================================================
# ======================== 超参数设置 ========================
actor_lr = 3e-4 # Actor网络学习率,用于策略网络参数更新,标量
critic_lr = 1e-3 # Critic网络学习率,用于价值网络参数更新,标量
num_episodes = 1000 # 总训练轮数,每轮代表一场完整的游戏,标量
hidden_dim = 64 # 神经网络隐藏层维度,控制网络容量和表达能力,标量
gamma = 0.99 # 折扣因子,用于计算未来奖励的现值,范围[0,1],标量
lmbda = 0.97 # GAE(Generalized Advantage Estimation)参数,控制偏差-方差权衡,范围[0,1],标量
eps = 0.2 # PPO截断参数,限制策略更新幅度,防止过大更新,标量
# ======================== 设备配置 ========================
# 优先使用GPU加速训练,如果不可用则使用CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"使用计算设备: {device}")
# ======================== 环境参数设置 ========================
team_size = 2 # 每队智能体数量,即己方和敌方各有2个智能体,标量
grid_size = (15, 15) # 格子世界的大小,创建15x15的网格地图,元组维度为(2,)
# ======================== 环境实例化 ========================
"""
创建Combat多智能体战斗环境
- grid_shape: 网格地图尺寸,设置为15x15的二维网格世界
- n_agents: 己方智能体数量,设置为2个可控制的智能体
- n_opponents: 敌方智能体数量,设置为2个对手智能体
- 游戏目标: 己方智能体需要与敌方智能体战斗并获胜
"""
env = Combat(grid_shape=grid_size, n_agents=team_size, n_opponents=team_size)
# ======================== 状态和动作空间分析 ========================
"""
从环境中获取状态空间和动作空间的维度信息
注意:在多智能体环境中,observation_space和action_space都是列表,
每个元素对应一个智能体的空间信息
"""
# 获取单个智能体的状态空间维度
state_dim = env.observation_space[0].shape[0] # 单个智能体观察到的状态维度,标量
"""
state_dim = 150
- 表示每个智能体观察到的状态是一个150维的特征向量
- 这个向量包含了智能体能够感知到的所有环境信息:
* 自身位置和状态信息
* 队友位置和状态信息
* 敌方智能体位置和状态信息
* 环境地图信息(障碍物、可通行区域等)
* 其他游戏相关特征
"""
# 获取单个智能体的动作空间维度
action_dim = env.action_space[0].n # 单个智能体可选择的动作数量,标量
"""
action_dim = 7
动作空间包含7种不同的动作:
1. 向上移动 (移动类动作)
2. 向下移动 (移动类动作)
3. 向左移动 (移动类动作)
4. 向右移动 (移动类动作)
5. 攻击敌方智能体1 (攻击类动作)
6. 攻击敌方智能体2 (攻击类动作)
7. 不采取任何行动 (无动作)
动作设计说明:
- 移动动作(4种): 允许智能体在15x15网格中移动到相邻格子
- 攻击动作(2种): 可以攻击3x3范围内的特定敌方智能体
- 无动作(1种): 智能体选择等待或观察,不进行任何操作
"""
print(f"状态空间维度 (state_dim): {state_dim}")
print(f"动作空间维度 (action_dim): {action_dim}")
print(f"智能体数量: {team_size}")
print(f"环境大小: {grid_size}")
# ======================== 智能体实例化 ========================
"""
创建PPO智能体实例
关键点:两个智能体共享同一个策略网络(参数共享)
这是IPPO(Independent PPO)的核心思想:
- 使用同一套神经网络参数
- 但基于各自的经验独立进行训练
- 这样可以增加训练数据,提高学习效率
"""
agent = PPO(
state_dim=state_dim, # 状态空间维度: 150
hidden_dim=hidden_dim, # 隐藏层维度: 64
action_dim=action_dim, # 动作空间维度: 7
actor_lr=actor_lr, # Actor学习率: 3e-4
critic_lr=critic_lr, # Critic学习率: 1e-3
lmbda=lmbda, # GAE参数: 0.97
eps=eps, # PPO截断参数: 0.2
gamma=gamma, # 折扣因子: 0.99
device=device # 计算设备: cuda/cpu
)
print("PPO智能体初始化完成")
print(f"Actor网络参数量: {sum(p.numel() for p in agent.actor.parameters())}")
print(f"Critic网络参数量: {sum(p.numel() for p in agent.critic.parameters())}")
# ======================== 开始训练 ========================
print(f"
开始IPPO训练,总轮数: {num_episodes}")
print("=" * 50)
"""
调用训练函数开始IPPO算法训练
训练过程中将会:
1. 让两个智能体与环境交互,收集经验数据
2. 使用PPO算法更新策略网络和价值网络
3. 记录胜率变化,监控训练进度
4. 最终绘制学习曲线图表
"""
train_IPPO(agent, env, num_episodes)
使用计算设备: cuda
状态空间维度 (state_dim): 150
动作空间维度 (action_dim): 7
智能体数量: 2
环境大小: (15, 15)
PPO智能体初始化完成
Actor网络参数量: 14279
Critic网络参数量: 13889
开始IPPO训练,总轮数: 1000
==================================================
Iteration 0: 100%|██████████| 100/100 [00:14<00:00, 6.95it/s, episode=100, return=0.000]
Iteration 1: 100%|██████████| 100/100 [00:13<00:00, 7.39it/s, episode=200, return=0.000]
Iteration 2: 100%|██████████| 100/100 [00:15<00:00, 6.46it/s, episode=300, return=0.000]
Iteration 3: 100%|██████████| 100/100 [00:13<00:00, 7.48it/s, episode=400, return=0.000]
Iteration 4: 100%|██████████| 100/100 [00:14<00:00, 6.80it/s, episode=500, return=0.000]
Iteration 5: 100%|██████████| 100/100 [00:13<00:00, 7.29it/s, episode=600, return=0.000]
Iteration 6: 100%|██████████| 100/100 [00:13<00:00, 7.30it/s, episode=700, return=0.000]
Iteration 7: 100%|██████████| 100/100 [00:14<00:00, 7.08it/s, episode=800, return=0.010]
Iteration 8: 100%|██████████| 100/100 [00:13<00:00, 7.20it/s, episode=900, return=0.010]
Iteration 9: 100%|██████████| 100/100 [00:11<00:00, 8.67it/s, episode=1000, return=0.010]

从训练结果来看,胜率在训练后期有略微提升,但整体表现不佳。这反映了 IPPO 的固有缺陷:虽然它具有很好的扩展性,但由于无法解决环境的非静态问题,其收敛性和最终性能可能无法得到保证。
总结
本文详细介绍并实现了多智能体强化学习中的 IPPO 算法。通过 PyTorch 代码,我们在 环境中展示了 IPPO 的完整训练流程。
Combat
核心要点回顾:
多智能体挑战:核心在于环境的非静态性,即一个智能体的策略更新会改变其他智能体的学习环境。IPPO 范式:作为一种完全去中心化的方法,IPPO 让每个智能体独立学习,实现简单且扩展性强。参数共享:在同质智能体的场景下,通过共享网络参数,可以显著提升样本效率和训练稳定性。性能与局限:IPPO 虽然实现简单,但由于没有从根本上解决非静态问题,训练收敛性无法保证,性能可能受限。
尽管存在局限性,IPPO 仍然是多智能体强化学习领域一个重要的基线算法。对于许多问题,尤其是当智能体数量庞大时,这种简单、可扩展的方法可能是唯一可行的选择。通过理解 IPPO 的实现和原理,我们可以更好地为更复杂的算法(如 MADDPG、COMA、QMIX 等)打下坚实的基础。















暂无评论内容