• ROS学习(27)有限状态机SMACH



    前言

      有限状态机是一款用于对象行为建模的工具,其主要作用是描述对象在生命周期内所经历的状态序列,以及如何响应来自外界的各种事件。。
      ROS提供了有限状态机功能包SMACH来处理机器人任务中的多个状态模块。

    一、什么是SMACH

      SMACH是基于Python实现的一个功能强大且易于扩展的库,不依赖于ROS,可用于任意python项目ROS中的executive_smach功能包将SMACH和ROS集成到一起,为机器人复杂应用开发提供任务级的状态机框架,同时集成了actionlib和smach_viewer,用于管理action和状态机的可视化显示。

    SMACH实现的功能:

    1. 快速原型设计:基于python语法,实现状态机模型的快速开发测试
    2. 复杂状态机模型:支持设计、维护、调试大型复杂的状态机
    3. 可视化:提供可视化的smach_viewer,可以看到完整状态机的状态跳转、数据流等信息

    二、安装SMACH

    命令如下:

    sudo apt-get install ros-kinetic-executive-smach
    sudo apt-get install ros-kinetic-executive-smach-visualization
    
    • 1
    • 2

    三、运行一个状态机

    参考胡老师的源码,功能包名为smach_tutorials,运行一个简单的状态机,命令如下:

    roscore
    rosrun smach_tutorials state_machine_simple.py
    
    • 1
    • 2

    如果运行失败,则先添加可执行权限,效果如下:
    在这里插入图片描述
    可以看出状态机在进行状态跳转,一共两个状态FOO和BAR,我们借助可视化的smach_viewer,可以很清晰的看到完整状态机的状态跳转、数据流等信息。命令如下:

    rosrun smach_viewer smach_viewer.py
    
    • 1

    效果如下:
    在这里插入图片描述

    四、实现剖析

    上述例程的实现源码在state_machine_simple.py文件中,大体上分为如下几步:

    1、 定义状态FOO和BAR

    (1)初始化函数:初始化状态类,定义输出状态。
    (2)执行函数:每个状态中的具体工作内容,工作结束后返回定义的输出值,该状态结束。

    # 定义状态Foo
    class Foo(smach.State):
        def __init__(self):
            smach.State.__init__(self, outcomes=['outcome1','outcome2'])
            self.counter = 0
    
        def execute(self, userdata):
            rospy.loginfo('Executing state FOO')
            if self.counter < 3:
                self.counter += 1
                return 'outcome1'
            else:
                return 'outcome2'
    
    # 定义状态Bar
    class Bar(smach.State):
        def __init__(self):
            smach.State.__init__(self, outcomes=['outcome2'])
    
        def execute(self, userdata):
            rospy.loginfo('Executing state BAR')
            return 'outcome2'
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    2、创建状态机

        # 创建一个状态机
        sm = smach.StateMachine(outcomes=['outcome4', 'outcome5'])
    
    • 1
    • 2

    同时指定状态机执行结束后的输出值outcome4和outcome5

    3、 添加状态到状态机容器中

            # 使用add方法添加状态到状态机容器当中
            smach.StateMachine.add('FOO', Foo(), 
                                   transitions={'outcome1':'BAR', 
                                                'outcome2':'outcome4'})
    
    • 1
    • 2
    • 3
    • 4

    transitions代表状态跳转,如果FOO状态输出为outcome1,则跳转到BAR状态;如果状态输出为outcome2,则结束这个状态机,并且输出outcome4

    4、创建内部监测服务器

        # 创建并启动内部监测服务器
        sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm, '/SM_ROOT')
        sis.start()
    
    • 1
    • 2
    • 3

    第一个参数,表示监测服务器的名称,第二个参数,表示所要监测的状态机,第三个参数,表示状态机的层级
    目的是为了状态机可视化。

    5、执行状态机

        # 开始执行状态机
        outcome = sm.execute()
    
    • 1
    • 2

    五、状态间的数据传递

    有时候后一个状态的工作需要使用前一个状态中的数据,这时就需要状态间的数据传递。运行实例,命令如下:

    roscore
    rosrun smach_tutorials user_data.py
    rosrun smach_viewer smach_viewer.py
    
    • 1
    • 2
    • 3

    效果如下:
    在这里插入图片描述

    1、定义状态

    # 定义状态Foo
    class Foo(smach.State):
        def __init__(self):
            smach.State.__init__(self, 
                                 outcomes=['outcome1','outcome2'],
                                 input_keys=['foo_counter_in'],
                                 output_keys=['foo_counter_out'])
    
        def execute(self, userdata):
            rospy.loginfo('Executing state FOO')
            if userdata.foo_counter_in < 3:
                userdata.foo_counter_out = userdata.foo_counter_in + 1
                return 'outcome1'
            else:
                return 'outcome2'
    
    
    # 定义状态Bar
    class Bar(smach.State):
        def __init__(self):
            smach.State.__init__(self, 
                                 outcomes=['outcome1'],
                                 input_keys=['bar_counter_in'])
            
        def execute(self, userdata):
            rospy.loginfo('Executing state BAR')
            rospy.loginfo('Counter = %f'%userdata.bar_counter_in)        
            return 'outcome1'
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    可以看到,在状态的初始化中多了两个参数:input_keys和output_keys,分别表示状态的输入/输出数据。在执行函数中,也多了一个userdata参数,这就是存储状态之间所传递数据的容器。FOO状态的输入/输出数据foo_counter_in和foo_counter_out就存储在userdata中。所以要访问或修改数据,需要使用userdata.foo_counter_out和userdata.foo_counter_in的形式。

    2、定义状态间需传递的数据变量

    sm.userdata.sm_counter = 0
    
    • 1

    3、添加状态到状态机容器中

            # 使用add方法添加状态到状态机容器当中
            smach.StateMachine.add('FOO', Foo(), 
                                   transitions={'outcome1':'BAR', 
                                                'outcome2':'outcome4'},
                                   remapping={'foo_counter_in':'sm_counter', 
                                              'foo_counter_out':'sm_counter'})
            smach.StateMachine.add('BAR', Bar(), 
                                   transitions={'outcome1':'FOO'},
                                   remapping={'bar_counter_in':'sm_counter'})
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在这一步,多了一个remapping参数,类似于ROS重映射。这里将sm_counter映射为foo_counter_in、foo_counter_out和bar_counter_in,也就是给sm_counter取了一堆别名。这样,sm_counter在FOO中累加后就会被传递到BAR中,表明状态间的数据传递完成。

    六、状态机嵌套

    状态机是容器,支持嵌套功能,即在状态机中还可以嵌套实现一个内部状态机。
    运行实例,命令如下:

    roscore
    rosrun smach_tutorials state_machine_nesting.py 
    rosrun smach_viewer smach_viewer.py
    
    • 1
    • 2
    • 3

    效果如下:
    在这里插入图片描述
    灰色区域,就是状态SUB内部的嵌套状态机。

    1、定义状态

    # 定义状态Bas
    class Bas(smach.State):
        def __init__(self):
            smach.State.__init__(self, outcomes=['outcome3'])
    
        def execute(self, userdata):
            rospy.loginfo('Executing state BAS')
            return 'outcome3'
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    2、创建状态机

        # 创建一个顶层状态机
        sm_top = smach.StateMachine(outcomes=['outcome5'])
        
        # 打开状态机容器
        with sm_top:
    
            smach.StateMachine.add('BAS', Bas(),
                                   transitions={'outcome3':'SUB'})
    
            # 创建一个内嵌的状态机
            sm_sub = smach.StateMachine(outcomes=['outcome4'])
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    创建状态机sm_top,将其作为最顶层,在其中加入BAS状态,该状态在输出outcome3时会跳转到SUB状态。然后,定义一个内嵌的状态机。

    3、 添加状态到状态机容器中

            # 打开状态机容器
            with sm_sub:
    
                # 使用add方法添加状态到状态机容器当中
                smach.StateMachine.add('FOO', Foo(), 
                                       transitions={'outcome1':'BAR', 
                                                    'outcome2':'outcome4'})
                smach.StateMachine.add('BAR', Bar(), 
                                       transitions={'outcome1':'FOO'})
    
            smach.StateMachine.add('SUB', sm_sub,
                                   transitions={'outcome4':'outcome5'})
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    将状态FOO和BAR添加到内嵌状态机sm_sub中,然后将sm_sub嵌套到sm_top中。

    七、多状态并行

    SMACH支持多个状态并列运行。
    运行实例,命令如下:

    roscore
    rosrun smach_tutorials concurrence.py 
    rosrun smach_viewer smach_viewer.py
    
    • 1
    • 2
    • 3

    效果如下:
    在这里插入图片描述
    可以看到FOO和BAR状态是并列运行的。

    1、创建状态机

        # 创建一个顶层状态机
        sm_top = smach.StateMachine(outcomes=['outcome6'])
        
        # 打开状态机容器
        with sm_top:
    
            smach.StateMachine.add('BAS', Bas(),
                                   transitions={'outcome3':'CON'})
    
            # 创建一个内嵌的状态机
            sm_con = smach.Concurrence(outcomes=['outcome4','outcome5'],
                                       default_outcome='outcome4',
                                       outcome_map={'outcome5':
                                           { 'FOO':'outcome2',
                                             'BAR':'outcome1'}})
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    2、添加状态到状态机容器中

            # 打开状态机容器
            with sm_con:
                # 使用add方法添加状态到状态机容器当中
                smach.Concurrence.add('FOO', Foo())
                smach.Concurrence.add('BAR', Bar())
    
            smach.StateMachine.add('CON', sm_con,
                                   transitions={'outcome4':'CON',
                                                'outcome5':'outcome6'})
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    使用Concurrence创建了一个同步状态机,default_outcome表示该状态机的默认输出是outcome4,依然会循环该状态机。
    outcome_map参数,设置状态机同步运行的状态跳转,当FOO状态的输出为outcome2并且BAR状态的输出为outcome1时,状态机才会输出outcome5,从而跳转到顶层状态机中。

  • 相关阅读:
    【计算机毕设之基于Java的高校毕业生就业质量数据分析系统-哔哩哔哩】 https://b23.tv/3T9UIrM
    webpack-cli 在 webpack 打包中的作用
    大数据之路读书笔记-01总述
    区间第k小数 (可持久化线段树、主席树)
    关于安卓调用python代码(chaquo)(一)
    SQL优化--count优化
    数据类型 (C语言)
    VSCode自定义闪烁光标
    .NET混合开发解决方案11 WebView2加载的网页中JS调用C#方法
    【4】Docker容器相关命令
  • 原文地址:https://blog.csdn.net/u011832219/article/details/125500427