Dueling Deep Q Network(Dueling DQN)是对DQN算法的改进,有效提升了算法的性能。如果对DQN算法还不太了解的话,可以参考我的这篇博文:深度强化学习-DQN算法原理与代码,里面详细讲述了DQN算法的原理和代码实现。本文就带领大家了解一下Dueling DQN算法,论文链接见下方。
论文:http://proceedings.mlr.press/v48/wangf16.pdf
代码:https://github.com/indigoLovee/Dueling_DQN
喜欢的话可以点个star呢。
1 Dueling DQN算法简介
Dueling DQN算法提出了一种新的神经网络结构——对偶网络(duel network)。网络的输入与DQN和DDQN算法的输入一样,均为状态信息,但是输出却有所不同。Dueling DQN算法的输出包括两个分支,分别是该状态的状态价值V(标量)和每个动作的优势值A(与动作空间同维度的向量)。DQN和DDQN算法的输出只有一个分支,为该状态下每个动作的动作价值(与动作空间同维度的向量)。具体的网络结构如下图所示:
单分支网络结构(DQN和DDQN)
对偶网络结构(Dueling DQN)
在DQN算法的网络结构中,输入为一张或多张照片,利用卷积网络提取图像特征,之后经过全连接层输出每个动作的动作价值;在Dueling DQN算法的网络结构中,输入同样为一张或多张照片,然后利用卷积网络提取图像特征获取特征向量,输出时会经过两个全连接层分支,分别对应状态价值和优势值,最后将状态价值和优势值相加即可得到每个动作的动作价值(即绿色连线操作)。
为什么要采用对偶网络结构?
其实动机很简单:很多游戏的Q值,只受当前状态影响,无论采取什么动作区别不大。如下图所示:
这是Atari game中的一个赛车游戏,Value表示状态价值,Advantage表示动作优势值,图中黄色部分表示注意力。当前方没有车辆时,智能体左右移动并没有影响,说明动作对Q值没有影响,但是状态对Q值很有影响。从上面两幅图可以看出,Value更加关注远方道路,因为开的越远,对应的状态值越大;Advantage没有特别注意的地方,说明动作没有影响。当前方存在车辆阻挡时,智能体的动作选择至关重要,说明动作对Q值存在影响,同样状态对Q值也会存在影响。从下面两幅图可以看出,Value同样更加关注远方道路,但是Advantge此时会关注前方车辆,因为如果不采取相应动作,智能体很有可能会发生碰撞,分数就会很低。对偶网络思想符合很多场景的设定。
2 Dueling DQN算法原理
2.1 优势函数(Advantage function)
在讲优势函数之前,我们先来回顾一些基本概念吧。
折扣回报(Discounted return):
表示从t时刻开始,未来所有奖励的加权求和。在t时刻,是未知的,它依赖于未来所有状态和动作。
动作价值函数(Action-value function):
是回报的条件期望,将t+1时刻以后的状态和动作全部消掉。依赖于状态、动作以及策略。
状态价值函数(State-value function):
是的期望,将中的动作消掉。依赖于状态 和策略。
最优动作价值函数(Optimal action-value function):
动作价值函数依赖于策略,对关于求最大值,消除掉,得到,只依赖于状态和动作,不依赖于策略。用于评价在状态下做动作的好坏。
最优状态价值函数(Optimal state_value function):
对关于求 最大值,消除掉,得到,只依赖于状态, 不依赖于策略。用于评价状态的好坏。
最优优势函数(Optimal advantage function):
将作为baseline,则优势函数表示动作相对于baseline的优势。动作越好,其优势越大。
2.2 优势函数性质
为了推导优势函数的性质,这里给出定理1(大家可以从确定性策略角度来理解定理1,这里不给出该定理的证明):
定理1:
针对优势函数
两边同时对动作取最大值,得到
根据定理1进一步得到
接着对优势函数做变换,得到
不妨在上式右边减去 ,由于其等于0,等号不变,得到
看到这里小伙伴可能会疑惑,为什么要 减去 ,这不是多此一举吗?不急,这个后面2.4节会解释。至此,我们得到了定理2,这是一个非常重要的等式,Dueling Network就是根据定理2得到的。
定理2:
2.3 对偶网络
在DQN算法中,我们利用神经网络来近似最优动作值函数,从而得到最优策略,其中为网络参数。但是在Dueling Network中,我们利用神经网路来近似最优优势函数,利用神经网络来近似最优状态值函数,其中和均为网络参数。
根据定理2,将其中的用神经网络来替代,用神经网络来代替,得到
其中近似,它被成为Dueling Network。令,则
现在,我们发现Dueling Network与DQN完全一致,包括输入和输出,唯一不同在于神经网络的结构。Dueling Network的网络结构更好,因此它的表现比DQN更好。另外,DQN中的一些技巧也同样可以用于Dueling Network中,例如优先经验回放、Double DQN和Multi-step TD target等。
2.4 唯一性
观察下面两个等式:
下面的式子相较于上面的式子只是减去了,前面已经讨论过 ,既然如此,这一项有必要的吗,直接用上面的式子不就行了吗?
答案是有必要的。因为上面的式子存在一个缺点:无法通过学习来唯一确定和。举个栗子:令,,那么
因此结果不唯一。这种不唯一性会带来什么问题呢?
如果神经网络V和A上下波动,幅度相同,方向相反,那么网络的输出相同,但是由于两个网络都在上下波动,两个网络都不稳定,因此都训练不好。
针对上面式子的不唯一性,下面的式子可以解决。通过加入最大化强可以有效避免不唯一性问题,仍然用上面的栗子来分析,令,,那么
此时发生了变化,解决了不唯一性问题,这就是要加上最大化项的原因了。
2.5 两种数学形式
通过前面的推导,我们得到了Dueling Network的数学形式为
实际中将最大化形式变成均值形式效果更好,更稳定,其数学形式如下
这种替换没有任何理论依据,就是实验效果好,简单!粗暴!有效!
3 Dueling DQN算法代码
经验回放采用集中式均匀回放,代码如下(脚本buffer.py):
- import numpy as np
-
-
- class ReplayBuffer:
- def __init__(self, state_dim, action_dim, max_size, batch_size):
- self.mem_size = max_size
- self.mem_cnt = 0
- self.batch_size = batch_size
-
- self.state_memory = np.zeros((self.mem_size, state_dim))
- self.action_memory = np.zeros((self.mem_size, ))
- self.reward_memory = np.zeros((self.mem_size, ))
- self.next_state_memory = np.zeros((self.mem_size, state_dim))
- self.terminal_memory = np.zeros((self.mem_size, ), dtype=np.bool)
-
- def store_transition(self, state, action, reward, state_, done):
- mem_idx = self.mem_cnt % self.mem_size
-
- self.state_memory[mem_idx] = state
- self.action_memory[mem_idx] = action
- self.reward_memory[mem_idx] = reward
- self.next_state_memory[mem_idx] = state_
- self.terminal_memory[mem_idx] = done
-
- self.mem_cnt += 1
-
- def sample_buffer(self):
- mem_len = min(self.mem_size, self.mem_cnt)
-
- batch = np.random.choice(mem_len, self.batch_size, replace=False)
-
- states = self.state_memory[batch]
- actions = self.action_memory[batch]
- rewards = self.reward_memory[batch]
- states_ = self.next_state_memory[batch]
- terminals = self.terminal_memory[batch]
-
- return states, actions, rewards, states_, terminals
-
- def ready(self):
- return self.mem_cnt > self.batch_size
目标网络的更新方式为软更新,Dueling DQN的实现代码如下(脚本Dueling_DQN.py):
- import torch as T
- import torch.nn as nn
- import torch.optim as optim
- import torch.nn.functional as F
- import numpy as np
- from buffer import ReplayBuffer
-
- device = T.device("cuda:0" if T.cuda.is_available() else "cpu")
-
-
- class DuelingDeepQNetwork(nn.Module):
- def __init__(self, alpha, state_dim, action_dim, fc1_dim, fc2_dim):
- super(DuelingDeepQNetwork, self).__init__()
-
- self.fc1 = nn.Linear(state_dim, fc1_dim)
- self.fc2 = nn.Linear(fc1_dim, fc2_dim)
- self.V = nn.Linear(fc2_dim, 1)
- self.A = nn.Linear(fc2_dim, action_dim)
-
- self.optimizer = optim.Adam(self.parameters(), lr=alpha)
- self.to(device)
-
- def forward(self, state):
- x = T.relu(self.fc1(state))
- x = T.relu(self.fc2(x))
-
- V = self.V(x)
- A = self.A(x)
-
- return V, A
-
- def save_checkpoint(self, checkpoint_file):
- T.save(self.state_dict(), checkpoint_file, _use_new_zipfile_serialization=False)
-
- def load_checkpoint(self, checkpoint_file):
- self.load_state_dict(T.load(checkpoint_file))
-
-
- class DuelingDQN:
- def __init__(self, alpha, state_dim, action_dim, fc1_dim, fc2_dim, ckpt_dir,
- gamma=0.99, tau=0.005, epsilon=1.0, eps_end=0.01, eps_dec=5e-4,
- max_size=1000000, batch_size=256):
- self.gamma = gamma
- self.tau = tau
- self.batch_size = batch_size
- self.epsilon = epsilon
- self.eps_min = eps_end
- self.eps_dec = eps_dec
- self.checkpoint_dir = ckpt_dir
- self.action_space = [i for i in range(action_dim)]
-
- self.q_eval = DuelingDeepQNetwork(alpha=alpha, state_dim=state_dim, action_dim=action_dim,
- fc1_dim=fc1_dim, fc2_dim=fc2_dim)
- self.q_target = DuelingDeepQNetwork(alpha=alpha, state_dim=state_dim, action_dim=action_dim,
- fc1_dim=fc1_dim, fc2_dim=fc2_dim)
-
- self.memory = ReplayBuffer(state_dim=state_dim, action_dim=action_dim,
- max_size=max_size, batch_size=batch_size)
-
- self.update_network_parameters(tau=1.0)
-
- def update_network_parameters(self, tau=None):
- if tau is None:
- tau = self.tau
-
- for q_target_params, q_eval_params in zip(self.q_target.parameters(), self.q_eval.parameters()):
- q_target_params.data.copy_(tau * q_eval_params + (1 - tau) * q_target_params)
-
- def remember(self, state, action, reward, stata_, done):
- self.memory.store_transition(state, action, reward, stata_, done)
-
- def decrement_epsilon(self):
- self.epsilon = self.epsilon - self.eps_dec \
- if self.epsilon > self.eps_min else self.eps_min
-
- def choose_action(self, observation, isTrain=True):
- state = T.tensor([observation], dtype=T.float).to(device)
- _, A = self.q_eval.forward(state)
- action = T.argmax(A).item()
-
- if (np.random.random() < self.epsilon) and isTrain:
- action = np.random.choice(self.action_space)
-
- return action
-
- def learn(self):
- if not self.memory.ready():
- return
-
- states, actions, rewards, next_states, terminals = self.memory.sample_buffer()
- batch_idx = T.arange(self.batch_size, dtype=T.long).to(device)
- states_tensor = T.tensor(states, dtype=T.float).to(device)
- actions_tensor = T.tensor(actions, dtype=T.long).to(device)
- rewards_tensor = T.tensor(rewards, dtype=T.float).to(device)
- next_states_tensor = T.tensor(next_states, dtype=T.float).to(device)
- terminals_tensor = T.tensor(terminals).to(device)
-
- with T.no_grad():
- V_, A_ = self.q_target.forward(next_states_tensor)
- q_ = V_ + A_ - T.mean(A_, dim=-1, keepdim=True)
- q_[terminals_tensor] = 0.0
- target = rewards_tensor + self.gamma * T.max(q_, dim=-1)[0]
- V, A = self.q_eval.forward(states_tensor)
- q = (V + A - T.mean(A, dim=-1, keepdim=True))[batch_idx, actions_tensor]
-
- loss = F.mse_loss(q, target.detach())
- self.q_eval.optimizer.zero_grad()
- loss.backward()
- self.q_eval.optimizer.step()
-
- self.update_network_parameters()
- self.decrement_epsilon()
-
- def save_models(self, episode):
- self.q_eval.save_checkpoint(self.checkpoint_dir + 'Q_eval/DuelingDQN_q_eval_{}.pth'.format(episode))
- print('Saving Q_eval network successfully!')
- self.q_target.save_checkpoint(self.checkpoint_dir + 'Q_target/DuelingDQN_Q_target_{}.pth'.format(episode))
- print('Saving Q_target network successfully!')
-
- def load_models(self, episode):
- self.q_eval.load_checkpoint(self.checkpoint_dir + 'Q_eval/DuelingDQN_q_eval_{}.pth'.format(episode))
- print('Loading Q_eval network successfully!')
- self.q_target.load_checkpoint(self.checkpoint_dir + 'Q_target/DuelingDQN_Q_target_{}.pth'.format(episode))
- print('Loading Q_target network successfully!')
算法仿真环境为gym库中的LunarLander-v2,因此需要先配置好gym库。进入Anaconda3中对应的Python环境中,执行下面的指令
pip install gym
但是,这样安装的gym库只包括少量的内置环境,如算法环境、简单文字游戏和经典控制环境,无法使用LunarLander-v2。因此还需要安装一些其他依赖项,具体可以参考我的这篇博文:AttributeError: module ‘gym.envs.box2d‘ has no attribute ‘LunarLander‘ 解决办法
让智能体在环境中训练500轮,训练代码如下(脚本train.py):
- import gym
- import numpy as np
- import argparse
- from utils import plot_learning_curve, create_directory
- from Dueling_DQN import DuelingDQN
-
- parser = argparse.ArgumentParser()
- parser.add_argument('--max_episodes', type=int, default=500)
- parser.add_argument('--ckpt_dir', type=str, default='./checkpoints/DuelingDQN/')
- parser.add_argument('--reward_path', type=str, default='./output_images/reward.png')
- parser.add_argument('--epsilon_path', type=str, default='./output_images/epsilon.png')
-
- args = parser.parse_args()
-
-
- def main():
- env = gym.make('LunarLander-v2')
- agent = DuelingDQN(alpha=0.0003, state_dim=env.observation_space.shape[0], action_dim=env.action_space.n,
- fc1_dim=256, fc2_dim=256, ckpt_dir=args.ckpt_dir, gamma=0.99, tau=0.005, epsilon=1.0,
- eps_end=0.05, eps_dec=5e-4, max_size=1000000, batch_size=256)
- create_directory(args.ckpt_dir, sub_dirs=['Q_eval', 'Q_target'])
- total_rewards, avg_rewards, eps_history = [], [], []
-
- for episode in range(args.max_episodes):
- total_reward = 0
- done = False
- observation = env.reset()
- while not done:
- action = agent.choose_action(observation, isTrain=True)
- observation_, reward, done, info = env.step(action)
- agent.remember(observation, action, reward, observation_, done)
- agent.learn()
- total_reward += reward
- observation = observation_
-
- total_rewards.append(total_reward)
- avg_reward = np.mean(total_rewards[-100:])
- avg_rewards.append(avg_reward)
- eps_history.append(agent.epsilon)
- print('EP:{} reward:{} avg_reward:{} epsilon:{}'.
- format(episode + 1, total_reward, avg_reward, agent.epsilon))
-
- if (episode + 1) % 50 == 0:
- agent.save_models(episode + 1)
-
- episodes = [i for i in range(args.max_episodes)]
- plot_learning_curve(episodes, avg_rewards, 'Reward', 'reward', args.reward_path)
- plot_learning_curve(episodes, eps_history, 'Epsilon', 'epsilon', args.epsilon_path)
-
-
- if __name__ == '__main__':
- main()
训练时还会用到画图函数和创建文件夹函数,它们均放置在utils.py脚本中,具体代码如下:
- import os
- import matplotlib.pyplot as plt
-
-
- def plot_learning_curve(episodes, records, title, ylabel, figure_file):
- plt.figure()
- plt.plot(episodes, records, linestyle='-', color='r')
- plt.title(title)
- plt.xlabel('episode')
- plt.ylabel(ylabel)
-
- plt.show()
- plt.savefig(figure_file)
-
-
- def create_directory(path: str, sub_dirs: list):
- for sub_dir in sub_dirs:
- if os.path.exists(path + sub_dir):
- print(path + sub_dir + ' is already exist!')
- else:
- os.makedirs(path + sub_dir, exist_ok=True)
- print(path + sub_dir + ' create successfully!')
仿真结果如下图所示:
平均累积奖励曲线
epsilon变化曲线
通过平均累积奖励可以看出,Dueling DQN算法大约在250步时趋于收敛。