• [PyTorch][Chapter 61][Reinforcement Learning: Model-Free Learning, Off-Policy]


    Preface:

        The basic flow of Monte Carlo learning:

         Policy Evaluation:    generate state-action-reward trajectories and use them to estimate the value function.

         Policy Improvement:   use the value-function estimate to improve the policy.

           On-policy: the policy \pi^{'} that generates the sampled trajectories is the same as the policy \pi being improved.

           Policy Evaluation:   use the \epsilon-greedy policy \pi^{'} to generate (state, action, reward) trajectories.

           Policy Improvement:  the policy being improved is the same \epsilon-greedy policy \pi^{'}; the value-function estimate is used to improve \pi^{'} itself.

          Off-policy: the policy \pi^{'} that generates the sampled trajectories is different from the policy \pi being improved.

          Policy Evaluation:   use the \epsilon-greedy policy \pi^{'} to generate (state, action, reward) trajectories.

          Policy Improvement:  improve the original (target) policy \pi.

        Two advantages of off-policy learning:

        1. The original policy may be hard to sample from directly.

        2. It can reduce variance.

    The common technique for off-policy learning is IS (importance sampling).

    Importance sampling is a Monte Carlo method for evaluating properties of a particular distribution, while only having samples generated from a different distribution than the distribution of interest. Its introduction in statistics is generally attributed to a paper by Teun Kloek and Herman K. van Dijk in 1978,[1] but its precursors can be found in statistical physics as early as 1949.[2][3] Importance sampling is also related to umbrella sampling in computational physics. Depending on the application, the term may refer to the process of sampling from this alternative distribution, the process of inference, or both.


    一  Importance Sampling

        1.1 Principle

         The original problem: estimate

          u_f=\int p(x)f(x)dx

         If we draw N samples x_1,x_2,\dots,x_N from p(x), then

           u_f\approx \frac{1}{N}\sum_{x_i\sim p} f(x_i)

        Problem: p(x) is hard to sample from (the sample space is large, and often only a small part of it can actually be sampled).

       Introduce an importance (proposal) distribution q(x): this is also a proper distribution, but one that is easy to sample from.

      w(x)=\frac{p(x)}{q(x)} is called the importance weight.

                u_f =\int q(x)\frac{p(x)}{q(x)}f(x)dx

                 \approx \frac{1}{N}\sum_i w(x_i)f(x_i), \; x_i\sim q  (by the law of large numbers)
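     As a quick numeric check of this estimator (a minimal sketch not in the original post, assuming a standard normal target p and a Uniform(-5, 5) proposal q, and f(x) = x^2):

import numpy as np
from scipy.stats import norm, uniform

# Estimate u_f = E_p[f(x)] with f(x) = x**2, p = N(0, 1), q = Uniform(-5, 5).
rng = np.random.default_rng(0)
N = 100_000

x = rng.uniform(-5, 5, N)                            # samples x_i ~ q
w = norm.pdf(x) / uniform.pdf(x, loc=-5, scale=10)   # importance weights w(x_i) = p(x_i)/q(x_i)
estimate = np.mean(w * x ** 2)                       # (1/N) * sum_i w(x_i) f(x_i)

print(estimate)  # close to E_{N(0,1)}[x^2] = 1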

     In the example below we normalize the weights w(x_i) so that their relative proportions are easier to see.

       The code below performs the normalization in log space, as follows:

         w(x_i)=\log p(x_i)-\log q(x_i)

         w^1(x_i)=\frac{e^{w(x_i)}}{\sum_j e^{w(x_j)}}

         w^2(x_i)=w(x_i)-\log\sum_j e^{w(x_j)}

          

# -*- coding: utf-8 -*-
"""
Created on Wed Nov 8 16:38:34 2023
@author: chengxf2
"""
import numpy as np
import matplotlib.pyplot as plt
from scipy.special import logsumexp


class pdf:
    # Base class: __call__ returns the log-density, sample(N) draws N points.
    def __call__(self, x):
        pass

    def sample(self, N):
        pass


class Norm(pdf):
    # Normal (Gaussian) distribution.
    def __init__(self, mu=0, sigma=1):
        self.mu = mu
        self.sigma = sigma

    def __call__(self, x):
        # log p(x), dropping the additive constant term
        logp = -(x - self.mu) ** 2 / (2 * self.sigma ** 2)
        return logp

    def sample(self, N):
        # Draw N points from the normal distribution.
        return np.random.normal(self.mu, self.sigma, N)


class Uniform(pdf):
    # Uniform distribution on [low, high].
    def __init__(self, low, high):
        self.low = low
        self.high = high

    def __call__(self, x):
        # log q(x) = -log(high - low), constant on the support
        N = len(x)
        return np.repeat(-np.log(self.high - self.low), N)

    def sample(self, N):
        # Draw N points from the uniform distribution.
        return np.random.uniform(self.low, self.high, N)


class ImportanceSampler:
    def __init__(self, p_dist, q_dist):
        self.p_dist = p_dist
        self.q_dist = q_dist

    def sample(self, N):
        # Sample from q and return the samples with their normalized log-weights.
        samples = self.q_dist.sample(N)
        weights = self.calc_weights(samples)
        normal_weights = weights - logsumexp(weights)
        return samples, normal_weights

    def calc_weights(self, samples):
        # log(p/q) = log(p) - log(q)
        return self.p_dist(samples) - self.q_dist(samples)


if __name__ == "__main__":
    N = 10000
    p = Norm()
    q = Uniform(-10, 10)
    sampler = ImportanceSampler(p, q)
    # samples: points drawn from q(x); weight_sample: normalized log-weights
    samples, weight_sample = sampler.sample(N)
    # Resample N points from samples with probabilities exp(weight_sample).
    samples = np.random.choice(samples, N, p=np.exp(weight_sample))
    plt.hist(samples, bins=100)
    plt.show()
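    As a follow-up (a sketch assuming the Norm, Uniform and ImportanceSampler classes above), the normalized log-weights can also be used directly as a self-normalized importance-sampling estimate of an expectation under p, without the resampling step:

# Self-normalized IS estimate of E_p[x^2] (hypothetical usage of the classes above).
p = Norm()
q = Uniform(-10, 10)
sampler = ImportanceSampler(p, q)
samples, log_w = sampler.sample(10000)
print(np.sum(np.exp(log_w) * samples ** 2))  # exp(log_w) sums to 1; result is close to 1

    Because the weights are normalized with logsumexp, the constant dropped from log p in Norm.__call__ cancels out of the estimate.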


    二  Off-Policy Principle

         Target policy \pi: the original policy to be improved.

            x: a trajectory generated under the target policy,

                      [s_0,a_0,r_1,\dots,s_{T-1},a_{T-1},r_T,s_T]

           p(x): the probability of this trajectory.

           f(x): the cumulative reward of this trajectory.

          Expected cumulative reward:

                        u_f=\int_{x} f(x)p(x)dx \approx \frac{1}{N}\sum f(x_i)

        Behavior policy \pi^{'}: the policy used to generate the samples.

         q(x): the probability of sampling each trajectory under the behavior policy.

        Then the expected cumulative reward f under the distribution p can equivalently be written as:

         u_f=\int_{x}q(x)\frac{p(x)}{q(x)}f(x)dx

         E[f] \approx \frac{1}{m}\sum_{i=1}^{m}\frac{p(x_i)}{q(x_i)}f(x_i)

       

         Let P^{\pi} and P^{\pi^{'}} denote the probabilities that the two policies generate a given trajectory. For the trajectory

        [s_0,a_0,r_1,\dots,s_{T-1},a_{T-1},r_T,s_T]

        the probabilities that the original policy \pi and the behavior policy \pi^{'} produce it are:

         P^{\pi}=\prod_{i=0}^{T-1} \pi(s_i,a_i)P_{s_i\rightarrow s_{i+1}}^{a_i}

        P^{\pi^{'}}=\prod_{i=0}^{T-1} \pi^{'}(s_i,a_i)P_{s_i\rightarrow s_{i+1}}^{a_i}

       Then

        w(x)=\frac{P^{\pi}}{P^{\pi^{'}}}=\prod_{i=0}^{T-1}\frac{\pi(s_i,a_i)}{\pi^{'}(s_i,a_i)}  (the transition probabilities P_{s_i\rightarrow s_{i+1}}^{a_i} cancel)
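      A minimal sketch of this ratio with made-up per-step action probabilities (not from the source), just to show that only the policy terms remain once the transition probabilities cancel:

import numpy as np

# Hypothetical action probabilities for one 3-step trajectory.
pi_target   = np.array([0.9, 0.8, 0.9])   # pi(s_i, a_i)
pi_behavior = np.array([0.5, 0.5, 0.5])   # pi'(s_i, a_i)

# w = prod_i pi(s_i, a_i) / pi'(s_i, a_i); the P_{s_i -> s_{i+1}}^{a_i} terms cancel.
w = np.prod(pi_target / pi_behavior)
print(w)  # (0.9 * 0.8 * 0.9) / (0.5 ** 3) = 5.184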

      If \pi is a deterministic policy and \pi^{'} is the \epsilon-greedy policy derived from \pi:

    Original policy:   p_i=\left\{\begin{matrix} \pi(s_i,a_i)=1, & \text{if } a_i=\pi(s_i) \\ \pi(s_i,a_i)=0, & \text{if } a_i \neq \pi(s_i) \end{matrix}\right.

    Behavior policy:   q_i=\left\{\begin{matrix} \pi^{'}(s_i,a_i)=1-\epsilon+\frac{\epsilon }{|A|}, & \text{if } a_i=\pi(s_i) \\ \pi^{'}(s_i,a_i)=\frac{\epsilon }{|A|}, & \text{if } a_i \neq \pi(s_i) \end{matrix}\right.

      Now we want the weight w of a trajectory produced by the behavior policy.

     In theory w should be the product of the per-step ratios, but p_i=0 whenever a_i \neq \pi(s_i), which makes the whole product zero.

     Since what matters here is only comparing the two probabilities, the ratio above can be replaced by the following substitution:

     w(x)=\frac{P^{\pi}}{P^{\pi^{'}}} \;\rightarrow\; \prod_i\frac{e^{p_i}}{e^{q_i}}=\prod_i e^{p_i-q_i}

    where w_i=\frac{e^{p_i}}{e^{q_i}}=e^{p_i-q_i} (a more flexible use of importance sampling).

    The key point is comparing the two probabilities; the earlier example did this by taking logs and then normalizing.
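    To make the substitution concrete, here is a minimal sketch with hypothetical numbers (\epsilon = 0.2, |A| = 2, a 4-step trajectory, none of it from the source), contrasting the exact product of ratios, which collapses to zero as soon as one sampled action deviates from the deterministic target, with the e^{p_i-q_i} surrogate used in the code below:

import numpy as np

epsilon = 0.2
q_match    = 1 - epsilon + epsilon / 2   # pi'(s_i, a_i) when a_i == pi(s_i)  -> 0.9
q_mismatch = epsilon / 2                 # pi'(s_i, a_i) when a_i != pi(s_i)  -> 0.1

# Hypothetical 4-step trajectory: did each sampled action match the target policy?
matches = np.array([True, True, False, True])

p = np.where(matches, 1.0, 0.0)          # pi(s_i, a_i) for a deterministic target
q = np.where(matches, q_match, q_mismatch)

w_exact     = np.prod(p / q)             # exact ratio: 0.0 once any step deviates
w_surrogate = np.prod(np.exp(p - q))     # the substitution prod_i e^{p_i - q_i}
print(w_exact, w_surrogate)              # 0.0 vs. a positive weight (about 1.22)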


    三  Variance Impact


    四  Code

    Note that the return R in the code is computed differently from the formulas above:

    R=\frac{1}{T-t}(\sum_{i=t}^{T-1}r_i)(\prod_{j=t}^{T-1}w_j)

    w_j=e^{p_j-q_j}
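    As a small worked example with hypothetical numbers (not from the source): take T=3, t=1, rewards r_1=r_2=1 and weights w_1=e^{0.1}, w_2=e^{-0.1}; then

    R=\frac{1}{3-1}(1+1)(e^{0.1}\cdot e^{-0.1})=1

    which matches what getReward in the code below computes step by step.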

# -*- coding: utf-8 -*-
"""
Created on Wed Nov 8 11:56:26 2023
@author: chengxf2
"""
# Off-policy Monte Carlo control for a plant-watering MDP,
# using the per-step weights w_t = exp(p_t - q_t) described above.
import numpy as np
import random
from enum import Enum


class State(Enum):
    # State space
    shortWater = 1  # short of water
    health = 2      # healthy
    overflow = 3    # over-watered
    apoptosis = 4   # dead


class Action(Enum):
    # Action space A
    water = 1    # water the plant
    noWater = 2  # do not water


class Env():
    def __init__(self):
        self.name = "environment"

    def reward(self, state):
        # Reward for arriving in the new state
        if state is State.shortWater:
            r = -1
        elif state is State.health:
            r = 1
        elif state is State.overflow:
            r = -1
        else:  # State.apoptosis
            r = -100
        return r

    def action(self, state, action):
        # State transition: return the next state and its reward.
        if state is State.shortWater:
            if action is Action.water:
                newState = [State.shortWater, State.health]
                p = [0.4, 0.6]
            else:
                newState = [State.shortWater, State.apoptosis]
                p = [0.4, 0.6]
        elif state is State.health:
            if action is Action.water:
                newState = [State.health, State.overflow]
                p = [0.6, 0.4]
            else:
                newState = [State.shortWater, State.health]
                p = [0.6, 0.4]
        elif state is State.overflow:
            if action is Action.water:
                newState = [State.overflow, State.apoptosis]
                p = [0.6, 0.4]
            else:
                newState = [State.health, State.overflow]
                p = [0.6, 0.4]
        else:
            # dead: absorbing state
            newState = [State.apoptosis]
            p = [1.0]
        nextState = random.choices(newState, p)[0]
        r = self.reward(nextState)
        return nextState, r


class Agent():
    def __init__(self):
        self.S = [State.shortWater, State.health, State.overflow, State.apoptosis]
        self.A = [Action.water, Action.noWater]
        self.Q = {}       # accumulated reward for each (state, action)
        self.count = {}   # number of updates for each (state, action)
        self.policy = {}  # target policy
        self.maxIter = 500
        self.epsilon = 0.2
        self.T = 10

    def initPolicy(self):
        # Initialize Q, the counters and the target policy.
        self.Q = {}
        self.count = {}
        for state in self.S:
            for action in self.A:
                self.Q[state, action] = 0.0
                self.count[state, action] = 0
            self.policy[state] = Action.noWater  # start with "do not water" everywhere

    def randomAction(self):
        # Uniformly random action
        return random.choices(self.A, [0.5, 0.5])[0]

    def behaviorPolicy(self):
        # Behavior policy: epsilon-greedy around the current target policy.
        state = State.shortWater  # start from the short-of-water state
        env = Env()
        trajectory = {}  # t -> [state, action, reward]
        for t in range(self.T):
            rnd = np.random.rand()
            if rnd < self.epsilon:
                action = self.randomAction()
            else:
                # follow the target policy
                action = self.policy[state]
            newState, reward = env.action(state, action)
            trajectory[t] = [state, action, reward]
            state = newState
        return trajectory

    def calcW(self, trajectory):
        # Per-step weights w_t = exp(p_t - q_t)
        q1 = 1.0 - self.epsilon + self.epsilon / 2.0  # pi'(s,a) when a == target action
        q2 = self.epsilon / 2.0                       # pi'(s,a) when a != target action
        w = {}
        for t, value in trajectory.items():
            # value = [state, action, reward]
            state, action = value[0], value[1]
            if action == self.policy[state]:
                p, q = 1, q1
            else:
                p, q = 0, q2
            w[t] = round(np.exp(p - q), 3)
        return w

    def getReward(self, t, wDict, trajectory):
        # R = (1/(T-t)) * (sum_{i=t}^{T-1} r_i) * (prod_{j=t}^{T-1} w_j)
        p = 1.0
        r = 0
        for i in range(t, self.T):
            r += trajectory[i][-1]
            p = p * wDict[i]
        R = p * r
        m = self.T - t
        return R / m

    def improve(self):
        # Greedy policy improvement with respect to Q.
        a = Action.noWater
        for state in self.S:
            maxR = self.Q[state, a]
            for action in self.A:
                R = self.Q[state, action]
                if R >= maxR:
                    maxR = R
                    self.policy[state] = action

    def learn(self):
        self.initPolicy()
        for s in range(1, self.maxIter):  # sample the s-th trajectory
            # Generate a trajectory with the behavior policy (epsilon-greedy).
            trajectory = self.behaviorPolicy()
            w = self.calcW(trajectory)
            print("\n iteration %d" % s,
                  "\t shortWater:", self.policy[State.shortWater].name,
                  "\t health:", self.policy[State.health].name,
                  "\t overflow:", self.policy[State.overflow].name,
                  "\t apoptosis:", self.policy[State.apoptosis].name)
            # Policy evaluation: incremental average of the weighted returns
            for t in range(self.T):
                R = self.getReward(t, w, trajectory)
                state = trajectory[t][0]
                action = trajectory[t][1]
                Q = self.Q[state, action]
                count = self.count[state, action]
                self.Q[state, action] = (Q * count + R) / (count + 1)
                self.count[state, action] = count + 1
            # Policy improvement
            self.improve()


if __name__ == "__main__":
    agent = Agent()
    agent.learn()

    (Figure: https://img2020.cnblogs.com/blog/1027447/202110/1027447-20211013112906490-1926128536.png)

  • Original article: https://blog.csdn.net/chengxf2/article/details/134202820