2024/3/21 DDPG 算法
问题导出
在现代机器学习中,特别是在强化学习领域,连续动作空间的处理一直是一个挑战。具体来说,当一个机械手臂设计为具有多个自由度时,如这个系统的两个关节可以独立运动,每个关节的运动范围都可以被视为一个连续区间。上部关节 (a_1) 在 [0°, 360°] 的范围内转动,而下部关节 ( a_2) 在 [0°, 180°] 范围内转动,组合起来,它们形成了一个二维的连续动作空间。在这样的空间内,动作可以被表达为二维向量,且因为每个维度都是连续的,理论上这个空间包含了无穷多个可能的动作。
在DDPG算法之前,我们在求解连续动作空间问题时,主要有两种方式:一是对连续动作做离散化处理,然后再利用强化学习算法(例如DQN)进行求解。二是使用Policy Gradient (PG)算法 (例如Reinforce) 直接求解。但是对于方式一,离散化处理在一定程度上脱离了工程实际;对于方式二,PG算法在求解连续控制问题时效果往往不尽人意。为此,DDPG算法横空出世,在许多连续控制问题上取得了非常不错的效果。
而DDPG算法则是为了在这样的连续动作空间中学习最优策略而设计。它融合了价值函数优化和策略梯度的方法,通过端到端的学习,使得算法能够在面对无穷动作选择时,生成一个确定性的策略。这使得DDPG特别适合于像机械手臂这样的控制系统,它们需要精确和连续的输出来执行复杂的任务。通过DDPG算法,我们能够使机器学习模型在高维的连续动作空间中找到那些能够最大化累积奖励的策略,从而达到任务的最优解。
DDPG 基本概念
DDPG算法,全称是deep deterministic policy gradient,深度确定性策略梯度算法。
深度(Deep):这一部分指的是DDPG使用深度神经网络来近似策略函数和价值函数。"深度"强调了算法采用深度学习技术,利用多层网络结构来处理高维度的状态空间,这对于处理复杂问题是非常有用的。
确定性(Deterministic):在DDPG中,策略是确定性的,这意味着给定一个状态,策略会输出一个确定的动作,而非一个动作的概率分布。这与传统的随机(或随机性)策略梯度方法不同,后者在给定状态下输出一个动作的概率分布,并从中采样以决定要执行的动作。
策略梯度(Policy Gradient):这表明DDPG属于策略梯度方法的一类,即直接对策略本身进行参数化,并通过梯度上升(或下降)方法来优化策略参数,以此最大化累积奖励。在DDPG中,策略梯度是基于一个确定性的策略函数来计算的,不同于基于概率策略的传统策略梯度方法。
为什么 DDPG 能够解决连续动作空间的问题
DDPG解决了连续动作空间问题的关键,确实是因为它避免了需要在连续动作空间中枚举和评估所有可能动作的需求。这是通过其确定性策略实现的,即直接从状态映射到一个具体的动作,而非动作的概率分布。这个动作是根据当前策略,预测在给定状态下可以获得最大回报的动作。
- 动作输出:DDPG的Actor网络直接输出一个具体的动作值,这个值是在当前策略下,给定状态空间中预测能获得最大回报的动作。这避免了在连续动作空间中对每个可能动作进行评估和比较的需要。
- 优化过程:DDPG中的Actor网络通过梯度上升法根据Critic网络的反馈(即状态-动作对的Q值)来优化其策略,使得输出的动作能够最大化预期回报。
- 效率和适用性:这种直接输出具体动作值的方法大大提高了算法在连续动作空间中的效率和适用性,使得DDPG适用于各种需要精细动作控制的连续控制任务。
为了便于理解,我们可以将其看成是一个包含状态、动作、Q值的三维坐标,每一点都对应于一个特定的状态-动作对的Q值,假设所有的点用一张布包裹,这样就相当于得到了一个连续的、包含所有状态和动作的网络。"布"代表了Critic网络的Q函数模型,它连续地覆盖了所有可能的状态-动作组合。
当我们输入状态state的时候,就相当于在这块布上做沿着state所在的位置剪开,这个时候大家会看到这个边缘是一条曲线。而这条曲线就是代表当前状态下,所有动作以及对应的Q值。
DDPG 算法原理
1.Value Network
- Value Network的作用是评估Q, 但和AC中的Critic不一样,这里评价的是Q不是V;
- 注意Value Network的输入有两个:动作和状态,需要一起输入到Value Network中;
- Value Network网络的loss其次是和AC一样,用的是TD-error
- 根据确定性策略网络(actor)得到动作a: a = \pi(s; \theta) 。
- 根据价值网络(critic)计算在当前状态s下动作a的价值: q(s, a; w) 。
- 根据得到的价值Q评估动作 a 的好坏。
- 一次传播的经验: (s_t, a_t, r_t, s_{t+1}) 。
- 值网络对时间 t 做出预测: q_t = q(s_t, a_t; w) 。
- 值网络对时间 t + 1 做出预测: q_{t+1} = q(s_{t+1}, a'_{t+1}; w) ,其中 a'_{t+1} = \pi(s_{t+1}; \theta) 。
- TD error: \delta_t = q_t - (r_t + \gamma \cdot q_{t+1}) 。
- 更新: w \leftarrow w - \alpha \cdot \delta_t \cdot \frac{\partial q(s_t,a_t;w)}{\partial w} 。
注意: r_t + \gamma \cdot q_{t+1} 是TD target,他一部分是真实观测到的奖励,另一部分是价值网络的预测,我们认为TD target比单纯的 q_{t+1} 更接近真相,所以鼓励 q_{t+1} 接近TD target,也就是让TD error尽量小。换句话说,就是用一个更接近真实值的预测取代当前的预测。
2. Policy Network
- Policy Network的功能是,根据状态S,输出一个动作A,这个动作A输入到Value Network中,能够获得最大的Q值。
- 当状态S和 \theta 一定的时候,网络输出的动作也是固定的,而当 \theta 变化的时候,输出的动作也会随之变化。也就是说,当我们状态 S 一定的时候,我们根据Value Network预测的 Q 值,根据梯度上升的方法,指导Policy Network的 \theta 更新,使其朝能使输出动作的Q值最大的方向变化。
- 所以Policy Network的更新方式也和AC不同,不是用梯度反向传,而是用梯度上升。
- critic q(s, a; w) 评估动作 a 的好坏。
- 改善 \theta 以使critic相信 a = \pi(s; \theta) 更好。
- 更新 \theta 以使 q(s, a; w) = q(s, \pi(s; \theta); w) 增加。
- 目标:增加 q(s, a; w) ,其中 a = \pi(s; \theta) 。
- DPG: g = \frac{\partial q(s, \pi(s; \theta))}{\partial \theta} = \frac{\partial a}{\partial \theta} \cdot \frac{\partial q(s, a; w)}{\partial a} 。
- 梯度上升: \theta \leftarrow \theta + \beta \cdot g 。
改进:Target networks
1.经验回放
在DQN训练中,利用神经网络逼近 Q*(s, a) 进行值函数近似是重要组成部分。这就要求我们要有能力从经验中学习并更新网络参数,因而我们需要一种机制来存储和利用这些经验,而这就是经验回放的基础。
每次行动之后,我们不会立即从这次行动中学习,而是将其存储到经验回放(Replay Buffer),可以有效避免在在线更新中因时间序列相关性而导致的过拟合问题。具体来说,经验回放会维护一个固定大小的缓冲区 (buffer),其中储存了由策略产生的一个或多个完整的经历 (s, a, r, s', d) 保存到这个buffer中。需要注意的是,buffer的容量是有限的,旧的经验会被新的替代。
采用经验回放的优点:
- 数据复用:同一个经验可以被多次用于训练,提高数据效率。
- 去相关:随机抽样打破了数据之间的时间相关性,有助于训练稳定,避免模型训练过拟合。
- 更平滑的学习过程:缓冲区内的多样性经验可以帮助模型更全面地学习。
*[注释]:过拟合:模型在训练数据上能够达到非常高的精确度,但是当应用于新的或者未见过的数据时,性能显著下降。
2.target 网络和参数软更新
同样在DQN算法中,DDPG的Critic网络更新的目标值是通过让不断与目标值接近。由于在计算目标值时,Critic网络 Q(s, a|θ^Q) 也在同时更新,非常容易导致神经网络训练过程的不稳定性。
如果把训练神经网络比喻成射击游戏,在训练网络的时候,就相当于在射击一个移动靶,因为每次射击一次,靶就会挪动一次。相比起固定的靶,无疑加上了训练的难度。那怎么解决这个问题呢?DQN的双网络思路是,既然现在是移动靶,那么我们就把它弄成是固定的靶,先停止10秒。10后挪动靶再打新的靶。而DDPG中没有使用这种硬更新,而是选择了软更新,即每次参数只更新一点点,即当前移动靶一次移动10m,而我们目标靶一次移动1m,可以保证参数稳定更新,从而提高参数的稳定性。
其中 \tau 是更新系数,一般取较小的值,比如 0.1 或者 0.001 这样的值。
3.环境探索
在DDPG算法中,由于策略是确定性的,这意味着对于给定的状态,策略总是产生相同的行动。这种确定性行为虽然在收敛到最优策略后非常有效,但在学习初期可能限制了智能体的探索范围。智能体可能会忽视那些初看不理想,但实际上可能更好的行动选择。这就使得它本身的探索十分有限。为了让DDPG策略能够更好地探索,可以在行动输出上加上一些随机噪音从而进行探索。具体,行动噪音表示为:
然而,随机噪音有时候的过大或过小,可能导致行动超出环境允许的边界。因此,在DDPG的算法中,采用了Clip操作,事先规定了通过策略网络输出的行动加上一个上界 a_{High} 和一个下界 a_{Low} 来确保行动在最终采用的动作 ( a ) :
具体来说,我们用输出的a作为一个正态分布的平均值,加上标准差参数VAR,构造一个正态分布。然后从正态分布中随机出一个新的动作代替a。我们知道a作为正态分布的均值,也是一个有最大概率获得的一个值。这就有点像epsilon-greedy,有一定概率在探索,也有一定概率在开发新的动作。我们可以用VAR来控制探索的程度: VAR越小,正态分布形状越“瘦”,选择a的概率会更大,开发的性质更多; VAR越大,正态分布形状越“胖”,选择a的概率会更小,探索的性质更多。
值得注意的是,在训练好智能体之后,我们就不需要再做探索了。
*[注释]:
因为var的大小代表agent探索化境的程度,var越大agent对环境的探索能力更大,var越小,agent对环境的探索越少。一般我们在程序初期,需要agent多与环境进行交互和探索,以便能尽可能的存储到各类可能情况,所以我们需要var值比较大,后期慢慢减小,让agent少些探索,尽快收敛。
完整DDPG网络结构
综上,所以DDPG具有双网络结构,即当前网络和目标网络的概念。而由于现在本就有actor和critic两个网络,那么双网络就变成了4个网络,分别是:actor当前网络、actor目标网络、critic当前网络、critic目标网络。DDPG 4个网络的功能:
1.Actor当前网络:负责策略网络参数的迭代更新,负责根据当前状态 s 选择当前动作 a ,用于和环境交互生成 s',r, 。
2.Actor目标网络:负责根据经验回放池中采样的下一状态 s' 选择最优下一动作 a' 。
3.Critic当前网络:负责价值网络参数w的迭代更新,负责计算负责计算当前Q值 Q(s,a,,w) 。目标Q值 y'=r+γQ'(s',a',w') 。
4.Critic目标网络:负责计算目标Q值中的 Q'(s',a',w') 部分。
算法实现
算法伪代码:
算法大概流程如下:
*[注释]:
问题:经验回放里面的每条四元组都是根据相同的模型参数得到,但是得到不同四元组的网络参数是不同的,那如果根据一条四元组使用TD算法更新了模型参数之后,再选择另一条四元组来更新参数是不是有问题呢?因为新选择的四元组信息还是根据之前的参数得到的?答:在理想情况下,我们希望每次更新模型时都使用基于当前模型参数生成的数据。然而,这在实际中是不可行的,因为这会要求在每次参数更新后都重新与环境进行大量的交互来生成新数据。经验回放通过使用一个包含多样化历史数据的大型记忆库来缓解这个问题。虽然这些数据是基于旧的模型参数生成的,但由于记忆库的多样性和随机抽样机制,智能体仍然能从中学习到有价值的信息。此外,随着学习的进行,记忆库中也会逐渐融入基于新参数的数据。而且在实践中,使用经验回放的方法被证明是有效的。
代码实现
代码详见:强化学习小例子:DDPG算法 pytorch代码-CSDN博客
*[注释]:以下代码没什么用,小白敲着练手加深理解的!!!!
策略网络
class PolicyNet(nn.Model):
def __int__(self, n_states, n_hiddens, n_actions, action_bound):
super(PolicyNet,self).__init__()
self.action_bound=action.bound
self.fc1=nn.Linear(n_states,n_hiddens)
self.fc2=nn.Linear(n_hiddens,n_actions)
def forward(self,x):
x=self.fc1(x)
x=F.relu(x)
x=self.fc2(x)
x=torch.tanh(x)
x=x*self.action_bound
return x
价值网络
class QValueNet(nn.module):
def __init__(self,n_states,n_hiddens,n_actions)
super(QValueNet,self).__init__()
self.fc1=nn.Liner(n_states+n_actions,n_hiddens)
seld.fc2=nn.Liner(n_hiddens,n_hiddens)
seld.fc3=nn.Liner(n_hiddens,1)
def forward(self,x,a):
cat = torch.cat([x,a],dim=1)
x=self.fc1(x)
x=F.relu(x)
x=self.fc2(x)
x=F.relu(x)
x=self.fc3(x)
return x
*[注释]: torch.cat函数:按指定维度拼接:Pytorch中torch.cat()函数解析-CSDN博客
算法主体
class DDPG
def __init__(self,n_states,n_hiddens,n_actions,action_bound,
sigma,actor_lr,critic_lr,tau,gamma,device)
#DDPG拥有四个网络、
#策略网络—训练
self.actor=PoliceNet(n_states,n_hiddens,n_actions,action_bound).to(device)
#价值网络训练
self.critic=QvalueNet(n_states,n_hiddens,n_actions).to(device)
#策略网络目标
self.target_action=PoliceNet(n_states,n_hiddens,n_actions,action_bound).to(device)
#价值网络-目标
self.target_critic=QValueNet(n_states,n_hiddens,n_actions).to(device)
#初始化价值网络参数,两个网络参数相同
self.target_critic.load_state_dict(self.critic.state_dict())
#初始化策略网络参数,两个网络参数相同
self.targect_actor.load_state_dict(self.actor.state_dict)
#策略网络优化器
self.actor_optimizer=torch.optim.Adam(self.actor.parameters(),lr=actor_lr)
#价值网络的优化器
self.critic_optimizer=torcch.optim.Adam(self.critic.parameters(),r=critic_lr)
#属性分配
self.gamma=gamma
self.sigma=sigma
self.tau=tau
self.n_actions=n_actions
self.device=device
#动作选择
def take_actions(self,state)
#维度变化,list[n_states]-->tensor[1,n_states]-->gpu
state=torch.tensor(state,dtype=torch.float).view(-1,1).to(self.device)
#策略网络计算出当前动作下的价值[1,n_states]-->[1,1]-->int
action=self.actor(state).item()
#给动作添加噪声,增加搜索
action=action+self.sigma*np.random.randn(self.n_actions)
if self.sigma>0.01:
self.sigma -= 0.00001
return action
#软更新,每次learn的时候更新部分参数
def soft_update(self,net,target_net)
#获取训练网络和目标网络需要的参数
for param_target,param in zip(target_net.parameters(),net.parameters())
param_target.data.copy_(param_target.data*(1-self.tau)+para.data*self.tau)
#训练
def update(self,transition_dict)
#从训练集中取出数据
states=torch.tensor(transitions_dict['states'],dtype=torch.float).to(self.device)
actions=torch.tensor(transitions_dict['actions'],dtype=torch.float)
.view(-1, 1).to(self.device)
rewards=torch.tensor(transitions_dict['rewards'],dtype=torch.float)
.view(-1, 1).to(self.device)
next_stations=torch.tensor(transitions_dict['next_states'],dtype=torch.float)
.to(self.device)
dones=torch.tensor(transitions_dict['dones'],dtype=torch.float)
.view(-1, 1).to(self.device)
#价值网络获取下一时刻的每个动作价值
next_q_values = self.target_actor(next_states)
#策略网络获取下一个时刻状态选出的动作价值
next_q_values = self.target_critic(next_states,next_q_values)
#当前时刻动作价值对应的目标值
q_targets=rewards+self.gamma*next_q_values*(1-dones)
#当前时刻动作价值的预测值
q_values=self.critic(states,actions)
#预测值和目标值之间的均方差损失
critic_loss=torch.mean(F.mse_loss(q_values,q_targets))
#价值网络梯度
self.critic_optimizer.zero_gard()
critic_loss.backword()
self.critic_optimizer.step()
#当前状态下策略网络选出的动作
actor_q_values = self.actor(states)
#当前动作选出的动作的价值
score=self.critic(states,actor_q_values)
#计算损失
actor_loss= -torch.mean(score)
#策略网络梯度
self.actor_optimizer.zero_gard()
actor_loss.backward()
self.actor_optimizer.step()
#软更新策略网络参数
self.soft_update(self.actor,self.target_actor)
#软更新价值网络参数
self.soft_update(self.critic,self.target_critic)
*[注释]:
load_state_dict函数:pytorch模型保存与加载:state_dict、load_state_dict_model.load_state_dict-CSDN博客torch.optim.Adam函数:pytorch ——torch.optim.Adam-CSDN博客
view(-1,1)函数:Python函数.view(1,-1)和 .view(-1,1)有什么区别_.view(-1, 1)-CSDN博客
.item()函数;Pytorch/Python中item()的用法_item函数-CSDN博客
run_this函数
import numpy as np
import torch
import matplotlib.pyplot as plt
import gym
from parsers import args
from RL_bain import ReplayBuffer, DDPG
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
#----------------------------
#环境加载
#----------------------------
env_name="MountionCarContinus-v0" #连续型动作
env =gym.make(env_name,render_mode="human")
state = env.reset()[0]
n_states=env.observation_space.shape[0]
n_actions=env.action_space.shape[0]
action-bound=env.action_space.high[0]
#----------------------------
#模型构建
#----------------------------
#经验回放池实例化
replay_buffer=ReplayBuffer(capacity=args.buffer_size)
agent = DDPG(n_states=n_states,n_hiddens=args.n_hiddens,n_actions=n_actions
action_bound=action_bound,sigma=args.sigma,actor_lr=args.actor_lr
critic.lr=args.critic_lr,tau=args.tau,gamma=aargs.gamma
device=device)
#--------------------------
#模型训练
#--------------------------
for i in range(100)
episode_return=0 #累计每条链路上的reward
state=env.reset()[0]
done=False
num_done=0
while not done
action = agent.take.action(state)
next_state,reward,done,_,_=env.step(action)
#自己更改reward
position,velocity = next_state
reward += 2*abs(position+0.5) +abs(velocity)*2
if velocity > 0:
reward = reward + 0.1
else:
reward += -0.1
#更新经验回放池
replay_buffer.add(state,action,reward,next_state,done)
#状态更新
state = next_state
#累计每一步的奖励
episode_return += reward
num_done = num_done +1
print(f'done:{done},num:{num_done},reward:{reward},i:{i+1}')
#如果经验经验池超过容量,开始训练
#经验池随机采样batch_size
if replay_buffer.size()>args.min_size
a,s,r,ns,d=replay_buffer.sample(args.batch_size)
#构造数据集
transition_dict = {'states':s,'actions':a,'rewards':r,
'next_states':ns'dones'=d,}
#模型训练
agent.update(transition_dict)
# 保存每一个回合的回报
return_lsit.append(episode_return)
mean_return_list.append(nn.mean(return_list[-10:]))
#打印回合信息
print(f'iter:{i},return:{episode_return},mean_return:{np.mean(return_list{-10:})}')
env.close()
#---------------------
#绘图
#---------------------
plt.subplot(121)
plt.plot(x_range,return_list)
plt.xlable('episode')
plt.ylable('return')
plt.subplot(122)
plt.plot(x_range,mean_return_list)
plt.xlable('episode')
plt.ylable('mean_return')
*[注释]:
env.make:Gym使用实例-小车上山_import gym-CSDN博客
参考文章
参考文章1:白话强化学习 - 知乎 (zhihu.com)
参考文章2;[强化学习-07]–DPG、DDPG - 知乎 (zhihu.com)
参考文章3: 【深度强化学习】(5) DDPG 模型解析,附Pytorch完整代码_ddpg算法流程图-CSDN博客
参考文章4:强化学习小例子:DDPG算法 pytorch代码-CSDN博客
参考文章5:深度强化学习(五):DDPG(Deep Deterministic Policy Gradient,深度确定性策略梯度) - 知乎 (zhihu.com)