• SpringBoot:使用Spring Batch实现批处理任务


    引言

    在这里插入图片描述

    在企业级应用中,批处理任务是不可或缺的一部分。它们通常用于处理大量数据,如数据迁移、数据清洗、生成报告等。Spring Batch是Spring框架的一部分,专为批处理任务设计,提供了简化的配置和强大的功能。本文将介绍如何使用Spring Batch与SpringBoot结合,构建和管理批处理任务。

    项目初始化

    首先,我们需要创建一个SpringBoot项目,并添加Spring Batch相关的依赖项。可以通过Spring Initializr快速生成项目。

    添加依赖

    pom.xml中添加以下依赖:

    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-batchartifactId>
    dependency>
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-data-jpaartifactId>
    dependency>
    <dependency>
        <groupId>org.hsqldbgroupId>
        <artifactId>hsqldbartifactId>
        <scope>runtimescope>
    dependency>
    

    配置Spring Batch

    基本配置

    Spring Batch需要一个数据库来存储批处理的元数据。我们可以使用HSQLDB作为内存数据库。配置文件application.properties

    spring.datasource.url=jdbc:hsqldb:mem:testdb
    spring.datasource.driverClassName=org.hsqldb.jdbc.JDBCDriver
    spring.datasource.username=sa
    spring.datasource.password=
    spring.batch.initialize-schema=always
    
    创建批处理任务

    一个典型的Spring Batch任务包括三个主要部分:ItemReader、ItemProcessor和ItemWriter。

    1. ItemReader:读取数据的接口。
    2. ItemProcessor:处理数据的接口。
    3. ItemWriter:写数据的接口。
    创建示例实体类

    创建一个示例实体类,用于演示批处理操作:

    import javax.persistence.Entity;
    import javax.persistence.GeneratedValue;
    import javax.persistence.GenerationType;
    import javax.persistence.Id;
    
    @Entity
    public class Person {
    
        @Id
        @GeneratedValue(strategy = GenerationType.IDENTITY)
        private Long id;
        private String firstName;
        private String lastName;
    
        // getters and setters
    }
    
    创建ItemReader

    我们将使用一个简单的FlatFileItemReader从CSV文件中读取数据:

    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
    import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
    import org.springframework.batch.item.file.mapping.DefaultLineMapper;
    import org.springframework.batch.item.file.mapping.DelimitedLineTokenizer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;
    
    @Configuration
    public class BatchConfiguration {
    
        @Bean
        public FlatFileItemReader<Person> reader() {
            return new FlatFileItemReaderBuilder<Person>()
                    .name("personItemReader")
                    .resource(new ClassPathResource("sample-data.csv"))
                    .delimited()
                    .names(new String[]{"firstName", "lastName"})
                    .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                        setTargetType(Person.class);
                    }})
                    .build();
        }
    }
    
    创建ItemProcessor

    创建一个简单的ItemProcessor,将读取的数据进行处理:

    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.stereotype.Component;
    
    @Component
    public class PersonItemProcessor implements ItemProcessor<Person, Person> {
    
        @Override
        public Person process(Person person) throws Exception {
            final String firstName = person.getFirstName().toUpperCase();
            final String lastName = person.getLastName().toUpperCase();
    
            final Person transformedPerson = new Person();
            transformedPerson.setFirstName(firstName);
            transformedPerson.setLastName(lastName);
    
            return transformedPerson;
        }
    }
    
    创建ItemWriter

    我们将使用一个简单的JdbcBatchItemWriter将处理后的数据写入数据库:

    import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
    import org.springframework.batch.item.database.JdbcBatchItemWriter;
    import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
    import org.springframework.context.annotation.Bean;
    import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
    
    @Configuration
    public class BatchConfiguration {
    
        @Bean
        public JdbcBatchItemWriter<Person> writer(NamedParameterJdbcTemplate jdbcTemplate) {
            return new JdbcBatchItemWriterBuilder<Person>()
                    .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                    .sql("INSERT INTO person (first_name, last_name) VALUES (:firstName, :lastName)")
                    .dataSource(jdbcTemplate.getJdbcTemplate().getDataSource())
                    .build();
        }
    }
    

    配置Job和Step

    一个Job由多个Step组成,每个Step包含一个ItemReader、ItemProcessor和ItemWriter。

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    @EnableBatchProcessing
    public class BatchConfiguration {
    
        @Autowired
        public JobBuilderFactory jobBuilderFactory;
    
        @Autowired
        public StepBuilderFactory stepBuilderFactory;
    
        @Bean
        public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
            return jobBuilderFactory.get("importUserJob")
                    .listener(listener)
                    .flow(step1)
                    .end()
                    .build();
        }
    
        @Bean
        public Step step1(JdbcBatchItemWriter<Person> writer) {
            return stepBuilderFactory.get("step1")
                    .<Person, Person>chunk(10)
                    .reader(reader())
                    .processor(processor())
                    .writer(writer)
                    .build();
        }
    }
    

    监听Job完成事件

    创建一个监听器,用于监听Job完成事件:

    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobExecutionListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class JobCompletionNotificationListener implements JobExecutionListener {
    
        @Override
        public void beforeJob(JobExecution jobExecution) {
            System.out.println("Job Started");
        }
    
        @Override
        public void afterJob(JobExecution jobExecution) {
            System.out.println("Job Ended");
        }
    }
    

    测试与运行

    创建一个简单的CommandLineRunner,用于启动批处理任务:

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class BatchApplication implements CommandLineRunner {
    
        @Autowired
        private JobLauncher jobLauncher;
    
        @Autowired
        private Job job;
    
        public static void main(String[] args) {
            SpringApplication.run(BatchApplication.class, args);
        }
    
        @Override
        public void run(String... args) throws Exception {
            jobLauncher.run(job, new JobParameters());
        }
    }
    

    在完成配置后,可以运行应用程序,并检查控制台输出和数据库中的数据,确保批处理任务正常运行。

    扩展功能

    在基本的批处理任务基础上,可以进一步扩展功能,使其更加完善和实用。例如:

    • 多步骤批处理:一个Job可以包含多个Step,每个Step可以有不同的ItemReader、ItemProcessor和ItemWriter。
    • 并行处理:通过配置多个线程或分布式处理,提升批处理任务的性能。
    • 错误处理和重试:配置错误处理和重试机制,提高批处理任务的可靠性。
    • 数据验证:在处理数据前进行数据验证,确保数据的正确性。
    多步骤批处理
    @Bean
    public Job multiStepJob(JobCompletionNotificationListener listener, Step step1, Step step2) {
        return jobBuilderFactory.get("multiStepJob")
                .listener(listener)
                .start(step1)
                .next(step2)
                .end()
                .build();
    }
    
    @Bean
    public Step step2(JdbcBatchItemWriter<Person> writer) {
        return stepBuilderFactory.get("step2")
                .<Person, Person>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .build();
    }
    
    并行处理

    可以通过配置多个线程来实现并行处理:

    @Bean
    public Step step1(JdbcBatchItemWriter<Person> writer) {
        return stepBuilderFactory.get("step1")
                .<Person, Person>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .taskExecutor(taskExecutor())
                .build();
    }
    
    @Bean
    public TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(10);
        return taskExecutor;
    }
    

    结论

    通过本文的介绍,我们了解了如何使用Spring Batch与SpringBoot结合,构建和管理批处理任务。从项目初始化、配置Spring Batch、实现ItemReader、ItemProcessor和ItemWriter,到配置Job和Step,Spring Batch提供了一系列强大的工具和框架,帮助开发者高效地实现批处理任务。通过合理利用这些工具和框架

    ,开发者可以构建出高性能、可靠且易维护的批处理系统。希望这篇文章能够帮助开发者更好地理解和使用Spring Batch,在实际项目中实现批处理任务的目标。

  • 相关阅读:
    Spring突击复习第二天
    说说 Redis 事务
    【毕业设计】基于javaEE+原生Servlet+MySql的物流信息网站设计与实现(毕业论文+程序源码)——物流信息网站
    4.4 C++ Boost 数据集序列化库
    [nlp] RuntimeError: Llama is supposed to be a BPE model!报错解决
    Atcoder abc131
    R语言时间序列数据提取:使用xts包的last函数提取时间序列中最后面一个月的数据(last 1 month)
    nodejs+vue音乐网站与分享平台
    Python中的循环与可迭代对象
    【MySQL知识体系】第1章 初识 MySQL
  • 原文地址:https://blog.csdn.net/Easonmax/article/details/139390031