• Spring Boot分段处理List集合多线程批量插入数据


    项目场景:

    大数据量的List集合,需要把List集合中的数据批量插入数据库中。


    解决方案:

    拆分list集合后,然后使用多线程批量插入数据库

    1.实体类

    1. package com.test.entity;
    2. import lombok.Data;
    3. @Data
    4. public class TestEntity {
    5. private String id;
    6. private String name;
    7. }

    2.Mapper

    如果数据量不大,用foreach标签就足够了。如果数据量很大,建议使用batch模式。

    1. package com.test.mapper;
    2. import java.util.List;
    3. import org.apache.ibatis.annotations.Insert;
    4. import org.apache.ibatis.annotations.Param;
    5. import com.test.entity.TestEntity;
    6. public interface TestMapper {
    7. /**
    8. * 1.用于使用batch模式,ExecutorType.BATCH开启批处理模式
    9. * 数据量很大,推荐这种方式
    10. */
    11. @Insert("insert into test(id, name) "
    12. + " values"
    13. + " (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR})")
    14. void testInsert(TestEntity testEntity);
    15. /**
    16. * 2.使用foreach标签,批量保存
    17. * 数据量少可以使用这种方式
    18. */
    19. @Insert("insert into test(id, name) "
    20. + " values"
    21. + " "
    22. + " (#{item.id,jdbcType=VARCHAR}, #{item.name,jdbcType=VARCHAR})"
    23. + " ")
    24. void testBatchInsert(@Param("list") List list);
    25. }

    3.spring容器注入线程池bean对象

    1. package com.test.config;
    2. import java.util.concurrent.Executor;
    3. import java.util.concurrent.ThreadPoolExecutor;
    4. import org.springframework.context.annotation.Bean;
    5. import org.springframework.context.annotation.Configuration;
    6. import org.springframework.scheduling.annotation.EnableAsync;
    7. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    8. @Configuration
    9. @EnableAsync
    10. public class ExecutorConfig {
    11. /**
    12. * 异步任务自定义线程池
    13. */
    14. @Bean(name = "asyncServiceExecutor")
    15. public Executor asyncServiceExecutor() {
    16. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    17. //配置核心线程数
    18. executor.setCorePoolSize(50);
    19. //配置最大线程数
    20. executor.setMaxPoolSize(500);
    21. //配置队列大小
    22. executor.setQueueCapacity(300);
    23. //配置线程池中的线程的名称前缀
    24. executor.setThreadNamePrefix("testExecutor-");
    25. // rejection-policy:当pool已经达到max size的时候,如何处理新任务
    26. // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
    27. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    28. //调用shutdown()方法时等待所有的任务完成后再关闭
    29. executor.setWaitForTasksToCompleteOnShutdown(true);
    30. //等待所有任务完成后的最大等待时间
    31. executor.setAwaitTerminationSeconds(60);
    32. return executor;
    33. }
    34. }

    4.创建异步线程业务类

    1. package com.test.service;
    2. import java.util.List;
    3. import java.util.concurrent.CountDownLatch;
    4. import org.apache.ibatis.session.ExecutorType;
    5. import org.apache.ibatis.session.SqlSession;
    6. import org.apache.ibatis.session.SqlSessionFactory;
    7. import org.springframework.beans.factory.annotation.Autowired;
    8. import org.springframework.scheduling.annotation.Async;
    9. import org.springframework.stereotype.Service;
    10. import com.test.entity.TestEntity;
    11. import com.test.mapper.TestMapper;
    12. @Service
    13. public class AsyncService {
    14. @Autowired
    15. private SqlSessionFactory sqlSessionFactory;
    16. @Async("asyncServiceExecutor")
    17. public void executeAsync(List logOutputResults, CountDownLatch countDownLatch) {
    18. try{
    19. //获取session,打开批处理,因为是多线程,所以每个线程都要开启一个事务
    20. SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH);
    21. TestMapper mapper = session.getMapper(TestMapper.class);
    22. //异步线程要做的事情
    23. for (int i = 0; i < logOutputResults.size(); i++) {
    24. System.out.println(Thread.currentThread().getName() + "线程:" + logOutputResults.get(i));
    25. TestEntity test = new TestEntity();
    26. //test.set()
    27. //.............
    28. //批量保存
    29. mapper.testInsert(test);
    30. //每1000条提交一次防止内存溢出
    31. if(i%1000==0){
    32. session.flushStatements();
    33. }
    34. }
    35. //提交剩下未处理的事务
    36. session.flushStatements();
    37. }finally {
    38. countDownLatch.countDown();// 很关键, 无论上面程序是否异常必须执行countDown,否则await无法释放
    39. }
    40. }
    41. }

    5.拆分list调用异步的业务方法

    1. package com.test.service;
    2. import java.util.ArrayList;
    3. import java.util.List;
    4. import java.util.concurrent.CountDownLatch;
    5. import javax.annotation.Resource;
    6. import org.springframework.stereotype.Service;
    7. @Service
    8. public class TestService {
    9. @Resource
    10. private AsyncService asyncService;
    11. public int testMultiThread() {
    12. List logOutputResults = getTestData();
    13. //按线程数拆分后的list
    14. List> lists = splitList(logOutputResults);
    15. CountDownLatch countDownLatch = new CountDownLatch(lists.size());
    16. for (List listSub:lists) {
    17. asyncService.executeAsync(listSub, countDownLatch);
    18. }
    19. try {
    20. countDownLatch.await(); //保证之前的所有的线程都执行完成,才会走下面的;
    21. // 这样就可以在下面拿到所有线程执行完的集合结果
    22. } catch (Exception e) {
    23. e.printStackTrace();
    24. }
    25. return logOutputResults.size();
    26. }
    27. public List getTestData() {
    28. List logOutputResults = new ArrayList();
    29. for (int i = 0; i < 3000; i++) {
    30. logOutputResults.add("测试数据"+i);
    31. }
    32. return logOutputResults;
    33. }
    34. public List> splitList(List logOutputResults) {
    35. List> results = new ArrayList>();
    36. /*动态线程数方式*/
    37. // 每500条数据开启一条线程
    38. int threadSize = 500;
    39. // 总数据条数
    40. int dataSize = logOutputResults.size();
    41. // 线程数,动态生成
    42. int threadNum = dataSize / threadSize + 1;
    43. /*固定线程数方式
    44. // 线程数
    45. int threadNum = 6;
    46. // 总数据条数
    47. int dataSize = logOutputResults.size();
    48. // 每一条线程处理多少条数据
    49. int threadSize = dataSize / (threadNum - 1);
    50. */
    51. // 定义标记,过滤threadNum为整数
    52. boolean special = dataSize % threadSize == 0;
    53. List cutList = null;
    54. // 确定每条线程的数据
    55. for (int i = 0; i < threadNum; i++) {
    56. if (i == threadNum - 1) {
    57. if (special) {
    58. break;
    59. }
    60. cutList = logOutputResults.subList(threadSize * i, dataSize);
    61. } else {
    62. cutList = logOutputResults.subList(threadSize * i, threadSize * (i + 1));
    63. }
    64. results.add(cutList);
    65. }
    66. return results;
    67. }
    68. }

    5.Controller测试

    1. @RestController
    2. public class TestController {
    3. @Resource
    4. private TestService testService;
    5. @RequestMapping(value = "/log", method = RequestMethod.GET)
    6. @ApiOperation(value = "测试")
    7. public String test() {
    8. testService.testMultiThread();
    9. return "success";
    10. }
    11. }

    总结:

    注意这里执行插入的数据是无序的。

    Java多线程分段处理List集合_java 每个list分配一个线程-CSDN博客 

  • 相关阅读:
    lv6 嵌入式开发-Flappy bird项目(功能实现)
    关于正则的两道笔试面试题
    Anaconda虚拟环境配置Python库与Spyder编译器
    搭建Android自动化python+appium环境
    常用HTML、CSS标签属性
    c语言基础:L1-059 敲笨钟
    Redis淘汰策略-架构案例2020(三十六)
    进阶JAVA篇-深入了解 Stream 流对象的创建与中间方法、终结方法
    DPAFNet:一种用于多模式脑肿瘤分割的残差双路径注意力融合卷积神经网络
    22Excel
  • 原文地址:https://blog.csdn.net/u011974797/article/details/138077156