其实很多时候我们可能会有以下的困惑
torchrl是一个基于pytorch的强化学习库,我发现根据torchrl的结构可以对强化学习知识点有更加深入的理解,相信在学习了torchrl之后,我们之前的疑惑可以获得解决。下面将我的理解记录如下:
torch rl的输入和输出都是一个叫做tensordict的类,可以理解为本来python函数的输入是一个数组,或者字典,现在torch rl直接强制要求输入是一个字典。使用这个方法增加了灵活性,可以使得模块可以更好的重用。
一个完整的强化学习包含了不同部分:
| 名称 | 解释 | torch rl基类 | 是否必须 |
|---|---|---|---|
| 环境 | 用来采集数据 | EnvBase | 是 |
| actor | 用来生成策略 | Actor | |
| critic | 估计状态的价值 | ValueOperator | 是 |
| loss function | 计算actor损失和critic损失 | LossModule | 是 |
| replay buffer | 存储从环境中采样的数据,类似于一个dataset | ReplayBuffer | 否 |
| collector | actor从环境中收集数据的过程抽象为一个类 | DataCollectorBase | 否 |
| trainer | 整个训练过程的抽象 | Trainer | 否 |
下面是使用torchrl实现了PPO算法:
import torch
from tensordict.nn import TensorDictModule
from tensordict.nn.distributions import NormalParamExtractor
from torch import nn
from torchrl.collectors import SyncDataCollector
from torchrl.data.replay_buffers import TensorDictReplayBuffer, \
LazyTensorStorage, SamplerWithoutReplacement
from torchrl.envs.libs.gym import GymEnv
from torchrl.modules import ProbabilisticActor, ValueOperator, TanhNormal
from torchrl.objectives import ClipPPOLoss
from torchrl.objectives.value import GAE
env = GymEnv("Pendulum-v1")
model = TensorDictModule(
nn.Sequential(
nn.Linear(3, 128), nn.Tanh(),
nn.Linear(128, 128), nn.Tanh(),
nn.Linear(128, 128), nn.Tanh(),
nn.Linear(128, 2),
NormalParamExtractor()
),
in_keys=["observation"],
out_keys=["loc", "scale"]
)
critic = ValueOperator(
nn.Sequential(
nn.Linear(3, 128), nn.Tanh(),
nn.Linear(128, 128), nn.Tanh(),
nn.Linear(128, 128), nn.Tanh(),
nn.Linear(128, 1),
),
in_keys=["observation"],
)
actor = ProbabilisticActor(
model,
in_keys=["loc", "scale"],
distribution_class=TanhNormal,
distribution_kwargs={"min": -1.0, "max": 1.0},
return_log_prob=True
)
buffer = TensorDictReplayBuffer(
LazyTensorStorage(1000),
SamplerWithoutReplacement()
)
collector = SyncDataCollector(
env,
actor,
frames_per_batch=1000,
total_frames=1_000_000
)
loss_fn = ClipPPOLoss(actor, critic, gamma=0.99)
optim = torch.optim.Adam(loss_fn.parameters(), lr=2e-4)
adv_fn = GAE(value_network=critic, gamma=0.99, lmbda=0.95, average_gae=True)
for data in collector: # collect data
for epoch in range(10):
adv_fn(data) # compute advantage
buffer.extend(data.view(-1))
for i in range(20): # consume data
sample = buffer.sample(50) # mini-batch
loss_vals = loss_fn(sample)
loss_val = sum(
value for key, value in loss_vals.items() if
key.startswith("loss")
)
loss_val.backward()
optim.step()
optim.zero_grad()
print(f"avg reward: {data['next', 'reward'].mean().item(): 4.4f}")
可以看出,整个过程就是组合了不同的部分,使得一个复杂的强化学习过程变得模块化。唯一小小的区别在于这里没有使用trainer,而是手动迭代了。
下面对每个部分进行解释:
具体的位置: https://github.dev/pytorch/rl/blob/bf264e0e24971fc05ec42b571de7b8df84043a51/torchrl/envs/common.py
可以使用torchrl自带的环境,如果是一个自己实现的环境:
| 名称 | 解释 |
|---|---|
| action_spec | 动作空间的shape |
| reward_spec | 奖励的shape |
| done_spec | 一个trajectory是否结束的shape |
| observation_spec | 观测空间的shape |
注意每对xxx_spec赋值一次都会将key和value收集到full_xxx_spec这个属性中,也可以直接对full_xxx_spec赋值,这样就支持了一个环境有多个reward,多个action等等,用于multi actor非常有用
如果希望对环境的输入输出进行映射,那么可以用Transform这个类,具体见:https://github.dev/pytorch/rl/blob/bf264e0e24971fc05ec42b571de7b8df84043a51/torchrl/envs/common.py
具体见:https://github.dev/pytorch/rl/blob/bf264e0e24971fc05ec42b571de7b8df84043a51/torchrl/envs/common.py
Actor类主要有两个可以使用的:
Actor和ProbabilisticActor的区别:
Actor网络输出是一个动作,而ProbabilisticActor输出是动作分布的参数,它们对应了确定和随机两种情况。确定的动作目标函数中对s,a的分布没有要求,而随机动作中目标函数对s,a的分布要求是符合当前策略的分布。因此当使用随机动作时,之前策略采集的数据无法再次使用(策略在迭代过程中被修改了),此时需要使用importace sampling技术,修改reward的权值。
另外Actor只能用于连续的动作空间,而ProbabilisticActor可以用于连续和离散的动作空间。对于离散的空间,只需要将输出的n个维度作为n个动作的概率,对于连续的空间,将输入作为分布的参数,然后对分布进行采样。
| 比较项 | Actor | ProbabilisticActor |
|---|---|---|
| actor网络输出值 | 动作本身 | 动作概率分布的参数 |
| 对应神经网络算法 | DDPG | 其他绝大多数算法 |
| 目标函数对s,a分布的要求 | 无 | s,a分布符合当前的策略 |
| 是否需要importance sampling | 否 | 是 |
| 策略类型 | off-policy | on-policy |
| 动作空间是否连续 | 是 | 都行 |
一般建议使用的是ProbabilisticActor
注意ValueOperator选择两种情况,
| 比较项 | in_key 中有action | in_key中没有action |
|---|---|---|
| 输出 | state_value | state_action_value |
| 能否使用gae | 能 | 不能 |
| 动作是否需要确定 | 随机 | 确定(DDPG) |
torch rl支持分别设置actor和critic,即Actor类和ValueOperator类,也支持通过一个ActorCriticOperator同时设置actor和critic。
具体看文档
对于value loss来说,需要指定估计目标价值的方法,可以理解为value net需要拟合到一个标签,或者ground truth,而强化学习中这个ground truth是未知的,需要根据当前采样+当前value net的输出进行计算,这些计算方法主要区别在于使用多少步迭代的数据进行估计。比如采样的数据是
a
1
,
s
1
,
r
1
,
a
2
,
s
2
,
r
2
,
.
.
.
.
,
a
n
,
s
n
,
r
n
a_1, s_1, r_1, a_2, s_2, r_2, ...., a_n, s_n, r_n
a1,s1,r1,a2,s2,r2,....,an,sn,rn
假设在sn之后到达了结束状态,那么算法计算
s
1
s_1
s1状态价值ground truth的方法是:
| 算法 | 随机动作计算方法 | 确定动作计算方法 |
|---|---|---|
| TD(0) | r 1 + c r i t i c ( s 2 ) r_1 + critic(s_2) r1+critic(s2) | r 1 + c r i t i c ( s 2 , a 2 ) r_1 + critic(s_2, a_2) r1+critic(s2,a2) |
| TD(1) | r 1 + r 2 + . . . + c r i t i c ( s n ) r_1 + r_2 + ... + critic(s_n) r1+r2+...+critic(sn) | r 1 + r 2 + . . . + c r i t i c ( s n , a n ) r_1 + r_2 + ... + critic(s_n, a_n) r1+r2+...+critic(sn,an) |
| TD(lambda) | 考虑了TD(0), … TD(1)的加权求和 | 考虑了TD(0), … TD(1)的加权求和 |
| gae | r 1 − c r i t i c ( s 1 ) + c r i t i c ( s 2 ) r_1 - critic(s1)+critic(s_2) r1−critic(s1)+critic(s2) | 无法计算 |
简单来说:
另外在reward之前会有一个系数,具体看代码实现:
# TD0
advantage = reward + gamma * not_terminated * next_state_value
# TD1
gamma = [g1, g2, g3, g4]
value = [v1, v2, v3, v4]
return = [
v1 + g1 v2 + g1 g2 v3 + g1 g2 g3 v4,
v2 + g2 v3 + g2 g3 v4,
v3 + g3 v4,
v4,
]
使用两个网络/暂缓更新机制:
这是为了防止value net或者action net更新太快导致模型不稳定。这个不是必须的,可以酌情使用。在torchrl的损失函数中专门有个参数:delay_actor和delay_value可以控制是否需要暂缓更新。
delay_actor (bool, optional): whether to separate the target actor networks from the actor networks used for
data collection. Default is ``False``.
delay_value (bool, optional): whether to separate the target value networks from the value networks used for
data collection. Default is ``True``.
实际使用时,不同的算法选择不同的损失函数
| 损失函数 | 解释 |
|---|---|
| A2CLoss | 随机动作损失函数是:-action_prop * state_value |
| DDPGLoss | 确定动作损失函数是: -state_action_value |
| ClipPPOLoss | 通过裁剪actor loss,减小策略更新的速度,使得策略更加稳定,公式为: loss = -min( weight * advantage, min(max(weight, 1-eps), 1+eps) * advantage) |
| KLPENPPOLoss | 通过裁剪actor loss,减小策略更新的速度,使得策略更加稳定,公式为: loss = -min( weight * advantage, min(max(weight, 1-eps), 1+eps) * advantage) |
还有些其他的损失,比如SAC,TD3等等,这里先不说了,之后会有相关算法的完整总结。
类似与dataset,对于off-policy的直接用,对于on-policy的,需要确保使用了importance sampling再用。很多算法在loss中就内置了importance sampling,所以说基本上都可以用。
用来采集数据的,采集好的数据放入replay buffer,可以用来训练
相当于一个外循环套着一个内循环:在外循环里面收集数据,然后填充replay buffer。如果收集到足够的数据,就更新若干次。
def train(self):
if self.progress_bar:
self._pbar = tqdm(total=self.total_frames)
self._pbar_str = {}
for batch in self.collector:
batch = self._process_batch_hook(batch)
current_frames = (
batch.get(("collector", "mask"), torch.tensor(batch.numel()))
.sum()
.item()
* self.frame_skip
)
self.collected_frames += current_frames
self._pre_steps_log_hook(batch)
if self.collected_frames > self.collector.init_random_frames:
self.optim_steps(batch)
self._post_steps_hook()
self._post_steps_log_hook(batch)
if self.progress_bar:
self._pbar.update(current_frames)
self._pbar_description()
if self.collected_frames >= self.total_frames:
self.save_trainer(force_save=True)
break
self.save_trainer()
self.collector.shutdown()
optim_steps的内部其实也是一个循环,在这个循环中更新参数
PPO修改的是critic loss,对critic网络的loss进行了裁剪,主要有两种方法,对应了两个PPO loss,一个方法是裁剪loss 函数,另一个方法是在损失函数中加入KL散度进行调整,两种方法都是希望损失函数不要变化太大,从而更新太多引起模型不稳定。
DDPG修改的是actor loss,将随机动作变为确定动作。
可以,PPO裁剪的是critic的损失,而DDPG是修改为确定的动作,如果希望PPO输出的是一个确定的动作,那么就是PPO和DDPG结合了。结合之后的算法变为了off policy的算法
只有on-policy算法需要,比如PPO, A2C之类的,对于DDPG,DQN是不需要的
换句话说,输出的是一个确定的策略,而不是一个分布,那么不需要,否则需要。
输出确定策略的都能用,输出随机策略的,如果用了Importance sampling也能用。
TD1, TD0, TD lambda都能用,而gae需要能算state value的方法才能用,一般来说只有输出动作分布的才能算state value,因此gae只能在输出随机分布的算法中使用,对于DDPG无法使用,因为无法计算状态的价值,只能获得状态动作对的价值。因此DDPG无法使用,而PPO, A2C是可以使用的
从torchrl的实现中也可以看出,DDPG是不支持gae的
https://github.com/pytorch/rl/blob/bf264e0e24971fc05ec42b571de7b8df84043a51/torchrl/objectives/ddpg.py
if value_type == ValueEstimators.TD1:
self._value_estimator = TD1Estimator(value_network=self.actor_critic, **hp)
elif value_type == ValueEstimators.TD0:
self._value_estimator = TD0Estimator(value_network=self.actor_critic, **hp)
elif value_type == ValueEstimators.GAE:
raise NotImplementedError(
f"Value type {value_type} it not implemented for loss {type(self)}."
)
elif value_type == ValueEstimators.TDLambda:
self._value_estimator = TDLambdaEstimator(
value_network=self.actor_critic, **hp
)
else:
raise NotImplementedError(f"Unknown value type {value_type}")
最准确的回答是:看actor或者critic的损失函数,如果损失函数中有对s,a的分布有要求,那么就是on-policy的,否则是off-policy的
一般来说,如果使用了输出随机动作,那么actor的损失函数大概率是对s,a分布有要求的,因此是on-policy的,如果使用了输出确定动作的,比如DDPG,那么actor损失函数大概率是对s,a分布无要求的,因此是off-policy的。
另外不要根据动作是否连续进行判断,因为有时候输出的是高斯分布的均值和方差,然后在这个高斯分布中采样,这种虽然获得的也是连续的动作空间,但是输出的仍然是一个分布,因此是一个on-policy的。
有些人认为,对于DQN来说,根本没有actor函数,直接通过critic选择策略,因此action的分布永远是固定的,也没有这个问题。上面这个看法是错误的,DQN的action分布是会改变的,选择某个动作的概率有时候是0,有时候是1,怎么能说概率分布不变呢。DQN是off-policy的原因是DQN的损失函数中不对s,a的分布做要求,因此s,a分布改变也没有关系。
| 对比 | 随机策略 | 确定策略 |
|---|---|---|
| 代表算法 | PPO, A2C, … | DDPG |
| 动作的含义 | 离散的分布,或者连续分布的参数 | 动作值本身 |
| Actor的in_key | observation | action, observation |
| Actor的out_key | state_value | action_state_value |
| 可以使用的value估计方法 | TD0, TD1, TDlambda, gae | TD0, TD1,TDlambda |
| 是否需要importance samping | 是 | 否 |
| 是否可以直接使用replay buffer | 否 | 是 |
| actor 目标函数 | -state_value * action_prob | -state_action_value |