大数据量的List集合,需要把List集合中的数据批量插入数据库中。
拆分list集合后,然后使用多线程批量插入数据库
- package com.test.entity;
-
- import lombok.Data;
-
- @Data
- public class TestEntity {
-
- private String id;
- private String name;
- }
如果数据量不大,用foreach标签就足够了。如果数据量很大,建议使用batch模式。
- package com.test.mapper;
-
- import java.util.List;
-
- import org.apache.ibatis.annotations.Insert;
- import org.apache.ibatis.annotations.Param;
-
- import com.test.entity.TestEntity;
-
- public interface TestMapper {
-
- /**
- * 1.用于使用batch模式,ExecutorType.BATCH开启批处理模式
- * 数据量很大,推荐这种方式
- */
- @Insert("insert into test(id, name) "
- + " values"
- + " (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR})")
- void testInsert(TestEntity testEntity);
-
- /**
- * 2.使用foreach标签,批量保存
- * 数据量少可以使用这种方式
- */
- @Insert("insert into test(id, name) "
- + " values"
- + "
" - + " (#{item.id,jdbcType=VARCHAR}, #{item.name,jdbcType=VARCHAR})"
- + " ")
- void testBatchInsert(@Param("list") List
list) ; - }
- package com.test.config;
-
- import java.util.concurrent.Executor;
- import java.util.concurrent.ThreadPoolExecutor;
-
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.scheduling.annotation.EnableAsync;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
- @Configuration
- @EnableAsync
- public class ExecutorConfig {
- /**
- * 异步任务自定义线程池
- */
- @Bean(name = "asyncServiceExecutor")
- public Executor asyncServiceExecutor() {
- ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- //配置核心线程数
- executor.setCorePoolSize(50);
- //配置最大线程数
- executor.setMaxPoolSize(500);
- //配置队列大小
- executor.setQueueCapacity(300);
- //配置线程池中的线程的名称前缀
- executor.setThreadNamePrefix("testExecutor-");
- // rejection-policy:当pool已经达到max size的时候,如何处理新任务
- // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
- executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
- //调用shutdown()方法时等待所有的任务完成后再关闭
- executor.setWaitForTasksToCompleteOnShutdown(true);
- //等待所有任务完成后的最大等待时间
- executor.setAwaitTerminationSeconds(60);
- return executor;
- }
- }
- package com.test.service;
-
- import java.util.List;
- import java.util.concurrent.CountDownLatch;
-
- import org.apache.ibatis.session.ExecutorType;
- import org.apache.ibatis.session.SqlSession;
- import org.apache.ibatis.session.SqlSessionFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Service;
-
- import com.test.entity.TestEntity;
- import com.test.mapper.TestMapper;
-
- @Service
- public class AsyncService {
- @Autowired
- private SqlSessionFactory sqlSessionFactory;
-
- @Async("asyncServiceExecutor")
- public void executeAsync(List
logOutputResults, CountDownLatch countDownLatch) { - try{
- //获取session,打开批处理,因为是多线程,所以每个线程都要开启一个事务
- SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH);
- TestMapper mapper = session.getMapper(TestMapper.class);
-
- //异步线程要做的事情
- for (int i = 0; i < logOutputResults.size(); i++) {
- System.out.println(Thread.currentThread().getName() + "线程:" + logOutputResults.get(i));
-
- TestEntity test = new TestEntity();
- //test.set()
- //.............
- //批量保存
- mapper.testInsert(test);
- //每1000条提交一次防止内存溢出
- if(i%1000==0){
- session.flushStatements();
- }
- }
- //提交剩下未处理的事务
- session.flushStatements();
- }finally {
- countDownLatch.countDown();// 很关键, 无论上面程序是否异常必须执行countDown,否则await无法释放
- }
- }
- }
5.拆分list调用异步的业务方法
- package com.test.service;
-
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.CountDownLatch;
-
- import javax.annotation.Resource;
-
- import org.springframework.stereotype.Service;
-
-
- @Service
- public class TestService {
-
- @Resource
- private AsyncService asyncService;
-
- public int testMultiThread() {
- List
logOutputResults = getTestData(); - //按线程数拆分后的list
- List
> lists = splitList(logOutputResults);
- CountDownLatch countDownLatch = new CountDownLatch(lists.size());
- for (List
listSub:lists) { - asyncService.executeAsync(listSub, countDownLatch);
- }
- try {
- countDownLatch.await(); //保证之前的所有的线程都执行完成,才会走下面的;
- // 这样就可以在下面拿到所有线程执行完的集合结果
- } catch (Exception e) {
- e.printStackTrace();
- }
- return logOutputResults.size();
- }
-
- public List
getTestData() { - List
logOutputResults = new ArrayList(); - for (int i = 0; i < 3000; i++) {
- logOutputResults.add("测试数据"+i);
- }
- return logOutputResults;
- }
-
- public List
> splitList(List logOutputResults) {
- List
> results = new ArrayList>();
-
- /*动态线程数方式*/
- // 每500条数据开启一条线程
- int threadSize = 500;
- // 总数据条数
- int dataSize = logOutputResults.size();
- // 线程数,动态生成
- int threadNum = dataSize / threadSize + 1;
-
- /*固定线程数方式
- // 线程数
- int threadNum = 6;
- // 总数据条数
- int dataSize = logOutputResults.size();
- // 每一条线程处理多少条数据
- int threadSize = dataSize / (threadNum - 1);
- */
-
- // 定义标记,过滤threadNum为整数
- boolean special = dataSize % threadSize == 0;
-
- List
cutList = null; -
- // 确定每条线程的数据
- for (int i = 0; i < threadNum; i++) {
- if (i == threadNum - 1) {
- if (special) {
- break;
- }
- cutList = logOutputResults.subList(threadSize * i, dataSize);
- } else {
- cutList = logOutputResults.subList(threadSize * i, threadSize * (i + 1));
- }
-
- results.add(cutList);
- }
-
- return results;
- }
- }
5.Controller测试
- @RestController
- public class TestController {
-
- @Resource
- private TestService testService;
-
-
- @RequestMapping(value = "/log", method = RequestMethod.GET)
- @ApiOperation(value = "测试")
- public String test() {
- testService.testMultiThread();
- return "success";
- }
- }
注意这里执行插入的数据是无序的。