• 聊聊powerjob的执行机器地址


    本文主要研究一下powerjob的执行机器地址(designatedWorkers)

    SaveJobInfoRequest

    powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java

    @Data
    public class SaveJobInfoRequest {
    
        /**
         * id of the job. set null to create or non-null to update the job.
         */
        private Long id;
    
        //......
    
        /* ************************** PowerJob-worker cluster property ************************** */
        /**
         * Designated PowerJob-worker nodes. Blank value indicates that there is
         * no limit. Non-blank value means to run the corresponding machine(s) only.
         * example: 192.168.1.1:27777,192.168.1.2:27777
         */
        private String designatedWorkers;
        /**
         * Max count of PowerJob-worker nodes.
         */
        private Integer maxWorkerCount = 0;
    
        //......
    }    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    SaveJobInfoRequest定义了designatedWorkers,用于指定woker节点的地址

    JobInfoDO

    powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/JobInfoDO.java

    @Data
    @Entity
    @NoArgsConstructor
    @AllArgsConstructor
    @Table(indexes = {
            @Index(name = "idx01_job_info", columnList = "appId,status,timeExpressionType,nextTriggerTime"),
    })
    public class JobInfoDO {
    
    
        @Id
        @GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
        @GenericGenerator(name = "native", strategy = "native")
        private Long id;
    
        //......
    
        /**
         * 指定机器运行,空代表不限,非空则只会使用其中的机器运行(多值逗号分割)
         */
        private String designatedWorkers;
        /**
         * 最大机器数量
         */
        private Integer maxWorkerCount;
    
        //......
    
    }    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    JobInfoDO定义了designatedWorkers,用于指定运行的机器地址

    DesignatedWorkerFilter

    powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DesignatedWorkerFilter.java

    @Slf4j
    @Component
    public class DesignatedWorkerFilter implements WorkerFilter {
    
        @Override
        public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) {
    
            String designatedWorkers = jobInfo.getDesignatedWorkers();
    
            // no worker is specified, no filter of any
            if (StringUtils.isEmpty(designatedWorkers)) {
                return false;
            }
    
            Set designatedWorkersSet = Sets.newHashSet(SJ.COMMA_SPLITTER.splitToList(designatedWorkers));
    
            for (String tagOrAddress : designatedWorkersSet) {
                if (tagOrAddress.equals(workerInfo.getTag()) || tagOrAddress.equals(workerInfo.getAddress())) {
                    return false;
                }
            }
    
            return true;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    DesignatedWorkerFilter会根据jobInfo.getDesignatedWorkers()来进行过滤,在指定机器列表内的返回false,否则返回true

    getSuitableWorkers

    powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java

        public List getSuitableWorkers(JobInfoDO jobInfo) {
    
            List workers = Lists.newLinkedList(getWorkerInfosByAppId(jobInfo.getAppId()).values());
    
            workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo));
    
            DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfo.getDispatchStrategy());
            switch (dispatchStrategy) {
                case RANDOM:
                    Collections.shuffle(workers);
                    break;
                case HEALTH_FIRST:
                    workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
                    break;
                default:
                    // do nothing
            }
    
            // 限定集群大小(0代表不限制)
            if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) {
                workers = workers.subList(0, jobInfo.getMaxWorkerCount());
            }
            return workers;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    WorkerClusterQueryService的getSuitableWorkers会为该job选出合适的worker,它会将filterWorker返回true的机器给删除掉

    小结

    powerjob的执行机器地址(designatedWorkers)用于指定该job运行的worker机器列表,DesignatedWorkerFilter会根据jobInfo.getDesignatedWorkers()来进行过滤,在指定机器列表内的返回false,否则返回true,WorkerClusterQueryService的getSuitableWorkers会为该job选出合适的worker,它会将filterWorker返回true的机器给删除掉。

  • 相关阅读:
    GO-日志分析
    LeetCode 494.目标和 (动态规划 + 性能优化)二维数组 压缩成 一维数组
    Redis学习笔记--002
    serveless 思想 Midway.js 框架使用教程(五)
    图片采集器-网页图片批量采集器免费
    《Java 并发编程实战》—— 安全性、活跃性以及性能问题
    API性能监控 【ApiHelp】-- 组件Enhance 代码实现 ~ ASM字节码增强
    浅读-《深入浅出Nodejs》
    leetcode二叉树相关模板
    MySQL索引
  • 原文地址:https://blog.csdn.net/hello_ejb3/article/details/136766179