1. The Q-learning Algorithm

Core formula:

Q-learning is a model-free, value-based algorithm. In essence it fills in a state-action value table (the Q-table), with every entry initialized to 0. Given the current state, the algorithm looks up the Q-values of all actions: with probability $\epsilon$ it chooses a random action, and with probability $1-\epsilon$ it chooses the action with the highest Q-value. $\epsilon$ decays gradually as learning proceeds, so the process is often described as "explore first, then stabilize". The whole learning process amounts to continually updating this state-action value table; the table itself plays the role of the policy, since later decisions pick actions according to it.
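For reference, the table-filling step described above is the standard temporal-difference update, with learning rate $\alpha$ and discount factor $\gamma$:

$$Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha \left[ r_{t+1} + \gamma \max_{a} Q(s_{t+1}, a) - Q(s_t, a_t) \right]$$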

Q-learning, however, suffers from the following main drawbacks:

  • 1. The curse of dimensionality: the Q-table blows up as the state and action spaces grow
  • 2. It can only handle decision problems with discrete states, not continuous ones
  • 3. Training is unstable

DQN was therefore proposed: by approximating the action-value function with a deep neural network, it addresses Q-learning's curse of dimensionality and its restriction to discrete states, and it also improves training stability.

2. The DQN Algorithm

Main idea:

DQN introduces two main improvements:

I. Experience replay

When a new transition $e_t = (S_t, A_t, R_t, S_{t+1}, \text{Done})$ is generated, it is not used for training right away. Instead it is first stored in a replay buffer of capacity $N$; once the buffer is full, the oldest transitions are discarded first-in-first-out. At training time, a mini-batch of transitions is sampled at random from the buffer.

Problems it solves:

  • 1. It breaks the temporal correlation of the data: consecutive transitions in a trajectory are correlated, and sampling from the replay buffer breaks this correlation, so the learned patterns are more general.
  • 2. Samples can be reused: instead of each transition being used once and thrown away, the same transition may be drawn and trained on several times.
  • 3. It improves training stability.

**Detail:** the $\epsilon$-greedy behavior belongs to the data-collection side of the replay mechanism: when the agent interacts with the environment to generate the transitions that go into the buffer, it takes a random action with some probability and the highest-Q action otherwise. This realizes the "explore first, then stabilize" schedule during training.

II. A policy network and a target network

  • DQN maintains two networks with the same structure: a policy network that is updated at every training step, and a target network whose parameters are kept fixed and only overwritten with a copy of the policy network every $C$ steps.

Problems it solves:

Using a separate target network improves training stability and keeps the Q-values from diverging. In practice only the policy network is updated while the target values are computed from the frozen target network; only once the preset number of steps $C$ is reached are the policy network's parameters copied into the target network.

**Why do this?** Answer: if the targets were produced by the same network that is being updated, every parameter update would also shift the targets the network is chasing. Freezing the target network for a while prevents a single sample from producing a badly wrong estimate, keeps the Q-values from diverging, and makes training more stable.
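Concretely, writing $\theta$ for the policy-network parameters and $\theta^-$ for the frozen target-network parameters, the target value and the loss used in the update step implemented below are:

$$y_t = r_t + \gamma\,(1 - \text{Done}_t)\,\max_{a'} Q_{\theta^-}(s_{t+1}, a'), \qquad L(\theta) = \big(y_t - Q_\theta(s_t, a_t)\big)^2$$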

3. DQN Implementation

1. Structure of the policy and target networks

import torch.nn as nn
import torch.nn.functional as F

class MLP(nn.Module):
    def __init__(self, n_states, n_actions, hidden_dim=128):
        """ Initialize the Q-network as a fully connected network """
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(n_states, hidden_dim)    # input layer
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)  # hidden layer
        self.fc3 = nn.Linear(hidden_dim, n_actions)   # output layer

    def forward(self, x):
        # activation function for each layer
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)
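A quick shape check of the network (a minimal sketch; the 4-dimensional state and 2 actions below are assumptions chosen to match CartPole and are not part of the class itself):

import torch

net = MLP(n_states=4, n_actions=2)   # e.g. CartPole: 4 state dimensions, 2 actions
dummy_state = torch.randn(1, 4)      # a batch containing one fake state
q_values = net(dummy_state)          # forward pass
print(q_values.shape)                # torch.Size([1, 2]) -- one Q-value per action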

2. Replay buffer implementation

from collections import deque  # double-ended queue
import random

class ReplayBuffer(object):
    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.buffer = deque(maxlen=self.capacity)

    def push(self, transitions):
        ''' Store a transition in the replay buffer '''
        self.buffer.append(transitions)

    def sample(self, batch_size: int, sequential: bool = False):
        if batch_size > len(self.buffer):  # if the batch size exceeds the number of stored transitions, use them all
            batch_size = len(self.buffer)
        if sequential:  # sequential sampling
            rand = random.randint(0, len(self.buffer) - batch_size)
            batch = [self.buffer[i] for i in range(rand, rand + batch_size)]
            return zip(*batch)
        else:  # random sampling
            batch = random.sample(self.buffer, batch_size)
            return zip(*batch)

    def clear(self):
        ''' Empty the replay buffer '''
        self.buffer.clear()

    def __len__(self):
        ''' Number of transitions currently stored '''
        return len(self.buffer)
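A small usage sketch of the buffer interface (the transitions here are made-up placeholders, only to show the FIFO behavior and the push/sample calls):

buffer = ReplayBuffer(capacity=3)
for i in range(5):  # push 5 dummy transitions into a buffer of capacity 3
    buffer.push((f"s{i}", i, 1.0, f"s{i+1}", False))
print(len(buffer))  # 3 -- the two oldest transitions were dropped (first in, first out)
states, actions, rewards, next_states, dones = buffer.sample(batch_size=2)
print(actions)      # two randomly drawn action entries, e.g. (3, 4)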

3. DQN agent implementation

import torch
import torch.nn as nn
import torch.optim as optim
import random
import math
import copy
import numpy as np

class DQN:
    def __init__(self, model, memory, cfg):  # initialize hyperparameters, model and replay buffer
        self.n_actions = cfg['n_actions']
        self.device = torch.device(cfg['device'])
        self.gamma = cfg['gamma']  # discount factor for rewards
        # parameters of the epsilon-greedy policy
        self.sample_count = 0  # step counter used for epsilon decay
        self.epsilon = cfg['epsilon_start']
        self.epsilon_start = cfg['epsilon_start']
        self.epsilon_end = cfg['epsilon_end']
        self.epsilon_decay = cfg['epsilon_decay']
        self.batch_size = cfg['batch_size']
        self.policy_net = model.to(self.device)
        self.target_net = copy.deepcopy(model).to(self.device)  # independent copy, otherwise both names would point to the same network
        # copy the policy network parameters to the target network
        for target_param, param in zip(self.target_net.parameters(), self.policy_net.parameters()):
            target_param.data.copy_(param.data)
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=cfg['lr'])  # optimizer
        self.memory = memory  # replay buffer

    def sample_action(self, state):
        ''' Sample an action with the epsilon-greedy policy '''
        self.sample_count += 1
        # exponential decay of epsilon
        self.epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \
            math.exp(-1. * self.sample_count / self.epsilon_decay)
        if random.random() > self.epsilon:  # greedy: take the action with the largest Q-value
            with torch.no_grad():
                state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)
                q_values = self.policy_net(state)
                action = q_values.max(1)[1].item()  # index of the maximum Q-value
        else:  # explore: take a random action
            action = random.randrange(self.n_actions)
        return action

    @torch.no_grad()  # no gradients needed; equivalent to wrapping the body in `with torch.no_grad():`
    def predict_action(self, state):
        ''' Predict an action greedily (used at test time) '''
        state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)
        q_values = self.policy_net(state)
        action = q_values.max(1)[1].item()  # index of the maximum Q-value
        return action

    def update(self):
        if len(self.memory) < self.batch_size:  # do not update until the buffer holds at least one batch
            return
        # sample a random batch of transitions from the replay buffer
        state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.memory.sample(
            self.batch_size)
        # convert the data to tensors
        state_batch = torch.tensor(np.array(state_batch), device=self.device, dtype=torch.float)
        action_batch = torch.tensor(action_batch, device=self.device).unsqueeze(1)
        reward_batch = torch.tensor(reward_batch, device=self.device, dtype=torch.float)
        next_state_batch = torch.tensor(np.array(next_state_batch), device=self.device, dtype=torch.float)
        done_batch = torch.tensor(np.float32(done_batch), device=self.device)
        q_values = self.policy_net(state_batch).gather(dim=1, index=action_batch)  # Q(s_t, a_t) of the chosen actions
        next_q_values = self.target_net(next_state_batch).max(1)[0].detach()  # max Q-value of the next state from the target network
        # expected Q-values; for terminal states done_batch = 1, so the target reduces to the reward
        expected_q_values = reward_batch + self.gamma * next_q_values * (1 - done_batch)
        loss = nn.MSELoss()(q_values, expected_q_values.unsqueeze(1))  # mean squared error loss
        # optimize the model
        self.optimizer.zero_grad()
        loss.backward()
        # clip gradients to prevent explosion
        for param in self.policy_net.parameters():
            param.grad.data.clamp_(-1, 1)
        self.optimizer.step()  # gradient step
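The exploration schedule used in sample_action can be inspected on its own. A standalone sketch, assuming the default values epsilon_start=0.95, epsilon_end=0.01 and epsilon_decay=500 that are set in get_args further below:

import math

epsilon_start, epsilon_end, epsilon_decay = 0.95, 0.01, 500
for sample_count in [1, 100, 500, 1000, 2000, 5000]:
    epsilon = epsilon_end + (epsilon_start - epsilon_end) * math.exp(-1. * sample_count / epsilon_decay)
    print(f"step {sample_count:>5}: epsilon = {epsilon:.3f}")
# epsilon falls from about 0.95 towards 0.01, so the agent explores early and acts greedily later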

4. Training and testing functions

def train(cfg, env, agent):
    ''' Training loop '''
    print("Start training!")
    rewards = []  # rewards of all episodes
    steps = []
    for i_ep in range(cfg['train_eps']):
        ep_reward = 0  # cumulative reward within one episode
        ep_step = 0
        state = env.reset(seed=cfg['seed']) if cfg['seed'] != 0 else env.reset()  # reset the environment and get the initial state
        # newer gym versions return a (state, info) tuple
        if isinstance(state, tuple):
            state = state[0]
        for _ in range(cfg['ep_max_steps']):
            ep_step += 1
            action = agent.sample_action(state)  # choose an action
            result = env.step(action)  # step the environment and get the transition
            # handle the return values of different gym versions
            if len(result) == 5:
                next_state, reward, terminated, truncated, _ = result
                done = terminated or truncated
            else:
                next_state, reward, done, _ = result
            agent.memory.push((state, action, reward, next_state, done))  # store the transition
            state = next_state  # move to the next state
            agent.update()  # update the agent
            ep_reward += reward  # accumulate the reward
            if done:
                break
        if (i_ep + 1) % cfg['target_update'] == 0:  # update the target network
            agent.target_net.load_state_dict(agent.policy_net.state_dict())
        steps.append(ep_step)
        rewards.append(ep_reward)
        if (i_ep + 1) % 10 == 0:
            print(f"Episode: {i_ep+1}/{cfg['train_eps']}, Reward: {ep_reward:.2f}, Epsilon: {agent.epsilon:.3f}")
    print("Training finished!")
    env.close()
    return {'rewards': rewards}


def test(cfg, env, agent):
    print("Start testing!")
    rewards = []  # rewards of all episodes
    steps = []
    for i_ep in range(cfg['test_eps']):
        ep_reward = 0  # cumulative reward within one episode
        ep_step = 0
        state = env.reset()  # reset the environment and get the initial state
        # newer gym versions return a (state, info) tuple
        if isinstance(state, tuple):
            state = state[0]
        for _ in range(cfg['ep_max_steps']):
            ep_step += 1
            action = agent.predict_action(state)  # choose an action greedily
            result = env.step(action)  # step the environment and get the transition
            # handle the return values of different gym versions
            if len(result) == 5:
                next_state, reward, terminated, truncated, _ = result
                done = terminated or truncated
            else:
                next_state, reward, done, _ = result
            state = next_state  # move to the next state
            ep_reward += reward  # accumulate the reward
            if done:
                break
        steps.append(ep_step)
        rewards.append(ep_reward)
        print(f"Episode: {i_ep+1}/{cfg['test_eps']}, Reward: {ep_reward:.2f}")
    print("Testing finished!")
    env.close()
    return {'rewards': rewards}

5. Environment setup

import gym
import os

def all_seed(env, seed=1):
    ''' Set all random seeds '''
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)  # seed for CPU
    torch.cuda.manual_seed(seed)  # seed for GPU
    os.environ['PYTHONHASHSEED'] = str(seed)  # seed for Python hashing
    # cudnn settings for determinism
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.enabled = False

def env_agent_config(cfg):
    env = gym.make(cfg['env_name'])  # create the environment
    if cfg['seed'] != 0:
        all_seed(env, seed=cfg['seed'])
    n_states = env.observation_space.shape[0]
    n_actions = env.action_space.n
    print(f"State space dimension: {n_states}, action space dimension: {n_actions}")
    cfg.update({"n_states": n_states, "n_actions": n_actions})  # store n_states and n_actions in cfg
    model = MLP(n_states, n_actions, hidden_dim=cfg['hidden_dim'])  # create the Q-network
    memory = ReplayBuffer(cfg['memory_capacity'])
    agent = DQN(model, memory, cfg)
    return env, agent

6. Hyperparameters and utility functions

import argparse
import matplotlib.pyplot as plt
import seaborn as sns

def get_args():
    """ Hyperparameters """
    parser = argparse.ArgumentParser(description="hyperparameters")
    parser.add_argument('--algo_name', default='DQN', type=str, help="name of algorithm")
    parser.add_argument('--env_name', default='CartPole-v0', type=str, help="name of environment")
    parser.add_argument('--train_eps', default=200, type=int, help="episodes of training")
    parser.add_argument('--test_eps', default=20, type=int, help="episodes of testing")
    parser.add_argument('--ep_max_steps', default=100000, type=int, help="steps per episode, much larger value can simulate infinite steps")
    parser.add_argument('--gamma', default=0.95, type=float, help="discounted factor")
    parser.add_argument('--epsilon_start', default=0.95, type=float, help="initial value of epsilon")
    parser.add_argument('--epsilon_end', default=0.01, type=float, help="final value of epsilon")
    parser.add_argument('--epsilon_decay', default=500, type=int, help="decay rate of epsilon, the higher value, the slower decay")
    parser.add_argument('--lr', default=0.0001, type=float, help="learning rate")
    parser.add_argument('--memory_capacity', default=100000, type=int, help="memory capacity")
    parser.add_argument('--batch_size', default=64, type=int)
    parser.add_argument('--target_update', default=4, type=int)
    parser.add_argument('--hidden_dim', default=256, type=int)
    parser.add_argument('--device', default='cpu', type=str, help="cpu or cuda")
    parser.add_argument('--seed', default=10, type=int, help="seed")
    args = parser.parse_args([])
    args = {**vars(args)}  # convert to a dict
    # print the hyperparameters
    print("Hyperparameters")
    print(''.join(['='] * 80))
    tplt = "{:^20}\t{:^20}\t{:^20}"
    print(tplt.format("Name", "Value", "Type"))
    for k, v in args.items():
        print(tplt.format(k, v, str(type(v))))
    print(''.join(['='] * 80))
    return args

def smooth(data, weight=0.9):
    ''' Smooth a curve, similar to the smoothing in TensorBoard '''
    last = data[0]
    smoothed = []
    for point in data:
        smoothed_val = last * weight + (1 - weight) * point  # exponential moving average
        smoothed.append(smoothed_val)
        last = smoothed_val
    return smoothed

def plot_rewards(rewards, cfg, tag='train'):
    ''' Plot the reward curves '''
    sns.set()
    plt.figure()  # create a new figure so that several plots can coexist
    plt.title(f"{tag}ing curve on {cfg['device']} of {cfg['algo_name']} for {cfg['env_name']}")
    plt.xlabel('episodes')
    plt.plot(rewards, label='rewards')
    plt.plot(smooth(rewards), label='smoothed')
    plt.legend()
    plt.show()

7. Training run

# get the hyperparameters
cfg = get_args()
# train
env, agent = env_agent_config(cfg)
res_dic = train(cfg, env, agent)

plot_rewards(res_dic['rewards'], cfg, tag="train")
# test
res_dic = test(cfg, env, agent)
plot_rewards(res_dic['rewards'], cfg, tag="test")  # plot the results

4. Training Curves