【深度强化学习】(4) Actor-Critic 模型解析,附Pytorch完整代码

大家好,今天和各位分享一下深度强化学习中的 Actor-Critic 演员评论家算法,Actor-Critic 算法是一种综合了策略迭代和价值迭代的集成算法。我将使用该模型结合 OpenAI 中的 Gym 环境完成一个小游戏,完整代码可以从我的 GitHub 中获得:

https://github.com/LiSir-HIT/Reinforcement-Learning/tree/main/Model

1. 算法原理

根据 agent 选择动作方法的不同,可以把强化学习方法分为三大类:行动者方法(Actor-only),评论家方法(Critic-only),行动者评论家方法(Actor-critic)。

行动者方法中不会对值函数进行估计,直接按照当前策略和环境进行交互。通过交互后得到的立即奖赏值直接优化当前策略。例如:Policy Gradients

评论家方法没有需要维护的策略,评论家方法的策略是直接通过当前的值函数获得的,并通过值函数获得的策略与环境交互。交互得到的立即奖赏值用来优化当前值函数。例如:DQN

行动者评论家方法是由行动者和评论家两个部分构成。行动者用于选择动作评论家评论选择动作的好坏。行动者选择动作的方法不是依据当前的值函数,而是依据存储的策略。评论家的评论一般采用时间差分误差的形式,时间差分误差是根据当前的值函数计算获得的时间差分误差是是评论家的唯一输出,并且驱动了行动者和评论家之间的所有学习。

2. 公式推导

根据策略梯度算法的定义,策略优化目标函数如下:

L_pi = sum _{ain A} log pi_ theta (s_t, a_t) (G_{t}^{n}-V(s_t))

令 ,A_t = G_t^n - V(s_t),称 A_t 为优势函数。采用 n 步时序差分法求解时,G_t 可以表示如下:

G_t = R_{t+1} + gamma R_{t+2} + cdots +gamma ^{n-1}R_{t+n} + gamma^nV(s_{t+n})

当 n 为一个完整的状态序列大小时,该算法与蒙特卡洛算法等价。

Actor-Critic 算法一共分为两个部分,Critic 和 Actor 网络。

Critic 是评判网络当输入为环境状态时,它可以评估当前状态的价值当输入为环境状态和采取的动作时,它可以评估当前状态下采取该动作的价值。

Actor 为策略网络以当前的状态作为输入,输出为动作的概率分布或者连续动作值再由 Critic 网络来评价该动作的好坏从而调整策略。Actor-Critic算法将动作价值评估和策略更新过程分开,Actor 可以对当前环境进行充分探索并缓慢进行策略更新Critic 只需要负责评价策略的好坏,所以这种集成算法有相对较好的性能。

Critic 网络的输入一般有两种形式,(1)如果输入为状态,则该评价网络的作用为评价当前状态价值(2)如果输入为状态和动作,则该评价网络的作用为评价当前状态的动作价值

如果评价网络 Critic 为状态价值 state value 的评价网络,输入为状态Critic 网络的损失函数计算公式采用均方误差损失函数,即 TD 误差值的累计平方值的均值,表达式如下:

L_{critic} = frac{1}{N} sum_{i}^{N} (G_t-V(s_t))

Actor 网络的优化目标可以如下:

L_{actor} = frac{1}{N} sum_{i}^{N} log pi_ast (s_t,a_t) (G_t-V(s_t))

其中,pi_ast 代表最优策略,由于该公式表达的含义为当 TD 误差值大于 0 时增强该动作选择概率当 TD 误差值小于 0 时减小该动作选择概率,所以目标为最小化损失函数 -L_{actor}

如果评价网络 Critic 为动作价值 action value 的评价网络,即输入为状态和动作,则Critic 网络的损失函数如下:

L_{critic} = frac{1}{N} sum_{i}^{N} (G_t - Q(s_t,a_t))^2

其中,G_t 的表达式变换如下:

G_t = R_{t+1} + gamma R_{t+2} + cdots +gamma ^{n-1}R_{t+n} + gamma^nQ(s_{t+n}, a_{t+n})

Actor-Critic 算法流程如下:

3. 代码实现

Actor-Critic 模型部分的实现方式如下:

import torch
from torch import nn
from torch.nn import functional as F
import numpy as np

# ------------------------------------ #
# 策略梯度Actor,动作选择
# ------------------------------------ #

class PolicyNet(nn.Module):
    def __init__(self, n_states, n_hiddens, n_actions):
        super(PolicyNet, self).__init__()
        self.fc1 = nn.Linear(n_states, n_hiddens)
        self.fc2 = nn.Linear(n_hiddens, n_actions)
    # 前向传播
    def forward(self, x):
        x = self.fc1(x)  # [b,n_states]-->[b,n_hiddens]
        x = F.relu(x)  
        x = self.fc2(x)  # [b,n_hiddens]-->[b,n_actions]
        # 每个状态对应的动作的概率
        x = F.softmax(x, dim=1)  # [b,n_actions]-->[b,n_actions]
        return x

# ------------------------------------ #
# 值函数Critic,动作评估输出 shape=[b,1]
# ------------------------------------ #

class ValueNet(nn.Module):
    def __init__(self, n_states, n_hiddens):
        super(ValueNet, self).__init__()
        self.fc1 = nn.Linear(n_states, n_hiddens)
        self.fc2 = nn.Linear(n_hiddens, 1)
    # 前向传播
    def forward(self, x):
        x = self.fc1(x)  # [b,n_states]-->[b,n_hiddens]
        x = F.relu(x)
        x = self.fc2(x)  # [b,n_hiddens]-->[b,1]
        return x

# ------------------------------------ #
# Actor-Critic
# ------------------------------------ #

class ActorCritic:
    def __init__(self, n_states, n_hiddens, n_actions,
                 actor_lr, critic_lr, gamma):
        # 属性分配
        self.gamma = gamma

        # 实例化策略网络
        self.actor = PolicyNet(n_states, n_hiddens, n_actions)
        # 实例化价值网络
        self.critic = ValueNet(n_states, n_hiddens)
        # 策略网络的优化器
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        # 价值网络的优化器
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
    
    # 动作选择
    def take_action(self, state):
        # 维度变换numpy[n_states]-->[1,n_sates]-->tensor
        state = torch.tensor(state[np.newaxis, :])
        # 动作价值函数,当前状态下各个动作的概率
        probs = self.actor(state)
        # 创建以probs为标准类型的数据分布
        action_dist = torch.distributions.Categorical(probs)
        # 随机选择一个动作 tensor-->int
        action = action_dist.sample().item()
        return action

    # 模型更新
    def update(self, transition_dict):
        # 训练集
        states = torch.tensor(transition_dict['states'], dtype=torch.float)
        actions = torch.tensor(transition_dict['actions']).view(-1,1)
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1,1)
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float)
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1,1)

        # 预测的当前时刻的state_value
        td_value = self.critic(states)
        # 目标的当前时刻的state_value
        td_target = rewards + self.gamma * self.critic(next_states) * (1-dones)
        # 时序差分的误差计算,目标的state_value与预测的state_value之差
        td_delta = td_target - td_value
        
        # 对每个状态对应的动作价值用log函数
        log_probs = torch.log(self.actor(states).gather(1, actions))
        # 策略梯度损失
        actor_loss = torch.mean(-log_probs * td_delta.detach())
        # 值函数损失,预测值和目标值之间
        critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))

        # 优化器梯度清0
        self.actor_optimizer.zero_grad()  # 策略梯度网络的优化器
        self.critic_optimizer.zero_grad()  # 价值网络的优化器
        # 反向传播
        actor_loss.backward()
        critic_loss.backward()
        # 参数更新
        self.actor_optimizer.step()
        self.critic_optimizer.step()

4. 案例演示

我们使用 OpenAI 的 gym 库中的环境,完成一个小案例。我们的目的是左右移动黑色小车使得黄色的杆子保持竖直。状态 state 的维度为 4,动作 action 有 2 个。

环境交互与训练部分的代码如下:

import numpy as np
import matplotlib.pyplot as plt
import gym
import torch
from RL_brain import ActorCritic

# ----------------------------------------- #
# 参数设置
# ----------------------------------------- #

num_episodes = 100  # 总迭代次数
gamma = 0.9  # 折扣因子
actor_lr = 1e-3  # 策略网络的学习率
critic_lr = 1e-2  # 价值网络的学习率
n_hiddens = 16  # 隐含层神经元个数
env_name = 'CartPole-v1'
return_list = []  # 保存每个回合的return

# ----------------------------------------- #
# 环境加载
# ----------------------------------------- #

env = gym.make(env_name, render_mode="human")
n_states = env.observation_space.shape[0]  # 状态数 4
n_actions = env.action_space.n  # 动作数 2

# ----------------------------------------- #
# 模型构建
# ----------------------------------------- #

agent = ActorCritic(n_states=n_states,  # 状态数
                    n_hiddens=n_hiddens,  # 隐含层数
                    n_actions=n_actions,  # 动作数
                    actor_lr=actor_lr,  # 策略网络学习率
                    critic_lr=critic_lr,  # 价值网络学习率
                    gamma=gamma)  # 折扣因子

# ----------------------------------------- #
# 训练--回合更新
# ----------------------------------------- #

for i in range(num_episodes):
    
    state = env.reset()[0]  # 环境重置
    done = False  # 任务完成的标记
    episode_return = 0  # 累计每回合的reward

    # 构造数据集,保存每个回合的状态数据
    transition_dict = {
        'states': [],
        'actions': [],
        'next_states': [],
        'rewards': [],
        'dones': [],
    }

    while not done:
        action = agent.take_action(state)  # 动作选择
        next_state, reward, done, _, _  = env.step(action)  # 环境更新
        # 保存每个时刻的状态动作...
        transition_dict['states'].append(state)
        transition_dict['actions'].append(action)
        transition_dict['next_states'].append(next_state)
        transition_dict['rewards'].append(reward)
        transition_dict['dones'].append(done)
        # 更新状态
        state = next_state
        # 累计回合奖励
        episode_return += reward

    # 保存每个回合的return
    return_list.append(episode_return)
    # 模型训练
    agent.update(transition_dict)

    # 打印回合信息
    print(f'iter:{i}, return:{np.mean(return_list[-10:])}')

# -------------------------------------- #
# 绘图
# -------------------------------------- #

plt.plot(return_list)
plt.title('return')
plt.show()

绘制每回合的回报 return

文章出处登录后可见!

已经登录?立即刷新

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
乘风的头像乘风管理团队
上一篇 2023年4月5日
下一篇 2023年4月5日

相关推荐