• 【源码分析】XXL-JOB的执行器的注册流程


    目的:分析xxl-job执行器的注册过程

    流程:

    1. 获取执行器中所有被注解(@xxlJjob)修饰的handler
    2. 执行器注册过程
    3. 执行器中任务执行过程

    版本:xxl-job 2.3.1

    建议:下载xxl-job源码,按流程图debug调试,看堆栈信息并按文章内容理解执行流程

    完整流程图:

    img

    查找Handler任务#

    部分流程图:

    img

    首先启动管理台界面(服务XxlJobAdminApplication),然后启动项目中给的执行器实例(SpringBoot);

    img

    这个方法是扫描项目中使用@xxlJob注解的所有handler方法。接着往下走

    private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
        if (applicationContext == null) {
            return;
        }
        //获取该项目中所有的bean,然后遍历
        String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
        for (String beanDefinitionName : beanDefinitionNames) {
            Object bean = applicationContext.getBean(beanDefinitionName);
    
            Map annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
            try {
                annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                        new MethodIntrospector.MetadataLookup() {
                            //注意点★
                            @Override
                            public XxlJob inspect(Method method) {
                                return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                            }
                        });
            } catch (Throwable ex) {
                logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
            }
            //没有跳过本次循环继续
            if (annotatedMethods==null || annotatedMethods.isEmpty()) {
                continue;
            }
        	//获取了当前执行器中所有@xxl-job的方法,获取方法以及对应的初始化和销毁方法
            for (Map.Entry methodXxlJobEntry : annotatedMethods.entrySet()) {
                Method executeMethod = methodXxlJobEntry.getKey();
                XxlJob xxlJob = methodXxlJobEntry.getValue();
                // regist
                registJobHandler(xxlJob, bean, executeMethod);
            }
        }
    }
    

    Spring案例执行器中有5个handler:

    img

    XxlJobExecutor.registJobHandler()中部分源码

    String name = xxlJob.value();
    //make and simplify the variables since they'll be called several times later
    Class clazz = bean.getClass();
    String methodName = executeMethod.getName();
    if (name.trim().length() == 0) {
        throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");
    }
    if (loadJobHandler(name) != null) {
        throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
    }
    

    然后进行遍历注册;开始进行名字判断:

    1. 判断bean名字是否为空
    2. 判断bean是否被注册了(存在了)

    loadJobHandler校验方式会去该方法中查找:当bean注册完成后时存放到jobHandlerRepository一个map类型中;

    private static ConcurrentMap jobHandlerRepository = new ConcurrentHashMap();
    public static IJobHandler loadJobHandler(String name){
        return jobHandlerRepository.get(name);
    }
    

    executeMethod.setAccessible(true);它实现了修改对象访问权限的功能,参数为true,则表示允许调用方在使用反射时忽略Java语言的访问控制检查.

    往后走会判断该注解的生命周期方法(init和destroy)

    1. 未设置生命周期,则直接开始注册
    //注意MethodJobHandler,后面会用到
    registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
    //添加执行器名字及对应的hob方法信息(当前类、方法、init和destroy属性)
    public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
        logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
        return jobHandlerRepository.put(name, jobHandler);
    }
    
    1. 有生命周期,设置init和destroy方法权限
    if (xxlJob.init().trim().length() > 0) {
        try {
            initMethod = clazz.getDeclaredMethod(xxlJob.init());
            initMethod.setAccessible(true);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");
        }
    }
    if (xxlJob.destroy().trim().length() > 0) {
        try {
            destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());
            destroyMethod.setAccessible(true);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");
        }
    }
    

    首先检查@XxlJob注解中的init属性是否存在且不为空。如果存在,则尝试获取该类中名为init的方法,并将其设置为可访问状态,以便后续调用。

    同理,代码接下来也检查了@XxlJob注解中的destroy属性是否存在且不为空,如果是,则获取该类中名为destroy的方法,并设置其为可访问状态

    在这个过程中,如果某个方法不存在或者无法被访问,则会抛出NoSuchMethodException异常,并且使用throw new RuntimeException将其包装并抛出一个运行时异常。这样做的目的是为了提醒开发人员在任务处理器类中正确地设置init和destroy属性,并确保方法名称与属性值一致。

    执行器的注册过程#

    部分流程图:

    img

    public void afterSingletonsInstantiated() {
    
        // init JobHandler Repository
        /*initJobHandlerRepository(applicationContext);*/
    
        // init JobHandler Repository (for method)
        initJobHandlerMethodRepository(applicationContext);
    
        // refresh GlueFactory
        GlueFactory.refreshInstance(1);
    
        // super start
        try {
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    

    在扫描完执行器中所有的任务后,开始进行执行器注册XxlJobSpringExecutor中的super.start() 方法。

    在初始化执行服务器启动之前,进行了四种操作,初始化日志、初始化adminBizList地址(可视化管理台地址)、初始化日志清除、初始化回调线程等。

    这里需要注意的是第二步初始化地址,在初始化服务器启动的时候需要用到。

    private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
    
        // fill ip port
        port = port>0?port: NetUtil.findAvailablePort(9999);
        ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
    
        // generate address
        if (address==null || address.trim().length()==0) {
            String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null
            address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
        }
    
        // accessToken
        if (accessToken==null || accessToken.trim().length()==0) {
            logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
        }
    
        // start
        embedServer = new EmbedServer();
        embedServer.start(address, port, appname, accessToken);
    }
    

    继续到initEmbedServer,开始初始化ip地址和端口等,需要明白的是,这一步的参数获取方式其实是第一步读取**XxlJobConfig**获得的;进行ip的校验和拼接等操作,开始进行真正的注册。

    创建一个嵌入式的HTTP服务器,将当前执行器信息(包含应用名称和IP地址端口等)注册到注册中心,注册方式的实现在ExecutorRegistryThread中实现。

    校验名字和注册中心,如果注册中心不可用,则等待一段时间后重新尝试连接。

    // registry
    while (!toStop) {
        try {
            RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
            for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                try {
                    ReturnT registryResult = adminBiz.registry(registryParam);
                    if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                        registryResult = ReturnT.SUCCESS;
                        logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                        break;
                    } else {
                        logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                    }
                } catch (Exception e) {
                    logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                }
    
            }
        } catch (Exception e) {
            if (!toStop) {
                logger.error(e.getMessage(), e);
            }
    
        }
    
        try {
            //心跳检测,默认30s
            if (!toStop) {
                TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
            }
        } catch (InterruptedException e) {
            if (!toStop) {
                logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
            }
        }
    }
    

    开启一个新线程,首先构建注册参数(包含执行器分组、执行器名字、执行器本地地址及端口号),遍历注册中心地址,开始进行执行器注册,注册方式通过发送http的post请求。

    @Override
    public ReturnT registry(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
    }
    

    debug的过程中,XxlJobRemotingUtil 执行到int statusCode = connection.getResponseCode();才会跳转到JobApiController.api中的注册地址.

    // services mapping
    if ("callback".equals(uri)) {
        List callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
        return adminBiz.callback(callbackParamList);
    } else if ("registry".equals(uri)) {
        RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
        return adminBiz.registry(registryParam);
    } else if ("registryRemove".equals(uri)) {
        RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
        return adminBiz.registryRemove(registryParam);
    } else {
        return new ReturnT(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
    }
    

    最后进入到JobRegistryHelper.registry()方法中完成数据库的入库和更新操作。

    通过更新语句判断该执行器是否注册,结果小于1,那么保存注册器信息,并向注册中心发送一个请求,更新当前执行器所属的应用名称、执行器名称和 IP 地址等信息,否则跳过。

    public ReturnT registry(RegistryParam registryParam) {
    	//.......
        // async execute
        registryOrRemoveThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                //更新注册表信息
                int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
                if (ret < 1) {
                    //保存执行器注册信息
                    XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
    
                    // fresh 刷新执行器状态
                    freshGroupRegistryInfo(registryParam);
                }
            }
        });
    
        return ReturnT.SUCCESS;
    }
    

    至此执行器的注册流程分析完成。

    执行器中的任务执行过程#

    img

    部分流程图:

    img

    执行器中的任务流程比较简单,如果执行器启动的话,那么每次执行任务是通过JobThread通过Cron 表达式进行操作的。

    通过handler.execute()进行执行,是在框架内部通过反射机制调用作业处理器对象 handler 中的 execute() 方法实现的。在这个过程中,handler 对象表示被加载的作业处理器,并且已经调用了init()方法进行初始化。

    method.invoke() 方法使用反射机制调用指定对象 target 中的方法 method。在这个方法中,target 表示作业处理器对象,method 表示作业处理器中的 execute() 方法。

    通过上述方法,获取到SampleXxlJob.demoJobHandler的任务,然后开始进行任务逻辑操作。

  • 相关阅读:
    2023 彩虹全新 SUP 模板,卡卡云模板修复版
    025-第三代软件开发-实现需求长时间未操作返回登录界面
    QT--Opencv下报错Mat/imwrite/imread找不到文件
    Spring(IOC_DI)依赖注入(配置文件)2022/08/22
    VBA处理文件,发现打开的word文档分布在出现多个实例下,无法全部遍历
    自动驾驶激光雷达与域控制器
    【科研工具的使用】A
    初学者博主如何在WordPress网站上安装Google Analytics
    Linux入门教程||Linux系统目录结构
    ARM 汇编写启动代码之关看门狗
  • 原文地址:https://www.cnblogs.com/xbhog/p/17343603.html