• JAVA任务流-算子执行排序算法


    目录

    初版

    根据后续业务升级版(支持判断和debug)

    代码(暂时不写了,可以自己根据代码研究)

    新方案(不依赖于预排序)



    初版

    需求,给定1个图(暂时不包含判断),求他的执行顺序

    假设单线程主机(其中每个字母,代表不同的算子

    最终想得到的是执行顺序

    如果3没有走,则不能运行4和5.

    请求测试

    {

        "id":"1",

        "newsName":"测试任务",

        "jsonArray":[

            {"name":"a","next":["b","c"],"pre":[]},

            {"name":"b","next":["d"],"pre":["a"]},

            {"name":"c","next":[],"pre":["a"]},

            {"name":"d","next":["f","e"],"pre":["b"]},

            {"name":"f","next":[],"pre":["d"]},

            {"name":"e","next":[],"pre":["d"]}  

        ],

        "cron":"* * * * *",

        "executeCount":1

    }

     只用管红色内容即可,得到

    {
        "code": 1,
        "message": "执行成功",
        "data": {
            "1": {
                "name": "a",
                "next": [
                    "b",
                    "c"
                ],
                "pre": []
            },
            "2": {
                "name": "b",
                "next": [
                    "d"
                ],
                "pre": [
                    "a"
                ]
            },
            "3": {
                "name": "d",
                "next": [
                    "f",
                    "e"
                ],
                "pre": [
                    "b"
                ]
            },
            "4": {
                "name": "f",
                "next": [],
                "pre": [
                    "d"
                ]
            },
            "5": {
                "name": "e",
                "next": [],
                "pre": [
                    "d"
                ]
            },
            "6": {
                "name": "c",
                "next": [],
                "pre": [
                    "a"
                ]
            }
        }
    }

     JAVA,controller代码

    1. @PostMapping("/debugRun")
    2. @ResponseBody
    3. public RsJsonBean test(@RequestBody JsonRequestParam jsonRequestParam){
    4. System.out.println(jsonRequestParam);
    5. try {
    6. //走service
    7. JSONArray jsonArray = jsonRequestParam.getJsonArray();
    8. //排序算法,计算出顺序
    9. /**
    10. * 先寻找根节点
    11. * */
    12. //将节点遍历以name为key,value为jsonObject
    13. HashMap nameJsonObject = new HashMap();
    14. for (int i = 0; i
    15. JSONObject jsonObject = jsonArray.getJSONObject(i);
    16. String name = jsonObject.getString("name");//name前端必须传过来,而且还是唯一的
    17. nameJsonObject.put(name,jsonObject);
    18. }
    19. HashMap numAction = new HashMap();//定义最终顺序
    20. HashMap preExecuteName = new HashMap();//定义已经编过号的节点,value暂时没有使用
    21. for (int i = 0; i < jsonArray.size(); i++) {
    22. JSONObject jsonObject = jsonArray.getJSONObject(i);
    23. JSONArray prev = jsonObject.getJSONArray("pre");
    24. if (prev.size()==0){//前端必须给数组,上面没有父节点,给空数组
    25. //找到了根节点,根节点编号,执行sortAction递归
    26. //编号
    27. numAction.put(numAction.size()+1,jsonObject);
    28. preExecuteName.put(jsonObject.getString("name"),"已经预执行");
    29. //递归遍历子节点,并且编号
    30. SortTool.sortAction(jsonObject,numAction,nameJsonObject,preExecuteName);
    31. }
    32. }
    33. System.out.println("成功,顺序为"+numAction);
    34. //---等会儿封装到service中
    35. return new RsJsonBean(1,"执行成功",numAction);
    36. }catch (NullPointerException e){
    37. e.printStackTrace();
    38. return new RsJsonBean(0,"传入数据字段格式不匹配,导致解析为空"+e);
    39. }
    40. }

    JAVA的递归代码

    1. package com.dahua.tools;
    2. import com.alibaba.fastjson.JSONArray;
    3. import com.alibaba.fastjson.JSONObject;
    4. import java.util.HashMap;
    5. public class SortTool {
    6. //预排序,执行算子的各个顺序
    7. /**
    8. *方法1,计算执行顺序
    9. * @param jsonObject 父节点
    10. * @param numAction 排序的节点,key是执行顺序,value是算子jsonObj
    11. * @param nameJsonObject 所有的节点,key是唯一名称,value是jsonObj
    12. * @param preExecuteName 已经预执行过的节点,key是节点唯一名称
    13. *
    14. *
    15. */
    16. public static HashMap sortAction(JSONObject jsonObject,HashMap numAction,
    17. HashMap nameJsonObject,
    18. HashMap preExecuteName){
    19. JSONArray nextNames = jsonObject.getJSONArray("next");//存节点唯一名称
    20. for (int i = 0; i < nextNames.size(); i++) {
    21. String name = nextNames.getString(i);
    22. JSONObject jsonObjectSon = nameJsonObject.get(name);//获取真正的子节点对象
    23. //子节点判断,是否为有多个父节点,并且都已经预执行过了
    24. JSONArray preNames = jsonObjectSon.getJSONArray("pre");
    25. if(preNames.size()>1){//则说明他是多节点合并来的
    26. //此时则遍历他的父节点,是否全部执行完成,若完成,则递归执行,若未完成,则返回到上一级
    27. boolean flag=true; //表示他的所有父节点已经执行完成
    28. for (int j = 0; j < preNames.size(); j++) {
    29. String preName = preNames.getString(j);
    30. if(!preExecuteName.containsKey(preName)){//父节点存在,没有执行完的
    31. flag=false;
    32. }
    33. }
    34. if(flag){
    35. // System.out.println(flag);
    36. //若没有子节点,则递归结束,返回上一级
    37. JSONArray nextNamesSon = jsonObjectSon.getJSONArray("next");
    38. if(nextNamesSon.size()==0){
    39. numAction.put(numAction.size()+1,jsonObjectSon);
    40. preExecuteName.put(name,"已经预执行此节点");
    41. }else{//有子节点,则处理完递归
    42. numAction.put(numAction.size()+1,jsonObjectSon);
    43. preExecuteName.put(name,"已经预执行此节点");
    44. jsonObject=jsonObjectSon;//父节点改变
    45. sortAction(jsonObject,numAction,nameJsonObject,preExecuteName);
    46. }
    47. }
    48. }else{
    49. //父节点处理结束--
    50. //若没有子节点,则递归结束,返回上一级
    51. JSONArray nextNamesSon = jsonObjectSon.getJSONArray("next");
    52. if(nextNamesSon.size()==0){
    53. numAction.put(numAction.size()+1,jsonObjectSon);
    54. preExecuteName.put(name,"已经预执行此节点");
    55. }else{//有子节点,则处理完递归
    56. numAction.put(numAction.size()+1,jsonObjectSon);
    57. preExecuteName.put(name,"已经预执行此节点");
    58. jsonObject=jsonObjectSon;//父节点改变
    59. sortAction(jsonObject,numAction,nameJsonObject,preExecuteName);
    60. }
    61. }
    62. }
    63. //获取下一个节点的数据
    64. return numAction;
    65. }
    66. }

    根据后续业务升级版(支持判断和debug)

    需求新增了,原来的预排序不能满足,于是想了一天,想到了很多方法,从中挑选了一个最简单,最清晰的方法

    当e判断为【是】,则会执行i,不会执行f,那么f后面依赖的h也不会被执行

    思路如下:

    1.首先预排序

    与前初版不同的是,预排序时候会【记录判断节点】所在的位置

     2.按顺序执行

    升级版新增:每次执行的时候,对整个任务流的算子进行记录(分为,已执行,未执行,不执行)

    初始状态为【未执行】(下图1),当执行到【判断时】【根据判断结果】标识它的子节点为不执行的节点,进行递归(下图2),将判断下面的所有为否的节点进行递归标记,改为不执行状态

    (图1)

     (图2)

    3. 按照顺序进行

    》1首先会执行1,2,3(判断),此时执行到了判断。

    判断根据结果进行递归,选择为不执行的子节点,然后子节点的子节点等全部设定为不执行状态。

    》2 然后继续执行,4,5,6(执行到这里时)6是不执行状态则直接跳过,然后执行7,7也是不执行状态直接跳过

    总结:每个算子除了头节点,都要判断是否为不执行状态,如果是,则不执行。由于是根据预排序改编的,所以不存在父节点没有执行,子节点已执行的情况。

    代码(暂时不写了,可以自己根据代码研究)

    暂时不写了

    上司看了后,觉得不要预排序,觉得每次新增特殊算子(指的是更改任务流流程的算子),都要特殊处理,他们觉得递归不容易理解,然后使用数组代替递归,仔细想了下还是得递归,于是有了下面的方案

    新方案(不依赖于预排序,不依赖递归,更加简单)

    这个思路是领导想出来的,我后续实现时才发现它的妙处。完全替代了递归,用堆栈去解决(堆栈在java里面你可以看作是一个arrayList,每次删除开头的节点,插入从开头的节点插入,这就是一个堆栈)

    流程图如下:

    1.遍历整个任务流json,找到没有父节点的节点,作为根节点

    2.遍历根节点的子节点,将其放入到堆栈{a,判断1}

    3.终止条件为,堆栈中没有数据,即arrayList的大小为0

    判断,对父节点的结果进行判断,若满足则为true走一条任务,若不满足则false走另一条任务

    循环,分为循环开始和循环结束,循环开始你可以看作是一个标志节点,实质上没有什么用,为了避免循环结束各种甩到其他不适合的算子上,导致错误才搞出来的。

    循环结束,里面有两种方式,1个是条件循环,当满足了什么条件才会停止,则为true时,则退出到下一个节点,为false则指向某个循环开始节点,另外一个是次数,当循环了多少次停止,这些都是在循环算子里定义的。如下图,为从整个任务流中抽出来的单个循环算子。

     

    成果(我们采用的是socket连接),当执行完算子会向前端发tcp的消息,然后前端根据信号对界面进行渲染

     

    下面为核心算法,我自己写的,分享给大家

    传入参数json

    {

        "newsId":"asdadxaad1234",

        "userId":"admin",

        "clickTime":"2022-10-31 18:35:29",

        "newsName":"测试合并",

        "jsonArray":[

    {"newsId":"asdadxaad1234","name":"a","next":["c","b"],"pre":[],"type":"开始","status":"未执行"},

    {"newsId":"asdadxaad1234","name":"c","next":["e"],"pre":["a","h"],"type":"循环开始","status":"未执行"},

    {"newsId":"asdadxaad1234","name":"e","next":["i","f"],"pre":["c"],"type":"判断","status":"未执行","content":{"true":["f"],"false":["i"]}},

    {"newsId":"asdadxaad1234","name":"i","next":[],"pre":["e"],"type":"读取数据","status":"未执行"},

    {"newsId":"asdadxaad1234","name":"f","next":["h"],"pre":["e"],"type":"读取数据","status":"未执行"},

    {"newsId":"asdadxaad1234","name":"d","next":["h"],"pre":["b"],"type":"读取数据","status":"未执行"},

    {"newsId":"asdadxaad1234","name":"b","next":["d"],"pre":["a"],"type":"读取数据","status":"未执行"},

    {"newsId":"asdadxaad1234","name":"h","next":["c","z"],"pre":["f","d"],"type":"循环结束","status":"未执行","content":{"condition":"","times":1,"true":["z"],"false":["c"]}}

    ,{"newsId":"asdadxaad1234","name":"z","next":[],"pre":["h"],"type":"读取数据","status":"未执行"}

    ],

        "cron":""

    }

    run方法(java,复制到controller即可使用)

    1. //TODO 新架构
    2. //支持判断,支持debug,非预排序,新版架构
    3. @PostMapping("/run2")
    4. @ResponseBody
    5. public static RsJsonBean run2(@RequestBody JsonRequestParam jsonRequestParam) {
    6. //检测特殊情况,写一个方法,调用,若不满足,//如果循环的condition不为""并且times也不为""则报错
    7. //判断算子的父节点不能有多个节点
    8. // 比如判断前面有多个节点,或者循环开始没有循环父节点指向,则报错 //TODO 还没有写
    9. //判断节点,只能有1个父节点,不能没有父节点,所以取1个,并且父节点必须是存在数据的节点,不能为输出节点
    10. //1.拿到根节点存入待执行数组,2//初始化得到name与obj的map
    11. JSONArray jsonArray = jsonRequestParam.getJsonArray();
    12. HashMap name4json = new HashMap<>();//名字for杰森对象
    13. ArrayList waitExeNode = new ArrayList<>();
    14. for (int i = 0; i < jsonArray.size(); i++) {
    15. JSONObject jsonObject = jsonArray.getJSONObject(i);
    16. String name = jsonObject.getString("name");
    17. if (jsonObject.getJSONArray("pre").size()<1){
    18. waitExeNode.add(name);
    19. }
    20. name4json.put(name,jsonObject);
    21. }
    22. //2.执行节点,取子节点,将要执行的子节点放在待执行数组的最前头
    23. while (waitExeNode.size()!=0){
    24. String name = waitExeNode.get(0);
    25. JSONObject jsonObject = name4json.get(name);
    26. //判断父节点是否满足
    27. JSONArray pres = jsonObject.getJSONArray("pre");
    28. if(pres.size()!=0){//根节点不需要满足
    29. //去掉他的循环结束的父节点(方法),在进行对比
    30. // TODO 等待写,传入pres,返回pres
    31. //方法开始
    32. for (int i = 0; i < pres.size(); i++) {
    33. String preName = pres.getString(i);
    34. JSONObject jsonObj = name4json.get(preName);
    35. String type = jsonObj.getString("type");
    36. if(type.equals("循环结束")){
    37. pres.remove(i);
    38. }
    39. }
    40. //方法结束
    41. if(SortTool.isRun(pres,name4json)==false){//则跳出此次循环
    42. //删除节点;
    43. waitExeNode.remove(0);
    44. continue;
    45. }
    46. }
    47. System.out.println("执行了"+name);
    48. //更新状态
    49. jsonObject.remove("status");
    50. jsonObject.put("status","已执行");
    51. name4json.remove(name);
    52. name4json.put(name,jsonObject);
    53. waitExeNode.remove(0);
    54. if(jsonObject.getString("type").equals("判断")){
    55. //这里存放判断算子的内容,后续替换 //TODO 等产品画完判断算子
    56. String flag="true";//假设结果是false
    57. JSONObject content = jsonObject.getJSONObject("content");
    58. JSONArray sonNames = content.getJSONArray(flag);
    59. for (int i = 0; i < sonNames.size(); i++) {
    60. waitExeNode.add(0,sonNames.getString(i));
    61. }
    62. }else if(jsonObject.getString("type").equals("循环结束")){//循环开始只是普通算子,只起到标识作用
    63. //次数和条件,具体算子判断,flag由判断结果生成//TODO 循环算子代码编写
    64. String flag="false";//假设结果是fasle
    65. JSONObject content = jsonObject.getJSONObject("content");
    66. JSONArray sonNames = content.getJSONArray(flag);
    67. for (int i = 0; i < sonNames.size(); i++) {
    68. waitExeNode.add(0,sonNames.getString(i));
    69. }
    70. }else{//非修改流程算子---普通算子
    71. //装入待执行,完美绕过递归(在纸上,画几个节点和一个存储的数组,你就能理解)
    72. JSONArray sonNames = jsonObject.getJSONArray("next");
    73. for (int i = 0; i < sonNames.size(); i++) {//不满足父节点条件,要在开头删除(已写了)
    74. /*
    75. 将他排在下一位(必须,否则导致重复,导致重复的原因是,他有多个父节点,每个父节点
    76. 指向了他,所以创建了多个,当一个父节点执行完,那么直接执行它,则会进入开头的删除判断,
    77. 存在其他父节点没有执行完,此节点会被删除,等到最后一个父节点执行后,那么所有父节点
    78. 现在已经执行完了,那他就可以正常执行了,这样就是这个算子只执行了一次
    79. 若不放到下一个,则会存在多个节点没有被删除,会造成重复)
    80. */
    81. waitExeNode.add(0,sonNames.getString(i));
    82. }
    83. }
    84. }
    85. return new RsJsonBean(0,"测试.");
    86. }

    以及工具类SortTool.java

    1. //判断某个子节点的父节点是否全部执行完毕
    2. public static boolean isRun(JSONArray pres,HashMap name4json){
    3. for (int i = 0; i < pres.size(); i++) {
    4. String fatherName = pres.getString(i);
    5. JSONObject fatherObj = name4json.get(fatherName);
    6. String status = fatherObj.getString("status");
    7. if(status.equals("未执行")){
    8. return false;//终止循环,将子节点(该执行的本节点,移除待执行节点)
    9. }
    10. }
    11. return true;
    12. }

    依赖,主要就一个fastJson

    剩下为业务代码,对大家学习任务流没有什么太大帮助,这里就不展示了。

    完!

  • 相关阅读:
    数字人直播助力商家降本增效,AI数字人主播成为商家直播新选择
    【JSP】Page指令和九大内置对象
    补充记录sql基础语法(未完待续8/25)
    Springboot毕设项目公司实习生培训系统p79f6(java+VUE+Mybatis+Maven+Mysql)
    Elasticsearch 出现 “429 rejected” 报错,怎么办?
    如何通过企业培训考试系统实现持续学习和发展
    恶意软件之系统病毒
    Android—PMS: installPackagesLI
    展示日志log4.properties
    SPARK:性能调优之RSS
  • 原文地址:https://blog.csdn.net/qq_38403590/article/details/127574134