• 【强化学习】《动手学强化学习》动态规划算法


    一、基本思想

    • 动态规划算法在计算机专业课中是特别重要的思想,将待求问题分解成若干个子问题,先求解子问题,然后利用子问题的解得到目标问题的解。贝尔曼方程将当前阶段的最优决策问题转化为下一阶段的最优决策问题,从而初始状态的最优决策可以由终状态的最优决策(一般易解)问题逐步迭代求解。
    • 本篇文章介绍如何用动态规划的思想来求解马尔可夫决策过程中的最优策略。基于动态规划的强化学习算法主要有两种:策略迭代价值迭代。策略迭代由两部分组成:策略评估与策略提升。策略评估使用贝尔曼期望方程来得到一个策略的状态价值函数;价值迭代直接使用贝尔曼最优方程来进行动态规划。
    • 所有的动态规划强化学习算法必须要求都是基于模型的(model-based),即知道完整的MDP信息才能够使用贝尔曼方程进行迭代更新。如果无法得知完整MDP信息,就只能通过和环境进行交互采样。

    二、悬崖漫步环境

    • 4行12列的网格地图,左下角是起点,右下角是终点,最下方一行除了起点和终点以外都是悬崖。智能体的位置便是状态,在每个状态都可以执行上、下、左、右四个动作,如果触碰到边界则状态不变。终结状态是最下方一行除了起点以外的所有格子。智能体每走一步奖励-1,掉入悬崖奖励-100,在此激励下智能体想要用最短的步数到达终点。示意图如下
      在这里插入图片描述
    class CliffWalkingEnv:
        """ 悬崖漫步环境"""
        def __init__(self, ncol=12, nrow=4):
            self.ncol = ncol  # 定义网格世界的列
            self.nrow = nrow  # 定义网格世界的行
            # 转移矩阵P[state][action] = [(p, next_state, reward, done)]包含下一个状态和奖励
            # P含有48个list,每个list包含4个list,每个list包含一个四元组(p,next_state,reward,done)
            self.P = self.createP()
    
        def createP(self):
            # 初始化
            P = [[[] for j in range(4)] for i in range(self.nrow * self.ncol)]
    
            # 4种动作, change[0]:上,change[1]:下, change[2]:左, change[3]:右。坐标系原点(0,0)定义在左上角
            change = [[0, -1], [0, 1], [-1, 0], [1, 0]]
            for i in range(self.nrow):
                for j in range(self.ncol):
                    # 遍历每个位置,即每个状态
                    for a in range(4):
                        # 遍历每个状态的4个动作
                        # 位置在悬崖或者目标状态,因为无法继续交互,任何动作奖励都为0
                        if i == self.nrow - 1 and j > 0:
                            P[i * self.ncol + j][a] = [(1, i * self.ncol + j, 0, True)]
                            continue
                        # 其他位置
                        # 与0取max是判断是否触碰左边界,与self.ncol-1取min是判断是否触碰右边界。
                        next_x = min(self.ncol - 1, max(0, j + change[a][0]))
                        # 与0取max是判断是否触碰上边界,与self.nrow-1取min是判断是否触碰下边界。
                        next_y = min(self.nrow - 1, max(0, i + change[a][1]))
                        next_state = next_y * self.ncol + next_x
                        reward = -1
                        done = False
                        # 下一个位置在悬崖或者终点
                        if next_y == self.nrow - 1 and next_x > 0:
                            done = True
                            if next_x != self.ncol - 1:  # 下一个位置在悬崖
                                reward = -100
                        P[i * self.ncol + j][a] = [(1, next_state, reward, done)]
            return P
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 上述环境代码的编写核心在于状态转移矩阵的编写。转移矩阵 P P P共有48个(4x12)个list,每个list包含4个list对应4个动作,每个list包含一个4元组(转移概率,下一状态,奖励,是否终结)。

    三、策略迭代算法

    • 策略迭代是策略评估和策略提升不断循环交替,直至最后得到最优策略的过程。策略评估是计算一个状态价值函数的过程;策略提升是优化当前策略的过程。如果策略提升之后的策略与之前相同说明此时策略迭代算法已达到了收敛状态,此时的策略即为最优策略。
    3.1 策略评估
    • 策略评估说白了就是计算当前策略下的状态价值函数。
    • 首先来看一下之前提及的贝尔曼期望方程:
      V π ( s ) = ∑ a ∈ A π ( a ∣ s ) ( r ( s , a ) + γ ∑ s ′ ∈ S P ( s ′ ∣ s , a ) V π ( s ′ ) ) V^{\pi}(s)=\sum_{a\in A}\pi(a|s)(r(s,a)+\gamma\sum_{s'\in S}P(s'|s,a)V^{\pi}(s')) Vπ(s)=aAπ(as)(r(s,a)+γsSP(ss,a)Vπ(s))
      其中 π ( a ∣ s ) \pi(a|s) π(as)由策略决定, r ( s , a ) , P ( s ′ ∣ s , a ) r(s,a),P(s'|s,a) r(s,a),P(ss,a)由MDP决定。因此上述贝尔曼期望方程可以看作,在策略确定以及MDP确定的情况下,利用状态 s s s可转移到的所有周边状态 s ′ s' s的价值函数来计算 s s s的价值函数。
    • 策略评估是一个循环迭代的过程,第k+1轮中状态 s s s的价值函数使用第k轮中与之相关的状态 s ′ s' s的价值函数来求解。当迭代轮次最够多时,认为当前价值函数逼近或者就是真正的价值函数。实际应用中策略评估使用贝尔曼期望函数进行迭代使用大量计算资源,无须迭代无限轮次,只需要某次迭代更新的幅度非常小时算法即可停止。
    3.2 策略提升
    • 策略评估之后,我们获得了在当前策略下所有状态的价值函数,也就是得到了在当前策略下每个状态出发到达终结状态的期望回报。得到了状态价值函数,也就相当于得到了动作价值函数。我们便可以在当前策略的基础上每个状态都贪心地选择最优的动作 a a a以最大化 Q π ( s , a ) Q^{\pi}(s,a) Qπ(s,a),从而实现策略提升。
      π ′ ( s ) = arg ⁡ max ⁡ a Q π ( s , a ) = arg ⁡ max ⁡ a { r ( s , a ) + γ ∑ s ′ P ( s ′ ∣ s , a ) V π ( s ′ ) }
      π(s)=argmaxaQπ(s,a)=argmaxa{r(s,a)+γsP(s|s,a)Vπ(s)}" role="presentation">π(s)=argmaxaQπ(s,a)=argmaxa{r(s,a)+γsP(s|s,a)Vπ(s)}
      π(s)=argamaxQπ(s,a)=argamax{r(s,a)+γsP(ss,a)Vπ(s)}
    • 贪心算法与贪心策略我们需要考虑的问题是,局部最优的累积最终是否达到全局最优。在策略提升中需要考虑的问题是,每个状态都选择动作价值最大的动作得到的新策略 π ′ \pi' π是否在每个状态的价值都不低于原策略 π \pi π在该状态的价值。证明如下:
      V π ( s ) ≤ Q π ( s , π ′ ( s ) ) = E π ′ [ R t + γ V π ( S t + 1 ) ∣ S t = s ] ≤ E π ′ [ R t + γ Q π ( S t + 1 , π ′ ( S t + 1 ) ) ∣ S t = s ] = E π ′ [ R t + γ R t + 1 + γ 2 V π ( S t + 2 ) ∣ S t = s ] ≤ E π ′ [ R t + γ R t + 1 + γ 2 R t + 2 + γ 3 V π ( S t + 3 ) ∣ S t = s ] . . . ≤ E π ′ [ R t + γ R t + 1 + γ 2 R t + 2 + γ 3 R t + 3 + . . . ∣ S t = s ] = V π ′ ( s )
      Vπ(s)Qπ(s,π(s))=Eπ[Rt+γVπ(St+1)|St=s]Eπ[Rt+γQπ(St+1,π(St+1))|St=s]=Eπ[Rt+γRt+1+γ2Vπ(St+2)|St=s]Eπ[Rt+γRt+1+γ2Rt+2+γ3Vπ(St+3)|St=s]...Eπ[Rt+γRt+1+γ2Rt+2+γ3Rt+3+...|St=s]=Vπ(s)" role="presentation" style="position: relative;">Vπ(s)Qπ(s,π(s))=Eπ[Rt+γVπ(St+1)|St=s]Eπ[Rt+γQπ(St+1,π(St+1))|St=s]=Eπ[Rt+γRt+1+γ2Vπ(St+2)|St=s]Eπ[Rt+γRt+1+γ2Rt+2+γ3Vπ(St+3)|St=s]...Eπ[Rt+γRt+1+γ2Rt+2+γ3Rt+3+...|St=s]=Vπ(s)
      Vπ(s)Qπ(s,π(s))=Eπ[Rt+γVπ(St+1)St=s]Eπ[Rt+γQπ(St+1,π(St+1))St=s]=Eπ[Rt+γRt+1+γ2Vπ(St+2)St=s]Eπ[Rt+γRt+1+γ2Rt+2+γ3Vπ(St+3)St=s]...Eπ[Rt+γRt+1+γ2Rt+2+γ3Rt+3+...∣St=s]=Vπ(s)
    3.3 悬崖漫步环境下的策略迭代
    import copy
    
    
    class PolicyIteration:
        """ 策略迭代算法 """
        def __init__(self, env, theta, gamma):
            self.env = env
            self.v = [0] * self.env.ncol * self.env.nrow  # 初始化价值为0
            self.pi = [[0.25, 0.25, 0.25, 0.25] for i in range(self.env.ncol * self.env.nrow)]  # 初始化为均匀随机策略
            self.theta = theta  # 策略评估收敛阈值
            self.gamma = gamma  # 折扣因子
    
        def policy_evaluation(self):  # 策略评估
            cnt = 1  # 计数器
            while 1:
                max_diff = 0
                new_v = [0] * self.env.ncol * self.env.nrow  # 定义一个新容器
                for s in range(self.env.ncol * self.env.nrow):
                    qsa_list = []  # 开始计算状态s下的所有Q(s,a)价值
                    for a in range(4):
                        qsa = 0
                        for res in self.env.P[s][a]:
                            p, next_state, r, done = res
                            qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                        qsa_list.append(self.pi[s][a] * qsa)
                    new_v[s] = sum(qsa_list)  # 状态价值函数和动作价值函数之间的关系
                    max_diff = max(max_diff, abs(new_v[s] - self.v[s]))
                self.v = new_v
                if max_diff < self.theta:
                    break  # 满足收敛条件,退出评估迭代
                cnt += 1
            print("策略评估进行%d轮后完成" % cnt)
    
        def policy_improvement(self):  # 策略提升
            for s in range(self.env.nrow * self.env.ncol):
                qsa_list = []
                for a in range(4):
                    qsa = 0
                    for res in self.env.P[s][a]:
                        p, next_state, r, done = res
                        qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                    qsa_list.append(qsa)
                maxq = max(qsa_list)
                cntq = qsa_list.count(maxq)  # 计算有几个动作得到了最大的Q值
                # 让这些动作均分概率
                self.pi[s] = [1 / cntq if q == maxq else 0 for q in qsa_list]
            print("策略提升完成")
            return self.pi
    
        def policy_iteration(self):  # 策略迭代
            while 1:
                self.policy_evaluation()
                old_pi = copy.deepcopy(self.pi)  # 将列表进行深拷贝,方便接下来进行比较
                new_pi = self.policy_improvement()
                if old_pi == new_pi:
                    break
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 读者可能会纳闷为啥代码中qsa的计算方法与公式不同?具体来说,公式中状态转移概率在括号内部,代码中状态转移概率在括号外部并且括号内还乘了 ( 1 − d o n e ) (1-done) (1done)。这个问题跟环境的设定有关。本文环境中,并不是在状态 s s s下做出动作决策 a a a就立即获得当前轮次的奖励,而是在真正迁移到下一状态之后才获得对应奖励,这就解释了为什么转移概率在括号外面。本文还设定转移到终结状态后奖励为0,因此乘以 ( 1 − d o n e ) (1-done) (1done)
    • 策略提升部分,每次都是以混合策略的形式提升。得到每个状态 s s s动作价值函数最大的几个动作,将概率 1 1 1平均分配到上述动作中。
    • 一个完整的强化学习项目一般包含三大模块,具体来说是三个.py文件。env.py、RL_brain.py、main.py。其中env.py定义了该问题的环境,包括智能体与环境的交互过程以及可视化过程;RL_brain.py定义了该问题所使用的RL算法;main.py包含该问题的主控逻辑。

    四、价值迭代算法

    • 在策略迭代算法中显示定义了策略,策略具体来说是面对每个状态选择不同动作的混合策略。策略迭代的策略评估过程需要很多轮才可以收敛,整个算法运行下来需要很大的计算量。价值迭代算法直接通过选择最大化状态价值函数/动作价值函数来产生最优策略,每次“策略”更新(实际上就是状态价值函数更新)都只需要一次状态价值函数的更新。
    • 状态价值函数的贝尔曼最优方程如下所示:
      V ∗ ( s ) = max ⁡ a ∈ A { r ( s , a ) + γ ∑ s ′ ∈ S P ( s ′ ∣ s , a ) V ∗ ( s ′ ) } V^*(s)=\max_{a\in A}\{r(s,a)+\gamma \sum_{s'\in S}P(s'|s,a)V^*(s')\} V(s)=aAmax{r(s,a)+γsSP(ss,a)V(s)}
      将其写成迭代更新的方式如下:
      V k + 1 ( s ) = max ⁡ a ∈ A { r ( s , a ) + γ ∑ s ′ ∈ S P ( s ′ ∣ s , a ) V k ( s ′ ) } V^{k+1}(s)=\max_{a\in A}\{r(s,a)+\gamma \sum_{s'\in S}P(s'|s,a)V^k(s')\} Vk+1(s)=aAmax{r(s,a)+γsSP(ss,a)Vk(s)}
      迭代更新到 V k + 1 V^{k+1} Vk+1 V k V^k Vk相同或差距很小时,此时对应着最优状态价值函数 V ∗ ( s ) V^*(s) V(s),利用最优状态价值函数便可以复原出最优策略。
      π ( s ) = arg ⁡ max ⁡ a { r ( s , a ) + γ ∑ s ′ ∈ S P ( s ′ ∣ s , a ) V ∗ ( s ′ ) } \pi(s)=\arg \max_{a}\{r(s,a)+\gamma \sum_{s'\in S}P(s'|s,a)V^*(s')\} π(s)=argamax{r(s,a)+γsSP(ss,a)V(s)}
    • 悬崖漫步环境下的价值迭代代码如下所示:
    class ValueIteration:
        """ 价值迭代算法 """
        def __init__(self, env, theta, gamma):
            self.env = env
            self.v = [0] * self.env.ncol * self.env.nrow  # 初始化价值为0
            self.theta = theta  # 价值收敛阈值
            self.gamma = gamma
            # 价值迭代结束后得到的策略
            self.pi = [None for i in range(self.env.ncol * self.env.nrow)]
    
        def value_iteration(self):
            cnt = 0
            while 1:
                max_diff = 0
                new_v = [0] * self.env.ncol * self.env.nrow
                for s in range(self.env.ncol * self.env.nrow):
                    qsa_list = []  # 开始计算状态s下的所有Q(s,a)价值
                    for a in range(4):
                        qsa = 0
                        for res in self.env.P[s][a]:
                            p, next_state, r, done = res
                            qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                        qsa_list.append(qsa)  # 这一行和下一行代码是价值迭代和策略迭代的主要区别
                    new_v[s] = max(qsa_list)
                    max_diff = max(max_diff, abs(new_v[s] - self.v[s]))
                self.v = new_v
                if max_diff < self.theta: 
                    break  # 满足收敛条件,退出评估迭代
                cnt += 1
            print("价值迭代一共进行%d轮" % cnt)
            self.get_policy()
    
        def get_policy(self):  # 根据价值函数导出一个贪婪策略
            for s in range(self.env.nrow * self.env.ncol):
                qsa_list = []
                for a in range(4):
                    qsa = 0
                    for res in self.env.P[s][a]:
                        p, next_state, r, done = res
                        qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                    qsa_list.append(qsa)
                maxq = max(qsa_list)
                cntq = qsa_list.count(maxq)  # 计算有几个动作得到了最大的Q值
                # 让这些动作均分概率
                self.pi[s] = [1 / cntq if q == maxq else 0 for q in qsa_list]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
  • 相关阅读:
    mybatis-plus使用sql的date_format()函数来查询数据
    【三大锁】悲观锁——mysql悲观锁
    Python第二章 基础语法
    hermite、三次样条插值算法 调用matlab函数、代码实现
    npm 无法将“npm”项识别为 cmdlet、函数、脚本文件或可运行程序的名称。请检查名称的拼写,如果包括路径,请确保路径正确,然后再试一次。
    图论17-有向图的强联通分量-Kosaraju算法
    ts重点学习108-枚举类型知识点补充
    声明式事务管理参数配置
    阿里云Linux系统MySQL8忘记密码修改密码
    100个Python实战项目(十)从照片构建 GUI 应用程序铅笔素描
  • 原文地址:https://blog.csdn.net/dzc_go/article/details/126896965