深度强化学习-Dueling DQN算法原理与代码

Dueling Deep Q Network(Dueling DQN)是对DQN算法的改进,有效提升了算法的性能。如果对DQN算法还不太了解的话,可以参考我的这篇博文:深度强化学习-DQN算法原理与代码,里面详细讲述了DQN算法的原理和代码实现。本文就带领大家了解一下Dueling DQN算法,论文链接见下方。

论文:http://proceedings.mlr.press/v48/wangf16.pdf

代码:https://github.com/indigoLovee/Dueling_DQN

喜欢的话可以点个star呢。

1 Dueling DQN算法简介

Dueling DQN算法提出了一种新的神经网络结构——对偶网络(duel network)。网络的输入与DQN和DDQN算法的输入一样,均为状态信息,但是输出却有所不同。Dueling DQN算法的输出包括两个分支,分别是该状态的状态价值V(标量)和每个动作的优势值A(与动作空间同维度的向量)。DQN和DDQN算法的输出只有一个分支,为该状态下每个动作的动作价值(与动作空间同维度的向量)。具体的网络结构如下图所示:

单分支网络结构(DQN和DDQN)

 对偶网络结构(Dueling DQN)

在DQN算法的网络结构中,输入为一张或多张照片,利用卷积网络提取图像特征,之后经过全连接层输出每个动作的动作价值;在Dueling DQN算法的网络结构中,输入同样为一张或多张照片,然后利用卷积网络提取图像特征获取特征向量,输出时会经过两个全连接层分支,分别对应状态价值和优势值,最后将状态价值和优势值相加即可得到每个动作的动作价值(即绿色连线操作)。

为什么要采用对偶网络结构?

其实动机很简单:很多游戏的Q值,只受当前状态影响,无论采取什么动作区别不大。如下图所示:

 这是Atari game中的一个赛车游戏,Value表示状态价值,Advantage表示动作优势值,图中黄色部分表示注意力。当前方没有车辆时,智能体左右移动并没有影响,说明动作对Q值没有影响,但是状态对Q值很有影响。从上面两幅图可以看出,Value更加关注远方道路,因为开的越远,对应的状态值越大;Advantage没有特别注意的地方,说明动作没有影响。当前方存在车辆阻挡时,智能体的动作选择至关重要,说明动作对Q值存在影响,同样状态对Q值也会存在影响。从下面两幅图可以看出,Value同样更加关注远方道路,但是Advantge此时会关注前方车辆,因为如果不采取相应动作,智能体很有可能会发生碰撞,分数就会很低。对偶网络思想符合很多场景的设定。

2 Dueling DQN算法原理

2.1 优势函数(Advantage function)

在讲优势函数之前,我们先来回顾一些基本概念吧。

折扣回报(Discounted return):

U_{t}=R_{t}+\gamma R_{t+1}+\gamma ^{2}R_{t+1}+\gamma ^{3}R_{t+3}+\cdots

U_{t} 表示从t时刻开始,未来所有奖励的加权求和。在t时刻,U_{t}是未知的,它依赖于未来所有状态和动作。

动作价值函数(Action-value function):

Q_{\pi }(s_{t},a_{t})=E\left [ U_{t}\mid S_{t}=s_{t},A_{t}=a_{t} \right ]

 Q_{\pi }(s_{t},a_{t})是回报U_{t}的条件期望,将t+1时刻以后的状态和动作全部消掉。Q_{\pi }(s_{t},a_{t})依赖于状态s_{t}、动作a_{t}以及策略\pi

状态价值函数(State-value function):

V_{\pi }(s_{t})=E\left [ Q_{\pi }(s_{t},A) \right ]

V_{\pi }(s_{t})Q_{\pi }(s_{t},a_{t})的期望,将Q_{\pi }(s_{t},a_{t})中的动作a_{t}消掉。V_{\pi }(s_{t})依赖于状态s_{t}  和策略\pi

最优动作价值函数(Optimal action-value function):

Q^{\ast }(s,a)=max_{\pi }Q_{\pi }(s,a)

 动作价值函数Q_{\pi }(s_{t},a_{t})依赖于策略\pi,对Q_{\pi }(s_{t},a_{t})关于\pi求最大值,消除掉\pi,得到Q^{\ast }(s,a)Q^{\ast }(s,a)只依赖于状态s和动作a,不依赖于策略\piQ^{\ast }(s,a)用于评价在状态s下做动作a的好坏。

 最优状态价值函数(Optimal state_value function):

V^{\ast }(s)=max_{\pi }V_{\pi }(s)

V_{\pi }(s)关于\pi求 最大值,消除掉\pi,得到V^{\ast }(s)V^{\ast }(s)只依赖于状态s, 不依赖于策略\piV^{\ast }(s)用于评价状态s的好坏。

 最优优势函数(Optimal advantage function):

A^{\ast }(s,a)=Q^{\ast }(s,a)-V^{\ast }(s)

 将V^{\ast }(s)作为baseline,则优势函数A^{\ast }(s,a)表示动作a相对于baseline的优势。动作a越好,其优势越大。

 2.2 优势函数性质

 为了推导优势函数的性质,这里给出定理1(大家可以从确定性策略角度来理解定理1,这里不给出该定理的证明):

定理1:V^{\ast }(s)=max_{a}Q^{\ast }(s,a)

针对优势函数

A^{\ast }(s,a)=Q^{\ast }(s,a)-V^{\ast }(s)

 两边同时对动作a取最大值,得到

max_{a}A^{\ast }(s,a)=max_{a}Q^{\ast }(s,a)-V^{\ast }(s)

 根据定理1进一步得到

max_{a}A^{\ast }(s,a)=0

 接着对优势函数做变换,得到

Q^{\ast }(s,a)=V^{\ast }(s)+A^{\ast }(s,a)

不妨在上式右边减去 max_{a}A^{\ast }(s,a),由于其等于0,等号不变,得到

Q^{\ast }(s,a)=V^{\ast }(s)+A^{\ast }(s,a)-max_{a}A^{\ast }(s,a)

看到这里小伙伴可能会疑惑,为什么要 减去 max_{a}A^{\ast }(s,a),这不是多此一举吗?不急,这个后面2.4节会解释。至此,我们得到了定理2,这是一个非常重要的等式,Dueling Network就是根据定理2得到的。

定理2:Q^{\ast }(s,a)=V^{\ast }(s)+A^{\ast }(s,a)-max_{a}A^{\ast }(s,a)

 2.3 对偶网络

在DQN算法中,我们利用神经网络Q(s,a;w)来近似最优动作值函数Q^{\ast }(s,a),从而得到最优策略,其中w为网络参数。但是在Dueling Network中,我们利用神经网路A(s,a;w^{A})来近似最优优势函数A^{\ast }(s,a),利用神经网络V(s;w^{V})来近似最优状态值函数V^{\ast }(s),其中w^{A}w^{V}均为网络参数。

根据定理2,将其中的V^{\ast }(s)用神经网络A(s,a;w^{A})来替代,A^{\ast }(s,a)用神经网络A(s,a;w^{A})来代替,得到

Q(s,a;w^{A},w^{V})=V(s;w^{V})+A(s,a;w^{A})-max_{a}A(s,a;w^{A})

其中Q(s,a;w^{A},w^{V})近似Q^{\ast }(s,a),它被成为Dueling Network。令w=(w^{A},w^{V}),则

Q(s,a;w)=V(s;w^{V})+a(s,a;w^{A})-max_{a}A(s,a;w^{A})

现在,我们发现Dueling Network与DQN完全一致,包括输入和输出,唯一不同在于神经网络的结构。Dueling Network的网络结构更好,因此它的表现比DQN更好。另外,DQN中的一些技巧也同样可以用于Dueling Network中,例如优先经验回放、Double DQN和Multi-step TD target等。

2.4 唯一性

 观察下面两个等式:

Q^{\ast }(s,a)=V^{\ast }(s)+A^{\ast }(s,a)

Q^{\ast }(s,a)=V^{\ast }(s)+A^{\ast }(s,a)-max_{a}A^{\ast }(s,a)

下面的式子相较于上面的式子只是减去了max_{a}A^{\ast }(s,a),前面已经讨论过 max_{a}A^{\ast }(s,a)=0,既然如此,这一项有必要的吗,直接用上面的式子不就行了吗?

答案是有必要的。因为上面的式子存在一个缺点:无法通过学习Q^{\ast }(s,a)来唯一确定V^{\ast }(s)A^{\ast }(s,a)举个栗子:令{V}'=V^{\ast }+10{A}'=A^{\ast }-10,那么

Q^{\ast }(s,a)=V^{\ast }(s)+A^{\ast }(s,a)={V}'(s)+{A}'(s,a)

因此结果不唯一。这种不唯一性会带来什么问题呢?

如果神经网络V和A上下波动,幅度相同,方向相反,那么网络的输出相同,但是由于两个网络都在上下波动,两个网络都不稳定,因此都训练不好。

针对上面式子的不唯一性,下面的式子可以解决。通过加入最大化强可以有效避免不唯一性问题,仍然用上面的栗子来分析,令{V}'=V^{\ast }+10{A}'=A^{\ast }-10,那么

Q^{\ast }(s,a)=(V^{\ast }(s)+10)+(A^{\ast }(s,a)-10)-max_{a}(A^{\ast }(s,a)-10) =V^{\ast }(s)+A^{\ast }(s,a)+10

 此时Q^{\ast }(s,a)发生了变化,解决了不唯一性问题,这就是要加上最大化项的原因了。

2.5 两种数学形式

 通过前面的推导,我们得到了Dueling Network的数学形式为

Q(s,a;w)=V(s;w^{V})+a(s,a;w^{A})-max_{a}A(s,a;w^{A})

 实际中将最大化形式变成均值形式效果更好,更稳定,其数学形式如下

Q(s,a;w)=V(s;w^{V})+A(s,a;w^{A})-mean_{a}A(s,a;w^{A})

 这种替换没有任何理论依据,就是实验效果好,简单!粗暴!有效!

3 Dueling DQN算法代码

经验回放采用集中式均匀回放,代码如下(脚本buffer.py):

  1. import numpy as np
  2. class ReplayBuffer:
  3. def __init__(self, state_dim, action_dim, max_size, batch_size):
  4. self.mem_size = max_size
  5. self.mem_cnt = 0
  6. self.batch_size = batch_size
  7. self.state_memory = np.zeros((self.mem_size, state_dim))
  8. self.action_memory = np.zeros((self.mem_size, ))
  9. self.reward_memory = np.zeros((self.mem_size, ))
  10. self.next_state_memory = np.zeros((self.mem_size, state_dim))
  11. self.terminal_memory = np.zeros((self.mem_size, ), dtype=np.bool)
  12. def store_transition(self, state, action, reward, state_, done):
  13. mem_idx = self.mem_cnt % self.mem_size
  14. self.state_memory[mem_idx] = state
  15. self.action_memory[mem_idx] = action
  16. self.reward_memory[mem_idx] = reward
  17. self.next_state_memory[mem_idx] = state_
  18. self.terminal_memory[mem_idx] = done
  19. self.mem_cnt += 1
  20. def sample_buffer(self):
  21. mem_len = min(self.mem_size, self.mem_cnt)
  22. batch = np.random.choice(mem_len, self.batch_size, replace=False)
  23. states = self.state_memory[batch]
  24. actions = self.action_memory[batch]
  25. rewards = self.reward_memory[batch]
  26. states_ = self.next_state_memory[batch]
  27. terminals = self.terminal_memory[batch]
  28. return states, actions, rewards, states_, terminals
  29. def ready(self):
  30. return self.mem_cnt > self.batch_size

目标网络的更新方式为软更新,Dueling DQN的实现代码如下(脚本Dueling_DQN.py):

  1. import torch as T
  2. import torch.nn as nn
  3. import torch.optim as optim
  4. import torch.nn.functional as F
  5. import numpy as np
  6. from buffer import ReplayBuffer
  7. device = T.device("cuda:0" if T.cuda.is_available() else "cpu")
  8. class DuelingDeepQNetwork(nn.Module):
  9. def __init__(self, alpha, state_dim, action_dim, fc1_dim, fc2_dim):
  10. super(DuelingDeepQNetwork, self).__init__()
  11. self.fc1 = nn.Linear(state_dim, fc1_dim)
  12. self.fc2 = nn.Linear(fc1_dim, fc2_dim)
  13. self.V = nn.Linear(fc2_dim, 1)
  14. self.A = nn.Linear(fc2_dim, action_dim)
  15. self.optimizer = optim.Adam(self.parameters(), lr=alpha)
  16. self.to(device)
  17. def forward(self, state):
  18. x = T.relu(self.fc1(state))
  19. x = T.relu(self.fc2(x))
  20. V = self.V(x)
  21. A = self.A(x)
  22. return V, A
  23. def save_checkpoint(self, checkpoint_file):
  24. T.save(self.state_dict(), checkpoint_file, _use_new_zipfile_serialization=False)
  25. def load_checkpoint(self, checkpoint_file):
  26. self.load_state_dict(T.load(checkpoint_file))
  27. class DuelingDQN:
  28. def __init__(self, alpha, state_dim, action_dim, fc1_dim, fc2_dim, ckpt_dir,
  29. gamma=0.99, tau=0.005, epsilon=1.0, eps_end=0.01, eps_dec=5e-4,
  30. max_size=1000000, batch_size=256):
  31. self.gamma = gamma
  32. self.tau = tau
  33. self.batch_size = batch_size
  34. self.epsilon = epsilon
  35. self.eps_min = eps_end
  36. self.eps_dec = eps_dec
  37. self.checkpoint_dir = ckpt_dir
  38. self.action_space = [i for i in range(action_dim)]
  39. self.q_eval = DuelingDeepQNetwork(alpha=alpha, state_dim=state_dim, action_dim=action_dim,
  40. fc1_dim=fc1_dim, fc2_dim=fc2_dim)
  41. self.q_target = DuelingDeepQNetwork(alpha=alpha, state_dim=state_dim, action_dim=action_dim,
  42. fc1_dim=fc1_dim, fc2_dim=fc2_dim)
  43. self.memory = ReplayBuffer(state_dim=state_dim, action_dim=action_dim,
  44. max_size=max_size, batch_size=batch_size)
  45. self.update_network_parameters(tau=1.0)
  46. def update_network_parameters(self, tau=None):
  47. if tau is None:
  48. tau = self.tau
  49. for q_target_params, q_eval_params in zip(self.q_target.parameters(), self.q_eval.parameters()):
  50. q_target_params.data.copy_(tau * q_eval_params + (1 - tau) * q_target_params)
  51. def remember(self, state, action, reward, stata_, done):
  52. self.memory.store_transition(state, action, reward, stata_, done)
  53. def decrement_epsilon(self):
  54. self.epsilon = self.epsilon - self.eps_dec \
  55. if self.epsilon > self.eps_min else self.eps_min
  56. def choose_action(self, observation, isTrain=True):
  57. state = T.tensor([observation], dtype=T.float).to(device)
  58. _, A = self.q_eval.forward(state)
  59. action = T.argmax(A).item()
  60. if (np.random.random() < self.epsilon) and isTrain:
  61. action = np.random.choice(self.action_space)
  62. return action
  63. def learn(self):
  64. if not self.memory.ready():
  65. return
  66. states, actions, rewards, next_states, terminals = self.memory.sample_buffer()
  67. batch_idx = T.arange(self.batch_size, dtype=T.long).to(device)
  68. states_tensor = T.tensor(states, dtype=T.float).to(device)
  69. actions_tensor = T.tensor(actions, dtype=T.long).to(device)
  70. rewards_tensor = T.tensor(rewards, dtype=T.float).to(device)
  71. next_states_tensor = T.tensor(next_states, dtype=T.float).to(device)
  72. terminals_tensor = T.tensor(terminals).to(device)
  73. with T.no_grad():
  74. V_, A_ = self.q_target.forward(next_states_tensor)
  75. q_ = V_ + A_ - T.mean(A_, dim=-1, keepdim=True)
  76. q_[terminals_tensor] = 0.0
  77. target = rewards_tensor + self.gamma * T.max(q_, dim=-1)[0]
  78. V, A = self.q_eval.forward(states_tensor)
  79. q = (V + A - T.mean(A, dim=-1, keepdim=True))[batch_idx, actions_tensor]
  80. loss = F.mse_loss(q, target.detach())
  81. self.q_eval.optimizer.zero_grad()
  82. loss.backward()
  83. self.q_eval.optimizer.step()
  84. self.update_network_parameters()
  85. self.decrement_epsilon()
  86. def save_models(self, episode):
  87. self.q_eval.save_checkpoint(self.checkpoint_dir + 'Q_eval/DuelingDQN_q_eval_{}.pth'.format(episode))
  88. print('Saving Q_eval network successfully!')
  89. self.q_target.save_checkpoint(self.checkpoint_dir + 'Q_target/DuelingDQN_Q_target_{}.pth'.format(episode))
  90. print('Saving Q_target network successfully!')
  91. def load_models(self, episode):
  92. self.q_eval.load_checkpoint(self.checkpoint_dir + 'Q_eval/DuelingDQN_q_eval_{}.pth'.format(episode))
  93. print('Loading Q_eval network successfully!')
  94. self.q_target.load_checkpoint(self.checkpoint_dir + 'Q_target/DuelingDQN_Q_target_{}.pth'.format(episode))
  95. print('Loading Q_target network successfully!')

算法仿真环境为gym库中的LunarLander-v2,因此需要先配置好gym库。进入Anaconda3中对应的Python环境中,执行下面的指令

pip install gym

但是,这样安装的gym库只包括少量的内置环境,如算法环境、简单文字游戏和经典控制环境,无法使用LunarLander-v2。因此还需要安装一些其他依赖项,具体可以参考我的这篇博文:AttributeError: module ‘gym.envs.box2d‘ has no attribute ‘LunarLander‘ 解决办法

让智能体在环境中训练500轮,训练代码如下(脚本train.py):

  1. import gym
  2. import numpy as np
  3. import argparse
  4. from utils import plot_learning_curve, create_directory
  5. from Dueling_DQN import DuelingDQN
  6. parser = argparse.ArgumentParser()
  7. parser.add_argument('--max_episodes', type=int, default=500)
  8. parser.add_argument('--ckpt_dir', type=str, default='./checkpoints/DuelingDQN/')
  9. parser.add_argument('--reward_path', type=str, default='./output_images/reward.png')
  10. parser.add_argument('--epsilon_path', type=str, default='./output_images/epsilon.png')
  11. args = parser.parse_args()
  12. def main():
  13. env = gym.make('LunarLander-v2')
  14. agent = DuelingDQN(alpha=0.0003, state_dim=env.observation_space.shape[0], action_dim=env.action_space.n,
  15. fc1_dim=256, fc2_dim=256, ckpt_dir=args.ckpt_dir, gamma=0.99, tau=0.005, epsilon=1.0,
  16. eps_end=0.05, eps_dec=5e-4, max_size=1000000, batch_size=256)
  17. create_directory(args.ckpt_dir, sub_dirs=['Q_eval', 'Q_target'])
  18. total_rewards, avg_rewards, eps_history = [], [], []
  19. for episode in range(args.max_episodes):
  20. total_reward = 0
  21. done = False
  22. observation = env.reset()
  23. while not done:
  24. action = agent.choose_action(observation, isTrain=True)
  25. observation_, reward, done, info = env.step(action)
  26. agent.remember(observation, action, reward, observation_, done)
  27. agent.learn()
  28. total_reward += reward
  29. observation = observation_
  30. total_rewards.append(total_reward)
  31. avg_reward = np.mean(total_rewards[-100:])
  32. avg_rewards.append(avg_reward)
  33. eps_history.append(agent.epsilon)
  34. print('EP:{} reward:{} avg_reward:{} epsilon:{}'.
  35. format(episode + 1, total_reward, avg_reward, agent.epsilon))
  36. if (episode + 1) % 50 == 0:
  37. agent.save_models(episode + 1)
  38. episodes = [i for i in range(args.max_episodes)]
  39. plot_learning_curve(episodes, avg_rewards, 'Reward', 'reward', args.reward_path)
  40. plot_learning_curve(episodes, eps_history, 'Epsilon', 'epsilon', args.epsilon_path)
  41. if __name__ == '__main__':
  42. main()

训练时还会用到画图函数和创建文件夹函数,它们均放置在utils.py脚本中,具体代码如下:

  1. import os
  2. import matplotlib.pyplot as plt
  3. def plot_learning_curve(episodes, records, title, ylabel, figure_file):
  4. plt.figure()
  5. plt.plot(episodes, records, linestyle='-', color='r')
  6. plt.title(title)
  7. plt.xlabel('episode')
  8. plt.ylabel(ylabel)
  9. plt.show()
  10. plt.savefig(figure_file)
  11. def create_directory(path: str, sub_dirs: list):
  12. for sub_dir in sub_dirs:
  13. if os.path.exists(path + sub_dir):
  14. print(path + sub_dir + ' is already exist!')
  15. else:
  16. os.makedirs(path + sub_dir, exist_ok=True)
  17. print(path + sub_dir + ' create successfully!')

仿真结果如下图所示:

 平均累积奖励曲线

epsilon变化曲线

 通过平均累积奖励可以看出,Dueling DQN算法大约在250步时趋于收敛。