• Tools: Distributed Scheduled Tasks, Using ElasticJob with Spring Boot (3)



    Preface

    This article shows how to integrate ElasticJob with Spring Boot. Environment: JDK 1.8; ZooKeeper 3.7.


    I. Spring Boot integration:

    1.1 Add the dependencies:

    
     <dependency>
         <groupId>org.apache.shardingsphere.elasticjob</groupId>
         <artifactId>elasticjob-lite-core</artifactId>
         <version>3.0.1</version>
     </dependency>
     <dependency>
         <groupId>org.yaml</groupId>
         <artifactId>snakeyaml</artifactId>
         <version>1.27</version>
     </dependency>
     <dependency>
         <groupId>ch.qos.logback</groupId>
         <artifactId>logback-classic</artifactId>
         <version>1.2.12</version>
     </dependency>
    

    1.2 Configure the ZooKeeper registry center:

    ElasticJobZookeeper:

    import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
    import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
    import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class ElasticJobZookeeper {
    
    
        // init() opens the ZooKeeper connection once the bean has been created
        @Bean(initMethod = "init")
        public CoordinatorRegistryCenter createRegistryCenter() {
            // Arguments: ZooKeeper server list, namespace used for this application's jobs
            ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181", "elastic-job");
            zookeeperConfiguration.setConnectionTimeoutMilliseconds(10000);
            zookeeperConfiguration.setSessionTimeoutMilliseconds(10000);
            zookeeperConfiguration.setMaxSleepTimeMilliseconds(10000);
    
            return new ZookeeperRegistryCenter(zookeeperConfiguration);
        }
    }
    
    

    1.3 Define the job classes:

    MyJob:

    import lombok.extern.slf4j.Slf4j;
    import org.apache.shardingsphere.elasticjob.api.ShardingContext;
    import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
    import org.springframework.stereotype.Component;
    
    
    @Slf4j
    @Component
    public class MyJob implements SimpleJob {
        @Override
        public void execute(ShardingContext shardingContext) {
    
            // Sharding item parameters are configured as e.g. 0=text,1=image,2=radio,3=video
            String shardingParameter = shardingContext.getShardingParameter();
            String jobParameter = shardingContext.getJobParameter();
    
            log.debug("job executing, job name: {}, sharding total count: {}, sharding item: {}, sharding parameter: {}, job parameter: {}", shardingContext.getJobName(), shardingContext.getShardingTotalCount(),
                    shardingContext.getShardingItem(), shardingParameter, jobParameter);
            // Branch on the parameter assigned to this sharding item
            if ("text".equals(shardingParameter)) {
                // do something by sharding parameter
            }
            switch (shardingContext.getShardingItem()) {
                case 0:
                    // do something by sharding item 0
                    break;
                case 1:
                    // do something by sharding item 1
                    break;
                case 2:
                    // do something by sharding item 2
                    break;
                // case n: ...
            }
        }
    }
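
The comment in MyJob refers to sharding item parameters written as 0=text,1=image,2=radio,3=video. The following is a minimal sketch, using only the JDK, of how a string in that format maps each sharding item to its parameter. `ShardingParamsSketch` and `parse` are hypothetical names for illustration and are not part of the ElasticJob API; in a real job ElasticJob performs this mapping itself and the job simply reads the result through getShardingParameter().

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the ElasticJob API.
class ShardingParamsSketch {

    // Parses "0=text,1=image" into {0=text, 1=image}.
    static Map<Integer, String> parse(String shardingItemParameters) {
        Map<Integer, String> result = new LinkedHashMap<>();
        if (shardingItemParameters == null || shardingItemParameters.trim().isEmpty()) {
            return result;
        }
        for (String pair : shardingItemParameters.split(",")) {
            // Split on the first '=' only, so the parameter itself may contain '='.
            String[] kv = pair.trim().split("=", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException("Expected item=parameter but got: " + pair);
            }
            result.put(Integer.parseInt(kv[0].trim()), kv[1].trim());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> params = parse("0=text,1=image,2=radio,3=video");
        // The instance that owns sharding item 1 would work on "image".
        System.out.println(params.get(1));
    }
}
```

With shardingTotalCount set to 4 and this parameter string, the instance holding sharding item 1 would receive "image" as its sharding parameter.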
    
    

    MyJob1:

    import lombok.extern.slf4j.Slf4j;
    import org.apache.shardingsphere.elasticjob.api.ShardingContext;
    import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
    import org.springframework.stereotype.Component;
    
    
    @Slf4j
    @Component
    public class MyJob1 implements SimpleJob {
        @Override
        public void execute(ShardingContext shardingContext) {
            log.debug("job executing, job name: {}, sharding total count: {}", shardingContext.getJobName(), shardingContext.getShardingTotalCount());
            switch (shardingContext.getShardingItem()) {
                case 0:
                    // do something by sharding item 0
                    break;
                case 1:
                    // do something by sharding item 1
                    break;
                case 2:
                    // do something by sharding item 2
                    break;
                // case n: ...
            }
        }
    }
    
    

    1.4 Register the jobs with ZooKeeper:

    ElasticJobConfigure

    import com.example.springelasticjob.quickstart.MyJob;
    import com.example.springelasticjob.quickstart.MyJob1;
    import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
    import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
    import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
    import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class ElasticJobConfigure implements InitializingBean {
        @Autowired
        private MyJob myJob;
    
        @Autowired
        private MyJob1 myJob1;
        @Autowired
        private CoordinatorRegistryCenter coordinatorRegistryCenter;
    
    
        public JobConfiguration createJobConfiguration(Class<? extends SimpleJob> jobClass, int shardingTotalCount, String cron, String shardingItemParameters) {
            // Create the job configuration; the job class name is used as the job name
            return JobConfiguration.newBuilder(jobClass.getName(), shardingTotalCount).cron(cron).overwrite(true)
                    .shardingItemParameters(shardingItemParameters).jobListenerTypes().build();
        }
    
        @Override
        public void afterPropertiesSet() throws Exception {
            // MyJob: one sharding item, triggered every 10 seconds
            JobConfiguration jobConfiguration = createJobConfiguration(myJob.getClass(), 1, "0/10 * * * * ?", null);
            new ScheduleJobBootstrap(coordinatorRegistryCenter, myJob, jobConfiguration).schedule();
            // MyJob1: one sharding item, triggered every second
            JobConfiguration jobConfiguration1 = createJobConfiguration(myJob1.getClass(), 1, "0/1 * * * * ?", null);
            new ScheduleJobBootstrap(coordinatorRegistryCenter, myJob1, jobConfiguration1).schedule();
        }
    }
    
    

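    1.5 Start the project:

    1.5.1 Job instances in the ZooKeeper registry center:

    [Screenshot: job nodes in ZooKeeper]

    1.5.2 Job execution log output:

    [Screenshot: job execution log]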

    II. Extensions:

    2.1 Job listener

    1) Define the listener:
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang.time.DateFormatUtils;
    import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
    import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
    
    import java.util.Date;
    
    @Slf4j
    public class MyElasticJobListener implements ElasticJobListener {
    
        private long beginTime = 0;
    
        @Override
        public void beforeJobExecuted(ShardingContexts shardingContexts) {
            beginTime = System.currentTimeMillis();
            log.info("===>{} MyElasticJobListener BEGIN TIME: {} <===",shardingContexts.getJobName(),  DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"));
        }
    
        @Override
        public void afterJobExecuted(ShardingContexts shardingContexts) {
            long endTime = System.currentTimeMillis();
            log.info("===>{} MyElasticJobListener END TIME: {},TOTAL COST: {} <===",shardingContexts.getJobName(), DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"), endTime - beginTime);
        }
    
        @Override
        public String getType() {
            return "myElasticJobListener";
        }
    
    
    }
    
    

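    2) Create a folder under the project's resources directory: META-INF/services
    [Screenshot: META-INF/services folder]
    3) Create a file named org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener
    File content:

    # Fully qualified class name of the listener implementation
    com.example.springelasticjob.config.MyElasticJobListener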
    

    4) Add the listener to the job configuration:

    // Create the job configuration
    JobConfiguration jobConfiguration = JobConfiguration.newBuilder("myjob-param", 1).cron("0/5 * * * * ?")
            .overwrite(true).shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").jobParameter("0=a,1=b,2=c")
            .jobListenerTypes("myElasticJobListener")
            .build();
    
    

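    The "myElasticJobListener" passed to jobListenerTypes("myElasticJobListener") must match the value returned by MyElasticJobListener.getType(); otherwise the listener cannot be found at startup:
    [Screenshot: startup error when the listener type is not found]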
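
Why the two strings have to match can be pictured as a lookup keyed by getType(). The sketch below is a simplified, JDK-only illustration under that assumption: `TypedListener` and `ListenerRegistrySketch` are hypothetical names, not ElasticJob classes, and the exception is only a stand-in for whatever error the real library reports.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a typed listener; not the ElasticJob interface.
interface TypedListener {
    String getType();
}

// Hypothetical registry showing why the configured name must equal getType().
class ListenerRegistrySketch {

    private final Map<String, TypedListener> byType = new HashMap<>();

    ListenerRegistrySketch(List<TypedListener> discovered) {
        // In a real SPI setup the instances would be discovered via java.util.ServiceLoader.
        for (TypedListener listener : discovered) {
            byType.put(listener.getType(), listener);
        }
    }

    // Exact, case-sensitive match on the type string.
    TypedListener find(String type) {
        TypedListener listener = byType.get(type);
        if (listener == null) {
            throw new IllegalArgumentException("No listener registered for type: " + type);
        }
        return listener;
    }

    public static void main(String[] args) {
        TypedListener mine = () -> "myElasticJobListener";
        ListenerRegistrySketch registry = new ListenerRegistrySketch(Arrays.asList(mine));
        System.out.println(registry.find("myElasticJobListener") == mine);
    }
}
```

In this sketch a lookup for "MyElasticJobListener" (capital M) fails, because the key is compared exactly against what getType() returned.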

    2.2 DataflowJob (streaming job):

    2.2.1 Create a DataflowJob:

    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.shardingsphere.elasticjob.api.ShardingContext;
    import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * Streaming (dataflow) job
     */
    @Slf4j
    public class MyDataFlowJob implements DataflowJob<String> {
        @Override
        public List<String> fetchData(ShardingContext shardingContext) {
            // Fetch the data to process
            // Sharding item parameters look like 0=text,1=image,2=radio,3=video; only the job parameter is read here
            String jobParameter = shardingContext.getJobParameter();
    
            log.debug("job executing, job name: {}, sharding total count: {}, sharding item: {}, job parameter: {}", shardingContext.getJobName(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), jobParameter);
            List<String> list = new ArrayList<>(1);
            list.add("lgx");
            return list;
        }
    
        @Override
        public void processData(ShardingContext shardingContext, List<String> list) {
            // Process the fetched data
            System.out.println("list.toString() = " + list.toString());
        }
    }
    
    

    2.2.2 Configure the streaming.process property:

        private static JobConfiguration createJobConfiguration() {
            return JobConfiguration.newBuilder("myjob-dataflow-param", 1).cron("0/30 * * * * ?")
                    .overwrite(true).shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").jobParameter("0=a,1=b,2=c")
                    // Set streaming.process to true to enable streaming processing
                    .setProperty("streaming.process", "true")
                    .build();
        }
    

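    2.2.3 Result:

    Although the job is triggered only once every 30 s, fetchData always returns data here, so with streaming.process enabled processData keeps being called within a single trigger:
    [Screenshot: processData log output]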
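
The behaviour described above can be pictured as a fetch-then-process loop that ends only when fetchData returns nothing. The following JDK-only simulation is a simplified sketch of that idea; `StreamingLoopSketch`, `runOnce` and the queued batches are hypothetical, and ElasticJob's real executor may apply further checks that are omitted here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical simulation of one streaming trigger; not ElasticJob's executor code.
class StreamingLoopSketch {

    private final Queue<List<String>> batches;
    final List<String> processed = new ArrayList<>();

    StreamingLoopSketch(List<List<String>> batches) {
        this.batches = new ArrayDeque<>(batches);
    }

    // Returns the next batch, or an empty list when no data is left.
    List<String> fetchData() {
        List<String> next = batches.poll();
        return next == null ? new ArrayList<String>() : next;
    }

    void processData(List<String> data) {
        processed.addAll(data);
    }

    // One trigger: keep fetching and processing until fetchData returns nothing.
    int runOnce() {
        int rounds = 0;
        List<String> data = fetchData();
        while (data != null && !data.isEmpty()) {
            processData(data);
            rounds++;
            data = fetchData();
        }
        return rounds;
    }

    public static void main(String[] args) {
        StreamingLoopSketch job = new StreamingLoopSketch(
                Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")));
        System.out.println(job.runOnce() + " rounds, processed " + job.processed);
    }
}
```

In the article's MyDataFlowJob, fetchData always returns one element, so a loop of this shape would not end on its own; returning null or an empty list once the pending data is exhausted is what lets a trigger finish.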


    Summary

    This article covered integrating ElasticJob with Spring Boot, using a job listener, and processing data with a streaming (dataflow) job.

  • Original article: https://blog.csdn.net/l123lgx/article/details/136633232