This article is a set of study notes on the code from Part 3: Intro to Policy Optimization in OpenAI Spinning Up. The original text is at https://spinningup.openai.com/en/latest/spinningup/rl_intro3.html , and the code is at https://github.com/openai/spinningup/blob/master/spinup/examples/pytorch/pg_math/1_simple_pg.py .

The full code is given first.

import torch
import torch.nn as nn
from torch.distributions.categorical import Categorical
from torch.optim import Adam
import numpy as np
import gym
from gym.spaces import Discrete, Box

def mlp(sizes, activation=nn.Tanh, output_activation=nn.Identity):
    # Build a feedforward neural network.
    layers = []
    for j in range(len(sizes)-1):
        act = activation if j < len(sizes)-2 else output_activation
        layers += [nn.Linear(sizes[j], sizes[j+1]), act()]
    return nn.Sequential(*layers)

def train(env_name='CartPole-v0', hidden_sizes=[32], lr=1e-2,
          epochs=50, batch_size=5000, render=False):

    # make environment, check spaces, get obs / act dims
    env = gym.make(env_name)
    assert isinstance(env.observation_space, Box), \
        "This example only works for envs with continuous state spaces."
    assert isinstance(env.action_space, Discrete), \
        "This example only works for envs with discrete action spaces."

    obs_dim = env.observation_space.shape[0]
    n_acts = env.action_space.n

    # make core of policy network
    logits_net = mlp(sizes=[obs_dim]+hidden_sizes+[n_acts])

    # make function to compute action distribution
    def get_policy(obs):
        logits = logits_net(obs)
        return Categorical(logits=logits)

    # make action selection function (outputs int actions, sampled from policy)
    def get_action(obs):
        return get_policy(obs).sample().item()

    # make loss function whose gradient, for the right data, is policy gradient
    def compute_loss(obs, act, weights):
        logp = get_policy(obs).log_prob(act)
        return -(logp * weights).mean()

    # make optimizer
    optimizer = Adam(logits_net.parameters(), lr=lr)

    # for training policy
    def train_one_epoch():
        # make some empty lists for logging.
        batch_obs = []          # for observations
        batch_acts = []         # for actions
        batch_weights = []      # for R(tau) weighting in policy gradient
        batch_rets = []         # for measuring episode returns
        batch_lens = []         # for measuring episode lengths

        # reset episode-specific variables
        obs = env.reset()       # first obs comes from starting distribution
        done = False            # signal from environment that episode is over
        ep_rews = []            # list for rewards accrued throughout ep

        # render first episode of each epoch
        finished_rendering_this_epoch = False

        # collect experience by acting in the environment with current policy
        while True:

            # rendering
            if (not finished_rendering_this_epoch) and render:
                env.render()

            # save obs
            batch_obs.append(obs.copy())

            # act in the environment
            act = get_action(torch.as_tensor(obs, dtype=torch.float32))
            obs, rew, done, _ = env.step(act)

            # save action, reward
            batch_acts.append(act)
            ep_rews.append(rew)

            if done:
                # if episode is over, record info about episode
                ep_ret, ep_len = sum(ep_rews), len(ep_rews)
                batch_rets.append(ep_ret)
                batch_lens.append(ep_len)

                # the weight for each logprob(a|s) is R(tau)
                batch_weights += [ep_ret] * ep_len

                # reset episode-specific variables
                obs, done, ep_rews = env.reset(), False, []

                # won't render again this epoch
                finished_rendering_this_epoch = True

                # end experience loop if we have enough of it
                if len(batch_obs) > batch_size:
                    break

        # take a single policy gradient update step
        optimizer.zero_grad()
        batch_loss = compute_loss(obs=torch.as_tensor(batch_obs, dtype=torch.float32),
                                  act=torch.as_tensor(batch_acts, dtype=torch.int32),
                                  weights=torch.as_tensor(batch_weights, dtype=torch.float32)
                                  )
        batch_loss.backward()
        optimizer.step()
        return batch_loss, batch_rets, batch_lens

    # training loop
    for i in range(epochs):
        batch_loss, batch_rets, batch_lens = train_one_epoch()
        print('epoch: %3d \t loss: %.3f \t return: %.3f \t ep_len: %.3f'%
                (i, batch_loss, np.mean(batch_rets), np.mean(batch_lens)))

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--env_name', '--env', type=str, default='CartPole-v0')
    parser.add_argument('--render', action='store_true')
    parser.add_argument('--lr', type=float, default=1e-2)
    args, unknown = parser.parse_known_args()
    print('\nUsing simplest formulation of policy gradient.\n')
    train(env_name=args.env_name, render=args.render, lr=args.lr)

Below we walk through most of the functions and some of the variables one by one. For some of the PyTorch APIs involved, see my earlier article or the official documentation.

mlp

def mlp(sizes, activation=nn.Tanh, output_activation=nn.Identity):
    # Build a feedforward neural network.
    layers = []
    for j in range(len(sizes)-1):
        act = activation if j < len(sizes)-2 else output_activation
        layers += [nn.Linear(sizes[j], sizes[j+1]), act()]
    return nn.Sequential(*layers)

Returns a feedforward neural network built from its arguments.

Parameters

  • sizes

    A list specifying the number of layers and the number of units in each layer.

  • activation

    The activation function for the hidden units; the default nn.Tanh is the $\tanh$ function.

  • output_activation

    The activation applied to the output layer.

Analysis

Each element of layers is one building block of the network (a linear layer or an activation), and nn.Sequential(*layers) chains these blocks into a single network. In the loop

for j in range(len(sizes)-1):
    act = activation if j < len(sizes)-2 else output_activation
    layers += [nn.Linear(sizes[j], sizes[j+1]), act()]

act is the activation attached to the current linear layer: it is activation for every layer except the last, and output_activation for the last layer.
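As a small sanity check (a sketch, not part of the original script): for CartPole-v0, where obs_dim is 4 and n_acts is 2, the call in train amounts to mlp(sizes=[4, 32, 2]) and produces the following module.

net = mlp(sizes=[4, 32, 2])
print(net)
# Sequential(
#   (0): Linear(in_features=4, out_features=32, bias=True)
#   (1): Tanh()
#   (2): Linear(in_features=32, out_features=2, bias=True)
#   (3): Identity()
# )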

get_policy

def get_policy(obs):
    logits = logits_net(obs)
    return Categorical(logits=logits)

Computes the action logits (unnormalized log probabilities) for an observation and returns a Categorical distribution built from them.

Parameters

  • obs

    The observation returned by the environment, describing its current state.

Analysis

logits_net is the policy network: given an observation, it outputs one logit per action. For the details of the Categorical class, see the PyTorch documentation; a small sketch of how it behaves follows.
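A minimal sketch of how Categorical treats logits (the numbers below are made up): it applies a softmax internally, so the logits do not need to be normalized.

import torch
from torch.distributions.categorical import Categorical

logits = torch.tensor([1.0, -1.0])       # hypothetical logits for 2 actions
dist = Categorical(logits=logits)

print(dist.probs)                        # tensor([0.8808, 0.1192]), the softmax of the logits
print(dist.sample())                     # a random action index, 0 or 1
print(dist.log_prob(torch.tensor(0)))    # log pi(a=0), about -0.1269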

get_action

def get_action(obs):
    return get_policy(obs).sample().item()

Parameters

  • obs

    The observation returned by the environment, describing its current state.

Analysis

Samples an action from the Categorical distribution returned by get_policy; .item() converts the resulting one-element tensor into a plain Python int.
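A minimal sketch of what this call does step by step (assuming we are inside train, so get_policy and a current obs are in scope): .sample() returns a 0-dim tensor, and .item() unwraps it into a plain Python int that env.step accepts.

dist = get_policy(torch.as_tensor(obs, dtype=torch.float32))
a = dist.sample()      # 0-dim tensor, e.g. tensor(1)
a = a.item()           # plain Python int, e.g. 1 -- what env.step expects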

compute_loss

def compute_loss(obs, act, weights):
    logp = get_policy(obs).log_prob(act)
    return -(logp * weights).mean()

Computes the pseudo-loss whose gradient is the policy gradient.

Parameters

  • obs

    The batch of observations.

  • act

    The actions that were actually sampled at those observations.

  • weights

    The weight attached to each log-probability term; in this simplest version it is the return $R(\tau)$ of the trajectory the step belongs to.

Analysis

The gradient of this loss with respect to the parameters must match the gradient of the expected return, whose estimator is
$$
\hat{g}=\frac{1}{|\mathcal{D}|}\sum_{\tau\in\mathcal{D}}\sum^T_{t=0}\nabla_\theta\log \pi_\theta(a_t\mid s_t)R(\tau)
$$
Here logp is exactly $\log \pi_\theta(a_t\mid s_t)$ and weights is exactly $R(\tau)$, so the function returns
$$
-\frac{1}{N}\sum_{\tau\in\mathcal{D}}\sum^T_{t=0}\log \pi_\theta(a_t\mid s_t)R(\tau)
$$
where $N$ is the total number of timesteps in the batch (what .mean() divides by). Differentiating with respect to $\theta$ gives $-\hat{g}$ up to a positive constant: the minus sign is there because the optimizer minimizes the loss, which amounts to gradient ascent on the expected return, and dividing by $N$ instead of $|\mathcal{D}|$ only rescales the step, which the learning rate absorbs.
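A numerical sketch (made-up logits, actions, and weights, not part of the original script) showing that the loss is just a weighted average of negative log-probabilities and that autograd can differentiate through it:

import torch
from torch.distributions.categorical import Categorical

logits = torch.tensor([[2.0, 0.0],
                       [0.5, 1.5]], requires_grad=True)   # 2 timesteps, 2 actions
acts = torch.tensor([0, 1])                               # actions actually taken
weights = torch.tensor([3.0, 3.0])                        # R(tau) of the episode both steps belong to

logp = Categorical(logits=logits).log_prob(acts)          # log pi(a_t | s_t) for each step
loss = -(logp * weights).mean()                           # same expression as compute_loss
loss.backward()                                           # gradient of the loss w.r.t. the logits
print(loss.item(), logits.grad)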

train_one_epoch

This is the function that trains for one epoch (one update of the policy network's parameters).

Analysis

if done:
    # if episode is over, record info about episode
    ep_ret, ep_len = sum(ep_rews), len(ep_rews)
    batch_rets.append(ep_ret)
    batch_lens.append(ep_len)

    # the weight for each logprob(a|s) is R(tau)
    batch_weights += [ep_ret] * ep_len

    # reset episode-specific variables
    obs, done, ep_rews = env.reset(), False, []

    # won't render again this epoch
    finished_rendering_this_epoch = True

    # end experience loop if we have enough of it
    if len(batch_obs) > batch_size:
        break
# take a single policy gradient update step
optimizer.zero_grad()
batch_loss = compute_loss(obs=torch.as_tensor(batch_obs, dtype=torch.float32),
                          act=torch.as_tensor(batch_acts, dtype=torch.int32),
                          weights=torch.as_tensor(batch_weights, dtype=torch.float32)
                          )
batch_loss.backward()
optimizer.step()

batch_size determines roughly how many environment steps are collected in one epoch. Whenever a trajectory ends (done is True), the episode's total return is computed and used as the weight for every step of that episode. Note that the break sits inside the if done: block, so the loop only stops at an episode boundary: the batch always consists of whole trajectories and may contain slightly more than batch_size steps. After collection, compute_loss evaluates the loss on the whole batch, batch_loss.backward() lets PyTorch's autograd compute the gradient, and optimizer.step() applies an Adam update.
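To make the weighting concrete, a small made-up example (not from the original script): two finished episodes with rewards [1, 1, 1] and [1, 1] have returns 3.0 and 2.0, and every step inherits the return of its own episode.

batch_weights = []
for ep_rews in ([1.0, 1.0, 1.0], [1.0, 1.0]):     # two hypothetical finished episodes
    ep_ret, ep_len = sum(ep_rews), len(ep_rews)   # same bookkeeping as in train_one_epoch
    batch_weights += [ep_ret] * ep_len

print(batch_weights)                              # [3.0, 3.0, 3.0, 2.0, 2.0]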

train

The whole training procedure simply repeats train_one_epoch, epochs times in total. To use Gym's visualization, set the render parameter to True (a usage sketch follows the parameter list below).

Parameters

  • env_name

    Name of the Gym environment.

  • hidden_sizes

    Sizes of the hidden layers of the policy network; feel free to adjust them.

  • lr

    Learning rate.

  • epochs

    Number of training epochs.

  • batch_size

    The (minimum) number of environment steps collected per epoch; since the batch always ends on a completed episode, it may contain slightly more steps than this.

  • render

    Whether to visualize the Gym environment (True or False).
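Finally, a usage sketch. The arguments below are only examples; any combination accepted by train's signature works, and the command-line form uses the flags defined in the __main__ block.

# Call train directly from Python (example arguments):
train(env_name='CartPole-v0', hidden_sizes=[32], lr=1e-2,
      epochs=50, batch_size=5000, render=False)

# Or run the script from the command line via the __main__ block:
#   python 1_simple_pg.py --env CartPole-v0 --lr 1e-2 --render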